@@ -46,8 +46,8 @@ public class LiveStreamObserverImpl implements LiveStreamObserver<BlockStreamSer
46
46
*
47
47
*/
48
48
public LiveStreamObserverImpl (final long timeoutThresholdMillis ,
49
- StreamMediator <BlockStreamServiceGrpcProto .Block , BlockStreamServiceGrpcProto .BlockResponse > mediator ,
50
- StreamObserver <BlockStreamServiceGrpcProto .Block > responseStreamObserver ) {
49
+ final StreamMediator <BlockStreamServiceGrpcProto .Block , BlockStreamServiceGrpcProto .BlockResponse > mediator ,
50
+ final StreamObserver <BlockStreamServiceGrpcProto .Block > responseStreamObserver ) {
51
51
52
52
this .mediator = mediator ;
53
53
this .responseStreamObserver = responseStreamObserver ;
@@ -65,14 +65,14 @@ public LiveStreamObserverImpl(final long timeoutThresholdMillis,
65
65
@ Override
66
66
public void notify (final BlockStreamServiceGrpcProto .Block block ) {
67
67
68
- if (System .currentTimeMillis () - this . consumerLivenessMillis > timeoutThresholdMillis ) {
68
+ if (System .currentTimeMillis () - consumerLivenessMillis > timeoutThresholdMillis ) {
69
69
if (mediator .isSubscribed (this )) {
70
70
LOGGER .log (System .Logger .Level .DEBUG , "Consumer timeout threshold exceeded. Unsubscribing observer." );
71
71
mediator .unsubscribe (this );
72
72
}
73
73
} else {
74
- this . producerLivenessMillis = System .currentTimeMillis ();
75
- this . responseStreamObserver .onNext (block );
74
+ producerLivenessMillis = System .currentTimeMillis ();
75
+ responseStreamObserver .onNext (block );
76
76
}
77
77
}
78
78
@@ -84,14 +84,14 @@ public void notify(final BlockStreamServiceGrpcProto.Block block) {
84
84
@ Override
85
85
public void onNext (final BlockStreamServiceGrpcProto .BlockResponse blockResponse ) {
86
86
87
- if (System .currentTimeMillis () - this . producerLivenessMillis > timeoutThresholdMillis ) {
87
+ if (System .currentTimeMillis () - producerLivenessMillis > timeoutThresholdMillis ) {
88
88
if (mediator .isSubscribed (this )) {
89
89
LOGGER .log (System .Logger .Level .DEBUG , "Producer timeout threshold exceeded. Unsubscribing observer." );
90
90
mediator .unsubscribe (this );
91
91
}
92
92
} else {
93
93
LOGGER .log (System .Logger .Level .DEBUG , "Received response block " + blockResponse );
94
- this . consumerLivenessMillis = System .currentTimeMillis ();
94
+ consumerLivenessMillis = System .currentTimeMillis ();
95
95
}
96
96
}
97
97
0 commit comments