Skip to content

Commit 0dc89b8

Browse files
mp911dechristophstrobl
authored andcommitted
Add support for configurable batch strategies using RedisCache.
We now support a configurable BatchStrategy for RedisCache. The implementations consist of KEYS (default) and SCAN. Since SCAN is not supported with Jedis Cluster and SCAN requires a batch size, we default to KEYS. Closes: #1721 Original Pull Request: #2051
1 parent a525fc1 commit 0dc89b8

File tree

7 files changed

+258
-24
lines changed

7 files changed

+258
-24
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
/*
2+
* Copyright 2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.redis.cache;
17+
18+
import java.util.ArrayList;
19+
import java.util.Collections;
20+
import java.util.Iterator;
21+
import java.util.List;
22+
import java.util.NoSuchElementException;
23+
import java.util.Optional;
24+
25+
import org.springframework.data.redis.connection.RedisConnection;
26+
import org.springframework.data.redis.core.Cursor;
27+
import org.springframework.data.redis.core.ScanOptions;
28+
import org.springframework.util.Assert;
29+
30+
/**
31+
* Batch strategies to be used with {@link RedisCacheWriter}.
32+
* <p/>
33+
* Primarily used to clear the cache.
34+
*
35+
* @author Mark Paluch
36+
* @since 2.6
37+
*/
38+
public abstract class BatchStrategy {
39+
40+
/**
41+
* Batching strategy using a single {@code KEYS} and {@code DEL} command to remove all matching keys. {@code KEYS}
42+
* scans the entire keyspace of the Redis database and can block the Redis worker thread for a long time when the
43+
* keyspace has a significant size.
44+
* <p/>
45+
* {@code KEYS} is supported for standalone and clustered (sharded) Redis operation modes.
46+
*
47+
* @return batching strategy using {@code KEYS}.
48+
*/
49+
public static BatchStrategy keys() {
50+
return Keys.INSTANCE;
51+
}
52+
53+
/**
54+
* Batching strategy using a {@code SCAN} cursors and potentially multiple {@code DEL} commands to remove all matching
55+
* keys. This strategy allows a configurable batch size to optimize for scan batching.
56+
* <p/>
57+
* Note that using the {@code SCAN} strategy might be not supported on all drivers and Redis operation modes.
58+
*
59+
* @return batching strategy using {@code SCAN}.
60+
*/
61+
public static BatchStrategy scan(int batchSize) {
62+
63+
Assert.isTrue(batchSize > 0, "Batch size must be greater than zero");
64+
65+
return new Scan(batchSize);
66+
}
67+
68+
/**
69+
* Remove all keys following the given pattern.
70+
*
71+
* @param the connection to use.
72+
* @param name The cache name must not be {@literal null}.
73+
* @param pattern The pattern for the keys to remove. Must not be {@literal null}.
74+
* @return number of removed keys.
75+
*/
76+
abstract int cleanCache(RedisConnection connection, String name, byte[] pattern);
77+
78+
/**
79+
* {@link BatchStrategy} using {@code KEYS}.
80+
*/
81+
static class Keys extends BatchStrategy {
82+
83+
static Keys INSTANCE = new Keys();
84+
85+
@Override
86+
int cleanCache(RedisConnection connection, String name, byte[] pattern) {
87+
88+
byte[][] keys = Optional.ofNullable(connection.keys(pattern)).orElse(Collections.emptySet())
89+
.toArray(new byte[0][]);
90+
91+
if (keys.length > 0) {
92+
connection.del(keys);
93+
}
94+
95+
return keys.length;
96+
}
97+
}
98+
99+
/**
100+
* {@link BatchStrategy} using {@code SCAN}.
101+
*/
102+
static class Scan extends BatchStrategy {
103+
104+
private final int batchSize;
105+
106+
public Scan(int batchSize) {
107+
this.batchSize = batchSize;
108+
}
109+
110+
@Override
111+
int cleanCache(RedisConnection connection, String name, byte[] pattern) {
112+
113+
Cursor<byte[]> cursor = connection.scan(ScanOptions.scanOptions().count(batchSize).match(pattern).build());
114+
115+
PartitionIterator<byte[]> partitions = new PartitionIterator<>(cursor, batchSize);
116+
117+
int count = 0;
118+
119+
while (partitions.hasNext()) {
120+
121+
List<byte[]> keys = partitions.next();
122+
count += keys.size();
123+
124+
if (keys.size() > 0) {
125+
connection.del(keys.toArray(new byte[0][]));
126+
}
127+
}
128+
129+
return count;
130+
}
131+
}
132+
133+
/**
134+
* Utility to split and buffer outcome from a {@link Iterator} into {@link List lists} of {@code T} with a maximum
135+
* chunks {@code size}.
136+
*
137+
* @param <T>
138+
*/
139+
static class PartitionIterator<T> implements Iterator<List<T>> {
140+
141+
private final Iterator<T> iterator;
142+
private final int size;
143+
144+
public PartitionIterator(Iterator<T> iterator, int size) {
145+
this.iterator = iterator;
146+
this.size = size;
147+
}
148+
149+
@Override
150+
public boolean hasNext() {
151+
return iterator.hasNext();
152+
}
153+
154+
@Override
155+
public List<T> next() {
156+
157+
if (!hasNext()) {
158+
throw new NoSuchElementException();
159+
}
160+
161+
List<T> list = new ArrayList<>(size);
162+
while (list.size() < size && iterator.hasNext()) {
163+
list.add(iterator.next());
164+
}
165+
166+
return list;
167+
}
168+
}
169+
170+
}

