Skip to content

Commit d71e651

Browse files
committed
Catch BulkIndexError exception in async_bulk. resolves elastic#864
1 parent 03e2b6d commit d71e651

File tree

1 file changed

+12
-4
lines changed

1 file changed

+12
-4
lines changed

elasticsearch/_async/helpers.py

+12-4
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444
_process_bulk_chunk_success,
4545
expand_action,
4646
)
47-
from ..helpers.errors import ScanError
47+
from ..helpers.errors import ScanError, BulkIndexError
4848
from ..serializer import Serializer
4949
from .client import AsyncElasticsearch # noqa
5050

@@ -334,10 +334,18 @@ async def async_bulk(
334334

335335
# make streaming_bulk yield successful results so we can count them
336336
kwargs["yield_ok"] = True
337-
async for ok, item in async_streaming_bulk(
337+
async_streaming_bulk_itr = async_streaming_bulk(
338338
client, actions, ignore_status=ignore_status, *args, **kwargs # type: ignore[misc]
339-
):
340-
# go through request-response pairs and detect failures
339+
)
340+
# go through request-response pairs and detect failures
341+
while True:
342+
try:
343+
ok, item = await anext(async_streaming_bulk_itr)
344+
except StopAsyncIteration:
345+
break
346+
except BulkIndexError as e:
347+
ok, item = False, e.errors[0]
348+
341349
if not ok:
342350
if not stats_only:
343351
errors.append(item)

0 commit comments

Comments
 (0)