feat: AsyncPipeline that can schedule components to run concurrently #8812
Conversation
# Conflicts:
#   releasenotes/notes/fix-pipeline-run-2fefeafc705a6d91.yaml
#   test/conftest.py
#   test/core/pipeline/features/conftest.py
#   test/core/pipeline/features/pipeline_run.feature
#   test/core/pipeline/features/test_run.py
#   test/core/pipeline/test_component_checks.py
#   test/core/pipeline/test_pipeline.py
#   test/core/pipeline/test_pipeline_base.py
@Amnah199 @davidsbatista much smaller diff now that the other PR is merged. This is largely the same as the PR that we already merged to experimental with the following differences:
@@ -23,6 +23,7 @@
     "default_to_dict",
     "DeserializationError",
     "ComponentError",
+    "AsyncPipeline",
(nit) suggestion: keeping these imports ordered alphabetically helps locate entries as the list grows
__all__ = [
"Answer",
"AsyncPipeline",
"ComponentError",
"DeserializationError",
"Document",
"ExtractedAnswer",
"GeneratedAnswer",
"Pipeline",
"PredefinedPipeline",
"component",
"default_from_dict",
"default_to_dict",
]
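The ordering can even be guarded with a tiny check. This is a hypothetical sketch, not part of the PR: Python's sorted() uses ASCII order, so the uppercase names naturally sort before the lowercase ones, matching the suggested list.

```python
# Hypothetical guard that an __all__-style export list stays alphabetized.
exports = [
    "Answer",
    "AsyncPipeline",
    "ComponentError",
    "DeserializationError",
    "Document",
    "ExtractedAnswer",
    "GeneratedAnswer",
    "Pipeline",
    "PredefinedPipeline",
    "component",
    "default_from_dict",
    "default_to_dict",
]
# ASCII order puts uppercase before lowercase, so this list is sorted.
assert exports == sorted(exports)
```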
I did another quick review, although most of this was already reviewed before. From my side it's approved, but to play it safe, let's wait for Amna to also do another quick review before merging.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Co-authored-by: Amna Mubashar <[email protected]>
async_loop = asyncio.new_event_loop()
asyncio.set_event_loop(async_loop)
Here as well, we can avoid manual handling of loops by using asyncio.run, if you feel that would be better.
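To illustrate the reviewer's point, here is a minimal, self-contained sketch (the coroutine name is made up for the example) comparing manual loop management with asyncio.run, which creates, runs, and closes the loop in one call:

```python
import asyncio

async def main() -> str:
    await asyncio.sleep(0)
    return "done"

# Manual event-loop management, as in the test code below:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    result_manual = loop.run_until_complete(main())
finally:
    loop.close()

# Equivalent one-liner: asyncio.run handles loop setup and teardown.
result_run = asyncio.run(main())
```

Both variants produce the same result; asyncio.run just removes the boilerplate and guarantees the loop is closed even on error.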
def test_async_pipeline_reentrance(waiting_component, spying_tracer):
    pp = AsyncPipeline()
    pp.add_component("wait", waiting_component())

    run_data = [{"wait_for": 1}, {"wait_for": 2}]

    async_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(async_loop)

    async def run_all():
        # Create concurrent tasks for each pipeline run
        tasks = [pp.run_async(data) for data in run_data]
        await asyncio.gather(*tasks)

    try:
        async_loop.run_until_complete(run_all())
        component_spans = [sp for sp in spying_tracer.spans if sp.operation_name == "haystack.component.run_async"]
        for span in component_spans:
            assert span.tags["haystack.component.visits"] == 1
    finally:
        async_loop.close()
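The core of this test is that asyncio.gather schedules both pipeline runs concurrently. A self-contained sketch of that pattern (with a plain coroutine standing in for a pipeline run; names here are illustrative, not Haystack APIs) shows the overlap: the total time is close to the longest run, not the sum of both.

```python
import asyncio
import time

async def fake_run(seconds: float) -> float:
    # Stand-in for a pipeline run that awaits for a while.
    await asyncio.sleep(seconds)
    return seconds

async def run_all() -> list:
    # Both "runs" are awaited together, so they overlap in time.
    return await asyncio.gather(fake_run(0.1), fake_run(0.2))

start = time.perf_counter()
results = asyncio.run(run_all())
elapsed = time.perf_counter() - start
# elapsed is ~0.2s (the longest run), not 0.3s (the sum).
```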
Something like this? (although I didn't test it)
def test_async_pipeline_reentrance(waiting_component, spying_tracer):
    """
    Test that the AsyncPipeline can execute multiple runs concurrently and that
    each component is called exactly once per run (as indicated by the 'visits' tag).
    """
    async_pipeline = AsyncPipeline()
    async_pipeline.add_component("wait", waiting_component())

    run_data = [{"wait_for": 1}, {"wait_for": 2}]

    async def run_all():
        tasks = [async_pipeline.run_async(data) for data in run_data]
        await asyncio.gather(*tasks)

    # Use asyncio.run to manage the event loop.
    asyncio.run(run_all())

    component_spans = [
        sp for sp in spying_tracer.spans
        if sp.operation_name == "haystack.component.run_async"
    ]
    for span in component_spans:
        expected_visits = 1
        actual_visits = span.tags.get("haystack.component.visits")
        assert actual_visits == expected_visits, (
            f"Expected {expected_visits} visit, got {actual_visits} for span {span}"
        )
LGTM! Thanks again @mathislucka.
Much appreciated!
Related Issues
Proposed Changes:
Implements an AsyncPipeline that supports:
- a run-method with concurrent execution of components

How did you test it?
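To make the "concurrent execution of components" idea concrete, here is a hypothetical sketch of the scheduling principle only, not the Haystack implementation: components with no dependency on each other's output are awaited in a single gather call and therefore run concurrently.

```python
import asyncio

# Illustrative stand-ins for two independent pipeline components.
async def component_a() -> str:
    await asyncio.sleep(0.05)
    return "output_a"

async def component_b() -> str:
    await asyncio.sleep(0.05)
    return "output_b"

async def pipeline_run() -> dict:
    # Neither component consumes the other's output, so both can be
    # scheduled at once instead of one after the other.
    a, b = await asyncio.gather(component_a(), component_b())
    return {"component_a": a, "component_b": b}

results = asyncio.run(pipeline_run())
```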
Notes for the reviewer
Review after #8707
Code was reviewed here before: deepset-ai/haystack-experimental#180
Checklist
- The PR title uses one of the prefixes fix:, feat:, build:, chore:, ci:, docs:, style:, refactor:, perf:, test:, with ! added in case the PR includes breaking changes.