src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java

+14-14
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717

1818
import java.nio.charset.StandardCharsets;
1919
import java.time.Duration;
20-
import java.util.Collections;
21-
import java.util.Optional;
2220
import java.util.concurrent.TimeUnit;
2321
import java.util.function.Consumer;
2422
import java.util.function.Function;
@@ -53,39 +51,45 @@ class DefaultRedisCacheWriter implements RedisCacheWriter {
5351
private final RedisConnectionFactory connectionFactory;
5452
private final Duration sleepTime;
5553
private final CacheStatisticsCollector statistics;
54+
private final BatchStrategy batchStrategy;
5655

5756
/**
5857
* @param connectionFactory must not be {@literal null}.
58+
* @param batchStrategy must not be {@literal null}.
5959
*/
60-
DefaultRedisCacheWriter(RedisConnectionFactory connectionFactory) {
61-
this(connectionFactory, Duration.ZERO);
60+
DefaultRedisCacheWriter(RedisConnectionFactory connectionFactory, BatchStrategy batchStrategy) {
61+
this(connectionFactory, Duration.ZERO, batchStrategy);
6262
}
6363

6464
/**
6565
* @param connectionFactory must not be {@literal null}.
6666
* @param sleepTime sleep time between lock request attempts. Must not be {@literal null}. Use {@link Duration#ZERO}
6767
* to disable locking.
68+
* @param batchStrategy must not be {@literal null}.
6869
*/
69-
DefaultRedisCacheWriter(RedisConnectionFactory connectionFactory, Duration sleepTime) {
70-
this(connectionFactory, sleepTime, CacheStatisticsCollector.none());
70+
DefaultRedisCacheWriter(RedisConnectionFactory connectionFactory, Duration sleepTime, BatchStrategy batchStrategy) {
71+
this(connectionFactory, sleepTime, CacheStatisticsCollector.none(), batchStrategy);
7172
}
7273

7374
/**
7475
* @param connectionFactory must not be {@literal null}.
7576
* @param sleepTime sleep time between lock request attempts. Must not be {@literal null}. Use {@link Duration#ZERO}
7677
* to disable locking.
7778
* @param cacheStatisticsCollector must not be {@literal null}.
79+
* @param batchStrategy must not be {@literal null}.
7880
*/
7981
DefaultRedisCacheWriter(RedisConnectionFactory connectionFactory, Duration sleepTime,
80-
CacheStatisticsCollector cacheStatisticsCollector) {
82+
CacheStatisticsCollector cacheStatisticsCollector, BatchStrategy batchStrategy) {
8183

8284
Assert.notNull(connectionFactory, "ConnectionFactory must not be null!");
8385
Assert.notNull(sleepTime, "SleepTime must not be null!");
8486
Assert.notNull(cacheStatisticsCollector, "CacheStatisticsCollector must not be null!");
87+
Assert.notNull(batchStrategy, "BatchStrategy must not be null!");
8588

8689
this.connectionFactory = connectionFactory;
8790
this.sleepTime = sleepTime;
8891
this.statistics = cacheStatisticsCollector;
92+
this.batchStrategy = batchStrategy;
8993
}
9094

9195
/*
@@ -213,13 +217,9 @@ public void clean(String name, byte[] pattern) {
213217
wasLocked = true;
214218
}
215219

216-
byte[][] keys = Optional.ofNullable(connection.keys(pattern)).orElse(Collections.emptySet())
217-
.toArray(new byte[0][]);
218220

219-
if (keys.length > 0) {
220-
statistics.incDeletesBy(name, keys.length);
221-
connection.del(keys);
222-
}
221+
statistics.incDeletesBy(name, batchStrategy.cleanCache(connection, name, pattern));
222+
223223
} finally {
224224

225225
if (wasLocked && isLockingCacheWriter()) {
@@ -255,7 +255,7 @@ public void clearStatistics(String name) {
255255
*/
256256
@Override
257257
public RedisCacheWriter withStatisticsCollector(CacheStatisticsCollector cacheStatisticsCollector) {
258-
return new DefaultRedisCacheWriter(connectionFactory, sleepTime, cacheStatisticsCollector);
258+
return new DefaultRedisCacheWriter(connectionFactory, sleepTime, cacheStatisticsCollector, this.batchStrategy);
259259
}
260260

261261
/**

src/main/java/org/springframework/data/redis/cache/RedisCacheManager.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ public static RedisCacheManager create(RedisConnectionFactory connectionFactory)
186186

187187
Assert.notNull(connectionFactory, "ConnectionFactory must not be null!");
188188

189-
return new RedisCacheManager(new DefaultRedisCacheWriter(connectionFactory),
189+
return new RedisCacheManager(RedisCacheWriter.nonLockingRedisCacheWriter(connectionFactory),
190190
RedisCacheConfiguration.defaultCacheConfig());
191191
}
192192

@@ -311,7 +311,7 @@ public static RedisCacheManagerBuilder fromConnectionFactory(RedisConnectionFact
311311

312312
Assert.notNull(connectionFactory, "ConnectionFactory must not be null!");
313313

314-
return new RedisCacheManagerBuilder(new DefaultRedisCacheWriter(connectionFactory));
314+
return new RedisCacheManagerBuilder(RedisCacheWriter.nonLockingRedisCacheWriter(connectionFactory));
315315
}
316316

317317
/**

src/main/java/org/springframework/data/redis/cache/RedisCacheWriter.java

+32-2
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@
2626
* caching. <br />
2727
* The {@link RedisCacheWriter} may be shared by multiple cache implementations and is responsible for writing / reading
2828
* binary data to / from Redis. The implementation honors potential cache lock flags that might be set.
29+
* <p>
30+
* The default {@link RedisCacheWriter} implementation can be customized with {@link BatchStrategy} to tune performance
31+
* behavior.
2932
*
3033
* @author Christoph Strobl
3134
* @author Mark Paluch
@@ -40,10 +43,24 @@ public interface RedisCacheWriter extends CacheStatisticsProvider {
4043
* @return new instance of {@link DefaultRedisCacheWriter}.
4144
*/
4245
static RedisCacheWriter nonLockingRedisCacheWriter(RedisConnectionFactory connectionFactory) {
46+
return nonLockingRedisCacheWriter(connectionFactory, BatchStrategy.keys());
47+
}
48+
49+
/**
50+
* Create new {@link RedisCacheWriter} without locking behavior.
51+
*
52+
* @param connectionFactory must not be {@literal null}.
53+
* @param batchStrategy must not be {@literal null}.
54+
* @return new instance of {@link DefaultRedisCacheWriter}.
55+
* @since 2.6
56+
*/
57+
static RedisCacheWriter nonLockingRedisCacheWriter(RedisConnectionFactory connectionFactory,
58+
BatchStrategy batchStrategy) {
4359

4460
Assert.notNull(connectionFactory, "ConnectionFactory must not be null!");
61+
Assert.notNull(batchStrategy, "BatchStrategy must not be null!");
4562

46-
return new DefaultRedisCacheWriter(connectionFactory);
63+
return new DefaultRedisCacheWriter(connectionFactory, batchStrategy);
4764
}
4865

4966
/**
@@ -53,10 +70,23 @@ static RedisCacheWriter nonLockingRedisCacheWriter(RedisConnectionFactory connec
5370
* @return new instance of {@link DefaultRedisCacheWriter}.
5471
*/
5572
static RedisCacheWriter lockingRedisCacheWriter(RedisConnectionFactory connectionFactory) {
73+
return lockingRedisCacheWriter(connectionFactory, BatchStrategy.keys());
74+
}
75+
76+
/**
77+
* Create new {@link RedisCacheWriter} with locking behavior.
78+
*
79+
* @param connectionFactory must not be {@literal null}.
80+
* @param batchStrategy must not be {@literal null}.
81+
* @return new instance of {@link DefaultRedisCacheWriter}.
82+
* @since 2.6
83+
*/
84+
static RedisCacheWriter lockingRedisCacheWriter(RedisConnectionFactory connectionFactory,
85+
BatchStrategy batchStrategy) {
5686

5787
Assert.notNull(connectionFactory, "ConnectionFactory must not be null!");
5888

59-
return new DefaultRedisCacheWriter(connectionFactory, Duration.ofMillis(50));
89+
return new DefaultRedisCacheWriter(connectionFactory, Duration.ofMillis(50), batchStrategy);
6090
}
6191

6292
/**

src/test/java/org/springframework/data/redis/cache/DefaultRedisCacheWriterTests.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,8 @@ void lockingCacheWriterShouldExitWhenInterruptedWaitForLockRelease() throws Inte
306306

307307
Thread th = new Thread(() -> {
308308

309-
DefaultRedisCacheWriter writer = new DefaultRedisCacheWriter(connectionFactory, Duration.ofMillis(50)) {
309+
DefaultRedisCacheWriter writer = new DefaultRedisCacheWriter(connectionFactory, Duration.ofMillis(50),
310+
BatchStrategy.keys()) {
310311

311312
@Override
312313
boolean doCheckLock(String name, RedisConnection connection) {

src/test/java/org/springframework/data/redis/cache/LegacyRedisCacheTests.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,8 @@ private RedisCache createCache() {
102102
cacheConfiguration = cacheConfiguration.disableCachingNullValues();
103103
}
104104

105-
return new RedisCache(CACHE_NAME, new DefaultRedisCacheWriter(connectionFactory), cacheConfiguration);
105+
return new RedisCache(CACHE_NAME, RedisCacheWriter.nonLockingRedisCacheWriter(connectionFactory),
106+
cacheConfiguration);
106107
}
107108

108109
protected Object getValue() {

0 commit comments

Comments
 (0)