Skip to content

Commit 9e5b678

Browse files
committed
Updated implementation of GrpcStreamRetrier
1 parent 173ad53 commit 9e5b678

File tree

3 files changed

+57
-49
lines changed

3 files changed

+57
-49
lines changed

topic/src/main/java/tech/ydb/topic/impl/GrpcStreamRetrier.java

+55-47
Original file line numberDiff line numberDiff line change
@@ -4,36 +4,31 @@
44
import java.util.concurrent.CompletableFuture;
55
import java.util.concurrent.RejectedExecutionException;
66
import java.util.concurrent.ScheduledExecutorService;
7-
import java.util.concurrent.ThreadLocalRandom;
87
import java.util.concurrent.TimeUnit;
98
import java.util.concurrent.atomic.AtomicBoolean;
10-
import java.util.concurrent.atomic.AtomicInteger;
119

1210
import org.slf4j.Logger;
1311

1412
import tech.ydb.common.retry.RetryConfig;
13+
import tech.ydb.common.retry.RetryPolicy;
1514
import tech.ydb.core.Status;
1615

1716
/**
1817
* @author Nikolay Perfilov
1918
*/
2019
public abstract class GrpcStreamRetrier {
21-
// TODO: add retry policy
22-
private static final int MAX_RECONNECT_COUNT = 0; // Inf
23-
private static final int EXP_BACKOFF_BASE_MS = 256;
24-
private static final int EXP_BACKOFF_CEILING_MS = 40000; // 40 sec (max delays would be 40-80 sec)
25-
private static final int EXP_BACKOFF_MAX_POWER = 7;
2620
private static final int ID_LENGTH = 6;
2721
private static final char[] ID_ALPHABET = "abcdefghijklmnopqrstuvwxyzABSDEFGHIJKLMNOPQRSTUVWXYZ1234567890"
2822
.toCharArray();
2923

30-
private final RetryConfig retryConfig;
3124
protected final String id;
3225
protected final AtomicBoolean isReconnecting = new AtomicBoolean(false);
3326
protected final AtomicBoolean isStopped = new AtomicBoolean(false);
34-
protected final AtomicInteger reconnectCounter = new AtomicInteger(0);
3527

3628
private final ScheduledExecutorService scheduler;
29+
private final RetryConfig retryConfig;
30+
private volatile int retryCount;
31+
private volatile long retryStartedAt;
3732

3833
protected GrpcStreamRetrier(RetryConfig retryConfig, ScheduledExecutorService scheduler) {
3934
this.retryConfig = retryConfig;
@@ -54,45 +49,31 @@ protected static String generateRandomId(int length) {
5449
.toString();
5550
}
5651

57-
private void tryScheduleReconnect() {
58-
int currentReconnectCounter = reconnectCounter.get() + 1;
59-
if (MAX_RECONNECT_COUNT > 0 && currentReconnectCounter > MAX_RECONNECT_COUNT) {
60-
if (isStopped.compareAndSet(false, true)) {
61-
String errorMessage = "[" + id + "] Maximum retry count (" + MAX_RECONNECT_COUNT
62-
+ ") exceeded. Shutting down " + getStreamName();
63-
getLogger().error(errorMessage);
64-
shutdownImpl(errorMessage);
65-
return;
66-
} else {
67-
getLogger().info("[{}] Maximum retry count ({}}) exceeded. Need to shutdown {} but it's already " +
68-
"shut down.", id, MAX_RECONNECT_COUNT, getStreamName());
69-
}
70-
}
71-
if (isReconnecting.compareAndSet(false, true)) {
72-
reconnectCounter.set(currentReconnectCounter);
73-
int delayMs = currentReconnectCounter <= EXP_BACKOFF_MAX_POWER
74-
? EXP_BACKOFF_BASE_MS * (1 << currentReconnectCounter)
75-
: EXP_BACKOFF_CEILING_MS;
76-
// Add jitter
77-
delayMs = delayMs + ThreadLocalRandom.current().nextInt(delayMs);
78-
getLogger().warn("[{}] Retry #{}. Scheduling {} reconnect in {}ms...", id, currentReconnectCounter,
79-
getStreamName(), delayMs);
80-
try {
81-
scheduler.schedule(this::reconnect, delayMs, TimeUnit.MILLISECONDS);
82-
} catch (RejectedExecutionException exception) {
83-
String errorMessage = "[" + id + "] Couldn't schedule reconnect: scheduler is already shut down. " +
84-
"Shutting down " + getStreamName();
85-
getLogger().error(errorMessage);
86-
shutdownImpl(errorMessage);
87-
}
88-
} else {
52+
private void tryReconnect(long delay) {
53+
if (!isReconnecting.compareAndSet(false, true)) {
8954
getLogger().info("[{}] should reconnect {} stream, but reconnect is already in progress", id,
9055
getStreamName());
56+
return;
9157
}
58+
59+
getLogger().warn("[{}] Retry #{}. Scheduling {} reconnect in {}ms...", id, retryCount, getStreamName(), delay);
60+
try {
61+
scheduler.schedule(this::reconnect, delay, TimeUnit.MILLISECONDS);
62+
} catch (RejectedExecutionException exception) {
63+
String errorMessage = "[" + id + "] Couldn't schedule reconnect: scheduler is already shut down. " +
64+
"Shutting down " + getStreamName();
65+
getLogger().error(errorMessage);
66+
shutdownImpl(errorMessage);
67+
}
68+
}
69+
70+
protected void resetRetries() {
71+
retryStartedAt = -1;
72+
retryCount = 0;
9273
}
9374

9475
void reconnect() {
95-
getLogger().info("[{}] {} reconnect #{} started", id, getStreamName(), reconnectCounter.get());
76+
getLogger().info("[{}] {} reconnect #{} started", id, getStreamName(), retryCount);
9677
if (!isReconnecting.compareAndSet(true, false)) {
9778
getLogger().warn("Couldn't reset reconnect flag. Shouldn't happen");
9879
}
@@ -115,26 +96,53 @@ protected CompletableFuture<Void> shutdownImpl(String reason) {
11596
protected void onSessionClosed(Status status, Throwable th) {
11697
getLogger().info("[{}] onSessionClosed called", id);
11798

99+
RetryPolicy retryPolicy = null;
118100
if (th != null) {
119101
getLogger().error("[{}] Exception in {} stream session: ", id, getStreamName(), th);
102+
retryPolicy = retryConfig.isThrowableRetryable(th);
120103
} else {
121104
if (status.isSuccess()) {
122105
if (isStopped.get()) {
123106
getLogger().info("[{}] {} stream session closed successfully", id, getStreamName());
124107
return;
125108
} else {
126-
getLogger().warn("[{}] {} stream session was closed on working {}", id, getStreamName(),
127-
getStreamName());
109+
getLogger().warn("[{}] {} stream session was closed on working {}", id, getStreamName());
128110
}
129111
} else {
130112
getLogger().warn("[{}] Error in {} stream session: {}", id, getStreamName(), status);
113+
retryPolicy = retryConfig.isStatusRetryable(status.getCode());
131114
}
132115
}
133116

134-
if (!isStopped.get()) {
135-
tryScheduleReconnect();
136-
} else {
117+
if (isStopped.get()) {
137118
getLogger().info("[{}] {} is already stopped, no need to schedule reconnect", id, getStreamName());
119+
return;
120+
}
121+
122+
if (retryPolicy != null) {
123+
if (retryCount < 1) {
124+
retryStartedAt = System.currentTimeMillis();
125+
}
126+
long delay = retryPolicy.nextRetryMs(retryCount + 1, System.currentTimeMillis() - retryStartedAt);
127+
if (delay >= 0) {
128+
retryCount++;
129+
tryReconnect(delay);
130+
return;
131+
}
138132
}
133+
134+
long elapsedMs = retryStartedAt > 0 ? System.currentTimeMillis() - retryStartedAt : 0;
135+
if (!isStopped.compareAndSet(false, true)) {
136+
getLogger().warn("[{}] Stopped after {} retries and {} ms elapsed. But {} is already shut down.",
137+
id, retryCount, elapsedMs, getStreamName());
138+
return;
139+
}
140+
141+
String errorMessage = "[" + id + "] Stopped after " + retryCount + " retries and " + elapsedMs +
142+
" ms elapsed. Shutting down " + getStreamName();
143+
getLogger().error(errorMessage);
144+
shutdownImpl(errorMessage);
139145
}
146+
147+
140148
}

topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -515,7 +515,7 @@ private void processMessage(YdbTopic.StreamReadMessage.FromServer message) {
515515
}
516516
logger.debug("[{}] processMessage called", streamId);
517517
if (message.getStatus() == StatusCodesProtos.StatusIds.StatusCode.SUCCESS) {
518-
reconnectCounter.set(0);
518+
resetRetries();
519519
} else {
520520
Status status = Status.of(StatusCode.fromProto(message.getStatus()),
521521
Issue.fromPb(message.getIssuesList()));

topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -479,7 +479,7 @@ private void onWriteResponse(YdbTopic.StreamWriteMessage.WriteResponse response)
479479
private void processMessage(YdbTopic.StreamWriteMessage.FromServer message) {
480480
logger.debug("[{}] processMessage called", streamId);
481481
if (message.getStatus() == StatusCodesProtos.StatusIds.StatusCode.SUCCESS) {
482-
reconnectCounter.set(0);
482+
resetRetries();
483483
} else {
484484
Status status = Status.of(StatusCode.fromProto(message.getStatus()),
485485
Issue.fromPb(message.getIssuesList()));

0 commit comments

Comments
 (0)