credentialsProvider, int database,
String clientName, boolean ssl, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters,
HostnameVerifier hostnameVerifier, HostAndPortMapper hostAndPortMapper,
- ClientSetInfoConfig clientSetInfoConfig) {
+ ClientSetInfoConfig clientSetInfoConfig, boolean readOnlyForRedisClusterReplicas) {
this.redisProtocol = protocol;
this.connectionTimeoutMillis = connectionTimeoutMillis;
this.socketTimeoutMillis = soTimeoutMillis;
@@ -44,6 +46,7 @@ private DefaultJedisClientConfig(RedisProtocol protocol, int connectionTimeoutMi
this.hostnameVerifier = hostnameVerifier;
this.hostAndPortMapper = hostAndPortMapper;
this.clientSetInfoConfig = clientSetInfoConfig;
+ this.readOnlyForRedisClusterReplicas = readOnlyForRedisClusterReplicas;
}
@Override
@@ -122,6 +125,11 @@ public ClientSetInfoConfig getClientSetInfoConfig() {
return clientSetInfoConfig;
}
+ @Override
+ public boolean isReadOnlyForRedisClusterReplicas() {
+ return readOnlyForRedisClusterReplicas;
+ }
+
public static Builder builder() {
return new Builder();
}
@@ -149,6 +157,8 @@ public static class Builder {
private ClientSetInfoConfig clientSetInfoConfig = ClientSetInfoConfig.DEFAULT;
+ private boolean readOnlyForRedisClusterReplicas = false;
+
private Builder() {
}
@@ -160,7 +170,8 @@ public DefaultJedisClientConfig build() {
return new DefaultJedisClientConfig(redisProtocol, connectionTimeoutMillis, socketTimeoutMillis,
blockingSocketTimeoutMillis, credentialsProvider, database, clientName, ssl,
- sslSocketFactory, sslParameters, hostnameVerifier, hostAndPortMapper, clientSetInfoConfig);
+ sslSocketFactory, sslParameters, hostnameVerifier, hostAndPortMapper, clientSetInfoConfig,
+ readOnlyForRedisClusterReplicas);
}
/**
@@ -255,6 +266,11 @@ public Builder clientSetInfoConfig(ClientSetInfoConfig setInfoConfig) {
this.clientSetInfoConfig = setInfoConfig;
return this;
}
+
+ public Builder readOnlyForRedisClusterReplicas() {
+ this.readOnlyForRedisClusterReplicas = true;
+ return this;
+ }
}
public static DefaultJedisClientConfig create(int connectionTimeoutMillis, int soTimeoutMillis,
@@ -264,7 +280,8 @@ public static DefaultJedisClientConfig create(int connectionTimeoutMillis, int s
return new DefaultJedisClientConfig(null,
connectionTimeoutMillis, soTimeoutMillis, blockingSocketTimeoutMillis,
new DefaultRedisCredentialsProvider(new DefaultRedisCredentials(user, password)), database,
- clientName, ssl, sslSocketFactory, sslParameters, hostnameVerifier, hostAndPortMapper, null);
+ clientName, ssl, sslSocketFactory, sslParameters, hostnameVerifier, hostAndPortMapper, null,
+ false);
}
public static DefaultJedisClientConfig copyConfig(JedisClientConfig copy) {
@@ -273,6 +290,6 @@ public static DefaultJedisClientConfig copyConfig(JedisClientConfig copy) {
copy.getBlockingSocketTimeoutMillis(), copy.getCredentialsProvider(),
copy.getDatabase(), copy.getClientName(), copy.isSsl(), copy.getSslSocketFactory(),
copy.getSslParameters(), copy.getHostnameVerifier(), copy.getHostAndPortMapper(),
- copy.getClientSetInfoConfig());
+ copy.getClientSetInfoConfig(), copy.isReadOnlyForRedisClusterReplicas());
}
}
diff --git a/src/main/java/redis/clients/jedis/DefaultJedisSocketFactory.java b/src/main/java/redis/clients/jedis/DefaultJedisSocketFactory.java
index a2d963e2214..0d41693d0fd 100644
--- a/src/main/java/redis/clients/jedis/DefaultJedisSocketFactory.java
+++ b/src/main/java/redis/clients/jedis/DefaultJedisSocketFactory.java
@@ -60,7 +60,7 @@ private Socket connectToFirstSuccessfulHost(HostAndPort hostAndPort) throws Exce
Collections.shuffle(hosts);
}
- JedisConnectionException jce = new JedisConnectionException("Failed to connect to any host resolved for DNS name.");
+ JedisConnectionException jce = new JedisConnectionException("Failed to connect to " + hostAndPort + ".");
for (InetAddress host : hosts) {
try {
Socket socket = new Socket();
@@ -94,11 +94,13 @@ public Socket createSocket() throws JedisConnectionException {
if (null == _sslSocketFactory) {
_sslSocketFactory = (SSLSocketFactory) SSLSocketFactory.getDefault();
}
+ Socket plainSocket = socket;
socket = _sslSocketFactory.createSocket(socket, _hostAndPort.getHost(), _hostAndPort.getPort(), true);
if (null != sslParameters) {
((SSLSocket) socket).setSSLParameters(sslParameters);
}
+ socket = new SSLSocketWrapper((SSLSocket) socket, plainSocket);
if (null != hostnameVerifier
&& !hostnameVerifier.verify(_hostAndPort.getHost(), ((SSLSocket) socket).getSession())) {
diff --git a/src/main/java/redis/clients/jedis/Jedis.java b/src/main/java/redis/clients/jedis/Jedis.java
index 66d64fa6eb0..750eccf5531 100644
--- a/src/main/java/redis/clients/jedis/Jedis.java
+++ b/src/main/java/redis/clients/jedis/Jedis.java
@@ -3431,7 +3431,7 @@ public String shutdownAbort() {
* All the fields are in the form field:value
*
*
- * edis_version:0.07
+ * redis_version:0.07
* connected_clients:1
* connected_slaves:0
* used_memory:3187
diff --git a/src/main/java/redis/clients/jedis/JedisClientConfig.java b/src/main/java/redis/clients/jedis/JedisClientConfig.java
index 0ad6e979f61..abe1f352376 100644
--- a/src/main/java/redis/clients/jedis/JedisClientConfig.java
+++ b/src/main/java/redis/clients/jedis/JedisClientConfig.java
@@ -58,7 +58,7 @@ default String getClientName() {
}
/**
- * @return true
- to create a TLS connection. false
- otherwise.
+ * @return {@code true} - to create TLS connection(s). {@code false} - otherwise.
*/
default boolean isSsl() {
return false;
@@ -80,6 +80,17 @@ default HostAndPortMapper getHostAndPortMapper() {
return null;
}
+ /**
+ * Execute READONLY command to connections.
+ *
+ * READONLY command is specific to Redis Cluster replica nodes. So this config param is only
+ * intended for Redis Cluster connections.
+ * @return {@code true} - to execute READONLY command to connection(s). {@code false} - otherwise.
+ */
+ default boolean isReadOnlyForRedisClusterReplicas() {
+ return false;
+ }
+
/**
* Modify the behavior of internally executing CLIENT SETINFO command.
* @return CLIENT SETINFO config
diff --git a/src/main/java/redis/clients/jedis/JedisCluster.java b/src/main/java/redis/clients/jedis/JedisCluster.java
index 6c5843c16ed..db8d17ee158 100644
--- a/src/main/java/redis/clients/jedis/JedisCluster.java
+++ b/src/main/java/redis/clients/jedis/JedisCluster.java
@@ -7,7 +7,12 @@
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import redis.clients.jedis.annots.Experimental;
+import redis.clients.jedis.executors.ClusterCommandExecutor;
import redis.clients.jedis.providers.ClusterConnectionProvider;
+import redis.clients.jedis.csc.Cache;
+import redis.clients.jedis.csc.CacheConfig;
+import redis.clients.jedis.csc.CacheFactory;
import redis.clients.jedis.util.JedisClusterCRC16;
public class JedisCluster extends UnifiedJedis {
@@ -18,16 +23,38 @@ public class JedisCluster extends UnifiedJedis {
* Default timeout in milliseconds.
*/
public static final int DEFAULT_TIMEOUT = 2000;
+
+ /**
+ * Default amount of attempts for executing a command
+ */
public static final int DEFAULT_MAX_ATTEMPTS = 5;
+ /**
+ * Creates a JedisCluster instance. The provided node is used to make the first contact with the cluster.
+ * Here, the default timeout of {@value JedisCluster#DEFAULT_TIMEOUT} ms is being used with {@value JedisCluster#DEFAULT_MAX_ATTEMPTS} maximum attempts.
+ * @param node Node to first connect to.
+ */
public JedisCluster(HostAndPort node) {
this(Collections.singleton(node));
}
+ /**
+ * Creates a JedisCluster instance. The provided node is used to make the first contact with the cluster.
+ * Here, the default timeout of {@value JedisCluster#DEFAULT_TIMEOUT} ms is being used with {@value JedisCluster#DEFAULT_MAX_ATTEMPTS} maximum attempts.
+ * @param node Node to first connect to.
+ * @param timeout connection and socket timeout in milliseconds.
+ */
public JedisCluster(HostAndPort node, int timeout) {
this(Collections.singleton(node), timeout);
}
+ /**
+ * Creates a JedisCluster instance. The provided node is used to make the first contact with the cluster.
+ * You can specify the timeout and the maximum attempts.
+ * @param node Node to first connect to.
+ * @param timeout connection and socket timeout in milliseconds.
+ * @param maxAttempts maximum attempts for executing a command.
+ */
public JedisCluster(HostAndPort node, int timeout, int maxAttempts) {
this(Collections.singleton(node), timeout, maxAttempts);
}
@@ -88,14 +115,32 @@ public JedisCluster(HostAndPort node, final JedisClientConfig clientConfig, int
this(Collections.singleton(node), clientConfig, maxAttempts, poolConfig);
}
+ /**
+ * Creates a JedisCluster with multiple entry points.
+ * Here, the default timeout of {@value JedisCluster#DEFAULT_TIMEOUT} ms is being used with {@value JedisCluster#DEFAULT_MAX_ATTEMPTS} maximum attempts.
+ * @param nodes Nodes to connect to.
+ */
public JedisCluster(Set nodes) {
this(nodes, DEFAULT_TIMEOUT);
}
+ /**
+ * Creates a JedisCluster with multiple entry points.
+ * Here, the default timeout of {@value JedisCluster#DEFAULT_TIMEOUT} ms is being used with {@value JedisCluster#DEFAULT_MAX_ATTEMPTS} maximum attempts.
+ * @param nodes Nodes to connect to.
+ * @param timeout connection and socket timeout in milliseconds.
+ */
public JedisCluster(Set nodes, int timeout) {
this(nodes, DefaultJedisClientConfig.builder().timeoutMillis(timeout).build());
}
+ /**
+ * Creates a JedisCluster with multiple entry points.
+ * You can specify the timeout and the maximum attempts.
+ * @param nodes Nodes to connect to.
+ * @param timeout connection and socket timeout in milliseconds.
+ * @param maxAttempts maximum attempts for executing a command.
+ */
public JedisCluster(Set nodes, int timeout, int maxAttempts) {
this(nodes, DefaultJedisClientConfig.builder().timeoutMillis(timeout).build(), maxAttempts);
}
@@ -181,6 +226,19 @@ public JedisCluster(Set clusterNodes, JedisClientConfig clientConfi
Duration.ofMillis((long) clientConfig.getSocketTimeoutMillis() * maxAttempts));
}
+ /**
+ * Creates a JedisCluster with multiple entry points.
+ * You can specify the timeout and the maximum attempts.
+ *
+ * Additionally, you are free to provide a {@link JedisClientConfig} instance.
+ * You can use the {@link DefaultJedisClientConfig#builder()} builder pattern to customize your configuration, including socket timeouts,
+ * username and passwords as well as SSL related parameters.
+ *
+ * @param clusterNodes Nodes to connect to.
+ * @param clientConfig Client configuration parameters.
+ * @param maxAttempts maximum attempts for executing a command.
+ * @param maxTotalRetriesDuration Maximum time used for reconnecting.
+ */
public JedisCluster(Set clusterNodes, JedisClientConfig clientConfig, int maxAttempts,
Duration maxTotalRetriesDuration) {
this(new ClusterConnectionProvider(clusterNodes, clientConfig), maxAttempts, maxTotalRetriesDuration,
@@ -198,6 +256,12 @@ public JedisCluster(Set clusterNodes, JedisClientConfig clientConfi
Duration.ofMillis((long) clientConfig.getSocketTimeoutMillis() * maxAttempts), poolConfig);
}
+ public JedisCluster(Set clusterNodes, JedisClientConfig clientConfig, int maxAttempts,
+ Duration maxTotalRetriesDuration, GenericObjectPoolConfig poolConfig) {
+ this(new ClusterConnectionProvider(clusterNodes, clientConfig, poolConfig), maxAttempts, maxTotalRetriesDuration,
+ clientConfig.getRedisProtocol());
+ }
+
public JedisCluster(Set clusterNodes, JedisClientConfig clientConfig,
GenericObjectPoolConfig poolConfig, Duration topologyRefreshPeriod, int maxAttempts,
Duration maxTotalRetriesDuration) {
@@ -205,15 +269,8 @@ public JedisCluster(Set clusterNodes, JedisClientConfig clientConfi
maxAttempts, maxTotalRetriesDuration, clientConfig.getRedisProtocol());
}
- public JedisCluster(Set clusterNodes, JedisClientConfig clientConfig, int maxAttempts,
- Duration maxTotalRetriesDuration, GenericObjectPoolConfig poolConfig) {
- this(new ClusterConnectionProvider(clusterNodes, clientConfig, poolConfig), maxAttempts, maxTotalRetriesDuration,
- clientConfig.getRedisProtocol());
- }
-
// Uses a fetched connection to process protocol. Should be avoided if possible.
- public JedisCluster(ClusterConnectionProvider provider, int maxAttempts,
- Duration maxTotalRetriesDuration) {
+ public JedisCluster(ClusterConnectionProvider provider, int maxAttempts, Duration maxTotalRetriesDuration) {
super(provider, maxAttempts, maxTotalRetriesDuration);
}
@@ -222,10 +279,67 @@ private JedisCluster(ClusterConnectionProvider provider, int maxAttempts, Durati
super(provider, maxAttempts, maxTotalRetriesDuration, protocol);
}
+ @Experimental
+ public JedisCluster(Set hnp, JedisClientConfig jedisClientConfig, CacheConfig cacheConfig) {
+ this(hnp, jedisClientConfig, CacheFactory.getCache(cacheConfig));
+ }
+
+ @Experimental
+ public JedisCluster(Set clusterNodes, JedisClientConfig clientConfig, Cache clientSideCache) {
+ this(clusterNodes, clientConfig, clientSideCache, DEFAULT_MAX_ATTEMPTS,
+ Duration.ofMillis(DEFAULT_MAX_ATTEMPTS * clientConfig.getSocketTimeoutMillis()));
+ }
+
+ @Experimental
+ public JedisCluster(Set clusterNodes, JedisClientConfig clientConfig, Cache clientSideCache,
+ int maxAttempts, Duration maxTotalRetriesDuration) {
+ this(new ClusterConnectionProvider(clusterNodes, clientConfig, clientSideCache), maxAttempts,
+ maxTotalRetriesDuration, clientConfig.getRedisProtocol(), clientSideCache);
+ }
+
+ @Experimental
+ public JedisCluster(Set clusterNodes, JedisClientConfig clientConfig, Cache clientSideCache,
+ int maxAttempts, Duration maxTotalRetriesDuration, GenericObjectPoolConfig poolConfig) {
+ this(new ClusterConnectionProvider(clusterNodes, clientConfig, clientSideCache, poolConfig),
+ maxAttempts, maxTotalRetriesDuration, clientConfig.getRedisProtocol(), clientSideCache);
+ }
+
+ @Experimental
+ public JedisCluster(Set clusterNodes, JedisClientConfig clientConfig, Cache clientSideCache,
+ GenericObjectPoolConfig poolConfig) {
+ this(new ClusterConnectionProvider(clusterNodes, clientConfig, clientSideCache, poolConfig),
+ DEFAULT_MAX_ATTEMPTS, Duration.ofMillis(DEFAULT_MAX_ATTEMPTS * clientConfig.getSocketTimeoutMillis()),
+ clientConfig.getRedisProtocol(), clientSideCache);
+ }
+
+ @Experimental
+ public JedisCluster(Set clusterNodes, JedisClientConfig clientConfig, Cache clientSideCache,
+ GenericObjectPoolConfig poolConfig, Duration topologyRefreshPeriod, int maxAttempts,
+ Duration maxTotalRetriesDuration) {
+ this(new ClusterConnectionProvider(clusterNodes, clientConfig, clientSideCache, poolConfig, topologyRefreshPeriod),
+ maxAttempts, maxTotalRetriesDuration, clientConfig.getRedisProtocol(), clientSideCache);
+ }
+
+ @Experimental
+ private JedisCluster(ClusterConnectionProvider provider, int maxAttempts, Duration maxTotalRetriesDuration,
+ RedisProtocol protocol, Cache clientSideCache) {
+ super(provider, maxAttempts, maxTotalRetriesDuration, protocol, clientSideCache);
+ }
+
+ /**
+ * Returns all nodes that were configured to connect to in key-value pairs ({@link Map}).
+ * Key is the HOST:PORT and the value is the connection pool.
+ * @return the map of all connections.
+ */
public Map getClusterNodes() {
return ((ClusterConnectionProvider) provider).getNodes();
}
+ /**
+ * Returns the connection for one of the 16,384 slots.
+ * @param slot the slot to retrieve the connection for.
+ * @return connection of the provided slot. {@code close()} of this connection must be called after use.
+ */
public Connection getConnectionFromSlot(int slot) {
return ((ClusterConnectionProvider) provider).getConnectionFromSlot(slot);
}
@@ -266,4 +380,11 @@ public ClusterPipeline pipelined() {
public AbstractTransaction transaction(boolean doMulti) {
throw new UnsupportedOperationException();
}
+
+ public final T executeCommandToReplica(CommandObject commandObject) {
+ if (!(executor instanceof ClusterCommandExecutor)) {
+ throw new UnsupportedOperationException("Support only execute to replica in ClusterCommandExecutor");
+ }
+ return ((ClusterCommandExecutor) executor).executeCommandToReplica(commandObject);
+ }
}
diff --git a/src/main/java/redis/clients/jedis/JedisClusterInfoCache.java b/src/main/java/redis/clients/jedis/JedisClusterInfoCache.java
index da88462ef49..ec63c5206ad 100644
--- a/src/main/java/redis/clients/jedis/JedisClusterInfoCache.java
+++ b/src/main/java/redis/clients/jedis/JedisClusterInfoCache.java
@@ -23,7 +23,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import redis.clients.jedis.annots.Experimental;
import redis.clients.jedis.annots.Internal;
+import redis.clients.jedis.csc.Cache;
import redis.clients.jedis.exceptions.JedisClusterOperationException;
import redis.clients.jedis.exceptions.JedisException;
import redis.clients.jedis.util.SafeEncoder;
@@ -38,6 +40,7 @@ public class JedisClusterInfoCache {
private final Map nodes = new HashMap<>();
private final ConnectionPool[] slots = new ConnectionPool[Protocol.CLUSTER_HASHSLOTS];
private final HostAndPort[] slotNodes = new HostAndPort[Protocol.CLUSTER_HASHSLOTS];
+ private final List[] replicaSlots;
private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
private final Lock r = rwl.readLock();
@@ -46,6 +49,7 @@ public class JedisClusterInfoCache {
private final GenericObjectPoolConfig poolConfig;
private final JedisClientConfig clientConfig;
+ private final Cache clientSideCache;
private final Set startNodes;
private static final int MASTER_NODE_INDEX = 2;
@@ -65,19 +69,39 @@ public void run() {
}
public JedisClusterInfoCache(final JedisClientConfig clientConfig, final Set startNodes) {
- this(clientConfig, null, startNodes);
+ this(clientConfig, null, null, startNodes);
+ }
+
+ @Experimental
+ public JedisClusterInfoCache(final JedisClientConfig clientConfig, Cache clientSideCache,
+ final Set startNodes) {
+ this(clientConfig, clientSideCache, null, startNodes);
}
public JedisClusterInfoCache(final JedisClientConfig clientConfig,
final GenericObjectPoolConfig poolConfig, final Set startNodes) {
- this(clientConfig, poolConfig, startNodes, null);
+ this(clientConfig, null, poolConfig, startNodes);
+ }
+
+ @Experimental
+ public JedisClusterInfoCache(final JedisClientConfig clientConfig, Cache clientSideCache,
+ final GenericObjectPoolConfig poolConfig, final Set startNodes) {
+ this(clientConfig, clientSideCache, poolConfig, startNodes, null);
}
public JedisClusterInfoCache(final JedisClientConfig clientConfig,
final GenericObjectPoolConfig poolConfig, final Set startNodes,
final Duration topologyRefreshPeriod) {
+ this(clientConfig, null, poolConfig, startNodes, topologyRefreshPeriod);
+ }
+
+ @Experimental
+ public JedisClusterInfoCache(final JedisClientConfig clientConfig, Cache clientSideCache,
+ final GenericObjectPoolConfig poolConfig, final Set startNodes,
+ final Duration topologyRefreshPeriod) {
this.poolConfig = poolConfig;
this.clientConfig = clientConfig;
+ this.clientSideCache = clientSideCache;
this.startNodes = startNodes;
if (topologyRefreshPeriod != null) {
logger.info("Cluster topology refresh start, period: {}, startNodes: {}", topologyRefreshPeriod, startNodes);
@@ -85,6 +109,11 @@ public JedisClusterInfoCache(final JedisClientConfig clientConfig,
topologyRefreshExecutor.scheduleWithFixedDelay(new TopologyRefreshTask(), topologyRefreshPeriod.toMillis(),
topologyRefreshPeriod.toMillis(), TimeUnit.MILLISECONDS);
}
+ if (clientConfig.isReadOnlyForRedisClusterReplicas()) {
+ replicaSlots = new ArrayList[Protocol.CLUSTER_HASHSLOTS];
+ } else {
+ replicaSlots = null;
+ }
}
/**
@@ -144,6 +173,8 @@ public void discoverClusterNodesAndSlots(Connection jedis) {
setupNodeIfNotExist(targetNode);
if (i == MASTER_NODE_INDEX) {
assignSlotsToNode(slotNums, targetNode);
+ } else if (clientConfig.isReadOnlyForRedisClusterReplicas()) {
+ assignSlotsToReplicaNode(slotNums, targetNode);
}
}
}
@@ -213,6 +244,9 @@ private void discoverClusterSlots(Connection jedis) {
try {
Arrays.fill(slots, null);
Arrays.fill(slotNodes, null);
+ if (clientSideCache != null) {
+ clientSideCache.flush();
+ }
Set hostAndPortKeys = new HashSet<>();
for (Object slotInfoObj : slotsInfo) {
@@ -236,6 +270,8 @@ private void discoverClusterSlots(Connection jedis) {
setupNodeIfNotExist(targetNode);
if (i == MASTER_NODE_INDEX) {
assignSlotsToNode(slotNums, targetNode);
+ } else if (clientConfig.isReadOnlyForRedisClusterReplicas()) {
+ assignSlotsToReplicaNode(slotNums, targetNode);
}
}
}
@@ -274,8 +310,7 @@ public ConnectionPool setupNodeIfNotExist(final HostAndPort node) {
ConnectionPool existingPool = nodes.get(nodeKey);
if (existingPool != null) return existingPool;
- ConnectionPool nodePool = poolConfig == null ? new ConnectionPool(node, clientConfig)
- : new ConnectionPool(node, clientConfig, poolConfig);
+ ConnectionPool nodePool = createNodePool(node);
nodes.put(nodeKey, nodePool);
return nodePool;
} finally {
@@ -283,6 +318,22 @@ public ConnectionPool setupNodeIfNotExist(final HostAndPort node) {
}
}
+ private ConnectionPool createNodePool(HostAndPort node) {
+ if (poolConfig == null) {
+ if (clientSideCache == null) {
+ return new ConnectionPool(node, clientConfig);
+ } else {
+ return new ConnectionPool(node, clientConfig, clientSideCache);
+ }
+ } else {
+ if (clientSideCache == null) {
+ return new ConnectionPool(node, clientConfig, poolConfig);
+ } else {
+ return new ConnectionPool(node, clientConfig, clientSideCache, poolConfig);
+ }
+ }
+ }
+
public void assignSlotToNode(int slot, HostAndPort targetNode) {
w.lock();
try {
@@ -307,6 +358,21 @@ public void assignSlotsToNode(List targetSlots, HostAndPort targetNode)
}
}
+ public void assignSlotsToReplicaNode(List targetSlots, HostAndPort targetNode) {
+ w.lock();
+ try {
+ ConnectionPool targetPool = setupNodeIfNotExist(targetNode);
+ for (Integer slot : targetSlots) {
+ if (replicaSlots[slot] == null) {
+ replicaSlots[slot] = new ArrayList<>();
+ }
+ replicaSlots[slot].add(targetPool);
+ }
+ } finally {
+ w.unlock();
+ }
+ }
+
public ConnectionPool getNode(String nodeKey) {
r.lock();
try {
@@ -338,6 +404,15 @@ public HostAndPort getSlotNode(int slot) {
}
}
+ public List getSlotReplicaPools(int slot) {
+ r.lock();
+ try {
+ return replicaSlots[slot];
+ } finally {
+ r.unlock();
+ }
+ }
+
public Map getNodes() {
r.lock();
try {
diff --git a/src/main/java/redis/clients/jedis/JedisFactory.java b/src/main/java/redis/clients/jedis/JedisFactory.java
index 0e07ccc2860..0ff5bebe1c3 100644
--- a/src/main/java/redis/clients/jedis/JedisFactory.java
+++ b/src/main/java/redis/clients/jedis/JedisFactory.java
@@ -66,7 +66,7 @@ protected JedisFactory(final String host, final int port, final int connectionTi
}
protected JedisFactory(final HostAndPort hostAndPort, final JedisClientConfig clientConfig) {
- this.clientConfig = DefaultJedisClientConfig.copyConfig(clientConfig);
+ this.clientConfig = clientConfig;
this.jedisSocketFactory = new DefaultJedisSocketFactory(hostAndPort, this.clientConfig);
}
@@ -83,7 +83,7 @@ protected JedisFactory(final String host, final int port, final int connectionTi
}
protected JedisFactory(final JedisSocketFactory jedisSocketFactory, final JedisClientConfig clientConfig) {
- this.clientConfig = DefaultJedisClientConfig.copyConfig(clientConfig);
+ this.clientConfig = clientConfig;
this.jedisSocketFactory = jedisSocketFactory;
}
diff --git a/src/main/java/redis/clients/jedis/JedisPooled.java b/src/main/java/redis/clients/jedis/JedisPooled.java
index c6d022e0942..c3429319e7a 100644
--- a/src/main/java/redis/clients/jedis/JedisPooled.java
+++ b/src/main/java/redis/clients/jedis/JedisPooled.java
@@ -7,7 +7,10 @@
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
-
+import redis.clients.jedis.annots.Experimental;
+import redis.clients.jedis.csc.Cache;
+import redis.clients.jedis.csc.CacheConfig;
+import redis.clients.jedis.csc.CacheFactory;
import redis.clients.jedis.providers.PooledConnectionProvider;
import redis.clients.jedis.util.JedisURIHelper;
import redis.clients.jedis.util.Pool;
@@ -27,7 +30,7 @@ public JedisPooled() {
* @param url
*/
public JedisPooled(final String url) {
- this(URI.create(url));
+ super(url);
}
/**
@@ -76,6 +79,16 @@ public JedisPooled(final HostAndPort hostAndPort, final JedisClientConfig client
super(hostAndPort, clientConfig);
}
+ @Experimental
+ public JedisPooled(final HostAndPort hostAndPort, final JedisClientConfig clientConfig, CacheConfig cacheConfig) {
+ this(hostAndPort, clientConfig, CacheFactory.getCache(cacheConfig));
+ }
+
+ @Experimental
+ public JedisPooled(final HostAndPort hostAndPort, final JedisClientConfig clientConfig, Cache clientSideCache) {
+ super(hostAndPort, clientConfig, clientSideCache);
+ }
+
public JedisPooled(PooledObjectFactory factory) {
this(new PooledConnectionProvider(factory));
}
@@ -376,6 +389,19 @@ public JedisPooled(final HostAndPort hostAndPort, final JedisClientConfig client
super(new PooledConnectionProvider(hostAndPort, clientConfig, poolConfig), clientConfig.getRedisProtocol());
}
+ @Experimental
+ public JedisPooled(final HostAndPort hostAndPort, final JedisClientConfig clientConfig, CacheConfig cacheConfig,
+ final GenericObjectPoolConfig poolConfig) {
+ this(hostAndPort, clientConfig, CacheFactory.getCache(cacheConfig), poolConfig);
+ }
+
+ @Experimental
+ public JedisPooled(final HostAndPort hostAndPort, final JedisClientConfig clientConfig, Cache clientSideCache,
+ final GenericObjectPoolConfig poolConfig) {
+ super(new PooledConnectionProvider(hostAndPort, clientConfig, clientSideCache, poolConfig),
+ clientConfig.getRedisProtocol(), clientSideCache);
+ }
+
public JedisPooled(final GenericObjectPoolConfig poolConfig,
final JedisSocketFactory jedisSocketFactory, final JedisClientConfig clientConfig) {
super(new PooledConnectionProvider(new ConnectionFactory(jedisSocketFactory, clientConfig), poolConfig),
diff --git a/src/main/java/redis/clients/jedis/JedisSentinelPool.java b/src/main/java/redis/clients/jedis/JedisSentinelPool.java
index 586750540c6..f6a9ea705d2 100644
--- a/src/main/java/redis/clients/jedis/JedisSentinelPool.java
+++ b/src/main/java/redis/clients/jedis/JedisSentinelPool.java
@@ -6,6 +6,8 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
@@ -28,7 +30,7 @@ public class JedisSentinelPool extends Pool {
private volatile HostAndPort currentHostMaster;
- private final Object initPoolLock = new Object();
+ private final Lock initPoolLock = new ReentrantLock(true);
public JedisSentinelPool(String masterName, Set sentinels,
final JedisClientConfig masterClientConfig, final JedisClientConfig sentinelClientConfig) {
@@ -213,7 +215,9 @@ public HostAndPort getCurrentHostMaster() {
}
private void initMaster(HostAndPort master) {
- synchronized (initPoolLock) {
+ initPoolLock.lock();
+
+ try {
if (!master.equals(currentHostMaster)) {
currentHostMaster = master;
factory.setHostAndPort(currentHostMaster);
@@ -223,6 +227,8 @@ private void initMaster(HostAndPort master) {
LOG.info("Created JedisSentinelPool to master at {}", master);
}
+ } finally {
+ initPoolLock.unlock();
}
}
diff --git a/src/main/java/redis/clients/jedis/JedisSentineled.java b/src/main/java/redis/clients/jedis/JedisSentineled.java
index 0ea0221c1ad..26f208a03b2 100644
--- a/src/main/java/redis/clients/jedis/JedisSentineled.java
+++ b/src/main/java/redis/clients/jedis/JedisSentineled.java
@@ -2,6 +2,10 @@
import java.util.Set;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import redis.clients.jedis.annots.Experimental;
+import redis.clients.jedis.csc.Cache;
+import redis.clients.jedis.csc.CacheConfig;
+import redis.clients.jedis.csc.CacheFactory;
import redis.clients.jedis.providers.SentineledConnectionProvider;
public class JedisSentineled extends UnifiedJedis {
@@ -12,6 +16,20 @@ public JedisSentineled(String masterName, final JedisClientConfig masterClientCo
masterClientConfig.getRedisProtocol());
}
+ @Experimental
+ public JedisSentineled(String masterName, final JedisClientConfig masterClientConfig, CacheConfig cacheConfig,
+ Set sentinels, final JedisClientConfig sentinelClientConfig) {
+ this(masterName, masterClientConfig, CacheFactory.getCache(cacheConfig),
+ sentinels, sentinelClientConfig);
+ }
+
+ @Experimental
+ public JedisSentineled(String masterName, final JedisClientConfig masterClientConfig, Cache clientSideCache,
+ Set sentinels, final JedisClientConfig sentinelClientConfig) {
+ super(new SentineledConnectionProvider(masterName, masterClientConfig, clientSideCache,
+ sentinels, sentinelClientConfig), masterClientConfig.getRedisProtocol(), clientSideCache);
+ }
+
public JedisSentineled(String masterName, final JedisClientConfig masterClientConfig,
final GenericObjectPoolConfig poolConfig,
Set sentinels, final JedisClientConfig sentinelClientConfig) {
@@ -19,6 +37,14 @@ public JedisSentineled(String masterName, final JedisClientConfig masterClientCo
masterClientConfig.getRedisProtocol());
}
+ @Experimental
+ public JedisSentineled(String masterName, final JedisClientConfig masterClientConfig, Cache clientSideCache,
+ final GenericObjectPoolConfig poolConfig,
+ Set sentinels, final JedisClientConfig sentinelClientConfig) {
+ super(new SentineledConnectionProvider(masterName, masterClientConfig, clientSideCache, poolConfig,
+ sentinels, sentinelClientConfig), masterClientConfig.getRedisProtocol(), clientSideCache);
+ }
+
public JedisSentineled(SentineledConnectionProvider sentineledConnectionProvider) {
super(sentineledConnectionProvider);
}
diff --git a/src/main/java/redis/clients/jedis/Pipeline.java b/src/main/java/redis/clients/jedis/Pipeline.java
index af6d39ee2d2..3ed5db41af3 100644
--- a/src/main/java/redis/clients/jedis/Pipeline.java
+++ b/src/main/java/redis/clients/jedis/Pipeline.java
@@ -29,12 +29,23 @@ public Pipeline(Connection connection) {
}
public Pipeline(Connection connection, boolean closeConnection) {
- super(new CommandObjects());
+ this(connection, closeConnection, createCommandObjects(connection));
+ }
+
+ private static CommandObjects createCommandObjects(Connection connection) {
+ CommandObjects commandObjects = new CommandObjects();
+ RedisProtocol proto = connection.getRedisProtocol();
+ if (proto != null) commandObjects.setProtocol(proto);
+ return commandObjects;
+ }
+
+ Pipeline(Connection connection, boolean closeConnection, CommandObjects commandObjects) {
+ super(commandObjects);
this.connection = connection;
this.closeConnection = closeConnection;
- RedisProtocol proto = this.connection.getRedisProtocol();
- if (proto != null) this.commandObjects.setProtocol(proto);
- setGraphCommands(new GraphCommandObjects(this.connection));
+ GraphCommandObjects graphCommandObjects = new GraphCommandObjects(this.connection);
+ graphCommandObjects.setBaseCommandArgumentsCreator(protocolCommand -> commandObjects.commandArguments(protocolCommand));
+ setGraphCommands(graphCommandObjects);
}
@Override
diff --git a/src/main/java/redis/clients/jedis/PipeliningBase.java b/src/main/java/redis/clients/jedis/PipeliningBase.java
index 928126a7047..ffe1c2a31c7 100644
--- a/src/main/java/redis/clients/jedis/PipeliningBase.java
+++ b/src/main/java/redis/clients/jedis/PipeliningBase.java
@@ -3948,6 +3948,11 @@ public Response tsAdd(String key, long timestamp, double value, TSCreatePa
return appendCommand(commandObjects.tsAdd(key, timestamp, value, createParams));
}
+ @Override
+ public Response tsAdd(String key, long timestamp, double value, TSAddParams addParams) {
+ return appendCommand(commandObjects.tsAdd(key, timestamp, value, addParams));
+ }
+
@Override
public Response> tsMAdd(Map.Entry... entries) {
return appendCommand(commandObjects.tsMAdd(entries));
@@ -3963,6 +3968,11 @@ public Response tsIncrBy(String key, double value, long timestamp) {
return appendCommand(commandObjects.tsIncrBy(key, value, timestamp));
}
+ @Override
+ public Response tsIncrBy(String key, double addend, TSIncrByParams incrByParams) {
+ return appendCommand(commandObjects.tsIncrBy(key, addend, incrByParams));
+ }
+
@Override
public Response tsDecrBy(String key, double value) {
return appendCommand(commandObjects.tsDecrBy(key, value));
@@ -3973,6 +3983,11 @@ public Response tsDecrBy(String key, double value, long timestamp) {
return appendCommand(commandObjects.tsDecrBy(key, value, timestamp));
}
+ @Override
+ public Response tsDecrBy(String key, double subtrahend, TSDecrByParams decrByParams) {
+ return appendCommand(commandObjects.tsDecrBy(key, subtrahend, decrByParams));
+ }
+
@Override
public Response> tsRange(String key, long fromTimestamp, long toTimestamp) {
return appendCommand(commandObjects.tsRange(key, fromTimestamp, toTimestamp));
diff --git a/src/main/java/redis/clients/jedis/Protocol.java b/src/main/java/redis/clients/jedis/Protocol.java
index 448bd7ff123..cd6e41581fb 100644
--- a/src/main/java/redis/clients/jedis/Protocol.java
+++ b/src/main/java/redis/clients/jedis/Protocol.java
@@ -4,12 +4,16 @@
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Locale;
+import redis.clients.jedis.annots.Experimental;
import redis.clients.jedis.exceptions.*;
import redis.clients.jedis.args.Rawable;
import redis.clients.jedis.commands.ProtocolCommand;
+import redis.clients.jedis.csc.Cache;
import redis.clients.jedis.util.KeyValue;
import redis.clients.jedis.util.RedisInputStream;
import redis.clients.jedis.util.RedisOutputStream;
@@ -49,6 +53,8 @@ public final class Protocol {
public static final byte[] POSITIVE_INFINITY_BYTES = "+inf".getBytes();
public static final byte[] NEGATIVE_INFINITY_BYTES = "-inf".getBytes();
+ static final List PROTOCOL_EMPTY_MAP = Collections.unmodifiableList(new ArrayList<>(0));
+
private static final String ASK_PREFIX = "ASK ";
private static final String MOVED_PREFIX = "MOVED ";
private static final String CLUSTERDOWN_PREFIX = "CLUSTERDOWN ";
@@ -58,6 +64,8 @@ public final class Protocol {
private static final String WRONGPASS_PREFIX = "WRONGPASS";
private static final String NOPERM_PREFIX = "NOPERM";
+ private static final byte[] INVALIDATE_BYTES = SafeEncoder.encode("invalidate");
+
private Protocol() {
throw new InstantiationError("Must not instantiate this class");
}
@@ -84,13 +92,9 @@ private static void processError(final RedisInputStream is) {
// Maybe Read only first 5 bytes instead?
if (message.startsWith(MOVED_PREFIX)) {
String[] movedInfo = parseTargetHostAndSlot(message);
-// throw new JedisMovedDataException(message, new HostAndPort(movedInfo[1],
-// Integer.parseInt(movedInfo[2])), Integer.parseInt(movedInfo[0]));
throw new JedisMovedDataException(message, HostAndPort.from(movedInfo[1]), Integer.parseInt(movedInfo[0]));
} else if (message.startsWith(ASK_PREFIX)) {
String[] askInfo = parseTargetHostAndSlot(message);
-// throw new JedisAskDataException(message, new HostAndPort(askInfo[1],
-// Integer.parseInt(askInfo[2])), Integer.parseInt(askInfo[0]));
throw new JedisAskDataException(message, HostAndPort.from(askInfo[1]), Integer.parseInt(askInfo[0]));
} else if (message.startsWith(CLUSTERDOWN_PREFIX)) {
throw new JedisClusterException(message);
@@ -115,15 +119,6 @@ public static String readErrorLineIfPossible(RedisInputStream is) {
return is.readLine();
}
-// private static String[] parseTargetHostAndSlot(String clusterRedirectResponse) {
-// String[] response = new String[3];
-// String[] messageInfo = clusterRedirectResponse.split(" ");
-// String[] targetHostAndPort = HostAndPort.extractParts(messageInfo[2]);
-// response[0] = messageInfo[1];
-// response[1] = targetHostAndPort[0];
-// response[2] = targetHostAndPort[1];
-// return response;
-// }
private static String[] parseTargetHostAndSlot(String clusterRedirectResponse) {
String[] response = new String[2];
String[] messageInfo = clusterRedirectResponse.split(" ");
@@ -134,7 +129,7 @@ private static String[] parseTargetHostAndSlot(String clusterRedirectResponse) {
private static Object process(final RedisInputStream is) {
final byte b = is.readByte();
- //System.out.println((char) b);
+ // System.out.println("BYTE: " + (char) b);
switch (b) {
case PLUS_BYTE:
return is.readLineBytes();
@@ -192,9 +187,9 @@ private static byte[] processBulkReply(final RedisInputStream is) {
}
private static List