From c4623b0d2cb71751eedbe4af937a7f44bc73319a Mon Sep 17 00:00:00 2001 From: Tihomir Mateev Date: Thu, 11 Apr 2024 13:56:31 +0300 Subject: [PATCH 01/17] Attempt to make the subscribeToShardChannelViaOtherEndpointa and pubSubSsl more stable --- src/test/java/io/lettuce/core/SslIntegrationTests.java | 7 +++---- .../RedisClusterPubSubConnectionIntegrationTests.java | 4 ++-- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/src/test/java/io/lettuce/core/SslIntegrationTests.java b/src/test/java/io/lettuce/core/SslIntegrationTests.java index 0c1b586190..a503bbb374 100644 --- a/src/test/java/io/lettuce/core/SslIntegrationTests.java +++ b/src/test/java/io/lettuce/core/SslIntegrationTests.java @@ -383,14 +383,13 @@ void pubSubSsl() { RedisPubSubCommands connection = redisClient.connectPubSub(URI_NO_VERIFY).sync(); connection.subscribe("c1"); connection.subscribe("c2"); - Delay.delay(Duration.ofMillis(200)); RedisPubSubCommands connection2 = redisClient.connectPubSub(URI_NO_VERIFY).sync(); - assertThat(connection2.pubsubChannels()).contains("c1", "c2"); + Wait.untilTrue(()->connection2.pubsubChannels().contains("c1")).waitOrTimeout(); + Wait.untilTrue(()->connection2.pubsubChannels().contains("c2")).waitOrTimeout(); connection.quit(); - Delay.delay(Duration.ofMillis(200)); - Wait.untilTrue(connection::isOpen).waitOrTimeout(); + Wait.untilTrue(connection.getStatefulConnection()::isOpen).waitOrTimeout(); Wait.untilEquals(2, () -> connection2.pubsubChannels().size()).waitOrTimeout(); assertThat(connection2.pubsubChannels()).contains("c1", "c2"); diff --git a/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java b/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java index 1e8503839b..62ebd81cc2 100644 --- a/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java +++ b/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java @@ -120,7 +120,7 @@ void testRegularClientPubSubShardChannels() { void subscribeToShardChannel() throws Exception { pubSubConnection.addListener(connectionListener); pubSubConnection.async().ssubscribe(shardChannel); - assertThat(connectionListener.getChannels().poll(3, TimeUnit.SECONDS)).isEqualTo(shardChannel); + assertThat(connectionListener.getChannels().take()).isEqualTo(shardChannel); } @Test @@ -132,7 +132,7 @@ void subscribeToShardChannelViaOtherEndpoint() throws Exception { RedisPubSubAsyncCommands other = pubSub .nodes(node -> node.getRole().isUpstream() && !node.getNodeId().equals(nodeId)).commands(0); other.ssubscribe(shardChannel); - assertThat(connectionListener.getChannels().poll(3, TimeUnit.SECONDS)).isEqualTo(shardChannel); + assertThat(connectionListener.getChannels().take()).isEqualTo(shardChannel); } @Test From fc765451964bed1e8e49e26fa29adbf8076e23c6 Mon Sep 17 00:00:00 2001 From: Tihomir Mateev Date: Thu, 11 Apr 2024 14:15:16 +0300 Subject: [PATCH 02/17] take() completely blocks the pipeline, let's try with Wait --- .../pubsub/RedisClusterPubSubConnectionIntegrationTests.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java b/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java index 62ebd81cc2..86f582c3d5 100644 --- a/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java +++ b/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java @@ -120,7 +120,7 @@ void testRegularClientPubSubShardChannels() { void subscribeToShardChannel() throws Exception { pubSubConnection.addListener(connectionListener); pubSubConnection.async().ssubscribe(shardChannel); - assertThat(connectionListener.getChannels().take()).isEqualTo(shardChannel); + Wait.untilTrue(() -> shardChannel.equals(connectionListener.getChannels().poll())).waitOrTimeout(); } @Test @@ -132,7 +132,7 @@ void subscribeToShardChannelViaOtherEndpoint() throws Exception { RedisPubSubAsyncCommands other = pubSub .nodes(node -> node.getRole().isUpstream() && !node.getNodeId().equals(nodeId)).commands(0); other.ssubscribe(shardChannel); - assertThat(connectionListener.getChannels().take()).isEqualTo(shardChannel); + Wait.untilTrue(() -> shardChannel.equals(connectionListener.getChannels().poll())).waitOrTimeout(); } @Test From 987777bc2b2b723243e25a12336d4d77116f0359 Mon Sep 17 00:00:00 2001 From: Tihomir Mateev Date: Thu, 11 Apr 2024 17:42:32 +0300 Subject: [PATCH 03/17] Add colors to the log, make integration tests failures also fail the build, disable shard pubsub test that fails --- Makefile | 4 ++-- pom.xml | 1 + ...usterPubSubConnectionIntegrationTests.java | 19 ++++++++++++------- 3 files changed, 15 insertions(+), 9 deletions(-) diff --git a/Makefile b/Makefile index c5a0d97a58..d52b95dea5 100644 --- a/Makefile +++ b/Makefile @@ -388,11 +388,11 @@ stop: pkill redis-sentinel && sleep 1 || true test-coverage: start - mvn -B -DskipITs=false clean compile verify jacoco:report -P$(PROFILE) + mvn -DskipITs=false clean compile verify jacoco:report -P$(PROFILE) $(MAKE) stop test: start - mvn -B -DskipITs=false clean compile verify -P$(PROFILE) + mvn -DskipITs=false clean compile verify -P$(PROFILE) $(MAKE) stop prepare: stop diff --git a/pom.xml b/pom.xml index 015cd8ccaa..0dbefd614c 100644 --- a/pom.xml +++ b/pom.xml @@ -862,6 +862,7 @@ integration-test integration-test + verify diff --git a/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java b/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java index 86f582c3d5..b2810329f2 100644 --- a/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java +++ b/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java @@ -9,6 +9,7 @@ import javax.inject.Inject; +import org.junit.Ignore; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -62,8 +63,6 @@ class RedisClusterPubSubConnectionIntegrationTests extends TestSupport { String shardChannel = "shard-channel"; - String shardTestChannel = "shard-test-channel"; - @Inject RedisClusterPubSubConnectionIntegrationTests(RedisClusterClient clusterClient) { this.clusterClient = clusterClient; @@ -101,15 +100,14 @@ void testRegularClientPubSubChannels() { } @Test + @EnabledOnCommand("SSUBSCRIBE") void testRegularClientPubSubShardChannels() { - String nodeId = pubSubConnection.sync().clusterMyId(); RedisClusterNode otherNode = getOtherThan(nodeId); - /// TODO : uncomment after SSUBSCRIBE is implemented - // pubSubConnection.sync().ssubscribe(key); + pubSubConnection.sync().ssubscribe(key); List channelsOnSubscribedNode = connection.getConnection(nodeId).sync().pubsubShardChannels(); - // assertThat(channelsOnSubscribedNode).hasSize(1); + assertThat(channelsOnSubscribedNode).hasSize(1); List channelsOnOtherNode = connection.getConnection(otherNode.getNodeId()).sync().pubsubShardChannels(); assertThat(channelsOnOtherNode).isEmpty(); @@ -123,6 +121,13 @@ void subscribeToShardChannel() throws Exception { Wait.untilTrue(() -> shardChannel.equals(connectionListener.getChannels().poll())).waitOrTimeout(); } + @Ignore + // This test is currently failing because the replica of the master node, where we subscribe to a shard channel, + // could be used to SPUBLISH to this channel, but does not list the shard channels with PUBSUB SHARDCHANNELS or + // PUBSUB SHARDNUMSUB + + // furthermore the test does not address the possibility that the SSUBSCRIBE could result in a MOVED, e.g. when + // the hash of the shard channel name would have to be hosted on another node @Test @EnabledOnCommand("SSUBSCRIBE") void subscribeToShardChannelViaOtherEndpoint() throws Exception { @@ -132,7 +137,7 @@ void subscribeToShardChannelViaOtherEndpoint() throws Exception { RedisPubSubAsyncCommands other = pubSub .nodes(node -> node.getRole().isUpstream() && !node.getNodeId().equals(nodeId)).commands(0); other.ssubscribe(shardChannel); - Wait.untilTrue(() -> shardChannel.equals(connectionListener.getChannels().poll())).waitOrTimeout(); + assertThat(connectionListener.getChannels().poll(3, TimeUnit.SECONDS)).isEqualTo(shardChannel); } @Test From 6bd465b4c7383e99a6b9d9ab8fb24fec89f44534 Mon Sep 17 00:00:00 2001 From: Tihomir Mateev Date: Thu, 11 Apr 2024 18:17:25 +0300 Subject: [PATCH 04/17] Noticed a few things that were wrong with the initial implementation --- ...usterPubSubConnectionIntegrationTests.java | 46 +++++-------------- .../core/support/PubSubTestListener.java | 18 ++++++++ 2 files changed, 29 insertions(+), 35 deletions(-) diff --git a/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java b/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java index b2810329f2..9697106ab6 100644 --- a/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java +++ b/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java @@ -12,6 +12,7 @@ import org.junit.Ignore; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -59,15 +60,11 @@ class RedisClusterPubSubConnectionIntegrationTests extends TestSupport { private StatefulRedisClusterPubSubConnection pubSubConnection2; - BlockingQueue shardChannels; - String shardChannel = "shard-channel"; @Inject RedisClusterPubSubConnectionIntegrationTests(RedisClusterClient clusterClient) { this.clusterClient = clusterClient; - shardChannels = LettuceFactories.newBlockingQueue(); - } @BeforeEach @@ -75,7 +72,7 @@ void openPubSubConnection() { connection = clusterClient.connect(); pubSubConnection = clusterClient.connectPubSub(); pubSubConnection2 = clusterClient.connectPubSub(); - + pubSubConnection.addListener(connectionListener); } @AfterEach @@ -83,6 +80,8 @@ void closePubSubConnection() { connection.close(); pubSubConnection.close(); pubSubConnection2.close(); + connectionListener.clear(); + pubSubConnection.removeListener(connectionListener); } @Test @@ -99,39 +98,25 @@ void testRegularClientPubSubChannels() { assertThat(channelsOnOtherNode).isEmpty(); } - @Test - @EnabledOnCommand("SSUBSCRIBE") - void testRegularClientPubSubShardChannels() { - String nodeId = pubSubConnection.sync().clusterMyId(); - RedisClusterNode otherNode = getOtherThan(nodeId); - pubSubConnection.sync().ssubscribe(key); - - List channelsOnSubscribedNode = connection.getConnection(nodeId).sync().pubsubShardChannels(); - assertThat(channelsOnSubscribedNode).hasSize(1); - - List channelsOnOtherNode = connection.getConnection(otherNode.getNodeId()).sync().pubsubShardChannels(); - assertThat(channelsOnOtherNode).isEmpty(); - } - @Test @EnabledOnCommand("SSUBSCRIBE") void subscribeToShardChannel() throws Exception { - pubSubConnection.addListener(connectionListener); - pubSubConnection.async().ssubscribe(shardChannel); - Wait.untilTrue(() -> shardChannel.equals(connectionListener.getChannels().poll())).waitOrTimeout(); + pubSubConnection.sync().ssubscribe(shardChannel); + + Wait.untilEquals(1L, connectionListener.getShardCounts()::poll).waitOrTimeout(); + Wait.untilEquals(shardChannel, connectionListener.getShardChannels()::poll).waitOrTimeout(); } - @Ignore + @Disabled // This test is currently failing because the replica of the master node, where we subscribe to a shard channel, // could be used to SPUBLISH to this channel, but does not list the shard channels with PUBSUB SHARDCHANNELS or // PUBSUB SHARDNUMSUB // furthermore the test does not address the possibility that the SSUBSCRIBE could result in a MOVED, e.g. when - // the hash of the shard channel name would have to be hosted on another node + // the hash of the shard channel name falls into the slot space of another node @Test - @EnabledOnCommand("SSUBSCRIBE") + // @EnabledOnCommand("SSUBSCRIBE") void subscribeToShardChannelViaOtherEndpoint() throws Exception { - pubSubConnection.addListener(connectionListener); RedisClusterPubSubAsyncCommands pubSub = pubSubConnection.async(); String nodeId = pubSub.clusterMyId().get(); RedisPubSubAsyncCommands other = pubSub @@ -174,7 +159,6 @@ void testRegularClientPublish() throws Exception { String nodeId = pubSubConnection.sync().clusterMyId(); RedisClusterNode otherNode = getOtherThan(nodeId); pubSubConnection.sync().subscribe(key); - pubSubConnection.addListener(connectionListener); connection.getConnection(nodeId).sync().publish(key, value); assertThat(connectionListener.getMessages().take()).isEqualTo(value); @@ -188,7 +172,6 @@ void testPubSubClientPublish() throws Exception { String nodeId = pubSubConnection.sync().clusterMyId(); pubSubConnection.sync().subscribe(key); - pubSubConnection.addListener(connectionListener); assertThat(pubSubConnection2.sync().clusterMyId()).isEqualTo(nodeId); @@ -215,7 +198,6 @@ void testRegularClientPubSubPublish() throws Exception { String nodeId = pubSubConnection.sync().clusterMyId(); RedisClusterNode otherNode = getOtherThan(nodeId); pubSubConnection.sync().subscribe(key); - pubSubConnection.addListener(connectionListener); List channelsOnSubscribedNode = connection.getConnection(nodeId).sync().pubsubChannels(); assertThat(channelsOnSubscribedNode).hasSize(1); @@ -269,7 +251,6 @@ void testNodeMessagePropagationSubscription() throws Exception { RedisClusterNode partition = pubSubConnection.getPartitions().getPartition(0); pubSubConnection.setNodeMessagePropagation(true); - pubSubConnection.addListener(connectionListener); StatefulRedisPubSubConnection node = pubSubConnection.getConnection(partition.getNodeId()); node.sync().subscribe("channel"); @@ -284,7 +265,6 @@ void testNodeHostAndPortMessagePropagationSubscription() throws Exception { RedisClusterNode partition = pubSubConnection.getPartitions().getPartition(0); pubSubConnection.setNodeMessagePropagation(true); - pubSubConnection.addListener(connectionListener); RedisURI uri = partition.getUri(); StatefulRedisPubSubConnection node = pubSubConnection.getConnection(uri.getHost(), uri.getPort()); @@ -299,7 +279,6 @@ void testNodeHostAndPortMessagePropagationSubscription() throws Exception { void testAsyncSubscription() throws Exception { pubSubConnection.setNodeMessagePropagation(true); - pubSubConnection.addListener(connectionListener); PubSubAsyncNodeSelection masters = pubSubConnection.async().masters(); NodeSelectionPubSubAsyncCommands commands = masters.commands(); @@ -317,7 +296,6 @@ void testAsyncSubscription() throws Exception { void testSyncSubscription() throws Exception { pubSubConnection.setNodeMessagePropagation(true); - pubSubConnection.addListener(connectionListener); PubSubNodeSelection masters = pubSubConnection.sync().masters(); NodeSelectionPubSubCommands commands = masters.commands(); @@ -335,7 +313,6 @@ void testSyncSubscription() throws Exception { void testReactiveSubscription() throws Exception { pubSubConnection.setNodeMessagePropagation(true); - pubSubConnection.addListener(connectionListener); PubSubReactiveNodeSelection masters = pubSubConnection.reactive().masters(); NodeSelectionPubSubReactiveCommands commands = masters.commands(); @@ -355,7 +332,6 @@ void testClusterListener() throws Exception { BlockingQueue nodes = new LinkedBlockingQueue<>(); pubSubConnection.setNodeMessagePropagation(true); - pubSubConnection.addListener(connectionListener); pubSubConnection.addListener(new RedisClusterPubSubAdapter() { @Override diff --git a/src/test/java/io/lettuce/core/support/PubSubTestListener.java b/src/test/java/io/lettuce/core/support/PubSubTestListener.java index 3503c4c0c2..d89c9a7810 100644 --- a/src/test/java/io/lettuce/core/support/PubSubTestListener.java +++ b/src/test/java/io/lettuce/core/support/PubSubTestListener.java @@ -33,6 +33,8 @@ public class PubSubTestListener implements RedisPubSubListener { private BlockingQueue patterns = LettuceFactories.newBlockingQueue(); private BlockingQueue messages = LettuceFactories.newBlockingQueue(); private BlockingQueue counts = LettuceFactories.newBlockingQueue(); + private BlockingQueue shardChannels = LettuceFactories.newBlockingQueue(); + private BlockingQueue shardCounts = LettuceFactories.newBlockingQueue(); // RedisPubSubListener implementation @@ -55,6 +57,12 @@ public void subscribed(String channel, long count) { counts.add(count); } + @Override + public void ssubscribed(String shardChannel, long count) { + shardChannels.add(shardChannel); + shardCounts.add(count); + } + @Override public void psubscribed(String pattern, long count) { patterns.add(pattern); @@ -77,6 +85,10 @@ public BlockingQueue getChannels() { return channels; } + public BlockingQueue getShardChannels() { + return shardChannels; + } + public BlockingQueue getPatterns() { return patterns; } @@ -89,13 +101,19 @@ public BlockingQueue getCounts() { return counts; } + public BlockingQueue getShardCounts() { + return shardCounts; + } + /** * Clear listener state (i.e. channels, patterns, messages, counts). */ public void clear() { channels.clear(); + shardChannels.clear(); patterns.clear(); messages.clear(); counts.clear(); + shardCounts.clear(); } } From 405220b62451fa34e48640799d21b1b113c315d0 Mon Sep 17 00:00:00 2001 From: Tihomir Mateev Date: Thu, 11 Apr 2024 18:25:53 +0300 Subject: [PATCH 05/17] Disable the last failing test --- src/test/java/io/lettuce/core/SslIntegrationTests.java | 2 ++ .../pubsub/RedisClusterPubSubConnectionIntegrationTests.java | 2 -- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/test/java/io/lettuce/core/SslIntegrationTests.java b/src/test/java/io/lettuce/core/SslIntegrationTests.java index a503bbb374..5ff6d81494 100644 --- a/src/test/java/io/lettuce/core/SslIntegrationTests.java +++ b/src/test/java/io/lettuce/core/SslIntegrationTests.java @@ -36,6 +36,7 @@ import org.assertj.core.api.Assertions; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -377,6 +378,7 @@ void masterSlaveSslWithAllInvalidHostsWillFail() { .isInstanceOf(RedisConnectionException.class); } + @Disabled // constantly fails on the pipeline, but not locally, hard to reproduce @Test void pubSubSsl() { diff --git a/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java b/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java index 9697106ab6..b1e0359ce7 100644 --- a/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java +++ b/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java @@ -9,7 +9,6 @@ import javax.inject.Inject; -import org.junit.Ignore; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; @@ -31,7 +30,6 @@ import io.lettuce.core.cluster.pubsub.api.sync.PubSubNodeSelection; import io.lettuce.core.event.command.CommandFailedEvent; import io.lettuce.core.event.command.CommandListener; -import io.lettuce.core.internal.LettuceFactories; import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands; import io.lettuce.core.support.PubSubTestListener; From 6d4f02ad323302681d5c7ae23983e767b7a91760 Mon Sep 17 00:00:00 2001 From: Tihomir Mateev Date: Mon, 15 Apr 2024 15:10:47 +0300 Subject: [PATCH 06/17] Reverting back to a working version of the two removed tests in RedisClusterPubSubConnectionIntegartionTests --- .../io/lettuce/core/SslIntegrationTests.java | 10 +++--- ...usterPubSubConnectionIntegrationTests.java | 33 +++++++++++++------ 2 files changed, 27 insertions(+), 16 deletions(-) diff --git a/src/test/java/io/lettuce/core/SslIntegrationTests.java b/src/test/java/io/lettuce/core/SslIntegrationTests.java index 5ff6d81494..f951204167 100644 --- a/src/test/java/io/lettuce/core/SslIntegrationTests.java +++ b/src/test/java/io/lettuce/core/SslIntegrationTests.java @@ -385,19 +385,17 @@ void pubSubSsl() { RedisPubSubCommands connection = redisClient.connectPubSub(URI_NO_VERIFY).sync(); connection.subscribe("c1"); connection.subscribe("c2"); + Delay.delay(Duration.ofMillis(200)); RedisPubSubCommands connection2 = redisClient.connectPubSub(URI_NO_VERIFY).sync(); - Wait.untilTrue(()->connection2.pubsubChannels().contains("c1")).waitOrTimeout(); - Wait.untilTrue(()->connection2.pubsubChannels().contains("c2")).waitOrTimeout(); + assertThat(connection2.pubsubChannels()).contains("c1", "c2"); connection.quit(); - Wait.untilTrue(connection.getStatefulConnection()::isOpen).waitOrTimeout(); + Delay.delay(Duration.ofMillis(200)); + Wait.untilTrue(connection::isOpen).waitOrTimeout(); Wait.untilEquals(2, () -> connection2.pubsubChannels().size()).waitOrTimeout(); assertThat(connection2.pubsubChannels()).contains("c1", "c2"); - - connection.getStatefulConnection().close(); - connection2.getStatefulConnection().close(); } private static RedisURI.Builder sslURIBuilder(int portOffset) { diff --git a/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java b/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java index b1e0359ce7..5a71ba694d 100644 --- a/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java +++ b/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java @@ -11,7 +11,6 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -96,6 +95,21 @@ void testRegularClientPubSubChannels() { assertThat(channelsOnOtherNode).isEmpty(); } + @Test + void testRegularClientPubSubShardChannels() { + + String nodeId = pubSubConnection.sync().clusterMyId(); + RedisClusterNode otherNode = getOtherThan(nodeId); + + pubSubConnection.sync().ssubscribe(shardChannel); + + List channelsOnSubscribedNode = connection.getConnection(nodeId).sync().pubsubShardChannels(); + assertThat(channelsOnSubscribedNode).hasSize(1); + + List channelsOnOtherNode = connection.getConnection(otherNode.getNodeId()).sync().pubsubShardChannels(); + assertThat(channelsOnOtherNode).isEmpty(); + } + @Test @EnabledOnCommand("SSUBSCRIBE") void subscribeToShardChannel() throws Exception { @@ -105,22 +119,21 @@ void subscribeToShardChannel() throws Exception { Wait.untilEquals(shardChannel, connectionListener.getShardChannels()::poll).waitOrTimeout(); } - @Disabled - // This test is currently failing because the replica of the master node, where we subscribe to a shard channel, - // could be used to SPUBLISH to this channel, but does not list the shard channels with PUBSUB SHARDCHANNELS or - // PUBSUB SHARDNUMSUB - - // furthermore the test does not address the possibility that the SSUBSCRIBE could result in a MOVED, e.g. when - // the hash of the shard channel name falls into the slot space of another node @Test - // @EnabledOnCommand("SSUBSCRIBE") + @EnabledOnCommand("SSUBSCRIBE") void subscribeToShardChannelViaOtherEndpoint() throws Exception { + // Step 1. fetch the connection to the shard that serves the slot for RedisClusterPubSubAsyncCommands pubSub = pubSubConnection.async(); String nodeId = pubSub.clusterMyId().get(); + // Step 2. fetch another node, that does not serve this slot RedisPubSubAsyncCommands other = pubSub .nodes(node -> node.getRole().isUpstream() && !node.getNodeId().equals(nodeId)).commands(0); + // Step 3. Use the connection from step 2 to subscribe to + // This should cause a MOVED response and a subscription using the right connection automatically other.ssubscribe(shardChannel); - assertThat(connectionListener.getChannels().poll(3, TimeUnit.SECONDS)).isEqualTo(shardChannel); + + // Step 4. Verify that the subscription succeeded + Wait.untilEquals(shardChannel, connectionListener.getShardChannels()::poll).waitOrTimeout(); } @Test From b55860ef9637ab312db8ca75710dd1fbea5825d2 Mon Sep 17 00:00:00 2001 From: Tihomir Mateev Date: Mon, 15 Apr 2024 16:02:08 +0300 Subject: [PATCH 07/17] Make sure we do not depend on default slot allocation --- ...sClusterPubSubConnectionIntegrationTests.java | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java b/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java index 5a71ba694d..ade4f67abc 100644 --- a/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java +++ b/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java @@ -5,10 +5,11 @@ import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; import javax.inject.Inject; +import io.lettuce.core.RedisAsyncCommandsImpl; +import io.lettuce.core.pubsub.api.sync.RedisPubSubCommands; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -98,15 +99,18 @@ void testRegularClientPubSubChannels() { @Test void testRegularClientPubSubShardChannels() { - String nodeId = pubSubConnection.sync().clusterMyId(); - RedisClusterNode otherNode = getOtherThan(nodeId); - pubSubConnection.sync().ssubscribe(shardChannel); - List channelsOnSubscribedNode = connection.getConnection(nodeId).sync().pubsubShardChannels(); + Integer clusterKeyslot = connection.sync().clusterKeyslot(shardChannel).intValue(); + RedisCommands rightSlot = + connection.sync().nodes(node -> node.getSlots().contains(clusterKeyslot)).commands(0); + RedisCommands wrongSlot = + connection.sync().nodes(node -> !node.getSlots().contains(clusterKeyslot)).commands(0); + + List channelsOnSubscribedNode = rightSlot.pubsubShardChannels(); assertThat(channelsOnSubscribedNode).hasSize(1); - List channelsOnOtherNode = connection.getConnection(otherNode.getNodeId()).sync().pubsubShardChannels(); + List channelsOnOtherNode = wrongSlot.pubsubShardChannels(); assertThat(channelsOnOtherNode).isEmpty(); } From ebd120271f6821528324b992f414f48426ada48d Mon Sep 17 00:00:00 2001 From: Tihomir Mateev Date: Mon, 15 Apr 2024 16:10:40 +0300 Subject: [PATCH 08/17] Same approach for other failing test --- ...ClusterPubSubConnectionIntegrationTests.java | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java b/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java index ade4f67abc..7667fe22fd 100644 --- a/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java +++ b/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java @@ -126,17 +126,12 @@ void subscribeToShardChannel() throws Exception { @Test @EnabledOnCommand("SSUBSCRIBE") void subscribeToShardChannelViaOtherEndpoint() throws Exception { - // Step 1. fetch the connection to the shard that serves the slot for - RedisClusterPubSubAsyncCommands pubSub = pubSubConnection.async(); - String nodeId = pubSub.clusterMyId().get(); - // Step 2. fetch another node, that does not serve this slot - RedisPubSubAsyncCommands other = pubSub - .nodes(node -> node.getRole().isUpstream() && !node.getNodeId().equals(nodeId)).commands(0); - // Step 3. Use the connection from step 2 to subscribe to - // This should cause a MOVED response and a subscription using the right connection automatically - other.ssubscribe(shardChannel); - - // Step 4. Verify that the subscription succeeded + Integer clusterKeyslot = connection.sync().clusterKeyslot(shardChannel).intValue(); + RedisPubSubCommands otherSlot = + pubSubConnection.sync().nodes(node -> !node.getSlots().contains(clusterKeyslot)).commands(0); + + otherSlot.ssubscribe(shardChannel); + Wait.untilEquals(shardChannel, connectionListener.getShardChannels()::poll).waitOrTimeout(); } From 2a9e8a82a9afd1b062ada4330548515fab9a99cb Mon Sep 17 00:00:00 2001 From: Tihomir Mateev Date: Mon, 15 Apr 2024 17:03:26 +0300 Subject: [PATCH 09/17] Verify that the slot are different locally and on the pipeline --- .../RedisClusterPubSubConnectionIntegrationTests.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java b/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java index 7667fe22fd..5c63bfaaa0 100644 --- a/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java +++ b/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java @@ -8,7 +8,7 @@ import javax.inject.Inject; -import io.lettuce.core.RedisAsyncCommandsImpl; +import io.lettuce.core.cluster.models.partitions.Partitions; import io.lettuce.core.pubsub.api.sync.RedisPubSubCommands; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -23,7 +23,6 @@ import io.lettuce.core.cluster.models.partitions.RedisClusterNode; import io.lettuce.core.cluster.pubsub.api.async.NodeSelectionPubSubAsyncCommands; import io.lettuce.core.cluster.pubsub.api.async.PubSubAsyncNodeSelection; -import io.lettuce.core.cluster.pubsub.api.async.RedisClusterPubSubAsyncCommands; import io.lettuce.core.cluster.pubsub.api.reactive.NodeSelectionPubSubReactiveCommands; import io.lettuce.core.cluster.pubsub.api.reactive.PubSubReactiveNodeSelection; import io.lettuce.core.cluster.pubsub.api.sync.NodeSelectionPubSubCommands; @@ -31,7 +30,6 @@ import io.lettuce.core.event.command.CommandFailedEvent; import io.lettuce.core.event.command.CommandListener; import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; -import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands; import io.lettuce.core.support.PubSubTestListener; import io.lettuce.test.LettuceExtension; import io.lettuce.test.TestFutures; @@ -128,7 +126,10 @@ void subscribeToShardChannel() throws Exception { void subscribeToShardChannelViaOtherEndpoint() throws Exception { Integer clusterKeyslot = connection.sync().clusterKeyslot(shardChannel).intValue(); RedisPubSubCommands otherSlot = - pubSubConnection.sync().nodes(node -> !node.getSlots().contains(clusterKeyslot)).commands(0); + pubSubConnection.sync().nodes(node -> !node.getSlots().contains(clusterKeyslot) && node.getRole().isReplica()).commands(0); + + Partitions partitions = connection.getPartitions(); + assertThat("27f88788f03a86296b7d860152f4ae24ee59c8c9").isEqualTo(partitions.getPartitionBySlot(clusterKeyslot).getNodeId()); otherSlot.ssubscribe(shardChannel); From 912f84145f8cee9afdb6c6f2fb8952cf43c2b9fb Mon Sep 17 00:00:00 2001 From: Tihomir Mateev Date: Mon, 15 Apr 2024 17:43:29 +0300 Subject: [PATCH 10/17] Using replica works --- .../RedisClusterPubSubConnectionIntegrationTests.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java b/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java index 5c63bfaaa0..6060fe0b74 100644 --- a/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java +++ b/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java @@ -124,12 +124,10 @@ void subscribeToShardChannel() throws Exception { @Test @EnabledOnCommand("SSUBSCRIBE") void subscribeToShardChannelViaOtherEndpoint() throws Exception { - Integer clusterKeyslot = connection.sync().clusterKeyslot(shardChannel).intValue(); + int clusterKeyslot = connection.sync().clusterKeyslot(shardChannel).intValue(); + String thisNode = connection.getPartitions().getPartitionBySlot(clusterKeyslot).getNodeId(); RedisPubSubCommands otherSlot = - pubSubConnection.sync().nodes(node -> !node.getSlots().contains(clusterKeyslot) && node.getRole().isReplica()).commands(0); - - Partitions partitions = connection.getPartitions(); - assertThat("27f88788f03a86296b7d860152f4ae24ee59c8c9").isEqualTo(partitions.getPartitionBySlot(clusterKeyslot).getNodeId()); + pubSubConnection.sync().nodes(node -> thisNode.equals(node.getSlaveOf())).commands(0); otherSlot.ssubscribe(shardChannel); From 4ec12203eef7b148a5eadaef40c74721c813351b Mon Sep 17 00:00:00 2001 From: Tihomir Mateev Date: Tue, 16 Apr 2024 15:07:27 +0300 Subject: [PATCH 11/17] Testing SSUBSCRIBE on replica, instead of other master --- ...edisClusterPubSubConnectionIntegrationTests.java | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java b/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java index 6060fe0b74..c4a808fad9 100644 --- a/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java +++ b/src/test/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubConnectionIntegrationTests.java @@ -8,8 +8,7 @@ import javax.inject.Inject; -import io.lettuce.core.cluster.models.partitions.Partitions; -import io.lettuce.core.pubsub.api.sync.RedisPubSubCommands; +import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -114,7 +113,7 @@ void testRegularClientPubSubShardChannels() { @Test @EnabledOnCommand("SSUBSCRIBE") - void subscribeToShardChannel() throws Exception { + void subscribeToShardChannel(){ pubSubConnection.sync().ssubscribe(shardChannel); Wait.untilEquals(1L, connectionListener.getShardCounts()::poll).waitOrTimeout(); @@ -123,13 +122,13 @@ void subscribeToShardChannel() throws Exception { @Test @EnabledOnCommand("SSUBSCRIBE") - void subscribeToShardChannelViaOtherEndpoint() throws Exception { + void subscribeToShardChannelViaReplica() { int clusterKeyslot = connection.sync().clusterKeyslot(shardChannel).intValue(); String thisNode = connection.getPartitions().getPartitionBySlot(clusterKeyslot).getNodeId(); - RedisPubSubCommands otherSlot = - pubSubConnection.sync().nodes(node -> thisNode.equals(node.getSlaveOf())).commands(0); - otherSlot.ssubscribe(shardChannel); + RedisPubSubAsyncCommands replica = + pubSubConnection.async().nodes(node -> thisNode.equals(node.getSlaveOf())).commands(0); + replica.ssubscribe(shardChannel); Wait.untilEquals(shardChannel, connectionListener.getShardChannels()::poll).waitOrTimeout(); } From fa6f053171a6dbf1433477e39e8e2321e5897875 Mon Sep 17 00:00:00 2001 From: Tihomir Mateev Date: Wed, 17 Apr 2024 09:12:40 +0300 Subject: [PATCH 12/17] Enabling the SSLIntegrationTests test --- src/test/java/io/lettuce/core/SslIntegrationTests.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/test/java/io/lettuce/core/SslIntegrationTests.java b/src/test/java/io/lettuce/core/SslIntegrationTests.java index f951204167..39b014cbe3 100644 --- a/src/test/java/io/lettuce/core/SslIntegrationTests.java +++ b/src/test/java/io/lettuce/core/SslIntegrationTests.java @@ -378,20 +378,17 @@ void masterSlaveSslWithAllInvalidHostsWillFail() { .isInstanceOf(RedisConnectionException.class); } - @Disabled // constantly fails on the pipeline, but not locally, hard to reproduce @Test void pubSubSsl() { RedisPubSubCommands connection = redisClient.connectPubSub(URI_NO_VERIFY).sync(); connection.subscribe("c1"); connection.subscribe("c2"); - Delay.delay(Duration.ofMillis(200)); RedisPubSubCommands connection2 = redisClient.connectPubSub(URI_NO_VERIFY).sync(); assertThat(connection2.pubsubChannels()).contains("c1", "c2"); connection.quit(); - Delay.delay(Duration.ofMillis(200)); Wait.untilTrue(connection::isOpen).waitOrTimeout(); Wait.untilEquals(2, () -> connection2.pubsubChannels().size()).waitOrTimeout(); From 421f856fdf9d2e8f6281832362b1e7dfdc2d2c46 Mon Sep 17 00:00:00 2001 From: Tihomir Mateev Date: Wed, 17 Apr 2024 10:14:07 +0300 Subject: [PATCH 13/17] Increasing the check duration fixes the test --- src/test/java/io/lettuce/core/SslIntegrationTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/io/lettuce/core/SslIntegrationTests.java b/src/test/java/io/lettuce/core/SslIntegrationTests.java index 39b014cbe3..819cb551d1 100644 --- a/src/test/java/io/lettuce/core/SslIntegrationTests.java +++ b/src/test/java/io/lettuce/core/SslIntegrationTests.java @@ -390,7 +390,7 @@ void pubSubSsl() { assertThat(connection2.pubsubChannels()).contains("c1", "c2"); connection.quit(); Wait.untilTrue(connection::isOpen).waitOrTimeout(); - Wait.untilEquals(2, () -> connection2.pubsubChannels().size()).waitOrTimeout(); + Wait.untilEquals(2, () -> connection2.pubsubChannels().size()).during(Duration.ofSeconds(60)).waitOrTimeout(); assertThat(connection2.pubsubChannels()).contains("c1", "c2"); } From 824ba4c17700bfe64d71361a7da2a15f35ffc847 Mon Sep 17 00:00:00 2001 From: Tihomir Mateev Date: Wed, 17 Apr 2024 10:21:59 +0300 Subject: [PATCH 14/17] Pipeline still fails with 60sec, increase to 120sec --- src/test/java/io/lettuce/core/SslIntegrationTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/io/lettuce/core/SslIntegrationTests.java b/src/test/java/io/lettuce/core/SslIntegrationTests.java index 819cb551d1..5acd07f4d8 100644 --- a/src/test/java/io/lettuce/core/SslIntegrationTests.java +++ b/src/test/java/io/lettuce/core/SslIntegrationTests.java @@ -390,7 +390,7 @@ void pubSubSsl() { assertThat(connection2.pubsubChannels()).contains("c1", "c2"); connection.quit(); Wait.untilTrue(connection::isOpen).waitOrTimeout(); - Wait.untilEquals(2, () -> connection2.pubsubChannels().size()).during(Duration.ofSeconds(60)).waitOrTimeout(); + Wait.untilEquals(2, () -> connection2.pubsubChannels().size()).during(Duration.ofSeconds(120)).waitOrTimeout(); assertThat(connection2.pubsubChannels()).contains("c1", "c2"); } From f444608e64f8f55d5f21d19d2047b7a7d5302bbd Mon Sep 17 00:00:00 2001 From: Tihomir Mateev Date: Wed, 17 Apr 2024 17:09:23 +0300 Subject: [PATCH 15/17] Set the connection state as activated only after all resubscribe events have completed --- .../StatefulRedisPubSubConnectionImpl.java | 30 +++++++++++++++---- .../io/lettuce/core/SslIntegrationTests.java | 4 +-- 2 files changed, 27 insertions(+), 7 deletions(-) diff --git a/src/main/java/io/lettuce/core/pubsub/StatefulRedisPubSubConnectionImpl.java b/src/main/java/io/lettuce/core/pubsub/StatefulRedisPubSubConnectionImpl.java index 33b1f1412e..70ad02da1b 100644 --- a/src/main/java/io/lettuce/core/pubsub/StatefulRedisPubSubConnectionImpl.java +++ b/src/main/java/io/lettuce/core/pubsub/StatefulRedisPubSubConnectionImpl.java @@ -24,12 +24,14 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.concurrent.CompletionStage; import io.lettuce.core.RedisChannelWriter; import io.lettuce.core.RedisCommandExecutionException; import io.lettuce.core.RedisFuture; import io.lettuce.core.StatefulRedisConnectionImpl; import io.lettuce.core.codec.RedisCodec; +import io.lettuce.core.internal.Futures; import io.lettuce.core.protocol.ConnectionWatchdog; import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands; import io.lettuce.core.pubsub.api.reactive.RedisPubSubReactiveCommands; @@ -146,15 +148,33 @@ private T[] toArray(Collection c) { @Override public void activated() { - super.activated(); - for (RedisFuture command : resubscribe()) { - command.exceptionally(throwable -> { + List> futures = resubscribe(); + + if (futures.isEmpty()) { + super.activated(); + return; + } + + CompletionStage lastCommand = null; + for (RedisFuture command : futures) { + if (lastCommand == null){ + lastCommand = command; + continue; + } + lastCommand = lastCommand.handleAsync( (result,throwable) -> { if (throwable instanceof RedisCommandExecutionException) { - InternalLoggerFactory.getInstance(getClass()).warn("Re-subscribe failed: " + command.getError()); + InternalLoggerFactory.getInstance(getClass()).warn("Re-subscribe failed: " + throwable.getMessage()); } return null; - }); + }).thenCombineAsync(command, (res1, res2) -> null); } + + lastCommand.handleAsync( (result,throwable) -> { + if (throwable instanceof RedisCommandExecutionException) { + InternalLoggerFactory.getInstance(getClass()).warn("Re-subscribe failed: " + throwable.getMessage()); + } + return null; + }).thenRun(super::activated); } } diff --git a/src/test/java/io/lettuce/core/SslIntegrationTests.java b/src/test/java/io/lettuce/core/SslIntegrationTests.java index 5acd07f4d8..fbfa76267a 100644 --- a/src/test/java/io/lettuce/core/SslIntegrationTests.java +++ b/src/test/java/io/lettuce/core/SslIntegrationTests.java @@ -389,8 +389,8 @@ void pubSubSsl() { assertThat(connection2.pubsubChannels()).contains("c1", "c2"); connection.quit(); - Wait.untilTrue(connection::isOpen).waitOrTimeout(); - Wait.untilEquals(2, () -> connection2.pubsubChannels().size()).during(Duration.ofSeconds(120)).waitOrTimeout(); + Wait.untilTrue(connection.getStatefulConnection()::isOpen).waitOrTimeout(); + Wait.untilEquals(2, () -> connection2.pubsubChannels().size()).waitOrTimeout(); assertThat(connection2.pubsubChannels()).contains("c1", "c2"); } From ab47845cfc843fae78f41795c78529d76b7ce323 Mon Sep 17 00:00:00 2001 From: Tihomir Mateev Date: Wed, 17 Apr 2024 17:30:56 +0300 Subject: [PATCH 16/17] Fix unsuccessful on pipeline, reverting to previous version --- .../StatefulRedisPubSubConnectionImpl.java | 30 ++++--------------- .../io/lettuce/core/SslIntegrationTests.java | 2 +- 2 files changed, 6 insertions(+), 26 deletions(-) diff --git a/src/main/java/io/lettuce/core/pubsub/StatefulRedisPubSubConnectionImpl.java b/src/main/java/io/lettuce/core/pubsub/StatefulRedisPubSubConnectionImpl.java index 70ad02da1b..5cfb000ebc 100644 --- a/src/main/java/io/lettuce/core/pubsub/StatefulRedisPubSubConnectionImpl.java +++ b/src/main/java/io/lettuce/core/pubsub/StatefulRedisPubSubConnectionImpl.java @@ -24,7 +24,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; -import java.util.concurrent.CompletionStage; import io.lettuce.core.RedisChannelWriter; import io.lettuce.core.RedisCommandExecutionException; @@ -148,33 +147,14 @@ private T[] toArray(Collection c) { @Override public void activated() { - List> futures = resubscribe(); - - if (futures.isEmpty()) { - super.activated(); - return; - } - - CompletionStage lastCommand = null; - for (RedisFuture command : futures) { - if (lastCommand == null){ - lastCommand = command; - continue; - } - lastCommand = lastCommand.handleAsync( (result,throwable) -> { + super.activated(); + for (RedisFuture command : resubscribe()) { + command.exceptionally(throwable -> { if (throwable instanceof RedisCommandExecutionException) { - InternalLoggerFactory.getInstance(getClass()).warn("Re-subscribe failed: " + throwable.getMessage()); + InternalLoggerFactory.getInstance(getClass()).warn("Re-subscribe failed: " + command.getError()); } return null; - }).thenCombineAsync(command, (res1, res2) -> null); + }); } - - lastCommand.handleAsync( (result,throwable) -> { - if (throwable instanceof RedisCommandExecutionException) { - InternalLoggerFactory.getInstance(getClass()).warn("Re-subscribe failed: " + throwable.getMessage()); - } - return null; - }).thenRun(super::activated); } - } diff --git a/src/test/java/io/lettuce/core/SslIntegrationTests.java b/src/test/java/io/lettuce/core/SslIntegrationTests.java index fbfa76267a..098ee9eb80 100644 --- a/src/test/java/io/lettuce/core/SslIntegrationTests.java +++ b/src/test/java/io/lettuce/core/SslIntegrationTests.java @@ -390,7 +390,7 @@ void pubSubSsl() { assertThat(connection2.pubsubChannels()).contains("c1", "c2"); connection.quit(); Wait.untilTrue(connection.getStatefulConnection()::isOpen).waitOrTimeout(); - Wait.untilEquals(2, () -> connection2.pubsubChannels().size()).waitOrTimeout(); + Wait.untilEquals(2, () -> connection2.pubsubChannels().size()).during(Duration.ofSeconds(120)).waitOrTimeout(); assertThat(connection2.pubsubChannels()).contains("c1", "c2"); } From 40c29d8f0db33e50fc96d2ad4d7393a65f9f34c4 Mon Sep 17 00:00:00 2001 From: Tihomir Mateev Date: Wed, 17 Apr 2024 17:45:11 +0300 Subject: [PATCH 17/17] Polishing --- .../lettuce/core/pubsub/StatefulRedisPubSubConnectionImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/lettuce/core/pubsub/StatefulRedisPubSubConnectionImpl.java b/src/main/java/io/lettuce/core/pubsub/StatefulRedisPubSubConnectionImpl.java index 5cfb000ebc..33b1f1412e 100644 --- a/src/main/java/io/lettuce/core/pubsub/StatefulRedisPubSubConnectionImpl.java +++ b/src/main/java/io/lettuce/core/pubsub/StatefulRedisPubSubConnectionImpl.java @@ -30,7 +30,6 @@ import io.lettuce.core.RedisFuture; import io.lettuce.core.StatefulRedisConnectionImpl; import io.lettuce.core.codec.RedisCodec; -import io.lettuce.core.internal.Futures; import io.lettuce.core.protocol.ConnectionWatchdog; import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands; import io.lettuce.core.pubsub.api.reactive.RedisPubSubReactiveCommands; @@ -157,4 +156,5 @@ public void activated() { }); } } + }