From 672e252be118ae0905b5e44044315a52bcadf963 Mon Sep 17 00:00:00 2001 From: chengchen Date: Fri, 10 Jul 2020 13:20:47 +0200 Subject: [PATCH] Provide an example of using scan as an option --- .../redis/cache/DefaultRedisCacheWriter.java | 45 ++++++++++++------- 1 file changed, 29 insertions(+), 16 deletions(-) diff --git a/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java b/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java index 698f147788..0471502dcb 100644 --- a/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java +++ b/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java @@ -15,6 +15,7 @@ */ package org.springframework.data.redis.cache; +import java.io.IOException; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.Collections; @@ -27,9 +28,12 @@ import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.connection.RedisStringCommands.SetOption; +import org.springframework.data.redis.core.Cursor; +import org.springframework.data.redis.core.ScanOptions; import org.springframework.data.redis.core.types.Expiration; import org.springframework.lang.Nullable; import org.springframework.util.Assert; +import reactor.core.publisher.Flux; /** * {@link RedisCacheWriter} implementation capable of reading/writing binary data from/to Redis in {@literal standalone} @@ -179,12 +183,31 @@ public void clean(String name, byte[] pattern) { wasLocked = true; } - byte[][] keys = Optional.ofNullable(connection.keys(pattern)).orElse(Collections.emptySet()) - .toArray(new byte[0][]); +// byte[][] keys = Optional.ofNullable(connection.keys(pattern)).orElse(Collections.emptySet()) +// .toArray(new byte[0][]); +// +// if (keys.length > 0) { +// connection.del(keys); +// } - if (keys.length > 0) { - connection.del(keys); + int batchSize = 1000; // maybe make it configurable + ScanOptions options = ScanOptions.scanOptions().match(new String(pattern)).build(); + + try (Cursor cursor = connection.scan(options)) { + while(cursor.hasNext()) { + byte[][] keys = new byte[batchSize][]; + + for(int i = 0; i < batchSize && cursor.hasNext(); i++) { + keys[i] = cursor.next(); + } + + connection.del(keys); + } + } catch (IOException e) { + // not quite sure about the exception convention in this project + throw new RuntimeException(e); } + } finally { if (wasLocked && isLockingCacheWriter()) { @@ -234,25 +257,15 @@ private boolean isLockingCacheWriter() { } private T execute(String name, Function callback) { - - RedisConnection connection = connectionFactory.getConnection(); - try { - + try (RedisConnection connection = connectionFactory.getConnection()) { checkAndPotentiallyWaitUntilUnlocked(name, connection); return callback.apply(connection); - } finally { - connection.close(); } } private void executeLockFree(Consumer callback) { - - RedisConnection connection = connectionFactory.getConnection(); - - try { + try (RedisConnection connection = connectionFactory.getConnection()) { callback.accept(connection); - } finally { - connection.close(); } }