Skip to content

Commit

Permalink
Throw SnapshotIndex deserialization error from the chained thenApplyA…
Browse files Browse the repository at this point in the history
…sync method of the get future instead of handle
  • Loading branch information
ajothomas committed Oct 11, 2024
1 parent d6ffeeb commit 4479c33
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -182,13 +182,13 @@ public CompletableFuture<SnapshotIndex> getSnapshotIndex(String blobId, Metadata
return FutureUtil.executeAsyncWithRetries(opName, () -> {
ByteArrayOutputStream indexBlobStream = new ByteArrayOutputStream(); // no need to close ByteArrayOutputStream
return blobStoreManager.get(blobId, indexBlobStream, metadata, getDeleted).toCompletableFuture()
.thenApplyAsync(f -> snapshotIndexSerde.fromBytes(indexBlobStream.toByteArray()), executor)
.handle((snapshotIndex, ex) -> {
if (ex != null) {
.thenApplyAsync(f -> {
try {
return snapshotIndexSerde.fromBytes(indexBlobStream.toByteArray());
} catch (Exception ex) {
throw new SamzaException(String.format("Unable to deserialize SnapshotIndex bytes for blob ID: %s", blobId), ex);
}
return snapshotIndex;
});
}, executor);
}, isCauseNonRetriable(), executor, retryPolicyConfig);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
Expand All @@ -50,6 +51,7 @@
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.zip.CRC32;
Expand Down Expand Up @@ -78,6 +80,7 @@
import org.apache.samza.storage.blobstore.index.SnapshotMetadata;
import org.apache.samza.util.FileUtil;
import org.apache.samza.util.FutureUtil;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
Expand Down Expand Up @@ -920,6 +923,24 @@ public void testGetSSIThrowsExceptionOnSyncBlobStoreErrors() {
checkpoint, storesToBackupOrRestore, false);
}

@Test
public void testSerdeException() throws ExecutionException, InterruptedException {
final String blobId = "foo";

final BlobStoreManager testBlobStoreManager = new DeserTestBlobStoreManager();
final BlobStoreUtil util = new BlobStoreUtil(testBlobStoreManager, Executors.newSingleThreadExecutor(), blobStoreConfig, null, null);

final CompletableFuture<SnapshotIndex> future = util.getSnapshotIndex(blobId, mock(Metadata.class), true)
.handle((snapshotIndex, throwable) -> {
if (throwable != null) {
// assert if expected error is thrown during deserialization of a bad SnapshotIndex blob
Assert.assertEquals(throwable.getMessage(), String.format("Unable to deserialize SnapshotIndex bytes for blob ID: %s", blobId));
}
return snapshotIndex;
});
future.get();
}

@Test
public void testGetSSIThrowsExceptionIfAnyNonIgnoredAsyncBlobStoreErrors() {
String store = "storeName1";
Expand Down Expand Up @@ -1045,4 +1066,57 @@ private CheckpointV2 createCheckpointV2(String stateBackendFactory, Map<String,
factoryStoreSCMs.put(stateBackendFactory, storeSCMs);
return new CheckpointV2(checkpointId, ImmutableMap.of(), factoryStoreSCMs);
}

/**
* Test {@link BlobStoreManager} to be used to assert SnapshotIndex deserialization failure
* exception message.
* We write a dummy string's bytes to the OutputStream parameter of get method instead of a SnapshotIndex
* blob. The OutputStream is used by SnapshotIndexSerde which will fail during deserialization.
* */
private static class DeserTestBlobStoreManager extends TestBlobStoreManager {
@Override
public CompletionStage<Void> get(String id, OutputStream outputStream, Metadata metadata, boolean getDeletedBlob) {
final String randBlob = "foobar";
final byte[] byteArray = randBlob.getBytes(StandardCharsets.UTF_8);
try {
outputStream.write(byteArray);
} catch (IOException e) {
throw new RuntimeException(e);
}
return CompletableFuture.completedFuture(null);
}
}

/**
* Test BlobStoreManager for unit tests.
* */
private static class TestBlobStoreManager implements BlobStoreManager {
@Override
public void init() {
}

@Override
public CompletionStage<String> put(InputStream inputStream, Metadata metadata) {
return null;
}

@Override
public CompletionStage<Void> get(String id, OutputStream outputStream, Metadata metadata, boolean getDeletedBlob) {
return null;
}

@Override
public CompletionStage<Void> delete(String id, Metadata metadata) {
return null;
}

@Override
public CompletionStage<Void> removeTTL(String blobId, Metadata metadata) {
return null;
}

@Override
public void close() {
}
}
}

0 comments on commit 4479c33

Please sign in to comment.