diff --git a/rediscluster/connection.py b/rediscluster/connection.py index 70dadd0b..a8f4d551 100644 --- a/rediscluster/connection.py +++ b/rediscluster/connection.py @@ -277,7 +277,7 @@ def make_connection(self, node): return connection - def release(self, connection): + def release(self, connection, add_to_pool=True): """ Releases the connection back to the pool """ @@ -296,7 +296,15 @@ def release(self, connection): pass # TODO: Log.warning("Tried to release connection that did not exist any longer : {0}".format(connection)) - self._available_connections.setdefault(connection.node["name"], []).append(connection) + if add_to_pool: + self._available_connections.setdefault(connection.node["name"], []).append(connection) + else: + # If we don't add it back to the pool it shouldn't count towards the + # connection pool, or we'll artificially reduce the maximum size of the + # pool + self._created_connections_per_node.setdefault(node['name'], 0) + if self._created_connections_per_node[connection.node["name"]] > 0: + self._created_connections_per_node[connection.node["name"]] -= 1 def disconnect(self): """ @@ -538,7 +546,7 @@ def get_connection_by_node(self, node): return connection - def release(self, connection): + def release(self, connection, add_to_pool=True): """ Releases the connection back to the pool """ @@ -546,9 +554,13 @@ def release(self, connection): if connection.pid != self.pid: return + # In some cases we don't want to add back this connection to the pool but + # we still want to free its slot + conn_to_add = connection if add_to_pool else None + # Put the connection back into the pool. try: - self._get_pool(connection.node).put_nowait(connection) + self._get_pool(connection.node).put_nowait(conn_to_add) except Full: # perhaps the pool has been reset() after a fork? regardless, # we don't want this connection diff --git a/rediscluster/pipeline.py b/rediscluster/pipeline.py index 3eae7a39..e00e36ae 100644 --- a/rediscluster/pipeline.py +++ b/rediscluster/pipeline.py @@ -203,7 +203,15 @@ def _send_cluster_commands(self, stack, raise_on_error=True, allow_redirections= # we can build a list of commands for each node. node_name = node['name'] if node_name not in nodes: - nodes[node_name] = NodeCommands(self.parse_response, self.connection_pool.get_connection_by_node(node)) + try: + nodes[node_name] = NodeCommands(self.parse_response, self.connection_pool.get_connection_by_node(node)) + except: + # Sommething happened, maybe the pool is full, we need to release any connection + # we've taken or we'll leak them. Because we're not sure if the connections are + # in a good state we'll release their slot but not reuse them + for n in nodes.values(): + self.connection_pool.release(n.connection, add_to_pool=False) + raise nodes[node_name].append(c)