Skip to content

Commit

Permalink
attempt to fix IT bug
Browse files Browse the repository at this point in the history
Signed-off-by: Peter Alfonsi <[email protected]>
  • Loading branch information
Peter Alfonsi committed Jan 3, 2024
1 parent 14fbbc5 commit d64cd26
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.time.DateFormatter;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.index.cache.request.RequestCacheStats;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.aggregations.bucket.global.GlobalAggregationBuilder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,42 +118,31 @@ public final class IndicesRequestCache implements TieredCacheEventListener<Indic
private final TimeValue expire;
private final TieredCacheService<Key, BytesReference> tieredCacheService;
private final IndicesService indicesService;
private final Settings settings;

IndicesRequestCache(Settings settings, IndicesService indicesService) {
this.size = INDICES_CACHE_QUERY_SIZE.get(settings);
this.expire = INDICES_CACHE_QUERY_EXPIRE.exists(settings) ? INDICES_CACHE_QUERY_EXPIRE.get(settings) : null;
long sizeInBytes = size.getBytes();
this.indicesService = indicesService;
this.settings = settings;

// Initialize onHeap cache tier first.
OnHeapCachingTier<Key, BytesReference> openSearchOnHeapCache = new OpenSearchOnHeapCache.Builder<Key, BytesReference>().setWeigher(
(k, v) -> k.ramBytesUsed() + v.ramBytesUsed()
).setMaximumWeight(sizeInBytes).setExpireAfterAccess(expire).build();

// TODO: Enable/disable switch for disk tier, in cluster settings PR
long CACHE_SIZE_IN_BYTES = 10000000L; // Set to 10 MB for now, will be changed in cluster settings PR
String SETTING_PREFIX = "indices.request.cache";
String STORAGE_PATH = indicesService.getNodePaths()[0].indicesPath.toString() + "/request_cache";

EhCacheDiskCachingTier<Key, BytesReference> ehcacheDiskTier = new EhCacheDiskCachingTier.Builder<Key, BytesReference>()
.setKeyType(Key.class)
.setValueType(BytesReference.class)
.setExpireAfterAccess(TimeValue.MAX_VALUE)
.setSettings(settings)
.setThreadPoolAlias("ehcacheTest")
.setMaximumWeightInBytes(CACHE_SIZE_IN_BYTES)
.setStoragePath(STORAGE_PATH)
.setSettingPrefix(SETTING_PREFIX)
.setKeySerializer(new IRCKeyWriteableSerializer(this))
.setValueSerializer(new BytesReferenceSerializer())
.build();
// Initialize tiered cache service.
TieredCacheSpilloverStrategyService.Builder<Key, BytesReference> tieredCacheServiceBuilder =
new TieredCacheSpilloverStrategyService.Builder<Key, BytesReference>()
.setOnHeapCachingTier(openSearchOnHeapCache)
.setTieredCacheEventListener(this);

// Initialize tiered cache service. TODO: Enable Disk tier when tiered support is turned on.

tieredCacheService = new TieredCacheSpilloverStrategyService.Builder<Key, BytesReference>()
.setOnHeapCachingTier(openSearchOnHeapCache)
.setOnDiskCachingTier(ehcacheDiskTier)
.setTieredCacheEventListener(this)
.build();
EhCacheDiskCachingTier<Key, BytesReference> ehcacheDiskTier = createNewDiskTier();
tieredCacheServiceBuilder.setOnDiskCachingTier(ehcacheDiskTier);
tieredCacheService = tieredCacheServiceBuilder.build();
}

@Override
Expand Down Expand Up @@ -215,9 +204,6 @@ BytesReference getOrCompute(
}
}
}
// else {
// key.entity.onHit();
// }
return value;
}

Expand All @@ -243,7 +229,6 @@ void invalidate(CacheEntity cacheEntity, DirectoryReader reader, BytesReference
* @opensearch.internal
*/
private static class Loader implements TieredCacheLoader<Key, BytesReference> {

private final CacheEntity entity;
private final CheckedSupplier<BytesReference, IOException> loader;
private boolean loaded;
Expand Down Expand Up @@ -309,7 +294,7 @@ interface CacheEntity extends Accountable, Writeable {
*
* @opensearch.internal
*/
class Key implements Accountable, Writeable {
class Key implements Accountable, Writeable {
private final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(Key.class);

public final CacheEntity entity; // use as identity equality
Expand Down Expand Up @@ -446,4 +431,28 @@ long count() {
int numRegisteredCloseListeners() { // for testing
return registeredClosedListeners.size();
}

/**
* Creates a new disk tier instance. Should only be run if the instance will actually be used!
* Otherwise, it may not be closed properly.
* @return A new disk tier instance
*/
public EhCacheDiskCachingTier<Key, BytesReference> createNewDiskTier() {
//assert FeatureFlags.isEnabled(FeatureFlags.TIERED_CACHING);
long CACHE_SIZE_IN_BYTES = 10000000L; //INDICES_CACHE_DISK_TIER_SIZE.get(settings).getBytes();
String STORAGE_PATH = indicesService.getNodePaths()[0].indicesPath.toString() + "/request_cache";

return new EhCacheDiskCachingTier.Builder<Key, BytesReference>()
.setKeyType(Key.class)
.setValueType(BytesReference.class)
.setExpireAfterAccess(TimeValue.MAX_VALUE) // TODO: Is this meant to be the same as IRC expire or different?
.setThreadPoolAlias("ehcacheThreadpool")
.setMaximumWeightInBytes(CACHE_SIZE_IN_BYTES)
.setStoragePath(STORAGE_PATH)
.setKeySerializer(new IRCKeyWriteableSerializer(this))
.setValueSerializer(new BytesReferenceSerializer())
.setSettings(settings)
.setSettingPrefix("indices.requests.tier")
.build();
}
}

0 comments on commit d64cd26

Please sign in to comment.