Skip to content

Commit df154c3

Browse files
sanding and javadoc
Signed-off-by: Matt Peterson <[email protected]>
1 parent 7f207a3 commit df154c3

24 files changed

+689
-360
lines changed

server/src/main/java/com/hedera/block/server/BlockStreamService.java

Lines changed: 59 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -20,71 +20,99 @@
2020

2121
import com.google.protobuf.Descriptors;
2222
import com.hedera.block.protos.BlockStreamServiceGrpcProto;
23-
import com.hedera.block.server.observe.InboundBlockStreamObserver;
24-
import com.hedera.block.server.observe.mediate.StreamMediator;
25-
import com.hedera.block.server.observe.mediate.StreamMediatorImpl;
26-
import com.hedera.block.server.observe.subscribe.HistoricStreamObserver;
27-
import com.hedera.block.server.observe.subscribe.LiveStreamObserver;
28-
import com.hedera.block.server.persistence.WriteThroughCacheHandler;
29-
import com.hedera.block.server.persistence.cache.BlockCache;
30-
import com.hedera.block.server.persistence.cache.LRUCache;
31-
import com.hedera.block.server.persistence.storage.BlockStorage;
32-
import com.hedera.block.server.persistence.storage.FileSystemBlockStorage;
23+
import com.hedera.block.server.producer.ProducerBlockStreamObserver;
24+
import com.hedera.block.server.mediator.StreamMediator;
25+
import com.hedera.block.server.consumer.LiveStreamObserverImpl;
26+
import com.hedera.block.server.consumer.LiveStreamObserver;
3327
import io.grpc.stub.StreamObserver;
34-
import io.helidon.config.Config;
3528
import io.helidon.webserver.grpc.GrpcService;
3629

37-
import java.io.IOException;
3830
import java.util.logging.Logger;
3931

4032
import static com.hedera.block.server.Constants.*;
4133

