Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delete Stale Keys from Disk AND RBM #13

Merged
merged 22 commits into from
Dec 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
52b9dfd
Update IndicesRequestCache.java
kiranprakash154 Nov 14, 2023
5bab65d
More updates to disk cache cleanup
kiranprakash154 Nov 17, 2023
4500090
register INDICES_REQUEST_CACHE_DISK_CLEAN_THRESHOLD_SETTING in cluste…
kiranprakash154 Nov 29, 2023
4199bc2
make INDICES_REQUEST_CACHE_DISK_CLEAN_INTERVAL_SETTING dynamic property
kiranprakash154 Nov 29, 2023
6e05b9d
minor changes
kiranprakash154 Nov 29, 2023
aa3c39c
change the way we calculate disk key staleness
kiranprakash154 Nov 29, 2023
83d70f7
fix breaking tests
kiranprakash154 Nov 29, 2023
caae291
Fix test to include IndexShard
kiranprakash154 Nov 30, 2023
9d0eab1
UT for testing invalidate of DiskTier is called
kiranprakash154 Nov 30, 2023
5c472ae
Introduce CleanupStatus to keysToClean
kiranprakash154 Dec 5, 2023
be10a16
use that cleanupStatus and logic to update staleKeyEntries
kiranprakash154 Dec 5, 2023
4c8b240
register INDICES_REQUEST_CACHE_DISK_CLEAN_INTERVAL_SETTING to cluster…
kiranprakash154 Dec 5, 2023
eeb973e
Add removal listener to update eh cache stats
kiranprakash154 Dec 5, 2023
f22cdaf
re-organize imports
kiranprakash154 Dec 5, 2023
280cb07
Add IT tests
kiranprakash154 Dec 5, 2023
27f122b
Merge branch 'feature/tiered-caching' into kp/delete-stale-keys
kiranprakash154 Dec 5, 2023
8f0b5bc
refactor cleanCache & cleanDiskCache to share methods
kiranprakash154 Dec 15, 2023
8c996b3
null checks for indexShard
kiranprakash154 Dec 15, 2023
4bd1503
some bugs i found
kiranprakash154 Dec 15, 2023
1911fa6
use assertNumCacheEntries
kiranprakash154 Dec 15, 2023
ae0b471
update print statement
kiranprakash154 Dec 15, 2023
fce110d
edit sleep time
kiranprakash154 Dec 15, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,14 @@
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.time.Duration;
import java.time.Instant;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse;

