Skip to content

Commit 7dad6af

Browse files
fix: working with hedera-protobufs rpc definitions and types
Signed-off-by: Matt Peterson <[email protected]>
1 parent fef178d commit 7dad6af

File tree

16 files changed

+165
-184
lines changed

16 files changed

+165
-184
lines changed

protos/src/main/protobuf/blockstream.proto

Lines changed: 21 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -17,34 +17,29 @@ syntax = "proto3";
1717
*/
1818

1919
option java_package = "com.hedera.block.protos";
20-
option java_outer_classname = "BlockStreamServiceGrpcProto";
20+
option java_outer_classname = "BlockStreamService";
2121

22-
/**
23-
* The BlockStreamGrpc service definition provides 2 bidirectional streaming methods for
24-
* exchanging BlockItems with the Block Node server.
25-
*
26-
* A producer (e.g. Consensus Node) can use the StreamSink method to stream BlockItems to the
27-
* Block Node server. The Block Node server will respond with a BlockResponse message for
28-
* each BlockItem received.
29-
*
30-
* A consumer (e.g. Mirror Node) can use the StreamSource method to request a stream of
31-
* BlockItems from the server. The consumer is expected to respond with a BlockResponse message
32-
* with the id of each BlockItem received.
33-
*/
34-
service BlockStreamGrpc {
35-
/**
36-
* StreamSink is a bidirectional streaming method that allows a producer to stream BlockItems
37-
* to the Block Node server. The server will respond with a BlockResponse message for each
38-
* BlockItem received.
39-
*/
40-
rpc StreamSink (stream BlockItem) returns (stream BlockItemResponse) {}
22+
service BlockStreamGrpcService {
4123

42-
/**
43-
* StreamSource is a bidirectional streaming method that allows a consumer to request a
44-
* stream of BlockItems from the server. The consumer is expected to respond with a BlockResponse
45-
* message with the id of each BlockItem received.
46-
*/
47-
rpc StreamSource (stream BlockItemResponse) returns (stream BlockItem) {}
24+
rpc publishBlockStream (stream PublishStreamRequest) returns (stream PublishStreamResponse) {}
25+
26+
rpc subscribeBlockStream (stream SubscribeStreamRequest) returns (stream SubscribeStreamResponse) {}
27+
}
28+
29+
message PublishStreamRequest {
30+
BlockItem block_item = 1;
31+
}
32+
33+
message PublishStreamResponse {
34+
uint64 block_number = 1;
35+
}
36+
37+
message SubscribeStreamRequest {
38+
uint64 start_block_number = 1;
39+
}
40+
41+
message SubscribeStreamResponse {
42+
int32 status = 1;
4843
}
4944

5045
/**
@@ -63,16 +58,3 @@ message BlockItem {
6358
*/
6459
string value = 2;
6560
}
66-
67-
/**
68-
* A BlockItemResponse is a simple message that contains an id.
69-
* The BlockItemResponse is meant to confirm the receipt of a BlockItem.
70-
* A future use case may expand on this type to communicate a failure
71-
* condition where the BlockItem needs to be resent, etc.
72-
*/
73-
message BlockItemResponse {
74-
/**
75-
* The id of the BlockItem which was received.
76-
*/
77-
int64 id = 1;
78-
}

server/src/main/java/com/hedera/block/server/BlockStreamService.java

Lines changed: 14 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package com.hedera.block.server;
1818

1919
import com.google.protobuf.Descriptors;
20-
import com.hedera.block.protos.BlockStreamServiceGrpcProto;
2120
import com.hedera.block.server.consumer.ConsumerBlockItemObserver;
2221
import com.hedera.block.server.data.ObjectEvent;
2322
import com.hedera.block.server.mediator.StreamMediator;
@@ -28,6 +27,11 @@
2827
import java.time.Clock;
2928

