Skip to content

Commit bbc4066

Browse files
fix:refactor out FileSystemBlockStorage and split Read and Write
Signed-off-by: Matt Peterson <[email protected]>
1 parent 2721b4b commit bbc4066

File tree

14 files changed

+381
-312
lines changed

14 files changed

+381
-312
lines changed

server/src/main/java/com/hedera/block/server/Constants.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,7 @@ private Constants() {}
2929
public static final String SERVICE_NAME = "BlockStreamGrpcService";
3030
public static final String CLIENT_STREAMING_METHOD_NAME = "publishBlockStream";
3131
public static final String SERVER_STREAMING_METHOD_NAME = "subscribeBlockStream";
32+
33+
// Constants used when interacting with the file system.
34+
public static final String BLOCK_FILE_EXTENSION = ".blk";
3235
}

server/src/main/java/com/hedera/block/server/Server.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,7 @@
2121

2222
import com.hedera.block.server.mediator.LiveStreamMediatorImpl;
2323
import com.hedera.block.server.persistence.WriteThroughCacheHandler;
24-
import com.hedera.block.server.persistence.storage.BlockStorage;
25-
import com.hedera.block.server.persistence.storage.FileSystemBlockStorage;
24+
import com.hedera.block.server.persistence.storage.*;
2625
import io.grpc.stub.ServerCalls;
2726
import io.grpc.stub.StreamObserver;
2827
import io.helidon.config.Config;
@@ -66,13 +65,17 @@ public static void main(final String[] args) {
6665
.asLong()
6766
.orElse(1500L);
6867

69-
// Initialize the block storage, cache, and service
70-
final BlockStorage<Block, BlockItem> blockStorage =
71-
new FileSystemBlockStorage(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config);
68+
// Initialize the reader and writer for the block storage
69+
final BlockWriter<BlockItem> blockWriter =
70+
new FileSystemBlockWriter(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config);
71+
final BlockReader<Block> blockReader =
72+
new FileSystemBlockReader(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config);
73+
7274
final BlockStreamService blockStreamService =
7375
new BlockStreamService(
7476
consumerTimeoutThreshold,
75-
new LiveStreamMediatorImpl(new WriteThroughCacheHandler(blockStorage)));
77+
new LiveStreamMediatorImpl(
78+
new WriteThroughCacheHandler(blockReader, blockWriter)));
7679

7780
// Start the web server
7881
WebServer.builder()

server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,12 @@ public void publishEvent(BlockItem blockItem) {
7575
ringBuffer.publishEvent((event, sequence) -> event.set(blockItem));
7676

7777
// Block persistence
78-
blockPersistenceHandler.persist(blockItem);
78+
try {
79+
blockPersistenceHandler.persist(blockItem);
80+
} catch (Exception e) {
81+
// TODO: Push back on the producer?
82+
LOGGER.log(System.Logger.Level.ERROR, "Error occurred while persisting the block", e);
83+
}
7984
}
8085

8186
@Override

server/src/main/java/com/hedera/block/server/persistence/BlockPersistenceHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.hedera.block.server.persistence;
1818

19+
import java.io.IOException;
1920
import java.util.Optional;
2021
import java.util.Queue;
2122

