Skip to content

Commit b1ea596

Browse files
Added hedera user to the container. Added rethrow of IOException in the reader. Now the server will stop when IOExceptions are thrown upstream
Signed-off-by: Matt Peterson <[email protected]>
1 parent a82c2e6 commit b1ea596

File tree

10 files changed

+102
-76
lines changed

10 files changed

+102
-76
lines changed

server/docker/Dockerfile

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,19 @@
11
# Use Eclipse Temurin with Java 21 as the base image
22
FROM eclipse-temurin:21
33

4+
# Expose the port that the application will run on
5+
EXPOSE 8080
6+
47
# Define version
58
ARG VERSION
69

10+
ARG UNAME=hedera
11+
ARG UID=2000
12+
ARG GID=2000
13+
RUN groupadd -g $GID -o $UNAME
14+
RUN useradd -m -u $UID -g $GID -o -s /bin/bash $UNAME
15+
USER $UNAME
16+
717
# Set the working directory inside the container
818
WORKDIR /app
919

@@ -13,8 +23,5 @@ COPY --from=distributions server-${VERSION}.tar .
1323
# Extract the TAR file
1424
RUN tar -xvf server-${VERSION}.tar
1525

16-
# Expose the port that the application will run on
17-
EXPOSE 8080
18-
1926
# RUN the bin script for starting the server
2027
ENTRYPOINT ["sh", "-c", "/app/server-${VERSION}/bin/server"]

server/src/main/java/com/hedera/block/server/BlockStreamService.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.hedera.block.server.producer.ItemAckBuilder;
2727
import com.hedera.block.server.producer.ProducerBlockItemObserver;
2828
import io.grpc.stub.StreamObserver;
29+
import io.helidon.webserver.WebServer;
2930
import io.helidon.webserver.grpc.GrpcService;
3031
import java.time.Clock;
3132

