diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheDiskTierIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheDiskTierIT.java
index 7932a23c11020..aa1b64e4bcae8 100644
--- a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheDiskTierIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheDiskTierIT.java
@@ -47,12 +47,14 @@
 import org.opensearch.index.query.QueryBuilders;
 import org.opensearch.test.OpenSearchIntegTestCase;
 
+import java.time.Duration;
+import java.time.Instant;
+
 import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
 import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse;
 
 // This is a separate file from IndicesRequestCacheIT because we only want to run our test
 // on a node with a maximum request cache size that we set.
-
 @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
 public class IndicesRequestCacheDiskTierIT extends OpenSearchIntegTestCase {
     @Override
@@ -234,6 +236,172 @@ private void setDiskCacheEnabledSetting(Client client, boolean newSetting) {
         assertAcked(client.admin().cluster().updateSettings(clusterSettingUpdate).actionGet());
     }
 
+    public void testDiskTierInvalidationByCleanCacheAPI() throws Exception {
+        int cleanupIntervalInMillis = 10_000_000; // setting this intentionally high so that we don't get background cleanups
+        int heapSizeBytes = 9876;
+        String node = internalCluster().startNode(
+            Settings.builder()
+                .put(IndicesService.INDICES_REQUEST_CACHE_DISK_CLEAN_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(cleanupIntervalInMillis))
+                .put(IndicesService.INDICES_REQUEST_CACHE_DISK_CLEAN_THRESHOLD_SETTING.getKey(), "0%")
+                .put(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey(), new ByteSizeValue(heapSizeBytes))
+                .put(DiskTierTookTimePolicy.DISK_TOOKTIME_THRESHOLD_SETTING.getKey(), TimeValue.ZERO) // allow into disk cache regardless of took time
+        );
+        Client client = client(node);
+
+        Settings.Builder indicesSettingBuilder = Settings.builder()
+            .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true)
+            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
+            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0);
+
+        assertAcked(
+            client.admin().indices().prepareCreate("index").setMapping("k", "type=keyword").setSettings(indicesSettingBuilder).get()
+        );
+
+        indexRandom(true, client.prepareIndex("index").setSource("k", "hello"));
+        ensureSearchable("index");
+        SearchResponse resp;
+
+        resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello" + 0)).get();
+        int requestSize = (int) getCacheSizeBytes(client, "index", TierType.ON_HEAP);
+        assertTrue(heapSizeBytes > requestSize);
+        // If this fails, increase heapSizeBytes! We can't adjust it after getting the size of one query,
+        // as the cache size setting is not dynamic.
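+        // (Illustrative arithmetic with a hypothetical size: if requestSize were ~600 bytes,
+        // numRequests below would be 9876 / 600 + 2 = 18, so the two oldest entries would be
+        // evicted from the heap tier and spill over to the disk tier.)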
+        int numOnDisk = 2;
+        int numRequests = heapSizeBytes / requestSize + numOnDisk;
+        for (int i = 1; i < numRequests; i++) {
+            resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello" + i)).get();
+            assertSearchResponse(resp);
+            IndicesRequestCacheIT.assertCacheState(client, "index", 0, i + 1, TierType.ON_HEAP, false);
+            IndicesRequestCacheIT.assertCacheState(client, "index", 0, i + 1, TierType.DISK, false);
+        }
+        IndicesRequestCacheIT.assertNumCacheEntries(client, "index", 2, TierType.DISK);
+
+        // call the clear cache API
+        client.admin().indices().prepareClearCache().setIndices("index").setRequestCache(true).get();
+        // fetch the stats again
+        IndicesRequestCacheIT.assertNumCacheEntries(client, "index", 0, TierType.DISK);
+    }
+
+    // When the entire disk tier is stale, test whether the cache cleaner cleans up everything from disk
+    public void testDiskTierInvalidationByCacheCleanerEntireDiskTier() throws Exception {
+        int thresholdInMillis = 4_000;
+        Instant start = Instant.now();
+        int heapSizeBytes = 9876;
+        String node = internalCluster().startNode(
+            Settings.builder()
+                .put(IndicesService.INDICES_REQUEST_CACHE_DISK_CLEAN_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(thresholdInMillis))
+                .put(IndicesService.INDICES_REQUEST_CACHE_DISK_CLEAN_THRESHOLD_SETTING.getKey(), "1%")
+                .put(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey(), new ByteSizeValue(heapSizeBytes))
+                .put(DiskTierTookTimePolicy.DISK_TOOKTIME_THRESHOLD_SETTING.getKey(), TimeValue.ZERO) // allow into disk cache regardless of took time
+        );
+        Client client = client(node);
+
+        Settings.Builder indicesSettingBuilder = Settings.builder()
+            .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true)
+            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
+            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0);
+
+        assertAcked(
+            client.admin().indices().prepareCreate("index").setMapping("k", "type=keyword").setSettings(indicesSettingBuilder).get()
+        );
+
+        indexRandom(true, client.prepareIndex("index").setSource("k", "hello"));
+        ensureSearchable("index");
+        SearchResponse resp;
+
+        resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello" + 0)).get();
+        int requestSize = (int) getCacheSizeBytes(client, "index", TierType.ON_HEAP);
+        assertTrue(heapSizeBytes > requestSize);
+
+        int numOnDisk = 2;
+        int numRequests = heapSizeBytes / requestSize + numOnDisk;
+        for (int i = 1; i < numRequests; i++) {
+            resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello" + i)).get();
+            assertSearchResponse(resp);
+            IndicesRequestCacheIT.assertCacheState(client, "index", 0, i + 1, TierType.ON_HEAP, false);
+            IndicesRequestCacheIT.assertCacheState(client, "index", 0, i + 1, TierType.DISK, false);
+        }
+
+        IndicesRequestCacheIT.assertNumCacheEntries(client, "index", 2, TierType.DISK);
+
+        // index a doc and force a refresh so that the cache cleaner has stale keys to clean
+        indexRandom(true, client.prepareIndex("index").setSource("k", "hello"));
+        ensureSearchable("index");
+
+        // sleep for the threshold time, so that the cache cleaner can clean the cache
+        Instant end = Instant.now();
+        long elapsedTimeMillis = Duration.between(start, end).toMillis();
+        // if this test is flaky, increase the sleep time
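+        // (The cleaner is scheduled every thresholdInMillis, so sleeping for the remainder of
+        // the current interval plus a 1.5 second buffer should guarantee at least one cleanup
+        // pass has happened before the assertion below.)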
+        long sleepTime = (thresholdInMillis - elapsedTimeMillis) + 1_500;
+        Thread.sleep(sleepTime);
+
+        // by now the cache cleaner should have run and cleaned up stale keys
+        // fetch the stats again
+        IndicesRequestCacheIT.assertNumCacheEntries(client, "index", 0, TierType.DISK);
+    }
+
+    // When part of the disk tier is stale, test whether the cache cleaner cleans up only the stale items from disk
+    public void testDiskTierInvalidationByCacheCleanerPartOfDiskTier() throws Exception {
+        int thresholdInMillis = 4_000;
+        Instant start = Instant.now();
+        int heapSizeBytes = 987;
+        String node = internalCluster().startNode(
+            Settings.builder()
+                .put(IndicesService.INDICES_REQUEST_CACHE_DISK_CLEAN_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(thresholdInMillis))
+                .put(IndicesService.INDICES_REQUEST_CACHE_DISK_CLEAN_THRESHOLD_SETTING.getKey(), "1%")
+                .put(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey(), new ByteSizeValue(heapSizeBytes))
+                .put(DiskTierTookTimePolicy.DISK_TOOKTIME_THRESHOLD_SETTING.getKey(), TimeValue.ZERO) // allow into disk cache regardless of took time
+        );
+        Client client = client(node);
+
+        Settings.Builder indicesSettingBuilder = Settings.builder()
+            .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true)
+            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
+            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0);
+
+        assertAcked(
+            client.admin().indices().prepareCreate("index").setMapping("k", "type=text").setSettings(indicesSettingBuilder).get()
+        );
+
+        indexRandom(true, client.prepareIndex("index").setSource("k", "hello"));
+        ensureSearchable("index");
+        SearchResponse resp;
+
+        resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello" + 0)).get();
+        int requestSize = (int) getCacheSizeBytes(client, "index", TierType.ON_HEAP);
+        assertTrue(heapSizeBytes > requestSize);
+
+        int numOnDisk = 2;
+        int numRequests = heapSizeBytes / requestSize + numOnDisk;
+        for (int i = 1; i < numRequests; i++) {
+            resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello" + i)).get();
+            assertSearchResponse(resp);
+            IndicesRequestCacheIT.assertCacheState(client, "index", 0, i + 1, TierType.ON_HEAP, false);
+            IndicesRequestCacheIT.assertCacheState(client, "index", 0, i + 1, TierType.DISK, false);
+        }
+
+        IndicesRequestCacheIT.assertNumCacheEntries(client, "index", 2, TierType.DISK);
+
+        // force a refresh so that it creates stale keys in the cache for the cache cleaner to pick up
+        flushAndRefresh("index");
+        client.prepareIndex("index").setId("1").setSource("k", "good bye").get();
+        ensureSearchable("index");
+
+        for (int i = 0; i < 6; i++) { // cache 6 items under the new readerCacheKeyId
+            client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello" + i)).get();
+        }
+
+        // sleep for the threshold time, so that the cache cleaner can clean the cache
+        Instant end = Instant.now();
+        long elapsedTimeMillis = Duration.between(start, end).toMillis();
+        // if this test is flaky, increase the sleep time
+        long sleepTime = (thresholdInMillis - elapsedTimeMillis) + 1_500;
+        Thread.sleep(sleepTime);
+
+        // make sure only the 5 fresh entries that spilled to disk remain
+        IndicesRequestCacheIT.assertNumCacheEntries(client, "index", 5, TierType.DISK);
+    }
+
     private long getCacheSizeBytes(Client client, String index, TierType tierType) {
         RequestCacheStats requestCacheStats = client.admin()
             .indices()
diff --git a/server/src/main/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTier.java b/server/src/main/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTier.java
index afdd7ea42d768..7b1e7ac9c724e 100644
--- a/server/src/main/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTier.java
+++ b/server/src/main/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTier.java
@@ -8,6 +8,22 @@
 
 package org.opensearch.common.cache.tier;
 
+import org.ehcache.Cache;
+import org.ehcache.CachePersistenceException;
+import org.ehcache.PersistentCacheManager;
+import org.ehcache.config.builders.CacheConfigurationBuilder;
+import org.ehcache.config.builders.CacheEventListenerConfigurationBuilder;
+import org.ehcache.config.builders.CacheManagerBuilder;
+import org.ehcache.config.builders.PooledExecutionServiceConfigurationBuilder;
+import org.ehcache.config.builders.ResourcePoolsBuilder;
+import org.ehcache.config.units.MemoryUnit;
+import org.ehcache.core.spi.service.FileBasedPersistenceContext;
+import org.ehcache.event.CacheEvent;
+import org.ehcache.event.CacheEventListener;
+import org.ehcache.event.EventType;
+import org.ehcache.expiry.ExpiryPolicy;
+import org.ehcache.impl.config.store.disk.OffHeapDiskStoreConfiguration;
+import org.ehcache.spi.serialization.SerializerException;
 import org.opensearch.OpenSearchException;
 import org.opensearch.common.cache.RemovalListener;
 import org.opensearch.common.cache.RemovalNotification;
@@ -29,23 +45,6 @@
 import java.util.Optional;
 import java.util.function.Supplier;
 
-import org.ehcache.Cache;
-import org.ehcache.CachePersistenceException;
-import org.ehcache.PersistentCacheManager;
-import org.ehcache.config.builders.CacheConfigurationBuilder;
-import org.ehcache.config.builders.CacheEventListenerConfigurationBuilder;
-import org.ehcache.config.builders.CacheManagerBuilder;
-import org.ehcache.config.builders.PooledExecutionServiceConfigurationBuilder;
-import org.ehcache.config.builders.ResourcePoolsBuilder;
-import org.ehcache.config.units.MemoryUnit;
-import org.ehcache.core.spi.service.FileBasedPersistenceContext;
-import org.ehcache.event.CacheEvent;
-import org.ehcache.event.CacheEventListener;
-import org.ehcache.event.EventType;
-import org.ehcache.expiry.ExpiryPolicy;
-import org.ehcache.impl.config.store.disk.OffHeapDiskStoreConfiguration;
-import org.ehcache.spi.serialization.SerializerException;
-
 /**
  * An ehcache-based disk tier implementation.
  * @param <K> The key type of cache entries
  * @param <V> The value type of cache entries
diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
index a1d7f2f4154fb..c572c732aafb6 100644
--- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
+++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
@@ -690,11 +690,11 @@ public void apply(Settings value, Settings current, Settings previous) {
                     RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING,
                     IndicesService.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING,
                     IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING,
+                    IndicesService.CLUSTER_RESTRICT_INDEX_REPLICATION_TYPE_SETTING,
                     AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE,
                     CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE,
                     CPUBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT,
-                    CPUBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT,
-                    IndicesService.CLUSTER_RESTRICT_INDEX_REPLICATION_TYPE_SETTING
+                    CPUBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT
                 )
             )
         );
@@ -729,7 +729,9 @@ public void apply(Settings value, Settings current, Settings previous) {
             EhCacheDiskCachingTier.REQUEST_CACHE_DISK_MAX_THREADS,
             EhCacheDiskCachingTier.REQUEST_CACHE_DISK_WRITE_CONCURRENCY,
             EhCacheDiskCachingTier.REQUEST_CACHE_DISK_SEGMENTS,
-            DiskTierTookTimePolicy.DISK_TOOKTIME_THRESHOLD_SETTING
+            DiskTierTookTimePolicy.DISK_TOOKTIME_THRESHOLD_SETTING,
+            IndicesService.INDICES_REQUEST_CACHE_DISK_CLEAN_THRESHOLD_SETTING,
+            IndicesService.INDICES_REQUEST_CACHE_DISK_CLEAN_INTERVAL_SETTING
         )
     );
 }
diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java
index 416810b73a6e1..371664dd8a64e 100644
--- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java
+++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java
@@ -44,6 +44,7 @@
 import org.opensearch.common.cache.tier.CachePolicyInfoWrapper;
 import org.opensearch.common.cache.tier.CacheValue;
+import org.opensearch.common.cache.tier.CachingTier;
 import org.opensearch.common.cache.tier.DiskTierProvider;
 import org.opensearch.common.cache.tier.DiskTierTookTimePolicy;
 import org.opensearch.common.cache.tier.EhCacheDiskCachingTier;
 import org.opensearch.common.cache.tier.OnHeapCachingTier;
@@ -67,16 +68,20 @@
 import org.opensearch.core.common.io.stream.Writeable;
 import org.opensearch.core.common.unit.ByteSizeUnit;
 import org.opensearch.core.common.unit.ByteSizeValue;
+import org.opensearch.core.index.shard.ShardId;
+import org.opensearch.index.shard.IndexShard;
 
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
-import java.util.Iterator;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 
 /**
@@ -146,7 +151,10 @@ public final class IndicesRequestCache implements TieredCacheEventListener
     private final ConcurrentMap<CleanupKey, Boolean> registeredClosedListeners = ConcurrentCollections.newConcurrentMap();
-    private final Set<CleanupKey> keysToClean = ConcurrentCollections.newConcurrentSet();
+    private final Map<CleanupKey, CleanupStatus> keysToClean = ConcurrentCollections.newConcurrentMap();
+    // A map to keep track of the number of keys to be cleaned for a given ShardId and readerCacheKeyId
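+    // e.g. { [index][0] -> { "readerCacheKeyA" -> 2, "readerCacheKeyB" -> 1 } } (hypothetical reader cache key ids)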
+    private final ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> diskCleanupKeyToCountMap = ConcurrentCollections.newConcurrentMap();
+    private final AtomicInteger staleKeysInDiskCount = new AtomicInteger(0);
     private final ByteSizeValue size;
     private final TimeValue expire;
     private final TieredCacheService<Key, BytesReference> tieredCacheService;
@@ -192,7 +200,22 @@ public final class IndicesRequestCache implements TieredCacheEventListener
+    IndicesRequestCache(
+        Settings settings,
+        ClusterSettings clusterSettings,
+        IndicesService indicesService,
+        TieredCacheService<Key, BytesReference> tieredCacheService
+    ) {
+        this.size = INDICES_CACHE_QUERY_SIZE.get(settings);
+        this.expire = INDICES_CACHE_QUERY_EXPIRE.exists(settings) ? INDICES_CACHE_QUERY_EXPIRE.get(settings) : null;
+        long sizeInBytes = size.getBytes();
+        this.indicesService = indicesService;
+        this.tieredCacheService = tieredCacheService;
+        this.settings = settings;
+        this.clusterSettings = clusterSettings;
     }
 
     @Override
@@ -204,8 +227,16 @@ public void close() {
     }
 
     void clear(CacheEntity entity) {
-        keysToClean.add(new CleanupKey(entity, null));
+        CleanupKey cleanupKey = new CleanupKey(entity, null);
+        keysToClean.put(cleanupKey, new CleanupStatus());
+        updateStaleKeysInDiskCount(cleanupKey);
         cleanCache();
+        /*
+         * This is triggered by the clear-cache API call, and we need to make sure the disk cache
+         * is cleaned as well; hence we pass a threshold of 0.
+         */
+        cleanDiskCache(0);
     }
 
     @Override
@@ -226,6 +257,77 @@ public void onHit(Key key, CacheValue<BytesReference> cacheValue) {
     @Override
     public void onCached(Key key, BytesReference value, TierType tierType) {
         key.entity.onCached(key, value, tierType);
+        updateDiskCleanupKeyToCountMap(new CleanupKey(key.entity, key.readerCacheKeyId), tierType);
+    }
+
+    /**
+     * Updates the diskCleanupKeyToCountMap with the given CleanupKey and TierType.
+     * If the TierType is not DISK, the method returns without making any changes.
+     * If the ShardId associated with the CleanupKey does not exist in the map, a new entry is created.
+     * The method increments the count of the CleanupKey in the map.
+     *
+     * Why use ShardId as the key? CacheEntity mainly contains IndexShard, and neither class
+     * overrides equals() and hashCode(). ShardId overrides both properly, so using ShardId as
+     * the key avoids having to modify CacheEntity and IndexShard.
+     *
+     * @param cleanupKey the CleanupKey to be updated in the map
+     * @param tierType the TierType of the CleanupKey
+     */
+    private void updateDiskCleanupKeyToCountMap(CleanupKey cleanupKey, TierType tierType) {
+        if (!tierType.equals(TierType.DISK)) {
+            return;
+        }
+        IndexShard indexShard = (IndexShard) cleanupKey.entity.getCacheIdentity();
+        if (indexShard == null) {
+            logger.warn("IndexShard is null for CleanupKey: {} while cleaning tier : {}",
+                cleanupKey.readerCacheKeyId, tierType.getStringValue());
+            return;
+        }
+        ShardId shardId = indexShard.shardId();
+
+        diskCleanupKeyToCountMap
+            .computeIfAbsent(shardId, k -> ConcurrentCollections.newConcurrentMap())
+            .merge(cleanupKey.readerCacheKeyId, 1, Integer::sum);
+    }
+
+    /**
+     * Updates the count of stale keys in the disk cache.
+     * This method is called when a CleanupKey is added to the keysToClean set.
+     * It increments staleKeysInDiskCount by the count of the CleanupKey in the diskCleanupKeyToCountMap.
+     * If the CleanupKey's readerCacheKeyId is null or its entity is not open, it increments
+     * staleKeysInDiskCount by the total count of keys associated with the CleanupKey's ShardId in the
+     * diskCleanupKeyToCountMap and removes that ShardId from the map.
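+     *
+     * Example with hypothetical counts: if shard [index][0] currently maps
+     * { "readerA" -> 3, "readerB" -> 2 } and a CleanupKey with a null readerCacheKeyId arrives,
+     * staleKeysInDiskCount grows by 5 and the whole shard entry is removed from the map.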
+     *
+     * @param cleanupKey the CleanupKey that has been marked for cleanup
+     */
+    private void updateStaleKeysInDiskCount(CleanupKey cleanupKey) {
+        IndexShard indexShard = (IndexShard) cleanupKey.entity.getCacheIdentity();
+        if (indexShard == null) {
+            logger.warn("IndexShard is null for CleanupKey: {}", cleanupKey.readerCacheKeyId);
+            return;
+        }
+        ShardId shardId = indexShard.shardId();
+
+        ConcurrentMap<String, Integer> countMap = diskCleanupKeyToCountMap.get(shardId);
+        if (countMap == null) {
+            return;
+        }
+
+        if (cleanupKey.readerCacheKeyId == null) {
+            int totalSum = countMap.values().stream().mapToInt(Integer::intValue).sum();
+            staleKeysInDiskCount.addAndGet(totalSum);
+            diskCleanupKeyToCountMap.remove(shardId);
+            return;
+        }
+        Integer count = countMap.get(cleanupKey.readerCacheKeyId);
+        if (count == null) {
+            return;
+        }
+        staleKeysInDiskCount.addAndGet(count);
+        countMap.remove(cleanupKey.readerCacheKeyId);
+        if (countMap.isEmpty()) {
+            diskCleanupKeyToCountMap.remove(shardId);
+        }
     }
 
     BytesReference getOrCompute(
@@ -245,7 +347,7 @@ BytesReference getOrCompute(
         Loader cacheLoader = new Loader(cacheEntity, loader);
         BytesReference value = tieredCacheService.computeIfAbsent(key, cacheLoader);
         if (cacheLoader.isLoaded()) {
-            // see if its the first time we see this reader, and make sure to register a cleanup key
+            // see if it's the first time we see this reader, and make sure to register a cleanup key
             CleanupKey cleanupKey = new CleanupKey(cacheEntity, readerCacheKeyId);
             if (!registeredClosedListeners.containsKey(cleanupKey)) {
                 Boolean previous = registeredClosedListeners.putIfAbsent(cleanupKey, Boolean.TRUE);
@@ -259,9 +361,10 @@ BytesReference getOrCompute(
 
     /**
      * Invalidates the given the cache entry for the given key and it's context
+     *
      * @param cacheEntity the cache entity to invalidate for
-     * @param reader the reader to invalidate the cache entry for
-     * @param cacheKey the cache key to invalidate
+     * @param reader      the reader to invalidate the cache entry for
+     * @param cacheKey    the cache key to invalidate
     */
     void invalidate(CacheEntity cacheEntity, DirectoryReader reader, BytesReference cacheKey) {
         assert reader.getReaderCacheHelper() != null;
@@ -306,6 +409,11 @@ public BytesReference load(Key key) throws Exception {
         }
     }
 
+    public class CleanupStatus {
+        public boolean cleanedInHeap;
+        public boolean cleanedOnDisk;
+    }
+
     /**
      * Basic interface to make this cache testable.
     */
@@ -420,7 +528,8 @@ private CleanupKey(CacheEntity entity, String readerCacheKeyId) {
         public void onClose(IndexReader.CacheKey cacheKey) {
             Boolean remove = registeredClosedListeners.remove(this);
             if (remove != null) {
-                keysToClean.add(this);
+                keysToClean.put(this, new CleanupStatus());
+                updateStaleKeysInDiskCount(this);
             }
         }
 
@@ -448,33 +557,130 @@ public int hashCode() {
      * Logic to clean up in-memory cache.
     */
     synchronized void cleanCache() {
+        cleanCache(TierType.ON_HEAP);
+        tieredCacheService.getOnHeapCachingTier().refresh();
+    }
+
+    /**
+     * Logic to clean up the disk-based cache.
+     *
+     * TODO: cleanDiskCache is very specific to the disk caching tier. We can refactor this to be more generic.
+     */
+    synchronized void cleanDiskCache(double diskCachesCleanThresholdPercent) {
+        if (!canSkipDiskCacheCleanup(diskCachesCleanThresholdPercent)) {
+            cleanCache(TierType.DISK);
+        }
+    }
+
+    private synchronized void cleanCache(TierType cacheType) {
         final Set<CleanupKey> currentKeysToClean = new HashSet<>();
         final Set<Object> currentFullClean = new HashSet<>();
-        currentKeysToClean.clear();
-        currentFullClean.clear();
-        for (Iterator<CleanupKey> iterator = keysToClean.iterator(); iterator.hasNext();) {
-            CleanupKey cleanupKey = iterator.next();
-            iterator.remove();
-            if (cleanupKey.readerCacheKeyId == null || cleanupKey.entity.isOpen() == false) {
-                // null indicates full cleanup, as does a closed shard
+
+        /*
+         * Stores the keys that need to be removed from keysToClean once iteration is done,
+         * to avoid a ConcurrentModificationException.
+         */
+        final Set<CleanupKey> keysCleanedFromAllCaches = new HashSet<>();
+
+        for (Map.Entry<CleanupKey, CleanupStatus> entry : keysToClean.entrySet()) {
+            CleanupKey cleanupKey = entry.getKey();
+            CleanupStatus cleanupStatus = entry.getValue();
+
+            if (cleanupStatus.cleanedInHeap && cleanupStatus.cleanedOnDisk) {
+                keysCleanedFromAllCaches.add(cleanupKey);
+                continue;
+            }
+
+            if (cacheType == TierType.ON_HEAP && cleanupStatus.cleanedInHeap) continue;
+            if (cacheType == TierType.DISK && cleanupStatus.cleanedOnDisk) continue;
+
+            if (needsFullClean(cleanupKey)) {
                 currentFullClean.add(cleanupKey.entity.getCacheIdentity());
             } else {
                 currentKeysToClean.add(cleanupKey);
             }
+
+            if (cacheType == TierType.ON_HEAP) {
+                cleanupStatus.cleanedInHeap = true;
+            } else if (cacheType == TierType.DISK) {
+                cleanupStatus.cleanedOnDisk = true;
+            }
+        }
+
+        // Remove keys that have been cleaned from all caches
+        keysToClean.keySet().removeAll(keysCleanedFromAllCaches);
+
+        // Early exit if no cleanup is needed
+        if (currentKeysToClean.isEmpty() && currentFullClean.isEmpty()) {
+            return;
+        }
+
+        CachingTier<Key, BytesReference> cachingTier;
+
+        if (cacheType == TierType.ON_HEAP) {
+            cachingTier = tieredCacheService.getOnHeapCachingTier();
+        } else {
+            cachingTier = tieredCacheService.getDiskCachingTier().get();
         }
-        if (!currentKeysToClean.isEmpty() || !currentFullClean.isEmpty()) {
-            for (Iterator<Key> iterator = tieredCacheService.getOnHeapCachingTier().keys().iterator(); iterator.hasNext();) {
-                Key key = iterator.next();
-                if (currentFullClean.contains(key.entity.getCacheIdentity())) {
-                    iterator.remove();
-                } else {
-                    if (currentKeysToClean.contains(new CleanupKey(key.entity, key.readerCacheKeyId))) {
-                        iterator.remove();
-                    }
-                }
-            }
-        }
-        tieredCacheService.getOnHeapCachingTier().refresh();
+
+        cleanUpKeys(cachingTier, currentKeysToClean, currentFullClean);
+    }
+
+    private synchronized boolean canSkipDiskCacheCleanup(double diskCachesCleanThresholdPercent) {
+        if (tieredCacheService.getDiskCachingTier().isEmpty()) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("Skipping disk cache keys cleanup since the disk caching tier is not present");
+            }
+            return true;
+        }
+        if (tieredCacheService.getDiskCachingTier().get().count() == 0) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("Skipping disk cache keys cleanup since the disk caching tier is empty");
+            }
+            return true;
+        }
+        if (diskCleanupKeysPercentage() < diskCachesCleanThresholdPercent) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("Skipping disk cache keys cleanup since the fraction of stale keys in the disk cache is below the threshold");
+            }
+            return true;
+        }
+        return false;
+    }
+
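+    // Note: despite the name, this returns a fraction in [0, 1] rather than a percentage
+    // (e.g. 51 stale keys out of 100 total yields 0.51), so callers must compare it against
+    // thresholds that have likewise been normalized to a fraction.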
+    synchronized double diskCleanupKeysPercentage() {
+        int totalKeysInDiskCache = tieredCacheService.getDiskCachingTier()
+            .map(CachingTier::count)
+            .orElse(0);
+        if (totalKeysInDiskCache == 0 || staleKeysInDiskCount.get() == 0) {
+            return 0;
+        }
+        return ((double) staleKeysInDiskCount.get() / totalKeysInDiskCache);
+    }
+
+    synchronized void cleanUpKeys(
+        CachingTier<Key, BytesReference> cachingTier,
+        Set<CleanupKey> currentKeysToClean,
+        Set<Object> currentFullClean
+    ) {
+        for (Key key : cachingTier.keys()) {
+            CleanupKey cleanupKey = new CleanupKey(key.entity, key.readerCacheKeyId);
+            if (currentFullClean.contains(key.entity.getCacheIdentity()) || currentKeysToClean.contains(cleanupKey)) {
+                cachingTier.invalidate(key);
+                if (cachingTier.getTierType() == TierType.DISK) {
+                    staleKeysInDiskCount.decrementAndGet();
+                }
+            }
+        }
+    }
+
+    private boolean needsFullClean(CleanupKey cleanupKey) {
+        // a null readerCacheKeyId indicates full cleanup, as does a closed shard
+        return cleanupKey.readerCacheKeyId == null || !cleanupKey.entity.isOpen();
     }
 
     /**
@@ -484,6 +690,7 @@ long count() {
         return tieredCacheService.count();
     }
 
+    // to be used for testing only
     int numRegisteredCloseListeners() { // for testing
         return registeredClosedListeners.size();
     }
@@ -511,4 +718,23 @@ public EhCacheDiskCachingTier createNewDiskTier() {
             .setValueSerializer(new BytesReferenceSerializer())
             .build();
     }
+
+    // to be used for testing only
+    void addCleanupKeyForTesting(CacheEntity entity, String readerCacheKeyId) {
+        keysToClean.put(new CleanupKey(entity, readerCacheKeyId), new CleanupStatus());
+    }
+
+    // to be used for testing only
+    int getKeysToCleanSizeForTesting() {
+        return keysToClean.size();
+    }
+
+    // to be used for testing only
+    Key createKeyForTesting(CacheEntity entity, String readerCacheKeyId) {
+        return new Key(entity, null, readerCacheKeyId);
+    }
+
+    // to be used for testing only
+    void setStaleKeysInDiskCountForTesting(int count) {
+        staleKeysInDiskCount.set(count);
+    }
 }
diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java
index c8150103b29b5..55ee2854ba6c0 100644
--- a/server/src/main/java/org/opensearch/indices/IndicesService.java
+++ b/server/src/main/java/org/opensearch/indices/IndicesService.java
@@ -215,11 +215,25 @@ public class IndicesService extends AbstractLifecycleComponent
     private static final Logger logger = LogManager.getLogger(IndicesService.class);
 
     public static final String INDICES_SHARDS_CLOSED_TIMEOUT = "indices.shards_closed_timeout";
+    public static final String INDICES_REQUEST_CACHE_DISK_CLEAN_THRESHOLD_SETTING_KEY = "indices.requests.cache.tiered.disk.cleanup_threshold_percentage";
+    public static final String INDICES_REQUEST_CACHE_DISK_CLEAN_THRESHOLD_SETTING_DEFAULT_VALUE = "50%";
+
+    public static final String INDICES_REQUEST_CACHE_DISK_CLEAN_INTERVAL_SETTING_KEY = "indices.requests.cache.tiered.disk.cleanup_interval";
+
     public static final Setting<TimeValue> INDICES_CACHE_CLEAN_INTERVAL_SETTING = Setting.positiveTimeSetting(
         "indices.cache.cleanup_interval",
         TimeValue.timeValueMinutes(1),
         Property.NodeScope
     );
+
+    public static final Setting<TimeValue> INDICES_REQUEST_CACHE_DISK_CLEAN_INTERVAL_SETTING = Setting.positiveTimeSetting(
+        INDICES_REQUEST_CACHE_DISK_CLEAN_INTERVAL_SETTING_KEY,
+        TimeValue.timeValueMinutes(1),
+        Property.Dynamic,
+        Property.NodeScope
+    );
+
     public static final Setting<Boolean> INDICES_ID_FIELD_DATA_ENABLED_SETTING = Setting.boolSetting(
         "indices.id_field_data.enabled",
         true,
@@ -227,6 +241,30 @@ public class IndicesService extends AbstractLifecycleComponent
         Property.NodeScope
     );
 
+    public static final Setting<String> INDICES_REQUEST_CACHE_DISK_CLEAN_THRESHOLD_SETTING = Setting.simpleString(
+        INDICES_REQUEST_CACHE_DISK_CLEAN_THRESHOLD_SETTING_KEY,
+        INDICES_REQUEST_CACHE_DISK_CLEAN_THRESHOLD_SETTING_DEFAULT_VALUE,
+        value -> {
+            String errorLogBase = "Setting '" + INDICES_REQUEST_CACHE_DISK_CLEAN_THRESHOLD_SETTING_KEY + "' must be ";
+            if (!value.endsWith("%")) {
+                throw new IllegalArgumentException(errorLogBase + "a percentage");
+            }
+            String rawValue = value.substring(0, value.length() - 1);
+            try {
+                double doubleValue = Double.parseDouble(rawValue);
+                if (doubleValue < 0 || doubleValue > 100) {
+                    throw new IllegalArgumentException(errorLogBase + "between 0% and 100%");
+                }
+            } catch (NumberFormatException e) {
+                throw new IllegalArgumentException(errorLogBase + "a valid percentage", e);
+            }
+        },
+        Property.Dynamic,
+        Property.NodeScope
+    );
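+    // (Validation examples, for illustration: "0%", "50%" and "100%" are accepted, while
+    // "50" (no percent sign), "abc%" (not a number) and "101%" (out of range) are rejected.)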
+
     public static final Setting<Boolean> WRITE_DANGLING_INDICES_INFO_SETTING = Setting.boolSetting(
         "gateway.write_dangling_indices_info",
         true,
@@ -324,6 +362,7 @@ public class IndicesService extends AbstractLifecycleComponent
     private final IndexScopedSettings indexScopedSettings;
     private final IndicesFieldDataCache indicesFieldDataCache;
     private final CacheCleaner cacheCleaner;
+    private final DiskCacheCleaner diskCacheCleaner;
     private final ThreadPool threadPool;
     private final CircuitBreakerService circuitBreakerService;
     private final BigArrays bigArrays;
@@ -337,7 +376,9 @@ public class IndicesService extends AbstractLifecycleComponent
     private final MapperRegistry mapperRegistry;
     private final NamedWriteableRegistry namedWriteableRegistry;
     private final IndexingMemoryController indexingMemoryController;
-    private final TimeValue cleanInterval;
+    private final TimeValue onHeapCachesCleanInterval;
+    private final TimeValue diskCachesCleanInterval;
+    private final double diskCachesCleanThreshold;
     final IndicesRequestCache indicesRequestCache; // pkg-private for testing
     private final IndicesQueryCache indicesQueryCache;
     private final MetaStateService metaStateService;
@@ -365,8 +406,9 @@ public class IndicesService extends AbstractLifecycleComponent
 
     @Override
     protected void doStart() {
-        // Start thread that will manage cleaning the field data cache periodically
-        threadPool.schedule(this.cacheCleaner, this.cleanInterval, ThreadPool.Names.SAME);
+        // Start threads that will manage cleaning the field data and request caches periodically
+        threadPool.schedule(this.cacheCleaner, this.onHeapCachesCleanInterval, ThreadPool.Names.SAME);
+        threadPool.schedule(this.diskCacheCleaner, this.diskCachesCleanInterval, ThreadPool.Names.SAME);
     }
 
     public IndicesService(
@@ -434,8 +476,11 @@ public void onRemoval(ShardId shardId, String fieldName, boolean wasEvicted, lon
                 circuitBreakerService.getBreaker(CircuitBreaker.FIELDDATA).addWithoutBreaking(-sizeInBytes);
             }
         });
-        this.cleanInterval = INDICES_CACHE_CLEAN_INTERVAL_SETTING.get(settings);
-        this.cacheCleaner = new CacheCleaner(indicesFieldDataCache, indicesRequestCache, logger, threadPool, this.cleanInterval);
+        this.onHeapCachesCleanInterval = INDICES_CACHE_CLEAN_INTERVAL_SETTING.get(settings);
+        this.diskCachesCleanInterval = INDICES_REQUEST_CACHE_DISK_CLEAN_INTERVAL_SETTING.get(settings);
+        this.diskCachesCleanThreshold = getCleanupKeysThresholdPercentage(settings);
+        this.cacheCleaner = new CacheCleaner(indicesFieldDataCache, indicesRequestCache, logger, threadPool, this.onHeapCachesCleanInterval);
+        this.diskCacheCleaner = new DiskCacheCleaner(indicesRequestCache, logger, threadPool, this.diskCachesCleanInterval, this.diskCachesCleanThreshold);
         this.metaStateService = metaStateService;
         this.engineFactoryProviders = engineFactoryProviders;
@@ -495,6 +540,12 @@ protected void closeInternal() {
         this.recoverySettings = recoverySettings;
     }
 
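+    // (For example, the default "50%" becomes 0.5 here, on the same scale as the fraction
+    // returned by IndicesRequestCache#diskCleanupKeysPercentage.)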
+    private double getCleanupKeysThresholdPercentage(Settings settings) {
+        String threshold = INDICES_REQUEST_CACHE_DISK_CLEAN_THRESHOLD_SETTING.get(settings);
+        String rawValue = threshold.substring(0, threshold.length() - 1);
+        return Double.parseDouble(rawValue) / 100;
+    }
+
     /**
      * The changes to dynamic cluster setting {@code cluster.default.index.refresh_interval} needs to be updated. This
      * method gets called whenever the setting changes. We set the instance variable with the updated value as this is
@@ -1563,21 +1614,37 @@ public AnalysisRegistry getAnalysis() {
         return analysisRegistry;
     }
 
+    private static abstract class AbstractCacheCleaner implements Runnable, Releasable {
+
+        protected final Logger logger;
+        protected final ThreadPool threadPool;
+        protected final TimeValue interval;
+        protected final AtomicBoolean closed = new AtomicBoolean(false);
+
+        AbstractCacheCleaner(
+            Logger logger,
+            ThreadPool threadPool,
+            TimeValue interval
+        ) {
+            this.logger = logger;
+            this.threadPool = threadPool;
+            this.interval = interval;
+        }
+
+        @Override
+        public void close() {
+            closed.compareAndSet(false, true);
+        }
+    }
+
     /**
-     * FieldDataCacheCleaner is a scheduled Runnable used to clean a Guava cache
-     * periodically. In this case it is the field data cache, because a cache that
-     * has an entry invalidated may not clean up the entry if it is not read from
-     * or written to after invalidation.
+     * CacheCleaner is a scheduled Runnable used to periodically clean the field data cache and
+     * the on-heap request cache.
      *
      * @opensearch.internal
     */
-    private static final class CacheCleaner implements Runnable, Releasable {
+    private static final class CacheCleaner extends AbstractCacheCleaner {
 
         private final IndicesFieldDataCache cache;
-        private final Logger logger;
-        private final ThreadPool threadPool;
-        private final TimeValue interval;
-        private final AtomicBoolean closed = new AtomicBoolean(false);
         private final IndicesRequestCache requestCache;
 
         CacheCleaner(
@@ -1587,11 +1654,9 @@ private static final class CacheCleaner implements Runnable, Releasable {
             ThreadPool threadPool,
             TimeValue interval
         ) {
+            super(logger, threadPool, interval);
             this.cache = cache;
             this.requestCache = requestCache;
-            this.logger = logger;
-            this.threadPool = threadPool;
-            this.interval = interval;
         }
 
         @Override
@@ -1619,13 +1684,43 @@ public void run() {
             }
             // Reschedule itself to run again if not closed
             if (closed.get() == false) {
-                threadPool.scheduleUnlessShuttingDown(interval, ThreadPool.Names.SAME, this);
+                this.threadPool.scheduleUnlessShuttingDown(interval, ThreadPool.Names.SAME, this);
             }
         }
+    }
+
+    private static final class DiskCacheCleaner extends AbstractCacheCleaner {
+
+        private final IndicesRequestCache requestCache;
+        private final double diskCachesCleanThresholdPercent;
+
+        DiskCacheCleaner(
+            IndicesRequestCache requestCache,
+            Logger logger,
+            ThreadPool threadPool,
+            TimeValue interval,
+            double diskCachesCleanThresholdPercent
+        ) {
+            super(logger, threadPool, interval);
+            this.diskCachesCleanThresholdPercent = diskCachesCleanThresholdPercent;
+            this.requestCache = requestCache;
+        }
 
         @Override
-        public void close() {
-            closed.compareAndSet(false, true);
+        public void run() {
+            long startTimeNS = System.nanoTime();
+            if (logger.isTraceEnabled()) {
+                logger.trace("running periodic disk based request cache cleanup");
+            }
+            try {
+                this.requestCache.cleanDiskCache(diskCachesCleanThresholdPercent);
+            } catch (Exception e) {
+                logger.warn("Exception during periodic request cache cleanup:", e);
+            }
+            // Reschedule itself to run again if not closed
+            if (closed.get() == false) {
+                threadPool.scheduleUnlessShuttingDown(interval, ThreadPool.Names.SAME, this);
+            }
         }
     }
diff --git a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java
index 8c49fc36076e2..5823ec8ba3c1d 100644
--- a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java
+++ b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java
@@ -47,11 +47,18 @@
 import org.apache.lucene.search.TotalHits;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.util.BytesRef;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
 import org.opensearch.action.OriginalIndices;
 import org.opensearch.action.OriginalIndicesTests;
 import org.opensearch.action.search.SearchRequest;
 import org.opensearch.common.CheckedSupplier;
 import org.opensearch.common.UUIDs;
+import org.opensearch.common.cache.tier.DiskCachingTier;
+import org.opensearch.common.cache.tier.EhCacheDiskCachingTier;
+import org.opensearch.common.cache.tier.TieredCacheService;
+import org.opensearch.common.cache.tier.TieredCacheSpilloverStrategyService;
 import org.opensearch.common.io.stream.BytesStreamOutput;
 import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
 import org.opensearch.common.lucene.search.TopDocsAndMaxScore;
@@ -71,6 +78,8 @@
 import org.opensearch.index.cache.request.ShardRequestCache;
 import org.opensearch.index.query.TermQueryBuilder;
 import org.opensearch.index.shard.IndexShard;
+import org.opensearch.indices.IndicesRequestCache.CacheEntity;
+import org.opensearch.indices.IndicesRequestCache.Key;
 import org.opensearch.search.DocValueFormat;
 import org.opensearch.search.SearchShardTarget;
 import org.opensearch.search.internal.AliasFilter;
@@ -81,10 +90,24 @@
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Iterator;
+import java.util.Optional;
 import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.mockito.internal.verification.VerificationModeFactory.times;
 
 public class IndicesRequestCacheTests extends OpenSearchSingleNodeTestCase {
+
+    public IndexShard getIndexShardCache() {
+        IndexShard indexShard = mock(IndexShard.class);
+        ShardId shardId = mock(ShardId.class);
+        when(indexShard.shardId()).thenReturn(shardId);
+        return indexShard;
+    }
 
     public void testBasicOperationsCache() throws Exception {
         ShardRequestCache requestCacheStats = new ShardRequestCache();
@@ -101,7 +124,7 @@ public void testBasicOperationsCache() throws Exception {
         DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1));
         TermQueryBuilder termQuery = new TermQueryBuilder("id", "0");
         BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false);
-        AtomicBoolean indexShard = new AtomicBoolean(true);
+        IndexShard indexShard = getIndexShardCache();
 
         // initial cache
         TestEntity entity = new TestEntity(requestCacheStats, indexShard);
@@ -131,7 +154,7 @@ public void testBasicOperationsCache() throws Exception {
         if (randomBoolean()) {
             reader.close();
         } else {
-            indexShard.set(false); // closed shard but reader is still open
+            entity.setIsOpen(false); // closed shard but reader is still open
             cache.clear(entity);
         }
         cache.cleanCache();
@@ -148,12 +171,8 @@ public void testBasicOperationsCache() throws Exception {
 
     public void testCacheDifferentReaders() throws Exception {
         ClusterSettings dummyClusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
-        IndicesRequestCache cache = new IndicesRequestCache(
-            Settings.EMPTY,
-            getInstanceFromNode(IndicesService.class),
-            dummyClusterSettings
-        );
-        AtomicBoolean indexShard = new AtomicBoolean(true);
+        IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, getInstanceFromNode(IndicesService.class), dummyClusterSettings);
+        IndexShard indexShard = getIndexShardCache();
 
         ShardRequestCache requestCacheStats = new ShardRequestCache();
         Directory dir = newDirectory();
         IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig());
@@ -231,7 +250,7 @@ public void testCacheDifferentReaders() throws Exception {
         if (randomBoolean()) {
             secondReader.close();
         } else {
-            indexShard.set(false); // closed shard but reader is still open
+            entity.setIsOpen(false); // closed shard but reader is still open
             cache.clear(secondEntity);
         }
         cache.cleanCache();
@@ -249,12 +268,8 @@ public void testEviction() throws Exception {
         final ByteSizeValue size;
         ClusterSettings dummyClusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
         {
-            IndicesRequestCache cache = new IndicesRequestCache(
-                Settings.EMPTY,
-                getInstanceFromNode(IndicesService.class),
-                dummyClusterSettings
-            );
-            AtomicBoolean indexShard = new AtomicBoolean(true);
+            IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, getInstanceFromNode(IndicesService.class), dummyClusterSettings);
+            IndexShard indexShard = getIndexShardCache();
             ShardRequestCache requestCacheStats = new ShardRequestCache();
             Directory dir = newDirectory();
             IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig());
@@ -283,7 +298,7 @@ public void testEviction() throws Exception {
                 getInstanceFromNode(IndicesService.class),
                 dummyClusterSettings
             );
-            AtomicBoolean indexShard = new AtomicBoolean(true);
+            IndexShard indexShard = getIndexShardCache();
             ShardRequestCache requestCacheStats = new ShardRequestCache();
             Directory dir = newDirectory();
             IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig());
@@ -302,7 +317,7 @@ public void testEviction() throws Exception {
             writer.updateDocument(new Term("id", "0"), newDoc(0, "baz"));
             DirectoryReader thirdReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1));
-            TestEntity thirddEntity = new TestEntity(requestCacheStats, indexShard);
+            TestEntity thirdEntity = new TestEntity(requestCacheStats, indexShard);
             Loader thirdLoader = new Loader(thirdReader, 0);
 
             BytesReference value1 = cache.getOrCompute(entity, loader, reader, termBytes);
@@ -310,22 +325,17 @@ public void testEviction() throws Exception {
             BytesReference value2 = cache.getOrCompute(secondEntity, secondLoader, secondReader, termBytes);
             assertEquals("bar", value2.streamInput().readString());
             logger.info("Memory size: {}", requestCacheStats.stats().getMemorySize());
-            BytesReference value3 = cache.getOrCompute(thirddEntity, thirdLoader, thirdReader, termBytes);
+            BytesReference value3 = cache.getOrCompute(thirdEntity, thirdLoader, thirdReader, termBytes);
             assertEquals("baz", value3.streamInput().readString());
-            assertEquals(2, cache.count());
+            assertEquals(2, requestCacheStats.stats().getEntries());
             assertEquals(1, requestCacheStats.stats().getEvictions());
             IOUtils.close(reader, secondReader, thirdReader, writer, dir, cache);
         }
 
     public void testClearAllEntityIdentity() throws Exception {
         ClusterSettings dummyClusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
-        IndicesRequestCache cache = new IndicesRequestCache(
-            Settings.EMPTY,
-            getInstanceFromNode(IndicesService.class),
-            dummyClusterSettings
-        );
-        AtomicBoolean indexShard = new AtomicBoolean(true);
-
+        IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, getInstanceFromNode(IndicesService.class), dummyClusterSettings);
+        IndexShard indexShard = getIndexShardCache();
         ShardRequestCache requestCacheStats = new ShardRequestCache();
         Directory dir = newDirectory();
         IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig());
@@ -344,8 +354,8 @@ public void testClearAllEntityIdentity() throws Exception {
         writer.updateDocument(new Term("id", "0"), newDoc(0, "baz"));
         DirectoryReader thirdReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1));
-        AtomicBoolean differentIdentity = new AtomicBoolean(true);
-        TestEntity thirddEntity = new TestEntity(requestCacheStats, differentIdentity);
+        IndexShard differentIdentity = getIndexShardCache();
+        TestEntity thirdEntity = new TestEntity(requestCacheStats, differentIdentity);
         Loader thirdLoader = new Loader(thirdReader, 0);
 
         BytesReference value1 = cache.getOrCompute(entity, loader, reader, termBytes);
@@ -353,16 +363,16 @@ public void testClearAllEntityIdentity() throws Exception {
         BytesReference value2 = cache.getOrCompute(secondEntity, secondLoader, secondReader, termBytes);
         assertEquals("bar", value2.streamInput().readString());
         logger.info("Memory size: {}", requestCacheStats.stats().getMemorySize());
-        BytesReference value3 = cache.getOrCompute(thirddEntity, thirdLoader, thirdReader, termBytes);
+        BytesReference value3 = cache.getOrCompute(thirdEntity, thirdLoader, thirdReader, termBytes);
         assertEquals("baz", value3.streamInput().readString());
         assertEquals(3, cache.count());
         final long hitCount = requestCacheStats.stats().getHitCount();
-        // clear all for the indexShard Idendity even though is't still open
+        // clear all entries for the indexShard identity even though it's still open
         cache.clear(randomFrom(entity, secondEntity));
         cache.cleanCache();
         assertEquals(1, cache.count());
         // third has not been validated since it's a different identity
-        value3 = cache.getOrCompute(thirddEntity, thirdLoader, thirdReader, termBytes);
+        value3 = cache.getOrCompute(thirdEntity, thirdLoader, thirdReader, termBytes);
         assertEquals(hitCount + 1, requestCacheStats.stats().getHitCount());
         assertEquals("baz", value3.streamInput().readString());
 
@@ -372,8 +382,8 @@ public void testClearAllEntityIdentity() throws Exception {
 
     public Iterable<Field> newDoc(int id, String value) {
         return Arrays.asList(
-                newField("id", Integer.toString(id), StringField.TYPE_STORED),
-                newField("value", value, StringField.TYPE_STORED)
+            newField("id", Integer.toString(id), StringField.TYPE_STORED),
+            newField("value", value, StringField.TYPE_STORED)
         );
     }
 
@@ -421,7 +431,7 @@ public void testInvalidate() throws Exception {
         DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1));
         TermQueryBuilder termQuery = new TermQueryBuilder("id", "0");
         BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false);
-        AtomicBoolean indexShard = new AtomicBoolean(true);
+        IndexShard indexShard = getIndexShardCache();
 
         // initial cache
         TestEntity entity = new TestEntity(requestCacheStats, indexShard);
@@ -465,7 +475,7 @@ public void testInvalidate() throws Exception {
         if (randomBoolean()) {
             reader.close();
         } else {
-            indexShard.set(false); // closed shard but reader is still open
+            entity.setIsOpen(false); // closed shard but reader is still open
             cache.clear(entity);
         }
         cache.cleanCache();
@@ -480,8 +490,8 @@ public void testInvalidate() throws Exception {
     }
 
     public void testEqualsKey() throws IOException {
-        AtomicBoolean trueBoolean = new AtomicBoolean(true);
-        AtomicBoolean falseBoolean = new AtomicBoolean(false);
+        IndexShard trueBoolean = getIndexShardCache();
+        IndexShard falseBoolean = getIndexShardCache();
         IndicesService indicesService = getInstanceFromNode(IndicesService.class);
         IndicesRequestCache indicesRequestCache = indicesService.indicesRequestCache;
         Directory dir = newDirectory();
@@ -621,10 +631,11 @@ public boolean isFragment() {
     }
 
     private class TestEntity extends AbstractIndexShardCacheEntity {
-        private final AtomicBoolean standInForIndexShard;
+        private final IndexShard standInForIndexShard;
         private final ShardRequestCache shardRequestCache;
+        private boolean isOpen = true;
 
-        private TestEntity(ShardRequestCache shardRequestCache, AtomicBoolean standInForIndexShard) {
+        private TestEntity(ShardRequestCache shardRequestCache, IndexShard standInForIndexShard) {
             this.standInForIndexShard = standInForIndexShard;
             this.shardRequestCache = shardRequestCache;
         }
 
         @Override
@@ -636,7 +647,11 @@ protected ShardRequestCache stats() {
         @Override
         public boolean isOpen() {
-            return standInForIndexShard.get();
+            return this.isOpen;
+        }
+
+        public void setIsOpen(boolean isOpen) {
+            this.isOpen = isOpen;
         }
 
         @Override
@@ -650,6 +665,195 @@ public long ramBytesUsed() {
         }
 
         @Override
         public void writeTo(StreamOutput out) throws IOException {}
     }
 
     public static class CleanDiskCacheTests {
         private IndicesRequestCache indicesRequestCache;
         private TieredCacheService<Key, BytesReference> tieredCacheService;
         private DiskCachingTier<Key, BytesReference> diskCachingTier;
         private IndicesService indicesService;
 
         @Before
         public void setup() {
             tieredCacheService = mock(TieredCacheService.class);
             diskCachingTier = mock(DiskCachingTier.class);
             indicesService = mock(IndicesService.class);
             indicesRequestCache = new IndicesRequestCache(
                 Settings.EMPTY,
                 new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
                 indicesService,
                 tieredCacheService
             );
         }
 
         @Test
         public void shouldNotCleanDiskCacheWhenEmpty() {
             final int DISK_CACHE_COUNT = 0;
             final double CLEANUP_THRESHOLD = 0.5; // i.e. 50%, as a fraction
 
             when(tieredCacheService.getDiskCachingTier()).thenReturn(Optional.of(diskCachingTier));
             when(diskCachingTier.count()).thenReturn(DISK_CACHE_COUNT);
 
             indicesRequestCache.cleanDiskCache(CLEANUP_THRESHOLD);
 
             verify(diskCachingTier, never()).keys();
         }
 
         @Test
         public void shouldNotCleanDiskCacheWhenCleanupKeysPercentageIsBelowThreshold() {
             final int DISK_CACHE_COUNT = 1;
             final double CLEANUP_THRESHOLD = 0.49; // i.e. 49%, as a fraction
 
             when(tieredCacheService.getDiskCachingTier()).thenReturn(Optional.of(diskCachingTier));
             when(diskCachingTier.count()).thenReturn(DISK_CACHE_COUNT);
 
             indicesRequestCache.cleanDiskCache(CLEANUP_THRESHOLD);
 
             verify(diskCachingTier, never()).keys();
         }
 
         @Test
         public void cleanDiskCacheWhenCleanupKeysPercentageIsGreaterThanOrEqualToThreshold() {
             final int DISK_CACHE_COUNT = 100;
             final double CLEANUP_THRESHOLD = 0.5; // i.e. 50%, as a fraction
             final int STALE_KEYS_IN_DISK_COUNT = 51;
 
             // Mock dependencies
             IndicesService mockIndicesService = mock(IndicesService.class);
             TieredCacheService<Key, BytesReference> mockTieredCacheService = mock(TieredCacheService.class);
             DiskCachingTier<Key, BytesReference> mockDiskCachingTier = mock(DiskCachingTier.class);
             Iterator<Key> mockIterator = mock(Iterator.class);
             Iterable<Key> mockIterable = () -> mockIterator;
 
             IndicesRequestCache cache = new IndicesRequestCache(
                 Settings.EMPTY,
                 new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
                 mockIndicesService,
                 mockTieredCacheService
             );
 
             // Set up mocks
             when(mockTieredCacheService.getDiskCachingTier()).thenReturn(Optional.of(mockDiskCachingTier));
             when(mockDiskCachingTier.count()).thenReturn(DISK_CACHE_COUNT);
             when(mockDiskCachingTier.keys()).thenReturn(mockIterable);
             when(mockIterator.hasNext()).thenReturn(true, true, false);
 
             // Create mock Keys and return them when next() is called
             CacheEntity mockEntity = mock(CacheEntity.class);
             Key firstMockKey = cache.createKeyForTesting(mockEntity, "readerCacheKeyId1");
             Key secondMockKey = cache.createKeyForTesting(mockEntity, "readerCacheKeyId2");
             when(mockEntity.getCacheIdentity()).thenReturn(new Object());
             when(mockIterator.next()).thenReturn(firstMockKey, secondMockKey);
 
             cache.addCleanupKeyForTesting(mockEntity, "readerCacheKeyId");
             cache.setStaleKeysInDiskCountForTesting(STALE_KEYS_IN_DISK_COUNT);
             cache.cleanDiskCache(CLEANUP_THRESHOLD);
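             // 51 stale keys out of 100 give a stale fraction of 0.51, which is >= the 0.5
             // threshold, so a cleanup pass over the disk tier's keys is expected.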
             // Verify interactions
             verify(mockDiskCachingTier).keys();
             verify(mockIterator, times(2)).next();
         }
 
         @Test
         public void cleanDiskCacheAndCallInvalidateOfDiskTier() {
             final int DISK_CACHE_COUNT = 100;
             final double CLEANUP_THRESHOLD = 0.5; // i.e. 50%, as a fraction
             final int STALE_KEYS_IN_DISK_COUNT = 51;
 
             // Mock dependencies
             IndicesService mockIndicesService = mock(IndicesService.class);
             TieredCacheService<Key, BytesReference> mockTieredCacheService = mock(TieredCacheService.class);
             DiskCachingTier<Key, BytesReference> mockDiskCachingTier = mock(EhCacheDiskCachingTier.class);
             Iterator<Key> mockIterator = mock(Iterator.class);
             Iterable<Key> mockIterable = () -> mockIterator;
 
             IndicesRequestCache cache = new IndicesRequestCache(
                 Settings.EMPTY,
                 new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
                 mockIndicesService,
                 mockTieredCacheService
             );
 
             // Set up mocks
             when(mockTieredCacheService.getDiskCachingTier()).thenReturn(Optional.of(mockDiskCachingTier));
             when(mockDiskCachingTier.count()).thenReturn(DISK_CACHE_COUNT);
             when(mockDiskCachingTier.keys()).thenReturn(mockIterable);
             when(mockIterator.hasNext()).thenReturn(true, true, false);
 
             // Create mock Keys and return them when next() is called
             CacheEntity mockEntity = mock(CacheEntity.class);
             Key firstMockKey = cache.createKeyForTesting(mockEntity, "readerCacheKeyId1");
             Key secondMockKey = cache.createKeyForTesting(mockEntity, "readerCacheKeyId2");
             when(mockEntity.getCacheIdentity()).thenReturn(new Object());
             when(mockIterator.next()).thenReturn(firstMockKey, secondMockKey);
 
             cache.addCleanupKeyForTesting(mockEntity, "readerCacheKeyId");
             cache.setStaleKeysInDiskCountForTesting(STALE_KEYS_IN_DISK_COUNT);
             cache.cleanDiskCache(CLEANUP_THRESHOLD);
 
             // Verify interactions
             verify(mockDiskCachingTier).keys();
             verify(mockDiskCachingTier, times(1)).invalidate(any(IndicesRequestCache.Key.class));
             verify(mockIterator, times(2)).next();
         }
 
         @Test
         public void diskCleanupKeysPercentageWhenDiskCacheIsEmpty() {
             when(tieredCacheService.getDiskCachingTier()).thenReturn(Optional.of(diskCachingTier));
             when(diskCachingTier.count()).thenReturn(0);
 
             double result = indicesRequestCache.diskCleanupKeysPercentage();
 
             assertEquals(0, result, 0);
         }
 
         @Test
         public void diskCleanupKeysPercentageWhenKeysToCleanIsEmpty() {
             IndicesService mockIndicesService = mock(IndicesService.class);
             TieredCacheService<Key, BytesReference> mockTieredCacheService = mock(TieredCacheSpilloverStrategyService.class);
             DiskCachingTier<Key, BytesReference> mockDiskCachingTier = mock(DiskCachingTier.class);
             when(mockTieredCacheService.getDiskCachingTier()).thenReturn(Optional.of(mockDiskCachingTier));
             when(mockDiskCachingTier.count()).thenReturn(100);
             IndicesRequestCache cache = new IndicesRequestCache(
                 Settings.EMPTY,
                 new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
                 mockIndicesService,
                 mockTieredCacheService
             );
 
             double result = cache.diskCleanupKeysPercentage();
 
             assertEquals(0, result, 0.001);
         }
 
         @Test
         public void diskCleanupKeysPercentageWhenDiskCacheAndKeysToCleanAreNotEmpty() {
             IndicesService mockIndicesService = mock(IndicesService.class);
             TieredCacheService<Key, BytesReference> mockTieredCacheService = mock(TieredCacheSpilloverStrategyService.class);
             DiskCachingTier<Key, BytesReference> mockDiskCachingTier = mock(DiskCachingTier.class);
             when(mockTieredCacheService.getDiskCachingTier()).thenReturn(Optional.of(mockDiskCachingTier));
             when(mockDiskCachingTier.count()).thenReturn(100);
 
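             // With 100 keys on disk and a stale-key count of 1, the expected stale fraction
             // is 1 / 100 = 0.01.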
             IndicesRequestCache cache = new IndicesRequestCache(
                 Settings.EMPTY,
                 new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
                 mockIndicesService,
                 mockTieredCacheService
             );
 
             CacheEntity mockEntity = Mockito.mock(CacheEntity.class);
             when(mockEntity.getCacheIdentity()).thenReturn(new Object());
             cache.addCleanupKeyForTesting(mockEntity, "readerCacheKeyId");
             cache.setStaleKeysInDiskCountForTesting(1);
 
             double result = cache.diskCleanupKeysPercentage();
             assertEquals(0.01, result, 0);
         }
     }
 }