Skip to content

Add dynamic_startup_nodes parameter to async RedisCluster #3646

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions redis/asyncio/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,14 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand
| Enable read from replicas in READONLY mode and defines the load balancing
strategy that will be used for cluster node selection.
The data read from replicas is eventually consistent with the data in primary nodes.
:param dynamic_startup_nodes:
| Set the RedisCluster's startup nodes to all the discovered nodes.
If true (default value), the cluster's discovered nodes will be used to
determine the cluster nodes-slots mapping in the next topology refresh.
It will remove the initial passed startup nodes if their endpoints aren't
listed in the CLUSTER SLOTS output.
If you use dynamic DNS endpoints for startup nodes but CLUSTER SLOTS lists
specific IP addresses, it is best to set it to false.
:param reinitialize_steps:
| Specifies the number of MOVED errors that need to occur before reinitializing
the whole cluster topology. If a MOVED error occurs and the cluster does not
Expand Down Expand Up @@ -250,6 +258,7 @@ def __init__(
require_full_coverage: bool = True,
read_from_replicas: bool = False,
load_balancing_strategy: Optional[LoadBalancingStrategy] = None,
dynamic_startup_nodes: bool = True,
reinitialize_steps: int = 5,
cluster_error_retry_attempts: int = 3,
max_connections: int = 2**31,
Expand Down Expand Up @@ -388,6 +397,7 @@ def __init__(
startup_nodes,
require_full_coverage,
kwargs,
dynamic_startup_nodes=dynamic_startup_nodes,
address_remap=address_remap,
event_dispatcher=self._event_dispatcher,
)
Expand Down Expand Up @@ -1162,6 +1172,7 @@ async def _mock(self, error: RedisError):

class NodesManager:
__slots__ = (
"_dynamic_startup_nodes",
"_moved_exception",
"_event_dispatcher",
"connection_kwargs",
Expand All @@ -1179,6 +1190,7 @@ def __init__(
startup_nodes: List["ClusterNode"],
require_full_coverage: bool,
connection_kwargs: Dict[str, Any],
dynamic_startup_nodes: bool = True,
address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None,
event_dispatcher: Optional[EventDispatcher] = None,
) -> None:
Expand All @@ -1191,6 +1203,8 @@ def __init__(
self.nodes_cache: Dict[str, "ClusterNode"] = {}
self.slots_cache: Dict[int, List["ClusterNode"]] = {}
self.read_load_balancer = LoadBalancer()

self._dynamic_startup_nodes: bool = dynamic_startup_nodes
self._moved_exception: MovedError = None
if event_dispatcher is None:
self._event_dispatcher = EventDispatcher()
Expand Down Expand Up @@ -1433,8 +1447,10 @@ async def initialize(self) -> None:
# Set the tmp variables to the real variables
self.slots_cache = tmp_slots
self.set_nodes(self.nodes_cache, tmp_nodes_cache, remove_old=True)
# Populate the startup nodes with all discovered nodes
self.set_nodes(self.startup_nodes, self.nodes_cache, remove_old=True)

if self._dynamic_startup_nodes:
# Populate the startup nodes with all discovered nodes
self.set_nodes(self.startup_nodes, self.nodes_cache, remove_old=True)

# Set the default node
self.default_node = self.get_nodes_by_server_type(PRIMARY)[0]
Expand Down
21 changes: 21 additions & 0 deletions tests/test_asyncio/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -2723,6 +2723,27 @@ def cmd_init_mock(self, r: ClusterNode) -> None:
assert rc.get_node(host=default_host, port=7001) is not None
assert rc.get_node(host=default_host, port=7002) is not None

@pytest.mark.parametrize("dynamic_startup_nodes", [True, False])
async def test_init_slots_dynamic_startup_nodes(self, dynamic_startup_nodes):
rc = await get_mocked_redis_client(
host="[email protected]",
port=7000,
cluster_slots=default_cluster_slots,
dynamic_startup_nodes=dynamic_startup_nodes,
)
# Nodes are taken from default_cluster_slots
discovered_nodes = [
"127.0.0.1:7000",
"127.0.0.1:7001",
"127.0.0.1:7002",
"127.0.0.1:7003",
]
startup_nodes = list(rc.nodes_manager.startup_nodes.keys())
if dynamic_startup_nodes is True:
assert sorted(startup_nodes) == sorted(discovered_nodes)
else:
assert startup_nodes == ["[email protected]:7000"]


class TestClusterPipeline:
"""Tests for the ClusterPipeline class."""
Expand Down
Loading