Skip to content

Commit

Permalink
Merge branch 'kp/delete-stale-keys' into feature/tiered-caching
Browse files Browse the repository at this point in the history
  • Loading branch information
Peter Alfonsi committed Dec 19, 2023
2 parents dc3fd1e + fce110d commit f375874
Show file tree
Hide file tree
Showing 6 changed files with 801 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,14 @@
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.time.Duration;
import java.time.Instant;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse;

// This is a separate file from IndicesRequestCacheIT because we only want to run our test
// on a node with a maximum request cache size that we set.

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class IndicesRequestCacheDiskTierIT extends OpenSearchIntegTestCase {
@Override
Expand Down Expand Up @@ -234,6 +236,172 @@ private void setDiskCacheEnabledSetting(Client client, boolean newSetting) {
assertAcked(client.admin().cluster().updateSettings(clusterSettingUpdate).actionGet());
}

/**
 * Checks that the clear-cache API removes every request cache entry held in the disk tier.
 *
 * The test starts a node with a small on-heap request cache, issues enough distinct cacheable
 * searches that {@code numOnDisk} entries are reported for the disk tier, invokes the clear-cache
 * API for the index and then expects the disk tier to report zero entries.
 *
 * @throws Exception if node startup, indexing or any of the requests fails
 */
public void testDiskTierInvalidationByCleanCacheAPI() throws Exception {
    int cleanupIntervalInMillis = 10_000_000; // setting this intentionally high so that we don't get background cleanups
    int heapSizeBytes = 9876;
    String node = internalCluster().startNode(
        Settings.builder()
            .put(
                IndicesService.INDICES_REQUEST_CACHE_DISK_CLEAN_INTERVAL_SETTING.getKey(),
                TimeValue.timeValueMillis(cleanupIntervalInMillis)
            )
            .put(IndicesService.INDICES_REQUEST_CACHE_DISK_CLEAN_THRESHOLD_SETTING.getKey(), "0%")
            .put(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey(), new ByteSizeValue(heapSizeBytes))
            // allow into disk cache regardless of took time
            .put(DiskTierTookTimePolicy.DISK_TOOKTIME_THRESHOLD_SETTING.getKey(), TimeValue.ZERO)
    );
    Client client = client(node);

    Settings.Builder indicesSettingBuilder = Settings.builder()
        .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true)
        .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
        .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0);

    assertAcked(
        client.admin().indices().prepareCreate("index").setMapping("k", "type=keyword").setSettings(indicesSettingBuilder).get()
    );

    indexRandom(true, client.prepareIndex("index").setSource("k", "hello"));
    ensureSearchable("index");
    SearchResponse resp;

    // The first request is used to measure how many bytes one cached entry occupies on heap.
    resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello" + 0)).get();
    assertSearchResponse(resp);
    int requestSize = (int) getCacheSizeBytes(client, "index", TierType.ON_HEAP);
    // Guard the division below: a size of zero means the first request was never cached.
    assertTrue("expected the first request to be cached on heap, but the heap tier reports 0 bytes", requestSize > 0);
    assertTrue(heapSizeBytes > requestSize);
    // If this fails, increase heapSizeBytes! We can't adjust it after getting the size of one query
    // as the cache size setting is not dynamic
    int numOnDisk = 2;
    int numRequests = heapSizeBytes / requestSize + numOnDisk;
    // Request 0 was already issued above, so start at 1.
    for (int i = 1; i < numRequests; i++) {
        resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello" + i)).get();
        assertSearchResponse(resp);
        IndicesRequestCacheIT.assertCacheState(client, "index", 0, i + 1, TierType.ON_HEAP, false);
        IndicesRequestCacheIT.assertCacheState(client, "index", 0, i + 1, TierType.DISK, false);
    }
    IndicesRequestCacheIT.assertNumCacheEntries(client, "index", numOnDisk, TierType.DISK);

    // call clear cache api
    client.admin().indices().prepareClearCache().setIndices("index").setRequestCache(true).get();
    // fetch the stats again
    IndicesRequestCacheIT.assertNumCacheEntries(client, "index", 0, TierType.DISK);
}