// This is a separate file from IndicesRequestCacheIT because we only want to run our test
// on a node with a maximum request cache size that we set.

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class IndicesRequestCacheDiskTierIT extends OpenSearchIntegTestCase {
public void testDiskTierStats() throws Exception {
Expand Down Expand Up @@ -111,7 +113,172 @@ public void testDiskTierStats() throws Exception {
IndicesRequestCacheIT.assertCacheState(client, "index", 0, numRequests + 3, TierType.ON_HEAP, false);
IndicesRequestCacheIT.assertCacheState(client, "index", 2, numRequests + 1, TierType.DISK, false);
assertDiskTierSpecificStats(client, "index", 2, tookTimeSoFar, tookTimeSoFar);
}

/**
 * Verifies that the clear-cache API invalidates entries in the disk tier,
 * not just the on-heap tier.
 */
public void testDiskTierInvalidationByCleanCacheAPI() throws Exception {
    // Use an extremely long cleanup interval so no background cleanup interferes;
    // invalidation must come only from the explicit clear-cache API call below.
    int backgroundCleanupIntervalMs = 10_000_000;
    int onHeapCacheSizeBytes = 9876;
    String nodeName = internalCluster().startNode(
        Settings.builder()
            .put(
                IndicesService.INDICES_REQUEST_CACHE_DISK_CLEAN_INTERVAL_SETTING.getKey(),
                TimeValue.timeValueMillis(backgroundCleanupIntervalMs)
            )
            .put(IndicesService.INDICES_REQUEST_CACHE_DISK_CLEAN_THRESHOLD_SETTING.getKey(), "0%")
            .put(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey(), new ByteSizeValue(onHeapCacheSizeBytes))
            // A zero took-time threshold admits every entry into the disk tier regardless of took time.
            .put(DiskTierTookTimePolicy.DISK_TOOKTIME_THRESHOLD_SETTING.getKey(), TimeValue.ZERO)
    );
    Client client = client(nodeName);

    Settings.Builder indexSettings = Settings.builder()
        .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true)
        .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
        .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0);

    assertAcked(
        client.admin().indices().prepareCreate("index").setMapping("k", "type=keyword").setSettings(indexSettings).get()
    );

    indexRandom(true, client.prepareIndex("index").setSource("k", "hello"));
    ensureSearchable("index");

    // One probe query tells us how large a single cache entry is on heap.
    SearchResponse searchResponse = client.prepareSearch("index")
        .setRequestCache(true)
        .setQuery(QueryBuilders.termQuery("k", "hello" + 0))
        .get();
    int singleEntryBytes = (int) getCacheSizeBytes(client, "index", TierType.ON_HEAP);
    // If this fails, increase onHeapCacheSizeBytes! We can't adjust it after getting
    // the size of one query, as the cache size setting is not dynamic.
    assertTrue(onHeapCacheSizeBytes > singleEntryBytes);

    // Issue enough distinct queries to fill the heap tier and spill exactly two entries to disk.
    int expectedDiskEntries = 2;
    int totalSearches = onHeapCacheSizeBytes / singleEntryBytes + expectedDiskEntries;
    for (int i = 1; i < totalSearches; i++) {
        searchResponse = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello" + i)).get();
        assertSearchResponse(searchResponse);
        IndicesRequestCacheIT.assertCacheState(client, "index", 0, i + 1, TierType.ON_HEAP, false);
        IndicesRequestCacheIT.assertCacheState(client, "index", 0, i + 1, TierType.DISK, false);
    }
    IndicesRequestCacheIT.assertNumCacheEntries(client, "index", expectedDiskEntries, TierType.DISK);

    // Clearing the request cache through the API must also empty the disk tier.
    client.admin().indices().prepareClearCache().setIndices("index").setRequestCache(true).get();
    IndicesRequestCacheIT.assertNumCacheEntries(client, "index", 0, TierType.DISK);
}

// When entire disk tier is stale, test whether cache cleaner cleans up everything from disk
public void testDiskTierInvalidationByCacheCleanerEntireDiskTier() throws Exception {
int thresholdInMillis = 4_000;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Set lower so the test doesn't take so long to run? Or does it flake if you do that

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It flakes if the interval is lower. I'm setting the threshold low enough that, by the time the cleaner runs, we have inserted 2 entries.
Then the test sleeps until the cache cleaner has run its job; when the test thread wakes up, the stale entries will have been deleted.

Instant start = Instant.now();
int heapSizeBytes = 9876;
String node = internalCluster().startNode(
Settings.builder()
.put(IndicesService.INDICES_REQUEST_CACHE_DISK_CLEAN_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(thresholdInMillis))
.put(IndicesService.INDICES_REQUEST_CACHE_DISK_CLEAN_THRESHOLD_SETTING.getKey(), "1%")
.put(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey(), new ByteSizeValue(heapSizeBytes))
.put(DiskTierTookTimePolicy.DISK_TOOKTIME_THRESHOLD_SETTING.getKey(), TimeValue.ZERO) // allow into disk cache regardless of took time
);
Client client = client(node);

Settings.Builder indicesSettingBuilder = Settings.builder()
.put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0);

assertAcked(
client.admin().indices().prepareCreate("index").setMapping("k", "type=keyword").setSettings(indicesSettingBuilder).get()
);

indexRandom(true, client.prepareIndex("index").setSource("k", "hello"));
ensureSearchable("index");
SearchResponse resp;

resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello" + 0)).get();
int requestSize = (int) getCacheSizeBytes(client, "index", TierType.ON_HEAP);
assertTrue(heapSizeBytes > requestSize);

