Skip to content

Commit 173ad53

Browse files
committed
Added RetryConfig to configure retries behavior
1 parent 1983bed commit 173ad53

File tree

5 files changed

+91
-20
lines changed

5 files changed

+91
-20
lines changed

topic/src/main/java/tech/ydb/topic/impl/GrpcStreamRetrier.java

+4-8
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,10 @@
88
import java.util.concurrent.TimeUnit;
99
import java.util.concurrent.atomic.AtomicBoolean;
1010
import java.util.concurrent.atomic.AtomicInteger;
11-
import java.util.function.BiConsumer;
1211

1312
import org.slf4j.Logger;
1413

14+
import tech.ydb.common.retry.RetryConfig;
1515
import tech.ydb.core.Status;
1616

1717
/**
@@ -27,18 +27,18 @@ public abstract class GrpcStreamRetrier {
2727
private static final char[] ID_ALPHABET = "abcdefghijklmnopqrstuvwxyzABSDEFGHIJKLMNOPQRSTUVWXYZ1234567890"
2828
.toCharArray();
2929

30+
private final RetryConfig retryConfig;
3031
protected final String id;
3132
protected final AtomicBoolean isReconnecting = new AtomicBoolean(false);
3233
protected final AtomicBoolean isStopped = new AtomicBoolean(false);
3334
protected final AtomicInteger reconnectCounter = new AtomicInteger(0);
3435

3536
private final ScheduledExecutorService scheduler;
36-
private final BiConsumer<Status, Throwable> errorsHandler;
3737

38-
protected GrpcStreamRetrier(ScheduledExecutorService scheduler, BiConsumer<Status, Throwable> errorsHandler) {
38+
protected GrpcStreamRetrier(RetryConfig retryConfig, ScheduledExecutorService scheduler) {
39+
this.retryConfig = retryConfig;
3940
this.scheduler = scheduler;
4041
this.id = generateRandomId(ID_LENGTH);
41-
this.errorsHandler = errorsHandler;
4242
}
4343

4444
protected abstract Logger getLogger();
@@ -131,10 +131,6 @@ protected void onSessionClosed(Status status, Throwable th) {
131131
}
132132
}
133133

134-
if (errorsHandler != null) {
135-
errorsHandler.accept(status, th);
136-
}
137-
138134
if (!isStopped.get()) {
139135
tryScheduleReconnect();
140136
} else {

topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public abstract class ReaderImpl extends GrpcStreamRetrier {
5555
private final String consumerName;
5656

5757
public ReaderImpl(TopicRpc topicRpc, ReaderSettings settings) {
58-
super(topicRpc.getScheduler(), settings.getErrorsHandler());
58+
super(settings.getRetryConfig(), topicRpc.getScheduler());
5959
this.topicRpc = topicRpc;
6060
this.settings = settings;
6161
this.session = new ReadSessionImpl();

topic/src/main/java/tech/ydb/topic/settings/ReaderSettings.java

+43-5
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,10 @@
99

1010
import com.google.common.collect.ImmutableList;
1111

12+
import tech.ydb.common.retry.RetryConfig;
13+
import tech.ydb.common.retry.RetryPolicy;
1214
import tech.ydb.core.Status;
15+
import tech.ydb.core.StatusCode;
1316

1417
/**
1518
* @author Nikolay Perfilov
@@ -20,17 +23,17 @@ public class ReaderSettings {
2023
private final String consumerName;
2124
private final String readerName;
2225
private final List<TopicReadSettings> topics;
26+
private final RetryConfig retryConfig;
2327
private final long maxMemoryUsageBytes;
2428
private final Executor decompressionExecutor;
25-
private final BiConsumer<Status, Throwable> errorsHandler;
2629

2730
private ReaderSettings(Builder builder) {
2831
this.consumerName = builder.consumerName;
2932
this.readerName = builder.readerName;
3033
this.topics = ImmutableList.copyOf(builder.topics);
34+
this.retryConfig = builder.retryConfig;
3135
this.maxMemoryUsageBytes = builder.maxMemoryUsageBytes;
3236
this.decompressionExecutor = builder.decompressionExecutor;
33-
this.errorsHandler = builder.errorsHandler;
3437
}
3538

3639
public String getConsumerName() {
@@ -42,12 +45,17 @@ public String getReaderName() {
4245
return readerName;
4346
}
4447

48+
public RetryConfig getRetryConfig() {
49+
return retryConfig;
50+
}
51+
4552
public List<TopicReadSettings> getTopics() {
4653
return topics;
4754
}
4855

56+
@Deprecated
4957
public BiConsumer<Status, Throwable> getErrorsHandler() {
50-
return errorsHandler;
58+
return null;
5159
}
5260

5361
public long getMaxMemoryUsageBytes() {
@@ -70,9 +78,9 @@ public static class Builder {
7078
private boolean readWithoutConsumer = false;
7179
private String readerName = null;
7280
private List<TopicReadSettings> topics = new ArrayList<>();
81+
private RetryConfig retryConfig = RetryConfig.idempotentRetryForever();
7382
private long maxMemoryUsageBytes = MAX_MEMORY_USAGE_BYTES_DEFAULT;
7483
private Executor decompressionExecutor = null;
75-
private BiConsumer<Status, Throwable> errorsHandler = null;
7684

7785
public Builder setConsumerName(String consumerName) {
7886
this.consumerName = consumerName;
@@ -91,6 +99,7 @@ public Builder withoutConsumer() {
9199

92100
/**
93101
* Set reader name for debug purposes
102+
* @param readerName name of reader
94103
* @return settings builder
95104
*/
96105
public Builder setReaderName(String readerName) {
@@ -108,13 +117,42 @@ public Builder setTopics(List<TopicReadSettings> topics) {
108117
return this;
109118
}
110119

120+
/**
121+
* Set {@link RetryConfig} to define behavior of the stream internal retries
122+
* @param config retry mode
123+
* @return settings builder
124+
*/
125+
public Builder setRetryConfig(RetryConfig config) {
126+
this.retryConfig = config;
127+
return this;
128+
}
129+
111130
public Builder setMaxMemoryUsageBytes(long maxMemoryUsageBytes) {
112131
this.maxMemoryUsageBytes = maxMemoryUsageBytes;
113132
return this;
114133
}
115134

135+
/**
136+
* @param handler
137+
* @return builder
138+
* @deprecated use {@link Builder#setRetryConfig(tech.ydb.common.retry.RetryConfig)} instead
139+
*/
140+
@Deprecated
116141
public Builder setErrorsHandler(BiConsumer<Status, Throwable> handler) {
117-
this.errorsHandler = handler;
142+
final RetryConfig currentConfig = retryConfig;
143+
retryConfig = new RetryConfig() {
144+
@Override
145+
public RetryPolicy isStatusRetryable(StatusCode code) {
146+
handler.accept(Status.of(code), null);
147+
return currentConfig.isStatusRetryable(code);
148+
}
149+
150+
@Override
151+
public RetryPolicy isThrowableRetryable(Throwable th) {
152+
handler.accept(null, th);
153+
return currentConfig.isThrowableRetryable(th);
154+
}
155+
};
118156
return this;
119157
}
120158

topic/src/main/java/tech/ydb/topic/settings/WriterSettings.java

+42-5
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@
22

33
import java.util.function.BiConsumer;
44

5+
import tech.ydb.common.retry.RetryConfig;
6+
import tech.ydb.common.retry.RetryPolicy;
57
import tech.ydb.core.Status;
8+
import tech.ydb.core.StatusCode;
69
import tech.ydb.topic.description.Codec;
710

811
/**
@@ -17,19 +20,19 @@ public class WriterSettings {
1720
private final String messageGroupId;
1821
private final Long partitionId;
1922
private final Codec codec;
23+
private final RetryConfig retryConfig;
2024
private final long maxSendBufferMemorySize;
2125
private final int maxSendBufferMessagesCount;
22-
private final BiConsumer<Status, Throwable> errorsHandler;
2326

2427
private WriterSettings(Builder builder) {
2528
this.topicPath = builder.topicPath;
2629
this.producerId = builder.producerId;
2730
this.messageGroupId = builder.messageGroupId;
2831
this.partitionId = builder.partitionId;
2932
this.codec = builder.codec;
33+
this.retryConfig = builder.retryConfig;
3034
this.maxSendBufferMemorySize = builder.maxSendBufferMemorySize;
3135
this.maxSendBufferMessagesCount = builder.maxSendBufferMessagesCount;
32-
this.errorsHandler = builder.errorsHandler;
3336
}
3437

3538
public static Builder newBuilder() {
@@ -48,8 +51,9 @@ public String getMessageGroupId() {
4851
return messageGroupId;
4952
}
5053

54+
@Deprecated
5155
public BiConsumer<Status, Throwable> getErrorsHandler() {
52-
return errorsHandler;
56+
return null;
5357
}
5458

5559
public Long getPartitionId() {
@@ -60,6 +64,10 @@ public Codec getCodec() {
6064
return codec;
6165
}
6266

67+
public RetryConfig getRetryConfig() {
68+
return retryConfig;
69+
}
70+
6371
public long getMaxSendBufferMemorySize() {
6472
return maxSendBufferMemorySize;
6573
}
@@ -77,9 +85,9 @@ public static class Builder {
7785
private String messageGroupId = null;
7886
private Long partitionId = null;
7987
private Codec codec = Codec.GZIP;
88+
private RetryConfig retryConfig = RetryConfig.idempotentRetryForever();
8089
private long maxSendBufferMemorySize = MAX_MEMORY_USAGE_BYTES_DEFAULT;
8190
private int maxSendBufferMessagesCount = MAX_IN_FLIGHT_COUNT_DEFAULT;
82-
private BiConsumer<Status, Throwable> errorsHandler = null;
8391

8492
/**
8593
* Set path to a topic to write to
@@ -135,6 +143,16 @@ public Builder setCodec(Codec codec) {
135143
return this;
136144
}
137145

146+
/**
147+
* Set {@link RetryConfig} to define behavior of the stream internal retries
148+
* @param config retry mode
149+
* @return settings builder
150+
*/
151+
public Builder setRetryConfig(RetryConfig config) {
152+
this.retryConfig = config;
153+
return this;
154+
}
155+
138156
/**
139157
* Set memory usage limit for send buffer.
140158
* Writer will not accept new messages if memory usage exceeds this limit.
@@ -158,8 +176,27 @@ public Builder setMaxSendBufferMessagesCount(int maxMessagesCount) {
158176
return this;
159177
}
160178

179+
/**
180+
* @param handler
181+
* @return builder
182+
* @deprecated use {@link Builder#setRetryConfig(tech.ydb.common.retry.RetryConfig)} instead
183+
*/
184+
@Deprecated
161185
public Builder setErrorsHandler(BiConsumer<Status, Throwable> handler) {
162-
this.errorsHandler = handler;
186+
final RetryConfig currentConfig = retryConfig;
187+
retryConfig = new RetryConfig() {
188+
@Override
189+
public RetryPolicy isStatusRetryable(StatusCode code) {
190+
handler.accept(Status.of(code), null);
191+
return currentConfig.isStatusRetryable(code);
192+
}
193+
194+
@Override
195+
public RetryPolicy isThrowableRetryable(Throwable th) {
196+
handler.accept(null, th);
197+
return currentConfig.isThrowableRetryable(th);
198+
}
199+
};
163200
return this;
164201
}
165202

topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public abstract class WriterImpl extends GrpcStreamRetrier {
6666
private CompletableFuture<WriteAck> lastAcceptedMessageFuture;
6767

6868
public WriterImpl(TopicRpc topicRpc, WriterSettings settings, Executor compressionExecutor) {
69-
super(topicRpc.getScheduler(), settings.getErrorsHandler());
69+
super(settings.getRetryConfig(), topicRpc.getScheduler());
7070
this.topicRpc = topicRpc;
7171
this.settings = settings;
7272
this.session = new WriteSessionImpl();

0 commit comments

Comments
 (0)