From 342abb3df393e13972ed8ce80a8e18716cf0991d Mon Sep 17 00:00:00 2001 From: Alex Kehayov Date: Wed, 5 Mar 2025 10:50:59 +0200 Subject: [PATCH] chore: improvements for test repeatability (#766) Signed-off-by: Alex Kehayov --- .../pbj/PbjBlockStreamServiceProxy.java | 42 +++- .../mediator/LiveStreamMediatorImplTest.java | 17 +- .../PbjBlockStreamServiceIntegrationTest.java | 219 ++++++++++++------ .../server/util/BlockingExecutorService.java | 35 +++ .../hedera/block/server/util/TestUtils.java | 12 + 5 files changed, 241 insertions(+), 84 deletions(-) create mode 100644 server/src/test/java/com/hedera/block/server/util/BlockingExecutorService.java diff --git a/server/src/main/java/com/hedera/block/server/pbj/PbjBlockStreamServiceProxy.java b/server/src/main/java/com/hedera/block/server/pbj/PbjBlockStreamServiceProxy.java index e2a61d41e..969aaa76c 100644 --- a/server/src/main/java/com/hedera/block/server/pbj/PbjBlockStreamServiceProxy.java +++ b/server/src/main/java/com/hedera/block/server/pbj/PbjBlockStreamServiceProxy.java @@ -130,7 +130,37 @@ public Pipeline open( final @NonNull RequestOptions options, final @NonNull Pipeline replies) { - final var m = (BlockStreamMethod) method; + return openPipeline( + (BlockStreamMethod) method, + options, + replies, + openRangeHistoricStreamingExecutorService, + closedRangeHistoricStreamingExecutorService); + } + + @NonNull + public Pipeline open( + final @NonNull Method method, + final @NonNull RequestOptions options, + final @NonNull Pipeline replies, + final @NonNull ExecutorService openRangeHistoricStreamingExecutorService, + final @NonNull ExecutorService closedRangeHistoricStreamingExecutorService) { + + return openPipeline( + (BlockStreamMethod) method, + options, + replies, + openRangeHistoricStreamingExecutorService, + closedRangeHistoricStreamingExecutorService); + } + + private Pipeline openPipeline( + @NonNull BlockStreamMethod method, + @NonNull RequestOptions options, + @NonNull Pipeline replies, + @NonNull ExecutorService openRangeHistoricStreamingExecutorService, + @NonNull ExecutorService closedRangeHistoricStreamingExecutorService) { + final var m = method; try { return switch (m) { case publishBlockStream -> { @@ -145,7 +175,11 @@ public Pipeline open( case subscribeBlockStream -> Pipelines .serverStreaming() .mapRequest(bytes -> parseSubscribeStreamRequest(bytes, options)) - .method(this::subscribeBlockStream) + .method((subscribeStreamRequest, helidonConsumerObserver) -> subscribeBlockStream( + subscribeStreamRequest, + helidonConsumerObserver, + openRangeHistoricStreamingExecutorService, + closedRangeHistoricStreamingExecutorService)) .mapResponse(reply -> createSubscribeStreamResponse(reply, options)) .respondTo(replies) .build(); @@ -201,7 +235,9 @@ Pipeline> publishBlockStream( */ void subscribeBlockStream( @NonNull final SubscribeStreamRequest subscribeStreamRequest, - @NonNull final Pipeline helidonConsumerObserver) { + @NonNull final Pipeline helidonConsumerObserver, + @NonNull final ExecutorService openRangeHistoricStreamingExecutorService, + @NonNull final ExecutorService closedRangeHistoricStreamingExecutorService) { LOGGER.log(DEBUG, "Executing Server Streaming subscribeBlockStream gRPC method"); diff --git a/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java b/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java index 05bba82d5..e96bf355a 100644 --- a/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java +++ b/server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java @@ -1,6 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 package com.hedera.block.server.mediator; +import static com.hedera.block.server.metrics.BlockNodeMetricTypes.Counter.BlocksPersisted; import static com.hedera.block.server.metrics.BlockNodeMetricTypes.Counter.LiveBlockItems; import static com.hedera.block.server.metrics.BlockNodeMetricTypes.Gauge.Consumers; import static com.hedera.block.server.util.PersistTestUtils.PERSISTENCE_STORAGE_ARCHIVE_ROOT_PATH_KEY; @@ -32,6 +33,7 @@ import com.hedera.block.server.service.ServiceConfig; import com.hedera.block.server.service.ServiceStatus; import com.hedera.block.server.service.ServiceStatusImpl; +import com.hedera.block.server.util.BlockingExecutorService; import com.hedera.block.server.util.PersistTestUtils; import com.hedera.block.server.util.TestConfigUtil; import com.hedera.hapi.block.BlockItemSetUnparsed; @@ -174,7 +176,9 @@ void testUnsubscribeEach() throws InterruptedException { } @Test - void testMediatorPersistenceWithoutSubscribers() throws IOException { + void testMediatorPersistenceWithoutSubscribers() throws IOException, InterruptedException { + // 1 block is expected to be processed, so the expected tasks param is set to 1 + final BlockingExecutorService executor = new BlockingExecutorService(1, 1); final ServiceStatus serviceStatus = new ServiceStatusImpl(serviceConfig); final LiveStreamMediator streamMediator = LiveStreamMediatorBuilder.newBuilder( metricsService, mediatorConfig, serviceStatus) @@ -192,7 +196,7 @@ void testMediatorPersistenceWithoutSubscribers() throws IOException { serviceStatus, ackHandlerMock, writerFactory, - executorMock, + executor, archiverMock, pathResolverMock, persistenceStorageConfig); @@ -201,13 +205,12 @@ void testMediatorPersistenceWithoutSubscribers() throws IOException { // Acting as a producer, notify the mediator of a new block streamMediator.publish(blockItemUnparsed); + // Wait all the tasks to complete before the assertions to avoid flakiness + executor.waitTasksToComplete(); + // Verify the counter was incremented assertEquals(10, metricsService.get(LiveBlockItems).get()); - - // @todo(642) we need to employ the same technique here to inject a writer that will ensure - // the tasks are complete before we can verify the metrics for blocks persisted - // the test will pass without flaking if we do that - // assertEquals(1, blockNodeContext.metricsService().get(BlocksPersisted).get()); + assertEquals(1, metricsService.get(BlocksPersisted).get()); } @Test diff --git a/server/src/test/java/com/hedera/block/server/pbj/PbjBlockStreamServiceIntegrationTest.java b/server/src/test/java/com/hedera/block/server/pbj/PbjBlockStreamServiceIntegrationTest.java index 14cb92916..0f341878c 100644 --- a/server/src/test/java/com/hedera/block/server/pbj/PbjBlockStreamServiceIntegrationTest.java +++ b/server/src/test/java/com/hedera/block/server/pbj/PbjBlockStreamServiceIntegrationTest.java @@ -7,16 +7,19 @@ import static com.hedera.block.server.util.PersistTestUtils.PERSISTENCE_STORAGE_LIVE_ROOT_PATH_KEY; import static com.hedera.block.server.util.PersistTestUtils.PERSISTENCE_STORAGE_UNVERIFIED_ROOT_PATH_KEY; import static com.hedera.block.server.util.PersistTestUtils.generateBlockItemsUnparsed; +import static com.hedera.block.server.util.TestUtils.onEventLatchCountdown; import static com.hedera.hapi.block.SubscribeStreamResponseCode.READ_STREAM_NOT_AVAILABLE; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doCallRealMethod; +import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; import com.hedera.block.server.ack.AckHandler; import com.hedera.block.server.ack.AckHandlerImpl; @@ -35,15 +38,20 @@ import com.hedera.block.server.persistence.StreamPersistenceHandlerImpl; import com.hedera.block.server.persistence.storage.PersistenceStorageConfig; import com.hedera.block.server.persistence.storage.archive.LocalBlockArchiver; +import com.hedera.block.server.persistence.storage.compression.NoOpCompression; +import com.hedera.block.server.persistence.storage.path.BlockAsLocalFilePathResolver; import com.hedera.block.server.persistence.storage.path.BlockPathResolver; import com.hedera.block.server.persistence.storage.read.BlockReader; +import com.hedera.block.server.persistence.storage.remove.BlockAsLocalFileRemover; import com.hedera.block.server.persistence.storage.remove.BlockRemover; +import com.hedera.block.server.persistence.storage.write.AsyncBlockAsLocalFileWriterFactory; import com.hedera.block.server.persistence.storage.write.AsyncBlockWriterFactory; import com.hedera.block.server.persistence.storage.write.AsyncNoOpWriterFactory; import com.hedera.block.server.producer.ProducerConfig; import com.hedera.block.server.service.ServiceConfig; import com.hedera.block.server.service.ServiceStatus; import com.hedera.block.server.service.ServiceStatusImpl; +import com.hedera.block.server.util.BlockingExecutorService; import com.hedera.block.server.util.TestConfigUtil; import com.hedera.block.server.verification.StreamVerificationHandlerImpl; import com.hedera.block.server.verification.VerificationConfig; @@ -76,13 +84,13 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.extension.ExtendWith; @@ -191,14 +199,31 @@ void setUp() throws IOException { mediatorConfig = config.getConfigData(MediatorConfig.class); } - @Disabled("@todo(642) make test deterministic via correct executor injection") @Test @Timeout(value = JUNIT_TIMEOUT, unit = TimeUnit.MILLISECONDS) - void testPublishBlockStreamRegistrationAndExecution() throws IOException { + void testPublishBlockStreamRegistrationAndExecution() throws IOException, InterruptedException { final int numberOfBlocks = 1; - final ExecutorService executor = Executors.newFixedThreadPool(numberOfBlocks); + + final ExecutorService persistenceExecutor = Executors.newFixedThreadPool(numberOfBlocks); + final BlockingExecutorService subscriberExecutor1 = new BlockingExecutorService(1, 1); + final BlockingExecutorService subscriberExecutor2 = new BlockingExecutorService(1, 1); + final BlockingExecutorService subscriberExecutor3 = new BlockingExecutorService(1, 1); + + // The PublishStreamObserver logic is executed via lmax BatchEventProcessors internally, + // so BlockingExecutorService approach is not applicable. + CountDownLatch publishStreamObserversLatch = new CountDownLatch(3); + doAnswer(onEventLatchCountdown(publishStreamObserversLatch)) + .when(helidonPublishStreamObserver1) + .onNext(any()); + doAnswer(onEventLatchCountdown(publishStreamObserversLatch)) + .when(helidonPublishStreamObserver2) + .onNext(any()); + doAnswer(onEventLatchCountdown(publishStreamObserversLatch)) + .when(helidonPublishStreamObserver3) + .onNext(any()); + final PbjBlockStreamServiceProxy pbjBlockStreamServiceProxy = - buildBlockStreamService(blockReaderMock, executor); + buildBlockStreamService(blockReaderMock, persistenceExecutor, false, 0); // Register 3 producers - Opening a pipeline is not enough to register a producer. // pipeline.onNext() must be invoked to register the producer at the Helidon PBJ layer. @@ -223,19 +248,25 @@ void testPublishBlockStreamRegistrationAndExecution() throws IOException { .open( PbjBlockStreamService.BlockStreamMethod.subscribeBlockStream, optionsMock, - subscribeStreamObserver1) + subscribeStreamObserver1, + subscriberExecutor1, + subscriberExecutor1) .onNext(buildLiveStreamSubscribeStreamRequest()); pbjBlockStreamServiceProxy .open( PbjBlockStreamService.BlockStreamMethod.subscribeBlockStream, optionsMock, - subscribeStreamObserver2) + subscribeStreamObserver2, + subscriberExecutor2, + subscriberExecutor2) .onNext(buildLiveStreamSubscribeStreamRequest()); pbjBlockStreamServiceProxy .open( PbjBlockStreamService.BlockStreamMethod.subscribeBlockStream, optionsMock, - subscribeStreamObserver3) + subscribeStreamObserver3, + subscriberExecutor3, + subscriberExecutor3) .onNext(buildLiveStreamSubscribeStreamRequest()); final List blockItems = generateBlockItemsUnparsed(numberOfBlocks); @@ -249,49 +280,62 @@ void testPublishBlockStreamRegistrationAndExecution() throws IOException { producerPipeline.onNext(PublishStreamRequestUnparsed.PROTOBUF.toBytes(publishStreamRequest)); } - // Verify all 10 BlockItems were sent to each of the 3 consumers - verify(subscribeStreamObserver1, timeout(testTimeout).times(1)) - .onNext(buildSubscribeStreamResponse(blockItems.getFirst())); - verify(subscribeStreamObserver1, timeout(testTimeout).times(8)) - .onNext(buildSubscribeStreamResponse(blockItems.get(1))); - verify(subscribeStreamObserver1, timeout(testTimeout).times(1)) - .onNext(buildSubscribeStreamResponse(blockItems.get(9))); - - verify(subscribeStreamObserver2, timeout(testTimeout).times(1)) - .onNext(buildSubscribeStreamResponse(blockItems.getFirst())); - verify(subscribeStreamObserver2, timeout(testTimeout).times(8)) - .onNext(buildSubscribeStreamResponse(blockItems.get(1))); - verify(subscribeStreamObserver2, timeout(testTimeout).times(1)) - .onNext(buildSubscribeStreamResponse(blockItems.get(9))); - - verify(subscribeStreamObserver3, timeout(testTimeout).times(1)) - .onNext(buildSubscribeStreamResponse(blockItems.getFirst())); - verify(subscribeStreamObserver3, timeout(testTimeout).times(8)) - .onNext(buildSubscribeStreamResponse(blockItems.get(1))); - verify(subscribeStreamObserver3, timeout(testTimeout).times(1)) - .onNext(buildSubscribeStreamResponse(blockItems.get(9))); - - verify(helidonPublishStreamObserver1, timeout(testTimeout).times(1)).onNext(any()); - verify(helidonPublishStreamObserver2, timeout(testTimeout).times(1)).onNext(any()); - verify(helidonPublishStreamObserver3, timeout(testTimeout).times(1)).onNext(any()); - // Close the stream as Helidon does helidonPublishStreamObserver1.onComplete(); + // Wait for subscribers and publishers to finish execution before assertions + subscriberExecutor1.waitTasksToComplete(); + subscriberExecutor2.waitTasksToComplete(); + subscriberExecutor3.waitTasksToComplete(); + publishStreamObserversLatch.await(); + + // Verify all 10 BlockItems were sent to each of the 3 consumers + verify(subscribeStreamObserver1, times(1)).onNext(buildSubscribeStreamResponse(blockItems.getFirst())); + verify(subscribeStreamObserver1, times(8)).onNext(buildSubscribeStreamResponse(blockItems.get(1))); + verify(subscribeStreamObserver1, times(1)).onNext(buildSubscribeStreamResponse(blockItems.get(9))); + + verify(subscribeStreamObserver2, times(1)).onNext(buildSubscribeStreamResponse(blockItems.getFirst())); + verify(subscribeStreamObserver2, times(8)).onNext(buildSubscribeStreamResponse(blockItems.get(1))); + verify(subscribeStreamObserver2, times(1)).onNext(buildSubscribeStreamResponse(blockItems.get(9))); + + verify(subscribeStreamObserver3, times(1)).onNext(buildSubscribeStreamResponse(blockItems.getFirst())); + verify(subscribeStreamObserver3, times(8)).onNext(buildSubscribeStreamResponse(blockItems.get(1))); + verify(subscribeStreamObserver3, times(1)).onNext(buildSubscribeStreamResponse(blockItems.get(9))); + + verify(helidonPublishStreamObserver1, times(1)).onNext(any()); + verify(helidonPublishStreamObserver2, times(1)).onNext(any()); + verify(helidonPublishStreamObserver3, times(1)).onNext(any()); + // verify the onCompleted() method is invoked on the wrapped StreamObserver - verify(helidonPublishStreamObserver1, timeout(testTimeout).times(1)).onComplete(); + verify(helidonPublishStreamObserver1, times(1)).onComplete(); } - @Disabled("@todo(642) make test deterministic via correct executor injection") @Test @Timeout(value = JUNIT_TIMEOUT, unit = TimeUnit.MILLISECONDS) - void testFullProducerConsumerHappyPath() throws IOException { + void testFullProducerConsumerHappyPath() throws IOException, InterruptedException { final int numberOfBlocks = 5; - final ExecutorService executor = Executors.newFixedThreadPool(numberOfBlocks); + final ExecutorService persistenceExecutor = Executors.newFixedThreadPool(numberOfBlocks); + final BlockingExecutorService subscriberExecutor1 = new BlockingExecutorService(1, 1); + final BlockingExecutorService subscriberExecutor2 = new BlockingExecutorService(1, 1); + final BlockingExecutorService subscriberExecutor3 = new BlockingExecutorService(1, 1); + + // The PublishStreamObserver logic is executed via lmax BatchEventProcessors internally, + // so BlockingExecutorService approach is not applicable. + CountDownLatch publishStreamObserversLatch = new CountDownLatch(3 * numberOfBlocks); + doAnswer(onEventLatchCountdown(publishStreamObserversLatch)) + .when(helidonPublishStreamObserver1) + .onNext(any()); + doAnswer(onEventLatchCountdown(publishStreamObserversLatch)) + .when(helidonPublishStreamObserver2) + .onNext(any()); + doAnswer(onEventLatchCountdown(publishStreamObserversLatch)) + .when(helidonPublishStreamObserver3) + .onNext(any()); + // Use a real BlockWriter to test the full integration final PbjBlockStreamServiceProxy pbjBlockStreamServiceProxy = - buildBlockStreamService(blockReaderMock, executor); + buildBlockStreamService(blockReaderMock, persistenceExecutor, false, 0); // Register 3 producers - Opening a pipeline is not enough to register a producer. // pipeline.onNext() must be invoked to register the producer at the Helidon PBJ layer. @@ -316,19 +360,25 @@ void testFullProducerConsumerHappyPath() throws IOException { .open( PbjBlockStreamService.BlockStreamMethod.subscribeBlockStream, optionsMock, - subscribeStreamObserver1) + subscribeStreamObserver1, + subscriberExecutor1, + subscriberExecutor1) .onNext(buildLiveStreamSubscribeStreamRequest()); pbjBlockStreamServiceProxy .open( PbjBlockStreamService.BlockStreamMethod.subscribeBlockStream, optionsMock, - subscribeStreamObserver2) + subscribeStreamObserver2, + subscriberExecutor2, + subscriberExecutor2) .onNext(buildLiveStreamSubscribeStreamRequest()); pbjBlockStreamServiceProxy .open( PbjBlockStreamService.BlockStreamMethod.subscribeBlockStream, optionsMock, - subscribeStreamObserver3) + subscribeStreamObserver3, + subscriberExecutor3, + subscriberExecutor3) .onNext(buildLiveStreamSubscribeStreamRequest()); final List blockItems = generateBlockItemsUnparsed(numberOfBlocks); @@ -342,18 +392,20 @@ void testFullProducerConsumerHappyPath() throws IOException { producerPipeline.onNext(PublishStreamRequestUnparsed.PROTOBUF.toBytes(publishStreamRequest)); } + subscriberExecutor1.waitTasksToComplete(); + subscriberExecutor2.waitTasksToComplete(); + subscriberExecutor3.waitTasksToComplete(); + publishStreamObserversLatch.await(); + // Verify the subscribers received the data - verifySubscribeStreamResponse(numberOfBlocks, 0, numberOfBlocks, subscribeStreamObserver1, blockItems); - verifySubscribeStreamResponse(numberOfBlocks, 0, numberOfBlocks, subscribeStreamObserver2, blockItems); - verifySubscribeStreamResponse(numberOfBlocks, 0, numberOfBlocks, subscribeStreamObserver3, blockItems); + verifySubscribeStreamResponse(numberOfBlocks, 0, numberOfBlocks, subscribeStreamObserver1, blockItems, false); + verifySubscribeStreamResponse(numberOfBlocks, 0, numberOfBlocks, subscribeStreamObserver2, blockItems, false); + verifySubscribeStreamResponse(numberOfBlocks, 0, numberOfBlocks, subscribeStreamObserver3, blockItems, false); // Verify the producers received all the responses - verify(helidonPublishStreamObserver1, timeout(testTimeout).times(numberOfBlocks)) - .onNext(any()); - verify(helidonPublishStreamObserver2, timeout(testTimeout).times(numberOfBlocks)) - .onNext(any()); - verify(helidonPublishStreamObserver3, timeout(testTimeout).times(numberOfBlocks)) - .onNext(any()); + verify(helidonPublishStreamObserver1, times(numberOfBlocks)).onNext(any()); + verify(helidonPublishStreamObserver2, times(numberOfBlocks)).onNext(any()); + verify(helidonPublishStreamObserver3, times(numberOfBlocks)).onNext(any()); } @Test @@ -363,7 +415,7 @@ void testFullWithSubscribersAddedDynamically() throws IOException, InterruptedEx final ExecutorService executor = Executors.newFixedThreadPool(1); final PbjBlockStreamServiceProxy pbjBlockStreamServiceProxy = - buildBlockStreamService(blockReaderMock, executor); + buildBlockStreamService(blockReaderMock, executor, true, 1); // Register a producer final Pipeline producerPipeline = pbjBlockStreamServiceProxy.open( @@ -436,18 +488,18 @@ void testFullWithSubscribersAddedDynamically() throws IOException, InterruptedEx } // Verify subscribers who were listening before the stream started - verifySubscribeStreamResponse(numberOfBlocks, 0, numberOfBlocks, subscribeStreamObserver1, blockItems); - verifySubscribeStreamResponse(numberOfBlocks, 0, numberOfBlocks, subscribeStreamObserver2, blockItems); - verifySubscribeStreamResponse(numberOfBlocks, 0, numberOfBlocks, subscribeStreamObserver3, blockItems); + verifySubscribeStreamResponse(numberOfBlocks, 0, numberOfBlocks, subscribeStreamObserver1, blockItems, true); + verifySubscribeStreamResponse(numberOfBlocks, 0, numberOfBlocks, subscribeStreamObserver2, blockItems, true); + verifySubscribeStreamResponse(numberOfBlocks, 0, numberOfBlocks, subscribeStreamObserver3, blockItems, true); // Verify subscribers added while the stream was in progress. // The Helidon-provided StreamObserver onNext() method will only // be called once a Header BlockItem is reached. So, pass in // the number of BlockItems to wait to verify that the method // was called. - verifySubscribeStreamResponse(numberOfBlocks, 59, numberOfBlocks, subscribeStreamObserver4, blockItems); - verifySubscribeStreamResponse(numberOfBlocks, 79, numberOfBlocks, subscribeStreamObserver5, blockItems); - verifySubscribeStreamResponse(numberOfBlocks, 89, numberOfBlocks, subscribeStreamObserver6, blockItems); + verifySubscribeStreamResponse(numberOfBlocks, 59, numberOfBlocks, subscribeStreamObserver4, blockItems, true); + verifySubscribeStreamResponse(numberOfBlocks, 79, numberOfBlocks, subscribeStreamObserver5, blockItems, true); + verifySubscribeStreamResponse(numberOfBlocks, 89, numberOfBlocks, subscribeStreamObserver6, blockItems, true); } @Test @@ -585,18 +637,18 @@ void testSubAndUnsubWhileStreaming() throws InterruptedException, IOException { } // Verify subscribers who were listening before the stream started - verifySubscribeStreamResponse(numberOfBlocks, 0, 10, subscribeStreamObserver1, blockItems); - verifySubscribeStreamResponse(numberOfBlocks, 0, 60, subscribeStreamObserver2, blockItems); - verifySubscribeStreamResponse(numberOfBlocks, 0, 70, subscribeStreamObserver3, blockItems); + verifySubscribeStreamResponse(numberOfBlocks, 0, 10, subscribeStreamObserver1, blockItems, true); + verifySubscribeStreamResponse(numberOfBlocks, 0, 60, subscribeStreamObserver2, blockItems, true); + verifySubscribeStreamResponse(numberOfBlocks, 0, 70, subscribeStreamObserver3, blockItems, true); // Verify subscribers added while the stream was in progress. // The Helidon-provided StreamObserver onNext() method will only // be called once a Header BlockItem is reached. So, pass in // the number of BlockItems to wait to verify that the method // was called. - verifySubscribeStreamResponse(numberOfBlocks, 59, numberOfBlocks, subscribeStreamObserver4, blockItems); - verifySubscribeStreamResponse(numberOfBlocks, 79, numberOfBlocks, subscribeStreamObserver5, blockItems); - verifySubscribeStreamResponse(numberOfBlocks, 89, numberOfBlocks, subscribeStreamObserver6, blockItems); + verifySubscribeStreamResponse(numberOfBlocks, 59, numberOfBlocks, subscribeStreamObserver4, blockItems, true); + verifySubscribeStreamResponse(numberOfBlocks, 79, numberOfBlocks, subscribeStreamObserver5, blockItems, true); + verifySubscribeStreamResponse(numberOfBlocks, 89, numberOfBlocks, subscribeStreamObserver6, blockItems, true); producerPipeline.onComplete(); } @@ -751,7 +803,8 @@ private static void verifySubscribeStreamResponse( int blockItemsToWait, int blockItemsToSkip, Pipeline pipeline, - List blockItems) { + List blockItems, + boolean verifyWithTimeout) { // Each block has 10 BlockItems. Verify all the BlockItems // in a given block per iteration. for (int block = 0; block < numberOfBlocks; block += 10) { @@ -764,9 +817,15 @@ private static void verifySubscribeStreamResponse( final Bytes bodySubStreamResponse = buildSubscribeStreamResponse(bodyBlockItem); final BlockItemUnparsed stateProofBlockItem = blockItems.get(block + 9); final Bytes stateProofStreamResponse = buildSubscribeStreamResponse(stateProofBlockItem); - verify(pipeline, timeout(testTimeout).times(1)).onNext(headerSubStreamResponse); - verify(pipeline, timeout(testTimeout).times(8)).onNext(bodySubStreamResponse); - verify(pipeline, timeout(testTimeout).times(1)).onNext(stateProofStreamResponse); + if (verifyWithTimeout) { + verify(pipeline, timeout(testTimeout).times(1)).onNext(headerSubStreamResponse); + verify(pipeline, timeout(testTimeout).times(8)).onNext(bodySubStreamResponse); + verify(pipeline, timeout(testTimeout).times(1)).onNext(stateProofStreamResponse); + } else { + verify(pipeline, times(1)).onNext(headerSubStreamResponse); + verify(pipeline, times(8)).onNext(bodySubStreamResponse); + verify(pipeline, times(1)).onNext(stateProofStreamResponse); + } } } @@ -788,18 +847,21 @@ private static Bytes buildEndOfStreamResponse() { private BlockVerificationSessionFactory getBlockVerificationSessionFactory() { final SignatureVerifierDummy signatureVerifier = mock(SignatureVerifierDummy.class); - when(signatureVerifier.verifySignature(any(), any())).thenReturn(true); final ExecutorService executorService = ForkJoinPool.commonPool(); + lenient().when(signatureVerifier.verifySignature(any(), any())).thenReturn(true); return new BlockVerificationSessionFactory( verificationConfig, metricsService, signatureVerifier, executorService); } private PbjBlockStreamServiceProxy buildBlockStreamService( - final BlockReader blockReader, final ExecutorService persistenceExecutor) + final BlockReader blockReader, + final ExecutorService persistenceExecutor, + final boolean mockPersistence, + final long lastAckedBlock) throws IOException { final BlockRemover blockRemover = mock(BlockRemover.class); final ServiceStatus serviceStatus = new ServiceStatusImpl(serviceConfig); - serviceStatus.setLatestAckedBlock(new BlockInfo(1L)); + serviceStatus.setLatestAckedBlock(new BlockInfo(lastAckedBlock)); final LiveStreamMediator streamMediator = buildStreamMediator(new ConcurrentHashMap<>(32), new ConcurrentHashMap<>(32), serviceStatus); final Notifier notifier = @@ -809,7 +871,16 @@ private PbjBlockStreamServiceProxy buildBlockStreamService( final BlockVerificationSessionFactory blockVerificationSessionFactory = getBlockVerificationSessionFactory(); final BlockVerificationService BlockVerificationService = new BlockVerificationServiceImpl(metricsService, blockVerificationSessionFactory, blockManager); - final AsyncNoOpWriterFactory writerFactory = new AsyncNoOpWriterFactory(blockManager, metricsService); + final BlockAsLocalFilePathResolver pathResolver = new BlockAsLocalFilePathResolver(persistenceStorageConfig); + final AsyncBlockWriterFactory writerFactory; + if (mockPersistence) { + writerFactory = new AsyncNoOpWriterFactory(blockManager, metricsService); + } else { + final BlockAsLocalFileRemover blockRemoverReal = new BlockAsLocalFileRemover(pathResolver); + writerFactory = new AsyncBlockAsLocalFileWriterFactory( + pathResolver, blockRemoverReal, NoOpCompression.newInstance(), blockManager, metricsService); + } + final StreamPersistenceHandlerImpl blockNodeEventHandler = new StreamPersistenceHandlerImpl( streamMediator, notifier, @@ -819,7 +890,7 @@ private PbjBlockStreamServiceProxy buildBlockStreamService( writerFactory, persistenceExecutor, archiverMock, - pathResolverMock, + mockPersistence ? pathResolverMock : pathResolver, persistenceStorageConfig); final StreamVerificationHandlerImpl streamVerificationHandler = new StreamVerificationHandlerImpl( streamMediator, notifier, metricsService, serviceStatus, BlockVerificationService); diff --git a/server/src/test/java/com/hedera/block/server/util/BlockingExecutorService.java b/server/src/test/java/com/hedera/block/server/util/BlockingExecutorService.java new file mode 100644 index 000000000..35d36ec63 --- /dev/null +++ b/server/src/test/java/com/hedera/block/server/util/BlockingExecutorService.java @@ -0,0 +1,35 @@ +// SPDX-License-Identifier: Apache-2.0 +package com.hedera.block.server.util; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +public class BlockingExecutorService extends ThreadPoolExecutor { + // Useful for testing async logic. Blocks the test thread when waitTasksToComplete() is called. + // Takes as param the expected number of tasks to finish before continuing the test thread. + // (Additional tasks, if any, are executed before releasing, the queue should be empty) + // Takes the pool size as a second parameter + private final int expectedTasks; + private int completedTasks = 0; + private final CountDownLatch countDownLatch; + + public BlockingExecutorService(int expectedTasks, int poolSize) { + super(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()); + this.expectedTasks = expectedTasks; + this.countDownLatch = new CountDownLatch(1); + } + + @Override + protected void afterExecute(Runnable r, Throwable t) { + completedTasks++; + if (getQueue().isEmpty() && completedTasks >= expectedTasks) { + countDownLatch.countDown(); + } + } + + public void waitTasksToComplete() throws InterruptedException { + countDownLatch.await(); + } +} diff --git a/server/src/test/java/com/hedera/block/server/util/TestUtils.java b/server/src/test/java/com/hedera/block/server/util/TestUtils.java index a8480edd8..2cfe2aae6 100644 --- a/server/src/test/java/com/hedera/block/server/util/TestUtils.java +++ b/server/src/test/java/com/hedera/block/server/util/TestUtils.java @@ -6,6 +6,8 @@ import java.nio.file.attribute.PosixFilePermission; import java.nio.file.attribute.PosixFilePermissions; import java.util.Set; +import java.util.concurrent.CountDownLatch; +import org.mockito.stubbing.Answer; public final class TestUtils { private TestUtils() {} @@ -43,4 +45,14 @@ public static FileAttribute> getNoRead() { public static FileAttribute> getNoWrite() { return PosixFilePermissions.asFileAttribute(PosixFilePermissions.fromString(NO_WRITE)); } + + public static Answer onEventLatchCountdown(CountDownLatch latch) { + return invocation -> { + if (latch.getCount() == 0) { + throw new IllegalStateException("Event calls exceeded"); + } + latch.countDown(); + return null; + }; + } }