Skip to content

Commit 9fa5a54

Browse files
fix: consumers will only start receiving blockitems at the beginning of the next full block
Signed-off-by: Matt Peterson <[email protected]>
1 parent 7dad6af commit 9fa5a54

File tree

4 files changed

+36
-19
lines changed

4 files changed

+36
-19
lines changed

protos/src/main/protobuf/blockstream.proto

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,10 @@ message SubscribeStreamResponse {
4848
* It will be replaced with a PBJ implementation in the future.
4949
*/
5050
message BlockItem {
51-
/**
52-
* The id of the block. Each block id should be unique.
53-
*/
54-
int64 id = 1;
55-
56-
/**
57-
* The value of the block. The value can be any string.
58-
*/
59-
string value = 2;
51+
52+
int64 block_header = 1;
53+
54+
uint64 id = 2;
55+
56+
string value = 3;
6057
}

server/src/main/java/com/hedera/block/server/consumer/ConsumerBlockItemObserver.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,14 +39,15 @@ public class ConsumerBlockItemObserver implements BlockItemEventHandler<ObjectEv
3939
private final StreamObserver<BlockItem> subscribeStreamResponseObserver;
4040

4141
private final long timeoutThresholdMillis;
42-
4342
private final InstantSource producerLivenessClock;
4443
private long producerLivenessMillis;
4544

4645
private final CountDownLatch shutdownLatch = new CountDownLatch(1);
4746

4847
private final StreamMediator<ObjectEvent<BlockItem>, SubscribeStreamRequest> streamMediator;
4948

49+
private boolean isReachedFirstBlock;
50+
5051
/**
5152
* Constructor for the LiveStreamObserverImpl class.
5253
*
@@ -91,16 +92,23 @@ public void onEvent(final ObjectEvent<BlockItem> event, final long l, final bool
9192

9293
// Refresh the producer liveness and pass the block to the observer.
9394
producerLivenessMillis = producerLivenessClock.millis();
94-
subscribeStreamResponseObserver.onNext(event.get());
95+
96+
final BlockItem blockItem = event.get();
97+
if (!isReachedFirstBlock && blockItem.getBlockHeader() > 0) {
98+
isReachedFirstBlock = true;
99+
}
100+
101+
if (isReachedFirstBlock) {
102+
subscribeStreamResponseObserver.onNext(blockItem);
103+
}
95104
}
96105

97106
/**
98107
* The onNext() method is triggered by Helidon when a consumer sends a blockResponse via the bidirectional stream.
99108
*
100-
* @param blockItemResponse the BlockItemResponse passed back to the server via the bidirectional stream to the downstream consumer.
101109
*/
102110
@Override
103-
public void onNext(final SubscribeStreamRequest blockItemResponse) {
111+
public void onNext(final SubscribeStreamRequest subscribeStreamRequest) {
104112

105113
// Check if the producer has timed out. If so, unsubscribe the observer from the mediator.
106114
if (isThresholdExceeded(producerLivenessMillis)) {

server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ public void publishEvent(BlockItem blockItem) {
7272
ringBuffer.publishEvent((event, sequence) -> event.set(blockItem));
7373

7474
// Block persistence
75-
blockPersistenceHandler.persist(blockItem);
75+
// blockPersistenceHandler.persist(blockItem);
7676
}
7777

7878
@Override

server/src/test/resources/producer.sh

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,15 +43,27 @@ trap cleanup SIGINT
4343
(
4444
iter=$1
4545
while true; do
46-
echo "{\"block_item\": {\"id\": $iter,\"value\": \"est dolor nulla\"}}"
4746

48-
if [ $iter -eq $2 ]; then
49-
exit 0
50-
fi
47+
# Generate 10 BlockItems per Block
48+
for ((i=1; i<=10; i++))
49+
do
50+
51+
if [[ i -eq 1 ]]; then
52+
echo "{\"block_item\": {\"block_header\": $iter,\"id\": $i,\"value\": \"Lorem ipsum dolor sit amet\"}}"
53+
else
54+
echo "{\"block_item\": {\"block_header\": -1,\"id\": $i,\"value\": \"est dolor nulla\"}}"
55+
fi
56+
57+
58+
if [ $iter -eq $2 ]; then
59+
exit 0
60+
fi
61+
62+
sleep 0.5
63+
done
5164

5265
((iter++))
5366

54-
sleep 1
5567
done
5668
) | grpcurl -vv -plaintext -proto $PATH_TO_PROTO -d @ $GRPC_SERVER $GRPC_METHOD &
5769

0 commit comments

Comments
 (0)