@@ -247,6 +247,137 @@ public void testDuplicateBlockReceived() {
247
247
verify (publisher , never ()).publish (any ());
248
248
}
249
249
250
+ @ Test
251
+ @ DisplayName ("Test duplicate block items with null blockHash of ACK variation" )
252
+ public void testDuplicateBlockReceived_NullAckHashVariation () {
253
+
254
+ // given
255
+ when (serviceStatus .isRunning ()).thenReturn (true );
256
+ long latestAckedBlockNumber = 10L ;
257
+ BlockInfo latestAckedBlock = new BlockInfo (latestAckedBlockNumber );
258
+ when (serviceStatus .getLatestAckedBlock ()).thenReturn (latestAckedBlock );
259
+ when (serviceStatus .getLatestReceivedBlockNumber ()).thenReturn (latestAckedBlockNumber );
260
+
261
+ final List <BlockItemUnparsed > blockItems = generateBlockItemsUnparsedForWithBlockNumber (10 );
262
+ final ProducerBlockItemObserver producerBlockItemObserver = new ProducerBlockItemObserver (
263
+ testClock ,
264
+ publisher ,
265
+ subscriptionHandler ,
266
+ helidonPublishPipeline ,
267
+ serviceStatus ,
268
+ consumerConfig ,
269
+ metricsService );
270
+
271
+ // when
272
+ producerBlockItemObserver .onNext (blockItems );
273
+
274
+ // then
275
+ final BlockAcknowledgement blockAcknowledgement = BlockAcknowledgement .newBuilder ()
276
+ .blockNumber (10L )
277
+ .blockAlreadyExists (true )
278
+ .build ();
279
+
280
+ final Acknowledgement acknowledgement =
281
+ Acknowledgement .newBuilder ().blockAck (blockAcknowledgement ).build ();
282
+
283
+ final PublishStreamResponse publishStreamResponse = PublishStreamResponse .newBuilder ()
284
+ .acknowledgement (acknowledgement )
285
+ .build ();
286
+
287
+ // verify helidonPublishPipeline.onNext() is called once with publishStreamResponse
288
+ verify (helidonPublishPipeline , timeout (testTimeout ).times (1 )).onNext (publishStreamResponse );
289
+ // verify that the duplicate block is not published
290
+ verify (publisher , never ()).publish (any ());
291
+ }
292
+
293
+ @ Test
294
+ @ DisplayName ("Test duplicate block items with null ACK variation" )
295
+ public void testDuplicateBlockReceived_NullAckVariation () {
296
+
297
+ // given
298
+ when (serviceStatus .isRunning ()).thenReturn (true );
299
+ when (serviceStatus .getLatestAckedBlock ()).thenReturn (null );
300
+ when (serviceStatus .getLatestReceivedBlockNumber ()).thenReturn (10L );
301
+
302
+ final List <BlockItemUnparsed > blockItems = generateBlockItemsUnparsedForWithBlockNumber (10 );
303
+ final ProducerBlockItemObserver producerBlockItemObserver = new ProducerBlockItemObserver (
304
+ testClock ,
305
+ publisher ,
306
+ subscriptionHandler ,
307
+ helidonPublishPipeline ,
308
+ serviceStatus ,
309
+ consumerConfig ,
310
+ metricsService );
311
+
312
+ // when
313
+ producerBlockItemObserver .onNext (blockItems );
314
+
315
+ // then
316
+ final BlockAcknowledgement blockAcknowledgement = BlockAcknowledgement .newBuilder ()
317
+ .blockNumber (10L )
318
+ .blockAlreadyExists (true )
319
+ .build ();
320
+
321
+ final Acknowledgement acknowledgement =
322
+ Acknowledgement .newBuilder ().blockAck (blockAcknowledgement ).build ();
323
+
324
+ final PublishStreamResponse publishStreamResponse = PublishStreamResponse .newBuilder ()
325
+ .acknowledgement (acknowledgement )
326
+ .build ();
327
+
328
+ // verify helidonPublishPipeline.onNext() is called once with publishStreamResponse
329
+ verify (helidonPublishPipeline , timeout (testTimeout ).times (1 )).onNext (publishStreamResponse );
330
+ // verify that the duplicate block is not published
331
+ verify (publisher , never ()).publish (any ());
332
+ }
333
+
334
+ @ Test
335
+ @ DisplayName ("Test duplicate block items with different ACK number variation" )
336
+ public void testDuplicateBlockReceived_DiffAckNumberVariation () {
337
+
338
+ // given
339
+ when (serviceStatus .isRunning ()).thenReturn (true );
340
+ long latestAckedBlockNumber = 10L ;
341
+ long latestReceivedBlockNumber = 11L ;
342
+ Bytes fakeHash = Bytes .wrap ("fake_hash" );
343
+ BlockInfo latestAckedBlock = new BlockInfo (latestAckedBlockNumber );
344
+ latestAckedBlock .setBlockHash (fakeHash );
345
+ when (serviceStatus .getLatestAckedBlock ()).thenReturn (latestAckedBlock );
346
+ when (serviceStatus .getLatestReceivedBlockNumber ()).thenReturn (latestReceivedBlockNumber );
347
+
348
+ final List <BlockItemUnparsed > blockItems =
349
+ generateBlockItemsUnparsedForWithBlockNumber (latestReceivedBlockNumber );
350
+ final ProducerBlockItemObserver producerBlockItemObserver = new ProducerBlockItemObserver (
351
+ testClock ,
352
+ publisher ,
353
+ subscriptionHandler ,
354
+ helidonPublishPipeline ,
355
+ serviceStatus ,
356
+ consumerConfig ,
357
+ metricsService );
358
+
359
+ // when
360
+ producerBlockItemObserver .onNext (blockItems );
361
+
362
+ // then
363
+ final BlockAcknowledgement blockAcknowledgement = BlockAcknowledgement .newBuilder ()
364
+ .blockNumber (latestReceivedBlockNumber )
365
+ .blockAlreadyExists (true )
366
+ .build ();
367
+
368
+ final Acknowledgement acknowledgement =
369
+ Acknowledgement .newBuilder ().blockAck (blockAcknowledgement ).build ();
370
+
371
+ final PublishStreamResponse publishStreamResponse = PublishStreamResponse .newBuilder ()
372
+ .acknowledgement (acknowledgement )
373
+ .build ();
374
+
375
+ // verify helidonPublishPipeline.onNext() is called once with publishStreamResponse
376
+ verify (helidonPublishPipeline , timeout (testTimeout ).times (1 )).onNext (publishStreamResponse );
377
+ // verify that the duplicate block is not published
378
+ verify (publisher , never ()).publish (any ());
379
+ }
380
+
250
381
@ Test
251
382
@ DisplayName ("Test future (ahead of expected) block received" )
252
383
public void testFutureBlockReceived () {
0 commit comments