diff --git a/elasticsearch/_async/helpers.py b/elasticsearch/_async/helpers.py index 1bc339917..94f03e0f2 100644 --- a/elasticsearch/_async/helpers.py +++ b/elasticsearch/_async/helpers.py @@ -21,6 +21,7 @@ Any, AsyncIterable, AsyncIterator, + Awaitable, Callable, Collection, Dict, @@ -167,6 +168,7 @@ async def async_streaming_bulk( expand_action_callback: Callable[ [_TYPE_BULK_ACTION], _TYPE_BULK_ACTION_HEADER_AND_BODY ] = expand_action, + sleep: Callable[[float],Awaitable[None]] = asyncio.sleep, raise_on_exception: bool = True, max_retries: int = 0, initial_backoff: float = 2, @@ -202,6 +204,7 @@ async def async_streaming_bulk( :arg expand_action_callback: callback executed on each action passed in, should return a tuple containing the action line and the data line (`None` if data line should be omitted). + :arg sleep: custom callable defined for custom action on cancelling :arg retry_on_status: HTTP status code that will trigger a retry. (if `None` is specified only status 429 will retry). :arg max_retries: maximum number of times a document will be retried when @@ -246,7 +249,7 @@ async def map_actions() -> AsyncIterable[_TYPE_BULK_ACTION_HEADER_AND_BODY]: ] ] = [] if attempt: - await asyncio.sleep( + await sleep( min(max_backoff, initial_backoff * 2 ** (attempt - 1)) ) @@ -304,6 +307,7 @@ async def async_bulk( client: AsyncElasticsearch, actions: Union[Iterable[_TYPE_BULK_ACTION], AsyncIterable[_TYPE_BULK_ACTION]], stats_only: bool = False, + sleep: Callable[[float],Awaitable[None]] = asyncio.sleep, ignore_status: Union[int, Collection[int]] = (), *args: Any, **kwargs: Any, @@ -329,6 +333,7 @@ async def async_bulk( :arg actions: iterator containing the actions :arg stats_only: if `True` only report number of successful/failed operations instead of just number of successful and a list of error responses + :arg sleep: custom callable defined for custom action on cancelling :arg ignore_status: list of HTTP status code that you want to ignore Any additional keyword arguments will be passed to @@ -344,7 +349,7 @@ async def async_bulk( # make streaming_bulk yield successful results so we can count them kwargs["yield_ok"] = True async for ok, item in async_streaming_bulk( - client, actions, ignore_status=ignore_status, *args, **kwargs # type: ignore[misc] + client, actions, sleep=sleep, ignore_status=ignore_status, *args, **kwargs # type: ignore[misc] ): # go through request-response pairs and detect failures if not ok: