@@ -232,21 +232,29 @@ <T> Publisher<T> doInConnection(ReactiveRedisCallback<T> action, boolean exposeC
232
232
233
233
Assert .notNull (action , "Callback object must not be null" );
234
234
235
- return Flux .usingWhen (Mono .fromSupplier (() -> {
236
-
237
- ReactiveRedisConnectionFactory factory = getConnectionFactory ();
238
- ReactiveRedisConnection conn = factory .getReactiveConnection ();
239
- ReactiveRedisConnection connToUse = preProcessConnection (conn , false );
240
-
241
- return (exposeConnection ? connToUse : createRedisConnectionProxy (connToUse ));
242
- }), conn -> {
235
+ return Flux .usingWhen (getConnection (exposeConnection ), conn -> {
243
236
Publisher <T > result = action .doInRedis (conn );
244
237
245
238
return postProcessResult (result , conn , false );
246
239
247
240
}, ReactiveRedisConnection ::closeLater );
248
241
}
249
242
243
+ /**
244
+ * Creates a Mono which generates a new connection. The successors of {@link ReactiveRedisTemplate} might override
245
+ * the default behaviour.
246
+ *
247
+ * @param exposeConnection whether to enforce exposure of the native Redis Connection to callback code
248
+ * return a {@link Mono} wrapping the {@link ReactiveRedisConnection}.
249
+ */
250
+ protected Mono <ReactiveRedisConnection > getConnection (boolean exposeConnection ) {
251
+ ReactiveRedisConnectionFactory factory = getConnectionFactory ();
252
+ ReactiveRedisConnection conn = factory .getReactiveConnection ();
253
+ ReactiveRedisConnection connToUse = preProcessConnection (conn , false );
254
+
255
+ return Mono .just (exposeConnection ? connToUse : createRedisConnectionProxy (connToUse ));
256
+ }
257
+
250
258
/*
251
259
* (non-Javadoc)
252
260
* @see org.springframework.data.redis.core.ReactiveRedisOperations#convertAndSend(java.lang.String, java.lang.Object)
0 commit comments