Skip to content

Commit 606562d

Browse files
fix: added test coverage
Signed-off-by: Matt Peterson <[email protected]>
1 parent d82ccc6 commit 606562d

File tree

4 files changed

+279
-4
lines changed

4 files changed

+279
-4
lines changed

server/src/main/java/com/hedera/block/server/BlockStreamService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ public void update(final Routing routing) {
9494
routing.serverStream(SERVER_STREAMING_METHOD_NAME, this::subscribeBlockStream);
9595
}
9696

97-
private StreamObserver<PublishStreamRequest> publishBlockStream(
97+
StreamObserver<PublishStreamRequest> publishBlockStream(
9898
final StreamObserver<PublishStreamResponse> publishStreamResponseObserver) {
9999
LOGGER.log(
100100
System.Logger.Level.DEBUG,
@@ -104,7 +104,7 @@ private StreamObserver<PublishStreamRequest> publishBlockStream(
104104
streamMediator, publishStreamResponseObserver, itemAckBuilder);
105105
}
106106

107-
private void subscribeBlockStream(
107+
void subscribeBlockStream(
108108
final SubscribeStreamRequest subscribeStreamRequest,
109109
final StreamObserver<SubscribeStreamResponse> subscribeStreamResponseObserver) {
110110
LOGGER.log(

server/src/main/java/com/hedera/block/server/Server.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,11 @@ public class Server {
3535

3636
// Function stubs to satisfy the routing param signatures. The implementations are in the
3737
// service class.
38-
private static ServerCalls.BidiStreamingMethod<
38+
static ServerCalls.BidiStreamingMethod<
3939
StreamObserver<PublishStreamRequest>, StreamObserver<PublishStreamResponse>>
4040
clientBidiStreamingMethod;
4141

42-
public static ServerCalls.ServerStreamingMethod<
42+
static ServerCalls.ServerStreamingMethod<
4343
SubscribeStreamRequest, StreamObserver<SubscribeStreamResponse>>
4444
serverStreamingMethod;
4545

Lines changed: 220 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,220 @@
1+
/*
2+
* Copyright (C) 2024 Hedera Hashgraph, LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.hedera.block.server;
18+
19+
import static com.hedera.block.protos.BlockStreamService.*;
20+
import static com.hedera.block.protos.BlockStreamService.PublishStreamResponse.ItemAcknowledgement;
21+
import static com.hedera.block.server.util.PersistTestUtils.generateBlockItems;
22+
import static org.junit.jupiter.api.Assertions.assertEquals;
23+
import static org.mockito.ArgumentMatchers.any;
24+
import static org.mockito.Mockito.times;
25+
import static org.mockito.Mockito.verify;
26+
27+
import com.google.protobuf.Descriptors;
28+
import com.hedera.block.server.data.ObjectEvent;
29+
import com.hedera.block.server.mediator.LiveStreamMediatorImpl;
30+
import com.hedera.block.server.mediator.StreamMediator;
31+
import com.hedera.block.server.persistence.WriteThroughCacheHandler;
32+
import com.hedera.block.server.persistence.storage.BlockAsDirReader;
33+
import com.hedera.block.server.persistence.storage.BlockAsDirWriter;
34+
import com.hedera.block.server.persistence.storage.BlockReader;
35+
import com.hedera.block.server.persistence.storage.BlockWriter;
36+
import com.hedera.block.server.producer.ItemAckBuilder;
37+
import com.hedera.block.server.util.TestUtils;
38+
import io.grpc.stub.StreamObserver;
39+
import java.io.IOException;
40+
import java.nio.file.Files;
41+
import java.nio.file.Path;
42+
import java.security.NoSuchAlgorithmException;
43+
import java.util.List;
44+
import java.util.Map;
45+
import java.util.function.Consumer;
46+
47+
import io.helidon.config.Config;
48+
import io.helidon.config.MapConfigSource;
49+
import io.helidon.config.spi.ConfigSource;
50+
import org.junit.jupiter.api.AfterEach;
51+
import org.junit.jupiter.api.BeforeEach;
52+
import org.junit.jupiter.api.Test;
53+
import org.junit.jupiter.api.extension.ExtendWith;
54+
import org.mockito.Mock;
55+
import org.mockito.junit.jupiter.MockitoExtension;
56+
57+
@ExtendWith(MockitoExtension.class)
58+
public class BlockStreamServiceIT {
59+
60+
private final System.Logger LOGGER = System.getLogger(getClass().getName());
61+
private final Object lock = new Object();
62+
63+
@Mock private StreamMediator<ObjectEvent<BlockItem>, BlockItem> streamMediator;
64+
65+
@Mock private StreamObserver<PublishStreamResponse> publishStreamResponseObserver;
66+
67+
@Mock private SubscribeStreamRequest subscribeStreamRequest;
68+
69+
@Mock private StreamObserver<SubscribeStreamResponse> subscribeStreamObserver1;
70+
@Mock private StreamObserver<SubscribeStreamResponse> subscribeStreamObserver2;
71+
@Mock private StreamObserver<SubscribeStreamResponse> subscribeStreamObserver3;
72+
73+
@Mock private BlockReader<Block> blockReader;
74+
@Mock private BlockWriter<BlockItem> blockWriter;
75+
@Mock private Consumer<StreamMediator<ObjectEvent<BlockItem>, BlockItem>> testCallback;
76+
77+
private static final String TEMP_DIR = "block-node-unit-test-dir";
78+
private static final String JUNIT = "my-junit-test";
79+
80+
private Path testPath;
81+
private Config testConfig;
82+
83+
@BeforeEach
84+
public void setUp() throws IOException {
85+
testPath = Files.createTempDirectory(TEMP_DIR);
86+
LOGGER.log(System.Logger.Level.INFO, "Created temp directory: " + testPath.toString());
87+
88+
Map<String, String> testProperties = Map.of(JUNIT, testPath.toString());
89+
ConfigSource testConfigSource = MapConfigSource.builder().map(testProperties).build();
90+
testConfig = Config.builder(testConfigSource).build();
91+
}
92+
93+
@AfterEach
94+
public void tearDown() {
95+
TestUtils.deleteDirectory(testPath.toFile());
96+
}
97+
98+
@Test
99+
public void testPublishBlockStreamRegistrationAndExec()
100+
throws InterruptedException, IOException, NoSuchAlgorithmException {
101+
102+
final BlockStreamService blockStreamService =
103+
new BlockStreamService(50L, new ItemAckBuilder(), streamMediator);
104+
105+
final StreamObserver<PublishStreamRequest> streamObserver =
106+
blockStreamService.publishBlockStream(publishStreamResponseObserver);
107+
108+
final BlockItem blockItem = generateBlockItems(1).get(0);
109+
final PublishStreamRequest publishStreamRequest =
110+
PublishStreamRequest.newBuilder().setBlockItem(blockItem).build();
111+
112+
// Calling onNext() as Helidon will
113+
streamObserver.onNext(publishStreamRequest);
114+
115+
synchronized (lock) {
116+
lock.wait(50);
117+
}
118+
119+
final ItemAcknowledgement itemAck = new ItemAckBuilder().buildAck(blockItem);
120+
final PublishStreamResponse publishStreamResponse =
121+
PublishStreamResponse.newBuilder().setAcknowledgement(itemAck).build();
122+
123+
// Verify the BlockItem message is sent to the mediator
124+
verify(streamMediator, times(1)).publishEvent(blockItem);
125+
126+
// Verify our custom StreamObserver implementation builds and sends
127+
// a response back to the producer
128+
verify(publishStreamResponseObserver, times(1)).onNext(publishStreamResponse);
129+
130+
// Close the stream as Helidon does
131+
streamObserver.onCompleted();
132+
133+
synchronized (lock) {
134+
lock.wait(50);
135+
}
136+
137+
// verify the onCompleted() method is invoked on the wrapped StreamObserver
138+
verify(publishStreamResponseObserver, times(1)).onCompleted();
139+
}
140+
141+
@Test
142+
public void testSubscribeBlockStream() throws InterruptedException {
143+
final var streamMediator =
144+
new LiveStreamMediatorImpl(
145+
new WriteThroughCacheHandler(blockReader, blockWriter), testCallback);
146+
147+
// Build the BlockStreamService
148+
final BlockStreamService blockStreamService =
149+
new BlockStreamService(1000L, new ItemAckBuilder(), streamMediator);
150+
151+
// Subscribe the consumers
152+
blockStreamService.subscribeBlockStream(subscribeStreamRequest, subscribeStreamObserver1);
153+
blockStreamService.subscribeBlockStream(subscribeStreamRequest, subscribeStreamObserver2);
154+
blockStreamService.subscribeBlockStream(subscribeStreamRequest, subscribeStreamObserver3);
155+
156+
// Subscribe the producer
157+
final StreamObserver<PublishStreamRequest> streamObserver =
158+
blockStreamService.publishBlockStream(publishStreamResponseObserver);
159+
160+
// Build the BlockItem
161+
final List<BlockItem> blockItems = generateBlockItems(1);
162+
final PublishStreamRequest publishStreamRequest =
163+
PublishStreamRequest.newBuilder().setBlockItem(blockItems.get(0)).build();
164+
165+
// Calling onNext() with a BlockItem
166+
streamObserver.onNext(publishStreamRequest);
167+
168+
synchronized (lock) {
169+
lock.wait(50);
170+
}
171+
172+
final SubscribeStreamResponse subscribeStreamResponse =
173+
SubscribeStreamResponse.newBuilder().setBlockItem(blockItems.get(0)).build();
174+
175+
verify(subscribeStreamObserver1, times(1)).onNext(subscribeStreamResponse);
176+
verify(subscribeStreamObserver2, times(1)).onNext(subscribeStreamResponse);
177+
verify(subscribeStreamObserver3, times(1)).onNext(subscribeStreamResponse);
178+
179+
}
180+
181+
@Test
182+
public void testFullHappyPath() throws IOException, InterruptedException {
183+
184+
// Initialize with concrete a concrete BlockReader, BlockWriter and Mediator
185+
final BlockReader<Block> blockReader = new BlockAsDirReader(JUNIT, testConfig);
186+
final BlockWriter<BlockItem> blockWriter = new BlockAsDirWriter(JUNIT, testConfig);
187+
final var streamMediator =
188+
new LiveStreamMediatorImpl(
189+
new WriteThroughCacheHandler(blockReader, blockWriter), testCallback);
190+
191+
// Build the BlockStreamService
192+
final BlockStreamService blockStreamService =
193+
new BlockStreamService(1000L, new ItemAckBuilder(), streamMediator);
194+
195+
// Pass a StreamObserver to the producer as Helidon will
196+
final StreamObserver<PublishStreamRequest> streamObserver =
197+
blockStreamService.publishBlockStream(publishStreamResponseObserver);
198+
199+
final List<BlockItem> blockItems = generateBlockItems(1);
200+
201+
blockStreamService.subscribeBlockStream(subscribeStreamRequest, subscribeStreamObserver1);
202+
blockStreamService.subscribeBlockStream(subscribeStreamRequest, subscribeStreamObserver2);
203+
blockStreamService.subscribeBlockStream(subscribeStreamRequest, subscribeStreamObserver3);
204+
205+
final PublishStreamRequest publishStreamRequest =
206+
PublishStreamRequest.newBuilder().setBlockItem(blockItems.get(0)).build();
207+
streamObserver.onNext(publishStreamRequest);
208+
209+
synchronized (lock) {
210+
lock.wait(50);
211+
}
212+
213+
final SubscribeStreamResponse subscribeStreamResponse =
214+
SubscribeStreamResponse.newBuilder().setBlockItem(blockItems.get(0)).build();
215+
verify(subscribeStreamObserver1, times(1)).onNext(subscribeStreamResponse);
216+
verify(subscribeStreamObserver2, times(1)).onNext(subscribeStreamResponse);
217+
verify(subscribeStreamObserver3, times(1)).onNext(subscribeStreamResponse);
218+
219+
}
220+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package com.hedera.block.server;
2+
3+
import com.google.protobuf.Descriptors;
4+
import com.hedera.block.server.data.ObjectEvent;
5+
import com.hedera.block.server.mediator.StreamMediator;
6+
import com.hedera.block.server.producer.ItemAckBuilder;
7+
import org.junit.jupiter.api.Test;
8+
import org.junit.jupiter.api.extension.ExtendWith;
9+
import org.mockito.Mock;
10+
import org.mockito.junit.jupiter.MockitoExtension;
11+
12+
import java.io.IOException;
13+
import java.security.NoSuchAlgorithmException;
14+
15+
import static com.hedera.block.protos.BlockStreamService.BlockItem;
16+
import static org.junit.jupiter.api.Assertions.assertEquals;
17+
import static org.mockito.ArgumentMatchers.any;
18+
import static org.mockito.Mockito.never;
19+
import static org.mockito.Mockito.verify;
20+
21+
@ExtendWith(MockitoExtension.class)
22+
public class BlockStreamServiceTest {
23+
24+
@Mock
25+
private ItemAckBuilder itemAckBuilder;
26+
27+
@Mock private StreamMediator<ObjectEvent<com.hedera.block.protos.BlockStreamService.BlockItem>, com.hedera.block.protos.BlockStreamService.BlockItem> streamMediator;
28+
29+
@Test
30+
public void testServiceName() throws IOException, NoSuchAlgorithmException {
31+
final BlockStreamService blockStreamService =
32+
new BlockStreamService(50L, itemAckBuilder, streamMediator);
33+
34+
// Verify the service name
35+
assertEquals(Constants.SERVICE_NAME, blockStreamService.serviceName());
36+
37+
// Verify other methods not invoked
38+
verify(itemAckBuilder, never()).buildAck(any(BlockItem.class));
39+
verify(streamMediator, never()).publishEvent(any(BlockItem.class));
40+
}
41+
42+
@Test
43+
public void testProto() throws IOException, NoSuchAlgorithmException {
44+
final BlockStreamService blockStreamService =
45+
new BlockStreamService(50L, itemAckBuilder, streamMediator);
46+
Descriptors.FileDescriptor fileDescriptor = blockStreamService.proto();
47+
48+
// Verify the current rpc methods
49+
assertEquals(2, fileDescriptor.getServices().get(0).getMethods().size());
50+
51+
// Verify other methods not invoked
52+
verify(itemAckBuilder, never()).buildAck(any(BlockItem.class));
53+
verify(streamMediator, never()).publishEvent(any(BlockItem.class));
54+
}
55+
}

0 commit comments

Comments
 (0)