Skip to content

Commit c196d7a

Browse files
committed
Adds ExecutorService to ClusterPipeline for improved thread management
This allows passing an ExecutorService when creating a ClusterPipeline. The previous parallelization approach for pipeline syncing/closing would create a new executor service for each sync operation, resulting in excessive thread creation and termination. On an EC2 m5.12xlarge instance with ~100k single writes/sec, this thread creation consumed 40% CPU and increased operation latency. The change also optimizes thread usage when no ExecutorService is provided. Previously, even a single pipeline within a multipipeline would create 3 threads for syncing. This improvement removes that overhead, though callers are encouraged to provide their own ExecutorService for optimal CPU usage and latency.
1 parent 8f8fe1f commit c196d7a

File tree

4 files changed

+82
-34
lines changed

4 files changed

+82
-34
lines changed

src/main/java/redis/clients/jedis/ClusterPipeline.java

+8
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
import java.time.Duration;
44
import java.util.Set;
5+
import java.util.concurrent.ExecutorService;
6+
57
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
68
import redis.clients.jedis.providers.ClusterConnectionProvider;
79
import redis.clients.jedis.util.IOUtils;
@@ -40,6 +42,12 @@ public ClusterPipeline(ClusterConnectionProvider provider, ClusterCommandObjects
4042
this.provider = provider;
4143
}
4244

