Skip to content

Commit 51ab972

Browse files
wip: first bidi implementation
Signed-off-by: Matt Peterson <[email protected]>
1 parent add6070 commit 51ab972

File tree

9 files changed

+128
-102
lines changed

9 files changed

+128
-102
lines changed

consumer.sh

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,8 @@
22

33
echo "Starting consumer..."
44
starting_block=$1
5-
account_id=$2
65

7-
payload="{\"id\": $starting_block, \"accountId\": \"$account_id\"}"
6+
payload="{\"id\": $starting_block}"
87

98
grpcurl -vv -plaintext -proto ./protos/src/main/protobuf/blockstream.proto -d "$payload" localhost:8080 BlockStreamGrpc/StreamSource
109

protos/src/main/protobuf/blockstream.proto

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,19 +5,14 @@ option java_outer_classname = "BlockStreamServiceGrpcProto";
55

66
service BlockStreamGrpc {
77
rpc StreamSink(stream Block) returns (stream BlockResponse) {}
8-
rpc StreamSource(BlockRequest) returns (stream Block) {}
8+
rpc StreamSource(stream BlockResponse) returns (stream Block) {}
99
}
1010

1111
message Block {
1212
int64 id = 1;
1313
string value = 2;
1414
}
1515

16-
message BlockRequest {
17-
int64 id = 1;
18-
string accountId = 2;
19-
}
20-
2116
message BlockResponse {
2217
int64 id = 1;
2318
}

server/src/main/java/com/hedera/block/server/BlockStreamService.java

Lines changed: 14 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -21,22 +21,21 @@
2121
import com.google.protobuf.Descriptors;
2222
import com.hedera.block.protos.BlockStreamServiceGrpcProto;
2323
import com.hedera.block.server.observe.InboundBlockStreamObserver;
24-
import com.hedera.block.server.observe.subscribe.HistoricStreamObserver;
2524
import com.hedera.block.server.observe.mediate.StreamMediator;
2625
import com.hedera.block.server.observe.mediate.StreamMediatorImpl;
26+
import com.hedera.block.server.observe.subscribe.HistoricStreamObserver;
27+
import com.hedera.block.server.observe.subscribe.LiveStreamObserver;
2728
import com.hedera.block.server.persistence.BlockPersistenceHandler;
2829
import com.hedera.block.server.persistence.WriteThroughCacheHandler;
2930
import com.hedera.block.server.persistence.cache.BlockCache;
3031
import com.hedera.block.server.persistence.cache.LRUCache;
3132
import com.hedera.block.server.persistence.storage.BlockStorage;
3233
import com.hedera.block.server.persistence.storage.FileSystemBlockStorage;
33-
import io.grpc.stub.ServerCallStreamObserver;
3434
import io.grpc.stub.StreamObserver;
3535
import io.helidon.config.Config;
3636
import io.helidon.webserver.grpc.GrpcService;
3737

3838
import java.io.IOException;
39-
import java.util.Optional;
4039
import java.util.logging.Logger;
4140

