Skip to content

Commit 7a67b96

Browse files
committed
Allow multi-node commands in async pipeline
1 parent 00f5be4 commit 7a67b96

File tree

1 file changed

+40
-15
lines changed

1 file changed

+40
-15
lines changed

Diff for: redis/asyncio/cluster.py

+40-15
Original file line numberDiff line numberDiff line change
@@ -1070,12 +1070,13 @@ async def execute_pipeline(self, commands: List["PipelineCommand"]) -> bool:
10701070
ret = False
10711071
for cmd in commands:
10721072
try:
1073-
cmd.result = await self.parse_response(
1073+
result = await self.parse_response(
10741074
connection, cmd.args[0], **cmd.kwargs
10751075
)
10761076
except Exception as e:
1077-
cmd.result = e
1077+
result = e
10781078
ret = True
1079+
cmd.set_node_result(self.name, result)
10791080

10801081
# Release connection
10811082
self._free.append(connection)
@@ -1530,12 +1531,11 @@ async def _execute(
15301531
raise RedisClusterException(
15311532
f"No targets were found to execute {cmd.args} command on"
15321533
)
1533-
if len(target_nodes) > 1:
1534-
raise RedisClusterException(f"Too many targets for command {cmd.args}")
1535-
node = target_nodes[0]
1536-
if node.name not in nodes:
1537-
nodes[node.name] = (node, [])
1538-
nodes[node.name][1].append(cmd)
1534+
cmd.target_nodes = target_nodes
1535+
for node in target_nodes:
1536+
if node.name not in nodes:
1537+
nodes[node.name] = (node, [])
1538+
nodes[node.name][1].append(cmd)
15391539

15401540
errors = await asyncio.gather(
15411541
*(
@@ -1550,20 +1550,27 @@ async def _execute(
15501550
for cmd in todo:
15511551
if isinstance(cmd.result, (TryAgainError, MovedError, AskError)):
15521552
try:
1553-
cmd.result = await client.execute_command(
1553+
result = await client.execute_command(
15541554
*cmd.args, **cmd.kwargs
15551555
)
15561556
except Exception as e:
1557-
cmd.result = e
1557+
result = e
1558+
1559+
if isinstance(result, dict):
1560+
cmd.result = result
1561+
else:
1562+
cmd.set_node_result(cmd.target_nodes[0].name, result)
15581563

15591564
if raise_on_error:
15601565
for cmd in todo:
1561-
result = cmd.result
1562-
if isinstance(result, Exception):
1566+
name_exc = cmd.get_first_exception()
1567+
if name_exc:
1568+
name, exc = name_exc
15631569
command = " ".join(map(safe_str, cmd.args))
15641570
msg = (
15651571
f"Command # {cmd.position + 1} ({command}) of pipeline "
1566-
f"caused error: {result.args}"
1572+
f"caused error on node {name}: "
1573+
f"{result.args}"
15671574
)
15681575
result.args = (msg,) + result.args[1:]
15691576
raise result
@@ -1581,7 +1588,7 @@ async def _execute(
15811588
client.replace_default_node()
15821589
break
15831590

1584-
return [cmd.result for cmd in stack]
1591+
return [cmd.unwrap_result() for cmd in stack]
15851592

15861593
def _split_command_across_slots(
15871594
self, command: str, *keys: KeyT
@@ -1620,7 +1627,25 @@ def __init__(self, position: int, *args: Any, **kwargs: Any) -> None:
16201627
self.args = args
16211628
self.kwargs = kwargs
16221629
self.position = position
1623-
self.result: Union[Any, Exception] = None
1630+
self.result: Dict[str, Union[Any, Exception]] = {}
1631+
self.target_nodes = None
1632+
1633+
def set_node_result(self, node_name: str, result: Union[Any, Exception]):
1634+
self.result[node_name] = result
1635+
1636+
def unwrap_result(
1637+
self,
1638+
) -> Optional[Union[Union[Any, Exception], Dict[str, Union[Any, Exception]]]]:
1639+
if len(self.result) == 0:
1640+
return None
1641+
if len(self.result) == 1:
1642+
return next(iter(self.result.values()))
1643+
return self.result
1644+
1645+
def get_first_exception(self) -> Optional[Tuple[str, Exception]]:
1646+
return next(
1647+
((n, r) for n, r in self.result.items() if isinstance(r, Exception)), None
1648+
)
16241649

16251650
def __repr__(self) -> str:
16261651
return f"[{self.position}] {self.args} ({self.kwargs})"

0 commit comments

Comments
 (0)