@@ -192,21 +192,29 @@ private <T> Publisher<T> doInConnection(ReactiveRedisCallback<T> action, boolean
192
192
193
193
Assert .notNull (action , "Callback object must not be null" );
194
194
195
- return Flux .usingWhen (Mono .fromSupplier (() -> {
196
-
197
- ReactiveRedisConnectionFactory factory = getConnectionFactory ();
198
- ReactiveRedisConnection conn = factory .getReactiveConnection ();
199
- ReactiveRedisConnection connToUse = preProcessConnection (conn , false );
200
-
201
- return (exposeConnection ? connToUse : createRedisConnectionProxy (connToUse ));
202
- }), conn -> {
195
+ return Flux .usingWhen (getConnection (exposeConnection ), conn -> {
203
196
Publisher <T > result = action .doInRedis (conn );
204
197
205
198
return postProcessResult (result , conn , false );
206
199
207
200
}, ReactiveRedisConnection ::closeLater );
208
201
}
209
202
203
+ /**
204
+ * Creates a Mono which generates a new connection. The successors of {@link ReactiveRedisTemplate} might override
205
+ * the default behaviour.
206
+ *
207
+ * @param exposeConnection whether to enforce exposure of the native Redis Connection to callback code
208
+ * return a {@link Mono} wrapping the {@link ReactiveRedisConnection}.
209
+ */
210
+ protected Mono <ReactiveRedisConnection > getConnection (boolean exposeConnection ) {
211
+ ReactiveRedisConnectionFactory factory = getConnectionFactory ();
212
+ ReactiveRedisConnection conn = factory .getReactiveConnection ();
213
+ ReactiveRedisConnection connToUse = preProcessConnection (conn , false );
214
+
215
+ return Mono .just (exposeConnection ? connToUse : createRedisConnectionProxy (connToUse ));
216
+ }
217
+
210
218
/*
211
219
* (non-Javadoc)
212
220
* @see org.springframework.data.redis.core.ReactiveRedisOperations#convertAndSend(java.lang.String, java.lang.Object)
0 commit comments