Commit fe2fc99

Updated implementation of GrpcStreamRetrier

1 parent 9d22563
3 files changed: +73 -46

topic/src/main/java/tech/ydb/topic/impl/GrpcStreamRetrier.java

+71 -44
@@ -4,35 +4,33 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.slf4j.Logger;
 
+import tech.ydb.common.retry.ExponentialBackoffRetry;
+import tech.ydb.common.retry.RetryPolicy;
 import tech.ydb.core.Status;
 import tech.ydb.topic.settings.RetryMode;
 
 /**
  * @author Nikolay Perfilov
  */
 public abstract class GrpcStreamRetrier {
-    // TODO: add retry policy
-    private static final int MAX_RECONNECT_COUNT = 0; // Inf
-    private static final int EXP_BACKOFF_BASE_MS = 256;
-    private static final int EXP_BACKOFF_CEILING_MS = 40000; // 40 sec (max delays would be 40-80 sec)
-    private static final int EXP_BACKOFF_MAX_POWER = 7;
     private static final int ID_LENGTH = 6;
     private static final char[] ID_ALPHABET = "abcdefghijklmnopqrstuvwxyzABSDEFGHIJKLMNOPQRSTUVWXYZ1234567890"
             .toCharArray();
 
-    private final RetryMode retryMode;
     protected final String id;
     protected final AtomicBoolean isReconnecting = new AtomicBoolean(false);
     protected final AtomicBoolean isStopped = new AtomicBoolean(false);
+
     private final ScheduledExecutorService scheduler;
-    protected final AtomicInteger reconnectCounter = new AtomicInteger(0);
+    private final RetryMode retryMode;
+    private final RetryPolicy retryPolicy = new DefaultRetryPolicy();
+    private final AtomicInteger retry = new AtomicInteger(-1);
 
     protected GrpcStreamRetrier(RetryMode retryMode, ScheduledExecutorService scheduler) {
         this.retryMode = retryMode;
@@ -53,45 +51,32 @@ protected static String generateRandomId(int length) {
                 .toString();
     }
 
-    private void tryScheduleReconnect() {
-        int currentReconnectCounter = reconnectCounter.get() + 1;
-        if (MAX_RECONNECT_COUNT > 0 && currentReconnectCounter > MAX_RECONNECT_COUNT) {
-            if (isStopped.compareAndSet(false, true)) {
-                String errorMessage = "[" + id + "] Maximum retry count (" + MAX_RECONNECT_COUNT
-                        + ") exceeded. Shutting down " + getStreamName();
-                getLogger().error(errorMessage);
-                shutdownImpl(errorMessage);
-                return;
-            } else {
-                getLogger().info("[{}] Maximum retry count ({}}) exceeded. Need to shutdown {} but it's already " +
-                        "shut down.", id, MAX_RECONNECT_COUNT, getStreamName());
-            }
-        }
-        if (isReconnecting.compareAndSet(false, true)) {
-            reconnectCounter.set(currentReconnectCounter);
-            int delayMs = currentReconnectCounter <= EXP_BACKOFF_MAX_POWER
-                    ? EXP_BACKOFF_BASE_MS * (1 << currentReconnectCounter)
-                    : EXP_BACKOFF_CEILING_MS;
-            // Add jitter
-            delayMs = delayMs + ThreadLocalRandom.current().nextInt(delayMs);
-            getLogger().warn("[{}] Retry #{}. Scheduling {} reconnect in {}ms...", id, currentReconnectCounter,
-                    getStreamName(), delayMs);
-            try {
-                scheduler.schedule(this::reconnect, delayMs, TimeUnit.MILLISECONDS);
-            } catch (RejectedExecutionException exception) {
-                String errorMessage = "[" + id + "] Couldn't schedule reconnect: scheduler is already shut down. " +
-                        "Shutting down " + getStreamName();
-                getLogger().error(errorMessage);
-                shutdownImpl(errorMessage);
-            }
-        } else {
+    private void tryScheduleReconnect(int retryNumber) {
+        if (!isReconnecting.compareAndSet(false, true)) {
             getLogger().info("[{}] should reconnect {} stream, but reconnect is already in progress", id,
                     getStreamName());
+            return;
+        }
+
+        retry.set(retryNumber);
+        long delay = retryPolicy.nextRetryMs(retryNumber, 0);
+        getLogger().warn("[{}] Retry #{}. Scheduling {} reconnect in {}ms...", id, retryNumber, getStreamName(), delay);
+        try {
+            scheduler.schedule(this::reconnect, delay, TimeUnit.MILLISECONDS);
+        } catch (RejectedExecutionException exception) {
+            String errorMessage = "[" + id + "] Couldn't schedule reconnect: scheduler is already shut down. " +
+                    "Shutting down " + getStreamName();
+            getLogger().error(errorMessage);
+            shutdownImpl(errorMessage);
         }
     }
 
+    protected void resetRetries() {
+        retry.set(0);
+    }
+
     void reconnect() {
-        getLogger().info("[{}] {} reconnect #{} started", id, getStreamName(), reconnectCounter.get());
+        getLogger().info("[{}] {} reconnect #{} started", id, getStreamName(), retry.get());
         if (!isReconnecting.compareAndSet(true, false)) {
             getLogger().warn("Couldn't reset reconnect flag. Shouldn't happen");
         }
@@ -130,10 +115,52 @@ protected void onSessionClosed(Status status, Throwable th) {
             }
         }
 
-        if (!isStopped.get()) {
-            tryScheduleReconnect();
-        } else {
+        if (isStopped.get()) {
             getLogger().info("[{}] {} is already stopped, no need to schedule reconnect", id, getStreamName());
+            return;
+        }
+
+        int currentRetry = nextRetryNumber();
+        if (currentRetry > 0) {
+            tryScheduleReconnect(currentRetry);
+            return;
+        }
+
+        if (!isStopped.compareAndSet(false, true)) {
+            getLogger().warn("[{}] Stopped by retry mode {} after {} retries. But {} is already shut down.", id,
+                    retryMode, currentRetry, getStreamName());
+            return;
+        }
+
+        String errorMessage = "[" + id + "] Stopped by retry mode " + retryMode + " after " + currentRetry +
+                " retries. Shutting down " + getStreamName();
+        getLogger().error(errorMessage);
+        shutdownImpl(errorMessage);
+    }
+
+    private int nextRetryNumber() {
+        int next = retry.get() + 1;
+        switch (retryMode) {
+            case RECOVER: return next;
+            case ALWAYS: return Math.max(1, next);
+            case NONE:
+            default:
+                return 0;
+        }
+    }
+
+    private static class DefaultRetryPolicy extends ExponentialBackoffRetry {
+
+        private static final int EXP_BACKOFF_BASE_MS = 256;
+        private static final int EXP_BACKOFF_MAX_POWER = 7;
+
+        DefaultRetryPolicy() {
+            super(EXP_BACKOFF_BASE_MS, EXP_BACKOFF_MAX_POWER);
+        }
+
+        @Override
+        public long nextRetryMs(int retryCount, long elapsedTimeMs /* ignored */) {
+            return backoffTimeMillis(retryCount);
+        }
     }
 }
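
The reconnect decision now goes through nextRetryNumber() instead of the old MAX_RECONNECT_COUNT check: the counter starts at -1, resetRetries() moves it to 0 after a successful server response, and the configured RetryMode decides whether the next failure schedules a retry or shuts the stream down. A minimal standalone sketch of that decision, derived only from the diff above (RetryModeDemo and its main() are illustrative, not part of the SDK):

// Illustrative sketch only: a standalone restatement of nextRetryNumber() above,
// showing how RetryMode and the retry counter interact.
public class RetryModeDemo {
    enum RetryMode { NONE, RECOVER, ALWAYS }

    // Mirrors GrpcStreamRetrier.nextRetryNumber(): 0 means "shut the stream down",
    // a positive value is the number of the retry to schedule.
    static int nextRetryNumber(RetryMode mode, int currentRetry) {
        int next = currentRetry + 1;
        switch (mode) {
            case RECOVER: return next;              // -1 -> 0: stop if the stream never had a success
            case ALWAYS:  return Math.max(1, next); // at least retry #1, even before any success
            case NONE:
            default:      return 0;                 // never reconnect
        }
    }

    public static void main(String[] args) {
        // retry == -1: no successful server response yet; retry == 0: resetRetries() was called.
        for (RetryMode mode : RetryMode.values()) {
            System.out.printf("%-7s: first failure -> %d, failure after a success -> %d%n",
                    mode, nextRetryNumber(mode, -1), nextRetryNumber(mode, 0));
        }
    }
}

Under this logic RECOVER shuts the stream down only when it has never received a successful response, while ALWAYS keeps reconnecting regardless.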

topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java

+1 -1

@@ -515,7 +515,7 @@ private void processMessage(YdbTopic.StreamReadMessage.FromServer message) {
         }
         logger.debug("[{}] processMessage called", streamId);
         if (message.getStatus() == StatusCodesProtos.StatusIds.StatusCode.SUCCESS) {
-            reconnectCounter.set(0);
+            resetRetries();
         } else {
             Status status = Status.of(StatusCode.fromProto(message.getStatus()),
                     Issue.fromPb(message.getIssuesList()));

topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java

+1 -1

@@ -479,7 +479,7 @@ private void onWriteResponse(YdbTopic.StreamWriteMessage.WriteResponse response)
     private void processMessage(YdbTopic.StreamWriteMessage.FromServer message) {
         logger.debug("[{}] processMessage called", streamId);
        if (message.getStatus() == StatusCodesProtos.StatusIds.StatusCode.SUCCESS) {
-            reconnectCounter.set(0);
+            resetRetries();
         } else {
             Status status = Status.of(StatusCode.fromProto(message.getStatus()),
                     Issue.fromPb(message.getIssuesList()));
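
In both the reader and the writer, every successful server response now calls resetRetries(), so consecutive failures drive the backoff and it restarts from the first slot after each recovery. The delay itself comes from DefaultRetryPolicy above, an ExponentialBackoffRetry configured with base 256 ms and max power 7. The sketch below only illustrates the expected order of magnitude; the exact growth formula and the jitter live inside ExponentialBackoffRetry and are an assumption here, not something shown in this diff:

// Rough sketch only. Assumption (not from the diff): ExponentialBackoffRetry with
// base=256ms and maxPower=7 yields a delay on the order of 256ms * 2^min(n, 7),
// with some jitter added on top.
public class BackoffSketch {
    public static void main(String[] args) {
        final long baseMs = 256;   // EXP_BACKOFF_BASE_MS in DefaultRetryPolicy
        final int maxPower = 7;    // EXP_BACKOFF_MAX_POWER in DefaultRetryPolicy
        for (int retry = 1; retry <= 9; retry++) {
            long approxMs = baseMs * (1L << Math.min(retry, maxPower));
            System.out.printf("retry #%d -> roughly %d ms before reconnect (plus jitter)%n", retry, approxMs);
        }
    }
}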