3029
import static com.hedera.block.server.Constants.*;
30+
import static com.hedera.block.protos.BlockStreamService.PublishStreamRequest;
31+
import static com.hedera.block.protos.BlockStreamService.PublishStreamResponse;
32+
import static com.hedera.block.protos.BlockStreamService.SubscribeStreamRequest;
33+
import static com.hedera.block.protos.BlockStreamService.SubscribeStreamResponse;
34+
import static com.hedera.block.protos.BlockStreamService.BlockItem;
3135

3236
/**
3337
* This class implements the GrpcService interface and provides the functionality for the BlockStreamService.
@@ -42,15 +46,15 @@ public class BlockStreamService implements GrpcService {
4246
private final System.Logger LOGGER = System.getLogger(getClass().getName());
4347

4448
private final long timeoutThresholdMillis;
45-
private final StreamMediator<ObjectEvent<BlockStreamServiceGrpcProto.BlockItem>, BlockStreamServiceGrpcProto.BlockItemResponse> streamMediator;
49+
private final StreamMediator<ObjectEvent<BlockItem>, SubscribeStreamRequest> streamMediator;
4650

4751
/**
4852
* Constructor for the BlockStreamService class.
4953
*
5054
* @param timeoutThresholdMillis the timeout threshold in milliseconds
5155
*/
5256
public BlockStreamService(final long timeoutThresholdMillis,
53-
final StreamMediator<ObjectEvent<BlockStreamServiceGrpcProto.BlockItem>, BlockStreamServiceGrpcProto.BlockItemResponse> streamMediator) {
57+
final StreamMediator<ObjectEvent<BlockItem>, SubscribeStreamRequest> streamMediator) {
5458
this.timeoutThresholdMillis = timeoutThresholdMillis;
5559
this.streamMediator = streamMediator;
5660
}
@@ -62,7 +66,7 @@ public BlockStreamService(final long timeoutThresholdMillis,
6266
*/
6367
@Override
6468
public Descriptors.FileDescriptor proto() {
65-
return BlockStreamServiceGrpcProto.getDescriptor();
69+
return com.hedera.block.protos.BlockStreamService.getDescriptor();
6670
}
6771

6872
/**
@@ -83,43 +87,25 @@ public String serviceName() {
8387
*/
8488
@Override
8589
public void update(final Routing routing) {
86-
routing.bidi(CLIENT_STREAMING_METHOD_NAME, this::streamSink);
87-
routing.bidi(SERVER_STREAMING_METHOD_NAME, this::streamSource);
90+
routing.bidi(CLIENT_STREAMING_METHOD_NAME, this::publishBlockStream);
91+
routing.bidi(SERVER_STREAMING_METHOD_NAME, this::subscribeBlockStream);
8892
}
8993

90-
/**
91-
* The streamSink method is called by Helidon each time a producer initiates a bidirectional stream.
92-
*
93-
* @param responseStreamObserver Helidon provides a StreamObserver to handle responses back to the producer.
94-
*
95-
* @return a custom StreamObserver to handle streaming blockItems from the producer to all subscribed consumer
96-
* via the streamMediator as well as sending responses back to the producer.
97-
*/
98-
private StreamObserver<BlockStreamServiceGrpcProto.BlockItem> streamSink(
99-
final StreamObserver<BlockStreamServiceGrpcProto.BlockItemResponse> responseStreamObserver) {
94+
private StreamObserver<PublishStreamRequest> publishBlockStream(final StreamObserver<PublishStreamResponse> publishStreamResponseObserver) {
10095
LOGGER.log(System.Logger.Level.DEBUG, "Executing bidirectional streamSink method");
10196

102-
return new ProducerBlockItemObserver(streamMediator, responseStreamObserver);
97+
return new ProducerBlockItemObserver(streamMediator, publishStreamResponseObserver);
10398
}
10499

105-
/**
106-
* The streamSource method is called by Helidon each time a consumer initiates a bidirectional stream.
107-
*
108-
* @param responseStreamObserver Helidon provides a StreamObserver to handle responses from the consumer
109-
* back to the server.
110-
*
111-
* @return a custom StreamObserver to handle streaming blocks from the producer to the consumer as well
112-
* as handling responses from the consumer.
113-
*/
114-
private StreamObserver<BlockStreamServiceGrpcProto.BlockItemResponse> streamSource(final StreamObserver<BlockStreamServiceGrpcProto.BlockItem> responseStreamObserver) {
100+
private StreamObserver<SubscribeStreamRequest> subscribeBlockStream(final StreamObserver<BlockItem> subscribeStreamRequestObserver) {
115101
LOGGER.log(System.Logger.Level.DEBUG, "Executing bidirectional streamSource method");
116102

117103
// Return a custom StreamObserver to handle streaming blocks from the producer.
118104
final var streamObserver = new ConsumerBlockItemObserver(
119105
timeoutThresholdMillis,
120106
Clock.systemDefaultZone(),
121107
streamMediator,
122-
responseStreamObserver);
108+
subscribeStreamRequestObserver);
123109

124110
streamMediator.subscribe(streamObserver);
125111

server/src/main/java/com/hedera/block/server/Constants.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ private Constants() {}
2727
public static final String BLOCKNODE_SERVER_CONSUMER_TIMEOUT_THRESHOLD_KEY = "blocknode.server.consumer.timeout.threshold";
2828

2929
// Constants specified in the service definition of the .proto file
30-
public static final String SERVICE_NAME = "BlockStreamGrpc";
31-
public static final String CLIENT_STREAMING_METHOD_NAME = "StreamSink";
32-
public static final String SERVER_STREAMING_METHOD_NAME = "StreamSource";
30+
public static final String SERVICE_NAME = "BlockStreamGrpcService";
31+
public static final String CLIENT_STREAMING_METHOD_NAME = "publishBlockStream";
32+
public static final String SERVER_STREAMING_METHOD_NAME = "subscribeBlockStream";
3333
}

server/src/main/java/com/hedera/block/server/Server.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package com.hedera.block.server;
1818

19-
import com.hedera.block.protos.BlockStreamServiceGrpcProto;
2019
import com.hedera.block.server.mediator.LiveStreamMediatorImpl;
2120
import com.hedera.block.server.persistence.WriteThroughCacheHandler;
2221
import com.hedera.block.server.persistence.storage.BlockStorage;
@@ -28,18 +27,26 @@
2827
import io.helidon.webserver.grpc.GrpcRouting;
2928

3029
import java.io.IOException;
31-
import java.util.stream.Stream;
3230

3331
import static com.hedera.block.server.Constants.*;
32+
import static com.hedera.block.protos.BlockStreamService.BlockItem;
33+
import static com.hedera.block.protos.BlockStreamService.PublishStreamRequest;
34+
import static com.hedera.block.protos.BlockStreamService.PublishStreamResponse;
35+
import static com.hedera.block.protos.BlockStreamService.SubscribeStreamRequest;
36+
import static com.hedera.block.protos.BlockStreamService.SubscribeStreamResponse;
3437

3538
/**
3639
* Main class for the block node server
3740
*/
3841
public class Server {
3942

43+
// public interface BidiStreamingMethod<ReqT, RespT> extends ServerCalls.StreamingRequestMethod<ReqT, RespT> {
44+
// @Override StreamObserver<ReqT> invoke(StreamObserver<RespT> responseObserver);
45+
// }
46+
4047
// Function stubs to satisfy the bidi routing param signatures. The implementations are in the service class.
41-
private static ServerCalls.BidiStreamingMethod<Stream<BlockStreamServiceGrpcProto.BlockItem>, StreamObserver<BlockStreamServiceGrpcProto.BlockItem>> clientBidiStreamingMethod;
42-
private static ServerCalls.BidiStreamingMethod<Stream<BlockStreamServiceGrpcProto.BlockItemResponse>, StreamObserver<BlockStreamServiceGrpcProto.BlockItem>> serverBidiStreamingMethod;
48+
private static ServerCalls.BidiStreamingMethod<StreamObserver<PublishStreamRequest>, StreamObserver<PublishStreamResponse>> clientBidiStreamingMethod;
49+
private static ServerCalls.BidiStreamingMethod<StreamObserver<SubscribeStreamRequest>, StreamObserver<SubscribeStreamResponse>> serverBidiStreamingMethod;
4350

4451
private static final System.Logger LOGGER = System.getLogger(Server.class.getName());
4552

@@ -62,7 +69,7 @@ public static void main(final String[] args) {
6269
final long consumerTimeoutThreshold = config.get(BLOCKNODE_SERVER_CONSUMER_TIMEOUT_THRESHOLD_KEY).asLong().orElse(1500L);
6370

6471
// Initialize the block storage, cache, and service
65-
final BlockStorage<BlockStreamServiceGrpcProto.BlockItem> blockStorage = new FileSystemBlockStorage(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config);
72+
final BlockStorage<BlockItem> blockStorage = new FileSystemBlockStorage(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config);
6673
final BlockStreamService blockStreamService = new BlockStreamService(consumerTimeoutThreshold,
6774
new LiveStreamMediatorImpl(new WriteThroughCacheHandler(blockStorage)));
6875

@@ -71,11 +78,11 @@ public static void main(final String[] args) {
7178
.port(8080)
7279
.addRouting(GrpcRouting.builder()
7380
.service(blockStreamService)
74-
.bidi(BlockStreamServiceGrpcProto.getDescriptor(),
81+
.bidi(com.hedera.block.protos.BlockStreamService.getDescriptor(),
7582
SERVICE_NAME,
7683
CLIENT_STREAMING_METHOD_NAME,
7784
clientBidiStreamingMethod)
78-
.bidi(BlockStreamServiceGrpcProto.getDescriptor(),
85+
.bidi(com.hedera.block.protos.BlockStreamService.getDescriptor(),
7986
SERVICE_NAME,
8087
SERVER_STREAMING_METHOD_NAME,
8188
serverBidiStreamingMethod))

server/src/main/java/com/hedera/block/server/consumer/ConsumerBlockItemObserver.java

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package com.hedera.block.server.consumer;
1818

19-
import com.hedera.block.protos.BlockStreamServiceGrpcProto;
2019
import com.hedera.block.server.data.ObjectEvent;
2120
import com.hedera.block.server.mediator.StreamMediator;
2221
import io.grpc.stub.ServerCallStreamObserver;
@@ -26,15 +25,18 @@
2625
import java.time.InstantSource;
2726
import java.util.concurrent.CountDownLatch;
2827

28+
import static com.hedera.block.protos.BlockStreamService.BlockItem;
29+
import static com.hedera.block.protos.BlockStreamService.SubscribeStreamRequest;
30+
2931
/**
3032
* The LiveStreamObserverImpl class implements the LiveStreamObserver interface to pass blocks to the downstream consumer
3133
* via the notify method and manage the bidirectional stream to the consumer via the onNext, onError, and onCompleted methods.
3234
*/
33-
public class ConsumerBlockItemObserver implements BlockItemEventHandler<ObjectEvent<BlockStreamServiceGrpcProto.BlockItem>, BlockStreamServiceGrpcProto.BlockItemResponse> {
35+
public class ConsumerBlockItemObserver implements BlockItemEventHandler<ObjectEvent<BlockItem>, SubscribeStreamRequest> {
3436

3537
private final System.Logger LOGGER = System.getLogger(getClass().getName());
3638

37-
private final StreamObserver<BlockStreamServiceGrpcProto.BlockItem> responseStreamObserver;
39+
private final StreamObserver<BlockItem> subscribeStreamResponseObserver;
3840

3941
private final long timeoutThresholdMillis;
4042

@@ -43,18 +45,18 @@ public class ConsumerBlockItemObserver implements BlockItemEventHandler<ObjectEv
4345

4446
private final CountDownLatch shutdownLatch = new CountDownLatch(1);
4547

46-
private final StreamMediator<ObjectEvent<BlockStreamServiceGrpcProto.BlockItem>, BlockStreamServiceGrpcProto.BlockItemResponse> streamMediator;
48+
private final StreamMediator<ObjectEvent<BlockItem>, SubscribeStreamRequest> streamMediator;
4749

4850
/**
4951
* Constructor for the LiveStreamObserverImpl class.
5052
*
51-
* @param responseStreamObserver the response stream observer
53+
* @param subscribeStreamResponseObserver the response stream observer
5254
*/
5355
public ConsumerBlockItemObserver(
5456
final long timeoutThresholdMillis,
5557
final InstantSource producerLivenessClock,
56-
final StreamMediator<ObjectEvent<BlockStreamServiceGrpcProto.BlockItem>, BlockStreamServiceGrpcProto.BlockItemResponse> streamMediator,
57-
final StreamObserver<BlockStreamServiceGrpcProto.BlockItem> responseStreamObserver) {
58+
final StreamMediator<ObjectEvent<BlockItem>, SubscribeStreamRequest> streamMediator,
59+
final StreamObserver<BlockItem> subscribeStreamResponseObserver) {
5860

5961
this.timeoutThresholdMillis = timeoutThresholdMillis;
6062
this.producerLivenessClock = producerLivenessClock;
@@ -63,18 +65,18 @@ public ConsumerBlockItemObserver(
6365
// be executed if a downstream consumer cancels the stream without
6466
// sending an HTTP/2 End Stream DATA frame. If triggered, unsubscribe
6567
// this observer to avoid orphaning subscribed resources.
66-
if (responseStreamObserver instanceof ServerCallStreamObserver) {
68+
if (subscribeStreamResponseObserver instanceof ServerCallStreamObserver) {
6769

6870
// Unfortunately we have to cast the responseStreamObserver to a ServerCallStreamObserver
6971
// to register the onCancelHandler.
70-
((ServerCallStreamObserver<BlockStreamServiceGrpcProto.BlockItem>)responseStreamObserver)
72+
((ServerCallStreamObserver<BlockItem>)subscribeStreamResponseObserver)
7173
.setOnCancelHandler(() -> {
7274
LOGGER.log(System.Logger.Level.DEBUG, "Consumer cancelled stream. Unsubscribing observer.");
7375
streamMediator.unsubscribe(this);
7476
});
7577
}
7678

77-
this.responseStreamObserver = responseStreamObserver;
79+
this.subscribeStreamResponseObserver = subscribeStreamResponseObserver;
7880
this.producerLivenessMillis = producerLivenessClock.millis();
7981

8082
this.streamMediator = streamMediator;
@@ -85,11 +87,11 @@ public ConsumerBlockItemObserver(
8587
*
8688
*/
8789
@Override
88-
public void onEvent(final ObjectEvent<BlockStreamServiceGrpcProto.BlockItem> event, final long l, final boolean b) throws Exception {
90+
public void onEvent(final ObjectEvent<BlockItem> event, final long l, final boolean b) throws Exception {
8991

9092
// Refresh the producer liveness and pass the block to the observer.
9193
producerLivenessMillis = producerLivenessClock.millis();
92-
responseStreamObserver.onNext(event.get());
94+
subscribeStreamResponseObserver.onNext(event.get());
9395
}
9496

9597
/**
@@ -98,7 +100,7 @@ public void onEvent(final ObjectEvent<BlockStreamServiceGrpcProto.BlockItem> eve
98100
* @param blockItemResponse the BlockItemResponse passed back to the server via the bidirectional stream to the downstream consumer.
99101
*/
100102
@Override
101-
public void onNext(final BlockStreamServiceGrpcProto.BlockItemResponse blockItemResponse) {
103+
public void onNext(final SubscribeStreamRequest blockItemResponse) {
102104

103105
// Check if the producer has timed out. If so, unsubscribe the observer from the mediator.
104106
if (isThresholdExceeded(producerLivenessMillis)) {

0 commit comments

Comments
 (0)