Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 26 additions & 6 deletions redis/asyncio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -959,18 +959,38 @@ async def on_connect(self, connection: Connection):
# NOTE: for python3, we can't pass bytestrings as keyword arguments
# so we need to decode channel/pattern names back to unicode strings
# before passing them to [p]subscribe.
#
# However, channels subscribed without a callback (positional args) may
# have binary names that are not valid in the current encoding (e.g.
# arbitrary bytes that are not valid UTF-8). These channels are stored
# with a ``None`` handler. We re-subscribe them as positional args so
# that no decoding is required.
self.pending_unsubscribe_channels.clear()
self.pending_unsubscribe_patterns.clear()
if self.channels:
channels = {}
channels_with_handlers = {}
channels_without_handlers = []
for k, v in self.channels.items():
channels[self.encoder.decode(k, force=True)] = v
await self.subscribe(**channels)
if v is not None:
channels_with_handlers[self.encoder.decode(k, force=True)] = v
else:
channels_without_handlers.append(k)
if channels_with_handlers or channels_without_handlers:
await self.subscribe(
*channels_without_handlers, **channels_with_handlers
)
if self.patterns:
patterns = {}
patterns_with_handlers = {}
patterns_without_handlers = []
for k, v in self.patterns.items():
patterns[self.encoder.decode(k, force=True)] = v
await self.psubscribe(**patterns)
if v is not None:
patterns_with_handlers[self.encoder.decode(k, force=True)] = v
else:
patterns_without_handlers.append(k)
if patterns_with_handlers or patterns_without_handlers:
await self.psubscribe(
*patterns_without_handlers, **patterns_with_handlers
)

@property
def subscribed(self):
Expand Down
46 changes: 33 additions & 13 deletions redis/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -898,25 +898,45 @@ def on_connect(self, connection) -> None:
# NOTE: for python3, we can't pass bytestrings as keyword arguments
# so we need to decode channel/pattern names back to unicode strings
# before passing them to [p]subscribe.
#
# However, channels subscribed without a callback (positional args) may
# have binary names that are not valid in the current encoding (e.g.
# arbitrary bytes that are not valid UTF-8). These channels are stored
# with a ``None`` handler. We re-subscribe them as positional args so
# that no decoding is required.
self.pending_unsubscribe_channels.clear()
self.pending_unsubscribe_patterns.clear()
self.pending_unsubscribe_shard_channels.clear()
if self.channels:
channels = {
self.encoder.decode(k, force=True): v for k, v in self.channels.items()
}
self.subscribe(**channels)
channels_with_handlers = {}
channels_without_handlers = []
for k, v in self.channels.items():
if v is not None:
channels_with_handlers[self.encoder.decode(k, force=True)] = v
else:
channels_without_handlers.append(k)
if channels_with_handlers or channels_without_handlers:
self.subscribe(*channels_without_handlers, **channels_with_handlers)
if self.patterns:
patterns = {
self.encoder.decode(k, force=True): v for k, v in self.patterns.items()
}
self.psubscribe(**patterns)
patterns_with_handlers = {}
patterns_without_handlers = []
for k, v in self.patterns.items():
if v is not None:
patterns_with_handlers[self.encoder.decode(k, force=True)] = v
else:
patterns_without_handlers.append(k)
if patterns_with_handlers or patterns_without_handlers:
self.psubscribe(*patterns_without_handlers, **patterns_with_handlers)
if self.shard_channels:
shard_channels = {
self.encoder.decode(k, force=True): v
for k, v in self.shard_channels.items()
}
self.ssubscribe(**shard_channels)
shard_with_handlers = {}
shard_without_handlers = []
for k, v in self.shard_channels.items():
if v is not None:
shard_with_handlers[self.encoder.decode(k, force=True)] = v
else:
shard_without_handlers.append(k)
if shard_with_handlers or shard_without_handlers:
self.ssubscribe(*shard_without_handlers, **shard_with_handlers)

@property
def subscribed(self) -> bool:
Expand Down
44 changes: 44 additions & 0 deletions tests/test_asyncio/test_pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,50 @@ async def test_resubscribe_to_patterns_on_reconnection(self, pubsub):
kwargs = make_subscribe_test_data(pubsub, "pattern")
await self._test_resubscribe_on_reconnection(**kwargs)

async def test_resubscribe_binary_channel_on_reconnection(self, pubsub):
"""Binary channel names that are not valid UTF-8 must survive
reconnection without raising ``UnicodeDecodeError``.
See https://github.com/redis/redis-py/issues/3912
"""
# b'\x80\x81\x82' is deliberately invalid UTF-8
binary_channel = b"\x80\x81\x82"
p = pubsub
await p.subscribe(binary_channel)
assert await wait_for_message(p) is not None # consume subscribe ack

# force reconnect
await p.connection.disconnect()

# get_message triggers on_connect → re-subscribe; must not raise
messages = []
for _ in range(1):
messages.append(await wait_for_message(p))

