Skip to content

Commit eaec07d

Browse files
committed
Attempting to send a gRPC Status.NOT_FOUND on when the block does not exist.
Signed-off-by: a-saksena <[email protected]>
1 parent e39f64d commit eaec07d

File tree

4 files changed

+43
-5
lines changed

4 files changed

+43
-5
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,5 +47,7 @@ gradle-app.setting
4747
# JDT-specific (Eclipse Java Development Tools)
4848
.classpath
4949

50+
.idea
51+
.DS_Store
5052
# .env files
5153
server/docker/.env

protos/src/main/protobuf/blockstream.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ service BlockStreamGrpc {
4646
* message with the id of each block received.
4747
*/
4848
rpc StreamSource(stream BlockResponse) returns (stream Block) {}
49+
50+
rpc GetBlock(Block) returns (Block) {}
4951
}
5052

5153
/**

server/src/main/java/com/hedera/block/server/BlockStreamService.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,16 @@
2121
import com.hedera.block.server.consumer.LiveStreamObserver;
2222
import com.hedera.block.server.consumer.LiveStreamObserverImpl;
2323
import com.hedera.block.server.mediator.StreamMediator;
24+
import com.hedera.block.server.persistence.BlockPersistenceHandler;
2425
import com.hedera.block.server.producer.ProducerBlockStreamObserver;
26+
import io.grpc.Status;
2527
import io.grpc.stub.StreamObserver;
2628
import io.helidon.webserver.grpc.GrpcService;
2729

2830
import java.time.Clock;
31+
import java.util.Optional;
32+
33+
import static io.helidon.webserver.grpc.ResponseHelper.complete;
2934

3035
import static com.hedera.block.server.Constants.*;
3136

@@ -43,6 +48,7 @@ public class BlockStreamService implements GrpcService {
4348

4449
private final long timeoutThresholdMillis;
4550
private final StreamMediator<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> streamMediator;
51+
private final BlockPersistenceHandler<BlockStreamServiceGrpcProto.Block> blockPersistenceHandler;
4652

4753
/**
4854
* Constructor for the BlockStreamService class.
@@ -51,10 +57,12 @@ public class BlockStreamService implements GrpcService {
5157
* @param streamMediator the stream mediator
5258
*/
5359
public BlockStreamService(final long timeoutThresholdMillis,
54-
final StreamMediator<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> streamMediator) {
60+
final StreamMediator<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> streamMediator,
61+
final BlockPersistenceHandler<BlockStreamServiceGrpcProto.Block> blockPersistenceHandler) {
5562

5663
this.timeoutThresholdMillis = timeoutThresholdMillis;
5764
this.streamMediator = streamMediator;
65+
this.blockPersistenceHandler = blockPersistenceHandler;
5866
}
5967

6068
/**
@@ -87,6 +95,7 @@ public String serviceName() {
8795
public void update(final Routing routing) {
8896
routing.bidi(CLIENT_STREAMING_METHOD_NAME, this::streamSink);
8997
routing.bidi(SERVER_STREAMING_METHOD_NAME, this::streamSource);
98+
routing.unary("GetBlock", this::getBlock);
9099
}
91100

92101
/**
@@ -129,6 +138,24 @@ private StreamObserver<BlockStreamServiceGrpcProto.BlockResponse> streamSource(f
129138

130139
return streamObserver;
131140
}
141+
142+
private void getBlock(BlockStreamServiceGrpcProto.Block block, StreamObserver<BlockStreamServiceGrpcProto.Block> responseObserver) {
143+
String message = "GET BLOCK RESPONSE! ";
144+
LOGGER.log(System.Logger.Level.INFO, "GetBlock request received");
145+
Optional<BlockStreamServiceGrpcProto.Block> responseBlock = blockPersistenceHandler.read(block.getId());
146+
if(responseBlock.isPresent()) {
147+
LOGGER.log(System.Logger.Level.INFO, "SENDING BLOCK # " + block.getId());
148+
complete(responseObserver, responseBlock.get()); // TODO: Should return int and not quoted string
149+
} else {
150+
LOGGER.log(System.Logger.Level.INFO, "DID NOT FIND YOUR BLOCK");
151+
// TODO: Fix below. It could return gRPC equivalent of 404 NOT FOUND
152+
responseObserver.onError(Status.NOT_FOUND
153+
.withDescription("DID NOT FIND YOUR BLOCK")
154+
.asRuntimeException()
155+
);
156+
// complete(responseObserver, BlockStreamServiceGrpcProto.Block.getDefaultInstance());
157+
}
158+
}
132159
}
133160

134161

server/src/main/java/com/hedera/block/server/Server.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import io.helidon.config.Config;
2727
import io.helidon.webserver.WebServer;
2828
import io.helidon.webserver.grpc.GrpcRouting;
29-
import io.helidon.webserver.http.HttpRouting;
3029

3130
import java.io.IOException;
3231
import java.util.stream.Stream;
@@ -64,8 +63,11 @@ public static void main(final String[] args) {
6463

6564
// Initialize the block storage, cache, and service
6665
final BlockStorage<BlockStreamServiceGrpcProto.Block> blockStorage = new FileSystemBlockStorage(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config);
66+
67+
// TODO: Make timeoutThresholdMillis configurable
6768
final BlockStreamService blockStreamService = new BlockStreamService(consumerTimeoutThreshold,
68-
new LiveStreamMediatorImpl(new WriteThroughCacheHandler(blockStorage)));
69+
new LiveStreamMediatorImpl(new WriteThroughCacheHandler(blockStorage)),
70+
new WriteThroughCacheHandler(blockStorage));
6971

7072
// Start the web server
7173
WebServer.builder()
@@ -79,13 +81,18 @@ public static void main(final String[] args) {
7981
.bidi(BlockStreamServiceGrpcProto.getDescriptor(),
8082
SERVICE_NAME,
8183
SERVER_STREAMING_METHOD_NAME,
82-
serverBidiStreamingMethod))
84+
serverBidiStreamingMethod)
85+
.unary(BlockStreamServiceGrpcProto.getDescriptor(),
86+
"BlockStreamGrpc",
87+
"GetBlock",
88+
Server::grpcGetBlock))
8389
.build()
8490
.start();
85-
8691
} catch (IOException e) {
8792
LOGGER.log(System.Logger.Level.ERROR, "An exception was thrown starting the server", e);
8893
throw new RuntimeException(e);
8994
}
9095
}
96+
97+
static void grpcGetBlock(BlockStreamServiceGrpcProto.BlockRequest request, StreamObserver<BlockStreamServiceGrpcProto.Block> responseObserver) {}
9198
}

0 commit comments

Comments
 (0)