Skip to content

Commit 9d22563

Browse files
committed
Added RetryMode to configure retries behavior
1 parent 28bee3a commit 9d22563

File tree

6 files changed

+57
-3
lines changed

6 files changed

+57
-3
lines changed

topic/src/main/java/tech/ydb/topic/impl/GrpcStreamRetrier.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.slf4j.Logger;
1313

1414
import tech.ydb.core.Status;
15+
import tech.ydb.topic.settings.RetryMode;
1516

1617
/**
1718
* @author Nikolay Perfilov
@@ -26,13 +27,15 @@ public abstract class GrpcStreamRetrier {
2627
private static final char[] ID_ALPHABET = "abcdefghijklmnopqrstuvwxyzABSDEFGHIJKLMNOPQRSTUVWXYZ1234567890"
2728
.toCharArray();
2829

30+
private final RetryMode retryMode;
2931
protected final String id;
3032
protected final AtomicBoolean isReconnecting = new AtomicBoolean(false);
3133
protected final AtomicBoolean isStopped = new AtomicBoolean(false);
3234
private final ScheduledExecutorService scheduler;
3335
protected final AtomicInteger reconnectCounter = new AtomicInteger(0);
3436

35-
protected GrpcStreamRetrier(ScheduledExecutorService scheduler) {
37+
protected GrpcStreamRetrier(RetryMode retryMode, ScheduledExecutorService scheduler) {
38+
this.retryMode = retryMode;
3639
this.scheduler = scheduler;
3740
this.id = generateRandomId(ID_LENGTH);
3841
}

topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public abstract class ReaderImpl extends GrpcStreamRetrier {
5555
private final String consumerName;
5656

5757
public ReaderImpl(TopicRpc topicRpc, ReaderSettings settings) {
58-
super(topicRpc.getScheduler());
58+
super(settings.getRetryMode(), topicRpc.getScheduler());
5959
this.topicRpc = topicRpc;
6060
this.settings = settings;
6161
this.session = new ReadSessionImpl();

topic/src/main/java/tech/ydb/topic/settings/ReaderSettings.java

+18
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,15 @@ public class ReaderSettings {
1717
private final String consumerName;
1818
private final String readerName;
1919
private final List<TopicReadSettings> topics;
20+
private final RetryMode retryMode;
2021
private final long maxMemoryUsageBytes;
2122
private final Executor decompressionExecutor;
2223

2324
private ReaderSettings(Builder builder) {
2425
this.consumerName = builder.consumerName;
2526
this.readerName = builder.readerName;
2627
this.topics = ImmutableList.copyOf(builder.topics);
28+
this.retryMode = builder.retryMode;
2729
this.maxMemoryUsageBytes = builder.maxMemoryUsageBytes;
2830
this.decompressionExecutor = builder.decompressionExecutor;
2931
}
@@ -37,6 +39,10 @@ public String getReaderName() {
3739
return readerName;
3840
}
3941

42+
public RetryMode getRetryMode() {
43+
return retryMode;
44+
}
45+
4046
public List<TopicReadSettings> getTopics() {
4147
return topics;
4248
}
@@ -61,6 +67,7 @@ public static class Builder {
6167
private boolean readWithoutConsumer = false;
6268
private String readerName = null;
6369
private List<TopicReadSettings> topics = new ArrayList<>();
70+
private RetryMode retryMode = RetryMode.ALWAYS;
6471
private long maxMemoryUsageBytes = MAX_MEMORY_USAGE_BYTES_DEFAULT;
6572
private Executor decompressionExecutor = null;
6673

@@ -81,6 +88,7 @@ public Builder withoutConsumer() {
8188

8289
/**
8390
* Set reader name for debug purposes
91+
* @param readerName name of reader
8492
* @return settings builder
8593
*/
8694
public Builder setReaderName(String readerName) {
@@ -98,6 +106,16 @@ public Builder setTopics(List<TopicReadSettings> topics) {
98106
return this;
99107
}
100108

109+
/**
110+
* Set {@link RetryMode} to define behavior of the stream internal retries
111+
* @param mode retry mode
112+
* @return settings builder
113+
*/
114+
public Builder setRetryMode(RetryMode mode) {
115+
this.retryMode = mode;
116+
return this;
117+
}
118+
101119
public Builder setMaxMemoryUsageBytes(long maxMemoryUsageBytes) {
102120
this.maxMemoryUsageBytes = maxMemoryUsageBytes;
103121
return this;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package tech.ydb.topic.settings;
2+
3+
/**
4+
*
5+
* @author Aleksandr Gorshenin
6+
*/
7+
public enum RetryMode {
8+
/** Never retry stream. After any error reader/writer will be stopped */
9+
NONE,
10+
11+
/** Don't retry first attempt to establish a stream, but after successful connection retry any error until stop */
12+
RECOVER,
13+
14+
/** Retry any error until reader/writer was stopped by client */
15+
ALWAYS,
16+
}

topic/src/main/java/tech/ydb/topic/settings/WriterSettings.java

+17
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ public class WriterSettings {
1414
private final String messageGroupId;
1515
private final Long partitionId;
1616
private final Codec codec;
17+
private final RetryMode retryMode;
1718
private final long maxSendBufferMemorySize;
1819
private final int maxSendBufferMessagesCount;
1920

@@ -23,6 +24,7 @@ private WriterSettings(Builder builder) {
2324
this.messageGroupId = builder.messageGroupId;
2425
this.partitionId = builder.partitionId;
2526
this.codec = builder.codec;
27+
this.retryMode = builder.retryMode;
2628
this.maxSendBufferMemorySize = builder.maxSendBufferMemorySize;
2729
this.maxSendBufferMessagesCount = builder.maxSendBufferMessagesCount;
2830
}
@@ -51,6 +53,10 @@ public Codec getCodec() {
5153
return codec;
5254
}
5355

56+
public RetryMode getRetryMode() {
57+
return retryMode;
58+
}
59+
5460
public long getMaxSendBufferMemorySize() {
5561
return maxSendBufferMemorySize;
5662
}
@@ -68,6 +74,7 @@ public static class Builder {
6874
private String messageGroupId = null;
6975
private Long partitionId = null;
7076
private Codec codec = Codec.GZIP;
77+
private RetryMode retryMode = RetryMode.ALWAYS;
7178
private long maxSendBufferMemorySize = MAX_MEMORY_USAGE_BYTES_DEFAULT;
7279
private int maxSendBufferMessagesCount = MAX_IN_FLIGHT_COUNT_DEFAULT;
7380

@@ -125,6 +132,16 @@ public Builder setCodec(Codec codec) {
125132
return this;
126133
}
127134

135+
/**
136+
* Set {@link RetryMode} to define behavior of the stream internal retries
137+
* @param mode retry mode
138+
* @return settings builder
139+
*/
140+
public Builder setRetryMode(RetryMode mode) {
141+
this.retryMode = mode;
142+
return this;
143+
}
144+
128145
/**
129146
* Set memory usage limit for send buffer.
130147
* Writer will not accept new messages if memory usage exceeds this limit.

topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public abstract class WriterImpl extends GrpcStreamRetrier {
6666
private CompletableFuture<WriteAck> lastAcceptedMessageFuture;
6767

6868
public WriterImpl(TopicRpc topicRpc, WriterSettings settings, Executor compressionExecutor) {
69-
super(topicRpc.getScheduler());
69+
super(settings.getRetryMode(), topicRpc.getScheduler());
7070
this.topicRpc = topicRpc;
7171
this.settings = settings;
7272
this.session = new WriteSessionImpl();

0 commit comments

Comments
 (0)