Skip to content

Commit 713beed

Browse files
fix: only send blockitems after the beginning of the next block is reached
Signed-off-by: Matt Peterson <[email protected]>
1 parent c2828fa commit 713beed

File tree

3 files changed

+20
-6
lines changed

3 files changed

+20
-6
lines changed

server/src/main/java/com/hedera/block/server/consumer/ConsumerBlockItemObserver.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ public class ConsumerBlockItemObserver implements BlockItemEventHandler<ObjectEv
4545

4646
private final StreamMediator<ObjectEvent<BlockItem>, BlockItem> streamMediator;
4747

48+
private boolean streamStarted;
49+
4850
/**
4951
* Constructor for the LiveStreamObserverImpl class.
5052
*
@@ -96,11 +98,19 @@ public void onEvent(final ObjectEvent<BlockItem> event, final long l, final bool
9698
// Refresh the producer liveness and pass the BlockItem to the downstream observer.
9799
producerLivenessMillis = currentMillis;
98100

99-
// Construct a response
100-
final SubscribeStreamResponse subscribeStreamResponse =
101-
SubscribeStreamResponse.newBuilder().setBlockItem(event.get()).build();
101+
// Only start sending BlockItems after we've reached
102+
// the beginning of a block.
103+
final BlockItem blockItem = event.get();
104+
if (!streamStarted && blockItem.hasHeader()) {
105+
streamStarted = true;
106+
}
107+
108+
if (streamStarted) {
109+
final SubscribeStreamResponse subscribeStreamResponse =
110+
SubscribeStreamResponse.newBuilder().setBlockItem(blockItem).build();
102111

103-
subscribeStreamResponseObserver.onNext(subscribeStreamResponse);
112+
subscribeStreamResponseObserver.onNext(subscribeStreamResponse);
113+
}
104114
}
105115
}
106116

server/src/test/java/com/hedera/block/server/consumer/ConsumerBlockItemObserverTest.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import static com.hedera.block.server.util.TestClock.buildClockOutsideWindow;
2323
import static org.mockito.Mockito.*;
2424

25+
import com.hedera.block.protos.BlockStreamService;
2526
import com.hedera.block.server.data.ObjectEvent;
2627
import com.hedera.block.server.mediator.StreamMediator;
2728
import io.grpc.stub.StreamObserver;
@@ -51,7 +52,9 @@ public void testProducerTimeoutWithinWindow() {
5152
streamMediator,
5253
responseStreamObserver);
5354

54-
final BlockItem blockItem = BlockItem.newBuilder().build();
55+
final BlockStreamService.BlockHeader blockHeader =
56+
BlockStreamService.BlockHeader.newBuilder().setBlockNumber(1).build();
57+
final BlockItem blockItem = BlockItem.newBuilder().setHeader(blockHeader).build();
5558
when(objectEvent.get()).thenReturn(blockItem);
5659

5760
final SubscribeStreamResponse subscribeStreamResponse =

server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,8 @@ public void testMediatorPublishEventToSubscribers() throws InterruptedException
146146
streamMediator.isSubscribed(concreteObserver3),
147147
"Expected the mediator to have observer3 subscribed");
148148

149-
final BlockItem blockItem = BlockItem.newBuilder().build();
149+
final BlockHeader blockHeader = BlockHeader.newBuilder().setBlockNumber(1).build();
150+
final BlockItem blockItem = BlockItem.newBuilder().setHeader(blockHeader).build();
150151
final SubscribeStreamResponse subscribeStreamResponse =
151152
SubscribeStreamResponse.newBuilder().setBlockItem(blockItem).build();
152153

0 commit comments

Comments
 (0)