Commit eec9182

mathislucka, Amnah199, and davidsbatista authored
fix: pipeline run bugs in cyclic and acyclic pipelines (#8707)
* add component checks
* pipeline should run deterministically
* add FIFOQueue
* add agent tests
* add order dependent tests
* run new tests
* remove code that is not needed
* test: intermediate from cycle outputs are available outside cycle
* add tests for component checks (Claude)
* adapt tests for component checks (o1 review)
* chore: format
* remove tests that aren't needed anymore
* add _calculate_priority tests
* revert accidental change in pyproject.toml
* test format conversion
* adapt to naming convention
* chore: proper docstrings and type hints for PQ
* format
* add more unit tests
* rm unneeded comments
* test input consumption
* lint
* fix: docstrings
* lint
* format
* format
* fix license header
* fix license header
* add component run tests
* fix: pass correct input format to tracing
* fix types
* format
* format
* types
* add defaults from Socket instead of signature - otherwise components with dynamic inputs would fail
* fix test names
* still wait for optional inputs on greedy variadic sockets - mirrors previous behavior
* fix format
* wip: warn for ambiguous running order
* wip: alternative warning
* fix license header
* make code more readable
  Co-authored-by: Amna Mubashar <[email protected]>
* Introduce content tracing to a behavioral test
* Fixing linting
* Remove debug print statements
* Fix tracer tests
* remove print
* test: test for component inputs
* test: remove testing for run order
* chore: update component checks from experimental
* chore: update pipeline and base from experimental
* refactor: remove unused method
* refactor: remove unused method
* refactor: outdated comment
* refactor: inputs state is updated as side effect - to prepare for AsyncPipeline implementation
* format
* test: add file conversion test
* format
* fix: original implementation deepcopies outputs
* lint
* fix: from_dict was updated
* fix: format
* fix: test
* test: add test for thread safety
* remove unused imports
* format
* test: FIFOPriorityQueue
* chore: add release note
* fix: resolve merge conflict with mermaid changes
* fix: format
* fix: remove unused import
* refactor: rename to avoid accidental conflicts
* chore: remove unused inputs, add missing license header
* chore: extend release notes
* Update releasenotes/notes/fix-pipeline-run-2fefeafc705a6d91.yaml
  Co-authored-by: Amna Mubashar <[email protected]>
* fix: format
* fix: format
* Update release note
---------
Co-authored-by: Amna Mubashar <[email protected]>
Co-authored-by: David S. Batista <[email protected]>
1 parent 0530049 commit eec9182

18 files changed: +6367 -2765 lines changed

Diff for: haystack/core/pipeline/base.py

+248 -526
Large diffs are not rendered by default.

Diff for: haystack/core/pipeline/component_checks.py

+245
@@ -0,0 +1,245 @@
# SPDX-FileCopyrightText: 2022-present deepset GmbH <[email protected]>
#
# SPDX-License-Identifier: Apache-2.0

from typing import Any, Dict, List

from haystack.core.component.types import InputSocket, _empty

_NO_OUTPUT_PRODUCED = _empty


def can_component_run(component: Dict, inputs: Dict) -> bool:
    """
    Checks if the component can run, given the current state of its inputs.

    A component needs to pass two gates so that it is ready to run:
    1. It has received all mandatory inputs.
    2. It has received a trigger.
    :param component: Component metadata and the component instance.
    :param inputs: Inputs for the component.
    """
    received_all_mandatory_inputs = are_all_sockets_ready(component, inputs, only_check_mandatory=True)
    received_trigger = has_any_trigger(component, inputs)

    return received_all_mandatory_inputs and received_trigger


def has_any_trigger(component: Dict, inputs: Dict) -> bool:
    """
    Checks if a component was triggered to execute.

    There are 3 triggers:
    1. A predecessor provided input to the component.
    2. Input to the component was provided from outside the pipeline (e.g. user input).
    3. The component does not receive input from any other components in the pipeline and `Pipeline.run` was called.

    A trigger can only cause a component to execute ONCE because:
    1. Components consume inputs from predecessors before execution (they are deleted).
    2. Inputs from outside the pipeline can only trigger a component when it is executed for the first time.
    3. `Pipeline.run` can only trigger a component when it is executed for the first time.

    :param component: Component metadata and the component instance.
    :param inputs: Inputs for the component.
    """
    trigger_from_predecessor = any_predecessors_provided_input(component, inputs)
    trigger_from_user = has_user_input(inputs) and component["visits"] == 0
    trigger_without_inputs = can_not_receive_inputs_from_pipeline(component) and component["visits"] == 0

    return trigger_from_predecessor or trigger_from_user or trigger_without_inputs


def are_all_sockets_ready(component: Dict, inputs: Dict, only_check_mandatory: bool = False) -> bool:
    """
    Checks if all sockets of a component have enough inputs for the component to execute.

    :param component: Component metadata and the component instance.
    :param inputs: Inputs for the component.
    :param only_check_mandatory: If only mandatory sockets should be checked.
    """
    filled_sockets = set()
    expected_sockets = set()
    if only_check_mandatory:
        sockets_to_check = {
            socket_name: socket for socket_name, socket in component["input_sockets"].items() if socket.is_mandatory
        }
    else:
        sockets_to_check = {
            socket_name: socket
            for socket_name, socket in component["input_sockets"].items()
            if socket.is_mandatory or len(socket.senders)
        }

    for socket_name, socket in sockets_to_check.items():
        socket_inputs = inputs.get(socket_name, [])
        expected_sockets.add(socket_name)

        # Check if socket has all required inputs or is a lazy variadic socket with any input
        if has_socket_received_all_inputs(socket, socket_inputs) or (
            is_socket_lazy_variadic(socket) and any_socket_input_received(socket_inputs)
        ):
            filled_sockets.add(socket_name)

    return filled_sockets == expected_sockets


def any_predecessors_provided_input(component: Dict, inputs: Dict) -> bool:
    """
    Checks if a component received inputs from any predecessors.

    :param component: Component metadata and the component instance.
    :param inputs: Inputs for the component.
    """
    return any(
        any_socket_value_from_predecessor_received(inputs.get(socket_name, []))
        for socket_name in component["input_sockets"].keys()
    )


def any_socket_value_from_predecessor_received(socket_inputs: List[Dict[str, Any]]) -> bool:
    """
    Checks if a component socket received input from any predecessors.

    :param socket_inputs: Inputs for the component's socket.
    """
    # When sender is None, the input was provided from outside the pipeline.
    return any(inp["value"] != _NO_OUTPUT_PRODUCED and inp["sender"] is not None for inp in socket_inputs)


def has_user_input(inputs: Dict) -> bool:
    """
    Checks if a component has received input from outside the pipeline (e.g. user input).

    :param inputs: Inputs for the component.
    """
    return any(inp for socket in inputs.values() for inp in socket if inp["sender"] is None)


def can_not_receive_inputs_from_pipeline(component: Dict) -> bool:
    """
    Checks if a component can not receive inputs from any other components in the pipeline.

    :param component: Component metadata and the component instance.
    """
    return all(len(sock.senders) == 0 for sock in component["input_sockets"].values())


def all_socket_predecessors_executed(socket: InputSocket, socket_inputs: List[Dict]) -> bool:
    """
    Checks if all components connecting to an InputSocket have executed.

    :param socket: The InputSocket of a component.
    :param socket_inputs: Inputs for the socket.
    """
    expected_senders = set(socket.senders)
    executed_senders = {inp["sender"] for inp in socket_inputs if inp["sender"] is not None}

    return expected_senders == executed_senders


def any_socket_input_received(socket_inputs: List[Dict]) -> bool:
    """
    Checks if a socket has received any input from any other components in the pipeline or from outside the pipeline.

    :param socket_inputs: Inputs for the socket.
    """
    return any(inp["value"] != _NO_OUTPUT_PRODUCED for inp in socket_inputs)


def has_lazy_variadic_socket_received_all_inputs(socket: InputSocket, socket_inputs: List[Dict]) -> bool:
    """
    Checks if a lazy variadic socket has received all expected inputs from other components in the pipeline.

    :param socket: The InputSocket of a component.
    :param socket_inputs: Inputs for the socket.
    """
    expected_senders = set(socket.senders)
    actual_senders = {
        sock["sender"] for sock in socket_inputs if sock["value"] != _NO_OUTPUT_PRODUCED and sock["sender"] is not None
    }

    return expected_senders == actual_senders


def is_socket_lazy_variadic(socket: InputSocket) -> bool:
    """
    Checks if an InputSocket is a lazy variadic socket.

    :param socket: The InputSocket of a component.
    """
    return socket.is_variadic and not socket.is_greedy


def has_socket_received_all_inputs(socket: InputSocket, socket_inputs: List[Dict]) -> bool:
    """
    Checks if a socket has received all expected inputs.

    :param socket: The InputSocket of a component.
    :param socket_inputs: Inputs for the socket.
    """
    # No inputs received for the socket, it is not filled.
    if len(socket_inputs) == 0:
        return False

    # The socket is greedy variadic and at least one input was produced, it is complete.
    if socket.is_variadic and socket.is_greedy and any(sock["value"] != _NO_OUTPUT_PRODUCED for sock in socket_inputs):
        return True

    # The socket is lazy variadic and all expected inputs were produced.
    if is_socket_lazy_variadic(socket) and has_lazy_variadic_socket_received_all_inputs(socket, socket_inputs):
        return True

    # The socket is not variadic and the only expected input is complete.
    return not socket.is_variadic and socket_inputs[0]["value"] != _NO_OUTPUT_PRODUCED


def all_predecessors_executed(component: Dict, inputs: Dict) -> bool:
    """
    Checks if all predecessors of a component have executed.

    :param component: Component metadata and the component instance.
    :param inputs: Inputs for the component.
    """
    return all(
        all_socket_predecessors_executed(socket, inputs.get(socket_name, []))
        for socket_name, socket in component["input_sockets"].items()
    )


def are_all_lazy_variadic_sockets_resolved(component: Dict, inputs: Dict) -> bool:
    """
    Checks if the final state for all lazy variadic sockets of a component is resolved.

    :param component: Component metadata and the component instance.
    :param inputs: Inputs for the component.
    """
    for socket_name, socket in component["input_sockets"].items():
        if is_socket_lazy_variadic(socket):
            socket_inputs = inputs.get(socket_name, [])

            # Checks if a lazy variadic socket is ready to run.
            # A socket is ready if either:
            # - it has received all expected inputs, or
            # - all its predecessors have executed
            # If none of the conditions are met, the socket is not ready to run and we defer the component.
            if not (
                has_lazy_variadic_socket_received_all_inputs(socket, socket_inputs)
                or all_socket_predecessors_executed(socket, socket_inputs)
            ):
                return False

    return True


def is_any_greedy_socket_ready(component: Dict, inputs: Dict) -> bool:
    """
    Checks if the component has any greedy socket that is ready to run.

    :param component: Component metadata and the component instance.
    :param inputs: Inputs for the component.
    """
    for socket_name, socket in component["input_sockets"].items():
        if socket.is_greedy and has_socket_received_all_inputs(socket, inputs.get(socket_name, [])):
            return True

    return False
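
The two gates described in the `can_component_run` docstring (all mandatory inputs received, plus a trigger) can be illustrated with a reduced, standalone sketch. This is not the module above and does not import haystack: `StubSocket`, `NO_OUTPUT`, `mandatory_inputs_received`, `triggered`, and `can_run` are hypothetical names invented for the example, variadic sockets are left out, and the input layout (a list of `{"sender": ..., "value": ...}` dicts per socket) is assumed from how the diff reads `inputs`.

```python
from dataclasses import dataclass, field
from typing import Dict, List

# Hypothetical stand-in for the `_NO_OUTPUT_PRODUCED` sentinel used in the diff.
NO_OUTPUT = object()


@dataclass
class StubSocket:
    """Hypothetical stand-in for InputSocket with only the attributes this sketch reads."""

    senders: List[str] = field(default_factory=list)
    is_mandatory: bool = True


def mandatory_inputs_received(component: Dict, inputs: Dict) -> bool:
    """Gate 1 (non-variadic sockets only): every mandatory socket holds a produced value."""
    for name, socket in component["input_sockets"].items():
        if not socket.is_mandatory:
            continue
        received = inputs.get(name, [])
        if not received or received[0]["value"] is NO_OUTPUT:
            return False
    return True


def triggered(component: Dict, inputs: Dict) -> bool:
    """Gate 2: a predecessor delivered a value, or this is the first visit and the
    component either has user input or cannot receive pipeline input at all."""
    sockets = component["input_sockets"]
    received = [inp for name in sockets for inp in inputs.get(name, [])]
    first_visit = component["visits"] == 0

    from_predecessor = any(
        inp["value"] is not NO_OUTPUT and inp["sender"] is not None for inp in received
    )
    from_user = first_visit and any(inp["sender"] is None for inp in received)
    without_inputs = first_visit and all(len(s.senders) == 0 for s in sockets.values())
    return from_predecessor or from_user or without_inputs


def can_run(component: Dict, inputs: Dict) -> bool:
    """Both gates must pass."""
    return mandatory_inputs_received(component, inputs) and triggered(component, inputs)


# One mandatory socket fed by a hypothetical "retriever" component.
component = {"input_sockets": {"query": StubSocket(senders=["retriever"])}, "visits": 0}
user_input = {"query": [{"sender": None, "value": "hello"}]}
pred_input = {"query": [{"sender": "retriever", "value": "hello"}]}
skipped = {"query": [{"sender": "retriever", "value": NO_OUTPUT}]}

print(can_run(component, user_input))                   # True: first visit with user input
print(can_run(component, pred_input))                   # True: predecessor produced a value
print(can_run(component, {}))                           # False: mandatory input missing
print(can_run(component, skipped))                      # False: predecessor ran but produced nothing
print(can_run({**component, "visits": 1}, user_input))  # False: user input only triggers the first run
```

The last case reflects the "trigger only ONCE" rule from the `has_any_trigger` docstring: on a later visit, only fresh input from a predecessor can trigger the component again.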
