Skip to content

Commit 44bc7de

Browse files
committed
Tests for RetryMode
1 parent 0f8a74b commit 44bc7de

File tree

1 file changed

+233
-0
lines changed

1 file changed

+233
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,233 @@
1+
package tech.ydb.topic.impl;
2+
3+
import java.util.concurrent.ExecutionException;
4+
import java.util.concurrent.TimeUnit;
5+
import java.util.concurrent.TimeoutException;
6+
7+
import org.junit.Assert;
8+
import org.junit.Test;
9+
10+
import tech.ydb.core.Status;
11+
import tech.ydb.core.StatusCode;
12+
import tech.ydb.topic.settings.RetryMode;
13+
import tech.ydb.topic.settings.WriterSettings;
14+
import tech.ydb.topic.write.Message;
15+
import tech.ydb.topic.write.SyncWriter;
16+
17+
/**
18+
*
19+
* @author Aleksandr Gorshenin
20+
*/
21+
public class RetryModeTest extends BaseMockedTest {
22+
23+
@Test
24+
public void alwaysRetryWriterTest() throws InterruptedException, ExecutionException, TimeoutException {
25+
mockStreams()
26+
.then(errorStreamMockAnswer(StatusCode.TRANSPORT_UNAVAILABLE))
27+
.then(defaultStreamMockAnswer())
28+
.then(errorStreamMockAnswer(StatusCode.OVERLOADED))
29+
.then(defaultStreamMockAnswer()); // and repeat
30+
31+
SyncWriter writer = client.createSyncWriter(WriterSettings.newBuilder()
32+
.setTopicPath("/mocked_topic")
33+
.setRetryMode(RetryMode.ALWAYS)
34+
.build());
35+
writer.init();
36+
37+
// Retry #1 - TRANSPORT_UNAVAILABLE
38+
Assert.assertNull(currentStream());
39+
getScheduler().hasTasks(1).executeNextTasks(1);
40+
41+
MockedWriteStream stream1 = currentStream();
42+
stream1.nextMsg().isInit().hasInitPath("/mocked_topic");
43+
stream1.hasNoNewMessages();
44+
stream1.responseInit(0);
45+
46+
writer.send(Message.of("test-message".getBytes()));
47+
stream1.nextMsg().isWrite().hasWrite(2, 1);
48+
stream1.responseWriteWritten(1, 1);
49+
50+
stream1.complete(Status.SUCCESS);
51+
52+
// Retry #2 - Stream is closed by server
53+
getScheduler().hasTasks(1).executeNextTasks(1);
54+
55+
// Retry #3 - OVERLOADED
56+
getScheduler().hasTasks(1).executeNextTasks(1);
57+
58+
MockedWriteStream stream2 = currentStream();
59+
Assert.assertNotEquals(stream1, stream2);
60+
61+
stream2.nextMsg().isInit().hasInitPath("/mocked_topic");
62+
stream2.hasNoNewMessages();
63+
stream2.responseErrorBadRequest();
64+
65+
// Retry #4 - Stream send bad request
66+
getScheduler().hasTasks(1).executeNextTasks(1);
67+
68+
MockedWriteStream stream3 = currentStream();
69+
Assert.assertNotEquals(stream2, stream3);
70+
71+
stream3.nextMsg().isInit().hasInitPath("/mocked_topic");
72+
stream3.hasNoNewMessages();
73+
stream3.responseInit(1);
74+
75+
writer.send(Message.of("other-message".getBytes()));
76+
stream3.nextMsg().isWrite().hasWrite(2, 1);
77+
stream3.responseWriteWritten(2, 1);
78+
79+
writer.flush();
80+
writer.shutdown(1, TimeUnit.SECONDS);
81+
stream3.complete(Status.SUCCESS);
82+
}
83+
84+
@Test
85+
public void disabledRetryNetworkErrorTest() throws InterruptedException, ExecutionException, TimeoutException {
86+
mockStreams()
87+
.then(errorStreamMockAnswer(StatusCode.TRANSPORT_UNAVAILABLE));
88+
89+
WriterSettings settings = WriterSettings.newBuilder()
90+
.setTopicPath("/mocked_topic")
91+
.setRetryMode(RetryMode.NONE)
92+
.build();
93+
94+
SyncWriter writer = client.createSyncWriter(settings);
95+
writer.init();
96+
97+
// No stream and no retries in scheduler
98+
Assert.assertNull(currentStream());
99+
getScheduler().hasNoTasks();
100+
101+
RuntimeException ex = Assert.assertThrows(RuntimeException.class,
102+
() -> writer.send(Message.of("test-message".getBytes())));
103+
Assert.assertEquals("Writer is already stopped", ex.getMessage());
104+
105+
writer.shutdown(1, TimeUnit.SECONDS);
106+
}
107+
108+
@Test
109+
public void disabledRetryStreamCloseTest() throws InterruptedException, ExecutionException, TimeoutException {
110+
WriterSettings settings = WriterSettings.newBuilder()
111+
.setTopicPath("/mocked_topic")
112+
.setRetryMode(RetryMode.NONE)
113+
.build();
114+
115+
SyncWriter writer = client.createSyncWriter(settings);
116+
writer.init();
117+
118+
MockedWriteStream stream1 = currentStream();
119+
stream1.nextMsg().isInit().hasInitPath("/mocked_topic");
120+
stream1.hasNoNewMessages();
121+
stream1.responseInit(0);
122+
123+
// Even successful completing closes writer
124+
stream1.complete(Status.SUCCESS);
125+
126+
RuntimeException ex = Assert.assertThrows(RuntimeException.class,
127+
() -> writer.send(Message.of("test-message".getBytes())));
128+
Assert.assertEquals("Writer is already stopped", ex.getMessage());
129+
130+
writer.shutdown(1, TimeUnit.SECONDS);
131+
}
132+
133+
@Test
134+
public void disabledRetryStreamErrorTest() throws InterruptedException, ExecutionException, TimeoutException {
135+
WriterSettings settings = WriterSettings.newBuilder()
136+
.setTopicPath("/mocked_topic")
137+
.setRetryMode(RetryMode.NONE)
138+
.build();
139+
140+
SyncWriter writer = client.createSyncWriter(settings);
141+
writer.init();
142+
143+
MockedWriteStream stream1 = currentStream();
144+
stream1.nextMsg().isInit().hasInitPath("/mocked_topic");
145+
stream1.hasNoNewMessages();
146+
stream1.responseInit(0);
147+
148+
stream1.responseErrorBadRequest();
149+
stream1.complete(Status.SUCCESS);
150+
151+
RuntimeException ex = Assert.assertThrows(RuntimeException.class,
152+
() -> writer.send(Message.of("test-message".getBytes())));
153+
Assert.assertEquals("Writer is already stopped", ex.getMessage());
154+
155+
writer.shutdown(1, TimeUnit.SECONDS);
156+
}
157+
158+
@Test
159+
public void recoverRetryNetworkErrorTest() throws InterruptedException, ExecutionException, TimeoutException {
160+
mockStreams()
161+
.then(errorStreamMockAnswer(StatusCode.TRANSPORT_UNAVAILABLE));
162+
163+
WriterSettings settings = WriterSettings.newBuilder()
164+
.setTopicPath("/mocked_topic")
165+
.setRetryMode(RetryMode.RECOVER)
166+
.build();
167+
168+
SyncWriter writer = client.createSyncWriter(settings);
169+
writer.init();
170+
171+
// No stream and no retries in scheduler
172+
Assert.assertNull(currentStream());
173+
getScheduler().hasNoTasks();
174+
175+
RuntimeException ex = Assert.assertThrows(RuntimeException.class,
176+
() -> writer.send(Message.of("test-message".getBytes())));
177+
Assert.assertEquals("Writer is already stopped", ex.getMessage());
178+
179+
writer.shutdown(1, TimeUnit.SECONDS);
180+
}
181+
182+
@Test
183+
public void recoverRetryWriterTest() throws InterruptedException, ExecutionException, TimeoutException {
184+
mockStreams()
185+
.then(defaultStreamMockAnswer())
186+
.then(errorStreamMockAnswer(StatusCode.OVERLOADED))
187+
.then(errorStreamMockAnswer(StatusCode.TRANSPORT_UNAVAILABLE))
188+
.then(errorStreamMockAnswer(StatusCode.OVERLOADED))
189+
.then(defaultStreamMockAnswer()); // and repeat
190+
191+
SyncWriter writer = client.createSyncWriter(WriterSettings.newBuilder()
192+
.setTopicPath("/mocked_topic")
193+
.setRetryMode(RetryMode.RECOVER)
194+
.build());
195+
writer.init();
196+
197+
MockedWriteStream stream1 = currentStream();
198+
stream1.nextMsg().isInit().hasInitPath("/mocked_topic");
199+
stream1.hasNoNewMessages();
200+
stream1.responseInit(0);
201+
202+
writer.send(Message.of("test-message".getBytes()));
203+
stream1.nextMsg().isWrite().hasWrite(2, 1);
204+
stream1.responseWriteWritten(1, 1);
205+
206+
stream1.complete(new RuntimeException("io exception"));
207+
208+
// Retry #1 - Stream is by runtime exception
209+
getScheduler().hasTasks(1).executeNextTasks(1);
210+
211+
// Retry #2 - OVERLOADED
212+
getScheduler().hasTasks(1).executeNextTasks(1);
213+
// Retry #3 - TRANSPORT_UNAVAILABLE
214+
getScheduler().hasTasks(1).executeNextTasks(1);
215+
// Retry #4 - OVERLOADED
216+
getScheduler().hasTasks(1).executeNextTasks(1);
217+
218+
MockedWriteStream stream2 = currentStream();
219+
Assert.assertNotEquals(stream1, stream2);
220+
221+
stream2.nextMsg().isInit().hasInitPath("/mocked_topic");
222+
stream2.hasNoNewMessages();
223+
stream2.responseInit(1);
224+
225+
writer.send(Message.of("other-message".getBytes()));
226+
stream2.nextMsg().isWrite().hasWrite(2, 1);
227+
stream2.responseWriteWritten(2, 1);
228+
229+
writer.flush();
230+
writer.shutdown(1, TimeUnit.SECONDS);
231+
stream2.complete(Status.SUCCESS);
232+
}
233+
}

0 commit comments

Comments
 (0)