@@ -47,6 +48,8 @@ public class BlockStreamService implements GrpcService {
4748
private final ItemAckBuilder itemAckBuilder;
4849
private final StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>> streamMediator;
4950

51+
private WebServer webServer;
52+
5053
/**
5154
* Constructor for the BlockStreamService class.
5255
*
@@ -94,14 +97,18 @@ public void update(final Routing routing) {
9497
routing.serverStream(SERVER_STREAMING_METHOD_NAME, this::subscribeBlockStream);
9598
}
9699

100+
public void register(final WebServer webServer) {
101+
this.webServer = webServer;
102+
}
103+
97104
StreamObserver<PublishStreamRequest> publishBlockStream(
98105
final StreamObserver<PublishStreamResponse> publishStreamResponseObserver) {
99106
LOGGER.log(
100107
System.Logger.Level.DEBUG,
101108
"Executing bidirectional publishBlockStream gRPC method");
102109

103110
return new ProducerBlockItemObserver(
104-
streamMediator, publishStreamResponseObserver, itemAckBuilder);
111+
streamMediator, publishStreamResponseObserver, itemAckBuilder, webServer);
105112
}
106113

107114
void subscribeBlockStream(

server/src/main/java/com/hedera/block/server/Server.java

Lines changed: 47 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919
import static com.hedera.block.protos.BlockStreamService.*;
2020
import static com.hedera.block.server.Constants.*;
2121

22+
import com.hedera.block.server.data.ObjectEvent;
2223
import com.hedera.block.server.mediator.LiveStreamMediatorImpl;
24+
import com.hedera.block.server.mediator.StreamMediator;
2325
import com.hedera.block.server.persistence.FileSystemPersistenceHandler;
2426
import com.hedera.block.server.persistence.storage.*;
2527
import com.hedera.block.server.producer.ItemAckBuilder;
@@ -59,57 +61,60 @@ public static void main(final String[] args) {
5961
final Config config = Config.create();
6062
Config.global(config);
6163

62-
// Build the gRPC service
63-
final GrpcRouting.Builder grpcRouting = buildGrpcRouting(config);
64-
65-
// Start the web server
66-
WebServer webServer = WebServer.builder().port(8080).addRouting(grpcRouting).build();
67-
68-
webServer.start();
69-
// .start();
70-
64+
try {
65+
final StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>> streamMediator =
66+
buildStreamMediator(config);
67+
final BlockStreamService blockStreamService =
68+
buildBlockStreamService(config, streamMediator);
69+
final GrpcRouting.Builder grpcRouting = buildGrpcRouting(blockStreamService);
70+
71+
// Build the web server
72+
final WebServer webServer =
73+
WebServer.builder().port(8080).addRouting(grpcRouting).build();
74+
75+
blockStreamService.register(webServer);
76+
77+
// Start the web server
78+
webServer.start();
79+
} catch (IOException e) {
80+
throw new RuntimeException(e);
81+
}
7182
}
7283

73-
private static GrpcRouting.Builder buildGrpcRouting(final Config config) {
84+
private static StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>>
85+
buildStreamMediator(final Config config) throws IOException {
86+
return new LiveStreamMediatorImpl(
87+
new ConcurrentHashMap<>(32),
88+
new FileSystemPersistenceHandler(
89+
new BlockAsDirReader(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config),
90+
new BlockAsDirWriter(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config)));
91+
}
7492

75-
try {
76-
final BlockStreamService blockStreamService = buildBlockStreamService(config);
77-
return GrpcRouting.builder()
78-
.service(blockStreamService)
79-
.bidi(
80-
com.hedera.block.protos.BlockStreamService.getDescriptor(),
81-
SERVICE_NAME,
82-
CLIENT_STREAMING_METHOD_NAME,
83-
clientBidiStreamingMethod)
84-
.serverStream(
85-
com.hedera.block.protos.BlockStreamService.getDescriptor(),
86-
SERVICE_NAME,
87-
SERVER_STREAMING_METHOD_NAME,
88-
serverStreamingMethod);
89-
} catch (IOException io) {
90-
LOGGER.log(
91-
System.Logger.Level.ERROR, "An exception was thrown starting the server", io);
92-
throw new RuntimeException(io);
93-
}
93+
private static GrpcRouting.Builder buildGrpcRouting(
94+
final BlockStreamService blockStreamService) {
95+
96+
return GrpcRouting.builder()
97+
.service(blockStreamService)
98+
.bidi(
99+
com.hedera.block.protos.BlockStreamService.getDescriptor(),
100+
SERVICE_NAME,
101+
CLIENT_STREAMING_METHOD_NAME,
102+
clientBidiStreamingMethod)
103+
.serverStream(
104+
com.hedera.block.protos.BlockStreamService.getDescriptor(),
105+
SERVICE_NAME,
106+
SERVER_STREAMING_METHOD_NAME,
107+
serverStreamingMethod);
94108
}
95109

96-
private static BlockStreamService buildBlockStreamService(final Config config)
97-
throws IOException {
110+
private static BlockStreamService buildBlockStreamService(
111+
final Config config,
112+
final StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>> streamMediator) {
98113
// Get Timeout threshold from configuration
99114
final long consumerTimeoutThreshold =
100115
config.get(BLOCKNODE_SERVER_CONSUMER_TIMEOUT_THRESHOLD_KEY).asLong().orElse(1500L);
101116

102-
// Initialize the reader and writer for the block storage
103-
final BlockWriter<BlockItem> blockWriter =
104-
new BlockAsDirWriter(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config);
105-
final BlockReader<Block> blockReader =
106-
new BlockAsDirReader(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config);
107-
108117
return new BlockStreamService(
109-
consumerTimeoutThreshold,
110-
new ItemAckBuilder(),
111-
new LiveStreamMediatorImpl(
112-
new ConcurrentHashMap<>(32),
113-
new FileSystemPersistenceHandler(blockReader, blockWriter)));
118+
consumerTimeoutThreshold, new ItemAckBuilder(), streamMediator);
114119
}
115120
}

server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,8 @@ public void publishEvent(final BlockItem blockItem) throws IOException {
9696
final var subscribeStreamResponse =
9797
SubscribeStreamResponse.newBuilder().setBlockItem(blockItem).build();
9898
ringBuffer.publishEvent((event, sequence) -> event.set(subscribeStreamResponse));
99+
100+
// Persist the BlockItem
99101
blockPersistenceHandler.persist(blockItem);
100102

101103
} else {
@@ -172,11 +174,6 @@ public boolean isSubscribed(
172174
return subscribers.containsKey(handler);
173175
}
174176

175-
@Override
176-
public void register(final WebServer webServer) {
177-
this.webserver = webServer;
178-
}
179-
180177
private static SubscribeStreamResponse buildEndStreamResponse() {
181178
// The current spec does not contain a generic error code for
182179
// SubscribeStreamResponseCode.

server/src/main/java/com/hedera/block/server/mediator/StreamMediator.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package com.hedera.block.server.mediator;
1818

1919
import com.hedera.block.server.consumer.BlockItemEventHandler;
20-
import io.helidon.webserver.WebServer;
2120
import java.io.IOException;
2221

2322
/**
@@ -42,6 +41,4 @@ public interface StreamMediator<U, V> {
4241
void unsubscribe(final BlockItemEventHandler<V> handler);
4342

4443
boolean isSubscribed(final BlockItemEventHandler<V> handler);
45-
46-
void register(final WebServer webServer);
4744
}

server/src/main/java/com/hedera/block/server/persistence/storage/BlockAsDirWriter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ public void write(final BlockItem blockItem) throws IOException {
7070
} catch (IOException e) {
7171
LOGGER.log(
7272
System.Logger.Level.ERROR, "Error writing the BlockItem protobuf to a file", e);
73+
throw e;
7374
}
7475
}
7576

server/src/main/java/com/hedera/block/server/producer/ProducerBlockItemObserver.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.hedera.block.server.data.ObjectEvent;
2323
import com.hedera.block.server.mediator.StreamMediator;
2424
import io.grpc.stub.StreamObserver;
25+
import io.helidon.webserver.WebServer;
2526
import java.io.IOException;
2627
import java.security.NoSuchAlgorithmException;
2728

@@ -37,6 +38,7 @@ public class ProducerBlockItemObserver implements StreamObserver<PublishStreamRe
3738
private final StreamObserver<PublishStreamResponse> publishStreamResponseObserver;
3839
private final StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>> streamMediator;
3940
private final ItemAckBuilder itemAckBuilder;
41+
private final WebServer webServer;
4042

4143
/**
4244
* Constructor for the ProducerBlockStreamObserver class. It is responsible for calling the
@@ -46,11 +48,13 @@ public class ProducerBlockItemObserver implements StreamObserver<PublishStreamRe
4648
public ProducerBlockItemObserver(
4749
final StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>> streamMediator,
4850
final StreamObserver<PublishStreamResponse> publishStreamResponseObserver,
49-
final ItemAckBuilder itemAckBuilder) {
51+
final ItemAckBuilder itemAckBuilder,
52+
final WebServer webServer) {
5053

5154
this.streamMediator = streamMediator;
5255
this.publishStreamResponseObserver = publishStreamResponseObserver;
5356
this.itemAckBuilder = itemAckBuilder;
57+
this.webServer = webServer;
5458
}
5559

5660
/**
@@ -94,6 +98,9 @@ public void onNext(final PublishStreamRequest publishStreamRequest) {
9498
final var errorResponse = buildErrorStreamResponse();
9599
publishStreamResponseObserver.onNext(errorResponse);
96100
LOGGER.log(System.Logger.Level.ERROR, "Exception thrown publishing BlockItem", io);
101+
102+
LOGGER.log(System.Logger.Level.ERROR, "Shutting down the web server");
103+
webServer.stop();
97104
}
98105
}
99106

server/src/test/java/com/hedera/block/server/BlockStreamServiceIT.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import io.helidon.config.Config;
4141
import io.helidon.config.MapConfigSource;
4242
import io.helidon.config.spi.ConfigSource;
43+
import io.helidon.webserver.WebServer;
4344
import java.io.IOException;
4445
import java.nio.file.Files;
4546
import java.nio.file.Path;
@@ -75,6 +76,8 @@ public class BlockStreamServiceIT {
7576
@Mock private StreamObserver<SubscribeStreamResponse> subscribeStreamObserver5;
7677
@Mock private StreamObserver<SubscribeStreamResponse> subscribeStreamObserver6;
7778

79+
@Mock private WebServer webServer;
80+
7881
@Mock private BlockReader<Block> blockReader;
7982
@Mock private BlockWriter<BlockItem> blockWriter;
8083

@@ -389,6 +392,10 @@ public void testMediatorExceptionHandlingWhenPersistenceFailure()
389392
final var streamMediator = buildStreamMediator(subscribers);
390393
final var blockStreamService = buildBlockStreamService(streamMediator);
391394

395+
// Register the web server to confirm
396+
// the server is stopped when an exception occurs
397+
blockStreamService.register(webServer);
398+
392399
// Subscribe the consumers
393400
blockStreamService.subscribeBlockStream(subscribeStreamRequest, subscribeStreamObserver1);
394401
blockStreamService.subscribeBlockStream(subscribeStreamRequest, subscribeStreamObserver2);
@@ -441,6 +448,8 @@ public void testMediatorExceptionHandlingWhenPersistenceFailure()
441448
final var endOfStreamResponse =
442449
PublishStreamResponse.newBuilder().setStatus(endOfStream).build();
443450
verify(publishStreamResponseObserver, times(1)).onNext(endOfStreamResponse);
451+
452+
verify(webServer, times(1)).stop();
444453
}
445454

446455
private static final String NO_WRITE = "r-xr-xr-x";

server/src/test/java/com/hedera/block/server/persistence/storage/BlockAsDirectoryTest.java

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -166,23 +166,7 @@ public void testRemoveBlockItemWritePerms() throws IOException {
166166

167167
// Change the permissions on the block node root directory
168168
removeBlockWritePerms(1, testConfig);
169-
170-
// Here, BlockItem writes won't throw an exception.
171-
// We will rely on a different process to detect the invalid
172-
// block and replace it.
173-
for (int i = 2; i < blockItems.size(); i++) {
174-
blockWriter.write(blockItems.get(1));
175-
}
176-
177-
// Verify only the header block is on the file system
178-
final BlockReader<Block> blockReader = new BlockAsDirReader(JUNIT, testConfig);
179-
Optional<Block> blockOpt = blockReader.read(1);
180-
assertFalse(blockOpt.isEmpty());
181-
182-
for (int i = 2; i < blockItems.size(); i++) {
183-
blockOpt = blockReader.read(i);
184-
assertTrue(blockOpt.isEmpty());
185-
}
169+
assertThrows(IOException.class, () -> blockWriter.write(blockItems.get(1)));
186170
}
187171

188172
@Test

server/src/test/java/com/hedera/block/server/producer/ProducerBlockItemObserverTest.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import com.hedera.block.server.persistence.storage.BlockReader;
3535
import com.hedera.block.server.persistence.storage.BlockWriter;
3636
import io.grpc.stub.StreamObserver;
37+
import io.helidon.webserver.WebServer;
3738
import java.io.IOException;
3839
import java.security.NoSuchAlgorithmException;
3940
import java.util.List;
@@ -58,14 +59,19 @@ public class ProducerBlockItemObserverTest {
5859
@Mock private StreamObserver<SubscribeStreamResponse> streamObserver2;
5960
@Mock private StreamObserver<SubscribeStreamResponse> streamObserver3;
6061

62+
@Mock private WebServer webServer;
63+
6164
@Test
6265
public void testProducerOnNext()
6366
throws InterruptedException, IOException, NoSuchAlgorithmException {
6467

6568
List<BlockItem> blockItems = generateBlockItems(1);
6669
ProducerBlockItemObserver producerBlockItemObserver =
6770
new ProducerBlockItemObserver(
68-
streamMediator, publishStreamResponseObserver, new ItemAckBuilder());
71+
streamMediator,
72+
publishStreamResponseObserver,
73+
new ItemAckBuilder(),
74+
webServer);
6975

7076
when(streamMediator.isPublishing()).thenReturn(true);
7177

@@ -147,7 +153,10 @@ public void testProducerToManyConsumers() throws IOException, InterruptedExcepti
147153

148154
final ProducerBlockItemObserver producerBlockItemObserver =
149155
new ProducerBlockItemObserver(
150-
streamMediator, publishStreamResponseObserver, new ItemAckBuilder());
156+
streamMediator,
157+
publishStreamResponseObserver,
158+
new ItemAckBuilder(),
159+
webServer);
151160

152161
PublishStreamRequest publishStreamRequest =
153162
PublishStreamRequest.newBuilder().setBlockItem(blockItem).build();
@@ -171,7 +180,10 @@ public void testProducerToManyConsumers() throws IOException, InterruptedExcepti
171180
public void testOnError() {
172181
ProducerBlockItemObserver producerBlockItemObserver =
173182
new ProducerBlockItemObserver(
174-
streamMediator, publishStreamResponseObserver, new ItemAckBuilder());
183+
streamMediator,
184+
publishStreamResponseObserver,
185+
new ItemAckBuilder(),
186+
webServer);
175187

176188
Throwable t = new Throwable("Test error");
177189
producerBlockItemObserver.onError(t);
@@ -184,7 +196,7 @@ public void testItemAckBuilderExceptionTest()
184196

185197
ProducerBlockItemObserver producerBlockItemObserver =
186198
new ProducerBlockItemObserver(
187-
streamMediator, publishStreamResponseObserver, itemAckBuilder);
199+
streamMediator, publishStreamResponseObserver, itemAckBuilder, webServer);
188200

189201
when(streamMediator.isPublishing()).thenReturn(true);
190202
when(itemAckBuilder.buildAck(any()))

0 commit comments

Comments
 (0)