From b52890e1efbdaa5c2a1a4f3fcddb9a70727eb642 Mon Sep 17 00:00:00 2001 From: Richard Deurwaarder Date: Wed, 7 Aug 2019 22:32:38 +0200 Subject: [PATCH] [FLINK-13230] [pubsub] Add retry mechanism to acknowledging pubsub messages --- .../gcp/pubsub/BlockingGrpcPubSubSubscriber.java | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/BlockingGrpcPubSubSubscriber.java b/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/BlockingGrpcPubSubSubscriber.java index 4b639128c883c..ec9492ee4a860 100644 --- a/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/BlockingGrpcPubSubSubscriber.java +++ b/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/BlockingGrpcPubSubSubscriber.java @@ -95,12 +95,24 @@ public void acknowledge(List acknowledgementIds) { .addAllAckIds(splittedAckIds.f0) .build(); - stub.withDeadlineAfter(60, SECONDS).acknowledge(acknowledgeRequest); - + acknowledgeWithRetries(acknowledgeRequest, retries); splittedAckIds = splitAckIds(splittedAckIds.f1); } } + private void acknowledgeWithRetries(AcknowledgeRequest acknowledgeRequest, int retriesRemaining) { + try { + stub.withDeadlineAfter(timeout.toMillis(), TimeUnit.MILLISECONDS).acknowledge(acknowledgeRequest); + } catch (StatusRuntimeException e) { + if (retriesRemaining > 0) { + acknowledgeWithRetries(acknowledgeRequest, retriesRemaining - 1); + return; + } + + throw e; + } + } + /* maxPayload is the maximum number of bytes to devote to actual ids in * acknowledgement or modifyAckDeadline requests. A serialized * AcknowledgeRequest grpc call has a small constant overhead, plus the size of the