/**
 * When the entire disk tier is stale, checks that the background cache cleaner removes
 * everything from disk.
 *
 * The test fills the disk tier with {@code numOnDisk} entries, indexes another document with a
 * forced refresh, waits for longer than one configured cleaner interval and then expects the
 * disk tier to report zero entries.
 *
 * @throws Exception if node startup, indexing or any of the requests fails, or the wait is interrupted
 */
public void testDiskTierInvalidationByCacheCleanerEntireDiskTier() throws Exception {
    int thresholdInMillis = 4_000;
    int heapSizeBytes = 9876;
    String node = internalCluster().startNode(
        Settings.builder()
            .put(
                IndicesService.INDICES_REQUEST_CACHE_DISK_CLEAN_INTERVAL_SETTING.getKey(),
                TimeValue.timeValueMillis(thresholdInMillis)
            )
            .put(IndicesService.INDICES_REQUEST_CACHE_DISK_CLEAN_THRESHOLD_SETTING.getKey(), "1%")
            .put(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey(), new ByteSizeValue(heapSizeBytes))
            // allow into disk cache regardless of took time
            .put(DiskTierTookTimePolicy.DISK_TOOKTIME_THRESHOLD_SETTING.getKey(), TimeValue.ZERO)
    );
    Client client = client(node);

    Settings.Builder indicesSettingBuilder = Settings.builder()
        .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true)
        .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
        .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0);

    assertAcked(
        client.admin().indices().prepareCreate("index").setMapping("k", "type=keyword").setSettings(indicesSettingBuilder).get()
    );

    indexRandom(true, client.prepareIndex("index").setSource("k", "hello"));
    ensureSearchable("index");
    SearchResponse resp;

    // The first request is used to measure how many bytes one cached entry occupies on heap.
    resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello" + 0)).get();
    assertSearchResponse(resp);
    int requestSize = (int) getCacheSizeBytes(client, "index", TierType.ON_HEAP);
    // Guard the division below: a size of zero means the first request was never cached.
    assertTrue("expected the first request to be cached on heap, but the heap tier reports 0 bytes", requestSize > 0);
    assertTrue(heapSizeBytes > requestSize);

    int numOnDisk = 2;
    int numRequests = heapSizeBytes / requestSize + numOnDisk;
    // Request 0 was already issued above, so start at 1.
    for (int i = 1; i < numRequests; i++) {
        resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello" + i)).get();
        assertSearchResponse(resp);
        IndicesRequestCacheIT.assertCacheState(client, "index", 0, i + 1, TierType.ON_HEAP, false);
        IndicesRequestCacheIT.assertCacheState(client, "index", 0, i + 1, TierType.DISK, false);
    }

    IndicesRequestCacheIT.assertNumCacheEntries(client, "index", numOnDisk, TierType.DISK);

    // index a doc and force refresh so that the cache cleaner can clean the cache
    indexRandom(true, client.prepareIndex("index").setSource("k", "hello"));
    ensureSearchable("index");
    // Anchor the wait at the point the cached keys became stale rather than at test start:
    // node startup and setup time must not eat into the window the cleaner needs.
    // NOTE(review): assumes the cleaner fires once per configured interval - confirm.
    Instant staleSince = Instant.now();

    // sleep for the threshold time, so that the cache cleaner can clean the cache
    long elapsedTimeMillis = Duration.between(staleSince, Instant.now()).toMillis();
    // if this test is flaky, increase the sleep time.
    // Clamp at zero: Thread.sleep rejects negative durations with IllegalArgumentException.
    long sleepTime = Math.max(0L, (thresholdInMillis - elapsedTimeMillis) + 1_500);
    Thread.sleep(sleepTime);

    // by now cache cleaner would have run and cleaned up stale keys
    // fetch the stats again
    IndicesRequestCacheIT.assertNumCacheEntries(client, "index", 0, TierType.DISK);
}

