Skip to content

Commit fef178d

Browse files
fix: found a callback to register if a downstream consumer cancels
Signed-off-by: Matt Peterson <[email protected]>
1 parent d5b1a9f commit fef178d

File tree

2 files changed

+20
-21
lines changed

2 files changed

+20
-21
lines changed

server/src/main/java/com/hedera/block/server/BlockStreamService.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,6 @@ private StreamObserver<BlockStreamServiceGrpcProto.BlockItemResponse> streamSour
118118
final var streamObserver = new ConsumerBlockItemObserver(
119119
timeoutThresholdMillis,
120120
Clock.systemDefaultZone(),
121-
Clock.systemDefaultZone(),
122121
streamMediator,
123122
responseStreamObserver);
124123

server/src/main/java/com/hedera/block/server/consumer/ConsumerBlockItemObserver.java

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.hedera.block.protos.BlockStreamServiceGrpcProto;
2020
import com.hedera.block.server.data.ObjectEvent;
2121
import com.hedera.block.server.mediator.StreamMediator;
22+
import io.grpc.stub.ServerCallStreamObserver;
2223
import io.grpc.stub.StreamObserver;
2324

2425
import java.time.Clock;
@@ -40,9 +41,6 @@ public class ConsumerBlockItemObserver implements BlockItemEventHandler<ObjectEv
4041
private final InstantSource producerLivenessClock;
4142
private long producerLivenessMillis;
4243

43-
private final InstantSource consumerLivenessClock;
44-
private long consumerLivenessMillis;
45-
4644
private final CountDownLatch shutdownLatch = new CountDownLatch(1);
4745

4846
private final StreamMediator<ObjectEvent<BlockStreamServiceGrpcProto.BlockItem>, BlockStreamServiceGrpcProto.BlockItemResponse> streamMediator;
@@ -55,17 +53,29 @@ public class ConsumerBlockItemObserver implements BlockItemEventHandler<ObjectEv
5553
public ConsumerBlockItemObserver(
5654
final long timeoutThresholdMillis,
5755
final InstantSource producerLivenessClock,
58-
final InstantSource consumerLivenessClock,
5956
final StreamMediator<ObjectEvent<BlockStreamServiceGrpcProto.BlockItem>, BlockStreamServiceGrpcProto.BlockItemResponse> streamMediator,
6057
final StreamObserver<BlockStreamServiceGrpcProto.BlockItem> responseStreamObserver) {
6158

6259
this.timeoutThresholdMillis = timeoutThresholdMillis;
6360
this.producerLivenessClock = producerLivenessClock;
64-
this.consumerLivenessClock = consumerLivenessClock;
65-
this.responseStreamObserver = responseStreamObserver;
6661

62+
// The ServerCallStreamObserver can be configured with a Runnable to
63+
// be executed if a downstream consumer cancels the stream without
64+
// sending an HTTP/2 End Stream DATA frame. If triggered, unsubscribe
65+
// this observer to avoid orphaning subscribed resources.
66+
if (responseStreamObserver instanceof ServerCallStreamObserver) {
67+
68+
// Unfortunately we have to cast the responseStreamObserver to a ServerCallStreamObserver
69+
// to register the onCancelHandler.
70+
((ServerCallStreamObserver<BlockStreamServiceGrpcProto.BlockItem>)responseStreamObserver)
71+
.setOnCancelHandler(() -> {
72+
LOGGER.log(System.Logger.Level.DEBUG, "Consumer cancelled stream. Unsubscribing observer.");
73+
streamMediator.unsubscribe(this);
74+
});
75+
}
76+
77+
this.responseStreamObserver = responseStreamObserver;
6778
this.producerLivenessMillis = producerLivenessClock.millis();
68-
this.consumerLivenessMillis = consumerLivenessClock.millis();
6979

7080
this.streamMediator = streamMediator;
7181
}
@@ -77,15 +87,9 @@ public ConsumerBlockItemObserver(
7787
@Override
7888
public void onEvent(final ObjectEvent<BlockStreamServiceGrpcProto.BlockItem> event, final long l, final boolean b) throws Exception {
7989

80-
// Check if the consumer has timed out. If so, unsubscribe the observer from the mediator.
81-
if (isThresholdExceeded(consumerLivenessMillis)) {
82-
LOGGER.log(System.Logger.Level.DEBUG, "Consumer timeout threshold exceeded.");
83-
streamMediator.unsubscribe(this);
84-
} else {
85-
// Refresh the producer liveness and pass the block to the observer.
86-
producerLivenessMillis = producerLivenessClock.millis();
87-
responseStreamObserver.onNext(event.get());
88-
}
90+
// Refresh the producer liveness and pass the block to the observer.
91+
producerLivenessMillis = producerLivenessClock.millis();
92+
responseStreamObserver.onNext(event.get());
8993
}
9094

9195
/**
@@ -100,10 +104,6 @@ public void onNext(final BlockStreamServiceGrpcProto.BlockItemResponse blockItem
100104
if (isThresholdExceeded(producerLivenessMillis)) {
101105
LOGGER.log(System.Logger.Level.DEBUG, "Producer timeout threshold exceeded. Unsubscribing observer.");
102106
streamMediator.unsubscribe(this);
103-
} else {
104-
// Refresh the consumer liveness
105-
LOGGER.log(System.Logger.Level.DEBUG, "Received response block " + blockItemResponse);
106-
consumerLivenessMillis = consumerLivenessClock.millis();
107107
}
108108
}
109109

0 commit comments

Comments
 (0)