Skip to content

Commit

Permalink
chore: improvements for test repeatability (#766)
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Kehayov <[email protected]>
  • Loading branch information
AlexKehayov committed Mar 11, 2025
1 parent c42d534 commit 14f1d62
Show file tree
Hide file tree
Showing 5 changed files with 241 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,37 @@ public Pipeline<? super Bytes> open(
final @NonNull RequestOptions options,
final @NonNull Pipeline<? super Bytes> replies) {

final var m = (BlockStreamMethod) method;
return openPipeline(
(BlockStreamMethod) method,
options,
replies,
openRangeHistoricStreamingExecutorService,
closedRangeHistoricStreamingExecutorService);
}

@NonNull
public Pipeline<? super Bytes> open(
final @NonNull Method method,
final @NonNull RequestOptions options,
final @NonNull Pipeline<? super Bytes> replies,
final @NonNull ExecutorService openRangeHistoricStreamingExecutorService,
final @NonNull ExecutorService closedRangeHistoricStreamingExecutorService) {

return openPipeline(
(BlockStreamMethod) method,
options,
replies,
openRangeHistoricStreamingExecutorService,
closedRangeHistoricStreamingExecutorService);
}

private Pipeline<? super Bytes> openPipeline(
@NonNull BlockStreamMethod method,
@NonNull RequestOptions options,
@NonNull Pipeline<? super Bytes> replies,
@NonNull ExecutorService openRangeHistoricStreamingExecutorService,
@NonNull ExecutorService closedRangeHistoricStreamingExecutorService) {
final var m = method;
try {
return switch (m) {
case publishBlockStream -> {
Expand All @@ -145,7 +175,11 @@ public Pipeline<? super Bytes> open(
case subscribeBlockStream -> Pipelines
.<SubscribeStreamRequest, SubscribeStreamResponseUnparsed>serverStreaming()
.mapRequest(bytes -> parseSubscribeStreamRequest(bytes, options))
.method(this::subscribeBlockStream)
.method((subscribeStreamRequest, helidonConsumerObserver) -> subscribeBlockStream(
subscribeStreamRequest,
helidonConsumerObserver,
openRangeHistoricStreamingExecutorService,
closedRangeHistoricStreamingExecutorService))
.mapResponse(reply -> createSubscribeStreamResponse(reply, options))
.respondTo(replies)
.build();
Expand Down Expand Up @@ -201,7 +235,9 @@ Pipeline<List<BlockItemUnparsed>> publishBlockStream(
*/
void subscribeBlockStream(
@NonNull final SubscribeStreamRequest subscribeStreamRequest,
@NonNull final Pipeline<? super SubscribeStreamResponseUnparsed> helidonConsumerObserver) {
@NonNull final Pipeline<? super SubscribeStreamResponseUnparsed> helidonConsumerObserver,
@NonNull final ExecutorService openRangeHistoricStreamingExecutorService,
@NonNull final ExecutorService closedRangeHistoricStreamingExecutorService) {

LOGGER.log(DEBUG, "Executing Server Streaming subscribeBlockStream gRPC method");

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// SPDX-License-Identifier: Apache-2.0
package com.hedera.block.server.mediator;

import static com.hedera.block.server.metrics.BlockNodeMetricTypes.Counter.BlocksPersisted;
import static com.hedera.block.server.metrics.BlockNodeMetricTypes.Counter.LiveBlockItems;
import static com.hedera.block.server.metrics.BlockNodeMetricTypes.Gauge.Consumers;
import static com.hedera.block.server.util.PersistTestUtils.PERSISTENCE_STORAGE_ARCHIVE_ROOT_PATH_KEY;
Expand Down Expand Up @@ -32,6 +33,7 @@
import com.hedera.block.server.service.ServiceConfig;
import com.hedera.block.server.service.ServiceStatus;
import com.hedera.block.server.service.ServiceStatusImpl;
import com.hedera.block.server.util.BlockingExecutorService;
import com.hedera.block.server.util.PersistTestUtils;
import com.hedera.block.server.util.TestConfigUtil;
import com.hedera.hapi.block.BlockItemSetUnparsed;
Expand Down Expand Up @@ -174,7 +176,9 @@ void testUnsubscribeEach() throws InterruptedException {
}

@Test
void testMediatorPersistenceWithoutSubscribers() throws IOException {
void testMediatorPersistenceWithoutSubscribers() throws IOException, InterruptedException {
// 1 block is expected to be processed, so the expected tasks param is set to 1
final BlockingExecutorService executor = new BlockingExecutorService(1, 1);
final ServiceStatus serviceStatus = new ServiceStatusImpl(serviceConfig);
final LiveStreamMediator streamMediator = LiveStreamMediatorBuilder.newBuilder(
metricsService, mediatorConfig, serviceStatus)
Expand All @@ -192,7 +196,7 @@ void testMediatorPersistenceWithoutSubscribers() throws IOException {
serviceStatus,
ackHandlerMock,
writerFactory,
executorMock,
executor,
archiverMock,
pathResolverMock,
persistenceStorageConfig);
Expand All @@ -201,13 +205,12 @@ void testMediatorPersistenceWithoutSubscribers() throws IOException {
// Acting as a producer, notify the mediator of a new block
streamMediator.publish(blockItemUnparsed);

// Wait all the tasks to complete before the assertions to avoid flakiness
executor.waitTasksToComplete();

// Verify the counter was incremented
assertEquals(10, metricsService.get(LiveBlockItems).get());

// @todo(642) we need to employ the same technique here to inject a writer that will ensure
// the tasks are complete before we can verify the metrics for blocks persisted
// the test will pass without flaking if we do that
// assertEquals(1, blockNodeContext.metricsService().get(BlocksPersisted).get());
assertEquals(1, metricsService.get(BlocksPersisted).get());
}

@Test
Expand Down
Loading

0 comments on commit 14f1d62

Please sign in to comment.