Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit ab6dfba

Browse files
committedJan 23, 2025
Added base mock implementation for unit tests
1 parent fe2fc99 commit ab6dfba

File tree

3 files changed

+237
-1
lines changed

3 files changed

+237
-1
lines changed
 

‎topic/pom.xml

+7
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,21 @@
4040
<artifactId>zstd-jni</artifactId>
4141
<version>1.5.2-5</version>
4242
</dependency>
43+
4344
<dependency>
4445
<groupId>junit</groupId>
4546
<artifactId>junit</artifactId>
4647
<scope>test</scope>
4748
</dependency>
49+
<dependency>
50+
<groupId>org.mockito</groupId>
51+
<artifactId>mockito-inline</artifactId>
52+
<scope>test</scope>
53+
</dependency>
4854
<dependency>
4955
<groupId>tech.ydb.test</groupId>
5056
<artifactId>ydb-junit4-support</artifactId>
57+
<scope>test</scope>
5158
</dependency>
5259
<dependency>
5360
<groupId>org.apache.logging.log4j</groupId>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
package tech.ydb.topic.impl;
2+
3+
import java.util.ArrayList;
4+
import java.util.List;
5+
import java.util.Queue;
6+
import java.util.concurrent.CompletableFuture;
7+
import java.util.concurrent.ConcurrentLinkedQueue;
8+
import java.util.concurrent.ScheduledExecutorService;
9+
import java.util.concurrent.ScheduledFuture;
10+
import java.util.stream.Collectors;
11+
import java.util.stream.IntStream;
12+
import java.util.stream.LongStream;
13+
14+
import org.junit.Assert;
15+
import org.junit.Before;
16+
import org.mockito.Mockito;
17+
import org.mockito.invocation.InvocationOnMock;
18+
import org.mockito.stubbing.Answer;
19+
import org.mockito.stubbing.OngoingStubbing;
20+
import org.slf4j.Logger;
21+
import org.slf4j.LoggerFactory;
22+
23+
import tech.ydb.core.Status;
24+
import tech.ydb.core.StatusCode;
25+
import tech.ydb.core.grpc.GrpcReadStream;
26+
import tech.ydb.core.grpc.GrpcReadWriteStream;
27+
import tech.ydb.core.grpc.GrpcTransport;
28+
import tech.ydb.proto.StatusCodesProtos;
29+
import tech.ydb.proto.topic.YdbTopic;
30+
import tech.ydb.proto.topic.v1.TopicServiceGrpc;
31+
import tech.ydb.topic.TopicClient;
32+
33+
/**
34+
*
35+
* @author Aleksandr Gorshenin
36+
*/
37+
public class BaseMockedTest {
38+
private static final Logger logger = LoggerFactory.getLogger(BaseMockedTest.class);
39+
40+
private interface WriteStream extends
41+
GrpcReadWriteStream<YdbTopic.StreamWriteMessage.FromServer, YdbTopic.StreamWriteMessage.FromClient> {
42+
}
43+
44+
private final GrpcTransport transport = Mockito.mock(GrpcTransport.class);
45+
private final ScheduledExecutorService scheduler = Mockito.mock(ScheduledExecutorService.class);
46+
private final ScheduledFuture<?> emptyFuture = Mockito.mock(ScheduledFuture.class);
47+
private final WriteStream writeStream = Mockito.mock(WriteStream.class);
48+
private final SchedulerAssert schedulerHelper = new SchedulerAssert();
49+
50+
protected final TopicClient client = TopicClient.newClient(transport)
51+
.setCompressionExecutor(Runnable::run) // Disable compression in separate executors
52+
.build();
53+
54+
private volatile MockedWriteStream streamMock = null;
55+
56+
@Before
57+
public void beforeEach() {
58+
streamMock = null;
59+
60+
Mockito.when(transport.getScheduler()).thenReturn(scheduler);
61+
Mockito.when(transport.readWriteStreamCall(Mockito.eq(TopicServiceGrpc.getStreamWriteMethod()), Mockito.any()))
62+
.thenReturn(writeStream);
63+
64+
// Every writeStream.start updates mockedWriteStream
65+
Mockito.when(writeStream.start(Mockito.any())).thenAnswer(defaultStreamMockAnswer());
66+
67+
// Every writeStream.senbNext add message from client to mockedWriteStream.sent list
68+
Mockito.doAnswer((Answer<Void>) (InvocationOnMock iom) -> {
69+
streamMock.sent.add(iom.getArgument(0, YdbTopic.StreamWriteMessage.FromClient.class));
70+
return null;
71+
}).when(writeStream).sendNext(Mockito.any());
72+
73+
Mockito.when(scheduler.schedule(Mockito.any(Runnable.class), Mockito.anyLong(), Mockito.any()))
74+
.thenAnswer((InvocationOnMock iom) -> {
75+
logger.debug("mock scheduled task");
76+
schedulerHelper.tasks.add(iom.getArgument(0, Runnable.class));
77+
return emptyFuture;
78+
});
79+
}
80+
81+
protected MockedWriteStream currentStream() {
82+
return streamMock;
83+
}
84+
85+
protected SchedulerAssert getScheduler() {
86+
return schedulerHelper;
87+
}
88+
89+
protected OngoingStubbing<CompletableFuture<Status>> mockStreams() {
90+
return Mockito.when(writeStream.start(Mockito.any()));
91+
}
92+
93+
protected Answer<CompletableFuture<Status>> defaultStreamMockAnswer() {
94+
return (InvocationOnMock iom) -> {
95+
streamMock = new MockedWriteStream(iom.getArgument(0));
96+
return streamMock.streamFuture;
97+
};
98+
}
99+
100+
protected Answer<CompletableFuture<Status>> errorStreamMockAnswer(StatusCode code) {
101+
return (iom) -> {
102+
streamMock = null;
103+
return CompletableFuture.completedFuture(Status.of(code));
104+
};
105+
}
106+
107+
protected static class SchedulerAssert {
108+
private final Queue<Runnable> tasks = new ConcurrentLinkedQueue<>();
109+
110+
public SchedulerAssert hasNoTasks() {
111+
Assert.assertTrue(tasks.isEmpty());
112+
return this;
113+
}
114+
115+
public SchedulerAssert hasTasks(int count) {
116+
Assert.assertEquals(count, tasks.size());
117+
return this;
118+
}
119+
120+
public SchedulerAssert executeNextTasks(int count) {
121+
Assert.assertTrue(count <= tasks.size());
122+
123+
CompletableFuture.runAsync(() -> {
124+
logger.debug("execute {} scheduled tasks", count);
125+
for (int idx = 0; idx < count; idx++) {
126+
tasks.poll().run();
127+
}
128+
}).join();
129+
return this;
130+
}
131+
}
132+
133+
protected static class MockedWriteStream {
134+
private final GrpcReadWriteStream.Observer<YdbTopic.StreamWriteMessage.FromServer> observer;
135+
private final CompletableFuture<Status> streamFuture = new CompletableFuture<>();
136+
private final List<YdbTopic.StreamWriteMessage.FromClient> sent = new ArrayList<>();
137+
private volatile int sentIdx = 0;
138+
139+
public MockedWriteStream(GrpcReadStream.Observer<YdbTopic.StreamWriteMessage.FromServer> observer) {
140+
this.observer = observer;
141+
}
142+
143+
public void complete(Status status) {
144+
streamFuture.complete(status);
145+
}
146+
147+
public void complete(Throwable th) {
148+
streamFuture.completeExceptionally(th);
149+
}
150+
151+
public void hasNoNewMessages() {
152+
Assert.assertTrue(sentIdx >= sent.size());
153+
}
154+
155+
public Checker nextMsg() {
156+
Assert.assertTrue(sentIdx < sent.size());
157+
return new Checker(sent.get(sentIdx++));
158+
}
159+
160+
public void responseErrorBadRequest() {
161+
YdbTopic.StreamWriteMessage.FromServer msg = YdbTopic.StreamWriteMessage.FromServer.newBuilder()
162+
.setStatus(StatusCodesProtos.StatusIds.StatusCode.BAD_REQUEST)
163+
.build();
164+
observer.onNext(msg);
165+
}
166+
167+
public void responseInit(long lastSeqNo) {
168+
responseInit(lastSeqNo, 123, "mocked", new int[] { 0, 1, 2});
169+
}
170+
171+
public void responseInit(long lastSeqNo, long partitionId, String sessionId, int[] codecs) {
172+
YdbTopic.StreamWriteMessage.FromServer msg = YdbTopic.StreamWriteMessage.FromServer.newBuilder()
173+
.setStatus(StatusCodesProtos.StatusIds.StatusCode.SUCCESS)
174+
.setInitResponse(YdbTopic.StreamWriteMessage.InitResponse.newBuilder()
175+
.setLastSeqNo(lastSeqNo)
176+
.setPartitionId(partitionId)
177+
.setSessionId(sessionId)
178+
.setSupportedCodecs(YdbTopic.SupportedCodecs.newBuilder()
179+
.addAllCodecs(IntStream.of(codecs).boxed().collect(Collectors.toList())))
180+
).build();
181+
observer.onNext(msg);
182+
}
183+
184+
public void responseWriteWritten(long firstSeqNo, int messagesCount) {
185+
List<YdbTopic.StreamWriteMessage.WriteResponse.WriteAck> acks = LongStream
186+
.range(firstSeqNo, firstSeqNo + messagesCount)
187+
.mapToObj(seqNo -> YdbTopic.StreamWriteMessage.WriteResponse.WriteAck.newBuilder()
188+
.setSeqNo(seqNo)
189+
.setWritten(YdbTopic.StreamWriteMessage.WriteResponse.WriteAck.Written.newBuilder())
190+
.build())
191+
.collect(Collectors.toList());
192+
193+
YdbTopic.StreamWriteMessage.FromServer msg = YdbTopic.StreamWriteMessage.FromServer.newBuilder()
194+
.setStatus(StatusCodesProtos.StatusIds.StatusCode.SUCCESS)
195+
.setWriteResponse(YdbTopic.StreamWriteMessage.WriteResponse.newBuilder().addAllAcks(acks))
196+
.build();
197+
observer.onNext(msg);
198+
}
199+
200+
protected class Checker {
201+
private final YdbTopic.StreamWriteMessage.FromClient msg;
202+
203+
public Checker(YdbTopic.StreamWriteMessage.FromClient msg) {
204+
this.msg = msg;
205+
}
206+
207+
public Checker isInit() {
208+
Assert.assertTrue(msg.hasInitRequest());
209+
return this;
210+
}
211+
212+
public Checker hasInitPath(String path) {
213+
Assert.assertEquals(path, msg.getInitRequest().getPath());
214+
return this;
215+
}
216+
217+
public Checker isWrite() {
218+
Assert.assertTrue(msg.hasWriteRequest());
219+
return this;
220+
}
221+
222+
public Checker hasWrite(int codec, int messagesCount) {
223+
Assert.assertEquals(codec, msg.getWriteRequest().getCodec());
224+
Assert.assertEquals(messagesCount, msg.getWriteRequest().getMessagesCount());
225+
return this;
226+
}
227+
}
228+
}
229+
}

‎topic/src/test/resources/log4j2.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
<Configuration status="WARN">
33
<Appenders>
44
<Console name="Console" target="SYSTEM_OUT">
5-
<PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
5+
<PatternLayout pattern="[%level]\t%d{HH:mm:ss.SSS} [%t] %36.36logger{36} - %msg%n"/>
66
</Console>
77
</Appenders>
88

0 commit comments

Comments
 (0)
Please sign in to comment.