17
17
package com .hedera .block .server .consumer ;
18
18
19
19
import com .hedera .block .protos .BlockStreamServiceGrpcProto ;
20
- import com .hedera .block .server .mediator . StreamMediator ;
20
+ import com .hedera .block .server .persistence . BlockPersistenceHandler ;
21
21
import io .grpc .stub .StreamObserver ;
22
22
23
23
import java .time .Clock ;
24
24
import java .time .InstantSource ;
25
+ import java .util .concurrent .atomic .AtomicLong ;
25
26
26
27
/**
27
28
* The LiveStreamObserverImpl class implements the LiveStreamObserver interface to pass blocks to the downstream consumer
28
29
* via the notify method and manage the bidirectional stream to the consumer via the onNext, onError, and onCompleted methods.
29
30
*/
30
- public class LiveStreamObserverImpl implements LiveStreamObserver <BlockStreamServiceGrpcProto .Block , BlockStreamServiceGrpcProto . BlockResponse > {
31
+ public class LiveStreamObserverImpl implements LiveStreamObserver <BlockStreamServiceGrpcProto .BlockResponse > {
31
32
32
33
private final System .Logger LOGGER = System .getLogger (getClass ().getName ());
33
34
34
- private final StreamMediator <BlockStreamServiceGrpcProto .Block , BlockStreamServiceGrpcProto . BlockResponse > mediator ;
35
+ private final BlockPersistenceHandler <BlockStreamServiceGrpcProto .Block > blockPersistenceHandler ;
35
36
private final StreamObserver <BlockStreamServiceGrpcProto .Block > responseStreamObserver ;
36
37
37
38
private final long timeoutThresholdMillis ;
38
-
39
- private final InstantSource producerLivenessClock ;
40
- private long producerLivenessMillis ;
41
-
42
- private final InstantSource consumerLivenessClock ;
43
- private long consumerLivenessMillis ;
39
+ private final AtomicLong consumerLivenessMillis ;
40
+ private final AtomicLong counter ;
44
41
45
42
/**
46
43
* Constructor for the LiveStreamObserverImpl class.
47
44
*
48
- * @param mediator the mediator
49
45
* @param responseStreamObserver the response stream observer
50
46
*/
51
47
public LiveStreamObserverImpl (
52
48
final long timeoutThresholdMillis ,
53
- final InstantSource producerLivenessClock ,
54
49
final InstantSource consumerLivenessClock ,
55
- final StreamMediator <BlockStreamServiceGrpcProto .Block , BlockStreamServiceGrpcProto . BlockResponse > mediator ,
50
+ final BlockPersistenceHandler <BlockStreamServiceGrpcProto .Block > blockPersistenceHandler ,
56
51
final StreamObserver <BlockStreamServiceGrpcProto .Block > responseStreamObserver ) {
57
52
58
53
this .timeoutThresholdMillis = timeoutThresholdMillis ;
59
- this .producerLivenessClock = producerLivenessClock ;
60
- this .consumerLivenessClock = consumerLivenessClock ;
61
- this .mediator = mediator ;
54
+ this .blockPersistenceHandler = blockPersistenceHandler ;
62
55
this .responseStreamObserver = responseStreamObserver ;
63
56
64
- this .producerLivenessMillis = producerLivenessClock .millis ();
65
- this .consumerLivenessMillis = consumerLivenessClock .millis ();
66
- }
67
-
68
- /**
69
- * Pass the block to the observer provided by Helidon
70
- *
71
- * @param block the block to be passed to the observer
72
- */
73
- @ Override
74
- public void notify (final BlockStreamServiceGrpcProto .Block block ) {
75
-
76
- // Check if the consumer has timed out. If so, unsubscribe the observer from the mediator.
77
- if (consumerLivenessClock .millis () - consumerLivenessMillis > timeoutThresholdMillis ) {
78
- if (mediator .isSubscribed (this )) {
79
- LOGGER .log (System .Logger .Level .DEBUG , "Consumer timeout threshold exceeded. Unsubscribing observer." );
80
- mediator .unsubscribe (this );
81
- }
82
- } else {
83
- // Refresh the producer liveness and pass the block to the observer.
84
- producerLivenessMillis = producerLivenessClock .millis ();
85
- responseStreamObserver .onNext (block );
86
- }
57
+ this .consumerLivenessMillis = new AtomicLong (consumerLivenessClock .millis ());
58
+ // this.counter = new AtomicLong(blockPersistenceHandler.getLastBlockId());
59
+ this .counter = new AtomicLong (1 );
87
60
}
88
61
89
62
/**
@@ -94,12 +67,21 @@ public void notify(final BlockStreamServiceGrpcProto.Block block) {
94
67
@ Override
95
68
public void onNext (final BlockStreamServiceGrpcProto .BlockResponse blockResponse ) {
96
69
97
- if (producerLivenessClock .millis () - producerLivenessMillis > timeoutThresholdMillis ) {
98
- LOGGER .log (System .Logger .Level .DEBUG , "Producer timeout threshold exceeded. Unsubscribing observer." );
99
- mediator .unsubscribe (this );
100
- } else {
101
- LOGGER .log (System .Logger .Level .DEBUG , "Received response block " + blockResponse );
102
- consumerLivenessMillis = consumerLivenessClock .millis ();
70
+ // update the consumer liveness clock
71
+ consumerLivenessMillis .set (Clock .systemDefaultZone ().millis ());
72
+
73
+ if (!isThresholdExceeded ()) {
74
+ emitBlocks ();
75
+ }
76
+ }
77
+
78
+ @ Override
79
+ public void emitBlocks () {
80
+ final long latestBlockId = counter .get ();
81
+ for (BlockStreamServiceGrpcProto .Block block : blockPersistenceHandler .readRange (latestBlockId , latestBlockId + 1 )) {
82
+ LOGGER .log (System .Logger .Level .INFO , "Thread: {0}-{1}, Emitting block: {2}" , Thread .currentThread ().threadId (), Thread .currentThread ().getName (), block .getId ());
83
+ responseStreamObserver .onNext (block );
84
+ counter .incrementAndGet ();
103
85
}
104
86
}
105
87
@@ -112,7 +94,6 @@ public void onNext(final BlockStreamServiceGrpcProto.BlockResponse blockResponse
112
94
@ Override
113
95
public void onError (final Throwable t ) {
114
96
LOGGER .log (System .Logger .Level .ERROR , "Unexpected consumer stream communication failure: %s" .formatted (t ), t );
115
- mediator .unsubscribe (this );
116
97
}
117
98
118
99
/**
@@ -121,8 +102,17 @@ public void onError(final Throwable t) {
121
102
*/
122
103
@ Override
123
104
public void onCompleted () {
124
- LOGGER .log (System .Logger .Level .DEBUG , "gRPC connection completed. Unsubscribing observer." );
125
- mediator .unsubscribe (this );
126
- LOGGER .log (System .Logger .Level .DEBUG , "Unsubscribed observer." );
105
+ LOGGER .log (System .Logger .Level .INFO , "gRPC connection completed." );
106
+ }
107
+
108
+ private boolean isThresholdExceeded () {
109
+ final long currentTimeMillis = Clock .systemDefaultZone ().millis ();
110
+ final long elapsedMillis = currentTimeMillis - consumerLivenessMillis .get ();
111
+ if (elapsedMillis > timeoutThresholdMillis ) {
112
+ LOGGER .log (System .Logger .Level .INFO , "Elapsed milliseconds: " + elapsedMillis + ", timeout threshold: " + timeoutThresholdMillis );
113
+ return true ;
114
+ }
115
+
116
+ return false ;
127
117
}
128
118
}
0 commit comments