feat: AsyncPipeline that can schedule components to run concurrently (#8812)

* add component checks

* pipeline should run deterministically

* add FIFOQueue

* add agent tests

* add order dependent tests

* run new tests

* remove code that is not needed

* test: intermediate from cycle outputs are available outside cycle

* add tests for component checks (Claude)

* adapt tests for component checks (o1 review)

* chore: format

* remove tests that aren't needed anymore

* add _calculate_priority tests

* revert accidental change in pyproject.toml

* test format conversion

* adapt to naming convention

* chore: proper docstrings and type hints for PQ

* format

* add more unit tests

* rm unneeded comments

* test input consumption

* lint

* fix: docstrings

* lint

* format

* format

* fix license header

* fix license header

* add component run tests

* fix: pass correct input format to tracing

* fix types

* format

* format

* types

* add defaults from Socket instead of signature

- otherwise components with dynamic inputs would fail

* fix test names

* still wait for optional inputs on greedy variadic sockets

- mirrors previous behavior

* fix format

* wip: warn for ambiguous running order

* wip: alternative warning

* fix license header

* make code more readable

Co-authored-by: Amna Mubashar <[email protected]>

* Introduce content tracing to a behavioral test

* Fixing linting

* Remove debug print statements

* Fix tracer tests

* remove print

* test: test for component inputs

* test: remove testing for run order

* chore: update component checks from experimental

* chore: update pipeline and base from experimental

* refactor: remove unused method

* refactor: remove unused method

* refactor: outdated comment

* refactor: inputs state is updated as side effect

- to prepare for AsyncPipeline implementation

* format

* test: add file conversion test

* format

* fix: original implementation deepcopies outputs

* lint

* fix: from_dict was updated

* fix: format

* fix: test

* test: add test for thread safety

* remove unused imports

* format

* test: FIFOPriorityQueue

* chore: add release note

* feat: add AsyncPipeline

* chore: Add release notes

* fix: format

* debug: switch run order to debug ubuntu and windows tests

* fix: consider priorities of other components while waiting for DEFER

* refactor: simplify code

* fix: resolve merge conflict with mermaid changes

* fix: format

* fix: remove unused import

* refactor: rename to avoid accidental conflicts

* fix: track pipeline type

* fix: and extend test

* fix: format

* style: sort alphabetically

* Update test/core/pipeline/features/conftest.py

Co-authored-by: Amna Mubashar <[email protected]>

* Update test/core/pipeline/features/conftest.py

Co-authored-by: Amna Mubashar <[email protected]>

* Update releasenotes/notes/feat-async-pipeline-338856a142e1318c.yaml

* fix: indentation, do not close loop

* fix: use asyncio.run

* fix: format

---------

Co-authored-by: Amna Mubashar <[email protected]>
Co-authored-by: David S. Batista <[email protected]>
3 people authored Feb 7, 2025
1 parent 35788a2 commit e5b9bde
Showing 10 changed files with 758 additions and 116 deletions.
19 changes: 10 additions & 9 deletions haystack/__init__.py
@@ -6,7 +6,7 @@
import haystack.tracing
from haystack.core.component import component
from haystack.core.errors import ComponentError, DeserializationError
from haystack.core.pipeline import Pipeline, PredefinedPipeline
from haystack.core.pipeline import AsyncPipeline, Pipeline, PredefinedPipeline
from haystack.core.serialization import default_from_dict, default_to_dict
from haystack.dataclasses import Answer, Document, ExtractedAnswer, GeneratedAnswer

@@ -18,15 +18,16 @@
haystack.tracing.auto_enable_tracing()

__all__ = [
"component",
"default_from_dict",
"default_to_dict",
"DeserializationError",
"Answer",
"AsyncPipeline",
"ComponentError",
"Pipeline",
"PredefinedPipeline",
"DeserializationError",
"Document",
"Answer",
"GeneratedAnswer",
"ExtractedAnswer",
"GeneratedAnswer",
"Pipeline",
"PredefinedPipeline",
"component",
"default_from_dict",
"default_to_dict",
]
3 changes: 2 additions & 1 deletion haystack/core/pipeline/__init__.py
@@ -2,7 +2,8 @@
#
# SPDX-License-Identifier: Apache-2.0

from .async_pipeline import AsyncPipeline
from .pipeline import Pipeline
from .template import PredefinedPipeline

__all__ = ["Pipeline", "PredefinedPipeline"]
__all__ = ["AsyncPipeline", "Pipeline", "PredefinedPipeline"]
535 changes: 535 additions & 0 deletions haystack/core/pipeline/async_pipeline.py

Large diffs are not rendered by default.

7 changes: 4 additions & 3 deletions haystack/telemetry/_telemetry.py
@@ -8,7 +8,7 @@
import uuid
from collections import defaultdict
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union

import posthog
import yaml
@@ -18,7 +18,7 @@
from haystack.telemetry._environment import collect_system_specs

if TYPE_CHECKING:
from haystack.core.pipeline import Pipeline
from haystack.core.pipeline import AsyncPipeline, Pipeline


HAYSTACK_TELEMETRY_ENABLED = "HAYSTACK_TELEMETRY_ENABLED"
@@ -135,7 +135,7 @@ def send_telemetry_wrapper(*args, **kwargs):


@send_telemetry
def pipeline_running(pipeline: "Pipeline") -> Optional[Tuple[str, Dict[str, Any]]]:
def pipeline_running(pipeline: Union["Pipeline", "AsyncPipeline"]) -> Optional[Tuple[str, Dict[str, Any]]]:
"""
Collects telemetry data for a pipeline run and sends it to Posthog.
@@ -170,6 +170,7 @@ def pipeline_running(pipeline: "Pipeline") -> Optional[Tuple[str, Dict[str, Any]]]:
    # Data sent to Posthog
    return "Pipeline run (2.x)", {
        "pipeline_id": str(id(pipeline)),
        "pipeline_type": generate_qualified_class_name(type(pipeline)),
        "runs": pipeline._telemetry_runs,
        "components": components,
    }
8 changes: 8 additions & 0 deletions releasenotes/notes/feat-async-pipeline-338856a142e1318c.yaml
@@ -0,0 +1,8 @@
---
highlights: >
  We are introducing the `AsyncPipeline`: it supports running pipelines asynchronously and schedules components
  concurrently whenever possible, leading to major speed improvements for pipelines that can run workloads in parallel.
features:
  - |
    Added a new `AsyncPipeline` implementation that allows pipelines to be executed from async code,
    supporting concurrent scheduling of pipeline components for faster processing.
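
For illustration, here is a minimal sketch of driving the new `AsyncPipeline` from async code. It assumes `AsyncPipeline` mirrors `Pipeline`'s `add_component`/`connect` construction API and exposes the `run_async` coroutine used by the behavioral tests below; the `Greeter` and `Shouter` components are invented for the example.

import asyncio

from haystack import AsyncPipeline, component


@component
class Greeter:
    """Toy component that builds a greeting string."""

    @component.output_types(greeting=str)
    def run(self, name: str):
        return {"greeting": f"Hello, {name}!"}


@component
class Shouter:
    """Toy component that upper-cases whatever text it receives."""

    @component.output_types(shouted=str)
    def run(self, text: str):
        return {"shouted": text.upper()}


async def main():
    pipeline = AsyncPipeline()
    pipeline.add_component("greeter", Greeter())
    pipeline.add_component("shouter", Shouter())
    pipeline.connect("greeter.greeting", "shouter.text")
    # run_async is the coroutine entry point also exercised by the behavioral tests.
    return await pipeline.run_async(data={"greeter": {"name": "world"}})


print(asyncio.run(main()))  # expected: {'shouter': {'shouted': 'HELLO, WORLD!'}}

Two components wired in sequence cannot overlap; the concurrency benefit shows up on pipelines with independent branches, where the scheduler can run components at the same time.
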
62 changes: 60 additions & 2 deletions test/core/pipeline/features/conftest.py
@@ -2,14 +2,25 @@
from typing import Tuple, List, Dict, Any, Set, Union
from pathlib import Path
import re
import pytest
import asyncio

from pytest_bdd import when, then, parsers

from haystack import Pipeline
from haystack import Pipeline, AsyncPipeline

PIPELINE_NAME_REGEX = re.compile(r"\[(.*)\]")


@pytest.fixture(params=[AsyncPipeline, Pipeline])
def pipeline_class(request):
"""
A parametrized fixture that will yield AsyncPipeline for one test run
and Pipeline for the next test run.
"""
return request.param


@dataclass
class PipelineRunData:
"""
@@ -34,6 +45,54 @@ class _PipelineResult:

@when("I run the Pipeline", target_fixture="pipeline_result")
def run_pipeline(
    pipeline_data: Tuple[Union[AsyncPipeline, Pipeline], List[PipelineRunData]], spying_tracer
) -> Union[List[Tuple[_PipelineResult, PipelineRunData]], Exception]:
    if isinstance(pipeline_data[0], AsyncPipeline):
        return run_async_pipeline(pipeline_data, spying_tracer)
    else:
        return run_sync_pipeline(pipeline_data, spying_tracer)


def run_async_pipeline(
    pipeline_data: Tuple[AsyncPipeline, List[PipelineRunData]], spying_tracer
) -> Union[List[Tuple[_PipelineResult, PipelineRunData]], Exception]:
    """
    Attempts to run an async pipeline with the given inputs.
    `pipeline_data` is a tuple that must contain:
    * An AsyncPipeline instance
    * The data to run the pipeline with
    If successful, returns a list of tuples pairing run outputs with the expected outputs.
    If an exception is raised, returns it.
    """
    pipeline, pipeline_run_data = pipeline_data[0], pipeline_data[1]

    results: List[_PipelineResult] = []

    async def run_inner(data, include_outputs_from):
        """Wrapper function to call pipeline.run_async method with required params."""
        return await pipeline.run_async(data=data.inputs, include_outputs_from=include_outputs_from)

    for data in pipeline_run_data:
        try:
            outputs = asyncio.run(run_inner(data, data.include_outputs_from))

            component_calls = {
                (span.tags["haystack.component.name"], span.tags["haystack.component.visits"]): span.tags[
                    "haystack.component.input"
                ]
                for span in spying_tracer.spans
                if "haystack.component.name" in span.tags and "haystack.component.visits" in span.tags
            }
            results.append(_PipelineResult(outputs=outputs, component_calls=component_calls))
            spying_tracer.spans.clear()
        except Exception as e:
            return e

    return [e for e in zip(results, pipeline_run_data)]


def run_sync_pipeline(
    pipeline_data: Tuple[Pipeline, List[PipelineRunData]], spying_tracer
) -> Union[List[Tuple[_PipelineResult, PipelineRunData]], Exception]:
    """
@@ -61,7 +120,6 @@ def run_pipeline(
if "haystack.component.name" in span.tags and "haystack.component.visits" in span.tags
}
results.append(_PipelineResult(outputs=outputs, component_calls=component_calls))

spying_tracer.spans.clear()
except Exception as e:
return e
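
As an aside, a hypothetical test (not part of this diff) illustrating what the parametrized `pipeline_class` fixture buys: any test or step that requests it is executed twice, once per pipeline variant.

from haystack import AsyncPipeline, Pipeline


def test_pipeline_class_fixture_covers_both_variants(pipeline_class):
    # Parametrization runs this test once with AsyncPipeline and once with Pipeline,
    # so every behavioral scenario is exercised against both implementations.
    pipeline = pipeline_class()
    assert isinstance(pipeline, (AsyncPipeline, Pipeline))
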
4 changes: 2 additions & 2 deletions test/core/pipeline/features/pipeline_run.feature
@@ -3,8 +3,8 @@ Feature: Pipeline running
Scenario Outline: Running a correct Pipeline
Given a pipeline <kind>
When I run the Pipeline
Then it should return the expected result
And components are called with the expected inputs
Then components are called with the expected inputs
And it should return the expected result

Examples:
| kind |