Skip to content

Commit

Permalink
feat: add unverified blocks root
Browse files Browse the repository at this point in the history
- add a new unverified blocks root
- persistence will first persist blocks under the unverified root
- after successful persistence under unverified && successful verification, persistence should move the block to live
- due to unavailable critical infrastructure, a method call has to be made to persistence to move the block

Signed-off-by: Atanas Atanasov <[email protected]>
  • Loading branch information
ata-nas committed Mar 4, 2025
1 parent ab846e5 commit f100092
Show file tree
Hide file tree
Showing 41 changed files with 664 additions and 328 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ public static Stream<Arguments> negativeIntPowersOf10() {
Arguments.of(-1_000_000_000));
}

// @todo(517) add 0 and MIN MAX values here as well, also, make the same logic to test for longs as well
// @todo(713) add 0 and MIN MAX values here as well, also, make the same logic to test for longs as well
public static Stream<Arguments> nonPowersOf10() {
return Stream.of(
Arguments.of(2),
Expand Down
12 changes: 0 additions & 12 deletions server/src/main/java/com/hedera/block/server/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,6 @@

/** Constants used in the BlockNode service. */
public final class Constants {
/** Constant mapped to the semantic name of the Block Node live root directory */
public static final String BLOCK_NODE_LIVE_ROOT_DIRECTORY_SEMANTIC_NAME = "Block Node Live Root Directory";

/** Constant mapped to the semantic name of the Block Node archive root directory */
public static final String BLOCK_NODE_ARCHIVE_ROOT_DIRECTORY_SEMANTIC_NAME = "Block Node Archive Root Directory";

/** Constant mapped to PbjProtocolProvider.CONFIG_NAME in the PBJ Helidon Plugin */
public static final String PBJ_PROTOCOL_PROVIDER_CONFIG_NAME = "pbj";

Expand All @@ -30,12 +24,6 @@ public final class Constants {
/** Constant defining the block file extension */
public static final String BLOCK_FILE_EXTENSION = ".blk";

/** Constant defining the unverified block file extension */
public static final String UNVERIFIED_BLOCK_FILE_EXTENSION = ".blk.unverified";

/** Constant defining the compressed file extension */
public static final String COMPRESSED_FILE_EXTENSION = ".zstd";

/** Constant defining zip file extension */
public static final String ZIP_FILE_EXTENSION = ".zip";

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// SPDX-License-Identifier: Apache-2.0
package com.hedera.block.server.ack;

import com.hedera.block.server.persistence.StreamPersistenceHandlerImpl;
import com.hedera.block.server.persistence.storage.write.BlockPersistenceResult;
import com.hedera.pbj.runtime.io.buffer.Bytes;
import edu.umd.cs.findbugs.annotations.NonNull;
Expand All @@ -10,6 +11,13 @@
* Responsible for sending Block Acknowledgements to the producer.
*/
public interface AckHandler {
/**
* Register the persistence handler. Temporary solution due to lack of
* critical infrastructure. This should be removed as soon as architectural
* changes, which would allow us to publish results which would then be
* picked up, are implemented.
*/
void registerPersistence(@NonNull final StreamPersistenceHandlerImpl persistence);

/**
* Called when we receive a "persistence" result.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.hedera.block.server.metrics.BlockNodeMetricTypes;
import com.hedera.block.server.metrics.MetricsService;
import com.hedera.block.server.notifier.Notifier;
import com.hedera.block.server.persistence.StreamPersistenceHandlerImpl;
import com.hedera.block.server.persistence.storage.remove.BlockRemover;
import com.hedera.block.server.persistence.storage.write.BlockPersistenceResult;
import com.hedera.block.server.persistence.storage.write.BlockPersistenceResult.BlockPersistenceStatus;
Expand Down Expand Up @@ -35,6 +36,7 @@ public class AckHandlerImpl implements AckHandler {
private final ServiceStatus serviceStatus;
private final BlockRemover blockRemover;
private final MetricsService metricsService;
private StreamPersistenceHandlerImpl streamPersistenceHandler;

/**
* Constructor. If either skipPersistence or skipVerification is true,
Expand All @@ -54,6 +56,11 @@ public AckHandlerImpl(
this.metricsService = metricsService;
}

/**
 * Registers the persistence handler that this ack handler will call back
 * into when a block is ready to be moved out of the unverified root.
 * Stored reference is non-null enforced; passing {@code null} throws
 * {@link NullPointerException}.
 *
 * @param streamPersistenceHandler the persistence handler to register; must not be null
 */
@Override
public void registerPersistence(@NonNull final StreamPersistenceHandlerImpl streamPersistenceHandler) {
    this.streamPersistenceHandler = Objects.requireNonNull(streamPersistenceHandler);
}

@Override
public void blockPersisted(@NonNull final BlockPersistenceResult blockPersistenceResult) {
Objects.requireNonNull(blockPersistenceResult);
Expand All @@ -63,7 +70,7 @@ public void blockPersisted(@NonNull final BlockPersistenceResult blockPersistenc
final BlockInfo info = blockInfo.computeIfAbsent(blockNumber, BlockInfo::new);
info.getBlockStatus().setPersisted();
} else {
// @todo(545) handle other cases for the blockPersistenceResult
// @todo(743) handle other cases for the blockPersistenceResult
// for now we will simply send an end of stream message
// but more things need to be handled, like ensure the
// blockInfo map will not be inserted
Expand Down Expand Up @@ -101,7 +108,7 @@ public void blockVerified(long blockNumber, @NonNull Bytes blockHash) {
public void blockVerificationFailed(long blockNumber) {
notifier.sendEndOfStream(lastAcknowledgedBlockNumber, PublishStreamResponseCode.STREAM_ITEMS_BAD_STATE_PROOF);
try {
blockRemover.removeLiveUnverified(blockNumber);
blockRemover.removeUnverified(blockNumber);
} catch (IOException e) {
LOGGER.log(System.Logger.Level.ERROR, "Failed to remove block " + blockNumber, e);
throw new RuntimeException(e);
Expand Down Expand Up @@ -140,6 +147,23 @@ private void attemptAcks() {

// Attempt to mark ACK sent (CAS-protected to avoid duplicates)
if (info.getBlockStatus().markAckSentIfNotAlready()) {
try {
// @todo(582) if we are unable to move the block to the verified state,
// should we throw or for now simply take the same action as if the block
// failed persistence (for now since we lack infrastructure we simply
// call the verification failed method)
streamPersistenceHandler.moveVerified(nextBlock);
} catch (final IOException e) {
// @todo(582) if we do this, we must be aware that we will not increment
// lastAcknowledgedBlockNumber and the verification failed method will
// remove the info from the map. This means that the data needs to be requested
// again. What would be the best way to hande inability to move the block with
// the limitations we current have?
// @todo(743) we should think of a response code for unsuccessful persistence
// this is outside of the scope of this PR
blockVerificationFailed(nextBlock);
return;
}
// We "won" the race; we do the actual ACK
notifier.sendAck(nextBlock, info.getBlockHash(), false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public final class ServerMappedConfigSourceInitializer {
new ConfigMapping("persistence.storage.liveRootPath", "PERSISTENCE_STORAGE_LIVE_ROOT_PATH"),
new ConfigMapping("persistence.storage.type", "PERSISTENCE_STORAGE_TYPE"),
new ConfigMapping("persistence.storage.archiveGroupSize", "PERSISTENCE_STORAGE_ARCHIVE_GROUP_SIZE"),
new ConfigMapping("persistence.storage.unverifiedRootPath", "PERSISTENCE_STORAGE_UNVERIFIED_ROOT_PATH"),

// Producer Config
new ConfigMapping("producer.type", "PRODUCER_TYPE"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ static Compression providesCompression(@NonNull final PersistenceStorageConfig c
@Singleton
static LocalBlockArchiver providesLocalBlockArchiver(
@NonNull final PersistenceStorageConfig config, @NonNull final BlockPathResolver blockPathResolver) {
// @todo(740) allow for configurable executor for the archiver
return new BlockAsLocalFileArchiver(config, blockPathResolver, Executors.newFixedThreadPool(5));
}

Expand All @@ -176,6 +177,7 @@ static BlockNodeEventHandler<ObjectEvent<List<BlockItemUnparsed>>> providesBlock
@NonNull final ServiceStatus serviceStatus,
@NonNull final AckHandler ackHandler,
@NonNull final AsyncBlockWriterFactory asyncBlockWriterFactory,
@NonNull final BlockPathResolver blockPathResolver,
@NonNull final PersistenceStorageConfig persistenceStorageConfig,
@NonNull final LocalBlockArchiver localBlockArchiver) {
try {
Expand All @@ -188,6 +190,7 @@ static BlockNodeEventHandler<ObjectEvent<List<BlockItemUnparsed>>> providesBlock
asyncBlockWriterFactory,
Executors.newFixedThreadPool(5),
localBlockArchiver,
blockPathResolver,
persistenceStorageConfig);
} catch (final IOException e) {
throw new UncheckedIOException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import static java.lang.System.Logger.Level.DEBUG;
import static java.lang.System.Logger.Level.ERROR;

import com.hedera.block.common.utils.FileUtilities;
import com.hedera.block.server.ack.AckHandler;
import com.hedera.block.server.config.BlockNodeContext;
import com.hedera.block.server.events.BlockNodeEventHandler;
Expand All @@ -15,6 +16,8 @@
import com.hedera.block.server.notifier.Notifier;
import com.hedera.block.server.persistence.storage.PersistenceStorageConfig;
import com.hedera.block.server.persistence.storage.archive.LocalBlockArchiver;
import com.hedera.block.server.persistence.storage.path.BlockPathResolver;
import com.hedera.block.server.persistence.storage.path.UnverifiedBlockPath;
import com.hedera.block.server.persistence.storage.write.AsyncBlockWriter;
import com.hedera.block.server.persistence.storage.write.AsyncBlockWriterFactory;
import com.hedera.block.server.persistence.storage.write.BlockPersistenceResult;
Expand All @@ -24,17 +27,21 @@
import com.hedera.hapi.block.stream.output.BlockHeader;
import com.hedera.pbj.runtime.ParseException;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.TransferQueue;
import java.util.stream.Stream;
import javax.inject.Singleton;

/**
Expand All @@ -56,6 +63,7 @@ public class StreamPersistenceHandlerImpl implements BlockNodeEventHandler<Objec
private final AsyncBlockWriterFactory asyncBlockWriterFactory;
private final CompletionService<Void> completionService;
private final LocalBlockArchiver archiver;
private final BlockPathResolver pathResolver;
private TransferQueue<BlockItemUnparsed> currentWriterQueue;

/**
Expand All @@ -80,21 +88,68 @@ public StreamPersistenceHandlerImpl(
@NonNull final AsyncBlockWriterFactory asyncBlockWriterFactory,
@NonNull final Executor writerExecutor,
@NonNull final LocalBlockArchiver archiver,
@NonNull final BlockPathResolver pathResolver,
@NonNull final PersistenceStorageConfig persistenceStorageConfig)
throws IOException {
this.subscriptionHandler = Objects.requireNonNull(subscriptionHandler);
this.notifier = Objects.requireNonNull(notifier);
this.metricsService = blockNodeContext.metricsService();
this.serviceStatus = Objects.requireNonNull(serviceStatus);
this.ackHandler = Objects.requireNonNull(ackHandler);
this.asyncBlockWriterFactory = Objects.requireNonNull(asyncBlockWriterFactory);
this.archiver = Objects.requireNonNull(archiver);
this.pathResolver = Objects.requireNonNull(pathResolver);
this.completionService = new ExecutorCompletionService<>(Objects.requireNonNull(writerExecutor));
// Ensure that the root paths exist
final Path liveRootPath = Objects.requireNonNull(persistenceStorageConfig.liveRootPath());
final Path archiveRootPath = Objects.requireNonNull(persistenceStorageConfig.archiveRootPath());
final Path unverifiedRootPath = Objects.requireNonNull(persistenceStorageConfig.unverifiedRootPath());
Files.createDirectories(liveRootPath);
Files.createDirectories(archiveRootPath);
Files.createDirectories(unverifiedRootPath);

try (final Stream<Path> blockFilesInUnverified = Files.list(unverifiedRootPath)) {
// Clean up the unverified directory at startup. Any files under the
// unverified root at startup are to be considered unreliable
blockFilesInUnverified.map(Path::toFile).forEach(File::delete);
}

// It is indeed a very bad idea to expose `this` to the outside world
// for an object that has not finished initializing, unfortunately there
// is no way around this until we have much needed architectural
// changes. Generally, the whole concept of the ackHandler to be calling
// back to the persistence handler is a bad idea since it couples the
// two classes together and makes the ackHandler an overlord.
// As mentioned, we should get rid of this as soon as possible and
// be publishing results which would then be picked up instead of having
// direct method calls. As far as exposing `this`, thankfully the logic
// that would call `this` from the ackHandler is not executed until the
// persistence handler is fully initialized, so there is no risk of
// calling a method on an object that is not fully initialized, BUT we
// need to take extra care! Essentially, in order to ever call the
// `moveVerified` method, the ackHandler must be fully initialized, and
// the block which would be first in line for moving must be both
// verified and persisted into the unverified directory. Persisting into
// the unverified directory is done by `this`, so it would not be
// possible to call `moveVerified` before the persistence handler is
// fully initialized.
this.ackHandler = Objects.requireNonNull(ackHandler);
this.ackHandler.registerPersistence(this);
}

/**
 * Moves the persisted file for the given block number from the unverified
 * root to its resolved location under the live root. The target's parent
 * directories are created if they do not yet exist, and the file extension
 * for the block's compression type is appended to the resolved live path.
 *
 * @param blockNumber the number of the block to promote to the live root
 * @throws FileNotFoundException if no file for the given block number is
 *     found under the unverified root
 * @throws IOException if the move (or directory creation) fails
 */
public void moveVerified(final long blockNumber) throws IOException {
    final Optional<UnverifiedBlockPath> optUnverified = pathResolver.findUnverifiedBlock(blockNumber);
    if (optUnverified.isEmpty()) {
        // Nothing to promote: the block was never persisted (or was already removed).
        throw new FileNotFoundException(
                "File for Block [%s] not found under unverified root".formatted(blockNumber));
    }
    final UnverifiedBlockPath unverified = optUnverified.get();
    final Path source = unverified.dirPath().resolve(unverified.blockFileName());
    final Path liveRawPath = pathResolver.resolveLiveRawPathToBlock(unverified.blockNumber());
    final Path target =
            FileUtilities.appendExtension(liveRawPath, unverified.compressionType().getFileExtension());
    Files.createDirectories(target.getParent());
    Files.move(source, target);
}

/**
Expand All @@ -106,16 +161,14 @@ public StreamPersistenceHandlerImpl(
* @param b true if the event is the last in the sequence
*/
@Override
public void onEvent(final ObjectEvent<List<BlockItemUnparsed>> event, long l, boolean b) {

public void onEvent(final ObjectEvent<List<BlockItemUnparsed>> event, final long l, final boolean b) {
try {
if (serviceStatus.isRunning()) {
final List<BlockItemUnparsed> blockItems = event.get();
if (blockItems.isEmpty()) {
final String message = "BlockItems list is empty.";
throw new BlockStreamProtocolException(message);
}

handleBlockItems(blockItems);
} else {
LOGGER.log(ERROR, "Service is not running. Block items will not be persisted.");
Expand Down Expand Up @@ -203,7 +256,7 @@ private void handleBlockItems(final List<BlockItemUnparsed> blockItems)
private void handlePersistenceExecution(final Future<Void> completionResult) throws BlockStreamProtocolException {
try {
if (completionResult.isCancelled()) {
// @todo(545) submit cancelled to ackHandler when migrated
// @todo(713) submit cancelled to ackHandler when migrated
} else {
// we call get here to verify that the task has run to completion
// we do not expect it to throw an exception, but to publish
Expand All @@ -217,8 +270,7 @@ private void handlePersistenceExecution(final Future<Void> completionResult) thr
// result otherwise, it is either a bug or an unhandled case
throw new BlockStreamProtocolException("Unexpected exception during block persistence.", e);
} catch (final InterruptedException e) {
// @todo(545) if we enter here, then the ring buffer thread was
// interrupted. What shall we do here? How to handle?
// @todo(713) What would be the proper handling here
Thread.currentThread().interrupt();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
public record PersistenceStorageConfig(
@Loggable @ConfigProperty(defaultValue = "/opt/hashgraph/blocknode/data/live") Path liveRootPath,
@Loggable @ConfigProperty(defaultValue = "/opt/hashgraph/blocknode/data/archive") Path archiveRootPath,
@Loggable @ConfigProperty(defaultValue = "/opt/hashgraph/blocknode/data/unverified") Path unverifiedRootPath,
@Loggable @ConfigProperty(defaultValue = "BLOCK_AS_LOCAL_FILE") StorageType type,
@Loggable @ConfigProperty(defaultValue = "ZSTD") CompressionType compression,
@Loggable @ConfigProperty(defaultValue = "3") @Min(0) @Max(20) int compressionLevel,
Expand All @@ -35,8 +36,11 @@ public record PersistenceStorageConfig(
public PersistenceStorageConfig {
Objects.requireNonNull(liveRootPath);
Objects.requireNonNull(archiveRootPath);
Objects.requireNonNull(unverifiedRootPath);
Objects.requireNonNull(type);
compression.verifyCompressionLevel(compressionLevel);
// @todo(742) verify that the group size has not changed once it has
// been set initially
Preconditions.requireGreaterOrEqual(
archiveGroupSize,
10,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ private void handleSubmittedResults() {
while ((completionResult = completionService.poll()) != null) {
try {
if (completionResult.isCancelled()) {
// @todo(517) when we have infrastructure for publishing
// @todo(713) when we have infrastructure for publishing
// results, we should do so
} else {
// We call get here to verify that the task has run to completion
Expand All @@ -69,15 +69,15 @@ private void handleSubmittedResults() {
// The result should be the number of actual block items
// archived.
final long result = completionResult.get();
// @todo(517) this is a good place to do some metrics
// @todo(713) this is a good place to do some metrics
LOGGER.log(TRACE, "Archived [{0}] BlockFiles", result);
}
} catch (final ExecutionException e) {
// we do not expect to enter here, if we do, then there is
// either a bug in the archiving task, or an unhandled case
throw new BlockArchivingException(e.getCause());
} catch (final InterruptedException e) {
// @todo(517) What shall we do here? How to handle?
// @todo(713) What shall we do here? How to handle?
Thread.currentThread().interrupt();
}
}
Expand Down
Loading

0 comments on commit f100092

Please sign in to comment.