45+
public ClusterPipeline(ClusterConnectionProvider provider, ClusterCommandObjects commandObjects,
46+
ExecutorService executorService) {
47+
super(commandObjects, executorService);
48+
this.provider = provider;
49+
}
50+
4351
private static ClusterCommandObjects createClusterCommandObjects(RedisProtocol protocol) {
4452
ClusterCommandObjects cco = new ClusterCommandObjects();
4553
if (protocol == RedisProtocol.RESP3) cco.setProtocol(protocol);

src/main/java/redis/clients/jedis/JedisCluster.java

+5
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import java.util.Collections;
55
import java.util.Map;
66
import java.util.Set;
7+
import java.util.concurrent.ExecutorService;
78

89
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
910

@@ -379,6 +380,10 @@ public ClusterPipeline pipelined() {
379380
return new ClusterPipeline((ClusterConnectionProvider) provider, (ClusterCommandObjects) commandObjects);
380381
}
381382

383+
public ClusterPipeline pipelined(ExecutorService executorService) {
384+
return new ClusterPipeline((ClusterConnectionProvider) provider, (ClusterCommandObjects) commandObjects, executorService);
385+
}
386+
382387
/**
383388
* @param doMulti param
384389
* @return nothing

src/main/java/redis/clients/jedis/MultiNodePipelineBase.java

+45-34
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
package redis.clients.jedis;
22

3-
import java.util.Iterator;
43
import java.util.LinkedHashMap;
54
import java.util.LinkedList;
65
import java.util.List;
76
import java.util.Map;
87
import java.util.Queue;
9-
import java.util.concurrent.CountDownLatch;
8+
import java.util.concurrent.CompletableFuture;
9+
import java.util.concurrent.ExecutionException;
1010
import java.util.concurrent.ExecutorService;
1111
import java.util.concurrent.Executors;
1212

@@ -31,6 +31,7 @@ public abstract class MultiNodePipelineBase extends PipelineBase {
3131

3232
private final Map<HostAndPort, Queue<Response<?>>> pipelinedResponses;
3333
private final Map<HostAndPort, Connection> connections;
34+
private ExecutorService executorService;
3435
private volatile boolean syncing = false;
3536

3637
public MultiNodePipelineBase(CommandObjects commandObjects) {
@@ -39,6 +40,13 @@ public MultiNodePipelineBase(CommandObjects commandObjects) {
3940
connections = new LinkedHashMap<>();
4041
}
4142

43+
public MultiNodePipelineBase(CommandObjects commandObjects, ExecutorService executorService) {
44+
super(commandObjects);
45+
this.executorService = executorService;
46+
pipelinedResponses = new LinkedHashMap<>();
47+
connections = new LinkedHashMap<>();
48+
}
49+
4250
/**
4351
* Sub-classes must call this method, if graph commands are going to be used.
4452
* @param connectionProvider connection provider
@@ -96,44 +104,47 @@ public final void sync() {
96104
return;
97105
}
98106
syncing = true;
99-
100-
ExecutorService executorService = Executors.newFixedThreadPool(MULTI_NODE_PIPELINE_SYNC_WORKERS);
101-
102-
CountDownLatch countDownLatch = new CountDownLatch(pipelinedResponses.size());
103-
Iterator<Map.Entry<HostAndPort, Queue<Response<?>>>> pipelinedResponsesIterator
104-
= pipelinedResponses.entrySet().iterator();
105-
while (pipelinedResponsesIterator.hasNext()) {
106-
Map.Entry<HostAndPort, Queue<Response<?>>> entry = pipelinedResponsesIterator.next();
107-
HostAndPort nodeKey = entry.getKey();
108-
Queue<Response<?>> queue = entry.getValue();
109-
Connection connection = connections.get(nodeKey);
110-
executorService.submit(() -> {
111-
try {
112-
List<Object> unformatted = connection.getMany(queue.size());
113-
for (Object o : unformatted) {
114-
queue.poll().set(o);
115-
}
116-
} catch (JedisConnectionException jce) {
117-
log.error("Error with connection to " + nodeKey, jce);
118-
// cleanup the connection
119-
pipelinedResponsesIterator.remove();
120-
connections.remove(nodeKey);
121-
IOUtils.closeQuietly(connection);
122-
} finally {
123-
countDownLatch.countDown();
124-
}
125-
});
126-
}
127-
107+
ExecutorService executorService = getExecutorService();
108+
CompletableFuture[] futures
109+
= pipelinedResponses.entrySet().stream()
110+
.map(e -> CompletableFuture.runAsync(() -> closeConnection(e), executorService))
111+
.toArray(CompletableFuture[]::new);
112+
CompletableFuture awaitAllCompleted = CompletableFuture.allOf(futures);
128113
try {
129-
countDownLatch.await();
114+
awaitAllCompleted.get();
115+
if (executorService != this.executorService) {
116+
executorService.shutdown();
117+
}
118+
} catch (ExecutionException e) {
119+
log.error("Failed execution.", e);
130120
} catch (InterruptedException e) {
131121
log.error("Thread is interrupted during sync.", e);
122+
Thread.currentThread().interrupt();
132123
}
124+
syncing = false;
125+
}
133126

134-
executorService.shutdownNow();
127+
private ExecutorService getExecutorService() {
128+
if (executorService == null) {
129+
return Executors.newFixedThreadPool(Math.min(this.pipelinedResponses.size(), MULTI_NODE_PIPELINE_SYNC_WORKERS));
130+
}
131+
return executorService;
132+
}
135133

136-
syncing = false;
134+
private void closeConnection(Map.Entry<HostAndPort, Queue<Response<?>>> entry) {
135+
HostAndPort nodeKey = entry.getKey();
136+
Queue<Response<?>> queue = entry.getValue();
137+
Connection connection = connections.get(nodeKey);
138+
try {
139+
List<Object> unformatted = connection.getMany(queue.size());
140+
for (Object o : unformatted) {
141+
queue.poll().set(o);
142+
}
143+
} catch (JedisConnectionException jce) {
144+
log.error("Error with connection to " + nodeKey, jce);
145+
connections.remove(nodeKey);
146+
IOUtils.closeQuietly(connection);
147+
}
137148
}
138149

139150
@Deprecated

src/test/java/redis/clients/jedis/ClusterPipeliningTest.java

+24
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@
66
import static redis.clients.jedis.Protocol.CLUSTER_HASHSLOTS;
77

88
import java.util.*;
9+
import java.util.concurrent.ExecutorService;
10+
import java.util.concurrent.Executors;
11+
import java.util.concurrent.ThreadPoolExecutor;
912

1013
import org.hamcrest.MatcherAssert;
1114
import org.hamcrest.Matchers;
@@ -1081,6 +1084,27 @@ public void transaction() {
10811084
}
10821085
}
10831086

1087+
@Test(timeout = 10_000L)
1088+
public void pipelineMergingWithExecutorService() {
1089+
final int maxTotal = 100;
1090+
ConnectionPoolConfig poolConfig = new ConnectionPoolConfig();
1091+
poolConfig.setMaxTotal(maxTotal);
1092+
ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
1093+
try (JedisCluster cluster = new JedisCluster(nodes, DEFAULT_CLIENT_CONFIG, 5, poolConfig)) {
1094+
ClusterPipeline pipeline = cluster.pipelined(executorService);
1095+
for (int i = 0; i < maxTotal; i++) {
1096+
String s = Integer.toString(i);
1097+
pipeline.set(s, s);
1098+
}
1099+
pipeline.close();
1100+
// The sync results in one pipeline per node needing closing.
1101+
assertEquals(nodes.size(), executorService.getTaskCount());
1102+
assertFalse(executorService.isShutdown());
1103+
} finally {
1104+
executorService.shutdown();
1105+
}
1106+
}
1107+
10841108
@Test(timeout = 10_000L)
10851109
public void multiple() {
10861110
final int maxTotal = 100;

0 commit comments

Comments
 (0)