diff --git a/server/src/main/java/com/hedera/block/server/pbj/PbjBlockStreamServiceProxy.java b/server/src/main/java/com/hedera/block/server/pbj/PbjBlockStreamServiceProxy.java index af824f463..097b2c905 100644 --- a/server/src/main/java/com/hedera/block/server/pbj/PbjBlockStreamServiceProxy.java +++ b/server/src/main/java/com/hedera/block/server/pbj/PbjBlockStreamServiceProxy.java @@ -33,6 +33,7 @@ import java.time.Clock; import java.util.List; import java.util.Objects; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -135,7 +136,42 @@ public Pipeline open( case subscribeBlockStream -> Pipelines .serverStreaming() .mapRequest(bytes -> parseSubscribeStreamRequest(bytes, options)) - .method(this::subscribeBlockStream) + .method((subscribeStreamRequest, helidonConsumerObserver) -> subscribeBlockStream( + subscribeStreamRequest, helidonConsumerObserver, Executors.newSingleThreadExecutor())) + .mapResponse(reply -> createSubscribeStreamResponse(reply, options)) + .respondTo(replies) + .build(); + }; + } catch (Exception e) { + replies.onError(e); + return Pipelines.noop(); + } + } + + @NonNull + public Pipeline open( + final @NonNull Method method, + final @NonNull RequestOptions options, + final @NonNull Pipeline replies, + final @NonNull Executor executor) { + + final var m = (BlockStreamMethod) method; + try { + return switch (m) { + case publishBlockStream -> { + notifier.unsubscribeAllExpired(); + yield Pipelines., PublishStreamResponse>bidiStreaming() + .mapRequest(bytes -> parsePublishStreamRequest(bytes, options)) + .method(this::publishBlockStream) + .mapResponse(bytes -> createPublishStreamResponse(bytes, options)) + .respondTo(replies) + .build(); + } + case subscribeBlockStream -> Pipelines + .serverStreaming() + .mapRequest(bytes -> parseSubscribeStreamRequest(bytes, options)) + .method((subscribeStreamRequest, helidonConsumerObserver) -> + subscribeBlockStream(subscribeStreamRequest, helidonConsumerObserver, executor)) .mapResponse(reply -> createSubscribeStreamResponse(reply, options)) .respondTo(replies) .build(); @@ -193,7 +229,8 @@ Pipeline> publishBlockStream( */ void subscribeBlockStream( @NonNull final SubscribeStreamRequest subscribeStreamRequest, - @NonNull final Pipeline helidonConsumerObserver) { + @NonNull final Pipeline helidonConsumerObserver, + @NonNull final Executor executor) { LOGGER.log(DEBUG, "Executing Server Streaming subscribeBlockStream gRPC method"); @@ -223,7 +260,7 @@ void subscribeBlockStream( // stream (endBlockNumber is 0) if (subscribeStreamRequest.endBlockNumber() == 0) { final var liveStreamEventHandler = LiveStreamEventHandlerBuilder.build( - new ExecutorCompletionService<>(Executors.newSingleThreadExecutor()), + new ExecutorCompletionService<>(executor), Clock.systemDefaultZone(), streamMediator, helidonConsumerObserver, diff --git a/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java b/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java index 35f143455..ad3d6eaad 100644 --- a/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java +++ b/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java @@ -1,6 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 package com.hedera.block.server.mediator; +import static com.hedera.block.server.metrics.BlockNodeMetricTypes.Counter.BlocksPersisted; import static com.hedera.block.server.metrics.BlockNodeMetricTypes.Counter.LiveBlockItems; import static com.hedera.block.server.metrics.BlockNodeMetricTypes.Counter.LiveBlockStreamMediatorError; import static com.hedera.block.server.util.PersistTestUtils.PERSISTENCE_STORAGE_ARCHIVE_ROOT_PATH_KEY; @@ -30,6 +31,7 @@ import com.hedera.block.server.persistence.storage.write.AsyncNoOpWriterFactory; import com.hedera.block.server.service.ServiceStatus; import com.hedera.block.server.service.ServiceStatusImpl; +import com.hedera.block.server.util.BlockingExecutorService; import com.hedera.block.server.util.PersistTestUtils; import com.hedera.block.server.util.TestConfigUtil; import com.hedera.hapi.block.BlockItemSetUnparsed; @@ -148,7 +150,9 @@ void testUnsubscribeEach() throws InterruptedException, IOException { } @Test - void testMediatorPersistenceWithoutSubscribers() throws IOException { + void testMediatorPersistenceWithoutSubscribers() throws IOException, InterruptedException { + // 1 block is expected to be processed, so the expected tasks param is set to 1 + final BlockingExecutorService executor = new BlockingExecutorService(1, 1); final BlockNodeContext blockNodeContext = TestConfigUtil.getTestBlockNodeContext(); final ServiceStatus serviceStatus = new ServiceStatusImpl(blockNodeContext); final LiveStreamMediator streamMediator = LiveStreamMediatorBuilder.newBuilder(blockNodeContext, serviceStatus) @@ -167,7 +171,7 @@ void testMediatorPersistenceWithoutSubscribers() throws IOException { serviceStatus, ackHandlerMock, writerFactory, - executorMock, + executor, archiverMock, persistenceStorageConfig); streamMediator.subscribe(handler); @@ -175,13 +179,12 @@ void testMediatorPersistenceWithoutSubscribers() throws IOException { // Acting as a producer, notify the mediator of a new block streamMediator.publish(blockItemUnparsed); + // Wait all the tasks to complete before the assertions to avoid flakiness + executor.waitTasksToComplete(); + // Verify the counter was incremented assertEquals(10, blockNodeContext.metricsService().get(LiveBlockItems).get()); - - // @todo(642) we need to employ the same technique here to inject a writer that will ensure - // the tasks are complete before we can verify the metrics for blocks persisted - // the test will pass without flaking if we do that - // assertEquals(1, blockNodeContext.metricsService().get(BlocksPersisted).get()); + assertEquals(1, blockNodeContext.metricsService().get(BlocksPersisted).get()); } @Test diff --git a/server/src/test/java/com/hedera/block/server/pbj/PbjBlockStreamServiceIntegrationTest.java b/server/src/test/java/com/hedera/block/server/pbj/PbjBlockStreamServiceIntegrationTest.java index 153ad3cb2..6d82555ab 100644 --- a/server/src/test/java/com/hedera/block/server/pbj/PbjBlockStreamServiceIntegrationTest.java +++ b/server/src/test/java/com/hedera/block/server/pbj/PbjBlockStreamServiceIntegrationTest.java @@ -6,16 +6,19 @@ import static com.hedera.block.server.util.PersistTestUtils.PERSISTENCE_STORAGE_ARCHIVE_ROOT_PATH_KEY; import static com.hedera.block.server.util.PersistTestUtils.PERSISTENCE_STORAGE_LIVE_ROOT_PATH_KEY; import static com.hedera.block.server.util.PersistTestUtils.generateBlockItemsUnparsed; +import static com.hedera.block.server.util.TestUtils.onEventLatchCountdown; import static com.hedera.hapi.block.SubscribeStreamResponseCode.READ_STREAM_NOT_AVAILABLE; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doCallRealMethod; +import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; import com.hedera.block.server.ack.AckHandler; import com.hedera.block.server.ack.AckHandlerImpl; @@ -36,6 +39,7 @@ import com.hedera.block.server.persistence.storage.write.AsyncNoOpWriterFactory; import com.hedera.block.server.service.ServiceStatus; import com.hedera.block.server.service.ServiceStatusImpl; +import com.hedera.block.server.util.BlockingExecutorService; import com.hedera.block.server.util.TestConfigUtil; import com.hedera.block.server.verification.StreamVerificationHandlerImpl; import com.hedera.block.server.verification.VerificationConfig; @@ -66,13 +70,13 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.extension.ExtendWith; @@ -155,14 +159,31 @@ void setUp() throws IOException { assertThat(testConfigLiveRootPath).isEqualTo(testLiveRootPath); } - @Disabled("@todo(642) make test deterministic via correct executor injection") @Test @Timeout(value = JUNIT_TIMEOUT, unit = TimeUnit.MILLISECONDS) - void testPublishBlockStreamRegistrationAndExecution() throws IOException { + void testPublishBlockStreamRegistrationAndExecution() throws IOException, InterruptedException { final int numberOfBlocks = 1; - final ExecutorService executor = Executors.newFixedThreadPool(numberOfBlocks); + + final ExecutorService persistenceExecutor = Executors.newFixedThreadPool(numberOfBlocks); + final BlockingExecutorService subscriberExecutor1 = new BlockingExecutorService(10, 1); + final BlockingExecutorService subscriberExecutor2 = new BlockingExecutorService(10, 1); + final BlockingExecutorService subscriberExecutor3 = new BlockingExecutorService(10, 1); + + // The PublishStreamObserver logic is executed via lmax BatchEventProcessors internally, + // so BlockingExecutorService approach is not applicable. + CountDownLatch publishStreamObserversLatch = new CountDownLatch(3); + doAnswer(onEventLatchCountdown(publishStreamObserversLatch)) + .when(helidonPublishStreamObserver1) + .onNext(any()); + doAnswer(onEventLatchCountdown(publishStreamObserversLatch)) + .when(helidonPublishStreamObserver2) + .onNext(any()); + doAnswer(onEventLatchCountdown(publishStreamObserversLatch)) + .when(helidonPublishStreamObserver3) + .onNext(any()); + final PbjBlockStreamServiceProxy pbjBlockStreamServiceProxy = - buildBlockStreamService(blockReaderMock, executor); + buildBlockStreamService(blockReaderMock, persistenceExecutor); // Register 3 producers - Opening a pipeline is not enough to register a producer. // pipeline.onNext() must be invoked to register the producer at the Helidon PBJ layer. @@ -187,19 +208,22 @@ void testPublishBlockStreamRegistrationAndExecution() throws IOException { .open( PbjBlockStreamService.BlockStreamMethod.subscribeBlockStream, optionsMock, - subscribeStreamObserver1) + subscribeStreamObserver1, + subscriberExecutor1) .onNext(buildEmptySubscribeStreamRequest()); pbjBlockStreamServiceProxy .open( PbjBlockStreamService.BlockStreamMethod.subscribeBlockStream, optionsMock, - subscribeStreamObserver2) + subscribeStreamObserver2, + subscriberExecutor2) .onNext(buildEmptySubscribeStreamRequest()); pbjBlockStreamServiceProxy .open( PbjBlockStreamService.BlockStreamMethod.subscribeBlockStream, optionsMock, - subscribeStreamObserver3) + subscribeStreamObserver3, + subscriberExecutor3) .onNext(buildEmptySubscribeStreamRequest()); final List blockItems = generateBlockItemsUnparsed(numberOfBlocks); @@ -213,49 +237,62 @@ void testPublishBlockStreamRegistrationAndExecution() throws IOException { producerPipeline.onNext(PublishStreamRequestUnparsed.PROTOBUF.toBytes(publishStreamRequest)); } - // Verify all 10 BlockItems were sent to each of the 3 consumers - verify(subscribeStreamObserver1, timeout(testTimeout).times(1)) - .onNext(buildSubscribeStreamResponse(blockItems.getFirst())); - verify(subscribeStreamObserver1, timeout(testTimeout).times(8)) - .onNext(buildSubscribeStreamResponse(blockItems.get(1))); - verify(subscribeStreamObserver1, timeout(testTimeout).times(1)) - .onNext(buildSubscribeStreamResponse(blockItems.get(9))); - - verify(subscribeStreamObserver2, timeout(testTimeout).times(1)) - .onNext(buildSubscribeStreamResponse(blockItems.getFirst())); - verify(subscribeStreamObserver2, timeout(testTimeout).times(8)) - .onNext(buildSubscribeStreamResponse(blockItems.get(1))); - verify(subscribeStreamObserver2, timeout(testTimeout).times(1)) - .onNext(buildSubscribeStreamResponse(blockItems.get(9))); - - verify(subscribeStreamObserver3, timeout(testTimeout).times(1)) - .onNext(buildSubscribeStreamResponse(blockItems.getFirst())); - verify(subscribeStreamObserver3, timeout(testTimeout).times(8)) - .onNext(buildSubscribeStreamResponse(blockItems.get(1))); - verify(subscribeStreamObserver3, timeout(testTimeout).times(1)) - .onNext(buildSubscribeStreamResponse(blockItems.get(9))); - - verify(helidonPublishStreamObserver1, timeout(testTimeout).times(1)).onNext(any()); - verify(helidonPublishStreamObserver2, timeout(testTimeout).times(1)).onNext(any()); - verify(helidonPublishStreamObserver3, timeout(testTimeout).times(1)).onNext(any()); - // Close the stream as Helidon does helidonPublishStreamObserver1.onComplete(); + // Wait for subscribers and publishers to finish execution before assertions + subscriberExecutor1.waitTasksToComplete(); + subscriberExecutor2.waitTasksToComplete(); + subscriberExecutor3.waitTasksToComplete(); + publishStreamObserversLatch.await(); + + // Verify all 10 BlockItems were sent to each of the 3 consumers + verify(subscribeStreamObserver1, times(1)).onNext(buildSubscribeStreamResponse(blockItems.getFirst())); + verify(subscribeStreamObserver1, times(8)).onNext(buildSubscribeStreamResponse(blockItems.get(1))); + verify(subscribeStreamObserver1, times(1)).onNext(buildSubscribeStreamResponse(blockItems.get(9))); + + verify(subscribeStreamObserver2, times(1)).onNext(buildSubscribeStreamResponse(blockItems.getFirst())); + verify(subscribeStreamObserver2, times(8)).onNext(buildSubscribeStreamResponse(blockItems.get(1))); + verify(subscribeStreamObserver2, times(1)).onNext(buildSubscribeStreamResponse(blockItems.get(9))); + + verify(subscribeStreamObserver3, times(1)).onNext(buildSubscribeStreamResponse(blockItems.getFirst())); + verify(subscribeStreamObserver3, times(8)).onNext(buildSubscribeStreamResponse(blockItems.get(1))); + verify(subscribeStreamObserver3, times(1)).onNext(buildSubscribeStreamResponse(blockItems.get(9))); + + verify(helidonPublishStreamObserver1, times(1)).onNext(any()); + verify(helidonPublishStreamObserver2, times(1)).onNext(any()); + verify(helidonPublishStreamObserver3, times(1)).onNext(any()); + // verify the onCompleted() method is invoked on the wrapped StreamObserver - verify(helidonPublishStreamObserver1, timeout(testTimeout).times(1)).onComplete(); + verify(helidonPublishStreamObserver1, times(1)).onComplete(); } - @Disabled("@todo(642) make test deterministic via correct executor injection") @Test @Timeout(value = JUNIT_TIMEOUT, unit = TimeUnit.MILLISECONDS) - void testFullProducerConsumerHappyPath() throws IOException { + void testFullProducerConsumerHappyPath() throws IOException, InterruptedException { final int numberOfBlocks = 5; - final ExecutorService executor = Executors.newFixedThreadPool(numberOfBlocks); + final ExecutorService persistenceExecutor = Executors.newFixedThreadPool(numberOfBlocks); + final BlockingExecutorService subscriberExecutor1 = new BlockingExecutorService(10, 1); + final BlockingExecutorService subscriberExecutor2 = new BlockingExecutorService(10, 1); + final BlockingExecutorService subscriberExecutor3 = new BlockingExecutorService(10, 1); + + // The PublishStreamObserver logic is executed via lmax BatchEventProcessors internally, + // so BlockingExecutorService approach is not applicable. + CountDownLatch publishStreamObserversLatch = new CountDownLatch(3 * numberOfBlocks); + doAnswer(onEventLatchCountdown(publishStreamObserversLatch)) + .when(helidonPublishStreamObserver1) + .onNext(any()); + doAnswer(onEventLatchCountdown(publishStreamObserversLatch)) + .when(helidonPublishStreamObserver2) + .onNext(any()); + doAnswer(onEventLatchCountdown(publishStreamObserversLatch)) + .when(helidonPublishStreamObserver3) + .onNext(any()); + // Use a real BlockWriter to test the full integration final PbjBlockStreamServiceProxy pbjBlockStreamServiceProxy = - buildBlockStreamService(blockReaderMock, executor); + buildBlockStreamService(blockReaderMock, persistenceExecutor); // Register 3 producers - Opening a pipeline is not enough to register a producer. // pipeline.onNext() must be invoked to register the producer at the Helidon PBJ layer. @@ -280,19 +317,22 @@ void testFullProducerConsumerHappyPath() throws IOException { .open( PbjBlockStreamService.BlockStreamMethod.subscribeBlockStream, optionsMock, - subscribeStreamObserver1) + subscribeStreamObserver1, + subscriberExecutor1) .onNext(buildEmptySubscribeStreamRequest()); pbjBlockStreamServiceProxy .open( PbjBlockStreamService.BlockStreamMethod.subscribeBlockStream, optionsMock, - subscribeStreamObserver2) + subscribeStreamObserver2, + subscriberExecutor2) .onNext(buildEmptySubscribeStreamRequest()); pbjBlockStreamServiceProxy .open( PbjBlockStreamService.BlockStreamMethod.subscribeBlockStream, optionsMock, - subscribeStreamObserver3) + subscribeStreamObserver3, + subscriberExecutor3) .onNext(buildEmptySubscribeStreamRequest()); final List blockItems = generateBlockItemsUnparsed(numberOfBlocks); @@ -306,18 +346,20 @@ void testFullProducerConsumerHappyPath() throws IOException { producerPipeline.onNext(PublishStreamRequestUnparsed.PROTOBUF.toBytes(publishStreamRequest)); } + subscriberExecutor1.waitTasksToComplete(); + subscriberExecutor2.waitTasksToComplete(); + subscriberExecutor3.waitTasksToComplete(); + publishStreamObserversLatch.await(); + // Verify the subscribers received the data - verifySubscribeStreamResponse(numberOfBlocks, 0, numberOfBlocks, subscribeStreamObserver1, blockItems); - verifySubscribeStreamResponse(numberOfBlocks, 0, numberOfBlocks, subscribeStreamObserver2, blockItems); - verifySubscribeStreamResponse(numberOfBlocks, 0, numberOfBlocks, subscribeStreamObserver3, blockItems); + verifySubscribeStreamResponse(numberOfBlocks, 0, numberOfBlocks, subscribeStreamObserver1, blockItems, false); + verifySubscribeStreamResponse(numberOfBlocks, 0, numberOfBlocks, subscribeStreamObserver2, blockItems, false); + verifySubscribeStreamResponse(numberOfBlocks, 0, numberOfBlocks, subscribeStreamObserver3, blockItems, false); // Verify the producers received all the responses - verify(helidonPublishStreamObserver1, timeout(testTimeout).times(numberOfBlocks)) - .onNext(any()); - verify(helidonPublishStreamObserver2, timeout(testTimeout).times(numberOfBlocks)) - .onNext(any()); - verify(helidonPublishStreamObserver3, timeout(testTimeout).times(numberOfBlocks)) - .onNext(any()); + verify(helidonPublishStreamObserver1, times(numberOfBlocks)).onNext(any()); + verify(helidonPublishStreamObserver2, times(numberOfBlocks)).onNext(any()); + verify(helidonPublishStreamObserver3, times(numberOfBlocks)).onNext(any()); } @Test @@ -391,18 +433,18 @@ void testFullWithSubscribersAddedDynamically() throws IOException { } // Verify subscribers who were listening before the stream started - verifySubscribeStreamResponse(numberOfBlocks, 0, numberOfBlocks, subscribeStreamObserver1, blockItems); - verifySubscribeStreamResponse(numberOfBlocks, 0, numberOfBlocks, subscribeStreamObserver2, blockItems); - verifySubscribeStreamResponse(numberOfBlocks, 0, numberOfBlocks, subscribeStreamObserver3, blockItems); + verifySubscribeStreamResponse(numberOfBlocks, 0, numberOfBlocks, subscribeStreamObserver1, blockItems, true); + verifySubscribeStreamResponse(numberOfBlocks, 0, numberOfBlocks, subscribeStreamObserver2, blockItems, true); + verifySubscribeStreamResponse(numberOfBlocks, 0, numberOfBlocks, subscribeStreamObserver3, blockItems, true); // Verify subscribers added while the stream was in progress. // The Helidon-provided StreamObserver onNext() method will only // be called once a Header BlockItem is reached. So, pass in // the number of BlockItems to wait to verify that the method // was called. - verifySubscribeStreamResponse(numberOfBlocks, 59, numberOfBlocks, subscribeStreamObserver4, blockItems); - verifySubscribeStreamResponse(numberOfBlocks, 79, numberOfBlocks, subscribeStreamObserver5, blockItems); - verifySubscribeStreamResponse(numberOfBlocks, 89, numberOfBlocks, subscribeStreamObserver6, blockItems); + verifySubscribeStreamResponse(numberOfBlocks, 59, numberOfBlocks, subscribeStreamObserver4, blockItems, true); + verifySubscribeStreamResponse(numberOfBlocks, 79, numberOfBlocks, subscribeStreamObserver5, blockItems, true); + verifySubscribeStreamResponse(numberOfBlocks, 89, numberOfBlocks, subscribeStreamObserver6, blockItems, true); } @Test @@ -539,18 +581,18 @@ void testSubAndUnsubWhileStreaming() throws InterruptedException, IOException { } // Verify subscribers who were listening before the stream started - verifySubscribeStreamResponse(numberOfBlocks, 0, 10, subscribeStreamObserver1, blockItems); - verifySubscribeStreamResponse(numberOfBlocks, 0, 60, subscribeStreamObserver2, blockItems); - verifySubscribeStreamResponse(numberOfBlocks, 0, 70, subscribeStreamObserver3, blockItems); + verifySubscribeStreamResponse(numberOfBlocks, 0, 10, subscribeStreamObserver1, blockItems, true); + verifySubscribeStreamResponse(numberOfBlocks, 0, 60, subscribeStreamObserver2, blockItems, true); + verifySubscribeStreamResponse(numberOfBlocks, 0, 70, subscribeStreamObserver3, blockItems, true); // Verify subscribers added while the stream was in progress. // The Helidon-provided StreamObserver onNext() method will only // be called once a Header BlockItem is reached. So, pass in // the number of BlockItems to wait to verify that the method // was called. - verifySubscribeStreamResponse(numberOfBlocks, 59, numberOfBlocks, subscribeStreamObserver4, blockItems); - verifySubscribeStreamResponse(numberOfBlocks, 79, numberOfBlocks, subscribeStreamObserver5, blockItems); - verifySubscribeStreamResponse(numberOfBlocks, 89, numberOfBlocks, subscribeStreamObserver6, blockItems); + verifySubscribeStreamResponse(numberOfBlocks, 59, numberOfBlocks, subscribeStreamObserver4, blockItems, true); + verifySubscribeStreamResponse(numberOfBlocks, 79, numberOfBlocks, subscribeStreamObserver5, blockItems, true); + verifySubscribeStreamResponse(numberOfBlocks, 89, numberOfBlocks, subscribeStreamObserver6, blockItems, true); producerPipeline.onComplete(); } @@ -701,7 +743,8 @@ private static void verifySubscribeStreamResponse( int blockItemsToWait, int blockItemsToSkip, Pipeline pipeline, - List blockItems) { + List blockItems, + boolean verifyWithTimeout) { // Each block has 10 BlockItems. Verify all the BlockItems // in a given block per iteration. for (int block = 0; block < numberOfBlocks; block += 10) { @@ -714,9 +757,15 @@ private static void verifySubscribeStreamResponse( final Bytes bodySubStreamResponse = buildSubscribeStreamResponse(bodyBlockItem); final BlockItemUnparsed stateProofBlockItem = blockItems.get(block + 9); final Bytes stateProofStreamResponse = buildSubscribeStreamResponse(stateProofBlockItem); - verify(pipeline, timeout(testTimeout).times(1)).onNext(headerSubStreamResponse); - verify(pipeline, timeout(testTimeout).times(8)).onNext(bodySubStreamResponse); - verify(pipeline, timeout(testTimeout).times(1)).onNext(stateProofStreamResponse); + if (verifyWithTimeout) { + verify(pipeline, timeout(testTimeout).times(1)).onNext(headerSubStreamResponse); + verify(pipeline, timeout(testTimeout).times(8)).onNext(bodySubStreamResponse); + verify(pipeline, timeout(testTimeout).times(1)).onNext(stateProofStreamResponse); + } else { + verify(pipeline, times(1)).onNext(headerSubStreamResponse); + verify(pipeline, times(8)).onNext(bodySubStreamResponse); + verify(pipeline, times(1)).onNext(stateProofStreamResponse); + } } } @@ -739,8 +788,8 @@ private static Bytes buildEndOfStreamResponse() { private BlockVerificationSessionFactory getBlockVerificationSessionFactory() { final VerificationConfig config = blockNodeContext.configuration().getConfigData(VerificationConfig.class); final SignatureVerifierDummy signatureVerifier = mock(SignatureVerifierDummy.class); - when(signatureVerifier.verifySignature(any(), any())).thenReturn(true); final ExecutorService executorService = ForkJoinPool.commonPool(); + lenient().when(signatureVerifier.verifySignature(any(), any())).thenReturn(true); return new BlockVerificationSessionFactory( config, blockNodeContext.metricsService(), signatureVerifier, executorService); } @@ -756,7 +805,7 @@ private PbjBlockStreamServiceProxy buildBlockStreamService( final AckHandler blockManager = new AckHandlerImpl(notifier, false, serviceStatus, blockRemover, blockNodeContext.metricsService()); final BlockVerificationSessionFactory blockVerificationSessionFactory = getBlockVerificationSessionFactory(); - final BlockVerificationService BlockVerificationService = new BlockVerificationServiceImpl( + final BlockVerificationService blockVerificationService = new BlockVerificationServiceImpl( blockNodeContext.metricsService(), blockVerificationSessionFactory, blockManager); final AsyncNoOpWriterFactory writerFactory = new AsyncNoOpWriterFactory(blockManager, blockNodeContext.metricsService()); @@ -771,7 +820,7 @@ private PbjBlockStreamServiceProxy buildBlockStreamService( archiverMock, persistenceStorageConfig); final StreamVerificationHandlerImpl streamVerificationHandler = new StreamVerificationHandlerImpl( - streamMediator, notifier, blockNodeContext.metricsService(), serviceStatus, BlockVerificationService); + streamMediator, notifier, blockNodeContext.metricsService(), serviceStatus, blockVerificationService); return new PbjBlockStreamServiceProxy( streamMediator, serviceStatus, diff --git a/server/src/test/java/com/hedera/block/server/util/BlockingExecutorService.java b/server/src/test/java/com/hedera/block/server/util/BlockingExecutorService.java new file mode 100644 index 000000000..35d36ec63 --- /dev/null +++ b/server/src/test/java/com/hedera/block/server/util/BlockingExecutorService.java @@ -0,0 +1,35 @@ +// SPDX-License-Identifier: Apache-2.0 +package com.hedera.block.server.util; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +public class BlockingExecutorService extends ThreadPoolExecutor { + // Useful for testing async logic. Blocks the test thread when waitTasksToComplete() is called. + // Takes as param the expected number of tasks to finish before continuing the test thread. + // (Additional tasks, if any, are executed before releasing, the queue should be empty) + // Takes the pool size as a second parameter + private final int expectedTasks; + private int completedTasks = 0; + private final CountDownLatch countDownLatch; + + public BlockingExecutorService(int expectedTasks, int poolSize) { + super(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()); + this.expectedTasks = expectedTasks; + this.countDownLatch = new CountDownLatch(1); + } + + @Override + protected void afterExecute(Runnable r, Throwable t) { + completedTasks++; + if (getQueue().isEmpty() && completedTasks >= expectedTasks) { + countDownLatch.countDown(); + } + } + + public void waitTasksToComplete() throws InterruptedException { + countDownLatch.await(); + } +} diff --git a/server/src/test/java/com/hedera/block/server/util/TestUtils.java b/server/src/test/java/com/hedera/block/server/util/TestUtils.java index a8480edd8..2cfe2aae6 100644 --- a/server/src/test/java/com/hedera/block/server/util/TestUtils.java +++ b/server/src/test/java/com/hedera/block/server/util/TestUtils.java @@ -6,6 +6,8 @@ import java.nio.file.attribute.PosixFilePermission; import java.nio.file.attribute.PosixFilePermissions; import java.util.Set; +import java.util.concurrent.CountDownLatch; +import org.mockito.stubbing.Answer; public final class TestUtils { private TestUtils() {} @@ -43,4 +45,14 @@ public static FileAttribute> getNoRead() { public static FileAttribute> getNoWrite() { return PosixFilePermissions.asFileAttribute(PosixFilePermissions.fromString(NO_WRITE)); } + + public static Answer onEventLatchCountdown(CountDownLatch latch) { + return invocation -> { + if (latch.getCount() == 0) { + throw new IllegalStateException("Event calls exceeded"); + } + latch.countDown(); + return null; + }; + } }