Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Fix running Pipeline with conditional branch and Component wit… #7968

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion haystack/core/pipeline/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -719,7 +719,7 @@ def _init_inputs_state(self, data: Dict[str, Dict[str, Any]]) -> Dict[str, Dict[

return {**data}

def _init_to_run(self) -> List[Tuple[str, Component]]:
def _init_to_run(self, pipeline_inputs: Dict[str, Any]) -> List[Tuple[str, Component]]:
to_run: List[Tuple[str, Component]] = []
for node_name in self.graph.nodes:
component = self.graph.nodes[node_name]["instance"]
Expand All @@ -729,6 +729,11 @@ def _init_to_run(self) -> List[Tuple[str, Component]]:
to_run.append((node_name, component))
continue

if node_name in pipeline_inputs:
# This component is in the input data, if it has enough inputs it can run right away
to_run.append((node_name, component))
continue

for socket in component.__haystack_input__._sockets_dict.values():
if not socket.senders or socket.is_variadic:
# Component has at least one input not connected or is variadic, can run right away.
Expand Down
19 changes: 14 additions & 5 deletions haystack/core/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,12 @@ def run(self, word: str):
# Initialize the inputs state
last_inputs: Dict[str, Dict[str, Any]] = self._init_inputs_state(data)

# Take all components that have at least 1 input not connected or is variadic,
# and all components that have no inputs at all
to_run: List[Tuple[str, Component]] = self._init_to_run()
# Take all components that:
# - have no inputs
# - receive input from the user
# - have at least one input not connected
# - have at least one input that is variadic
to_run: List[Tuple[str, Component]] = self._init_to_run(data)

# These variables are used to detect when we're stuck in a loop.
# Stuck loops can happen when one or more components are waiting for input but
Expand Down Expand Up @@ -232,8 +235,15 @@ def run(self, word: str):
if name != sender_component_name:
continue

pair = (receiver_component_name, self.graph.nodes[receiver_component_name]["instance"])
if edge_data["from_socket"].name not in res:
# This output has not been produced by the component, skip it
# The component didn't produce any output for this socket.
# We can't run the receiver, let's remove it from the list of components to run
# or we risk running it if it's in those lists.
if pair in to_run:
to_run.remove(pair)
if pair in waiting_for_input:
waiting_for_input.remove(pair)
continue

if receiver_component_name not in last_inputs:
Expand All @@ -249,7 +259,6 @@ def run(self, word: str):
else:
last_inputs[receiver_component_name][edge_data["to_socket"].name] = value

pair = (receiver_component_name, self.graph.nodes[receiver_component_name]["instance"])
is_greedy = pair[1].__haystack_is_greedy__
is_variadic = edge_data["to_socket"].is_variadic
if is_variadic and is_greedy:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
fixes:
- |
Fix some bugs running a Pipeline that has Components with conditional outputs.
Some branches that were expected not to run would run anyway, even if they received no inputs.
Some branches instead would cause the Pipeline to get stuck waiting to run that branch, even if they received no inputs.
The behaviour would depend on whether the Component not receiving the input has an optional input or not.
10 changes: 7 additions & 3 deletions test/core/pipeline/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -694,19 +694,23 @@ def test__init_to_run(self):
pipe.add_component("with_no_inputs", ComponentWithNoInputs())
pipe.add_component("with_single_input", ComponentWithSingleInput())
pipe.add_component("another_with_single_input", ComponentWithSingleInput())
pipe.add_component("yet_another_with_single_input", ComponentWithSingleInput())
pipe.add_component("with_multiple_inputs", ComponentWithMultipleInputs())

pipe.connect("yet_another_with_single_input.out", "with_variadic.in")
pipe.connect("with_no_inputs.out", "with_variadic.in")
pipe.connect("with_single_input.out", "another_with_single_input.in")
pipe.connect("another_with_single_input.out", "with_multiple_inputs.in1")
pipe.connect("with_multiple_inputs.out", "with_variadic.in")

to_run = pipe._init_to_run()
assert len(to_run) == 4
data = {"yet_another_with_single_input": {"in": 1}}
to_run = pipe._init_to_run(data)
assert len(to_run) == 5
assert to_run[0][0] == "with_variadic"
assert to_run[1][0] == "with_no_inputs"
assert to_run[2][0] == "with_single_input"
assert to_run[3][0] == "with_multiple_inputs"
assert to_run[3][0] == "yet_another_with_single_input"
assert to_run[4][0] == "with_multiple_inputs"

def test__init_inputs_state(self):
pipe = Pipeline()
Expand Down