Skip to content

Commit 2c30b16

Browse files
wip: added awaitTimeout
Signed-off-by: Matt Peterson <[email protected]>
1 parent 17ea42d commit 2c30b16

File tree

3 files changed

+19
-2
lines changed

3 files changed

+19
-2
lines changed

server/src/main/java/com/hedera/block/server/consumer/BlockEventHandler.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,5 @@
1919
import io.grpc.stub.StreamObserver;
2020

2121
public interface BlockEventHandler<U, V> extends StreamObserver<V>, EventHandler<U> {
22+
void awaitShutdown() throws InterruptedException;
2223
}

server/src/main/java/com/hedera/block/server/consumer/ConsumerBlockStreamObserver.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
import java.time.Clock;
2525
import java.time.InstantSource;
26+
import java.util.concurrent.CountDownLatch;
2627

2728
/**
2829
* The LiveStreamObserverImpl class implements the LiveStreamObserver interface to pass blocks to the downstream consumer
@@ -42,6 +43,8 @@ public class ConsumerBlockStreamObserver implements BlockEventHandler<ObjectEven
4243
private final InstantSource consumerLivenessClock;
4344
private long consumerLivenessMillis;
4445

46+
private final CountDownLatch shutdownLatch = new CountDownLatch(1);
47+
4548
private final StreamMediator<ObjectEvent<BlockStreamServiceGrpcProto.Block>, BlockStreamServiceGrpcProto.BlockResponse> streamMediator;
4649

4750
/**
@@ -135,4 +138,9 @@ private boolean isThresholdExceeded(long livenessMillis) {
135138

136139
return false;
137140
}
141+
142+
@Override
143+
public void awaitShutdown() throws InterruptedException {
144+
shutdownLatch.await();
145+
}
138146
}

server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,15 +64,19 @@ public LiveStreamMediatorImpl(final BlockPersistenceHandler<BlockStreamServiceGr
6464

6565
@Override
6666
public void publishEvent(BlockStreamServiceGrpcProto.Block block) {
67+
68+
// Publish the block for all subscribers to receive
6769
LOGGER.log(System.Logger.Level.INFO, "Publishing block: {0}", block);
6870
ringBuffer.publishEvent((event, sequence) -> event.set(block));
69-
71+
72+
// Block persistence
7073
blockPersistenceHandler.persist(block);
7174
}
7275

7376
@Override
7477
public void subscribe(final BlockEventHandler<ObjectEvent<BlockStreamServiceGrpcProto.Block>, BlockStreamServiceGrpcProto.BlockResponse> handler) {
7578

79+
// Initialize the batch event processor and set it on the ring buffer
7680
final var batchEventProcessor = new BatchEventProcessorBuilder()
7781
.build(ringBuffer, ringBuffer.newBarrier(), handler);
7882

@@ -90,7 +94,11 @@ public void unsubscribe(final BlockEventHandler<ObjectEvent<BlockStreamServiceGr
9094
batchEventProcessor.halt();
9195

9296
// Wait for shutdown the complete
93-
// handler.awaitShutdown();
97+
try {
98+
handler.awaitShutdown();
99+
} catch (InterruptedException e) {
100+
LOGGER.log(System.Logger.Level.ERROR, "Error occurred while waiting for shutdown", e);
101+
}
94102

95103
// Remove the gating sequence from the ring buffer
96104
ringBuffer.removeGatingSequence(batchEventProcessor.getSequence());

0 commit comments

Comments
 (0)