Skip to content

Draft: Experimenting with retry strategy unification for standalone client and cluster client. #3607

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions redis/asyncio/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -759,7 +759,10 @@ async def execute_command(self, *args: EncodableT, **kwargs: Any) -> Any:
)
return dict(zip(keys, values))
except Exception as e:
if retry_attempts > 0 and type(e) in self.__class__.ERRORS_ALLOW_RETRY:
if (
retry_attempts > 0
and type(e) in self.__class__.CONNECTION_ERRORS_FOR_RETRY
):
# The nodes and slots cache were should be reinitialized.
# Try again with the new cluster setup.
retry_attempts -= 1
Expand Down Expand Up @@ -1566,7 +1569,7 @@ async def execute(
allow_redirections=allow_redirections,
)

except self.__class__.ERRORS_ALLOW_RETRY as e:
except self.__class__.CONNECTION_ERRORS_FOR_RETRY as e:
if retry_attempts > 0:
# Try again with the new cluster setup. All other errors
# should be raised.
Expand Down Expand Up @@ -1657,7 +1660,10 @@ async def _execute(
for cmd in default_node[1]:
# Check if it has a command that failed with a relevant
# exception
if type(cmd.result) in self.__class__.ERRORS_ALLOW_RETRY:
if (
type(cmd.result)
in self.__class__.CONNECTION_ERRORS_FOR_RETRY
):
client.replace_default_node()
break

Expand Down
80 changes: 45 additions & 35 deletions redis/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
_RedisCallbacksRESP3,
bool_ok,
)
from redis.backoff import ExponentialWithJitterBackoff
from redis.cache import CacheConfig, CacheInterface
from redis.commands import (
CoreCommands,
Expand Down Expand Up @@ -58,6 +59,7 @@
from redis.utils import (
HIREDIS_AVAILABLE,
_set_info_logger,
deprecated_args,
get_lib_version,
safe_str,
str_if_bytes,
Expand Down Expand Up @@ -189,6 +191,11 @@ def from_pool(
client.auto_close_connection_pool = True
return client

@deprecated_args(
args_to_warn=["retry_on_timeout"],
reason="TimeoutError is included by default.",
version="6.1.0",
)
def __init__(
self,
host: str = "localhost",
Expand All @@ -203,8 +210,6 @@ def __init__(
unix_socket_path: Optional[str] = None,
encoding: str = "utf-8",
encoding_errors: str = "strict",
charset: Optional[str] = None,
errors: Optional[str] = None,
decode_responses: bool = False,
retry_on_timeout: bool = False,
retry_on_error: Optional[List[Type[Exception]]] = None,
Expand All @@ -230,7 +235,9 @@ def __init__(
lib_name: Optional[str] = "redis-py",
lib_version: Optional[str] = get_lib_version(),
username: Optional[str] = None,
retry: Optional[Retry] = None,
retry: Optional[Retry] = Retry(
backoff=ExponentialWithJitterBackoff(base=0.1, cap=5), retries=3
),
redis_connect_func: Optional[Callable[[], None]] = None,
credential_provider: Optional[CredentialProvider] = None,
protocol: Optional[int] = 2,
Expand All @@ -240,10 +247,24 @@ def __init__(
) -> None:
"""
Initialize a new Redis client.
To specify a retry policy for specific errors, first set
`retry_on_error` to a list of the error/s to retry on, then set
`retry` to a valid `Retry` object.
To retry on TimeoutError, `retry_on_timeout` can also be set to `True`.

To specify a retry policy for specific errors, you have two options:

1. Set the `retry_on_error` to a list of the error/s to retry on, and
you can also set `retry` to a valid `Retry` object(in case the default
one is not appropriate) - with this approach the retries will be triggered
on the default errors specified in the Retry object enriched with the
errors specified in `retry_on_error`.

2. Define a `Retry` object with configured 'supported_errors' and set
it to the `retry` parameter - with this approach you completely redefine
the errors on which retries will happen.

`retry_on_timeout` is deprecated - please include the TimeoutError
either in the Retry object or in the `retry_on_error` list.

When 'connection_pool' is provided - the retry configuration of the
provided pool will be used.

Args:

Expand All @@ -256,24 +277,8 @@ def __init__(
else:
self._event_dispatcher = event_dispatcher
if not connection_pool:
if charset is not None:
warnings.warn(
DeprecationWarning(
'"charset" is deprecated. Use "encoding" instead'
)
)
encoding = charset
if errors is not None:
warnings.warn(
DeprecationWarning(
'"errors" is deprecated. Use "encoding_errors" instead'
)
)
encoding_errors = errors
if not retry_on_error:
retry_on_error = []
if retry_on_timeout is True:
retry_on_error.append(TimeoutError)
kwargs = {
"db": db,
"username": username,
Expand Down Expand Up @@ -395,10 +400,10 @@ def get_connection_kwargs(self) -> Dict:
"""Get the connection's key-word arguments"""
return self.connection_pool.connection_kwargs

def get_retry(self) -> Optional["Retry"]:
def get_retry(self) -> Retry:
return self.get_connection_kwargs().get("retry")

def set_retry(self, retry: "Retry") -> None:
def set_retry(self, retry: Retry) -> None:
self.get_connection_kwargs().update({"retry": retry})
self.connection_pool.set_retry(retry)

Expand Down Expand Up @@ -598,18 +603,20 @@ def _send_command_parse_response(self, conn, command_name, *args, **options):
conn.send_command(*args, **options)
return self.parse_response(conn, command_name, **options)

def _disconnect_raise(self, conn, error):
def _conditional_disconnect(self, conn, error) -> None:
"""
Close the connection and raise an exception
if retry_on_error is not set or the error
is not one of the specified error types
Close the connection if the error is not TimeoutError.
The supported exceptions are already checked in the
retry object so we don't need to do it here.
After we disconnect the connection, it will try to reconnect and
do a health check as part of the send_command logic(on connection level).
"""
if isinstance(error, TimeoutError):
# If the error is a TimeoutError, we don't want to
# disconnect the connection. We want to retry the command.
return

conn.disconnect()
if (
conn.retry_on_error is None
or isinstance(error, tuple(conn.retry_on_error)) is False
):
raise error

# COMMAND EXECUTION AND PROTOCOL PARSING
def execute_command(self, *args, **options):
Expand All @@ -619,17 +626,20 @@ def _execute_command(self, *args, **options):
"""Execute a command and return a parsed response"""
pool = self.connection_pool
command_name = args[0]

conn = self.connection or pool.get_connection()

if self._single_connection_client:
self.single_connection_lock.acquire()

try:
return conn.retry.call_with_retry(
lambda: self._send_command_parse_response(
conn, command_name, *args, **options
),
lambda error: self._disconnect_raise(conn, error),
lambda error: self._conditional_disconnect(conn, error),
)

finally:
if self._single_connection_client:
self.single_connection_lock.release()
Expand Down
Loading
Loading