58
58
*
59
59
* @author Mark Paluch
60
60
* @author Christoph Strobl
61
+ * @author Petromir Dzhunev
61
62
* @since 2.0
62
63
* @param <K> the Redis key type against which the template works (usually a String)
63
64
* @param <V> the Redis value type against which the template works
@@ -192,7 +193,14 @@ private <T> Publisher<T> doInConnection(ReactiveRedisCallback<T> action, boolean
192
193
193
194
Assert .notNull (action , "Callback object must not be null" );
194
195
195
- return Flux .usingWhen (getConnection (exposeConnection ), conn -> {
196
+ Mono <ReactiveRedisConnection > connection = getConnection ();
197
+
198
+ if (!exposeConnection ) {
199
+ connection = connection .map (this ::createRedisConnectionProxy );
200
+ }
201
+
202
+ return Flux .usingWhen (connection , conn -> {
203
+
196
204
Publisher <T > result = action .doInRedis (conn );
197
205
198
206
return postProcessResult (result , conn , false );
@@ -201,18 +209,16 @@ private <T> Publisher<T> doInConnection(ReactiveRedisCallback<T> action, boolean
201
209
}
202
210
203
211
/**
204
- * Creates a Mono which generates a new connection. The successors of {@link ReactiveRedisTemplate} might override
205
- * the default behaviour .
212
+ * Creates a {@link Mono} which emits a new {@link ReactiveRedisConnection}. Can be overridden in subclasses to
213
+ * provide a different mechanism for connection allocation for the given method .
206
214
*
207
- * @param exposeConnection whether to enforce exposure of the native Redis Connection to callback code
208
- * return a {@link Mono} wrapping the {@link ReactiveRedisConnection}.
215
+ * @since 2.5.5
209
216
*/
210
- protected Mono <ReactiveRedisConnection > getConnection (boolean exposeConnection ) {
217
+ protected Mono <ReactiveRedisConnection > getConnection () {
218
+
211
219
ReactiveRedisConnectionFactory factory = getConnectionFactory ();
212
- ReactiveRedisConnection conn = factory .getReactiveConnection ();
213
- ReactiveRedisConnection connToUse = preProcessConnection (conn , false );
214
220
215
- return Mono .just ( exposeConnection ? connToUse : createRedisConnectionProxy ( connToUse ));
221
+ return Mono .fromSupplier (() -> preProcessConnection ( factory . getReactiveConnection (), false ));
216
222
}
217
223
218
224
/*
0 commit comments