Skip to content

Commit 73b0ce0

Browse files
fix: using additional hedera-protobuf types
Signed-off-by: Matt Peterson <[email protected]>
1 parent 133167a commit 73b0ce0

File tree

8 files changed

+68
-29
lines changed

8 files changed

+68
-29
lines changed

protos/src/main/protobuf/blockstream.proto

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,11 @@ message PublishStreamRequest {
3131
}
3232

3333
message PublishStreamResponse {
34-
uint64 block_number = 1;
34+
ItemAcknowledgement acknowledgement = 1;
35+
}
36+
37+
message ItemAcknowledgement {
38+
bytes item_ack = 1;
3539
}
3640

3741
message SubscribeStreamRequest {
@@ -40,6 +44,7 @@ message SubscribeStreamRequest {
4044

4145
message SubscribeStreamResponse {
4246
int32 status = 1;
47+
BlockItem block_item = 2;
4348
}
4449

4550
/**
@@ -51,7 +56,5 @@ message BlockItem {
5156

5257
int64 block_header = 1;
5358

54-
uint64 id = 2;
55-
56-
string value = 3;
59+
string value = 2;
5760
}

server/src/main/java/com/hedera/block/server/BlockStreamService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static com.hedera.block.protos.BlockStreamService.PublishStreamRequest;
2121
import static com.hedera.block.protos.BlockStreamService.PublishStreamResponse;
2222
import static com.hedera.block.protos.BlockStreamService.SubscribeStreamRequest;
23+
import static com.hedera.block.protos.BlockStreamService.SubscribeStreamResponse;
2324
import static com.hedera.block.server.Constants.*;
2425

2526
import com.google.protobuf.Descriptors;
@@ -101,7 +102,7 @@ private StreamObserver<PublishStreamRequest> publishBlockStream(
101102
}
102103

103104
private StreamObserver<SubscribeStreamRequest> subscribeBlockStream(
104-
final StreamObserver<BlockItem> subscribeStreamRequestObserver) {
105+
final StreamObserver<SubscribeStreamResponse> subscribeStreamRequestObserver) {
105106
LOGGER.log(System.Logger.Level.DEBUG, "Executing bidirectional streamSource method");
106107

107108
// Return a custom StreamObserver to handle streaming blocks from the producer.

server/src/main/java/com/hedera/block/server/consumer/ConsumerBlockItemObserver.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,10 @@
1818

1919
import static com.hedera.block.protos.BlockStreamService.BlockItem;
2020
import static com.hedera.block.protos.BlockStreamService.SubscribeStreamRequest;
21+
import static com.hedera.block.protos.BlockStreamService.SubscribeStreamResponse;
2122

23+
import com.google.protobuf.Descriptors;
24+
import com.hedera.block.protos.BlockStreamService;
2225
import com.hedera.block.server.data.ObjectEvent;
2326
import com.hedera.block.server.mediator.StreamMediator;
2427
import io.grpc.stub.ServerCallStreamObserver;
@@ -37,7 +40,7 @@ public class ConsumerBlockItemObserver
3740

3841
private final System.Logger LOGGER = System.getLogger(getClass().getName());
3942

40-
private final StreamObserver<BlockItem> subscribeStreamResponseObserver;
43+
private final StreamObserver<BlockStreamService.SubscribeStreamResponse> subscribeStreamResponseObserver;
4144

4245
private final long timeoutThresholdMillis;
4346
private final InstantSource producerLivenessClock;
@@ -58,7 +61,7 @@ public ConsumerBlockItemObserver(
5861
final long timeoutThresholdMillis,
5962
final InstantSource producerLivenessClock,
6063
final StreamMediator<ObjectEvent<BlockItem>, SubscribeStreamRequest> streamMediator,
61-
final StreamObserver<BlockItem> subscribeStreamResponseObserver) {
64+
final StreamObserver<SubscribeStreamResponse> subscribeStreamResponseObserver) {
6265

6366
this.timeoutThresholdMillis = timeoutThresholdMillis;
6467
this.producerLivenessClock = producerLivenessClock;
@@ -72,7 +75,7 @@ public ConsumerBlockItemObserver(
7275
// Unfortunately we have to cast the responseStreamObserver to a
7376
// ServerCallStreamObserver
7477
// to register the onCancelHandler.
75-
((ServerCallStreamObserver<BlockItem>) subscribeStreamResponseObserver)
78+
((ServerCallStreamObserver<SubscribeStreamResponse>) subscribeStreamResponseObserver)
7679
.setOnCancelHandler(
7780
() -> {
7881
LOGGER.log(
@@ -102,7 +105,12 @@ public void onEvent(final ObjectEvent<BlockItem> event, final long l, final bool
102105
}
103106

104107
if (isReachedFirstBlock) {
105-
subscribeStreamResponseObserver.onNext(blockItem);
108+
SubscribeStreamResponse subscribeStreamResponse = SubscribeStreamResponse
109+
.newBuilder()
110+
.setBlockItem(blockItem)
111+
.build();
112+
113+
subscribeStreamResponseObserver.onNext(subscribeStreamResponse);
106114
}
107115
}
108116

server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ public LiveStreamMediatorImpl(
7373
public void publishEvent(BlockItem blockItem) {
7474

7575
// Publish the block for all subscribers to receive
76-
LOGGER.log(System.Logger.Level.INFO, "Publishing block: {0}", blockItem);
76+
LOGGER.log(System.Logger.Level.INFO, "Publishing BlockItem: {0}", blockItem);
7777
ringBuffer.publishEvent((event, sequence) -> event.set(blockItem));
7878

7979
// Block persistence

server/src/main/java/com/hedera/block/server/persistence/WriteThroughCacheHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public Long persist(final BlockItem blockItem) {
5151

5252
// Write-Through cache
5353
blockStorage.write(blockItem);
54-
return blockItem.getId();
54+
return blockItem.getBlockHeader();
5555
}
5656

5757
/**

server/src/main/java/com/hedera/block/server/persistence/storage/FileSystemBlockStorage.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ public FileSystemBlockStorage(final String key, final Config config) throws IOEx
8181
*/
8282
@Override
8383
public Optional<Long> write(final BlockItem blockItem) {
84-
Long id = blockItem.getId();
84+
Long id = blockItem.getBlockHeader();
8585
final String fullPath = resolvePath(id);
8686

8787
try (FileOutputStream fos = new FileOutputStream(fullPath)) {

server/src/main/java/com/hedera/block/server/producer/ProducerBlockItemObserver.java

Lines changed: 39 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,19 @@
1616

1717
package com.hedera.block.server.producer;
1818

19-
import static com.hedera.block.protos.BlockStreamService.BlockItem;
20-
import static com.hedera.block.protos.BlockStreamService.PublishStreamRequest;
21-
import static com.hedera.block.protos.BlockStreamService.PublishStreamResponse;
22-
import static com.hedera.block.protos.BlockStreamService.SubscribeStreamRequest;
23-
19+
import com.google.protobuf.ByteString;
2420
import com.hedera.block.server.data.ObjectEvent;
2521
import com.hedera.block.server.mediator.StreamMediator;
2622
import io.grpc.stub.StreamObserver;
2723

24+
import java.io.ByteArrayOutputStream;
25+
import java.io.IOException;
26+
import java.io.ObjectOutputStream;
27+
import java.security.MessageDigest;
28+
import java.security.NoSuchAlgorithmException;
29+
30+
import static com.hedera.block.protos.BlockStreamService.*;
31+
2832
/**
2933
* The ProducerBlockStreamObserver class plugs into Helidon's server-initiated bidirectional gRPC
3034
* service implementation. Helidon calls methods on this class as networking events occur with the
@@ -57,13 +61,24 @@ public ProducerBlockItemObserver(
5761
@Override
5862
public void onNext(final PublishStreamRequest publishStreamRequest) {
5963

60-
BlockItem blockItem = publishStreamRequest.getBlockItem();
64+
final BlockItem blockItem = publishStreamRequest.getBlockItem();
6165
streamMediator.publishEvent(blockItem);
6266

63-
// Send a response back to the upstream producer
64-
final PublishStreamResponse publishStreamResponse =
65-
PublishStreamResponse.newBuilder().setBlockNumber(blockItem.getId()).build();
66-
publishStreamResponseObserver.onNext(publishStreamResponse);
67+
try {
68+
// Send a response back to the upstream producer
69+
// TODO: Use real hash
70+
final ItemAcknowledgement itemAck =
71+
ItemAcknowledgement.newBuilder()
72+
.setItemAck(ByteString.copyFrom(getFakeHash(blockItem)))
73+
.build();
74+
final PublishStreamResponse publishStreamResponse =
75+
PublishStreamResponse.newBuilder()
76+
.setAcknowledgement(itemAck)
77+
.build();
78+
publishStreamResponseObserver.onNext(publishStreamResponse);
79+
} catch (IOException | NoSuchAlgorithmException e) {
80+
LOGGER.log(System.Logger.Level.ERROR, "Error calculating hash", e);
81+
}
6782
}
6883

6984
/**
@@ -87,4 +102,18 @@ public void onCompleted() {
87102
LOGGER.log(System.Logger.Level.DEBUG, "ProducerBlockStreamObserver completed");
88103
publishStreamResponseObserver.onCompleted();
89104
}
105+
106+
private static byte[] getFakeHash(BlockItem blockItem) throws IOException, NoSuchAlgorithmException {
107+
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
108+
try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)) {
109+
objectOutputStream.writeObject(blockItem);
110+
}
111+
112+
// Get the serialized bytes
113+
byte[] serializedObject = byteArrayOutputStream.toByteArray();
114+
115+
// Calculate the SHA-256 hash
116+
MessageDigest digest = MessageDigest.getInstance("SHA-384");
117+
return digest.digest(serializedObject);
118+
}
90119
}

server/src/test/resources/producer.sh

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -49,19 +49,17 @@ trap cleanup SIGINT
4949
do
5050

5151
if [[ i -eq 1 ]]; then
52-
echo "{\"block_item\": {\"block_header\": $iter,\"id\": $i,\"value\": \"Lorem ipsum dolor sit amet\"}}"
52+
echo "{\"block_item\": {\"block_header\": $iter,\"value\": \"Payload[...]\"}}"
5353
else
54-
echo "{\"block_item\": {\"block_header\": -1,\"id\": $i,\"value\": \"est dolor nulla\"}}"
55-
fi
56-
57-
58-
if [ $iter -eq $2 ]; then
59-
exit 0
54+
echo "{\"block_item\": {\"block_header\": 0,\"value\": \"Payload[...]\"}}"
6055
fi
6156

6257
sleep 0.5
6358
done
6459

60+
if [ $iter -eq $2 ]; then
61+
exit 0
62+
fi
6563
((iter++))
6664

6765
done

0 commit comments

Comments
 (0)