@@ -331,6 +331,52 @@ public void testDuplicateBlockReceived_NullAckVariation() {
331
331
verify (publisher , never ()).publish (any ());
332
332
}
333
333
334
+ @ Test
335
+ @ DisplayName ("Test duplicate block items with different ACK number variation" )
336
+ public void testDuplicateBlockReceived_DiffAckNumberVariation () {
337
+
338
+ // given
339
+ when (serviceStatus .isRunning ()).thenReturn (true );
340
+ long latestAckedBlockNumber = 10L ;
341
+ long latestReceivedBlockNumber = 11L ;
342
+ Bytes fakeHash = Bytes .wrap ("fake_hash" );
343
+ BlockInfo latestAckedBlock = new BlockInfo (latestAckedBlockNumber );
344
+ latestAckedBlock .setBlockHash (fakeHash );
345
+ when (serviceStatus .getLatestAckedBlock ()).thenReturn (latestAckedBlock );
346
+ when (serviceStatus .getLatestReceivedBlockNumber ()).thenReturn (latestReceivedBlockNumber );
347
+
348
+ final List <BlockItemUnparsed > blockItems = generateBlockItemsUnparsedForWithBlockNumber (latestReceivedBlockNumber );
349
+ final ProducerBlockItemObserver producerBlockItemObserver = new ProducerBlockItemObserver (
350
+ testClock ,
351
+ publisher ,
352
+ subscriptionHandler ,
353
+ helidonPublishPipeline ,
354
+ serviceStatus ,
355
+ consumerConfig ,
356
+ metricsService );
357
+
358
+ // when
359
+ producerBlockItemObserver .onNext (blockItems );
360
+
361
+ // then
362
+ final BlockAcknowledgement blockAcknowledgement = BlockAcknowledgement .newBuilder ()
363
+ .blockNumber (latestReceivedBlockNumber )
364
+ .blockAlreadyExists (true )
365
+ .build ();
366
+
367
+ final Acknowledgement acknowledgement =
368
+ Acknowledgement .newBuilder ().blockAck (blockAcknowledgement ).build ();
369
+
370
+ final PublishStreamResponse publishStreamResponse = PublishStreamResponse .newBuilder ()
371
+ .acknowledgement (acknowledgement )
372
+ .build ();
373
+
374
+ // verify helidonPublishPipeline.onNext() is called once with publishStreamResponse
375
+ verify (helidonPublishPipeline , timeout (testTimeout ).times (1 )).onNext (publishStreamResponse );
376
+ // verify that the duplicate block is not published
377
+ verify (publisher , never ()).publish (any ());
378
+ }
379
+
334
380
@ Test
335
381
@ DisplayName ("Test future (ahead of expected) block received" )
336
382
public void testFutureBlockReceived () {
0 commit comments