Skip to content

Commit 6e392a8

Browse files
fix: removed bidi interface requirements after server streaming change
Signed-off-by: Matt Peterson <[email protected]>
1 parent 7cc9abd commit 6e392a8

File tree

4 files changed

+16
-56
lines changed

4 files changed

+16
-56
lines changed

server/src/main/java/com/hedera/block/server/consumer/BlockItemEventHandler.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,7 @@
1717
package com.hedera.block.server.consumer;
1818

1919
import com.lmax.disruptor.EventHandler;
20-
import io.grpc.stub.StreamObserver;
2120

22-
public interface BlockItemEventHandler<U, V> extends StreamObserver<V>, EventHandler<U> {
21+
public interface BlockItemEventHandler<U> extends EventHandler<U> {
2322
void awaitShutdown() throws InterruptedException;
2423
}

server/src/main/java/com/hedera/block/server/consumer/ConsumerBlockItemObserver.java

Lines changed: 10 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,7 @@
3131
* the downstream consumer via the notify method and manage the bidirectional stream to the consumer
3232
* via the onNext, onError, and onCompleted methods.
3333
*/
34-
public class ConsumerBlockItemObserver
35-
implements BlockItemEventHandler<ObjectEvent<BlockItem>, SubscribeStreamRequest> {
34+
public class ConsumerBlockItemObserver implements BlockItemEventHandler<ObjectEvent<BlockItem>> {
3635

3736
private final System.Logger LOGGER = System.getLogger(getClass().getName());
3837

@@ -89,55 +88,19 @@ public ConsumerBlockItemObserver(
8988
@Override
9089
public void onEvent(final ObjectEvent<BlockItem> event, final long l, final boolean b) {
9190

92-
// Refresh the producer liveness and pass the block to the observer.
93-
producerLivenessMillis = producerLivenessClock.millis();
94-
95-
final BlockItem blockItem = event.get();
96-
final SubscribeStreamResponse subscribeStreamResponse =
97-
SubscribeStreamResponse.newBuilder().setBlockItem(blockItem).build();
98-
99-
subscribeStreamResponseObserver.onNext(subscribeStreamResponse);
100-
}
101-
102-
/**
103-
* The onNext() method is triggered by Helidon when a consumer sends a blockResponse via the
104-
* bidirectional stream.
105-
*/
106-
@Override
107-
public void onNext(final SubscribeStreamRequest subscribeStreamRequest) {
108-
109-
// Check if the producer has timed out. If so, unsubscribe the observer from the mediator.
11091
if (isThresholdExceeded(producerLivenessMillis)) {
111-
LOGGER.log(
112-
System.Logger.Level.DEBUG,
113-
"Producer timeout threshold exceeded. Unsubscribing observer.");
11492
streamMediator.unsubscribe(this);
115-
}
116-
}
93+
} else {
11794

118-
/**
119-
* The onError() method is triggered by Helidon when an error occurs on the bidirectional stream
120-
* to the downstream consumer. Unsubscribe the observer from the mediator.
121-
*
122-
* @param t the error occurred on the stream
123-
*/
124-
@Override
125-
public void onError(final Throwable t) {
126-
LOGGER.log(
127-
System.Logger.Level.ERROR,
128-
"Unexpected consumer stream communication failure: %s".formatted(t),
129-
t);
130-
}
95+
// Refresh the producer liveness and pass the block to the observer.
96+
producerLivenessMillis = producerLivenessClock.millis();
13197

132-
/**
133-
* The onCompleted() method is triggered by Helidon when the bidirectional stream to the
134-
* downstream consumer is completed. This implementation will then unsubscribe the observer from
135-
* the mediator.
136-
*/
137-
@Override
138-
public void onCompleted() {
139-
LOGGER.log(System.Logger.Level.DEBUG, "gRPC connection completed.");
140-
streamMediator.unsubscribe(this);
98+
final BlockItem blockItem = event.get();
99+
final SubscribeStreamResponse subscribeStreamResponse =
100+
SubscribeStreamResponse.newBuilder().setBlockItem(blockItem).build();
101+
102+
subscribeStreamResponseObserver.onNext(subscribeStreamResponse);
103+
}
141104
}
142105

143106
private boolean isThresholdExceeded(long livenessMillis) {

server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public class LiveStreamMediatorImpl
4646
private final ExecutorService executor;
4747

4848
private final Map<
49-
BlockItemEventHandler<ObjectEvent<BlockItem>, SubscribeStreamRequest>,
49+
BlockItemEventHandler<ObjectEvent<BlockItem>>,
5050
BatchEventProcessor<ObjectEvent<BlockItem>>>
5151
subscribers = new HashMap<>();
5252

@@ -80,8 +80,7 @@ public void publishEvent(BlockItem blockItem) {
8080
}
8181

8282
@Override
83-
public void subscribe(
84-
final BlockItemEventHandler<ObjectEvent<BlockItem>, SubscribeStreamRequest> handler) {
83+
public void subscribe(final BlockItemEventHandler<ObjectEvent<BlockItem>> handler) {
8584

8685
// Initialize the batch event processor and set it on the ring buffer
8786
final var batchEventProcessor =
@@ -95,8 +94,7 @@ public void subscribe(
9594
}
9695

9796
@Override
98-
public void unsubscribe(
99-
final BlockItemEventHandler<ObjectEvent<BlockItem>, SubscribeStreamRequest> handler) {
97+
public void unsubscribe(final BlockItemEventHandler<ObjectEvent<BlockItem>> handler) {
10098
final var batchEventProcessor = subscribers.remove(handler);
10199

102100
// Stop the processor

server/src/main/java/com/hedera/block/server/mediator/StreamMediator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public interface StreamMediator<U, V> {
3636

3737
void publishEvent(final BlockItem blockItem);
3838

39-
void subscribe(final BlockItemEventHandler<U, V> handler);
39+
void subscribe(final BlockItemEventHandler<U> handler);
4040

41-
void unsubscribe(final BlockItemEventHandler<U, V> handler);
41+
void unsubscribe(final BlockItemEventHandler<U> handler);
4242
}

0 commit comments

Comments
 (0)