diff --git a/redis/asyncio/cluster.py b/redis/asyncio/cluster.py index 28fcd3aa23..955d5277ae 100644 --- a/redis/asyncio/cluster.py +++ b/redis/asyncio/cluster.py @@ -759,7 +759,10 @@ async def execute_command(self, *args: EncodableT, **kwargs: Any) -> Any: ) return dict(zip(keys, values)) except Exception as e: - if retry_attempts > 0 and type(e) in self.__class__.ERRORS_ALLOW_RETRY: + if ( + retry_attempts > 0 + and type(e) in self.__class__.CONNECTION_ERRORS_FOR_RETRY + ): # The nodes and slots cache were should be reinitialized. # Try again with the new cluster setup. retry_attempts -= 1 @@ -1566,7 +1569,7 @@ async def execute( allow_redirections=allow_redirections, ) - except self.__class__.ERRORS_ALLOW_RETRY as e: + except self.__class__.CONNECTION_ERRORS_FOR_RETRY as e: if retry_attempts > 0: # Try again with the new cluster setup. All other errors # should be raised. @@ -1657,7 +1660,10 @@ async def _execute( for cmd in default_node[1]: # Check if it has a command that failed with a relevant # exception - if type(cmd.result) in self.__class__.ERRORS_ALLOW_RETRY: + if ( + type(cmd.result) + in self.__class__.CONNECTION_ERRORS_FOR_RETRY + ): client.replace_default_node() break diff --git a/redis/client.py b/redis/client.py index e9435d33ef..e78689b62d 100755 --- a/redis/client.py +++ b/redis/client.py @@ -23,6 +23,7 @@ _RedisCallbacksRESP3, bool_ok, ) +from redis.backoff import ExponentialWithJitterBackoff from redis.cache import CacheConfig, CacheInterface from redis.commands import ( CoreCommands, @@ -58,6 +59,7 @@ from redis.utils import ( HIREDIS_AVAILABLE, _set_info_logger, + deprecated_args, get_lib_version, safe_str, str_if_bytes, @@ -189,6 +191,11 @@ def from_pool( client.auto_close_connection_pool = True return client + @deprecated_args( + args_to_warn=["retry_on_timeout"], + reason="TimeoutError is included by default.", + version="6.1.0", + ) def __init__( self, host: str = "localhost", @@ -203,8 +210,6 @@ def __init__( unix_socket_path: Optional[str] = None, encoding: str = "utf-8", encoding_errors: str = "strict", - charset: Optional[str] = None, - errors: Optional[str] = None, decode_responses: bool = False, retry_on_timeout: bool = False, retry_on_error: Optional[List[Type[Exception]]] = None, @@ -230,7 +235,9 @@ def __init__( lib_name: Optional[str] = "redis-py", lib_version: Optional[str] = get_lib_version(), username: Optional[str] = None, - retry: Optional[Retry] = None, + retry: Optional[Retry] = Retry( + backoff=ExponentialWithJitterBackoff(base=0.1, cap=5), retries=3 + ), redis_connect_func: Optional[Callable[[], None]] = None, credential_provider: Optional[CredentialProvider] = None, protocol: Optional[int] = 2, @@ -240,10 +247,24 @@ def __init__( ) -> None: """ Initialize a new Redis client. - To specify a retry policy for specific errors, first set - `retry_on_error` to a list of the error/s to retry on, then set - `retry` to a valid `Retry` object. - To retry on TimeoutError, `retry_on_timeout` can also be set to `True`. + + To specify a retry policy for specific errors, you have two options: + + 1. Set `retry_on_error` to a list of the errors to retry on, and + optionally set `retry` to a valid `Retry` object (in case the default + one is not appropriate) - with this approach the retries will be triggered + on the default errors specified in the Retry object, extended with the + errors specified in `retry_on_error`. + + 2.
Define a `Retry` object with its 'supported_errors' configured and set + it to the `retry` parameter - with this approach you completely redefine + the errors on which retries will happen. + + `retry_on_timeout` is deprecated - please include TimeoutError + either in the Retry object or in the `retry_on_error` list. + + When 'connection_pool' is provided, the retry configuration of the + provided pool will be used. Args: @@ -256,24 +277,8 @@ def __init__( else: self._event_dispatcher = event_dispatcher if not connection_pool: - if charset is not None: - warnings.warn( - DeprecationWarning( - '"charset" is deprecated. Use "encoding" instead' - ) - ) - encoding = charset - if errors is not None: - warnings.warn( - DeprecationWarning( - '"errors" is deprecated. Use "encoding_errors" instead' - ) - ) - encoding_errors = errors if not retry_on_error: retry_on_error = [] - if retry_on_timeout is True: - retry_on_error.append(TimeoutError) kwargs = { "db": db, "username": username, @@ -395,10 +400,10 @@ def get_connection_kwargs(self) -> Dict: """Get the connection's key-word arguments""" return self.connection_pool.connection_kwargs - def get_retry(self) -> Optional["Retry"]: + def get_retry(self) -> Retry: return self.get_connection_kwargs().get("retry") - def set_retry(self, retry: "Retry") -> None: + def set_retry(self, retry: Retry) -> None: self.get_connection_kwargs().update({"retry": retry}) self.connection_pool.set_retry(retry) @@ -598,18 +603,20 @@ def _send_command_parse_response(self, conn, command_name, *args, **options): conn.send_command(*args, **options) return self.parse_response(conn, command_name, **options) - def _disconnect_raise(self, conn, error): + def _conditional_disconnect(self, conn, error) -> None: """ - Close the connection and raise an exception - if retry_on_error is not set or the error - is not one of the specified error types + Close the connection if the error is not a TimeoutError. + The supported exceptions are already checked in the + retry object, so we don't need to do it here. + After we disconnect the connection, it will try to reconnect and + do a health check as part of the send_command logic (on the connection level). """ + if isinstance(error, TimeoutError): + # If the error is a TimeoutError, we don't want to + # disconnect the connection. We want to retry the command.
+ return + conn.disconnect() - if ( - conn.retry_on_error is None - or isinstance(error, tuple(conn.retry_on_error)) is False - ): - raise error # COMMAND EXECUTION AND PROTOCOL PARSING def execute_command(self, *args, **options): @@ -619,17 +626,20 @@ def _execute_command(self, *args, **options): """Execute a command and return a parsed response""" pool = self.connection_pool command_name = args[0] + conn = self.connection or pool.get_connection() if self._single_connection_client: self.single_connection_lock.acquire() + try: return conn.retry.call_with_retry( lambda: self._send_command_parse_response( conn, command_name, *args, **options ), - lambda error: self._disconnect_raise(conn, error), + lambda error: self._conditional_disconnect(conn, error), ) + finally: if self._single_connection_client: self.single_connection_lock.release() diff --git a/redis/cluster.py b/redis/cluster.py index 4ec03ac98f..2725c6b3ce 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -5,11 +5,11 @@ import time from collections import OrderedDict from enum import Enum -from typing import Any, Callable, Dict, List, Optional, Tuple, Union +from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union from redis._parsers import CommandsParser, Encoder from redis._parsers.helpers import parse_scan -from redis.backoff import default_backoff +from redis.backoff import ExponentialWithJitterBackoff, NoBackoff from redis.cache import CacheConfig, CacheFactory, CacheFactoryInterface, CacheInterface from redis.client import CaseInsensitiveDict, PubSub, Redis from redis.commands import READ_COMMANDS, RedisClusterCommands @@ -38,7 +38,7 @@ TryAgainError, ) from redis.lock import Lock -from redis.retry import Retry +from redis.retry import DEFAULT_SUPPORTED_EXCEPTIONS, Retry from redis.utils import ( HIREDIS_AVAILABLE, deprecated_args, @@ -412,7 +412,9 @@ class AbstractRedisCluster: list_keys_to_dict(["SCRIPT FLUSH"], lambda command, res: all(res.values())), ) - ERRORS_ALLOW_RETRY = (ConnectionError, TimeoutError, ClusterDownError) + CONNECTION_ERRORS_FOR_RETRY = tuple( + set(DEFAULT_SUPPORTED_EXCEPTIONS + (ClusterDownError,)) + ) def replace_default_node(self, target_node: "ClusterNode" = None) -> None: """Replace the default cluster node. @@ -433,7 +435,7 @@ def replace_default_node(self, target_node: "ClusterNode" = None) -> None: # Choose a primary if the cluster contains different primaries self.nodes_manager.default_node = random.choice(primaries) else: - # Otherwise, hoose a primary if the cluster contains different primaries + # Otherwise, choose a primary if the cluster contains different primaries replicas = [node for node in self.get_replicas() if node != curr_node] if replicas: self.nodes_manager.default_node = random.choice(replicas) @@ -489,13 +491,23 @@ class initializer. 
In the case of conflicting arguments, querystring reason="Please configure the 'load_balancing_strategy' instead", version="5.0.3", ) + @deprecated_args( + args_to_warn=["cluster_error_retry_attempts"], + reason="Not needed anymore, the config will be part of the Retry object", + version="7.0.0", + ) def __init__( self, host: Optional[str] = None, port: int = 6379, startup_nodes: Optional[List["ClusterNode"]] = None, cluster_error_retry_attempts: int = 3, - retry: Optional["Retry"] = None, + retry_on_error: Optional[List[Type[Exception]]] = None, + retry: Retry = Retry( + backoff=ExponentialWithJitterBackoff(base=0.1, cap=5), + retries=3, + supported_errors=AbstractRedisCluster.CONNECTION_ERRORS_FOR_RETRY, + ), require_full_coverage: bool = False, reinitialize_steps: int = 5, read_from_replicas: bool = False, @@ -546,9 +558,20 @@ def __init__( If you use dynamic DNS endpoints for startup nodes but CLUSTER SLOTS lists specific IP addresses, it is best to set it to false. :param cluster_error_retry_attempts: + @deprecated - please use the 'retry' object configuration instead Number of times to retry before raising an error when :class:`~.TimeoutError` or :class:`~.ConnectionError` or :class:`~.ClusterDownError` are encountered + :param retry_on_error: + A list of exceptions that should be retried, in addition to the ones + configured in the 'retry' object. + :param retry: + A Retry object that will be used to retry the command. + By default, the following + exceptions are retried: + - :class:`~.TimeoutError` + - :class:`~.ConnectionError` + - :class:`~.ClusterDownError` :param reinitialize_steps: Specifies the number of MOVED errors that need to occur before reinitializing the whole cluster topology. If a MOVED error occurs @@ -623,11 +646,13 @@ def __init__( self.user_on_connect_func = kwargs.pop("redis_connect_func", None) kwargs.update({"redis_connect_func": self.on_connect}) kwargs = cleanup_kwargs(**kwargs) - if retry: - self.retry = retry - kwargs.update({"retry": self.retry}) - else: - kwargs.update({"retry": Retry(default_backoff(), 0)}) + + retry.update_retries(cluster_error_retry_attempts) + if retry_on_error: + retry.update_supported_errors(retry_on_error) + + self.retry = retry + self.default_node_needs_changing = False self.encoder = Encoder( kwargs.get("encoding", "utf-8"), @@ -769,13 +794,11 @@ def set_default_node(self, node): self.nodes_manager.default_node = node return True - def get_retry(self) -> Optional["Retry"]: + def get_retry(self) -> Retry: return self.retry - def set_retry(self, retry: "Retry") -> None: + def set_retry(self, retry: Retry) -> None: self.retry = retry - for node in self.get_nodes(): - node.redis_connection.set_retry(retry) def monitor(self, target_node=None): """ @@ -823,6 +846,7 @@ def pipeline(self, transaction=None, shard_hint=None): result_callbacks=self.result_callbacks, cluster_response_callbacks=self.cluster_response_callbacks, cluster_error_retry_attempts=self.cluster_error_retry_attempts, + retry=self.retry, read_from_replicas=self.read_from_replicas, load_balancing_strategy=self.load_balancing_strategy, reinitialize_steps=self.reinitialize_steps, @@ -1081,81 +1105,103 @@ def _parse_target_nodes(self, target_nodes): return nodes def execute_command(self, *args, **kwargs): - return self._internal_execute_command(*args, **kwargs) - - def _internal_execute_command(self, *args, **kwargs): - """ - Wrapper for ERRORS_ALLOW_RETRY error handling.
- - It will try the number of times specified by the config option - "self.cluster_error_retry_attempts" which defaults to 3 unless manually - configured. + target_nodes_specified = False + target_nodes = [] + # The target_nodes should be extracted here, but not removed from the kwargs, + # since we will need to extract them later in case they are defined as node flags + passed_targets = kwargs.get("target_nodes", None) + if passed_targets is not None and not self._is_nodes_flag(passed_targets): + target_nodes = self._parse_target_nodes(passed_targets) + target_nodes_specified = True - If it reaches the number of times, the command will raise the exception + res = {} + if target_nodes_specified: + # When target_nodes are provided we need to remove them from + # the kwargs prior to command execution + kwargs.pop("target_nodes", None) + # When a specific target node is provided we don't retry on connection errors + # TODO: check what we should do for MOVED and ASK errors + for node in target_nodes: + res[node.name] = self._execute_command(node, *args, **kwargs) + else: + res = self.retry.call_with_retry( + lambda: self._retryable_execute_command(*args, **kwargs), + lambda error: self._retry_error_handling(error), ) + # Remove the target_nodes from kwargs + # since it is not needed anymore + # and we don't want to pass it to the response callback + # or the result callback + kwargs.pop("target_nodes", None) + # Return the processed result + return self._process_result(args[0], res, **kwargs) + + def _retry_error_handling(self, error: Exception): + """ + Handle errors that are raised during command execution. + This method is called when a command fails and applies + cluster-specific error handling. + """ + if isinstance(error, ClusterDownError): + # ClusterDownError can occur during a failover and to get + # self-healed, we will try to reinitialize the cluster layout + # and retry executing the command + time.sleep(0.25) + self.nodes_manager.initialize() + elif isinstance(error, ConnectionError): + self.nodes_manager.initialize() + if self.default_node_needs_changing: + # If the default node is not reachable, we need to change it + self.replace_default_node() + + def _retryable_execute_command(self, *args, **kwargs): + """ Key argument :target_nodes: can be passed with the following types: nodes_flag: PRIMARIES, REPLICAS, ALL_NODES, RANDOM ClusterNode list dict """ - target_nodes_specified = False - is_default_node = False - target_nodes = None + res = {} passed_targets = kwargs.pop("target_nodes", None) - if passed_targets is not None and not self._is_nodes_flag(passed_targets): - target_nodes = self._parse_target_nodes(passed_targets) - target_nodes_specified = True - # If an error that allows retrying was thrown, the nodes and slots - # cache were reinitialized. We will retry executing the command with - # the updated cluster setup only when the target nodes can be - # determined again with the new cache tables. Therefore, when target - # nodes were passed to this function, we cannot retry the command - # execution since the nodes may not be valid anymore after the tables - # were reinitialized. So in case of passed target nodes, - # retry_attempts will be set to 0.
- retry_attempts = ( - 0 if target_nodes_specified else self.cluster_error_retry_attempts - ) - # Add one for the first execution - execute_attempts = 1 + retry_attempts - for _ in range(execute_attempts): - try: - res = {} - if not target_nodes_specified: - # Determine the nodes to execute the command on - target_nodes = self._determine_nodes( - *args, **kwargs, nodes_flag=passed_targets - ) - if not target_nodes: - raise RedisClusterException( - f"No targets were found to execute {args} command on" - ) - if ( - len(target_nodes) == 1 - and target_nodes[0] == self.get_default_node() - ): - is_default_node = True - for node in target_nodes: - res[node.name] = self._execute_command(node, *args, **kwargs) - # Return the processed result - return self._process_result(args[0], res, **kwargs) - except Exception as e: - if retry_attempts > 0 and type(e) in self.__class__.ERRORS_ALLOW_RETRY: - if is_default_node: - # Replace the default cluster node - self.replace_default_node() - # The nodes and slots cache were reinitialized. - # Try again with the new cluster setup. - retry_attempts -= 1 - continue - else: - # raise the exception - raise e + target_nodes = [] + try: + # Determine the nodes to execute the command on + target_nodes = self._determine_nodes( + *args, **kwargs, nodes_flag=passed_targets + ) + if not target_nodes: + raise RedisClusterException( + f"No targets were found to execute {args} command on" + ) + + for node in target_nodes: + res[node.name] = self._execute_command(node, *args, **kwargs) + + # Return the collected results + return res + except Exception: + if len(target_nodes) == 1 and target_nodes[0] == self.get_default_node(): + # Set a flag that we need to replace the default node in the error handler + # after the cluster is reinitialized + self.default_node_needs_changing = True + raise def _execute_command(self, target_node, *args, **kwargs): """ - Send a command to a node in the cluster + Send a command to a node in the cluster. + In this method, certain errors are handled and retried. + The method does internal retries on the following errors: + - MovedError + - AskError + - TryAgainError + The method is called by the retry object. + + :param target_node: The node to send the command to + :param args: The command to send + :param kwargs: The command's arguments + :return: The response from the server """ command = args[0] redis_node = None @@ -1201,10 +1247,13 @@ def _execute_command(self, target_node, *args, **kwargs): ) return response except AuthenticationError: + # We need to raise this error without disconnecting the connection raise - except (ConnectionError, TimeoutError) as e: - # Connection retries are being handled in the node's - # Retry object. + except (ConnectionError, TimeoutError): + # Connection retries are handled in the cluster's retry object. + # Here we need to handle some operations related to the target_node + # and connection which are not available in the error handler. + # ConnectionError can also be raised if we couldn't get a # connection from the pool before timing out, so check that # this is an actual connection before attempting to disconnect. @@ -1213,11 +1262,14 @@ def _execute_command(self, target_node, *args, **kwargs): # Remove the failed node from the startup nodes before we try # to reinitialize the cluster + # TODO: Discuss if the startup node should be removed from here + # when we are not using dynamic_startup_nodes + # with RedisEnterprise the startup node should not be removed + # from the startup nodes list.
self.nodes_manager.startup_nodes.pop(target_node.name, None) # Reset the cluster node's connection target_node.redis_connection = None - self.nodes_manager.initialize() - raise e + raise except MovedError as e: # First, we will try to patch the slots/nodes cache with the # redirected node output and try again. If MovedError exceeds @@ -1241,19 +1293,13 @@ def _execute_command(self, target_node, *args, **kwargs): except AskError as e: redirect_addr = get_node_name(host=e.host, port=e.port) asking = True - except ClusterDownError as e: - # ClusterDownError can occur during a failover and to get - # self-healed, we will try to reinitialize the cluster layout - # and retry executing the command - time.sleep(0.25) - self.nodes_manager.initialize() - raise e except ResponseError: + # We need to raise this error without disconnecting the connection raise - except Exception as e: + except Exception: if connection: connection.disconnect() - raise e + raise finally: if connection is not None: redis_node.connection_pool.release(connection) @@ -1324,8 +1370,12 @@ def __eq__(self, obj): return isinstance(obj, ClusterNode) and obj.name == self.name def __del__(self): - if self.redis_connection is not None: - self.redis_connection.close() + try: + if self.redis_connection is not None: + self.redis_connection.close() + except Exception: + # Ignore errors when closing the connection + pass class LoadBalancingStrategy(Enum): @@ -1576,17 +1626,26 @@ def create_redis_connections(self, nodes): ) def create_redis_node(self, host, port, **kwargs): + # We are configuring the connection pool to not retry connections + # to avoid retrying connections to nodes that are not reachable + # and to avoid blocking the connection pool. + # The retries will be handled on cluster client level + # where we will have proper handling of the cluster topology + retry = Retry(backoff=NoBackoff(), retries=0) + if self.from_url: # Create a redis node with a costumed connection pool kwargs.update({"host": host}) kwargs.update({"port": port}) kwargs.update({"cache": self._cache}) + kwargs.update({"retry": retry}) r = Redis(connection_pool=self.connection_pool_class(**kwargs)) else: r = Redis( host=host, port=port, cache=self._cache, + retry=retry, **kwargs, ) return r @@ -2040,6 +2099,11 @@ def __init__( read_from_replicas: bool = False, load_balancing_strategy: Optional[LoadBalancingStrategy] = None, cluster_error_retry_attempts: int = 3, + retry: Retry = Retry( + backoff=ExponentialWithJitterBackoff(base=0.1, cap=5), + retries=3, + supported_errors=AbstractRedisCluster.CONNECTION_ERRORS_FOR_RETRY, + ), reinitialize_steps: int = 5, lock=None, **kwargs, @@ -2058,6 +2122,8 @@ def __init__( self.command_flags = self.__class__.COMMAND_FLAGS.copy() self.cluster_response_callbacks = cluster_response_callbacks self.cluster_error_retry_attempts = cluster_error_retry_attempts + retry.update_retries(cluster_error_retry_attempts) + self.retry = retry self.reinitialize_counter = 0 self.reinitialize_steps = reinitialize_steps self.encoder = Encoder( @@ -2201,7 +2267,7 @@ def send_cluster_commands( raise_on_error=raise_on_error, allow_redirections=allow_redirections, ) - except RedisCluster.ERRORS_ALLOW_RETRY as e: + except RedisCluster.CONNECTION_ERRORS_FOR_RETRY as e: if retry_attempts > 0: # Try again with the new cluster setup. All other errors # should be raised. 
diff --git a/redis/connection.py b/redis/connection.py index 08e980e866..ffb1e37ba3 100644 --- a/redis/connection.py +++ b/redis/connection.py @@ -1611,7 +1611,7 @@ def close(self) -> None: """Close the pool, disconnecting all connections""" self.disconnect() - def set_retry(self, retry: "Retry") -> None: + def set_retry(self, retry: Retry) -> None: self.connection_kwargs.update({"retry": retry}) for conn in self._available_connections: conn.retry = retry diff --git a/redis/retry.py b/redis/retry.py index 03fd973c4c..904eb24805 100644 --- a/redis/retry.py +++ b/redis/retry.py @@ -9,6 +9,12 @@ if TYPE_CHECKING: from redis.backoff import AbstractBackoff +DEFAULT_SUPPORTED_EXCEPTIONS = ( + ConnectionError, + TimeoutError, + socket.timeout, +) + class Retry: """Retry a specific number of times after a failure""" @@ -17,11 +23,7 @@ def __init__( self, backoff: "AbstractBackoff", retries: int, - supported_errors: Tuple[Type[Exception], ...] = ( - ConnectionError, - TimeoutError, - socket.timeout, - ), + supported_errors: Tuple[Type[Exception], ...] = DEFAULT_SUPPORTED_EXCEPTIONS, ): """ Initialize a `Retry` object with a `Backoff` object @@ -44,6 +46,12 @@ def update_supported_errors( set(self._supported_errors + tuple(specified_errors)) ) + def update_retries(self, retries: int) -> None: + """ + Updates the retries count. + """ + self._retries = retries + def call_with_retry( self, do: Callable[[], T], diff --git a/tests/test_asyncio/test_cluster.py b/tests/test_asyncio/test_cluster.py index a0429152ec..a9a268720b 100644 --- a/tests/test_asyncio/test_cluster.py +++ b/tests/test_asyncio/test_cluster.py @@ -818,7 +818,7 @@ async def test_all_nodes_masters(self, r: RedisCluster) -> None: for node in r.get_primaries(): assert node in nodes - @pytest.mark.parametrize("error", RedisCluster.ERRORS_ALLOW_RETRY) + @pytest.mark.parametrize("error", RedisCluster.CONNECTION_ERRORS_FOR_RETRY) async def test_cluster_down_overreaches_retry_attempts( self, error: Union[Type[TimeoutError], Type[ClusterDownError], Type[ConnectionError]], diff --git a/tests/test_cluster.py b/tests/test_cluster.py index d96342f87a..a4acd12019 100644 --- a/tests/test_cluster.py +++ b/tests/test_cluster.py @@ -14,7 +14,12 @@ import redis from redis import Redis from redis._parsers import CommandsParser -from redis.backoff import ExponentialBackoff, NoBackoff, default_backoff +from redis.backoff import ( + ExponentialBackoff, + ExponentialWithJitterBackoff, + NoBackoff, + default_backoff, +) from redis.cluster import ( PRIMARY, REDIS_CLUSTER_HASH_SLOTS, @@ -161,6 +166,7 @@ def get_mocked_redis_client( nodes and slots setup to remove the problem of different IP addresses on different installations and machines. 
""" + cluster_slots = kwargs.pop("cluster_slots", default_cluster_slots) coverage_res = kwargs.pop("coverage_result", "yes") cluster_enabled = kwargs.pop("cluster_enabled", True) @@ -741,7 +747,7 @@ def test_all_nodes_masters(self, r): for node in r.get_primaries(): assert node in nodes - @pytest.mark.parametrize("error", RedisCluster.ERRORS_ALLOW_RETRY) + @pytest.mark.parametrize("error", RedisCluster.CONNECTION_ERRORS_FOR_RETRY) def test_cluster_down_overreaches_retry_attempts(self, error): """ When error that allows retry is thrown, test that we retry executing @@ -751,7 +757,6 @@ def test_cluster_down_overreaches_retry_attempts(self, error): with patch.object(RedisCluster, "_execute_command") as execute_command: def raise_error(target_node, *args, **kwargs): - execute_command.failed_calls += 1 raise error("mocked error") execute_command.side_effect = raise_error @@ -759,8 +764,11 @@ def raise_error(target_node, *args, **kwargs): rc = get_mocked_redis_client(host=default_host, port=default_port) with pytest.raises(error): - rc.get("bar") - assert execute_command.failed_calls == rc.cluster_error_retry_attempts + with patch.object(NodesManager, "initialize") as initilize_cluster: + initilize_cluster.return_value = default_cluster_slots + rc.get("bar") + + assert execute_command.call_count == rc.retry._retries + 1 def test_user_on_connect_function(self, request): """ @@ -863,7 +871,7 @@ def __init__(self, val=0): def moved_redirect_effect(connection, *args, **options): # raise a timeout for 5 times so we'll need to reinitialize the topology - if count.val == 4: + if count.val == 2: parse_response.side_effect = real_func count.val += 1 raise TimeoutError() @@ -887,7 +895,7 @@ def test_cluster_get_set_retry_object(self, request): assert r.get_retry()._retries == retry._retries assert isinstance(r.get_retry()._backoff, NoBackoff) for node in r.get_nodes(): - assert node.redis_connection.get_retry()._retries == retry._retries + assert node.redis_connection.get_retry()._retries == 0 assert isinstance(node.redis_connection.get_retry()._backoff, NoBackoff) rand_node = r.get_random_node() existing_conn = rand_node.redis_connection.connection_pool.get_connection() @@ -897,34 +905,31 @@ def test_cluster_get_set_retry_object(self, request): assert r.get_retry()._retries == new_retry._retries assert isinstance(r.get_retry()._backoff, ExponentialBackoff) for node in r.get_nodes(): - assert node.redis_connection.get_retry()._retries == new_retry._retries - assert isinstance( - node.redis_connection.get_retry()._backoff, ExponentialBackoff - ) - assert existing_conn.retry._retries == new_retry._retries + # validate that for each node the retries are disabled + # we want the retries to be handled by the cluster + assert node.redis_connection.get_retry()._retries == 0 + assert isinstance(node.redis_connection.get_retry()._backoff, NoBackoff) + assert existing_conn.retry._retries == 0 new_conn = rand_node.redis_connection.connection_pool.get_connection() - assert new_conn.retry._retries == new_retry._retries + assert new_conn.retry._retries == 0 def test_cluster_retry_object(self, r) -> None: # Test default retry # FIXME: Workaround for https://github.com/redis/redis-py/issues/3030 host = r.get_default_node().host - retry = r.get_connection_kwargs().get("retry") + retry = r.get_retry() assert isinstance(retry, Retry) - assert retry._retries == 0 - assert isinstance(retry._backoff, type(default_backoff())) + assert retry._retries == 3 + assert isinstance(retry._backoff, ExponentialWithJitterBackoff) node1 = 
r.get_node(host, 16379).redis_connection node2 = r.get_node(host, 16380).redis_connection - assert node1.get_retry()._retries == node2.get_retry()._retries + assert node1.get_retry()._retries == 0 and node2.get_retry()._retries == 0 # Test custom retry retry = Retry(ExponentialBackoff(10, 5), 5) rc_custom_retry = RedisCluster(host, 16379, retry=retry) - assert ( - rc_custom_retry.get_node(host, 16379).redis_connection.get_retry()._retries - == retry._retries - ) + assert rc_custom_retry.get_retry()._retries == retry._retries def test_replace_cluster_node(self, r) -> None: prev_default_node = r.get_default_node() diff --git a/tests/test_retry.py b/tests/test_retry.py index e1e4c414a4..cb001fbbd5 100644 --- a/tests/test_retry.py +++ b/tests/test_retry.py @@ -159,7 +159,7 @@ def test_client_retry_on_error_raise(self, request): def test_client_retry_on_error_different_error_raised(self, request): with patch.object(Redis, "parse_response") as parse_response: - parse_response.side_effect = TimeoutError() + parse_response.side_effect = OSError() retries = 3 r = _get_client( Redis, @@ -167,7 +167,7 @@ def test_client_retry_on_error_different_error_raised(self, request): retry_on_error=[ReadOnlyError], retry=Retry(NoBackoff(), retries), ) - with pytest.raises(TimeoutError): + with pytest.raises(OSError): try: r.get("foo") finally:
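
For reviewers, a minimal sketch of how the two retry-configuration options documented by this patch look from user code. The host/port values and the choice of ReadOnlyError are illustrative placeholders, not part of the change; the Retry, ExponentialWithJitterBackoff and NoBackoff signatures are taken from the diff above.

from redis import Redis
from redis.backoff import ExponentialWithJitterBackoff, NoBackoff
from redis.exceptions import ConnectionError, ReadOnlyError, TimeoutError
from redis.retry import Retry

# Option 1: keep the new default retry policy (ExponentialWithJitterBackoff,
# 3 retries) and extend the retried errors via retry_on_error.
r1 = Redis(host="localhost", port=6379, retry_on_error=[ReadOnlyError])

# Option 2: pass a fully configured Retry object; supported_errors completely
# redefines which exceptions are retried.
r2 = Redis(
    host="localhost",
    port=6379,
    retry=Retry(
        backoff=ExponentialWithJitterBackoff(base=0.1, cap=5),
        retries=5,
        supported_errors=(ConnectionError, TimeoutError),
    ),
)

# Retries can be disabled entirely by passing a zero-retry object.
r3 = Redis(host="localhost", port=6379, retry=Retry(NoBackoff(), 0))

The cluster client accepts the same `retry` and `retry_on_error` parameters; per this patch, per-node connections are created with retries disabled and all retrying is driven by the cluster-level Retry object.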