Fix issues with ClusterPipeline connection management #3804

Open
praboud wants to merge 7 commits into redis:master from praboud:pipeline-conns

Conversation

@praboud
Contributor

@praboud praboud commented Oct 18, 2025

Pull Request check-list

Please make sure to review and check all of these items:

  • Do tests and lints pass with this change?
  • Do the CI tests pass with this change (enable it first in your forked repo and wait for the github action build to finish)?
  • Is the new or changed code fully tested?
  • Is a documentation update included (if this change modifies existing APIs, or introduces new ones)?
  • Is there an example added to the examples folder (if applicable)?

NOTE: these things are not required to open a PR and can be done
afterwards / while the PR is open.

Description of change

ClusterPipeline doesn't correctly handle returning connections to the connection pools.

  1. It's possible to leak connections if an error other than ConnectionError or TimeoutError is thrown when establishing connections. We only catch those specific errors (and the try/catch doesn't wrap the entire area where we're getting the connections); if others are thrown, the exception is simply raised to the caller, and the connections are never returned to their respective pools.
  2. More problematically, it's possible to return dirty connections to the pool if an error is thrown after at least one of the connections has been written to, and before all of the connections were read from. This can cause pretty bad correctness issues, where the client gets a response intended for a different request.
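The second failure mode can be illustrated with a toy model. This is a minimal sketch, not redis-py code: `FakeConnection` and `FakePool` are hypothetical stand-ins, and the only assumption is that replies on a connection come back in the order the commands were written.

```python
from collections import deque


class FakeConnection:
    """Hypothetical pooled connection: replies come back in FIFO order."""

    def __init__(self):
        self._pending = deque()

    def send(self, command):
        # The "server" queues exactly one reply per command written.
        self._pending.append(f"reply-to:{command}")

    def read(self):
        return self._pending.popleft()


class FakePool:
    """Hypothetical pool holding a single reusable connection."""

    def __init__(self):
        self._free = [FakeConnection()]

    def get(self):
        return self._free.pop()

    def release(self, conn):
        self._free.append(conn)


pool = FakePool()

# Request A writes its command, then fails before reading the reply.
conn = pool.get()
try:
    conn.send("GET user:alice")
    raise RuntimeError("unexpected failure between write and read")
except RuntimeError:
    pass
finally:
    pool.release(conn)  # goes back to the pool with an unread reply

# Request B reuses the same connection and reads the reply meant for A.
conn = pool.get()
conn.send("GET user:bob")
leaked = conn.read()
pool.release(conn)
print(leaked)
```

In this model, request B ends up holding the reply to `GET user:alice`, which is the kind of response mix-up described in item 2.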

This diff addresses these issues by wrapping the entire area in a try/catch with a finally that ensures the connections are always released, and which closes any connection that we have written to but not yet read from before it goes back to its pool.
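The general shape of that approach might look like the sketch below. It is not the code from this PR: `FakeConnection`, `run_pipeline`, and the `release` callback are hypothetical names, and tracking written and read connections in sets is an assumption made for the illustration.

```python
class FakeConnection:
    """Hypothetical connection that counts replies not yet read."""

    def __init__(self, name):
        self.name = name
        self.unread = 0
        self.closed = False

    def send(self, command):
        self.unread += 1

    def read(self):
        self.unread -= 1

    def disconnect(self):
        # Dropping the socket discards any replies still in flight.
        self.unread = 0
        self.closed = True


def run_pipeline(connections, release, fail_after_writes=False):
    written, read = set(), set()
    try:
        for conn in connections:
            conn.send("PING")
            written.add(conn)
        if fail_after_writes:
            raise RuntimeError("simulated failure between write and read")
        for conn in connections:
            conn.read()
            read.add(conn)
    finally:
        for conn in connections:
            # Written to but never read from: the connection is dirty,
            # so close it before it can be handed to another caller.
            if conn in written and conn not in read:
                conn.disconnect()
            release(conn)  # always released, whatever was raised


released = []
conns = [FakeConnection("a"), FakeConnection("b")]
try:
    run_pipeline(conns, released.append, fail_after_writes=True)
except RuntimeError:
    pass
```

After the simulated failure, both connections have been released and both were disconnected first, so no unread reply can reach a later caller. The exception still propagates to the caller, since the `finally` block only handles cleanup.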

# we figure out the slot number that command maps to, then from
# the slot determine the node.
for c in attempt:
    while True:
Contributor Author

Unless I've completely lost the plot, this used to be a while True loop which always breaks on the first iteration, so this does ~nothing.
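As a generic illustration of that point (the actual loop body in redis/cluster.py is not reproduced here, and both function names are hypothetical), a `while True` whose body always ends in an unconditional `break` runs exactly once, so removing it leaves the behavior unchanged:

```python
def with_loop(items):
    out = []
    for c in items:
        while True:
            out.append(c * 2)
            break  # unconditional: the body runs exactly once
    return out


def without_loop(items):
    out = []
    for c in items:
        out.append(c * 2)
    return out


same = with_loop([1, 2, 3]) == without_loop([1, 2, 3])
```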

@petyaslavova
Collaborator

Hi @praboud, thank you for your contribution! We will review your changes soon.

@praboud
Contributor Author

praboud commented Nov 11, 2025

@petyaslavova hey, just checking in on when you'll have a chance to review this PR.

@praboud
Contributor Author

praboud commented Dec 9, 2025

@petyaslavova this PR addresses a fairly serious correctness issue with the Pipeline implementation, which can result in, e.g., one request getting the data from another request. If those two requests are handling different users' data, this could be a really problematic security or privacy issue. I'm currently using a forked version of redis-py with this fix patched in, but I would love to get this upstreamed so that others don't run into the same problem. Do you have a sense of when it would be possible to get this reviewed? (I know there's a merge conflict. If you're +1 on the general approach in this PR, I can do a pass to resolve the conflict.)

@praboud
Contributor Author

praboud commented Dec 10, 2025

Sorry, I just saw your comment on #3803 (comment) re: timing. Thanks for the update there, though I am a little worried that other folks using this library are unknowingly having correctness problems with the bug that this PR addresses.

Contributor

Copilot AI left a comment

Pull request overview

This pull request fixes critical connection management issues in ClusterPipeline that could lead to connection leaks and data corruption. The PR addresses two main problems: (1) connections being leaked when unexpected errors occur during connection establishment, and (2) dirty connections (with unread responses) being returned to the pool, causing subsequent requests to receive responses from previous requests.

Changes:

  • Wrapped the entire command execution flow in a try/finally block to ensure connections are always released
  • Added connection dirty state tracking (nodes_written/nodes_read counters) to detect connections that have been written to but not read from
  • Added logic to disconnect dirty connections before returning them to the pool to prevent response mixing

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.

File | Description
redis/cluster.py | Fixed connection management by adding a comprehensive try/finally block with dirty connection detection and cleanup logic; added type hints to NodeCommands constructor
tests/test_cluster.py | Added two test cases to verify connection leak prevention and dirty connection handling

@jit-ci

jit-ci bot commented Jan 16, 2026

❌ Security scan failed

Security scan failed: Branch pipeline-conns does not exist in the remote repository

self._nodes_manager.initialize()
if is_default_node:
    self._pipe.replace_default_node()
nodes = {}
Collaborator

Shouldn't this be done in the finally block?

    connection = get_connection(redis_node)
except (ConnectionError, TimeoutError):
    for n in nodes.values():
        n.connection_pool.release(n.connection)
Collaborator

I think you can remove the releasing of the connections from here - they will be released in the finally block.


2 participants