Skip to content

Commit

Permalink
Merge pull request ehcache#3130 from nnares/issue-3097
Browse files Browse the repository at this point in the history
issue-3097 : re-writting getAll() impl
  • Loading branch information
chrisdennis authored Aug 18, 2023
2 parents cef93dc + c4ca535 commit 5f43856
Show file tree
Hide file tree
Showing 11 changed files with 762 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,11 @@ public Map<K, ValueHolder<V>> bulkComputeIfAbsent(final Set<? extends K> keys, f
}
}

@Override
public Iterable<? extends Map.Entry<? extends K, ? extends ValueHolder<V>>> bulkComputeIfAbsentAndFault(Iterable<? extends K> keys, Function<Iterable<? extends K>, Iterable<? extends Map.Entry<? extends K, ? extends V>>> mappingFunction) throws StoreAccessException {
return bulkComputeIfAbsent((Set<? extends K>) keys,mappingFunction).entrySet();
}

@Override
public List<CacheConfigurationChangeListener> getConfigurationChangeListeners() {
// TODO: Make appropriate ServerStoreProxy call
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.ehcache.spi.service.ServiceConfiguration;

import java.util.Collection;
import java.util.Map;
import java.util.function.Function;

/**
Expand Down Expand Up @@ -75,7 +76,25 @@ public interface AuthoritativeTier<K, V> extends Store<K, V> {
*/
void setInvalidationValve(InvalidationValve valve);


/**
* Bulk method to compute a value for every key passed in the {@link Iterable} <code>keys</code> argument using the <code>mappingFunction</code>
* to compute the value.
* <p>
* The function takes an {@link Iterable} of {@link java.util.Map.Entry} key/value pairs, where each entry's value is its currently stored value
* for each key that is not mapped in the store. It is expected that the function should return an {@link Iterable} of {@link java.util.Map.Entry}
* key/value pairs containing an entry for each key that was passed to it.
* <p>
* Note: This method guarantees atomicity of computations for each individual key in {@code keys}. Implementations may choose to provide coarser grained atomicity.
*
* @param keys the keys to compute a new value for, if they're not in the store.
* @param mappingFunction the function that generates new values.
* @return a {@code Map} of key/value pairs for each key in <code>keys</code> to the previously missing value.
* @throws StoreAccessException when a failure occurs when accessing the store
*/
Iterable<? extends Map.Entry<? extends K,? extends ValueHolder<V>>> bulkComputeIfAbsentAndFault(Iterable<? extends K> keys, Function<Iterable<? extends K>, Iterable<? extends Map.Entry<? extends K,? extends V>>> mappingFunction) throws StoreAccessException;

/**
* Invalidation valve, that is the mechanism through which an {@link AuthoritativeTier} can request invalidations
* from the {@link CachingTier}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.ehcache.spi.service.ServiceConfiguration;

import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

Expand Down Expand Up @@ -107,6 +108,22 @@ public interface CachingTier<K, V> extends ConfigurationChangeSupport {
*/
void setInvalidationListener(InvalidationListener<K, V> invalidationListener);

/**
* Bulk method which takes {@link Set} of <code>keys</code> as argument and returns a {@link Map} of its mapped value from CachingTier,
* For all the missing entries from CachingTier using <code>mappingFunction</code> to compute its value
* <p>
* The function takes an {@link Iterable} of missing keys, where each entry's mapping is missing from CachingTier.
* It is expected that the function should return an {@link Iterable} of {@link java.util.Map.Entry} key/value pairs containing an entry for each key that was passed to it.
* <p>
* Note: This method guarantees atomicity of computations for each individual key in {@code keys}. Implementations may choose to provide coarser grained atomicity.
*
* @param keys the keys to compute a new value for, if they're not in the store.
* @param mappingFunction the function that generates new values.
* @return a {@code Map} of key/value pairs for each key in <code>keys</code>.
* @throws StoreAccessException when a failure occurs when accessing the store.
*/
Map<K, Store.ValueHolder<V>> bulkGetOrComputeIfAbsent(Iterable<? extends K> keys, Function<Set<? extends K>, Iterable<? extends Map.Entry<? extends K, ? extends Store.ValueHolder<V>>>> mappingFunction) throws StoreAccessException;

/**
* Caching tier invalidation listener.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,4 +199,13 @@ public Map<K, ValueHolder<V>> bulkComputeIfAbsent(Set<? extends K> keys, Functio
}
return map;
}

@Override
public Iterable<? extends Map.Entry<? extends K, ? extends ValueHolder<V>>> bulkComputeIfAbsentAndFault(Iterable<? extends K> keys, Function<Iterable<? extends K>, Iterable<? extends Map.Entry<? extends K, ? extends V>>> mappingFunction) throws StoreAccessException {
Map<K, ValueHolder<V>> map = new HashMap<>();
for(K key : keys) {
map.put(key, null);
}
return map.entrySet();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.ehcache.config.Eviction.noAdvice;
import static org.ehcache.core.config.ExpiryUtils.isExpiryDurationInfinite;
Expand Down Expand Up @@ -956,6 +958,40 @@ public void setInvalidationListener(InvalidationListener<K, V> providedInvalidat
};
}

@Override
public Map<K, Store.ValueHolder<V>> bulkGetOrComputeIfAbsent(Iterable<? extends K> keys, Function<Set<? extends K>, Iterable<? extends Entry<? extends K, ? extends ValueHolder<V>>>> mappingFunction) throws StoreAccessException {
Map<K, ValueHolder<V>> result = new HashMap<>();
Set<K> missingKeys = new HashSet<>();

for (K key : keys) {
ValueHolder<V> cachingFetch = get(key);
if (null == cachingFetch) {
missingKeys.add(key);
} else {
result.put(key, cachingFetch);
}
}

try {
List<? extends Entry<? extends K, ? extends ValueHolder<V>>> fetchedEntries =
StreamSupport.stream(mappingFunction.apply(missingKeys).spliterator(), false)
.filter(e -> missingKeys.contains(e.getKey()))
.collect(Collectors.toList());

long availableSize = capacity - result.size();
for (Entry<? extends K, ? extends ValueHolder<V>> entry : fetchedEntries) {
// populating AuthoritativeTier entries to TieredStore for getAll()
if(availableSize-- > 0){
getOrComputeIfAbsent(entry.getKey(), keyParam -> entry.getValue());
}
result.put(entry.getKey(), entry.getValue());
}
return result;
} catch (RuntimeException re) {
throw new StoreAccessException(re);
}
}

@Override
public void invalidateAllWithHash(long hash) {
invalidateAllWithHashObserver.begin();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -846,6 +846,25 @@ public ValueHolder<V> getAndFault(K key) throws StoreAccessException {
return mappedValue;
}

public Iterable<? extends Map.Entry<? extends K, ? extends ValueHolder<V>>> bulkComputeIfAbsentAndFault(Iterable<? extends K> keys, Function<Iterable<? extends K>, Iterable<? extends Map.Entry<? extends K, ? extends V>>> mappingFunction) throws StoreAccessException {
Map<K, ValueHolder<V>> result = new HashMap<>();
for (K key : keys) {
checkKey(key);
Function<K, V> function = k -> {
java.util.Iterator<? extends Map.Entry<? extends K, ? extends V>> iterator = mappingFunction.apply(Collections.singleton(k)).iterator();
Map.Entry<? extends K, ? extends V> result1 = iterator.next();
if (result1 != null) {
checkKey(result1.getKey());
return result1.getValue();
} else {
return null;
}
};
result.put(key, computeIfAbsentAndFault(key, function));
}
return result.entrySet();
}

@Override
public ValueHolder<V> computeIfAbsentAndFault(K key, Function<? super K, ? extends V> mappingFunction) throws StoreAccessException {
return internalComputeIfAbsent(key, mappingFunction, true, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -200,6 +202,38 @@ public void setInvalidationListener(InvalidationListener<K, V> invalidationListe
lower.setInvalidationListener(invalidationListener);
}

@Override
public Map<K, Store.ValueHolder<V>> bulkGetOrComputeIfAbsent(Iterable<? extends K> keys, Function<Set<? extends K>, Iterable<? extends Map.Entry<? extends K, ? extends Store.ValueHolder<V>>>> mappingFunction) throws StoreAccessException {
try {
return higher.bulkGetOrComputeIfAbsent(keys, keyParam -> {
try {
Map<K, Store.ValueHolder<V>> result = new HashMap<>();
Set<K> missingKeys = new HashSet<>();

for (K key : keys) {
Store.ValueHolder<V> cachingFetch = lower.getAndRemove(key);
if (null == cachingFetch) {
missingKeys.add(key);
} else {
result.put(key, cachingFetch);
}
}

Iterable<? extends Map.Entry<? extends K, ? extends Store.ValueHolder<V>>> fetchedEntries = mappingFunction.apply(missingKeys);
for (Map.Entry<? extends K, ? extends Store.ValueHolder<V>> entry : fetchedEntries) {
result.put(entry.getKey(), entry.getValue());
}
return result.entrySet();

} catch (StoreAccessException cae) {
throw new ComputationException(cae);
}
});
} catch (ComputationException ce) {
throw ce.getStoreAccessException();
}
}

@Override
public List<CacheConfigurationChangeListener> getConfigurationChangeListeners() {
List<CacheConfigurationChangeListener> listeners = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -356,13 +358,18 @@ public Map<K, ValueHolder<V>> bulkCompute(Set<? extends K> keys, Function<Iterab
}

@Override
public Map<K, ValueHolder<V>> bulkComputeIfAbsent(Set<? extends K> keys, Function<Iterable<? extends K>, Iterable<? extends Map.Entry<? extends K, ? extends V>>> mappingFunction) throws StoreAccessException {
public Map<K, Store.ValueHolder<V>> bulkComputeIfAbsent(Set<? extends K> keys, Function<Iterable<? extends K>, Iterable<? extends Map.Entry<? extends K, ? extends V>>> mappingFunction) throws StoreAccessException {

try {
return authoritativeTier.bulkComputeIfAbsent(keys, mappingFunction);
} finally {
for (K key : keys) {
cachingTier().invalidate(key);
}
return cachingTier().bulkGetOrComputeIfAbsent(keys, missingKeys -> {
try {
return authoritativeTier.bulkComputeIfAbsentAndFault(missingKeys, mappingFunction);
} catch (StoreAccessException cae) {
throw new StorePassThroughException(cae);
}
});
} catch (StoreAccessException ce) {
return handleStoreAccessException(ce);
}
}

Expand All @@ -379,7 +386,7 @@ private CachingTier<K, V> cachingTier() {
return cachingTierRef.get();
}

private ValueHolder<V> handleStoreAccessException(StoreAccessException ce) throws StoreAccessException {
private <R> R handleStoreAccessException(StoreAccessException ce) throws StoreAccessException {
Throwable cause = ce.getCause();
if (cause instanceof StorePassThroughException) {
throw (StoreAccessException) cause.getCause();
Expand Down Expand Up @@ -604,6 +611,15 @@ public void setInvalidationListener(final InvalidationListener<K, V> invalidatio
// noop
}

@Override
public Map<K, Store.ValueHolder<V>> bulkGetOrComputeIfAbsent(Iterable<? extends K> keys, Function<Set<? extends K>, Iterable<? extends Map.Entry<? extends K, ? extends ValueHolder<V>>>> mappingFunction) throws StoreAccessException {
Map<K, ValueHolder<V>> map = new HashMap<>();
for(K key : keys) {
map.put(key, null);
}
return map;
}

@Override
public void invalidateAllWithHash(long hash) {
// noop
Expand Down
Loading

0 comments on commit 5f43856

Please sign in to comment.