Skip to content

Commit

Permalink
updated todo tags
Browse files Browse the repository at this point in the history
Signed-off-by: Atanas Atanasov <[email protected]>
  • Loading branch information
ata-nas committed Feb 28, 2025
1 parent ea3bb72 commit 69521c7
Show file tree
Hide file tree
Showing 10 changed files with 30 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ public static Stream<Arguments> negativeIntPowersOf10() {
Arguments.of(-1_000_000_000));
}

// @todo(517) add 0 and MIN MAX values here as well, also, make the same logic to test for longs as well
// @todo(713) add 0 and MIN MAX values here as well, also, make the same logic to test for longs as well
public static Stream<Arguments> nonPowersOf10() {
return Stream.of(
Arguments.of(2),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public void blockPersisted(@NonNull final BlockPersistenceResult blockPersistenc
final BlockInfo info = blockInfo.computeIfAbsent(blockNumber, BlockInfo::new);
info.getBlockStatus().setPersisted();
} else {
// @todo(545) handle other cases for the blockPersistenceResult
// @todo(743) handle other cases for the blockPersistenceResult
// for now we will simply send an end of stream message
// but more things need to be handled, like ensure the
// blockInfo map will not be inserted
Expand Down Expand Up @@ -150,16 +150,16 @@ private void attemptAcks() {
try {
// @todo(582) if we are unable to move the block to the verified state,
// should we throw or for now simply take the same action as if the block
// failed persistence (for now since we lack infrastructure we simply)
// call the verification failed method
// failed persistence (for now since we lack infrastructure we simply
// call the verification failed method)
streamPersistenceHandler.moveVerified(nextBlock);
} catch (final IOException e) {
// @todo(582) if we do this, we must be aware that we will not increment
// lastAcknowledgedBlockNumber and the verification failed method will
// remove the info from the map. This means that the data needs to be requested
// again. What would be the best way to handle inability to move the block with
// the limitations we currently have?
// @todo(582) we should thing of a response code for unsuccessful persistence
// @todo(743) we should think of a response code for unsuccessful persistence
// this is outside of the scope of this PR
blockVerificationFailed(nextBlock);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ static Compression providesCompression(@NonNull final PersistenceStorageConfig c
@Singleton
static LocalBlockArchiver providesLocalBlockArchiver(
@NonNull final PersistenceStorageConfig config, @NonNull final BlockPathResolver blockPathResolver) {
// @todo(740) allow for configurable executor for the archiver
return new BlockAsLocalFileArchiver(config, blockPathResolver, Executors.newFixedThreadPool(5));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.hedera.hapi.block.stream.output.BlockHeader;
import com.hedera.pbj.runtime.ParseException;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Files;
Expand All @@ -40,6 +41,7 @@
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.TransferQueue;
import java.util.stream.Stream;
import javax.inject.Singleton;

/**
Expand Down Expand Up @@ -105,6 +107,12 @@ public StreamPersistenceHandlerImpl(
Files.createDirectories(archiveRootPath);
Files.createDirectories(unverifiedRootPath);

try (final Stream<Path> blockFilesInUnverified = Files.list(unverifiedRootPath)) {
// Clean up the unverified directory at startup. Any files under the
// unverified root at startup are to be considered unreliable
blockFilesInUnverified.map(Path::toFile).forEach(File::delete);
}

// It is indeed a very bad idea to expose `this` to the outside world
// for an object that has not finished initializing, unfortunately there
// is no way around this until we have much needed architectural
Expand Down Expand Up @@ -248,7 +256,7 @@ private void handleBlockItems(final List<BlockItemUnparsed> blockItems)
private void handlePersistenceExecution(final Future<Void> completionResult) throws BlockStreamProtocolException {
try {
if (completionResult.isCancelled()) {
// @todo(545) submit cancelled to ackHandler when migrated
// @todo(713) submit cancelled to ackHandler when migrated
} else {
// we call get here to verify that the task has run to completion
// we do not expect it to throw an exception, but to publish
Expand All @@ -262,8 +270,7 @@ private void handlePersistenceExecution(final Future<Void> completionResult) thr
// result otherwise, it is either a bug or an unhandled case
throw new BlockStreamProtocolException("Unexpected exception during block persistence.", e);
} catch (final InterruptedException e) {
// @todo(545) if we enter here, then the ring buffer thread was
// interrupted. What shall we do here? How to handle?
// @todo(713) What would be the proper handling here?
Thread.currentThread().interrupt();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public record PersistenceStorageConfig(
Objects.requireNonNull(unverifiedRootPath);
Objects.requireNonNull(type);
compression.verifyCompressionLevel(compressionLevel);
// @todo(742) verify that the group size has not changed once it has
// been set initially
Preconditions.requireGreaterOrEqual(
archiveGroupSize,
10,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ private void handleSubmittedResults() {
while ((completionResult = completionService.poll()) != null) {
try {
if (completionResult.isCancelled()) {
// @todo(517) when we have infrastructure for publishing
// @todo(713) when we have infrastructure for publishing
// results, we should do so
} else {
// We call get here to verify that the task has run to completion
Expand All @@ -69,15 +69,15 @@ private void handleSubmittedResults() {
// The result should be the number of actual block items
// archived.
final long result = completionResult.get();
// @todo(517) this is a good place to do some metrics
// @todo(713) this is a good place to do some metrics
LOGGER.log(TRACE, "Archived [{0}] BlockFiles", result);
}
} catch (final ExecutionException e) {
// we do not expect to enter here, if we do, then there is
// either a bug in the archiving task, or an unhandled case
throw new BlockArchivingException(e.getCause());
} catch (final InterruptedException e) {
// @todo(517) What shall we do here? How to handle?
// @todo(713) What shall we do here? How to handle?
Thread.currentThread().interrupt();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ public Long call() throws IOException {
} else {
LOGGER.log(Level.DEBUG, NO_FILES_TO_ARCHIVE_MESSAGE, rootToArchive);
}
// @todo(737) update proper metrics
// @todo(739) the task should return meaningful result that would be
// published
// If no exception is thrown, then we expect that the archiving process is successful,
// and we can return the number of blocks that were archived.
return blockFilesArchived;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ public Optional<BlockUnparsed> read(final long blockNumber) throws IOException,
final ArchiveBlockPath archiveBlockPath = optArchivedBlock.get();
final Path zipFilePath = archiveBlockPath.dirPath().resolve(archiveBlockPath.zipFileName());
final BlockUnparsed value;
// @todo(736) (depends on 598!) update reader to be able to read
// appended blocks
// @todo(741) update reader to use zipfs to read blocks
try (final ZipFile zipFile = new ZipFile(zipFilePath.toFile())) {
final ZipEntry entry = zipFile.getEntry(archiveBlockPath.zipEntryName());
final InputStream in = zipFile.getInputStream(entry);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public TransferQueue<BlockItemUnparsed> getQueue() {
}

private BlockPersistenceResult doPersistBlock() {
// @todo(545) think about possible race conditions, it is possible that
// @todo(713) think about possible race conditions, it is possible that
// the persistence handler to start two tasks for the same block number
// simultaneously theoretically. If that happens, then maybe both of them
// will enter in the else statement. Should the persistence handler
Expand All @@ -105,7 +105,7 @@ private BlockPersistenceResult doPersistBlock() {
}
}
} catch (final InterruptedException e) {
// @todo(545) if we have entered here, something has cancelled the task.
// @todo(713) if we have entered here, something has cancelled the task.
// Is this the proper handling here?
LOGGER.log(
ERROR,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ private BlockPersistenceResult doPersistBlock() {
}
}
} catch (final InterruptedException e) {
// @todo(545) is this the proper handling here?
// @todo(713) is this the proper handling here?
LOGGER.log(
Level.ERROR,
"Interrupted while waiting for next block item for block [%d]".formatted(blockNumber));
Expand Down

0 comments on commit 69521c7

Please sign in to comment.