14
14
import org .opensearch .common .cache .RemovalListener ;
15
15
import org .opensearch .common .cache .RemovalNotification ;
16
16
import org .opensearch .common .cache .RemovalReason ;
17
+ import org .opensearch .common .cache .tier .keystore .RBMIntKeyLookupStore ;
17
18
import org .opensearch .common .metrics .CounterMetric ;
18
19
import org .opensearch .common .settings .Setting ;
19
20
import org .opensearch .common .settings .Settings ;
42
43
import org .ehcache .event .EventType ;
43
44
import org .ehcache .expiry .ExpiryPolicy ;
44
45
import org .ehcache .impl .config .store .disk .OffHeapDiskStoreConfiguration ;
46
+ import org .opensearch .core .common .unit .ByteSizeValue ;
45
47
46
48
/**
47
49
* @param <K> The key type of cache entries
@@ -92,6 +94,7 @@ public class EhCacheDiskCachingTier<K, V> implements DiskCachingTier<K, V> {
92
94
// Defines how many segments the disk cache is separated into. Higher number achieves greater concurrency but
93
95
// will hold that many file pointers.
94
96
public final Setting <Integer > DISK_SEGMENTS ;
97
+ private final RBMIntKeyLookupStore keystore ;
95
98
96
99
private final Serializer <K , byte []> keySerializer ;
97
100
private final Serializer <V , byte []> valueSerializer ;
@@ -124,6 +127,11 @@ private EhCacheDiskCachingTier(Builder<K, V> builder) {
124
127
close ();
125
128
cacheManager = buildCacheManager ();
126
129
this .cache = buildCache (Duration .ofMillis (expireAfterAccess .getMillis ()), builder );
130
+
131
+ // IndicesRequestCache gets 1%, of which we allocate 5% to the keystore = 0.05%
132
+ // TODO: how do we change this automatically based on INDICES_CACHE_QUERY_SIZE setting?
133
+ Setting <ByteSizeValue > keystoreSizeSetting = Setting .memorySizeSetting (builder .settingPrefix + ".tiered.disk.keystore_size" , "0.05%" );
134
+ this .keystore = new RBMIntKeyLookupStore (keystoreSizeSetting .get (this .settings ).getBytes ());
127
135
}
128
136
129
137
private PersistentCacheManager buildCacheManager () {
@@ -193,12 +201,16 @@ private CacheEventListenerConfigurationBuilder getListenerConfiguration(Builder<
193
201
194
202
@ Override
195
203
public V get (K key ) {
196
- return valueSerializer .deserialize (cache .get (key ));
204
+ if (keystore .contains (key .hashCode ())) { // Check in-memory store of key hashes to avoid unnecessary disk seek
205
+ return valueSerializer .deserialize (cache .get (key ));
206
+ }
207
+ return null ;
197
208
}
198
209
199
210
@ Override
200
211
public void put (K key , V value ) {
201
212
cache .put (key , valueSerializer .serialize (value ));
213
+ keystore .add (key .hashCode ());
202
214
}
203
215
204
216
@ Override
@@ -211,6 +223,7 @@ public V computeIfAbsent(K key, TieredCacheLoader<K, V> loader) throws Exception
211
223
public void invalidate (K key ) {
212
224
// There seems to be a thread leak issue while calling this and then closing cache.
213
225
cache .remove (key );
226
+ keystore .remove (key .hashCode ());
214
227
}
215
228
216
229
@ Override
@@ -227,6 +240,7 @@ public void setRemovalListener(RemovalListener<K, V> removalListener) {
227
240
@ Override
228
241
public void invalidateAll () {
229
242
// Clear up files.
243
+ keystore .clear ();
230
244
}
231
245
232
246
@ Override
0 commit comments