diff --git a/src/main/java/org/springframework/data/redis/core/ReactiveRedisTemplate.java b/src/main/java/org/springframework/data/redis/core/ReactiveRedisTemplate.java index fba47e095d..3d462e50f8 100644 --- a/src/main/java/org/springframework/data/redis/core/ReactiveRedisTemplate.java +++ b/src/main/java/org/springframework/data/redis/core/ReactiveRedisTemplate.java @@ -192,14 +192,7 @@ private Publisher doInConnection(ReactiveRedisCallback action, boolean Assert.notNull(action, "Callback object must not be null"); - return Flux.usingWhen(Mono.fromSupplier(() -> { - - ReactiveRedisConnectionFactory factory = getConnectionFactory(); - ReactiveRedisConnection conn = factory.getReactiveConnection(); - ReactiveRedisConnection connToUse = preProcessConnection(conn, false); - - return (exposeConnection ? connToUse : createRedisConnectionProxy(connToUse)); - }), conn -> { + return Flux.usingWhen(getConnection(exposeConnection), conn -> { Publisher result = action.doInRedis(conn); return postProcessResult(result, conn, false); @@ -207,6 +200,21 @@ private Publisher doInConnection(ReactiveRedisCallback action, boolean }, ReactiveRedisConnection::closeLater); } + /** + * Creates a Mono which generates a new connection. The successors of {@link ReactiveRedisTemplate} might override + * the default behaviour. + * + * @param exposeConnection whether to enforce exposure of the native Redis Connection to callback code + * return a {@link Mono} wrapping the {@link ReactiveRedisConnection}. + */ + protected Mono getConnection(boolean exposeConnection) { + ReactiveRedisConnectionFactory factory = getConnectionFactory(); + ReactiveRedisConnection conn = factory.getReactiveConnection(); + ReactiveRedisConnection connToUse = preProcessConnection(conn, false); + + return Mono.just(exposeConnection ? connToUse : createRedisConnectionProxy(connToUse)); + } + /* * (non-Javadoc) * @see org.springframework.data.redis.core.ReactiveRedisOperations#convertAndSend(java.lang.String, java.lang.Object)