Skip to content

Commit 7f207a3

Browse files
push to streaming might be fixed
Signed-off-by: Matt Peterson <[email protected]>
1 parent e8e2b31 commit 7f207a3

File tree

7 files changed

+60
-178
lines changed

7 files changed

+60
-178
lines changed

consumer.sh

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -6,24 +6,29 @@ echo "Starting consumer..."
66
GRPC_SERVER="localhost:8080"
77
GRPC_METHOD="BlockStreamGrpc/StreamSource"
88

9-
# Function to handle received messages and send replies
10-
handle_streaming() {
11-
while read -r line; do
12-
id=$(echo "$line" | jq -r '.id')
13-
if [[ $id != "null" ]]; then
14-
echo "{\"id\": $id}"
15-
fi
16-
done
17-
}
18-
19-
# Trap SIGINT to clean up
209
trap "echo 'Received SIGINT, stopping...'; exit 0" SIGINT
2110

2211
payload="{\"id\": 0}"
2312

13+
(
14+
iter=$1
15+
while true; do
16+
echo "{\"id\": $iter}"
17+
18+
if [ $iter -eq $2 ]; then
19+
exit 0
20+
fi
21+
22+
((iter++))
23+
24+
sleep 0.25
25+
26+
done
27+
) | grpcurl -plaintext -proto ./protos/src/main/protobuf/blockstream.proto -d @ $GRPC_SERVER $GRPC_METHOD
28+
2429
#grpcurl -vv -plaintext -proto ./protos/src/main/protobuf/blockstream.proto -d "$payload" localhost:8080 BlockStreamGrpc/StreamSource
2530

2631
# Initiate the bidirectional streaming connection
27-
grpcurl -vv -plaintext -proto ./protos/src/main/protobuf/blockstream.proto -d "$payload" $GRPC_SERVER $GRPC_METHOD
28-
#{} # Initial empty request if needed
29-
EOF | handle_streaming | grpcurl -plaintext -d @ $GRPC_SERVER $GRPC_METHOD --format json
32+
#grpcurl -vv -plaintext -proto ./protos/src/main/protobuf/blockstream.proto -d "$payload" $GRPC_SERVER $GRPC_METHOD | handle_streaming | grpcurl -plaintext -d @ $GRPC_SERVER $GRPC_METHOD
33+
#grpcurl -plaintext -proto ./protos/src/main/protobuf/blockstream.proto -d @ $GRPC_SERVER $GRPC_METHOD
34+
#handle_streaming | grpcurl -plaintext -d @ $GRPC_SERVER $GRPC_METHOD --format json

producer.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ trap cleanup SIGINT
5151

5252
((iter++))
5353

54-
sleep 1
54+
sleep 0.25
5555
done
5656
) | grpcurl -vv -plaintext -proto ./protos/src/main/protobuf/blockstream.proto -d @ $GRPC_SERVER $GRPC_METHOD &
5757

