28
28
* The LiveStreamObserverImpl class implements the LiveStreamObserver interface to pass blocks to the downstream consumer
29
29
* via the notify method and manage the bidirectional stream to the consumer via the onNext, onError, and onCompleted methods.
30
30
*/
31
- public class LiveStreamObserverImpl implements LiveStreamObserver <BlockStreamServiceGrpcProto .BlockResponse > {
31
+ public class ConsumerBlockStreamObserverImpl implements ConsumerBlockStreamObserver <BlockStreamServiceGrpcProto .BlockResponse > {
32
32
33
33
private final System .Logger LOGGER = System .getLogger (getClass ().getName ());
34
34
@@ -37,14 +37,14 @@ public class LiveStreamObserverImpl implements LiveStreamObserver<BlockStreamSer
37
37
38
38
private final long timeoutThresholdMillis ;
39
39
private final AtomicLong consumerLivenessMillis ;
40
- private final AtomicLong counter ;
40
+ private long counter ;
41
41
42
42
/**
43
43
* Constructor for the LiveStreamObserverImpl class.
44
44
*
45
45
* @param responseStreamObserver the response stream observer
46
46
*/
47
- public LiveStreamObserverImpl (
47
+ public ConsumerBlockStreamObserverImpl (
48
48
final long timeoutThresholdMillis ,
49
49
final InstantSource consumerLivenessClock ,
50
50
final BlockPersistenceHandler <BlockStreamServiceGrpcProto .Block > blockPersistenceHandler ,
@@ -55,8 +55,7 @@ public LiveStreamObserverImpl(
55
55
this .responseStreamObserver = responseStreamObserver ;
56
56
57
57
this .consumerLivenessMillis = new AtomicLong (consumerLivenessClock .millis ());
58
- // this.counter = new AtomicLong(blockPersistenceHandler.getLastBlockId());
59
- this .counter = new AtomicLong (1 );
58
+ this .counter = blockPersistenceHandler .getLastBlockId ();
60
59
}
61
60
62
61
/**
@@ -77,11 +76,13 @@ public void onNext(final BlockStreamServiceGrpcProto.BlockResponse blockResponse
77
76
78
77
@ Override
79
78
public void emitBlocks () {
80
- final long latestBlockId = counter .get ();
81
- for (BlockStreamServiceGrpcProto .Block block : blockPersistenceHandler .readRange (latestBlockId , latestBlockId + 1 )) {
82
- LOGGER .log (System .Logger .Level .INFO , "Thread: {0}-{1}, Emitting block: {2}" , Thread .currentThread ().threadId (), Thread .currentThread ().getName (), block .getId ());
83
- responseStreamObserver .onNext (block );
84
- counter .incrementAndGet ();
79
+ long latestBlockId = blockPersistenceHandler .getLastBlockId ();
80
+ if (counter <= latestBlockId ) {
81
+ for (BlockStreamServiceGrpcProto .Block block : blockPersistenceHandler .readRange (counter , latestBlockId )) {
82
+ LOGGER .log (System .Logger .Level .INFO , "Thread: {0}-{1}, Emitting block: {2}" , Thread .currentThread ().threadId (), Thread .currentThread ().getName (), block .getId ());
83
+ responseStreamObserver .onNext (block );
84
+ counter += 1 ;
85
+ }
85
86
}
86
87
}
87
88
@@ -108,7 +109,7 @@ public void onCompleted() {
108
109
private boolean isThresholdExceeded () {
109
110
final long currentTimeMillis = Clock .systemDefaultZone ().millis ();
110
111
final long elapsedMillis = currentTimeMillis - consumerLivenessMillis .get ();
111
- if (elapsedMillis > timeoutThresholdMillis ) {
112
+ if (elapsedMillis > timeoutThresholdMillis ) {
112
113
LOGGER .log (System .Logger .Level .INFO , "Elapsed milliseconds: " + elapsedMillis + ", timeout threshold: " + timeoutThresholdMillis );
113
114
return true ;
114
115
}
0 commit comments