assert len(messages) == 1
assert messages[0]["type"] == "subscribe"
assert messages[0]["channel"] == binary_channel

Comment on lines +178 to +185
Copy link

Copilot AI Feb 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wait_for_message(p) can return None on timeout; appending it and then indexing messages[0]["type"] will raise a TypeError and obscure the underlying failure. Consider asserting the returned message is not None (or directly asserting equality vs an expected subscribe/psubscribe message) before subscripting it.

Copilot uses AI. Check for mistakes.
async def test_resubscribe_binary_pattern_on_reconnection(self, pubsub):
"""Binary pattern names that are not valid UTF-8 must survive
reconnection without raising ``UnicodeDecodeError``.
See https://github.com/redis/redis-py/issues/3912
"""
binary_pattern = b"\x80\x81*"
p = pubsub
await p.psubscribe(binary_pattern)
assert await wait_for_message(p) is not None # consume psubscribe ack

# force reconnect
await p.connection.disconnect()

messages = []
for _ in range(1):
messages.append(await wait_for_message(p))

assert len(messages) == 1
Copy link

Copilot AI Feb 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as the channel test: if wait_for_message(p) times out and returns None, subscripting messages[0] will raise TypeError rather than producing a clear assertion failure. Add an explicit is not None assertion (or compare to an expected psubscribe message) before accessing keys.

Suggested change
assert len(messages) == 1
assert len(messages) == 1
assert messages[0] is not None

Copilot uses AI. Check for mistakes.
assert messages[0]["type"] == "psubscribe"
assert messages[0]["channel"] == binary_pattern

async def _test_subscribed_property(
self, p, sub_type, unsub_type, sub_func, unsub_func, keys
):
Expand Down
52 changes: 49 additions & 3 deletions tests/test_pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,52 @@ def test_resubscribe_to_shard_channels_on_reconnection(self, r):
kwargs = make_subscribe_test_data(r.pubsub(), "shard_channel")
self._test_resubscribe_on_reconnection(**kwargs)

@pytest.mark.onlynoncluster
def test_resubscribe_binary_channel_on_reconnection(self, r):
"""Binary channel names that are not valid UTF-8 must survive
reconnection without raising ``UnicodeDecodeError``.
See https://github.com/redis/redis-py/issues/3912
"""
# b'\x80\x81\x82' is deliberately invalid UTF-8
binary_channel = b"\x80\x81\x82"
p = r.pubsub()
p.subscribe(binary_channel)
assert wait_for_message(p) is not None # consume subscribe ack

# force reconnect
p.connection.disconnect()

# get_message triggers on_connect → re-subscribe; must not raise
messages = []
for _ in range(1):
messages.append(wait_for_message(p))

assert len(messages) == 1
assert messages[0]["type"] == "subscribe"
assert messages[0]["channel"] == binary_channel
Comment on lines +215 to +221
Copy link

Copilot AI Feb 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wait_for_message(p) can return None on timeout; appending it and then indexing messages[0]["type"] will raise a TypeError and obscure the real failure. Consider asserting the returned message is not None (or directly asserting equality vs an expected subscribe/psubscribe message) before subscripting it.

Copilot uses AI. Check for mistakes.

@pytest.mark.onlynoncluster
def test_resubscribe_binary_pattern_on_reconnection(self, r):
"""Binary pattern names that are not valid UTF-8 must survive
reconnection without raising ``UnicodeDecodeError``.
See https://github.com/redis/redis-py/issues/3912
"""
binary_pattern = b"\x80\x81*"
p = r.pubsub()
p.psubscribe(binary_pattern)
assert wait_for_message(p) is not None # consume psubscribe ack

# force reconnect
p.connection.disconnect()

messages = []
for _ in range(1):
messages.append(wait_for_message(p))

assert len(messages) == 1
assert messages[0]["type"] == "psubscribe"
assert messages[0]["channel"] == binary_pattern
Comment on lines +237 to +243
Copy link

Copilot AI Feb 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as the channel test: if wait_for_message(p) times out and returns None, subscripting messages[0] will raise TypeError and make failures harder to diagnose. Add an explicit is not None assertion (or compare to an expected psubscribe message) before accessing messages[0].

Copilot uses AI. Check for mistakes.

def _test_subscribed_property(
self, p, sub_type, unsub_type, sub_func, unsub_func, keys
):
Expand Down Expand Up @@ -1122,9 +1168,9 @@ def test_get_message_wait_for_subscription_not_being_called(self, r):
# Ensure p has the event attribute your wait_for_message would call:
ev = getattr(p, "subscribed_event", None)

assert ev is not None, (
"PubSub event attribute not found (check redis-py version)"
)
assert (
ev is not None
), "PubSub event attribute not found (check redis-py version)"

with patch.object(ev, "wait") as mock:
assert wait_for_message(p) == make_message("subscribe", "foo", 1)
Expand Down
Loading