From e765121505e52c0f494ee8c42ab7d16d46d08a19 Mon Sep 17 00:00:00 2001 From: Aidin Gharibnavaz Date: Fri, 10 May 2024 13:24:09 +1200 Subject: [PATCH] Return a CompletableFuture from BulkIngester.flush method --- .../_helpers/bulk/BulkIngester.java | 22 ++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java index a6e61367e..83af5d49c 100644 --- a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java +++ b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java @@ -35,7 +35,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.concurrent.CompletionStage; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -80,9 +80,9 @@ private static class RequestExecution { public final long id; public final BulkRequest request; public final List contexts; - public final CompletionStage futureResponse; + public final CompletableFuture futureResponse; - RequestExecution(long id, BulkRequest request, List contexts, CompletionStage futureResponse) { + RequestExecution(long id, BulkRequest request, List contexts, CompletableFuture futureResponse) { this.id = id; this.request = request; this.contexts = contexts; @@ -271,7 +271,11 @@ private void failsafeFlush() { } } - public void flush() { + /** + * @return A future of the response. The BulkResponse is empty if there was nothing to execute. + */ + @Nullable + public CompletableFuture flush() { RequestExecution exec = sendRequestCondition.whenReadyIf( () -> { // May happen on manual and periodic flushes @@ -294,7 +298,7 @@ public void flush() { listener.beforeBulk(id, request, requestContexts); } - CompletionStage result = client.bulk(request); + CompletableFuture result = client.bulk(request); requestsInFlightCount++; if (listener == null) { @@ -327,7 +331,15 @@ public void flush() { } return null; }); + + return exec.futureResponse; } + + return CompletableFuture.completedFuture(BulkResponse.of(b -> b + .errors(false) + .items(Collections.emptyList()) + .took(1)) + ); } public void add(BulkOperation operation, Context context) {