int numOnDisk = 2;
int numRequests = heapSizeBytes / requestSize + numOnDisk;
for (int i = 1; i < numRequests; i++) {
resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello" + i)).get();
assertSearchResponse(resp);
IndicesRequestCacheIT.assertCacheState(client, "index", 0, i + 1, TierType.ON_HEAP, false);
IndicesRequestCacheIT.assertCacheState(client, "index", 0, i + 1, TierType.DISK, false);
}

IndicesRequestCacheIT.assertNumCacheEntries(client, "index", 2, TierType.DISK);

// index a doc and force refresh so that the cache cleaner can clean the cache
indexRandom(true, client.prepareIndex("index").setSource("k", "hello"));
ensureSearchable("index");

// sleep for the threshold time, so that the cache cleaner can clean the cache
Instant end = Instant.now();
long elapsedTimeMillis = Duration.between(start, end).toMillis();
// if this test is flaky, increase the sleep time.
long sleepTime = (thresholdInMillis - elapsedTimeMillis) + 1_500;
Thread.sleep(sleepTime);

// by now cache cleaner would have run and cleaned up stale keys
// fetch the stats again
IndicesRequestCacheIT.assertNumCacheEntries(client, "index", 0, TierType.DISK);
}

// When part of disk tier is stale, test whether cache cleaner cleans up only stale items from disk
// When part of the disk tier is stale, test whether the cache cleaner cleans up only the stale items from disk.
public void testDiskTierInvalidationByCacheCleanerPartOfDiskTier() throws Exception {
    int thresholdInMillis = 4_000;
    Instant start = Instant.now();
    int heapSizeBytes = 987;
    String node = internalCluster().startNode(
        Settings.builder()
            .put(
                IndicesService.INDICES_REQUEST_CACHE_DISK_CLEAN_INTERVAL_SETTING.getKey(),
                TimeValue.timeValueMillis(thresholdInMillis)
            )
            .put(IndicesService.INDICES_REQUEST_CACHE_DISK_CLEAN_THRESHOLD_SETTING.getKey(), "1%")
            .put(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey(), new ByteSizeValue(heapSizeBytes))
            .put(DiskTierTookTimePolicy.DISK_TOOKTIME_THRESHOLD_SETTING.getKey(), TimeValue.ZERO) // allow into disk cache regardless of took time
    );
    Client client = client(node);

    Settings.Builder indicesSettingBuilder = Settings.builder()
        .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true)
        .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
        .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0);

    assertAcked(
        client.admin().indices().prepareCreate("index").setMapping("k", "type=text").setSettings(indicesSettingBuilder).get()
    );

    indexRandom(true, client.prepareIndex("index").setSource("k", "hello"));
    ensureSearchable("index");
    SearchResponse resp;

    // Probe query to measure the on-heap size of a single cache entry.
    resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello" + 0)).get();
    int requestSize = (int) getCacheSizeBytes(client, "index", TierType.ON_HEAP);
    // If this fails, increase heapSizeBytes — the cache size setting is not dynamic.
    assertTrue(heapSizeBytes > requestSize);

    // Fill the heap tier and push exactly two entries onto disk.
    int numOnDisk = 2;
    int numRequests = heapSizeBytes / requestSize + numOnDisk;
    for (int i = 1; i < numRequests; i++) {
        resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello" + i)).get();
        assertSearchResponse(resp);
        IndicesRequestCacheIT.assertCacheState(client, "index", 0, i + 1, TierType.ON_HEAP, false);
        IndicesRequestCacheIT.assertCacheState(client, "index", 0, i + 1, TierType.DISK, false);
    }

    IndicesRequestCacheIT.assertNumCacheEntries(client, "index", 2, TierType.DISK);

    // Force a refresh so the existing cached keys become stale for the cache cleaner to pick up.
    flushAndRefresh("index");
    // BUG FIX: the original builder chain was never executed (missing .get()), so the
    // document was never indexed. Also use the node-bound `client` for consistency
    // with the rest of this test.
    client.prepareIndex("index").setId("1").setSource("k", "good bye").get();
    ensureSearchable("index");

    for (int i = 0; i < 6; i++) { // index 6 items with the new readerCacheKeyId
        client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello" + i)).get();
    }

    // Sleep past the next cleaner run. Clamp the remaining-interval delta to a
    // non-negative value: Thread.sleep throws IllegalArgumentException for negative
    // arguments, which could otherwise happen on a slow machine where setup alone
    // takes longer than the cleanup interval.
    Instant end = Instant.now();
    long elapsedTimeMillis = Duration.between(start, end).toMillis();
    long sleepTime = Math.max(0L, thresholdInMillis - elapsedTimeMillis) + 1_500; // if this test is flaky, increase the pad
    Thread.sleep(sleepTime);

    // Only the stale entries were removed: the 5 fresh ones remain on disk.
    IndicesRequestCacheIT.assertNumCacheEntries(client, "index", 5, TierType.DISK);
}

private long getCacheSizeBytes(Client client, String index, TierType tierType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,22 @@

package org.opensearch.common.cache.tier;

import org.ehcache.Cache;
import org.ehcache.CachePersistenceException;
import org.ehcache.PersistentCacheManager;
import org.ehcache.config.builders.CacheConfigurationBuilder;
import org.ehcache.config.builders.CacheEventListenerConfigurationBuilder;
import org.ehcache.config.builders.CacheManagerBuilder;
import org.ehcache.config.builders.PooledExecutionServiceConfigurationBuilder;
import org.ehcache.config.builders.ResourcePoolsBuilder;
import org.ehcache.config.units.MemoryUnit;
import org.ehcache.core.spi.service.FileBasedPersistenceContext;
import org.ehcache.event.CacheEvent;
import org.ehcache.event.CacheEventListener;
import org.ehcache.event.EventType;
import org.ehcache.expiry.ExpiryPolicy;
import org.ehcache.impl.config.store.disk.OffHeapDiskStoreConfiguration;
import org.ehcache.spi.serialization.SerializerException;
import org.opensearch.OpenSearchException;
import org.opensearch.common.cache.RemovalListener;
import org.opensearch.common.cache.RemovalNotification;
Expand All @@ -27,23 +43,6 @@
import java.util.Optional;
import java.util.function.Supplier;

import org.ehcache.Cache;
import org.ehcache.CachePersistenceException;
import org.ehcache.PersistentCacheManager;
import org.ehcache.config.builders.CacheConfigurationBuilder;
import org.ehcache.config.builders.CacheEventListenerConfigurationBuilder;
import org.ehcache.config.builders.CacheManagerBuilder;
import org.ehcache.config.builders.PooledExecutionServiceConfigurationBuilder;
import org.ehcache.config.builders.ResourcePoolsBuilder;
import org.ehcache.config.units.MemoryUnit;
import org.ehcache.core.spi.service.FileBasedPersistenceContext;
import org.ehcache.event.CacheEvent;
import org.ehcache.event.CacheEventListener;
import org.ehcache.event.EventType;
import org.ehcache.expiry.ExpiryPolicy;
import org.ehcache.impl.config.store.disk.OffHeapDiskStoreConfiguration;
import org.ehcache.spi.serialization.SerializerException;

/**
* An ehcache-based disk tier implementation.
* @param <K> The key type of cache entries
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -688,11 +688,13 @@ public void apply(Settings value, Settings current, Settings previous) {
RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING,
IndicesService.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING,
IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING,
IndicesService.CLUSTER_RESTRICT_INDEX_REPLICATION_TYPE_SETTING,
IndicesService.INDICES_REQUEST_CACHE_DISK_CLEAN_THRESHOLD_SETTING,
IndicesService.INDICES_REQUEST_CACHE_DISK_CLEAN_INTERVAL_SETTING,
AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE,
CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE,
CPUBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT,
CPUBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT,
IndicesService.CLUSTER_RESTRICT_INDEX_REPLICATION_TYPE_SETTING,
DiskTierTookTimePolicy.DISK_TOOKTIME_THRESHOLD_SETTING
)
)
Expand Down
Loading