34+
/**
35+
* This class implements the GrpcService interface and provides the functionality for the BlockStreamService.
36+
* It sets up the bidirectional streaming methods for the service and handles the routing for these methods.
37+
* It also initializes the StreamMediator, BlockStorage, and BlockCache upon creation.
38+
*
39+
* <p>The class provides two main methods, streamSink and streamSource, which handle the client and server streaming
40+
* respectively. These methods return custom StreamObservers which are used to observe and respond to the streams.
41+
*
42+
*/
4243
public class BlockStreamService implements GrpcService {
4344

4445
private final Logger logger = Logger.getLogger(getClass().getName());
45-
private final StreamMediator<BlockStreamServiceGrpcProto.BlockResponse, BlockStreamServiceGrpcProto.Block> streamMediator;
46-
47-
public BlockStreamService() {
48-
try {
49-
50-
Config config = Config.create();
51-
Config.global(config);
52-
53-
BlockStorage<BlockStreamServiceGrpcProto.Block> blockStorage = new FileSystemBlockStorage(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config);
54-
BlockCache<BlockStreamServiceGrpcProto.Block> blockCache = new LRUCache(10L);
55-
this.streamMediator = new StreamMediatorImpl(new WriteThroughCacheHandler(blockStorage, blockCache));
56-
57-
} catch (IOException e) {
58-
throw new RuntimeException(e);
59-
}
46+
private final StreamMediator<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> streamMediator;
47+
48+
/**
49+
* Constructor for the BlockStreamService class. It initializes the StreamMediator.
50+
*
51+
* @param streamMediator the stream mediator
52+
*/
53+
public BlockStreamService(StreamMediator<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> streamMediator) {
54+
this.streamMediator = streamMediator;
6055
}
6156

57+
/**
58+
* Returns the FileDescriptor for the BlockStreamServiceGrpcProto.
59+
*
60+
* @return the FileDescriptor for the BlockStreamServiceGrpcProto
61+
*/
6262
@Override
6363
public Descriptors.FileDescriptor proto() {
6464
return BlockStreamServiceGrpcProto.getDescriptor();
6565
}
6666

67+
/**
68+
* Returns the service name for the BlockStreamService. This service name corresponds to the service name in the proto file.
69+
*
70+
* @return the service name
71+
*/
6772
@Override
6873
public String serviceName() {
6974
return SERVICE_NAME;
7075
}
7176

77+
/**
78+
* Updates the routing for the BlockStreamService. It sets up the bidirectional streaming methods for the service.
79+
*
80+
* @param routing the routing for the BlockStreamService
81+
*/
7282
@Override
7383
public void update(Routing routing) {
7484
routing.bidi(CLIENT_STREAMING_METHOD_NAME, this::streamSink);
7585
routing.bidi(SERVER_STREAMING_METHOD_NAME, this::streamSource);
7686
}
7787

88+
/**
89+
* The streamSink method is called by Helidon each time a producer initiates a bidirectional stream.
90+
*
91+
* @param responseStreamObserver - Helidon provides a StreamObserver to handle responses back to the producer.
92+
*
93+
* @return a custom StreamObserver to handle streaming blocks from the producer to all subscribed consumers
94+
* via the streamMediator as well as sending responses back to the producer.
95+
*/
7896
private StreamObserver<BlockStreamServiceGrpcProto.Block> streamSink(StreamObserver<BlockStreamServiceGrpcProto.BlockResponse> responseStreamObserver) {
79-
logger.info("Executing bidirectional streamSink method");
97+
logger.finer("Executing bidirectional streamSink method");
8098

81-
return new InboundBlockStreamObserver(streamMediator, responseStreamObserver);
99+
return new ProducerBlockStreamObserver(streamMediator, responseStreamObserver);
82100
}
83101

102+
/**
103+
* The streamSource method is called by Helidon each time a consumer initiates a bidirectional stream.
104+
*
105+
* @param responseStreamObserver - Helidon provides a StreamObserver to handle responses from the consumer
106+
* back to the server.
107+
*
108+
* @return a custom StreamObserver to handle streaming blocks from the producer to the consumer as well as
109+
* handling responses from the consumer.
110+
*/
84111
private StreamObserver<BlockStreamServiceGrpcProto.BlockResponse> streamSource(StreamObserver<BlockStreamServiceGrpcProto.Block> responseStreamObserver) {
85-
logger.info("Executing bidirectional streamSource method");
112+
logger.finer("Executing bidirectional streamSource method");
86113

87-
LiveStreamObserver<BlockStreamServiceGrpcProto.BlockResponse, BlockStreamServiceGrpcProto.Block> streamObserver = new HistoricStreamObserver(
114+
// Return a custom StreamObserver to handle streaming blocks from the producer.
115+
LiveStreamObserver<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> streamObserver = new LiveStreamObserverImpl(
88116
streamMediator,
89117
responseStreamObserver);
90118
streamMediator.subscribe(streamObserver);

server/src/main/java/com/hedera/block/server/Constants.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ private Constants() {}
2424
// Config Constants
2525
public static String BLOCKNODE_STORAGE_ROOT_PATH_KEY = "blocknode.storage.root.path";
2626

27-
// Stable names
27+
// Constants specified in the service definition of the .proto file
2828
public static String SERVICE_NAME = "BlockStreamGrpc";
2929
public static String CLIENT_STREAMING_METHOD_NAME = "StreamSink";
3030
public static String SERVER_STREAMING_METHOD_NAME = "StreamSource";

server/src/main/java/com/hedera/block/server/Server.java

Lines changed: 46 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,21 @@
1919
package com.hedera.block.server;
2020

2121
import com.hedera.block.protos.BlockStreamServiceGrpcProto;
22+
import com.hedera.block.server.mediator.LiveStreamMediatorImpl;
23+
import com.hedera.block.server.persistence.WriteThroughCacheHandler;
24+
import com.hedera.block.server.persistence.cache.BlockCache;
25+
import com.hedera.block.server.persistence.cache.LRUCache;
26+
import com.hedera.block.server.persistence.storage.BlockStorage;
27+
import com.hedera.block.server.persistence.storage.FileSystemBlockStorage;
2228
import io.grpc.stub.ServerCalls;
2329
import io.grpc.stub.StreamObserver;
30+
import io.helidon.config.Config;
2431
import io.helidon.webserver.WebServer;
2532
import io.helidon.webserver.grpc.GrpcRouting;
2633
import io.helidon.webserver.http.HttpRouting;
2734

35+
import java.io.IOException;
36+
import java.util.logging.Logger;
2837
import java.util.stream.Stream;
2938

3039
import static com.hedera.block.server.Constants.*;
@@ -38,31 +47,49 @@ public class Server {
3847
private static ServerCalls.BidiStreamingMethod<Stream<BlockStreamServiceGrpcProto.Block>, StreamObserver<BlockStreamServiceGrpcProto.Block>> clientBidiStreamingMethod;
3948
private static ServerCalls.BidiStreamingMethod<Stream<BlockStreamServiceGrpcProto.BlockResponse>, StreamObserver<BlockStreamServiceGrpcProto.Block>> serverBidiStreamingMethod;
4049

41-
private Server() {
42-
// Not meant to be instantiated
43-
}
50+
private static final Logger logger = Logger.getLogger(Server.class.getName());
51+
52+
private Server() {}
4453

4554
/**
4655
* Main entrypoint for the block node server
4756
*
4857
* @param args Command line arguments. Not used at present,
4958
*/
5059
public static void main(String[] args) {
51-
WebServer.builder()
52-
.port(8080)
53-
.addRouting(HttpRouting.builder()
54-
.get("/greet", (req, res) -> res.send("Hello World!")))
55-
.addRouting(GrpcRouting.builder()
56-
.service(new BlockStreamService())
57-
.bidi(BlockStreamServiceGrpcProto.getDescriptor(),
58-
SERVICE_NAME,
59-
CLIENT_STREAMING_METHOD_NAME,
60-
clientBidiStreamingMethod)
61-
.bidi(BlockStreamServiceGrpcProto.getDescriptor(),
62-
SERVICE_NAME,
63-
SERVER_STREAMING_METHOD_NAME,
64-
serverBidiStreamingMethod))
65-
.build()
66-
.start();
60+
61+
try {
62+
63+
// Set the global configuration
64+
Config config = Config.create();
65+
Config.global(config);
66+
67+
// Initialize the block storage, cache, and service
68+
BlockStorage<BlockStreamServiceGrpcProto.Block> blockStorage = new FileSystemBlockStorage(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config);
69+
BlockCache<BlockStreamServiceGrpcProto.Block> blockCache = new LRUCache(1000);
70+
BlockStreamService blockStreamService = new BlockStreamService(new LiveStreamMediatorImpl(new WriteThroughCacheHandler(blockStorage, blockCache)));
71+
72+
// Start the web server
73+
WebServer.builder()
74+
.port(8080)
75+
.addRouting(HttpRouting.builder()
76+
.get("/greet", (req, res) -> res.send("Hello World!")))
77+
.addRouting(GrpcRouting.builder()
78+
.service(blockStreamService)
79+
.bidi(BlockStreamServiceGrpcProto.getDescriptor(),
80+
SERVICE_NAME,
81+
CLIENT_STREAMING_METHOD_NAME,
82+
clientBidiStreamingMethod)
83+
.bidi(BlockStreamServiceGrpcProto.getDescriptor(),
84+
SERVICE_NAME,
85+
SERVER_STREAMING_METHOD_NAME,
86+
serverBidiStreamingMethod))
87+
.build()
88+
.start();
89+
90+
} catch (IOException e) {
91+
logger.severe("There was an exception starting the server: " + e.getMessage());
92+
throw new RuntimeException(e);
93+
}
6794
}
6895
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Hedera Block Node
3+
*
4+
* Copyright (C) 2024 Hedera Hashgraph, LLC
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License");
7+
* you may not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.hedera.block.server.consumer;
20+
21+
import io.grpc.stub.StreamObserver;
22+
23+
/**
24+
* The LiveStreamObserver interface augments the StreamObserver interface with the notify() method thereby
25+
* allowing a caller to pass a block to the observer of a different type than the StreamObserver. In this way,
26+
* the implementation of this interface can receive and process inbound messages with different types from
27+
* the producer and response messages from the consumer.
28+
*
29+
* @param <U> - the type of the block
30+
* @param <V> - the type of the StreamObserver
31+
*/
32+
public interface LiveStreamObserver<U, V> extends StreamObserver<V> {
33+
34+
/**
35+
* Pass the block to the observer.
36+
*
37+
* @param block - the block to be passed to the observer
38+
*/
39+
void notify(U block);
40+
}
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
* Hedera Block Node
3+
*
4+
* Copyright (C) 2024 Hedera Hashgraph, LLC
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License");
7+
* you may not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.hedera.block.server.consumer;
20+
21+
import com.hedera.block.protos.BlockStreamServiceGrpcProto;
22+
import com.hedera.block.server.mediator.StreamMediator;
23+
import io.grpc.stub.StreamObserver;
24+
25+
import java.util.logging.Logger;
26+
27+
/**
28+
* The LiveStreamObserverImpl class implements the LiveStreamObserver interface to pass blocks to the downstream consumer
29+
* via the notify method and manage the bidirectional stream to the consumer via the onNext, onError, and onCompleted methods.
30+
*
31+
*/
32+
public class LiveStreamObserverImpl implements LiveStreamObserver<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> {
33+
34+
private final Logger logger = Logger.getLogger(getClass().getName());
35+
36+
private final StreamMediator<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> mediator;
37+
private final StreamObserver<BlockStreamServiceGrpcProto.Block> responseStreamObserver;
38+
39+
/**
40+
* Constructor for the LiveStreamObserverImpl class.
41+
*
42+
* @param mediator the mediator
43+
* @param responseStreamObserver the response stream observer
44+
*
45+
*/
46+
public LiveStreamObserverImpl(
47+
StreamMediator<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> mediator,
48+
StreamObserver<BlockStreamServiceGrpcProto.Block> responseStreamObserver) {
49+
50+
this.mediator = mediator;
51+
this.responseStreamObserver = responseStreamObserver;
52+
}
53+
54+
/**
55+
* Pass the block to the observer provided by Helidon
56+
*
57+
* @param block - the block to be passed to the observer
58+
*/
59+
@Override
60+
public void notify(BlockStreamServiceGrpcProto.Block block) {
61+
this.responseStreamObserver.onNext(block);
62+
}
63+
64+
/**
65+
* The onNext() method is triggered by Helidon when the consumer sends a blockResponse via the bidirectional stream.
66+
*
67+
* @param blockResponse - the BlockResponse passed to the server via the bidirectional stream to the downstream consumer
68+
*/
69+
@Override
70+
public void onNext(BlockStreamServiceGrpcProto.BlockResponse blockResponse) {
71+
logger.finer("Received response block " + blockResponse);
72+
}
73+
74+
/**
75+
* The onError() method is triggered by Helidon when an error occurs on the bidirectional stream to the downstream consumer.
76+
* Unsubscribe the observer from the mediator.
77+
*
78+
* @param t the error occurred on the stream
79+
*/
80+
@Override
81+
public void onError(Throwable t) {
82+
logger.severe("onError: " + t.getMessage());
83+
mediator.unsubscribe(this);
84+
}
85+
86+
/**
87+
* The onCompleted() method is triggered by Helidon when the bidirectional stream to the downstream consumer is completed.
88+
* Unsubscribe the observer from the mediator.
89+
*
90+
*/
91+
@Override
92+
public void onCompleted() {
93+
logger.finer("gRPC connection completed. Unsubscribing observer.");
94+
mediator.unsubscribe(this);
95+
logger.finer("Unsubscribed observer.");
96+
}
97+
}

0 commit comments

Comments
 (0)