4241
import static com.hedera.block.server.Constants.*;
@@ -75,40 +74,26 @@ public String serviceName() {
7574

7675
@Override
7776
public void update(Routing routing) {
78-
routing.clientStream(CLIENT_STREAMING_METHOD_NAME, this::streamSink);
79-
routing.serverStream(SERVER_STREAMING_METHOD_NAME, this::streamSource);
77+
routing.bidi(CLIENT_STREAMING_METHOD_NAME, this::streamSink);
78+
routing.bidi(SERVER_STREAMING_METHOD_NAME, this::streamSource);
8079
}
8180

82-
private StreamObserver<BlockStreamServiceGrpcProto.Block> streamSink(StreamObserver<BlockStreamServiceGrpcProto.BlockResponse> observer) {
81+
private StreamObserver<BlockStreamServiceGrpcProto.Block> streamSink(StreamObserver<BlockStreamServiceGrpcProto.BlockResponse> responseStreamObserver) {
8382
logger.info("Executing bidirectional streamSink method");
8483

85-
return new InboundBlockStreamObserver(streamMediator::onNext, observer);
84+
return new InboundBlockStreamObserver(streamMediator, responseStreamObserver);
8685
}
8786

88-
private void streamSource(BlockStreamServiceGrpcProto.BlockRequest blockRequest, StreamObserver<BlockStreamServiceGrpcProto.Block> observer) {
87+
private StreamObserver<BlockStreamServiceGrpcProto.BlockResponse> streamSource(StreamObserver<BlockStreamServiceGrpcProto.Block> responseStreamObserver) {
8988
logger.info("Executing streamSource method");
90-
logger.info("Block request: " + blockRequest);
91-
92-
// ServerCallStreamObserver<BlockStreamServiceGrpcProto.Block> serverCallStreamObserver = (ServerCallStreamObserver<BlockStreamServiceGrpcProto.Block>)observer;
93-
// serverCallStreamObserver.setOnCancelHandler(() -> {
94-
// logger.info(">>> Cancelling streamSink method");
95-
// });
96-
//
97-
// serverCallStreamObserver.setOnCloseHandler(() -> {
98-
// logger.info(">>> Closing streamSink method");
99-
// });
100-
//
101-
// serverCallStreamObserver.setOnReadyHandler(() -> {
102-
// logger.info(">>> Ready streamSink method");
103-
// });
104-
105-
StreamObserver<BlockStreamServiceGrpcProto.Block> h = new HistoricStreamObserver(
106-
blockRequest.getId(),
107-
100,
89+
90+
LiveStreamObserver<BlockStreamServiceGrpcProto.BlockResponse, BlockStreamServiceGrpcProto.Block> streamObserver = new HistoricStreamObserver(
10891
streamMediator,
109-
blockPersistenceHandler,
110-
observer);
111-
streamMediator.subscribe(h::onNext);
92+
responseStreamObserver,
93+
blockPersistenceHandler);
94+
streamMediator.subscribe(streamObserver);
95+
96+
return streamObserver;
11297
}
11398
}
11499

server/src/main/java/com/hedera/block/server/Server.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@
3535
public class Server {
3636

3737
// Function stubs to satisfy the bidi routing param signatures. The implementations are in the service class.
38-
private static ServerCalls.BidiStreamingMethod<Stream<BlockStreamServiceGrpcProto.Block>, StreamObserver<BlockStreamServiceGrpcProto.Empty>> clientBidiStreamingMethod;
39-
private static ServerCalls.BidiStreamingMethod<Stream<BlockStreamServiceGrpcProto.Block>, StreamObserver<BlockStreamServiceGrpcProto.Block>> serverBidiStreamingMethod;
38+
private static ServerCalls.BidiStreamingMethod<Stream<BlockStreamServiceGrpcProto.Block>, StreamObserver<BlockStreamServiceGrpcProto.Block>> clientBidiStreamingMethod;
39+
private static ServerCalls.BidiStreamingMethod<Stream<BlockStreamServiceGrpcProto.BlockResponse>, StreamObserver<BlockStreamServiceGrpcProto.Block>> serverBidiStreamingMethod;
4040

4141
private Server() {
4242
// Not meant to be instantiated

server/src/main/java/com/hedera/block/server/observe/InboundBlockStreamObserver.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,19 +20,18 @@
2020

2121

2222
import com.hedera.block.protos.BlockStreamServiceGrpcProto;
23+
import com.hedera.block.server.observe.mediate.StreamMediator;
2324
import io.grpc.stub.StreamObserver;
2425

25-
import java.util.function.Consumer;
2626
import java.util.logging.Logger;
2727

2828
public class InboundBlockStreamObserver implements StreamObserver<BlockStreamServiceGrpcProto.Block> {
2929

30-
private final Consumer<BlockStreamServiceGrpcProto.Block> streamMediator;
30+
private final StreamMediator<BlockStreamServiceGrpcProto.Block> streamMediator;
3131
private final StreamObserver<BlockStreamServiceGrpcProto.BlockResponse> outboundBlockStreamObserver;
3232
private final Logger logger = Logger.getLogger(getClass().getName());
3333

34-
public InboundBlockStreamObserver(Consumer<BlockStreamServiceGrpcProto.Block> streamMediator,
35-
34+
public InboundBlockStreamObserver(StreamMediator<BlockStreamServiceGrpcProto.Block> streamMediator,
3635
StreamObserver<BlockStreamServiceGrpcProto.BlockResponse> outboundBlockStreamObserver) {
3736
this.streamMediator = streamMediator;
3837
this.outboundBlockStreamObserver = outboundBlockStreamObserver;
@@ -42,7 +41,7 @@ public InboundBlockStreamObserver(Consumer<BlockStreamServiceGrpcProto.Block> st
4241
public void onNext(BlockStreamServiceGrpcProto.Block block) {
4342
logger.fine("onNext called");
4443

45-
this.streamMediator.accept(block);
44+
streamMediator.onNext(block);
4645

4746
BlockStreamServiceGrpcProto.BlockResponse blockResponse = BlockStreamServiceGrpcProto.BlockResponse.newBuilder().setId(block.getId()).build();
4847
this.outboundBlockStreamObserver.onNext(blockResponse);

server/src/main/java/com/hedera/block/server/observe/mediate/StreamMediator.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,14 @@
1818

1919
package com.hedera.block.server.observe.mediate;
2020

21+
import com.hedera.block.protos.BlockStreamServiceGrpcProto;
22+
import com.hedera.block.server.BlockStreamService;
23+
import com.hedera.block.server.observe.subscribe.LiveStreamObserver;
2124
import io.grpc.stub.StreamObserver;
2225

2326
import java.util.function.Consumer;
2427

2528
public interface StreamMediator<V> extends StreamObserver<V> {
26-
void subscribe(Consumer<V> c);
29+
void subscribe(LiveStreamObserver<BlockStreamServiceGrpcProto.BlockResponse, BlockStreamServiceGrpcProto.Block> c);
30+
void unsubscribe(LiveStreamObserver<BlockStreamServiceGrpcProto.BlockResponse, BlockStreamServiceGrpcProto.Block> c);
2731
}

server/src/main/java/com/hedera/block/server/observe/mediate/StreamMediatorImpl.java

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,15 @@
1919
package com.hedera.block.server.observe.mediate;
2020

2121
import com.hedera.block.protos.BlockStreamServiceGrpcProto;
22+
import com.hedera.block.server.observe.subscribe.LiveStreamObserver;
2223
import com.hedera.block.server.persistence.BlockPersistenceHandler;
23-
import io.grpc.stub.StreamObserver;
2424

25-
import java.util.*;
25+
import java.util.Queue;
2626
import java.util.concurrent.LinkedBlockingQueue;
27-
import java.util.function.Consumer;
2827

2928
public class StreamMediatorImpl implements StreamMediator<BlockStreamServiceGrpcProto.Block> {
30-
private final Queue<Consumer<BlockStreamServiceGrpcProto.Block>> subscribers;
29+
30+
private final Queue<LiveStreamObserver<BlockStreamServiceGrpcProto.BlockResponse, BlockStreamServiceGrpcProto.Block>> subscribers;
3131
private final BlockPersistenceHandler<BlockStreamServiceGrpcProto.Block> blockPersistenceHandler;
3232

3333
public StreamMediatorImpl(BlockPersistenceHandler<BlockStreamServiceGrpcProto.Block> blockPersistenceHandler) {
@@ -36,37 +36,38 @@ public StreamMediatorImpl(BlockPersistenceHandler<BlockStreamServiceGrpcProto.Bl
3636
}
3737

3838
@Override
39-
public void subscribe(Consumer<BlockStreamServiceGrpcProto.Block> consumer) {
40-
this.subscribers.add(consumer);
39+
public void subscribe(LiveStreamObserver<BlockStreamServiceGrpcProto.BlockResponse, BlockStreamServiceGrpcProto.Block> liveStreamObserver) {
40+
subscribers.add(liveStreamObserver);
41+
}
42+
43+
@Override
44+
public void unsubscribe(LiveStreamObserver<BlockStreamServiceGrpcProto.BlockResponse, BlockStreamServiceGrpcProto.Block> liveStreamObserver) {
45+
4146
}
4247

4348
@Override
4449
public void onNext(BlockStreamServiceGrpcProto.Block block) {
4550

4651
// Proxy the block to all subscribers
47-
// int window = subscribers.size();
48-
// for (int i = 0;i < window;i++) {
49-
// Consumer<BlockStreamServiceGrpcProto.Block> consumer = subscribers.poll();
50-
// if (consumer != null) {
51-
// consumer.accept(block);
52-
// }
53-
// }
52+
for (LiveStreamObserver<BlockStreamServiceGrpcProto.BlockResponse, BlockStreamServiceGrpcProto.Block> subscriber : subscribers) {
53+
subscriber.notify(block);
54+
}
5455

5556
// Persist the block
5657
blockPersistenceHandler.persist(block);
5758
}
5859

5960
@Override
6061
public void onError(Throwable t) {
61-
// for (Consumer<BlockStreamServiceGrpcProto.Block> o : subscribers) {
62-
// o.onError(t);
62+
// for (StreamObserver<BlockStreamServiceGrpcProto.BlockResponse> subscriber : subscribers) {
63+
// subscriber.onError(t);
6364
// }
6465
}
6566

6667
@Override
6768
public void onCompleted() {
68-
// for (StreamObserver<BlockStreamServiceGrpcProto.Block> o : subscribers.values()) {
69-
// o.onCompleted();
69+
// for (StreamObserver<BlockStreamServiceGrpcProto.BlockResponse> subscriber : subscribers) {
70+
// subscriber.onCompleted();
7071
// }
7172
}
7273
}

server/src/main/java/com/hedera/block/server/observe/subscribe/HistoricStreamObserver.java

Lines changed: 58 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -23,82 +23,100 @@
2323
import com.hedera.block.server.persistence.BlockPersistenceHandler;
2424
import io.grpc.stub.StreamObserver;
2525

26+
import java.time.Duration;
2627
import java.util.LinkedList;
28+
import java.util.Optional;
2729
import java.util.Queue;
28-
import java.util.function.Consumer;
30+
import java.util.concurrent.LinkedBlockingQueue;
31+
import java.util.logging.Logger;
2932

30-
public class HistoricStreamObserver implements StreamObserver<BlockStreamServiceGrpcProto.Block> {
33+
public class HistoricStreamObserver implements LiveStreamObserver<BlockStreamServiceGrpcProto.BlockResponse, BlockStreamServiceGrpcProto.Block> {
34+
35+
private final Logger logger = Logger.getLogger(getClass().getName());
3136

32-
private final long requestedStartingBlockId;
33-
private final int offset;
3437
private final StreamMediator<BlockStreamServiceGrpcProto.Block> mediator;
38+
private final StreamObserver<BlockStreamServiceGrpcProto.Block> responseStreamObserver;
3539
private final BlockPersistenceHandler<BlockStreamServiceGrpcProto.Block> blockPersistenceHandler;
36-
private final StreamObserver<BlockStreamServiceGrpcProto.Block> streamObserver;
3740

38-
private final Queue<BlockStreamServiceGrpcProto.Block> liveStream;
3941
private final Queue<BlockStreamServiceGrpcProto.Block> historicStream;
4042

41-
private long firstLiveBlockId = -1;
42-
private long counter;
43-
44-
43+
private final Queue<BlockStreamServiceGrpcProto.Block> blocks = new LinkedBlockingQueue<>();
4544

4645
public HistoricStreamObserver(
47-
long requestedStartingBlockId,
48-
int offset,
4946
StreamMediator<BlockStreamServiceGrpcProto.Block> mediator,
50-
BlockPersistenceHandler<BlockStreamServiceGrpcProto.Block> blockPersistenceHandler,
51-
StreamObserver<BlockStreamServiceGrpcProto.Block> streamObserver) {
47+
StreamObserver<BlockStreamServiceGrpcProto.Block> responseStreamObserver,
48+
BlockPersistenceHandler<BlockStreamServiceGrpcProto.Block> blockPersistenceHandler) {
5249

53-
this.requestedStartingBlockId = requestedStartingBlockId;
54-
this.offset = offset;
5550
this.mediator = mediator;
51+
this.responseStreamObserver = responseStreamObserver;
5652
this.blockPersistenceHandler = blockPersistenceHandler;
57-
this.streamObserver = streamObserver;
5853

59-
this.liveStream = new LinkedList<>();
6054
this.historicStream = new LinkedList<>();
6155
}
6256

6357
@Override
64-
public void onNext(BlockStreamServiceGrpcProto.Block block) {
65-
if (firstLiveBlockId == -1) {
66-
firstLiveBlockId = block.getId();
67-
}
68-
69-
// Buffer the live blocks
70-
liveStream.add(block);
58+
public void notify(BlockStreamServiceGrpcProto.Block block) {
59+
blocks.add(block);
60+
}
7161

72-
if (counter < firstLiveBlockId - 1) {
62+
@Override
63+
public void onNext(BlockStreamServiceGrpcProto.BlockResponse blockResponse) {
64+
logger.info("Received response block " + blockResponse);
7365

74-
// Fetch the historic blocks
75-
historicStream.addAll(blockPersistenceHandler.readRange(requestedStartingBlockId, firstLiveBlockId - 1));
76-
while (historicStream.peek() != null) {
77-
// Send the historic blocks downstream to the subscriber
78-
streamObserver.onNext(historicStream.poll());
79-
counter++;
66+
while (true) {
67+
while (!blocks.isEmpty()) {
68+
responseStreamObserver.onNext(blocks.poll());
8069
}
81-
} else {
8270

83-
while (liveStream.peek() != null) {
84-
// Send the buffered blocks downstream to the subscriber
85-
streamObserver.onNext(liveStream.poll());
71+
try {
72+
Thread.sleep(Duration.ofMillis(50));
73+
} catch (InterruptedException e) {
74+
throw new RuntimeException(e);
8675
}
8776
}
88-
89-
this.mediator.subscribe(this::onNext);
9077
}
9178

79+
// @Override
80+
// public void onNext(BlockStreamServiceGrpcProto.Block block) {
81+
// if (firstLiveBlockId == -1) {
82+
// firstLiveBlockId = block.getId();
83+
// }
84+
//
85+
// // Buffer the live blocks
86+
// liveStream.add(block);
87+
//
88+
// if (counter < firstLiveBlockId - 1) {
89+
//
90+
// // Fetch the historic blocks
91+
// historicStream.addAll(blockPersistenceHandler.readRange(1, firstLiveBlockId - 1));
92+
// while (historicStream.peek() != null) {
93+
// // Send the historic blocks downstream to the subscriber
94+
// outboundStreamObserver.onNext(historicStream.poll());
95+
// counter++;
96+
// }
97+
// } else {
98+
//
99+
// while (liveStream.peek() != null) {
100+
// // Send the buffered blocks downstream to the subscriber
101+
// outboundStreamObserver.onNext(liveStream.poll());
102+
// }
103+
// }
104+
//
105+
// this.mediator.subscribe(this::onNext);
106+
// }
107+
92108
@Override
93109
public void onError(Throwable t) {
94-
streamObserver.onError(t);
110+
// outboundStreamObserver.onError(t);
95111
}
96112

97113
@Override
98114
public void onCompleted() {
99-
streamObserver.onCompleted();
115+
logger.info("Completed");
116+
// outboundStreamObserver.onCompleted();
100117
}
101118

119+
102120
// private static class StreamGenerator {
103121
//
104122
// private final int inactivityLimit;

0 commit comments

Comments
 (0)