@@ -28,7 +29,7 @@
2829
public interface BlockPersistenceHandler<U, V> {
2930

3031
/** Persists a block. */
31-
void persist(final V blockItem);
32+
void persist(final V blockItem) throws IOException;
3233

3334
/**
3435
* Reads a block.

server/src/main/java/com/hedera/block/server/persistence/WriteThroughCacheHandler.java

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919
import static com.hedera.block.protos.BlockStreamService.Block;
2020
import static com.hedera.block.protos.BlockStreamService.BlockItem;
2121

22-
import com.hedera.block.server.persistence.storage.BlockStorage;
22+
import com.hedera.block.server.persistence.storage.BlockReader;
23+
import com.hedera.block.server.persistence.storage.BlockWriter;
24+
import java.io.IOException;
2325
import java.util.LinkedList;
2426
import java.util.Optional;
2527
import java.util.Queue;
@@ -30,21 +32,14 @@
3032
*/
3133
public class WriteThroughCacheHandler implements BlockPersistenceHandler<Block, BlockItem> {
3234

33-
private final BlockStorage<Block, BlockItem> blockStorage;
35+
private final BlockReader<Block> blockReader;
36+
private final BlockWriter<BlockItem> blockWriter;
3437

35-
/**
36-
* Constructor for the WriteThroughCacheHandler class.
37-
*
38-
* @param blockStorage the block storage
39-
*/
40-
public WriteThroughCacheHandler(final BlockStorage<Block, BlockItem> blockStorage) {
41-
this.blockStorage = blockStorage;
42-
}
43-
44-
/** Persists the block to the block storage and cache the block. */
45-
@Override
46-
public void persist(final BlockItem blockItem) {
47-
blockStorage.write(blockItem);
38+
/** Constructor for the WriteThroughCacheHandler class. */
39+
public WriteThroughCacheHandler(
40+
final BlockReader<Block> blockReader, final BlockWriter<BlockItem> blockWriter) {
41+
this.blockReader = blockReader;
42+
this.blockWriter = blockWriter;
4843
}
4944

5045
/**
@@ -74,6 +69,12 @@ public Queue<Block> readRange(final long startBlockId, final long endBlockId) {
7469
*/
7570
@Override
7671
public Optional<Block> read(final long id) {
77-
return blockStorage.read(id);
72+
return blockReader.read(id);
73+
}
74+
75+
/** Persists the block to the block storage and cache the block. */
76+
@Override
77+
public void persist(final BlockItem blockItem) throws IOException {
78+
blockWriter.write(blockItem);
7879
}
7980
}

server/src/main/java/com/hedera/block/server/persistence/storage/BlockStorage.java renamed to server/src/main/java/com/hedera/block/server/persistence/storage/BlockReader.java

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,6 @@
1818

1919
import java.util.Optional;
2020

21-
/**
22-
* The BlockStorage interface defines operations to write and read blocks to a persistent store.
23-
*
24-
* @param <V> the type of block to store
25-
*/
26-
public interface BlockStorage<U, V> {
27-
28-
/** Writes a block to storage. */
29-
void write(final V blockItem);
30-
31-
/**
32-
* Reads a block from storage.
33-
*
34-
* @return the block
35-
*/
36-
Optional<U> read(final long blockNumber);
21+
public interface BlockReader<V> {
22+
Optional<V> read(final long blockNumber);
3723
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Copyright (C) 2024 Hedera Hashgraph, LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.hedera.block.server.persistence.storage;
18+
19+
import java.io.IOException;
20+
21+
public interface BlockWriter<V> {
22+
void write(final V blockItem) throws IOException;
23+
}
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
* Copyright (C) 2024 Hedera Hashgraph, LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.hedera.block.server.persistence.storage;
18+
19+
import static com.hedera.block.protos.BlockStreamService.Block;
20+
import static com.hedera.block.protos.BlockStreamService.Block.Builder;
21+
import static com.hedera.block.protos.BlockStreamService.BlockItem;
22+
import static com.hedera.block.server.Constants.BLOCK_FILE_EXTENSION;
23+
24+
import io.helidon.config.Config;
25+
import java.io.FileInputStream;
26+
import java.io.FileNotFoundException;
27+
import java.io.IOException;
28+
import java.nio.file.Path;
29+
import java.util.Optional;
30+
31+
public class FileSystemBlockReader implements BlockReader<Block> {
32+
33+
private final System.Logger LOGGER = System.getLogger(getClass().getName());
34+
35+
final Path blockNodeRootPath;
36+
37+
public FileSystemBlockReader(final String key, final Config config) {
38+
39+
LOGGER.log(System.Logger.Level.INFO, "Initializing FileSystemBlockReader");
40+
41+
final Path blockNodeRootPath = Path.of(config.get(key).asString().get());
42+
43+
LOGGER.log(System.Logger.Level.INFO, config.toString());
44+
LOGGER.log(System.Logger.Level.INFO, "Block Node Root Path: " + blockNodeRootPath);
45+
46+
this.blockNodeRootPath = blockNodeRootPath;
47+
}
48+
49+
public Optional<Block> read(final long blockNumber) {
50+
51+
// Construct the path to the requested Block
52+
final Path blockPath = blockNodeRootPath.resolve(String.valueOf(blockNumber));
53+
54+
// Verify attributes of the Block
55+
if (!isVerified(blockPath)) {
56+
return Optional.empty();
57+
}
58+
59+
// There may be thousands of BlockItem files in a Block directory.
60+
// The BlockItems must be written into the Block object in order. A
61+
// DirectoryStream will iterate in any guaranteed order. To avoid sorting,
62+
// and to keep the retrieval process linear with the number of BlockItems,
63+
// Run a loop to fetch in the order we need. The loop will break when
64+
// it looks for a BlockItem file that does not exist.
65+
final Builder builder = Block.newBuilder();
66+
for (int i = 1; ; i++) {
67+
final Path blockItemPath = blockPath.resolve(i + BLOCK_FILE_EXTENSION);
68+
try {
69+
final Optional<BlockItem> blockItemOpt = readBlockItem(blockItemPath.toString());
70+
if (blockItemOpt.isPresent()) {
71+
builder.addBlockItems(blockItemOpt.get());
72+
continue;
73+
}
74+
} catch (IOException io) {
75+
LOGGER.log(System.Logger.Level.ERROR, "Error reading file: " + blockItemPath, io);
76+
return Optional.empty();
77+
}
78+
79+
break;
80+
}
81+
82+
// Return the Block
83+
return Optional.of(builder.build());
84+
}
85+
86+
private Optional<BlockItem> readBlockItem(final String blockItemPath) throws IOException {
87+
try (FileInputStream fis = new FileInputStream(blockItemPath)) {
88+
return Optional.of(BlockItem.parseFrom(fis));
89+
} catch (FileNotFoundException io) {
90+
// The outer loop caller will continue to query
91+
// for the next BlockItem file based on the index
92+
// until the FileNotFoundException is thrown.
93+
// It's expected that this exception will be caught
94+
// at the end of every query.
95+
return Optional.empty();
96+
}
97+
}
98+
99+
private boolean isVerified(final Path blockPath) {
100+
if (!blockPath.toFile().isDirectory()) {
101+
LOGGER.log(System.Logger.Level.ERROR, "Block directory not found: " + blockPath);
102+
return false;
103+
}
104+
105+
if (!blockPath.toFile().canRead()) {
106+
LOGGER.log(System.Logger.Level.ERROR, "Block directory not readable: " + blockPath);
107+
return false;
108+
}
109+
110+
return true;
111+
}
112+
}

0 commit comments

Comments
 (0)