/**
 * When only part of the disk tier is stale, checks that the background cache cleaner removes
 * only the stale items from disk.
 *
 * The test fills the disk tier with {@code numOnDisk} entries, indexes a new document and
 * refreshes so the earlier entries become stale, issues six more searches, waits for longer than
 * one configured cleaner interval and then expects five entries to remain on disk.
 *
 * @throws Exception if node startup, indexing or any of the requests fails, or the wait is interrupted
 */
public void testDiskTierInvalidationByCacheCleanerPartOfDiskTier() throws Exception {
    int thresholdInMillis = 4_000;
    int heapSizeBytes = 987;
    String node = internalCluster().startNode(
        Settings.builder()
            .put(
                IndicesService.INDICES_REQUEST_CACHE_DISK_CLEAN_INTERVAL_SETTING.getKey(),
                TimeValue.timeValueMillis(thresholdInMillis)
            )
            .put(IndicesService.INDICES_REQUEST_CACHE_DISK_CLEAN_THRESHOLD_SETTING.getKey(), "1%")
            .put(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey(), new ByteSizeValue(heapSizeBytes))
            // allow into disk cache regardless of took time
            .put(DiskTierTookTimePolicy.DISK_TOOKTIME_THRESHOLD_SETTING.getKey(), TimeValue.ZERO)
    );
    Client client = client(node);

    Settings.Builder indicesSettingBuilder = Settings.builder()
        .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true)
        .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
        .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0);

    assertAcked(
        client.admin().indices().prepareCreate("index").setMapping("k", "type=text").setSettings(indicesSettingBuilder).get()
    );

    indexRandom(true, client.prepareIndex("index").setSource("k", "hello"));
    ensureSearchable("index");
    SearchResponse resp;

    // The first request is used to measure how many bytes one cached entry occupies on heap.
    resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello" + 0)).get();
    assertSearchResponse(resp);
    int requestSize = (int) getCacheSizeBytes(client, "index", TierType.ON_HEAP);
    // Guard the division below: a size of zero means the first request was never cached.
    assertTrue("expected the first request to be cached on heap, but the heap tier reports 0 bytes", requestSize > 0);
    assertTrue(heapSizeBytes > requestSize);

    int numOnDisk = 2;
    int numRequests = heapSizeBytes / requestSize + numOnDisk;
    // Request 0 was already issued above, so start at 1.
    for (int i = 1; i < numRequests; i++) {
        resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello" + i)).get();
        assertSearchResponse(resp);
        IndicesRequestCacheIT.assertCacheState(client, "index", 0, i + 1, TierType.ON_HEAP, false);
        IndicesRequestCacheIT.assertCacheState(client, "index", 0, i + 1, TierType.DISK, false);
    }

    IndicesRequestCacheIT.assertNumCacheEntries(client, "index", numOnDisk, TierType.DISK);

    // Index a new document and force a refresh so that it creates stale keys in the cache for the
    // cache cleaner to pick up. The request builder must actually be executed with get(), and the
    // refresh has to come after the write for the change to become visible.
    client.prepareIndex("index").setId("1").setSource("k", "good bye").get();
    flushAndRefresh("index");
    ensureSearchable("index");
    // Anchor the wait at the point the cached keys became stale rather than at test start:
    // node startup and setup time must not eat into the window the cleaner needs.
    // NOTE(review): assumes the cleaner fires once per configured interval - confirm.
    Instant staleSince = Instant.now();

    for (int i = 0; i < 6; i++) { // index 6 items with the new readerCacheKeyId
        resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello" + i)).get();
        assertSearchResponse(resp);
    }

    // sleep for the threshold time, so that the cache cleaner can clean the cache
    long elapsedTimeMillis = Duration.between(staleSince, Instant.now()).toMillis();
    // if this test is flaky, increase the sleep time.
    // Clamp at zero: Thread.sleep rejects negative durations with IllegalArgumentException.
    long sleepTime = Math.max(0L, (thresholdInMillis - elapsedTimeMillis) + 1_500);
    Thread.sleep(sleepTime);

    // make sure we have 5 entries in disk.
    IndicesRequestCacheIT.assertNumCacheEntries(client, "index", 5, TierType.DISK);
}

