58
58
*
59
59
* @author Mark Paluch
60
60
* @author Christoph Strobl
61
+ * @author Petromir Dzhunev
61
62
* @since 2.0
62
63
* @param <K> the Redis key type against which the template works (usually a String)
63
64
* @param <V> the Redis value type against which the template works
@@ -232,7 +233,14 @@ <T> Publisher<T> doInConnection(ReactiveRedisCallback<T> action, boolean exposeC
232
233
233
234
Assert .notNull (action , "Callback object must not be null" );
234
235
235
- return Flux .usingWhen (getConnection (exposeConnection ), conn -> {
236
+ Mono <ReactiveRedisConnection > connection = getConnection ();
237
+
238
+ if (!exposeConnection ) {
239
+ connection = connection .map (this ::createRedisConnectionProxy );
240
+ }
241
+
242
+ return Flux .usingWhen (connection , conn -> {
243
+
236
244
Publisher <T > result = action .doInRedis (conn );
237
245
238
246
return postProcessResult (result , conn , false );
@@ -241,18 +249,16 @@ <T> Publisher<T> doInConnection(ReactiveRedisCallback<T> action, boolean exposeC
241
249
}
242
250
243
251
/**
244
- * Creates a Mono which generates a new connection. The successors of {@link ReactiveRedisTemplate} might override
245
- * the default behaviour .
252
+ * Creates a {@link Mono} which emits a new {@link ReactiveRedisConnection}. Can be overridden in subclasses to
253
+ * provide a different mechanism for connection allocation for the given method .
246
254
*
247
- * @param exposeConnection whether to enforce exposure of the native Redis Connection to callback code
248
- * return a {@link Mono} wrapping the {@link ReactiveRedisConnection}.
255
+ * @since 2.5.5
249
256
*/
250
- protected Mono <ReactiveRedisConnection > getConnection (boolean exposeConnection ) {
257
+ protected Mono <ReactiveRedisConnection > getConnection () {
258
+
251
259
ReactiveRedisConnectionFactory factory = getConnectionFactory ();
252
- ReactiveRedisConnection conn = factory .getReactiveConnection ();
253
- ReactiveRedisConnection connToUse = preProcessConnection (conn , false );
254
260
255
- return Mono .just ( exposeConnection ? connToUse : createRedisConnectionProxy ( connToUse ));
261
+ return Mono .fromSupplier (() -> preProcessConnection ( factory . getReactiveConnection (), false ));
256
262
}
257
263
258
264
/*
0 commit comments