Skip to content

Commit 35d4219

Browse files
committed
chore: improvements for test repeatability (#642)
Signed-off-by: Alex Kehayov <[email protected]>
1 parent 3cfe6bf commit 35d4219

File tree

5 files changed

+227
-83
lines changed

5 files changed

+227
-83
lines changed

server/src/main/java/com/hedera/block/server/pbj/PbjBlockStreamServiceProxy.java

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.time.Clock;
3434
import java.util.List;
3535
import java.util.Objects;
36+
import java.util.concurrent.Executor;
3637
import java.util.concurrent.ExecutorCompletionService;
3738
import java.util.concurrent.ExecutorService;
3839
import java.util.concurrent.Executors;
@@ -135,7 +136,42 @@ public Pipeline<? super Bytes> open(
135136
case subscribeBlockStream -> Pipelines
136137
.<SubscribeStreamRequest, SubscribeStreamResponseUnparsed>serverStreaming()
137138
.mapRequest(bytes -> parseSubscribeStreamRequest(bytes, options))
138-
.method(this::subscribeBlockStream)
139+
.method((subscribeStreamRequest, helidonConsumerObserver) -> subscribeBlockStream(
140+
subscribeStreamRequest, helidonConsumerObserver, Executors.newSingleThreadExecutor()))
141+
.mapResponse(reply -> createSubscribeStreamResponse(reply, options))
142+
.respondTo(replies)
143+
.build();
144+
};
145+
} catch (Exception e) {
146+
replies.onError(e);
147+
return Pipelines.noop();
148+
}
149+
}
150+
151+
@NonNull
152+
public Pipeline<? super Bytes> openWithSpecifiedExecutor(
153+
final @NonNull Method method,
154+
final @NonNull RequestOptions options,
155+
final @NonNull Pipeline<? super Bytes> replies,
156+
final @NonNull Executor executor) {
157+
158+
final var m = (BlockStreamMethod) method;
159+
try {
160+
return switch (m) {
161+
case publishBlockStream -> {
162+
notifier.unsubscribeAllExpired();
163+
yield Pipelines.<List<BlockItemUnparsed>, PublishStreamResponse>bidiStreaming()
164+
.mapRequest(bytes -> parsePublishStreamRequest(bytes, options))
165+
.method(this::publishBlockStream)
166+
.mapResponse(bytes -> createPublishStreamResponse(bytes, options))
167+
.respondTo(replies)
168+
.build();
169+
}
170+
case subscribeBlockStream -> Pipelines
171+
.<SubscribeStreamRequest, SubscribeStreamResponseUnparsed>serverStreaming()
172+
.mapRequest(bytes -> parseSubscribeStreamRequest(bytes, options))
173+
.method((subscribeStreamRequest, helidonConsumerObserver) ->
174+
subscribeBlockStream(subscribeStreamRequest, helidonConsumerObserver, executor))
139175
.mapResponse(reply -> createSubscribeStreamResponse(reply, options))
140176
.respondTo(replies)
141177
.build();
@@ -193,7 +229,8 @@ Pipeline<List<BlockItemUnparsed>> publishBlockStream(
193229
*/
194230
void subscribeBlockStream(
195231
@NonNull final SubscribeStreamRequest subscribeStreamRequest,
196-
@NonNull final Pipeline<? super SubscribeStreamResponseUnparsed> helidonConsumerObserver) {
232+
@NonNull final Pipeline<? super SubscribeStreamResponseUnparsed> helidonConsumerObserver,
233+
@NonNull final Executor executor) {
197234

198235
LOGGER.log(DEBUG, "Executing Server Streaming subscribeBlockStream gRPC method");
199236

@@ -223,7 +260,7 @@ void subscribeBlockStream(
223260
// stream (endBlockNumber is 0)
224261
if (subscribeStreamRequest.endBlockNumber() == 0) {
225262
final var liveStreamEventHandler = LiveStreamEventHandlerBuilder.build(
226-
new ExecutorCompletionService<>(Executors.newSingleThreadExecutor()),
263+
new ExecutorCompletionService<>(executor),
227264
Clock.systemDefaultZone(),
228265
streamMediator,
229266
helidonConsumerObserver,

server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// SPDX-License-Identifier: Apache-2.0
22
package com.hedera.block.server.mediator;
33

4+
import static com.hedera.block.server.metrics.BlockNodeMetricTypes.Counter.BlocksPersisted;
45
import static com.hedera.block.server.metrics.BlockNodeMetricTypes.Counter.LiveBlockItems;
56
import static com.hedera.block.server.metrics.BlockNodeMetricTypes.Counter.LiveBlockStreamMediatorError;
67
import static com.hedera.block.server.util.PersistTestUtils.PERSISTENCE_STORAGE_ARCHIVE_ROOT_PATH_KEY;
@@ -30,6 +31,7 @@
3031
import com.hedera.block.server.persistence.storage.write.AsyncNoOpWriterFactory;
3132
import com.hedera.block.server.service.ServiceStatus;
3233
import com.hedera.block.server.service.ServiceStatusImpl;
34+
import com.hedera.block.server.util.BlockingExecutorService;
3335
import com.hedera.block.server.util.PersistTestUtils;
3436
import com.hedera.block.server.util.TestConfigUtil;
3537
import com.hedera.hapi.block.BlockItemSetUnparsed;
@@ -148,7 +150,9 @@ void testUnsubscribeEach() throws InterruptedException, IOException {
148150
}
149151

150152
@Test
151-
void testMediatorPersistenceWithoutSubscribers() throws IOException {
153+
void testMediatorPersistenceWithoutSubscribers() throws IOException, InterruptedException {
154+
// 1 block is expected to be processed, so the expected tasks param is set to 1
155+
final BlockingExecutorService executor = new BlockingExecutorService(1, 1);
152156
final BlockNodeContext blockNodeContext = TestConfigUtil.getTestBlockNodeContext();
153157
final ServiceStatus serviceStatus = new ServiceStatusImpl(blockNodeContext);
154158
final LiveStreamMediator streamMediator = LiveStreamMediatorBuilder.newBuilder(blockNodeContext, serviceStatus)
@@ -167,21 +171,20 @@ void testMediatorPersistenceWithoutSubscribers() throws IOException {
167171
serviceStatus,
168172
ackHandlerMock,
169173
writerFactory,
170-
executorMock,
174+
executor,
171175
archiverMock,
172176
persistenceStorageConfig);
173177
streamMediator.subscribe(handler);
174178

175179
// Acting as a producer, notify the mediator of a new block
176180
streamMediator.publish(blockItemUnparsed);
177181

182+
// Wait all the tasks to complete before the assertions to avoid flakiness
183+
executor.waitTasksToComplete();
184+
178185
// Verify the counter was incremented
179186
assertEquals(10, blockNodeContext.metricsService().get(LiveBlockItems).get());
180-
181-
// @todo(642) we need to employ the same technique here to inject a writer that will ensure
182-
// the tasks are complete before we can verify the metrics for blocks persisted
183-
// the test will pass without flaking if we do that
184-
// assertEquals(1, blockNodeContext.metricsService().get(BlocksPersisted).get());
187+
assertEquals(1, blockNodeContext.metricsService().get(BlocksPersisted).get());
185188
}
186189

187190
@Test

0 commit comments

Comments
 (0)