Skip to content

Commit a82c2e6

Browse files
refactor: added system-wide exception handling. IOExceptions will notify the producer, all consumers and unsubscribe all subscribers
Signed-off-by: Matt Peterson <[email protected]>
1 parent c1a92fd commit a82c2e6

File tree

12 files changed

+361
-198
lines changed

12 files changed

+361
-198
lines changed

server/src/main/java/com/hedera/block/server/BlockStreamService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public class BlockStreamService implements GrpcService {
4545

4646
private final long timeoutThresholdMillis;
4747
private final ItemAckBuilder itemAckBuilder;
48-
private final StreamMediator<ObjectEvent<BlockItem>, BlockItem> streamMediator;
48+
private final StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>> streamMediator;
4949

5050
/**
5151
* Constructor for the BlockStreamService class.
@@ -55,7 +55,7 @@ public class BlockStreamService implements GrpcService {
5555
public BlockStreamService(
5656
final long timeoutThresholdMillis,
5757
final ItemAckBuilder itemAckBuilder,
58-
final StreamMediator<ObjectEvent<BlockItem>, BlockItem> streamMediator) {
58+
final StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>> streamMediator) {
5959
this.timeoutThresholdMillis = timeoutThresholdMillis;
6060
this.itemAckBuilder = itemAckBuilder;
6161
this.streamMediator = streamMediator;

server/src/main/java/com/hedera/block/server/Server.java

Lines changed: 55 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import io.helidon.webserver.WebServer;
3030
import io.helidon.webserver.grpc.GrpcRouting;
3131
import java.io.IOException;
32+
import java.util.concurrent.ConcurrentHashMap;
3233

3334
/** Main class for the block node server */
3435
public class Server {
@@ -54,60 +55,61 @@ private Server() {}
5455
*/
5556
public static void main(final String[] args) {
5657

57-
try {
58+
// Set the global configuration
59+
final Config config = Config.create();
60+
Config.global(config);
61+
62+
// Build the gRPC service
63+
final GrpcRouting.Builder grpcRouting = buildGrpcRouting(config);
64+
65+
// Start the web server
66+
WebServer webServer = WebServer.builder().port(8080).addRouting(grpcRouting).build();
67+
68+
webServer.start();
69+
// .start();
70+
71+
}
72+
73+
private static GrpcRouting.Builder buildGrpcRouting(final Config config) {
5874

59-
// Set the global configuration
60-
final Config config = Config.create();
61-
Config.global(config);
62-
63-
// Get Timeout threshold from configuration
64-
final long consumerTimeoutThreshold =
65-
config.get(BLOCKNODE_SERVER_CONSUMER_TIMEOUT_THRESHOLD_KEY)
66-
.asLong()
67-
.orElse(1500L);
68-
69-
// Initialize the reader and writer for the block storage
70-
final BlockWriter<BlockItem> blockWriter =
71-
new BlockAsDirWriter(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config);
72-
final BlockReader<Block> blockReader =
73-
new BlockAsDirReader(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config);
74-
75-
final BlockStreamService blockStreamService =
76-
new BlockStreamService(
77-
consumerTimeoutThreshold,
78-
new ItemAckBuilder(),
79-
new LiveStreamMediatorImpl(
80-
new FileSystemPersistenceHandler(blockReader, blockWriter),
81-
(streamMediator) -> {
82-
LOGGER.log(
83-
System.Logger.Level.ERROR,
84-
"Shutting down the server due to an error.");
85-
}));
86-
87-
// Start the web server
88-
WebServer.builder()
89-
.port(8080)
90-
.addRouting(
91-
GrpcRouting.builder()
92-
.service(blockStreamService)
93-
.bidi(
94-
com.hedera.block.protos.BlockStreamService
95-
.getDescriptor(),
96-
SERVICE_NAME,
97-
CLIENT_STREAMING_METHOD_NAME,
98-
clientBidiStreamingMethod)
99-
.serverStream(
100-
com.hedera.block.protos.BlockStreamService
101-
.getDescriptor(),
102-
SERVICE_NAME,
103-
SERVER_STREAMING_METHOD_NAME,
104-
serverStreamingMethod))
105-
.build()
106-
.start();
107-
108-
} catch (IOException e) {
109-
LOGGER.log(System.Logger.Level.ERROR, "An exception was thrown starting the server", e);
110-
throw new RuntimeException(e);
75+
try {
76+
final BlockStreamService blockStreamService = buildBlockStreamService(config);
77+
return GrpcRouting.builder()
78+
.service(blockStreamService)
79+
.bidi(
80+
com.hedera.block.protos.BlockStreamService.getDescriptor(),
81+
SERVICE_NAME,
82+
CLIENT_STREAMING_METHOD_NAME,
83+
clientBidiStreamingMethod)
84+
.serverStream(
85+
com.hedera.block.protos.BlockStreamService.getDescriptor(),
86+
SERVICE_NAME,
87+
SERVER_STREAMING_METHOD_NAME,
88+
serverStreamingMethod);
89+
} catch (IOException io) {
90+
LOGGER.log(
91+
System.Logger.Level.ERROR, "An exception was thrown starting the server", io);
92+
throw new RuntimeException(io);
11193
}
11294
}
95+
96+
private static BlockStreamService buildBlockStreamService(final Config config)
97+
throws IOException {
98+
// Get Timeout threshold from configuration
99+
final long consumerTimeoutThreshold =
100+
config.get(BLOCKNODE_SERVER_CONSUMER_TIMEOUT_THRESHOLD_KEY).asLong().orElse(1500L);
101+
102+
// Initialize the reader and writer for the block storage
103+
final BlockWriter<BlockItem> blockWriter =
104+
new BlockAsDirWriter(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config);
105+
final BlockReader<Block> blockReader =
106+
new BlockAsDirReader(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config);
107+
108+
return new BlockStreamService(
109+
consumerTimeoutThreshold,
110+
new ItemAckBuilder(),
111+
new LiveStreamMediatorImpl(
112+
new ConcurrentHashMap<>(32),
113+
new FileSystemPersistenceHandler(blockReader, blockWriter)));
114+
}
113115
}

server/src/main/java/com/hedera/block/server/consumer/ConsumerBlockItemObserver.java

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,15 @@
2424
import io.grpc.stub.ServerCallStreamObserver;
2525
import io.grpc.stub.StreamObserver;
2626
import java.time.InstantSource;
27+
import java.util.concurrent.atomic.AtomicBoolean;
2728

2829
/**
2930
* The LiveStreamObserverImpl class implements the LiveStreamObserver interface to pass blocks to
3031
* the downstream consumer via the notify method and manage the bidirectional stream to the consumer
3132
* via the onNext, onError, and onCompleted methods.
3233
*/
33-
public class ConsumerBlockItemObserver implements BlockItemEventHandler<ObjectEvent<BlockItem>> {
34+
public class ConsumerBlockItemObserver
35+
implements BlockItemEventHandler<ObjectEvent<SubscribeStreamResponse>> {
3436

3537
private final System.Logger LOGGER = System.getLogger(getClass().getName());
3638

@@ -40,9 +42,10 @@ public class ConsumerBlockItemObserver implements BlockItemEventHandler<ObjectEv
4042
private final InstantSource producerLivenessClock;
4143
private long producerLivenessMillis;
4244

43-
private final StreamMediator<ObjectEvent<BlockItem>, BlockItem> streamMediator;
45+
private final StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>> streamMediator;
4446

4547
private boolean streamStarted;
48+
private AtomicBoolean isEventTransitioning = new AtomicBoolean(false);
4649

4750
/**
4851
* Constructor for the LiveStreamObserverImpl class.
@@ -52,7 +55,7 @@ public class ConsumerBlockItemObserver implements BlockItemEventHandler<ObjectEv
5255
public ConsumerBlockItemObserver(
5356
final long timeoutThresholdMillis,
5457
final InstantSource producerLivenessClock,
55-
final StreamMediator<ObjectEvent<BlockItem>, BlockItem> streamMediator,
58+
final StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>> streamMediator,
5659
final StreamObserver<SubscribeStreamResponse> subscribeStreamResponseObserver) {
5760

5861
this.timeoutThresholdMillis = timeoutThresholdMillis;
@@ -84,7 +87,10 @@ public ConsumerBlockItemObserver(
8487

8588
/** Pass the block to the observer provided by Helidon */
8689
@Override
87-
public void onEvent(final ObjectEvent<BlockItem> event, final long l, final boolean b) {
90+
public void onEvent(
91+
final ObjectEvent<SubscribeStreamResponse> event, final long l, final boolean b) {
92+
93+
isEventTransitioning.set(true);
8894

8995
final long currentMillis = producerLivenessClock.millis();
9096
if (currentMillis - producerLivenessMillis > timeoutThresholdMillis) {
@@ -97,22 +103,31 @@ public void onEvent(final ObjectEvent<BlockItem> event, final long l, final bool
97103

98104
// Only start sending BlockItems after we've reached
99105
// the beginning of a block.
100-
final BlockItem blockItem = event.get();
106+
final SubscribeStreamResponse subscribeStreamResponse = event.get();
107+
final BlockItem blockItem = subscribeStreamResponse.getBlockItem();
101108
if (!streamStarted && blockItem.hasHeader()) {
102109
streamStarted = true;
103110
}
104111

105112
if (streamStarted) {
106-
final SubscribeStreamResponse subscribeStreamResponse =
107-
SubscribeStreamResponse.newBuilder().setBlockItem(blockItem).build();
108-
109113
LOGGER.log(System.Logger.Level.DEBUG, "Send BlockItem downstream: {0} ", blockItem);
110-
111114
subscribeStreamResponseObserver.onNext(subscribeStreamResponse);
112115
}
113116
}
117+
118+
isEventTransitioning.set(false);
114119
}
115120

116121
@Override
117-
public void awaitShutdown() throws InterruptedException {}
122+
public void awaitShutdown() throws InterruptedException {
123+
while (isEventTransitioning.get()) {
124+
try {
125+
Thread.sleep(1);
126+
} catch (InterruptedException e) {
127+
LOGGER.log(
128+
System.Logger.Level.ERROR,
129+
"Interrupted while waiting for event to complete");
130+
}
131+
}
132+
}
118133
}

0 commit comments

Comments
 (0)