From b38264f1bbbe9b1d4b9c6f52ec13f6bb4999b206 Mon Sep 17 00:00:00 2001 From: Petya Slavova Date: Tue, 13 May 2025 21:02:16 +0300 Subject: [PATCH] Add dynamic_startup_nodes parameter to async RedisCluster --- redis/asyncio/cluster.py | 20 ++++++++++++++++++-- tests/test_asyncio/test_cluster.py | 21 +++++++++++++++++++++ 2 files changed, 39 insertions(+), 2 deletions(-) diff --git a/redis/asyncio/cluster.py b/redis/asyncio/cluster.py index 6044edc23a..aebfef230b 100644 --- a/redis/asyncio/cluster.py +++ b/redis/asyncio/cluster.py @@ -134,6 +134,14 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand | Enable read from replicas in READONLY mode and defines the load balancing strategy that will be used for cluster node selection. The data read from replicas is eventually consistent with the data in primary nodes. + :param dynamic_startup_nodes: + | Set the RedisCluster's startup nodes to all the discovered nodes. + If true (default value), the cluster's discovered nodes will be used to + determine the cluster nodes-slots mapping in the next topology refresh. + It will remove the initial passed startup nodes if their endpoints aren't + listed in the CLUSTER SLOTS output. + If you use dynamic DNS endpoints for startup nodes but CLUSTER SLOTS lists + specific IP addresses, it is best to set it to false. :param reinitialize_steps: | Specifies the number of MOVED errors that need to occur before reinitializing the whole cluster topology. If a MOVED error occurs and the cluster does not @@ -250,6 +258,7 @@ def __init__( require_full_coverage: bool = True, read_from_replicas: bool = False, load_balancing_strategy: Optional[LoadBalancingStrategy] = None, + dynamic_startup_nodes: bool = True, reinitialize_steps: int = 5, cluster_error_retry_attempts: int = 3, max_connections: int = 2**31, @@ -388,6 +397,7 @@ def __init__( startup_nodes, require_full_coverage, kwargs, + dynamic_startup_nodes=dynamic_startup_nodes, address_remap=address_remap, event_dispatcher=self._event_dispatcher, ) @@ -1162,6 +1172,7 @@ async def _mock(self, error: RedisError): class NodesManager: __slots__ = ( + "_dynamic_startup_nodes", "_moved_exception", "_event_dispatcher", "connection_kwargs", @@ -1179,6 +1190,7 @@ def __init__( startup_nodes: List["ClusterNode"], require_full_coverage: bool, connection_kwargs: Dict[str, Any], + dynamic_startup_nodes: bool = True, address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None, event_dispatcher: Optional[EventDispatcher] = None, ) -> None: @@ -1191,6 +1203,8 @@ def __init__( self.nodes_cache: Dict[str, "ClusterNode"] = {} self.slots_cache: Dict[int, List["ClusterNode"]] = {} self.read_load_balancer = LoadBalancer() + + self._dynamic_startup_nodes: bool = dynamic_startup_nodes self._moved_exception: MovedError = None if event_dispatcher is None: self._event_dispatcher = EventDispatcher() @@ -1433,8 +1447,10 @@ async def initialize(self) -> None: # Set the tmp variables to the real variables self.slots_cache = tmp_slots self.set_nodes(self.nodes_cache, tmp_nodes_cache, remove_old=True) - # Populate the startup nodes with all discovered nodes - self.set_nodes(self.startup_nodes, self.nodes_cache, remove_old=True) + + if self._dynamic_startup_nodes: + # Populate the startup nodes with all discovered nodes + self.set_nodes(self.startup_nodes, self.nodes_cache, remove_old=True) # Set the default node self.default_node = self.get_nodes_by_server_type(PRIMARY)[0] diff --git a/tests/test_asyncio/test_cluster.py b/tests/test_asyncio/test_cluster.py index 91e8b58a82..1b3fbd5526 100644 --- a/tests/test_asyncio/test_cluster.py +++ b/tests/test_asyncio/test_cluster.py @@ -2723,6 +2723,27 @@ def cmd_init_mock(self, r: ClusterNode) -> None: assert rc.get_node(host=default_host, port=7001) is not None assert rc.get_node(host=default_host, port=7002) is not None + @pytest.mark.parametrize("dynamic_startup_nodes", [True, False]) + async def test_init_slots_dynamic_startup_nodes(self, dynamic_startup_nodes): + rc = await get_mocked_redis_client( + host="my@DNS.com", + port=7000, + cluster_slots=default_cluster_slots, + dynamic_startup_nodes=dynamic_startup_nodes, + ) + # Nodes are taken from default_cluster_slots + discovered_nodes = [ + "127.0.0.1:7000", + "127.0.0.1:7001", + "127.0.0.1:7002", + "127.0.0.1:7003", + ] + startup_nodes = list(rc.nodes_manager.startup_nodes.keys()) + if dynamic_startup_nodes is True: + assert sorted(startup_nodes) == sorted(discovered_nodes) + else: + assert startup_nodes == ["my@DNS.com:7000"] + class TestClusterPipeline: """Tests for the ClusterPipeline class."""