server/src/main/java/com/hedera/block/server/BlockStreamService.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import com.hedera.block.server.observe.mediate.StreamMediatorImpl;
2626
import com.hedera.block.server.observe.subscribe.HistoricStreamObserver;
2727
import com.hedera.block.server.observe.subscribe.LiveStreamObserver;
28-
import com.hedera.block.server.persistence.BlockPersistenceHandler;
2928
import com.hedera.block.server.persistence.WriteThroughCacheHandler;
3029
import com.hedera.block.server.persistence.cache.BlockCache;
3130
import com.hedera.block.server.persistence.cache.LRUCache;
@@ -43,8 +42,7 @@
4342
public class BlockStreamService implements GrpcService {
4443

4544
private final Logger logger = Logger.getLogger(getClass().getName());
46-
private final BlockPersistenceHandler<BlockStreamServiceGrpcProto.Block> blockPersistenceHandler;
47-
private final StreamMediator<BlockStreamServiceGrpcProto.Block> streamMediator;
45+
private final StreamMediator<BlockStreamServiceGrpcProto.BlockResponse, BlockStreamServiceGrpcProto.Block> streamMediator;
4846

4947
public BlockStreamService() {
5048
try {
@@ -54,8 +52,7 @@ public BlockStreamService() {
5452

5553
BlockStorage<BlockStreamServiceGrpcProto.Block> blockStorage = new FileSystemBlockStorage(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config);
5654
BlockCache<BlockStreamServiceGrpcProto.Block> blockCache = new LRUCache(10L);
57-
this.blockPersistenceHandler = new WriteThroughCacheHandler(blockStorage, blockCache);
58-
this.streamMediator = new StreamMediatorImpl(blockPersistenceHandler);
55+
this.streamMediator = new StreamMediatorImpl(new WriteThroughCacheHandler(blockStorage, blockCache));
5956

6057
} catch (IOException e) {
6158
throw new RuntimeException(e);
@@ -85,12 +82,11 @@ private StreamObserver<BlockStreamServiceGrpcProto.Block> streamSink(StreamObser
8582
}
8683

8784
private StreamObserver<BlockStreamServiceGrpcProto.BlockResponse> streamSource(StreamObserver<BlockStreamServiceGrpcProto.Block> responseStreamObserver) {
88-
logger.info("Executing streamSource method");
85+
logger.info("Executing bidirectional streamSource method");
8986

9087
LiveStreamObserver<BlockStreamServiceGrpcProto.BlockResponse, BlockStreamServiceGrpcProto.Block> streamObserver = new HistoricStreamObserver(
9188
streamMediator,
92-
responseStreamObserver,
93-
blockPersistenceHandler);
89+
responseStreamObserver);
9490
streamMediator.subscribe(streamObserver);
9591

9692
return streamObserver;

server/src/main/java/com/hedera/block/server/observe/InboundBlockStreamObserver.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,14 @@
2727

2828
public class InboundBlockStreamObserver implements StreamObserver<BlockStreamServiceGrpcProto.Block> {
2929

30-
private final StreamMediator<BlockStreamServiceGrpcProto.Block> streamMediator;
31-
private final StreamObserver<BlockStreamServiceGrpcProto.BlockResponse> outboundBlockStreamObserver;
30+
private final StreamMediator<BlockStreamServiceGrpcProto.BlockResponse, BlockStreamServiceGrpcProto.Block> streamMediator;
31+
private final StreamObserver<BlockStreamServiceGrpcProto.BlockResponse> responseStreamObserver;
3232
private final Logger logger = Logger.getLogger(getClass().getName());
3333

34-
public InboundBlockStreamObserver(StreamMediator<BlockStreamServiceGrpcProto.Block> streamMediator,
35-
StreamObserver<BlockStreamServiceGrpcProto.BlockResponse> outboundBlockStreamObserver) {
34+
public InboundBlockStreamObserver(StreamMediator<BlockStreamServiceGrpcProto.BlockResponse, BlockStreamServiceGrpcProto.Block> streamMediator,
35+
StreamObserver<BlockStreamServiceGrpcProto.BlockResponse> responseStreamObserver) {
3636
this.streamMediator = streamMediator;
37-
this.outboundBlockStreamObserver = outboundBlockStreamObserver;
37+
this.responseStreamObserver = responseStreamObserver;
3838
}
3939

4040
@Override
@@ -44,18 +44,17 @@ public void onNext(BlockStreamServiceGrpcProto.Block block) {
4444
streamMediator.onNext(block);
4545

4646
BlockStreamServiceGrpcProto.BlockResponse blockResponse = BlockStreamServiceGrpcProto.BlockResponse.newBuilder().setId(block.getId()).build();
47-
this.outboundBlockStreamObserver.onNext(blockResponse);
47+
this.responseStreamObserver.onNext(blockResponse);
4848
}
4949

5050
@Override
5151
public void onError(Throwable t) {
5252
logger.severe("onError called: " + t.getMessage());
53-
this.outboundBlockStreamObserver.onError(t);
5453
}
5554

5655
@Override
5756
public void onCompleted() {
58-
logger.fine("onCompleted called");
59-
this.outboundBlockStreamObserver.onCompleted();
57+
logger.info("InboundBlockStreamObserver onCompleted called");
58+
this.streamMediator.unsubscribeAll();
6059
}
6160
}

server/src/main/java/com/hedera/block/server/observe/mediate/StreamMediator.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@
2525

2626
import java.util.function.Consumer;
2727

28-
public interface StreamMediator<V> extends StreamObserver<V> {
29-
void subscribe(LiveStreamObserver<BlockStreamServiceGrpcProto.BlockResponse, BlockStreamServiceGrpcProto.Block> c);
30-
void unsubscribe(LiveStreamObserver<BlockStreamServiceGrpcProto.BlockResponse, BlockStreamServiceGrpcProto.Block> c);
28+
public interface StreamMediator<V, S> {
29+
void subscribe(LiveStreamObserver<V, S> observer);
30+
void unsubscribe(LiveStreamObserver<V, S> observer);
31+
void unsubscribeAll();
32+
void onNext(S block);
3133
}

server/src/main/java/com/hedera/block/server/observe/mediate/StreamMediatorImpl.java

Lines changed: 12 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -25,24 +25,30 @@
2525
import java.util.Queue;
2626
import java.util.concurrent.LinkedBlockingQueue;
2727

28-
public class StreamMediatorImpl implements StreamMediator<BlockStreamServiceGrpcProto.Block> {
28+
public class StreamMediatorImpl implements StreamMediator<BlockStreamServiceGrpcProto.BlockResponse, BlockStreamServiceGrpcProto.Block> {
2929

30-
private final Queue<LiveStreamObserver<BlockStreamServiceGrpcProto.BlockResponse, BlockStreamServiceGrpcProto.Block>> subscribers;
31-
private final BlockPersistenceHandler<BlockStreamServiceGrpcProto.Block> blockPersistenceHandler;
30+
private final Queue<LiveStreamObserver<BlockStreamServiceGrpcProto.BlockResponse, BlockStreamServiceGrpcProto.Block>> subscribers = new LinkedBlockingQueue<>();
31+
// private final BlockPersistenceHandler<BlockStreamServiceGrpcProto.Block> blockPersistenceHandler;
3232

3333
public StreamMediatorImpl(BlockPersistenceHandler<BlockStreamServiceGrpcProto.Block> blockPersistenceHandler) {
34-
this.blockPersistenceHandler = blockPersistenceHandler;
35-
this.subscribers = new LinkedBlockingQueue<>();
34+
// this.blockPersistenceHandler = blockPersistenceHandler;
3635
}
3736

3837
@Override
3938
public void subscribe(LiveStreamObserver<BlockStreamServiceGrpcProto.BlockResponse, BlockStreamServiceGrpcProto.Block> liveStreamObserver) {
40-
subscribers.add(liveStreamObserver);
39+
this.subscribers.add(liveStreamObserver);
4140
}
4241

4342
@Override
4443
public void unsubscribe(LiveStreamObserver<BlockStreamServiceGrpcProto.BlockResponse, BlockStreamServiceGrpcProto.Block> liveStreamObserver) {
44+
if (!this.subscribers.isEmpty()) {
45+
this.subscribers.remove(liveStreamObserver);
46+
}
47+
}
4548

49+
@Override
50+
public void unsubscribeAll() {
51+
this.subscribers.clear();
4652
}
4753

4854
@Override
@@ -56,18 +62,4 @@ public void onNext(BlockStreamServiceGrpcProto.Block block) {
5662
// Persist the block
5763
// blockPersistenceHandler.persist(block);
5864
}
59-
60-
@Override
61-
public void onError(Throwable t) {
62-
// for (StreamObserver<BlockStreamServiceGrpcProto.BlockResponse> subscriber : subscribers) {
63-
// subscriber.onError(t);
64-
// }
65-
}
66-
67-
@Override
68-
public void onCompleted() {
69-
// for (StreamObserver<BlockStreamServiceGrpcProto.BlockResponse> subscriber : subscribers) {
70-
// subscriber.onCompleted();
71-
// }
72-
}
7365
}

server/src/main/java/com/hedera/block/server/observe/subscribe/HistoricStreamObserver.java

Lines changed: 11 additions & 123 deletions
Original file line numberDiff line numberDiff line change
@@ -24,158 +24,46 @@
2424
import io.grpc.stub.StreamObserver;
2525

2626
import java.time.Duration;
27-
import java.util.LinkedList;
28-
import java.util.Optional;
29-
import java.util.Queue;
27+
import java.util.*;
3028
import java.util.concurrent.LinkedBlockingQueue;
29+
import java.util.function.Consumer;
3130
import java.util.logging.Logger;
3231

3332
public class HistoricStreamObserver implements LiveStreamObserver<BlockStreamServiceGrpcProto.BlockResponse, BlockStreamServiceGrpcProto.Block> {
3433

3534
private final Logger logger = Logger.getLogger(getClass().getName());
3635

37-
private final StreamMediator<BlockStreamServiceGrpcProto.Block> mediator;
36+
private final StreamMediator<BlockStreamServiceGrpcProto.BlockResponse, BlockStreamServiceGrpcProto.Block> mediator;
3837
private final StreamObserver<BlockStreamServiceGrpcProto.Block> responseStreamObserver;
39-
private final BlockPersistenceHandler<BlockStreamServiceGrpcProto.Block> blockPersistenceHandler;
40-
41-
private final Queue<BlockStreamServiceGrpcProto.Block> historicStream;
42-
43-
private final Queue<BlockStreamServiceGrpcProto.Block> blocks = new LinkedBlockingQueue<>();
4438

4539
public HistoricStreamObserver(
46-
StreamMediator<BlockStreamServiceGrpcProto.Block> mediator,
47-
StreamObserver<BlockStreamServiceGrpcProto.Block> responseStreamObserver,
48-
BlockPersistenceHandler<BlockStreamServiceGrpcProto.Block> blockPersistenceHandler) {
40+
StreamMediator<BlockStreamServiceGrpcProto.BlockResponse, BlockStreamServiceGrpcProto.Block> mediator,
41+
StreamObserver<BlockStreamServiceGrpcProto.Block> responseStreamObserver) {
4942

5043
this.mediator = mediator;
5144
this.responseStreamObserver = responseStreamObserver;
52-
this.blockPersistenceHandler = blockPersistenceHandler;
53-
54-
this.historicStream = new LinkedList<>();
5545
}
5646

5747
@Override
5848
public void notify(BlockStreamServiceGrpcProto.Block block) {
59-
blocks.add(block);
49+
this.responseStreamObserver.onNext(block);
6050
}
6151

6252
@Override
6353
public void onNext(BlockStreamServiceGrpcProto.BlockResponse blockResponse) {
6454
logger.info("Received response block " + blockResponse);
65-
66-
while (true) {
67-
while (!blocks.isEmpty()) {
68-
responseStreamObserver.onNext(blocks.poll());
69-
}
70-
71-
try {
72-
Thread.sleep(Duration.ofMillis(50));
73-
} catch (InterruptedException e) {
74-
throw new RuntimeException(e);
75-
}
76-
}
7755
}
7856

79-
// @Override
80-
// public void onNext(BlockStreamServiceGrpcProto.Block block) {
81-
// if (firstLiveBlockId == -1) {
82-
// firstLiveBlockId = block.getId();
83-
// }
84-
//
85-
// // Buffer the live blocks
86-
// liveStream.add(block);
87-
//
88-
// if (counter < firstLiveBlockId - 1) {
89-
//
90-
// // Fetch the historic blocks
91-
// historicStream.addAll(blockPersistenceHandler.readRange(1, firstLiveBlockId - 1));
92-
// while (historicStream.peek() != null) {
93-
// // Send the historic blocks downstream to the subscriber
94-
// outboundStreamObserver.onNext(historicStream.poll());
95-
// counter++;
96-
// }
97-
// } else {
98-
//
99-
// while (liveStream.peek() != null) {
100-
// // Send the buffered blocks downstream to the subscriber
101-
// outboundStreamObserver.onNext(liveStream.poll());
102-
// }
103-
// }
104-
//
105-
// this.mediator.subscribe(this::onNext);
106-
// }
107-
10857
@Override
10958
public void onError(Throwable t) {
110-
// outboundStreamObserver.onError(t);
59+
logger.severe("onError: " + t.getMessage());
60+
mediator.unsubscribe(this);
11161
}
11262

11363
@Override
11464
public void onCompleted() {
115-
logger.info("Completed");
116-
// outboundStreamObserver.onCompleted();
65+
logger.info("gRPC connection completed. Unsubscribing observer.");
66+
mediator.unsubscribe(this);
67+
logger.info("Unsubscribed observer.");
11768
}
118-
119-
120-
// private static class StreamGenerator {
121-
//
122-
// private final int inactivityLimit;
123-
// private final BlockPersistenceHandler blockPersistenceHandler;
124-
// private final StreamObserver<BlockStreamServiceGrpcProto.Block> observer;
125-
//
126-
// StreamGenerator(int inactivityLimit,
127-
// BlockPersistenceHandler blockPersistenceHandler,
128-
// StreamObserver<BlockStreamServiceGrpcProto.Block> observer) {
129-
// this.inactivityLimit = inactivityLimit;
130-
// this.blockPersistenceHandler = blockPersistenceHandler;
131-
// this.observer = observer;
132-
// }
133-
//
134-
// public void generate(long blockId) {
135-
//
136-
// AtomicInteger inactivityCounter = new AtomicInteger();
137-
// while (true) {
138-
// Queue<BlockStreamServiceGrpcProto.Block> blocks = blockPersistenceHandler.readFrom(blockId, x -> true);
139-
// blockId = generate(blockId, inactivityCounter, blocks);
140-
// if (blockId == -1) {
141-
// break;
142-
// }
143-
// }
144-
// }
145-
//
146-
// private long generate(long blockId, AtomicInteger inactivityCounter, Queue<BlockStreamServiceGrpcProto.Block> blocks) {
147-
//
148-
// if (blocks.isEmpty()) {
149-
// if (inactivityCounter.get() >= inactivityLimit) {
150-
//// logger.debug("Thread inactivity limit reached. Disconnecting client.");
151-
//// logger.debug("Object: " + this);
152-
// observer.onCompleted();
153-
// return -1;
154-
// }
155-
//
156-
// try {
157-
// Thread.sleep(1000);
158-
// inactivityCounter.incrementAndGet();
159-
// } catch (InterruptedException e) {
160-
// throw new RuntimeException(e);
161-
// }
162-
//
163-
// return blockId;
164-
// }
165-
//
166-
//// logger.debug("Generator: Id: " + state);
167-
//// logger.debug("Generator: block count: " + blocks.size());
168-
//
169-
// inactivityCounter.set(0);
170-
//
171-
// for (BlockStreamServiceGrpcProto.Block block : blocks) {
172-
// observer.onNext(block);
173-
// blockId = block.getId();
174-
// }
175-
//
176-
// blockId++;
177-
//
178-
// return blockId;
179-
// }
180-
// }
18169
}

0 commit comments

Comments
 (0)