diff --git a/elasticsearch/_async/helpers.py b/elasticsearch/_async/helpers.py index 6e05bc9fc..ce6bdb85b 100644 --- a/elasticsearch/_async/helpers.py +++ b/elasticsearch/_async/helpers.py @@ -44,7 +44,7 @@ _process_bulk_chunk_success, expand_action, ) -from ..helpers.errors import ScanError +from ..helpers.errors import ScanError, BulkIndexError from ..serializer import Serializer from .client import AsyncElasticsearch # noqa @@ -334,10 +334,18 @@ async def async_bulk( # make streaming_bulk yield successful results so we can count them kwargs["yield_ok"] = True - async for ok, item in async_streaming_bulk( + async_streaming_bulk_itr = async_streaming_bulk( client, actions, ignore_status=ignore_status, *args, **kwargs # type: ignore[misc] - ): - # go through request-response pairs and detect failures + ) + # go through request-response pairs and detect failures + while True: + try: + ok, item = await anext(async_streaming_bulk_itr) + except StopAsyncIteration: + break + except BulkIndexError as e: + ok, item = False, e.errors[0] + if not ok: if not stats_only: errors.append(item)