private long getCacheSizeBytes(Client client, String index, TierType tierType) {
RequestCacheStats requestCacheStats = client.admin()
.indices()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,22 @@

package org.opensearch.common.cache.tier;

import org.ehcache.Cache;
import org.ehcache.CachePersistenceException;
import org.ehcache.PersistentCacheManager;
import org.ehcache.config.builders.CacheConfigurationBuilder;
import org.ehcache.config.builders.CacheEventListenerConfigurationBuilder;
import org.ehcache.config.builders.CacheManagerBuilder;
import org.ehcache.config.builders.PooledExecutionServiceConfigurationBuilder;
import org.ehcache.config.builders.ResourcePoolsBuilder;
import org.ehcache.config.units.MemoryUnit;
import org.ehcache.core.spi.service.FileBasedPersistenceContext;
import org.ehcache.event.CacheEvent;
import org.ehcache.event.CacheEventListener;
import org.ehcache.event.EventType;
import org.ehcache.expiry.ExpiryPolicy;
import org.ehcache.impl.config.store.disk.OffHeapDiskStoreConfiguration;
import org.ehcache.spi.serialization.SerializerException;
import org.opensearch.OpenSearchException;
import org.opensearch.common.cache.RemovalListener;
import org.opensearch.common.cache.RemovalNotification;
Expand All @@ -29,23 +45,6 @@
import java.util.Optional;
import java.util.function.Supplier;

import org.ehcache.Cache;
import org.ehcache.CachePersistenceException;
import org.ehcache.PersistentCacheManager;
import org.ehcache.config.builders.CacheConfigurationBuilder;
import org.ehcache.config.builders.CacheEventListenerConfigurationBuilder;
import org.ehcache.config.builders.CacheManagerBuilder;
import org.ehcache.config.builders.PooledExecutionServiceConfigurationBuilder;
import org.ehcache.config.builders.ResourcePoolsBuilder;
import org.ehcache.config.units.MemoryUnit;
import org.ehcache.core.spi.service.FileBasedPersistenceContext;
import org.ehcache.event.CacheEvent;
import org.ehcache.event.CacheEventListener;
import org.ehcache.event.EventType;
import org.ehcache.expiry.ExpiryPolicy;
import org.ehcache.impl.config.store.disk.OffHeapDiskStoreConfiguration;
import org.ehcache.spi.serialization.SerializerException;

/**
* An ehcache-based disk tier implementation.
* @param <K> The key type of cache entries
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -690,11 +690,11 @@ public void apply(Settings value, Settings current, Settings previous) {
RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING,
IndicesService.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING,
IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING,
IndicesService.CLUSTER_RESTRICT_INDEX_REPLICATION_TYPE_SETTING,
AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE,
CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE,
CPUBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT,
CPUBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT,
IndicesService.CLUSTER_RESTRICT_INDEX_REPLICATION_TYPE_SETTING
CPUBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT
)
)
);
Expand Down Expand Up @@ -729,7 +729,9 @@ public void apply(Settings value, Settings current, Settings previous) {
EhCacheDiskCachingTier.REQUEST_CACHE_DISK_MAX_THREADS,
EhCacheDiskCachingTier.REQUEST_CACHE_DISK_WRITE_CONCURRENCY,
EhCacheDiskCachingTier.REQUEST_CACHE_DISK_SEGMENTS,
DiskTierTookTimePolicy.DISK_TOOKTIME_THRESHOLD_SETTING
DiskTierTookTimePolicy.DISK_TOOKTIME_THRESHOLD_SETTING,
IndicesService.INDICES_REQUEST_CACHE_DISK_CLEAN_THRESHOLD_SETTING,
IndicesService.INDICES_REQUEST_CACHE_DISK_CLEAN_INTERVAL_SETTING
)
);
}
Loading

0 comments on commit f375874

Please sign in to comment.