Skip to content

Commit 16fd5e0

Browse files
committed
Revert old behavior with infinite retries by default
1 parent 44bc7de commit 16fd5e0

File tree

4 files changed

+36
-16
lines changed

4 files changed

+36
-16
lines changed

topic/src/main/java/tech/ydb/topic/impl/GrpcStreamRetrier.java

+18-2
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,30 @@
99

1010
import org.slf4j.Logger;
1111

12+
import tech.ydb.common.retry.ExponentialBackoffRetry;
1213
import tech.ydb.common.retry.RetryConfig;
1314
import tech.ydb.common.retry.RetryPolicy;
1415
import tech.ydb.core.Status;
16+
import tech.ydb.core.StatusCode;
1517

1618
/**
1719
* @author Nikolay Perfilov
1820
*/
1921
public abstract class GrpcStreamRetrier {
22+
public static final RetryConfig RETRY_ALL = new RetryConfig() {
23+
@Override
24+
public RetryPolicy isStatusRetryable(StatusCode code) {
25+
return RETRY_ALL_POLICY;
26+
}
27+
28+
@Override
29+
public RetryPolicy isThrowableRetryable(Throwable th) {
30+
return RETRY_ALL_POLICY;
31+
}
32+
};
33+
34+
private static final RetryPolicy RETRY_ALL_POLICY = new ExponentialBackoffRetry(256, 7);
35+
2036
private static final int ID_LENGTH = 6;
2137
private static final char[] ID_ALPHABET = "abcdefghijklmnopqrstuvwxyzABSDEFGHIJKLMNOPQRSTUVWXYZ1234567890"
2238
.toCharArray();
@@ -97,7 +113,7 @@ protected CompletableFuture<Void> shutdownImpl(String reason) {
97113
protected void onSessionClosed(Status status, Throwable th) {
98114
logger.info("[{}] onSessionClosed called", id);
99115

100-
RetryPolicy retryPolicy = null;
116+
RetryPolicy retryPolicy;
101117
if (th != null) {
102118
logger.error("[{}] Exception in {} stream session: ", id, getStreamName(), th);
103119
retryPolicy = retryConfig.isThrowableRetryable(th);
@@ -111,8 +127,8 @@ protected void onSessionClosed(Status status, Throwable th) {
111127
}
112128
} else {
113129
logger.warn("[{}] Error in {} stream session: {}", id, getStreamName(), status);
114-
retryPolicy = retryConfig.isStatusRetryable(status.getCode());
115130
}
131+
retryPolicy = retryConfig.isStatusRetryable(status.getCode());
116132
}
117133

118134
if (isStopped.get()) {

topic/src/main/java/tech/ydb/topic/settings/ReaderSettings.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import tech.ydb.common.retry.RetryPolicy;
1414
import tech.ydb.core.Status;
1515
import tech.ydb.core.StatusCode;
16+
import tech.ydb.topic.impl.GrpcStreamRetrier;
1617

1718
/**
1819
* @author Nikolay Perfilov
@@ -78,7 +79,7 @@ public static class Builder {
7879
private boolean readWithoutConsumer = false;
7980
private String readerName = null;
8081
private List<TopicReadSettings> topics = new ArrayList<>();
81-
private RetryConfig retryConfig = RetryConfig.idempotentRetryForever();
82+
private RetryConfig retryConfig = GrpcStreamRetrier.RETRY_ALL;
8283
private long maxMemoryUsageBytes = MAX_MEMORY_USAGE_BYTES_DEFAULT;
8384
private Executor decompressionExecutor = null;
8485

topic/src/main/java/tech/ydb/topic/settings/WriterSettings.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import tech.ydb.core.Status;
88
import tech.ydb.core.StatusCode;
99
import tech.ydb.topic.description.Codec;
10+
import tech.ydb.topic.impl.GrpcStreamRetrier;
1011

1112
/**
1213
* @author Nikolay Perfilov
@@ -85,7 +86,7 @@ public static class Builder {
8586
private String messageGroupId = null;
8687
private Long partitionId = null;
8788
private Codec codec = Codec.GZIP;
88-
private RetryConfig retryConfig = RetryConfig.idempotentRetryForever();
89+
private RetryConfig retryConfig = GrpcStreamRetrier.RETRY_ALL;
8990
private long maxSendBufferMemorySize = MAX_MEMORY_USAGE_BYTES_DEFAULT;
9091
private int maxSendBufferMessagesCount = MAX_IN_FLIGHT_COUNT_DEFAULT;
9192

topic/src/test/java/tech/ydb/topic/impl/RetryModeTest.java topic/src/test/java/tech/ydb/topic/impl/TopicRetriesTest.java

+14-12
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,10 @@
77
import org.junit.Assert;
88
import org.junit.Test;
99

10+
import tech.ydb.common.retry.RetryConfig;
1011
import tech.ydb.core.Status;
1112
import tech.ydb.core.StatusCode;
12-
import tech.ydb.topic.settings.RetryMode;
13+
import tech.ydb.core.UnexpectedResultException;
1314
import tech.ydb.topic.settings.WriterSettings;
1415
import tech.ydb.topic.write.Message;
1516
import tech.ydb.topic.write.SyncWriter;
@@ -18,10 +19,10 @@
1819
*
1920
* @author Aleksandr Gorshenin
2021
*/
21-
public class RetryModeTest extends BaseMockedTest {
22+
public class TopicRetriesTest extends BaseMockedTest {
2223

2324
@Test
24-
public void alwaysRetryWriterTest() throws InterruptedException, ExecutionException, TimeoutException {
25+
public void defaultRetryWriterTest() throws InterruptedException, ExecutionException, TimeoutException {
2526
mockStreams()
2627
.then(errorStreamMockAnswer(StatusCode.TRANSPORT_UNAVAILABLE))
2728
.then(defaultStreamMockAnswer())
@@ -30,7 +31,6 @@ public void alwaysRetryWriterTest() throws InterruptedException, ExecutionExcept
3031

3132
SyncWriter writer = client.createSyncWriter(WriterSettings.newBuilder()
3233
.setTopicPath("/mocked_topic")
33-
.setRetryMode(RetryMode.ALWAYS)
3434
.build());
3535
writer.init();
3636

@@ -47,7 +47,7 @@ public void alwaysRetryWriterTest() throws InterruptedException, ExecutionExcept
4747
stream1.nextMsg().isWrite().hasWrite(2, 1);
4848
stream1.responseWriteWritten(1, 1);
4949

50-
stream1.complete(Status.SUCCESS);
50+
stream1.complete(Status.of(StatusCode.SUCCESS));
5151

5252
// Retry #2 - Stream is closed by server
5353
getScheduler().hasTasks(1).executeNextTasks(1);
@@ -88,7 +88,7 @@ public void disabledRetryNetworkErrorTest() throws InterruptedException, Executi
8888

8989
WriterSettings settings = WriterSettings.newBuilder()
9090
.setTopicPath("/mocked_topic")
91-
.setRetryMode(RetryMode.NONE)
91+
.setRetryConfig(RetryConfig.noRetries())
9292
.build();
9393

9494
SyncWriter writer = client.createSyncWriter(settings);
@@ -109,7 +109,7 @@ public void disabledRetryNetworkErrorTest() throws InterruptedException, Executi
109109
public void disabledRetryStreamCloseTest() throws InterruptedException, ExecutionException, TimeoutException {
110110
WriterSettings settings = WriterSettings.newBuilder()
111111
.setTopicPath("/mocked_topic")
112-
.setRetryMode(RetryMode.NONE)
112+
.setRetryConfig(RetryConfig.noRetries())
113113
.build();
114114

115115
SyncWriter writer = client.createSyncWriter(settings);
@@ -134,7 +134,7 @@ public void disabledRetryStreamCloseTest() throws InterruptedException, Executio
134134
public void disabledRetryStreamErrorTest() throws InterruptedException, ExecutionException, TimeoutException {
135135
WriterSettings settings = WriterSettings.newBuilder()
136136
.setTopicPath("/mocked_topic")
137-
.setRetryMode(RetryMode.NONE)
137+
.setRetryConfig(RetryConfig.noRetries())
138138
.build();
139139

140140
SyncWriter writer = client.createSyncWriter(settings);
@@ -162,7 +162,7 @@ public void recoverRetryNetworkErrorTest() throws InterruptedException, Executio
162162

163163
WriterSettings settings = WriterSettings.newBuilder()
164164
.setTopicPath("/mocked_topic")
165-
.setRetryMode(RetryMode.RECOVER)
165+
.setRetryConfig(RetryConfig.noRetries())
166166
.build();
167167

168168
SyncWriter writer = client.createSyncWriter(settings);
@@ -180,7 +180,7 @@ public void recoverRetryNetworkErrorTest() throws InterruptedException, Executio
180180
}
181181

182182
@Test
183-
public void recoverRetryWriterTest() throws InterruptedException, ExecutionException, TimeoutException {
183+
public void idempotentRetryWriterTest() throws InterruptedException, ExecutionException, TimeoutException {
184184
mockStreams()
185185
.then(defaultStreamMockAnswer())
186186
.then(errorStreamMockAnswer(StatusCode.OVERLOADED))
@@ -190,7 +190,7 @@ public void recoverRetryWriterTest() throws InterruptedException, ExecutionExcep
190190

191191
SyncWriter writer = client.createSyncWriter(WriterSettings.newBuilder()
192192
.setTopicPath("/mocked_topic")
193-
.setRetryMode(RetryMode.RECOVER)
193+
.setRetryConfig(RetryConfig.idempotentRetryForever())
194194
.build());
195195
writer.init();
196196

@@ -203,7 +203,9 @@ public void recoverRetryWriterTest() throws InterruptedException, ExecutionExcep
203203
stream1.nextMsg().isWrite().hasWrite(2, 1);
204204
stream1.responseWriteWritten(1, 1);
205205

206-
stream1.complete(new RuntimeException("io exception"));
206+
stream1.complete(new RuntimeException("io exception",
207+
new UnexpectedResultException("inner", Status.of(StatusCode.CLIENT_INTERNAL_ERROR)))
208+
);
207209

208210
// Retry #1 - Stream is by runtime exception
209211
getScheduler().hasTasks(1).executeNextTasks(1);

0 commit comments

Comments
 (0)