@@ -247,6 +247,90 @@ public void testDuplicateBlockReceived() {
247
247
verify (publisher , never ()).publish (any ());
248
248
}
249
249
250
+ @ Test
251
+ @ DisplayName ("Test duplicate block items with null blockHash of ACK variation" )
252
+ public void testDuplicateBlockReceived_NullAckHashVariation () {
253
+
254
+ // given
255
+ when (serviceStatus .isRunning ()).thenReturn (true );
256
+ long latestAckedBlockNumber = 10L ;
257
+ BlockInfo latestAckedBlock = new BlockInfo (latestAckedBlockNumber );
258
+ when (serviceStatus .getLatestAckedBlock ()).thenReturn (latestAckedBlock );
259
+ when (serviceStatus .getLatestReceivedBlockNumber ()).thenReturn (latestAckedBlockNumber );
260
+
261
+ final List <BlockItemUnparsed > blockItems = generateBlockItemsUnparsedForWithBlockNumber (10 );
262
+ final ProducerBlockItemObserver producerBlockItemObserver = new ProducerBlockItemObserver (
263
+ testClock ,
264
+ publisher ,
265
+ subscriptionHandler ,
266
+ helidonPublishPipeline ,
267
+ serviceStatus ,
268
+ consumerConfig ,
269
+ metricsService );
270
+
271
+ // when
272
+ producerBlockItemObserver .onNext (blockItems );
273
+
274
+ // then
275
+ final BlockAcknowledgement blockAcknowledgement = BlockAcknowledgement .newBuilder ()
276
+ .blockNumber (10L )
277
+ .blockAlreadyExists (true )
278
+ .build ();
279
+
280
+ final Acknowledgement acknowledgement =
281
+ Acknowledgement .newBuilder ().blockAck (blockAcknowledgement ).build ();
282
+
283
+ final PublishStreamResponse publishStreamResponse = PublishStreamResponse .newBuilder ()
284
+ .acknowledgement (acknowledgement )
285
+ .build ();
286
+
287
+ // verify helidonPublishPipeline.onNext() is called once with publishStreamResponse
288
+ verify (helidonPublishPipeline , timeout (testTimeout ).times (1 )).onNext (publishStreamResponse );
289
+ // verify that the duplicate block is not published
290
+ verify (publisher , never ()).publish (any ());
291
+ }
292
+
293
+ @ Test
294
+ @ DisplayName ("Test duplicate block items with null ACK variation" )
295
+ public void testDuplicateBlockReceived_NullAckVariation () {
296
+
297
+ // given
298
+ when (serviceStatus .isRunning ()).thenReturn (true );
299
+ when (serviceStatus .getLatestAckedBlock ()).thenReturn (null );
300
+ when (serviceStatus .getLatestReceivedBlockNumber ()).thenReturn (10L );
301
+
302
+ final List <BlockItemUnparsed > blockItems = generateBlockItemsUnparsedForWithBlockNumber (10 );
303
+ final ProducerBlockItemObserver producerBlockItemObserver = new ProducerBlockItemObserver (
304
+ testClock ,
305
+ publisher ,
306
+ subscriptionHandler ,
307
+ helidonPublishPipeline ,
308
+ serviceStatus ,
309
+ consumerConfig ,
310
+ metricsService );
311
+
312
+ // when
313
+ producerBlockItemObserver .onNext (blockItems );
314
+
315
+ // then
316
+ final BlockAcknowledgement blockAcknowledgement = BlockAcknowledgement .newBuilder ()
317
+ .blockNumber (10L )
318
+ .blockAlreadyExists (true )
319
+ .build ();
320
+
321
+ final Acknowledgement acknowledgement =
322
+ Acknowledgement .newBuilder ().blockAck (blockAcknowledgement ).build ();
323
+
324
+ final PublishStreamResponse publishStreamResponse = PublishStreamResponse .newBuilder ()
325
+ .acknowledgement (acknowledgement )
326
+ .build ();
327
+
328
+ // verify helidonPublishPipeline.onNext() is called once with publishStreamResponse
329
+ verify (helidonPublishPipeline , timeout (testTimeout ).times (1 )).onNext (publishStreamResponse );
330
+ // verify that the duplicate block is not published
331
+ verify (publisher , never ()).publish (any ());
332
+ }
333
+
250
334
@ Test
251
335
@ DisplayName ("Test future (ahead of expected) block received" )
252
336
public void testFutureBlockReceived () {
0 commit comments