Skip to content

Commit 58d2a3f

Browse files
committed
Collapsing 3 commits together.
Signed-off-by: a-saksena <[email protected]>
1 parent ef9e16d commit 58d2a3f

File tree

2 files changed

+25
-5
lines changed

2 files changed

+25
-5
lines changed

server/src/main/java/com/hedera/block/server/BlockStreamService.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,14 @@
2121
import com.hedera.block.server.consumer.LiveStreamObserver;
2222
import com.hedera.block.server.consumer.LiveStreamObserverImpl;
2323
import com.hedera.block.server.mediator.StreamMediator;
24+
import com.hedera.block.server.persistence.BlockPersistenceHandler;
2425
import com.hedera.block.server.producer.ProducerBlockStreamObserver;
26+
import io.grpc.Status;
2527
import io.grpc.stub.StreamObserver;
2628
import io.helidon.webserver.grpc.GrpcService;
2729

2830
import java.time.Clock;
31+
import java.util.Optional;
2932

3033
import static io.helidon.webserver.grpc.ResponseHelper.complete;
3134

@@ -45,6 +48,7 @@ public class BlockStreamService implements GrpcService {
4548

4649
private final long timeoutThresholdMillis;
4750
private final StreamMediator<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> streamMediator;
51+
private final BlockPersistenceHandler<BlockStreamServiceGrpcProto.Block> blockPersistenceHandler;
4852

4953
/**
5054
* Constructor for the BlockStreamService class.
@@ -53,10 +57,12 @@ public class BlockStreamService implements GrpcService {
5357
* @param streamMediator the stream mediator
5458
*/
5559
public BlockStreamService(final long timeoutThresholdMillis,
56-
final StreamMediator<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> streamMediator) {
60+
final StreamMediator<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> streamMediator,
61+
final BlockPersistenceHandler<BlockStreamServiceGrpcProto.Block> blockPersistenceHandler) {
5762

5863
this.timeoutThresholdMillis = timeoutThresholdMillis;
5964
this.streamMediator = streamMediator;
65+
this.blockPersistenceHandler = blockPersistenceHandler;
6066
}
6167

6268
/**
@@ -133,11 +139,22 @@ private StreamObserver<BlockStreamServiceGrpcProto.BlockResponse> streamSource(f
133139
return streamObserver;
134140
}
135141

136-
private void getBlock(BlockStreamServiceGrpcProto.BlockRequest request, StreamObserver<BlockStreamServiceGrpcProto.Block> responseObserver) {
142+
private void getBlock(BlockStreamServiceGrpcProto.Block block, StreamObserver<BlockStreamServiceGrpcProto.Block> responseObserver) {
137143
String message = "GET BLOCK RESPONSE! ";
138144
LOGGER.log(System.Logger.Level.INFO, "GetBlock request received");
139-
BlockStreamServiceGrpcProto.Block response = BlockStreamServiceGrpcProto.Block.newBuilder().setValue(message).build();
140-
complete(responseObserver, response);
145+
Optional<BlockStreamServiceGrpcProto.Block> responseBlock = blockPersistenceHandler.read(block.getId());
146+
if(responseBlock.isPresent()) {
147+
LOGGER.log(System.Logger.Level.INFO, "SENDING BLOCK # " + block.getId());
148+
complete(responseObserver, responseBlock.get()); // TODO: Should return int and not quoted string
149+
} else {
150+
LOGGER.log(System.Logger.Level.INFO, "DID NOT FIND YOUR BLOCK");
151+
// TODO: Fix below. It could return gRPC equivalent of 404 NOT FOUND
152+
responseObserver.onError(Status.NOT_FOUND
153+
.withDescription("DID NOT FIND YOUR BLOCK")
154+
.asRuntimeException()
155+
);
156+
// complete(responseObserver, BlockStreamServiceGrpcProto.Block.getDefaultInstance());
157+
}
141158
}
142159
}
143160

server/src/main/java/com/hedera/block/server/Server.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,11 @@ public static void main(final String[] args) {
6363

6464
// Initialize the block storage, cache, and service
6565
final BlockStorage<BlockStreamServiceGrpcProto.Block> blockStorage = new FileSystemBlockStorage(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config);
66+
67+
// TODO: Make timeoutThresholdMillis configurable
6668
final BlockStreamService blockStreamService = new BlockStreamService(consumerTimeoutThreshold,
67-
new LiveStreamMediatorImpl(new WriteThroughCacheHandler(blockStorage)));
69+
new LiveStreamMediatorImpl(new WriteThroughCacheHandler(blockStorage)),
70+
new WriteThroughCacheHandler(blockStorage));
6871

6972
// Start the web server
7073
WebServer.builder()

0 commit comments

Comments
 (0)