Skip to content

Commit 4ce6934

Browse files
authored
fix: Update deepcopying in Pipeline to have a fallback in case of error (#9346)
* First pass at fix for deepcopying inputs and outputs * Add reno * Add recursion for dict objects * Bump recursion depth * More tests and some improvements * Fix unit tests * PR comments
1 parent 64f384b commit 4ce6934

File tree

8 files changed

+186
-50
lines changed

8 files changed

+186
-50
lines changed

haystack/core/pipeline/async_pipeline.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
# SPDX-License-Identifier: Apache-2.0
44

55
import asyncio
6-
from copy import deepcopy
76
from typing import Any, AsyncIterator, Dict, List, Optional, Set
87

98
from haystack import logging, tracing
@@ -16,6 +15,7 @@
1615
ComponentPriority,
1716
PipelineBase,
1817
)
18+
from haystack.core.pipeline.utils import deepcopy_with_fallback
1919
from haystack.telemetry import pipeline_running
2020

2121
logger = logging.getLogger(__name__)
@@ -58,7 +58,7 @@ async def _run_component_async( # pylint: disable=too-many-positional-arguments
5858
with PipelineBase._create_component_span(
5959
component_name=component_name, instance=instance, inputs=component_inputs, parent_span=parent_span
6060
) as span:
61-
span.set_content_tag(_COMPONENT_INPUT, deepcopy(component_inputs))
61+
span.set_content_tag(_COMPONENT_INPUT, deepcopy_with_fallback(component_inputs))
6262
logger.info("Running component {component_name}", component_name=component_name)
6363

6464
if getattr(instance, "__haystack_supports_async__", False):
@@ -76,7 +76,7 @@ async def _run_component_async( # pylint: disable=too-many-positional-arguments
7676
raise PipelineRuntimeError.from_invalid_output(component_name, instance.__class__, outputs)
7777

7878
span.set_tag(_COMPONENT_VISITS, component_visits[component_name])
79-
span.set_content_tag(_COMPONENT_OUTPUT, deepcopy(outputs))
79+
span.set_content_tag(_COMPONENT_OUTPUT, deepcopy_with_fallback(outputs))
8080

8181
return outputs
8282

@@ -238,7 +238,7 @@ async def _run_highest_in_isolation(component_name: str) -> AsyncIterator[Dict[s
238238
partial_result = finished.result()
239239
scheduled_components.discard(finished_component_name)
240240
if partial_result:
241-
yield_dict = {finished_component_name: deepcopy(partial_result)}
241+
yield_dict = {finished_component_name: deepcopy_with_fallback(partial_result)}
242242
yield yield_dict # partial outputs
243243

244244
if component_name in scheduled_components:
@@ -274,7 +274,7 @@ async def _run_highest_in_isolation(component_name: str) -> AsyncIterator[Dict[s
274274

275275
scheduled_components.remove(component_name)
276276
if pruned:
277-
yield {component_name: deepcopy(pruned)}
277+
yield {component_name: deepcopy_with_fallback(pruned)}
278278

279279
async def _schedule_task(component_name: str) -> None:
280280
"""
@@ -337,7 +337,7 @@ async def _wait_for_one_task_to_complete() -> AsyncIterator[Dict[str, Any]]:
337337
partial_result = finished.result()
338338
scheduled_components.discard(finished_component_name)
339339
if partial_result:
340-
yield {finished_component_name: deepcopy(partial_result)}
340+
yield {finished_component_name: deepcopy_with_fallback(partial_result)}
341341

342342
async def _wait_for_all_tasks_to_complete() -> AsyncIterator[Dict[str, Any]]:
343343
"""
@@ -350,7 +350,7 @@ async def _wait_for_all_tasks_to_complete() -> AsyncIterator[Dict[str, Any]]:
350350
partial_result = finished.result()
351351
scheduled_components.discard(finished_component_name)
352352
if partial_result:
353-
yield {finished_component_name: deepcopy(partial_result)}
353+
yield {finished_component_name: deepcopy_with_fallback(partial_result)}
354354

355355
# -------------------------------------------------
356356
# MAIN SCHEDULING LOOP
@@ -428,7 +428,7 @@ async def _wait_for_all_tasks_to_complete() -> AsyncIterator[Dict[str, Any]]:
428428
yield partial_res
429429

430430
# 4) Yield final pipeline outputs
431-
yield deepcopy(pipeline_outputs)
431+
yield deepcopy_with_fallback(pipeline_outputs)
432432

433433
async def run_async(
434434
self, data: Dict[str, Any], include_outputs_from: Optional[Set[str]] = None, concurrency_limit: int = 4

haystack/core/pipeline/base.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
import itertools
66
from collections import defaultdict
7-
from copy import deepcopy
87
from datetime import datetime
98
from enum import IntEnum
109
from pathlib import Path
@@ -33,7 +32,7 @@
3332
is_any_greedy_socket_ready,
3433
is_socket_lazy_variadic,
3534
)
36-
from haystack.core.pipeline.utils import FIFOPriorityQueue, parse_connect_string
35+
from haystack.core.pipeline.utils import FIFOPriorityQueue, deepcopy_with_fallback, parse_connect_string
3736
from haystack.core.serialization import DeserializationCallbacks, component_from_dict, component_to_dict
3837
from haystack.core.type_utils import _type_name, _types_are_compatible
3938
from haystack.marshal import Marshaller, YamlMarshaller
@@ -176,7 +175,7 @@ def from_dict(
176175
:returns:
177176
Deserialized component.
178177
"""
179-
data_copy = deepcopy(data) # to prevent modification of original data
178+
data_copy = deepcopy_with_fallback(data) # to prevent modification of original data
180179
metadata = data_copy.get("metadata", {})
181180
max_runs_per_component = data_copy.get("max_runs_per_component", 100)
182181
connection_type_validation = data_copy.get("connection_type_validation", True)
@@ -895,7 +894,7 @@ def _prepare_component_input_data(self, data: Dict[str, Any]) -> Dict[str, Dict[
895894
# deepcopying the inputs prevents the Pipeline run logic from being altered unexpectedly
896895
# when the same input reference is passed to multiple components.
897896
for component_name, component_inputs in data.items():
898-
data[component_name] = {k: deepcopy(v) for k, v in component_inputs.items()}
897+
data[component_name] = {k: deepcopy_with_fallback(v) for k, v in component_inputs.items()}
899898

900899
return data
901900

haystack/core/pipeline/pipeline.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
#
33
# SPDX-License-Identifier: Apache-2.0
44

5-
from copy import deepcopy
65
from typing import Any, Dict, Mapping, Optional, Set, cast
76

87
from haystack import logging, tracing
@@ -15,6 +14,7 @@
1514
ComponentPriority,
1615
PipelineBase,
1716
)
17+
from haystack.core.pipeline.utils import deepcopy_with_fallback
1818
from haystack.telemetry import pipeline_running
1919

2020
logger = logging.getLogger(__name__)
@@ -54,7 +54,7 @@ def _run_component(
5454
) as span:
5555
# We deepcopy the inputs otherwise we might lose that information
5656
# when we delete them in case they're sent to other Components
57-
span.set_content_tag(_COMPONENT_INPUT, deepcopy(inputs))
57+
span.set_content_tag(_COMPONENT_INPUT, deepcopy_with_fallback(inputs))
5858
logger.info("Running component {component_name}", component_name=component_name)
5959
try:
6060
component_output = instance.run(**inputs)
@@ -252,7 +252,7 @@ def run( # noqa: PLR0915, PLR0912
252252
)
253253

254254
if component_pipeline_outputs:
255-
pipeline_outputs[component_name] = deepcopy(component_pipeline_outputs)
255+
pipeline_outputs[component_name] = deepcopy_with_fallback(component_pipeline_outputs)
256256
if self._is_queue_stale(priority_queue):
257257
priority_queue = self._fill_queue(ordered_component_names, inputs, component_visits)
258258

haystack/core/pipeline/utils.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,57 @@
33
# SPDX-License-Identifier: Apache-2.0
44

55
import heapq
6+
from copy import deepcopy
67
from itertools import count
78
from typing import Any, List, Optional, Tuple
89

10+
from haystack import logging
11+
12+
logger = logging.getLogger(__name__)
13+
14+
15+
def deepcopy_with_fallback(obj: Any, max_depth: Optional[int] = 100) -> Any:
    """
    Attempts to create a deep copy of the given object with a safe fallback mechanism.

    If the object is a dictionary, list, tuple, or set, each item is copied individually using this
    function recursively. If an individual item fails to be copied, the original value is used as a
    fallback. A maximum recursion depth is enforced to avoid deep or cyclic structures from causing issues.

    :param obj: The object to attempt to deep copy.
    :param max_depth: The maximum depth to recurse during deep copying. If 0, returns the object as-is.
        Pass ``None`` for unlimited depth.
    :return: A deep copy of the object if successful; otherwise, the original object.
    """
    # Depth budget exhausted: return the object unchanged rather than recursing further.
    if max_depth is not None and max_depth <= 0:
        return obj

    try:
        return deepcopy(obj)
    except Exception as e:
        # If the deepcopy fails we try to deepcopy each individual item of known container types.
        next_depth = None if max_depth is None else max_depth - 1
        if isinstance(obj, (dict, list, tuple, set)):
            logger.info(
                "Deepcopy failed for object of type '{obj_type}'. Error: {error}. Attempting item-wise copy.",
                obj_type=type(obj).__name__,
                error=e,
            )
            try:
                if isinstance(obj, dict):
                    return {key: deepcopy_with_fallback(value, next_depth) for key, value in obj.items()}
                if isinstance(obj, tuple) and hasattr(obj, "_fields"):
                    # namedtuples require their fields as positional arguments; feeding a generator
                    # to the constructor (as for plain tuples) would raise a TypeError.
                    return type(obj)(*(deepcopy_with_fallback(item, next_depth) for item in obj))
                return type(obj)(deepcopy_with_fallback(item, next_depth) for item in obj)
            except Exception as reconstruction_error:
                # Container subclasses may have constructors that don't accept an iterable
                # (or item-wise copy may fail in other ways). Fall through to returning the
                # original object instead of letting the fallback itself crash.
                e = reconstruction_error

        logger.info(
            "Deepcopy failed for object of type '{obj_type}'. Error: {error}. Returning original object instead.",
            obj_type=type(obj).__name__,
            error=e,
        )
        return obj
56+
957

1058
def parse_connect_string(connection: str) -> Tuple[str, Optional[str]]:
1159
"""

haystack/tools/component_tool.py

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
#
33
# SPDX-License-Identifier: Apache-2.0
44

5-
from copy import copy, deepcopy
65
from dataclasses import fields, is_dataclass
76
from inspect import getdoc
87
from typing import Any, Callable, Dict, Optional, Union, get_args, get_origin
@@ -419,29 +418,3 @@ def _create_property_schema(self, python_type: Any, description: str, default: A
419418
schema["default"] = default
420419

421420
return schema
422-
423-
def __deepcopy__(self, memo: Dict[Any, Any]) -> "ComponentTool":
424-
# Jinja2 templates throw an Exception when we deepcopy them (see https://github.com/pallets/jinja/issues/758)
425-
# When we use a ComponentTool in a pipeline at runtime, we deepcopy the tool
426-
# We overwrite ComponentTool.__deepcopy__ to fix this until a more comprehensive fix is merged.
427-
# We track the issue here: https://github.com/deepset-ai/haystack/issues/9011
428-
result = copy(self)
429-
430-
# Add the object to the memo dictionary to handle circular references
431-
memo[id(self)] = result
432-
433-
# Deep copy all attributes with exception handling
434-
for key, value in self.__dict__.items():
435-
try:
436-
# Try to deep copy the attribute
437-
setattr(result, key, deepcopy(value, memo))
438-
except (TypeError, NotImplementedError):
439-
# Fall back to using the original attribute for components that use Jinja2-templates
440-
logger.debug(
441-
"deepcopy of ComponentTool {tool_name} failed. Using original attribute '{attribute}' instead.",
442-
tool_name=self.name,
443-
attribute=key,
444-
)
445-
setattr(result, key, getattr(self, key))
446-
447-
return result
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
fixes:
3+
- |
4+
Introduced `deepcopy_with_fallback` to improve robustness in the `Pipeline.run` method.
5+
Previously, standard `deepcopy` was used on inputs and outputs to avoid unintended side effects from shared mutable data structures, as these values are accessed in multiple parts of the pipeline. However, certain Python objects cannot be deepcopied, which could lead to runtime errors.
6+
The new `deepcopy_with_fallback` function attempts to deep copy an object and safely falls back to the original object if copying fails. This ensures pipeline execution remains stable while maintaining the protective behavior against shared-state modification.

test/core/pipeline/test_utils.py

Lines changed: 85 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,16 @@
22
#
33
# SPDX-License-Identifier: Apache-2.0
44

5+
import logging
56
import pytest
67

7-
from haystack.core.pipeline.utils import parse_connect_string, FIFOPriorityQueue
8+
from haystack.components.builders.prompt_builder import PromptBuilder
9+
from haystack.core.pipeline.utils import parse_connect_string, FIFOPriorityQueue, deepcopy_with_fallback
10+
from haystack.tools import ComponentTool, Tool
11+
12+
13+
def get_weather_report(city: str) -> str:
    """Return a canned weather report string for *city* (used as a Tool target in tests)."""
    return "Weather report for {}: 20°C, sunny".format(city)
815

916

1017
def test_parse_connection():
@@ -175,3 +182,80 @@ def test_queue_ordering_parametrized(empty_queue, items):
175182
sorted_items = sorted(items, key=lambda x: (x[0], items.index(x)))
176183
for priority, item in sorted_items:
177184
assert empty_queue.pop() == (priority, item)
185+
186+
187+
class TestDeepcopyWithFallback:
    """Tests for deepcopy_with_fallback covering copyable, uncopyable, and mixed containers."""

    @staticmethod
    def _weather_tool(name: str) -> Tool:
        # A plain Tool wrapping a regular function: deepcopy succeeds on it.
        return Tool(
            name=name,
            description="Get weather report",
            parameters={"type": "object", "properties": {"city": {"type": "string"}}, "required": ["city"]},
            function=get_weather_report,
        )

    @staticmethod
    def _uncopyable_tool() -> ComponentTool:
        # A ComponentTool holding a Jinja2-backed component: deepcopy raises on it.
        return ComponentTool(
            name="problematic_tool", description="This is a problematic tool.", component=PromptBuilder("{{query}}")
        )

    def test_deepcopy_with_fallback_copyable(self, caplog):
        original = {"tools": self._weather_tool("weather")}
        with caplog.at_level(logging.INFO):
            copy = deepcopy_with_fallback(original)
        assert "Deepcopy failed for object of type" not in caplog.text
        assert copy["tools"] == original["tools"]
        # This should be a true copy so changing the name in the copy should not affect the original
        copy["tools"].name = "copied_tool"
        assert copy["tools"] != original["tools"]

    def test_deepcopy_with_fallback_not_copyable(self, caplog):
        original = {"tools": self._uncopyable_tool()}
        with caplog.at_level(logging.INFO):
            copy = deepcopy_with_fallback(original)
        assert "Deepcopy failed for object of type 'ComponentTool'" in caplog.text
        assert copy["tools"] == original["tools"]
        # Not a true copy but a shallow copy so changing the name in the copy should also affect the original
        copy["tools"].name = "copied_tool"
        assert copy["tools"] == original["tools"]

    def test_deepcopy_with_fallback_mixed_copyable_list(self, caplog):
        original = {"tools": [self._weather_tool("tool1"), self._uncopyable_tool()]}
        with caplog.at_level(logging.INFO):
            copy = deepcopy_with_fallback(original)
        assert "Deepcopy failed for object of type 'ComponentTool'" in caplog.text
        assert copy["tools"][0] == original["tools"][0]
        # First tool should be a true copy so changing the name in the copy should not affect the original
        copy["tools"][0].name = "copied_tool1"
        assert copy["tools"][0] != original["tools"][0]
        assert copy["tools"][1] == original["tools"][1]
        # second should be a shallow copy so changing the name in the copy should also affect the original
        copy["tools"][1].name = "copied_tool2"
        assert copy["tools"][1] == original["tools"][1]

    def test_deepcopy_with_fallback_mixed_copyable_tuple(self, caplog):
        original = {"tools": (self._weather_tool("tool1"), self._uncopyable_tool())}
        with caplog.at_level(logging.INFO):
            copy = deepcopy_with_fallback(original)
        assert "Deepcopy failed for object of type 'ComponentTool'" in caplog.text
        assert copy["tools"][0] == original["tools"][0]
        # First tool should be a true copy so changing the name in the copy should not affect the original
        copy["tools"][0].name = "copied_tool1"
        assert copy["tools"][0] != original["tools"][0]
        assert copy["tools"][1] == original["tools"][1]
        # second should be a shallow copy so changing the name in the copy should also affect the original
        copy["tools"][1].name = "copied_tool2"
        assert copy["tools"][1] == original["tools"][1]

0 commit comments

Comments
 (0)