
Commit

chore: improvements for test repeatability (hiero-ledger#766)
Signed-off-by: Alex Kehayov <[email protected]>
AlexKehayov committed Mar 5, 2025
1 parent ab846e5 commit ab3259d
Showing 5 changed files with 217 additions and 81 deletions.
@@ -33,6 +33,7 @@
import java.time.Clock;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -135,7 +136,42 @@ public Pipeline<? super Bytes> open(
case subscribeBlockStream -> Pipelines
.<SubscribeStreamRequest, SubscribeStreamResponseUnparsed>serverStreaming()
.mapRequest(bytes -> parseSubscribeStreamRequest(bytes, options))
.method(this::subscribeBlockStream)
.method((subscribeStreamRequest, helidonConsumerObserver) -> subscribeBlockStream(
subscribeStreamRequest, helidonConsumerObserver, Executors.newSingleThreadExecutor()))
.mapResponse(reply -> createSubscribeStreamResponse(reply, options))
.respondTo(replies)
.build();
};
} catch (Exception e) {
replies.onError(e);
return Pipelines.noop();
}
}

@NonNull
public Pipeline<? super Bytes> open(
final @NonNull Method method,
final @NonNull RequestOptions options,
final @NonNull Pipeline<? super Bytes> replies,
final @NonNull Executor executor) {

final var m = (BlockStreamMethod) method;
try {
return switch (m) {
case publishBlockStream -> {
notifier.unsubscribeAllExpired();
yield Pipelines.<List<BlockItemUnparsed>, PublishStreamResponse>bidiStreaming()
.mapRequest(bytes -> parsePublishStreamRequest(bytes, options))
.method(this::publishBlockStream)
.mapResponse(bytes -> createPublishStreamResponse(bytes, options))
.respondTo(replies)
.build();
}
case subscribeBlockStream -> Pipelines
.<SubscribeStreamRequest, SubscribeStreamResponseUnparsed>serverStreaming()
.mapRequest(bytes -> parseSubscribeStreamRequest(bytes, options))
.method((subscribeStreamRequest, helidonConsumerObserver) ->
subscribeBlockStream(subscribeStreamRequest, helidonConsumerObserver, executor))
.mapResponse(reply -> createSubscribeStreamResponse(reply, options))
.respondTo(replies)
.build();
@@ -193,7 +229,8 @@ Pipeline<List<BlockItemUnparsed>> publishBlockStream(
*/
void subscribeBlockStream(
@NonNull final SubscribeStreamRequest subscribeStreamRequest,
@NonNull final Pipeline<? super SubscribeStreamResponseUnparsed> helidonConsumerObserver) {
@NonNull final Pipeline<? super SubscribeStreamResponseUnparsed> helidonConsumerObserver,
@NonNull final Executor executor) {

LOGGER.log(DEBUG, "Executing Server Streaming subscribeBlockStream gRPC method");

@@ -223,7 +260,7 @@ void subscribeBlockStream(
// stream (endBlockNumber is 0)
if (subscribeStreamRequest.endBlockNumber() == 0) {
final var liveStreamEventHandler = LiveStreamEventHandlerBuilder.build(
new ExecutorCompletionService<>(Executors.newSingleThreadExecutor()),
new ExecutorCompletionService<>(executor),
Clock.systemDefaultZone(),
streamMediator,
helidonConsumerObserver,
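Taken together, the first file's changes thread a java.util.concurrent.Executor through the gRPC entry point: the existing open(method, options, replies) overload now builds the subscribe pipeline with a lambda that passes Executors.newSingleThreadExecutor() to subscribeBlockStream, while the new open(method, options, replies, executor) overload forwards whatever executor the caller supplies, and subscribeBlockStream hands it to the ExecutorCompletionService behind LiveStreamEventHandlerBuilder for live-stream subscriptions. Tests can therefore substitute an executor they control. As a purely illustrative sketch (DirectExecutor is not a class in this repository, and the commit's own tests use a BlockingExecutorService instead, shown in the next file), a same-thread executor is one way such a seam can be exercised:

import java.util.concurrent.Executor;

/**
 * Illustrative sketch only — not part of this commit. A same-thread executor
 * that a test could pass into the new open(method, options, replies, executor)
 * overload so that work submitted by the subscribe pipeline runs on the
 * caller's thread and assertions can follow deterministically.
 */
public final class DirectExecutor implements Executor {
    @Override
    public void execute(final Runnable command) {
        // Run the task inline instead of handing it off to another thread.
        command.run();
    }
}

The second changed file below is the mediator test that takes advantage of the new seam.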
@@ -1,6 +1,7 @@
// SPDX-License-Identifier: Apache-2.0
package com.hedera.block.server.mediator;

import static com.hedera.block.server.metrics.BlockNodeMetricTypes.Counter.BlocksPersisted;
import static com.hedera.block.server.metrics.BlockNodeMetricTypes.Counter.LiveBlockItems;
import static com.hedera.block.server.metrics.BlockNodeMetricTypes.Counter.LiveBlockStreamMediatorError;
import static com.hedera.block.server.util.PersistTestUtils.PERSISTENCE_STORAGE_ARCHIVE_ROOT_PATH_KEY;
@@ -30,6 +31,7 @@
import com.hedera.block.server.persistence.storage.write.AsyncNoOpWriterFactory;
import com.hedera.block.server.service.ServiceStatus;
import com.hedera.block.server.service.ServiceStatusImpl;
import com.hedera.block.server.util.BlockingExecutorService;
import com.hedera.block.server.util.PersistTestUtils;
import com.hedera.block.server.util.TestConfigUtil;
import com.hedera.hapi.block.BlockItemSetUnparsed;
@@ -148,7 +150,9 @@ void testUnsubscribeEach() throws InterruptedException, IOException {
}

@Test
void testMediatorPersistenceWithoutSubscribers() throws IOException {
void testMediatorPersistenceWithoutSubscribers() throws IOException, InterruptedException {
// 1 block is expected to be processed, so the expected tasks param is set to 1
final BlockingExecutorService executor = new BlockingExecutorService(1, 1);
final BlockNodeContext blockNodeContext = TestConfigUtil.getTestBlockNodeContext();
final ServiceStatus serviceStatus = new ServiceStatusImpl(blockNodeContext);
final LiveStreamMediator streamMediator = LiveStreamMediatorBuilder.newBuilder(blockNodeContext, serviceStatus)
@@ -167,21 +171,20 @@ void testMediatorPersistenceWithoutSubscribers() throws IOException {
serviceStatus,
ackHandlerMock,
writerFactory,
executorMock,
executor,
archiverMock,
persistenceStorageConfig);
streamMediator.subscribe(handler);

// Acting as a producer, notify the mediator of a new block
streamMediator.publish(blockItemUnparsed);

// Wait all the tasks to complete before the assertions to avoid flakiness
executor.waitTasksToComplete();

// Verify the counter was incremented
assertEquals(10, blockNodeContext.metricsService().get(LiveBlockItems).get());

// @todo(642) we need to employ the same technique here to inject a writer that will ensure
// the tasks are complete before we can verify the metrics for blocks persisted
// the test will pass without flaking if we do that
// assertEquals(1, blockNodeContext.metricsService().get(BlocksPersisted).get());
assertEquals(1, blockNodeContext.metricsService().get(BlocksPersisted).get());
}

@Test
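The BlockingExecutorService used above is imported from com.hedera.block.server.util, but its implementation is among the changed files not expanded in this excerpt. Judging from the test, it is constructed with an expected-task count (the comment notes that one block is expected, so the parameter is 1; the second constructor argument is assumed here to be a thread count) and exposes waitTasksToComplete(), which the test calls to block until the persistence work has finished before asserting on the BlocksPersisted metric. A rough sketch of such a helper, under those assumptions, might look like this:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch, not the class shipped in this commit: an executor
// service that counts completed tasks and lets a test block until the expected
// number of tasks has run. The (expectedTasks, threads) constructor shape and
// the waitTasksToComplete() method are inferred from how the test above uses it.
public class BlockingExecutorServiceSketch extends ThreadPoolExecutor {

    private final CountDownLatch tasksDone;

    public BlockingExecutorServiceSketch(final int expectedTasks, final int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        this.tasksDone = new CountDownLatch(expectedTasks);
    }

    @Override
    protected void afterExecute(final Runnable task, final Throwable error) {
        super.afterExecute(task, error);
        // Each completed task counts down the latch, regardless of outcome.
        tasksDone.countDown();
    }

    public void waitTasksToComplete() throws InterruptedException {
        // Block the test thread until every expected task has finished.
        tasksDone.await();
    }
}

Waiting on the executor removes the race that previously forced the BlocksPersisted assertion to be commented out (the @todo(642) note deleted above), which is the test-repeatability improvement the commit title refers to.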
