From d71e651ab5a6cff27f1255be1345e6539155d08d Mon Sep 17 00:00:00 2001 From: Bhaa Shakur Date: Fri, 28 Apr 2023 01:16:12 +0300 Subject: [PATCH] Catch BulkIndexError exception in async_bulk. resolves #864 --- elasticsearch/_async/helpers.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/elasticsearch/_async/helpers.py b/elasticsearch/_async/helpers.py index 6e05bc9fc..ce6bdb85b 100644 --- a/elasticsearch/_async/helpers.py +++ b/elasticsearch/_async/helpers.py @@ -44,7 +44,7 @@ _process_bulk_chunk_success, expand_action, ) -from ..helpers.errors import ScanError +from ..helpers.errors import ScanError, BulkIndexError from ..serializer import Serializer from .client import AsyncElasticsearch # noqa @@ -334,10 +334,18 @@ async def async_bulk( # make streaming_bulk yield successful results so we can count them kwargs["yield_ok"] = True - async for ok, item in async_streaming_bulk( + async_streaming_bulk_itr = async_streaming_bulk( client, actions, ignore_status=ignore_status, *args, **kwargs # type: ignore[misc] - ): - # go through request-response pairs and detect failures + ) + # go through request-response pairs and detect failures + while True: + try: + ok, item = await anext(async_streaming_bulk_itr) + except StopAsyncIteration: + break + except BulkIndexError as e: + ok, item = False, e.errors[0] + if not ok: if not stats_only: errors.append(item)