Skip to content

Commit

Permalink
spotlessApply
Browse files Browse the repository at this point in the history
Signed-off-by: Peter Alfonsi <[email protected]>
  • Loading branch information
Peter Alfonsi committed Mar 1, 2024
1 parent 60df761 commit ea33af8
Show file tree
Hide file tree
Showing 20 changed files with 143 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ public class TieredSpilloverCache<K, V> implements ICache<K, V> {
private final ICache<K, V> onHeapCache;

// TODO: Listeners for removals from the two tiers
//private final RemovalListener<ICacheKey<K>, V> onDiskRemovalListener;
//private final RemovalListener<ICacheKey<K>, V> onHeapRemovalListener;
// private final RemovalListener<ICacheKey<K>, V> onDiskRemovalListener;
// private final RemovalListener<ICacheKey<K>, V> onHeapRemovalListener;

// The listener for removals from the spillover cache as a whole
private final RemovalListener<ICacheKey<K>, V> removalListener;
Expand All @@ -67,21 +67,20 @@ public class TieredSpilloverCache<K, V> implements ICache<K, V> {
Objects.requireNonNull(builder.diskCacheFactory, "disk cache builder can't be null");
this.removalListener = Objects.requireNonNull(builder.removalListener, "Removal listener can't be null");

this.onHeapCache = builder.onHeapCacheFactory.create(
new CacheConfig.Builder<K, V>().setRemovalListener(new RemovalListener<>() {
@Override
public void onRemoval(RemovalNotification<ICacheKey<K>, V> notification) {
try (ReleasableLock ignore = writeLock.acquire()) {
diskCache.put(notification.getKey(), notification.getValue());
}
removalListener.onRemoval(notification);
}
})
.setKeyType(builder.cacheConfig.getKeyType())
.setValueType(builder.cacheConfig.getValueType())
.setSettings(builder.cacheConfig.getSettings())
.setWeigher(builder.cacheConfig.getWeigher())
.build(),
this.onHeapCache = builder.onHeapCacheFactory.create(new CacheConfig.Builder<K, V>().setRemovalListener(new RemovalListener<>() {
@Override
public void onRemoval(RemovalNotification<ICacheKey<K>, V> notification) {
try (ReleasableLock ignore = writeLock.acquire()) {
diskCache.put(notification.getKey(), notification.getValue());
}
removalListener.onRemoval(notification);
}
})
.setKeyType(builder.cacheConfig.getKeyType())
.setValueType(builder.cacheConfig.getValueType())
.setSettings(builder.cacheConfig.getSettings())
.setWeigher(builder.cacheConfig.getWeigher())
.build(),
builder.cacheType,
builder.cacheFactories

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import static org.opensearch.common.cache.store.settings.OpenSearchOnHeapCacheSettings.MAXIMUM_SIZE_IN_BYTES_KEY;

public class TieredSpilloverCacheTests extends OpenSearchTestCase {
// TODO: TSC has no stats implementation yet - fix these tests once it does
// TODO: These tests are uncommented in the second stats rework PR, which adds a TSC stats implementation
/*public void testComputeIfAbsentWithoutAnyOnHeapCacheEviction() throws Exception {
int onHeapCacheSize = randomIntBetween(10, 30);
int keyValueSize = 50;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import org.opensearch.plugins.Plugin;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,25 +10,22 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.ehcache.core.spi.service.FileBasedPersistenceContext;
import org.ehcache.spi.serialization.SerializerException;
import org.opensearch.OpenSearchException;
import org.opensearch.cache.EhcacheDiskCacheSettings;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.cache.CacheType;
import org.opensearch.common.cache.ICache;
import org.opensearch.common.cache.ICacheKey;
import org.opensearch.common.cache.LoadAwareCacheLoader;
import org.opensearch.common.cache.RemovalListener;
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.RemovalReason;
import org.opensearch.common.cache.serializer.ICacheKeySerializer;
import org.opensearch.common.cache.serializer.Serializer;
import org.opensearch.common.cache.stats.CacheStats;
import org.opensearch.common.cache.ICacheKey;
import org.opensearch.common.cache.stats.CacheStatsDimension;
import org.opensearch.common.cache.stats.MultiDimensionCacheStats;
import org.opensearch.common.cache.store.builders.ICacheBuilder;
import org.opensearch.common.cache.serializer.ICacheKeySerializer;
import org.opensearch.common.cache.serializer.Serializer;
import org.opensearch.common.cache.store.config.CacheConfig;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.Setting;
Expand Down Expand Up @@ -60,13 +57,15 @@
import org.ehcache.config.builders.PooledExecutionServiceConfigurationBuilder;
import org.ehcache.config.builders.ResourcePoolsBuilder;
import org.ehcache.config.units.MemoryUnit;
import org.ehcache.core.spi.service.FileBasedPersistenceContext;
import org.ehcache.event.CacheEvent;
import org.ehcache.event.CacheEventListener;
import org.ehcache.event.EventType;
import org.ehcache.expiry.ExpiryPolicy;
import org.ehcache.impl.config.store.disk.OffHeapDiskStoreConfiguration;
import org.ehcache.spi.loaderwriter.CacheLoadingException;
import org.ehcache.spi.loaderwriter.CacheWritingException;
import org.ehcache.spi.serialization.SerializerException;

import static org.opensearch.cache.EhcacheDiskCacheSettings.DISK_CACHE_ALIAS_KEY;
import static org.opensearch.cache.EhcacheDiskCacheSettings.DISK_CACHE_EXPIRE_AFTER_ACCESS_KEY;
Expand Down Expand Up @@ -153,7 +152,8 @@ private EhcacheDiskCache(Builder<K, V> builder) {
this.ehCacheEventListener = new EhCacheEventListener<K, V>(
Objects.requireNonNull(builder.getRemovalListener(), "Removal listener can't be null"),
Objects.requireNonNull(builder.getWeigher(), "Weigher function can't be null"),
this.valueSerializer);
this.valueSerializer
);
this.cache = buildCache(Duration.ofMillis(expireAfterAccess.getMillis()), builder);
List<String> dimensionNames = Objects.requireNonNull(builder.dimensionNames, "Dimension names can't be null");
this.stats = new MultiDimensionCacheStats(dimensionNames, TIER_DIMENSION_VALUE);
Expand Down Expand Up @@ -456,9 +456,11 @@ class EhCacheEventListener<K, V> implements CacheEventListener<ICacheKey<K>, byt
private ToLongBiFunction<ICacheKey<K>, V> weigher;
private Serializer<V, byte[]> valueSerializer;

EhCacheEventListener(RemovalListener<ICacheKey<K>, V> removalListener,
ToLongBiFunction<ICacheKey<K>, V> weigher,
Serializer<V, byte[]> valueSerializer) {
EhCacheEventListener(
RemovalListener<ICacheKey<K>, V> removalListener,
ToLongBiFunction<ICacheKey<K>, V> weigher,
Serializer<V, byte[]> valueSerializer
) {
this.removalListener = removalListener;
this.weigher = weigher;
this.valueSerializer = valueSerializer;
Expand All @@ -481,20 +483,30 @@ public void onEvent(CacheEvent<? extends ICacheKey<K>, ? extends byte[]> event)
assert event.getOldValue() == null;
break;
case EVICTED:
this.removalListener.onRemoval(new RemovalNotification<>(event.getKey(), valueSerializer.deserialize(event.getOldValue()), RemovalReason.EVICTED));
this.removalListener.onRemoval(
new RemovalNotification<>(event.getKey(), valueSerializer.deserialize(event.getOldValue()), RemovalReason.EVICTED)
);
stats.decrementEntriesByDimensions(event.getKey().dimensions);
stats.incrementMemorySizeByDimensions(event.getKey().dimensions, -getOldValuePairSize(event));
stats.incrementEvictionsByDimensions(event.getKey().dimensions);
assert event.getNewValue() == null;
break;
case REMOVED:
this.removalListener.onRemoval(new RemovalNotification<>(event.getKey(), valueSerializer.deserialize(event.getOldValue()), RemovalReason.EXPLICIT));
this.removalListener.onRemoval(
new RemovalNotification<>(event.getKey(), valueSerializer.deserialize(event.getOldValue()), RemovalReason.EXPLICIT)
);
stats.decrementEntriesByDimensions(event.getKey().dimensions);
stats.incrementMemorySizeByDimensions(event.getKey().dimensions, -getOldValuePairSize(event));
assert event.getNewValue() == null;
break;
case EXPIRED:
this.removalListener.onRemoval(new RemovalNotification<>(event.getKey(), valueSerializer.deserialize(event.getOldValue()), RemovalReason.INVALIDATED));
this.removalListener.onRemoval(
new RemovalNotification<>(
event.getKey(),
valueSerializer.deserialize(event.getOldValue()),
RemovalReason.INVALIDATED
)
);
stats.decrementEntriesByDimensions(event.getKey().dimensions);
stats.incrementMemorySizeByDimensions(event.getKey().dimensions, -getOldValuePairSize(event));
assert event.getNewValue() == null;
Expand All @@ -512,6 +524,7 @@ public void onEvent(CacheEvent<? extends ICacheKey<K>, ? extends byte[]> event)

private class KeySerializerWrapper implements org.ehcache.spi.serialization.Serializer<ICacheKey> {
private ICacheKeySerializer<K> serializer;

public KeySerializerWrapper(Serializer<K, byte[]> internalKeySerializer) {
this.serializer = new ICacheKeySerializer<>(internalKeySerializer);
}
Expand All @@ -520,6 +533,7 @@ public KeySerializerWrapper(Serializer<K, byte[]> internalKeySerializer) {
// cache after a restart.
// See https://www.ehcache.org/documentation/3.0/serializers-copiers.html#persistent-vs-transient-caches
public KeySerializerWrapper(ClassLoader classLoader, FileBasedPersistenceContext persistenceContext) {}

@Override
public ByteBuffer serialize(ICacheKey object) throws SerializerException {
return ByteBuffer.wrap(serializer.serialize(object));
Expand Down Expand Up @@ -705,7 +719,7 @@ public Builder<K, V> setValueSerializer(Serializer<V, byte[]> valueSerializer) {
return this;
}

//@Override
// @Override
public EhcacheDiskCache<K, V> build() {
return new EhcacheDiskCache<>(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@
import org.opensearch.cache.EhcacheDiskCacheSettings;
import org.opensearch.common.cache.CacheType;
import org.opensearch.common.cache.ICache;
import org.opensearch.common.cache.ICacheKey;
import org.opensearch.common.cache.LoadAwareCacheLoader;
import org.opensearch.common.cache.RemovalListener;
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.stats.CacheStatsDimension;
import org.opensearch.common.cache.ICacheKey;
import org.opensearch.common.cache.serializer.Serializer;
import org.opensearch.common.cache.stats.CacheStatsDimension;
import org.opensearch.common.cache.store.config.CacheConfig;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
Expand All @@ -39,9 +39,9 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.ToLongBiFunction;

import static org.hamcrest.CoreMatchers.instanceOf;
import static org.opensearch.cache.EhcacheDiskCacheSettings.DISK_MAX_SIZE_IN_BYTES_KEY;
import static org.opensearch.cache.EhcacheDiskCacheSettings.DISK_STORAGE_PATH_KEY;
import static org.hamcrest.CoreMatchers.instanceOf;

public class EhCacheDiskCacheTests extends OpenSearchSingleNodeTestCase {

Expand Down Expand Up @@ -500,7 +500,7 @@ public void testComputeIfAbsentWithNullValueLoading() throws Exception {
// Try to hit different request with the same key concurrently. Loader throws exception.
for (int i = 0; i < numberOfRequest; i++) {
threads[i] = new Thread(() -> {
LoadAwareCacheLoader<ICacheKey<String >, String> loadAwareCacheLoader = new LoadAwareCacheLoader<>() {
LoadAwareCacheLoader<ICacheKey<String>, String> loadAwareCacheLoader = new LoadAwareCacheLoader<>() {
boolean isLoaded;

@Override
Expand Down Expand Up @@ -543,7 +543,10 @@ public void testMemoryTracking() throws Exception {
ToLongBiFunction<ICacheKey<String>, String> weigher = getWeigher();
int initialKeyLength = 40;
int initialValueLength = 40;
long sizeForOneInitialEntry = weigher.applyAsLong(new ICacheKey<>(generateRandomString(initialKeyLength), getMockDimensions()), generateRandomString(initialValueLength));
long sizeForOneInitialEntry = weigher.applyAsLong(
new ICacheKey<>(generateRandomString(initialKeyLength), getMockDimensions()),
generateRandomString(initialValueLength)
);
int maxEntries = 2000;
try (NodeEnvironment env = newNodeEnvironment(settings)) {
ICache<String, String> ehcacheTest = new EhcacheDiskCache.Builder<String, String>().setDiskCacheAlias("test1")
Expand Down Expand Up @@ -603,7 +606,7 @@ public void testMemoryTracking() throws Exception {
}
// TODO: Ehcache incorrectly evicts at 30-40% of max size. Fix this test once we figure out why.
// Since the EVICTED and EXPIRED cases use the same code as REMOVED, we should be ok on testing them for now.
//assertEquals(maxEntries * sizeForOneInitialEntry, ehcacheTest.stats().getTotalMemorySize());
// assertEquals(maxEntries * sizeForOneInitialEntry, ehcacheTest.stats().getTotalMemorySize());

ehcacheTest.close();
}
Expand Down Expand Up @@ -632,8 +635,18 @@ public void testGetStatsByTierName() throws Exception {
for (int i = 0; i < randomKeys; i++) {
ehcacheTest.put(getICacheKey(UUID.randomUUID().toString()), UUID.randomUUID().toString());
}
assertEquals(randomKeys, ehcacheTest.stats().getEntriesByDimensions(List.of(new CacheStatsDimension(CacheStatsDimension.TIER_DIMENSION_NAME, EhcacheDiskCache.TIER_DIMENSION_VALUE))));
assertEquals(0, ehcacheTest.stats().getEntriesByDimensions(List.of(new CacheStatsDimension(CacheStatsDimension.TIER_DIMENSION_NAME, "other_tier_value"))));
assertEquals(
randomKeys,
ehcacheTest.stats()
.getEntriesByDimensions(
List.of(new CacheStatsDimension(CacheStatsDimension.TIER_DIMENSION_NAME, EhcacheDiskCache.TIER_DIMENSION_VALUE))
)
);
assertEquals(
0,
ehcacheTest.stats()
.getEntriesByDimensions(List.of(new CacheStatsDimension(CacheStatsDimension.TIER_DIMENSION_NAME, "other_tier_value")))
);

ehcacheTest.close();
}
Expand Down Expand Up @@ -676,6 +689,7 @@ private ToLongBiFunction<ICacheKey<String>, String> getWeigher() {

class MockRemovalListener<K, V> implements RemovalListener<ICacheKey<K>, V> {
AtomicInteger onRemovalCount = new AtomicInteger();

@Override
public void onRemoval(RemovalNotification<ICacheKey<K>, V> notification) {
onRemovalCount.incrementAndGet();
Expand All @@ -684,6 +698,7 @@ public void onRemoval(RemovalNotification<ICacheKey<K>, V> notification) {

static class StringSerializer implements Serializer<String, byte[]> {
private final Charset charset = StandardCharsets.UTF_8;

@Override
public byte[] serialize(String object) {
return object.getBytes(charset);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

package org.opensearch.common.cache;


import org.opensearch.common.cache.stats.CacheStatsDimension;

import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,30 +22,43 @@ public interface CacheStats extends Writeable {

// Methods to get all 5 values at once, either in total or for a specific set of dimensions.
CacheStatsResponse getTotalStats();

CacheStatsResponse getStatsByDimensions(List<CacheStatsDimension> dimensions);

// Methods to get total values.
long getTotalHits();

long getTotalMisses();

long getTotalEvictions();

long getTotalMemorySize();

long getTotalEntries();

// Methods to get values for a specific set of dimensions.
// Returns the sum of values for cache entries that match all dimensions in the list.
long getHitsByDimensions(List<CacheStatsDimension> dimensions);

long getMissesByDimensions(List<CacheStatsDimension> dimensions);

long getEvictionsByDimensions(List<CacheStatsDimension> dimensions);

long getMemorySizeByDimensions(List<CacheStatsDimension> dimensions);
long getEntriesByDimensions(List<CacheStatsDimension> dimensions);

long getEntriesByDimensions(List<CacheStatsDimension> dimensions);

void incrementHitsByDimensions(List<CacheStatsDimension> dimensions);

void incrementMissesByDimensions(List<CacheStatsDimension> dimensions);

void incrementEvictionsByDimensions(List<CacheStatsDimension> dimensions);

// Can also use to decrement, with negative values
void incrementMemorySizeByDimensions(List<CacheStatsDimension> dimensions, long amountBytes);

void incrementEntriesByDimensions(List<CacheStatsDimension> dimensions);

void decrementEntriesByDimensions(List<CacheStatsDimension> dimensions);

// Resets memory and entries stats but leaves the others; called when the cache clears itself.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public class CacheStatsDimension implements Writeable {
public static final String TIER_DIMENSION_NAME = "tier";
public final String dimensionName;
public final String dimensionValue;

public CacheStatsDimension(String dimensionName, String dimensionValue) {
this.dimensionName = dimensionName;
this.dimensionValue = dimensionValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public CacheStatsResponse(StreamInput in) throws IOException {
}

public CacheStatsResponse() {
this(0,0,0,0,0);
this(0, 0, 0, 0, 0);
}

public synchronized void add(CacheStatsResponse other) {
Expand Down
Loading

0 comments on commit ea33af8

Please sign in to comment.