diff --git a/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java b/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java index bd78248cb9..8e73d5895a 100644 --- a/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java +++ b/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java @@ -182,13 +182,13 @@ public CompletableFuture getSnapshotIndex(String blobId, Metadata return FutureUtil.executeAsyncWithRetries(opName, () -> { ByteArrayOutputStream indexBlobStream = new ByteArrayOutputStream(); // no need to close ByteArrayOutputStream return blobStoreManager.get(blobId, indexBlobStream, metadata, getDeleted).toCompletableFuture() - .thenApplyAsync(f -> snapshotIndexSerde.fromBytes(indexBlobStream.toByteArray()), executor) - .handle((snapshotIndex, ex) -> { - if (ex != null) { + .thenApplyAsync(f -> { + try { + return snapshotIndexSerde.fromBytes(indexBlobStream.toByteArray()); + } catch (Exception ex) { throw new SamzaException(String.format("Unable to deserialize SnapshotIndex bytes for blob ID: %s", blobId), ex); } - return snapshotIndex; - }); + }, executor); }, isCauseNonRetriable(), executor, retryPolicyConfig); } diff --git a/samza-core/src/test/java/org/apache/samza/storage/blobstore/util/TestBlobStoreUtil.java b/samza-core/src/test/java/org/apache/samza/storage/blobstore/util/TestBlobStoreUtil.java index a44f86e644..6395582b53 100644 --- a/samza-core/src/test/java/org/apache/samza/storage/blobstore/util/TestBlobStoreUtil.java +++ b/samza-core/src/test/java/org/apache/samza/storage/blobstore/util/TestBlobStoreUtil.java @@ -28,6 +28,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; @@ -50,6 +51,7 @@ import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.zip.CRC32; @@ -78,6 +80,7 @@ import org.apache.samza.storage.blobstore.index.SnapshotMetadata; import org.apache.samza.util.FileUtil; import org.apache.samza.util.FutureUtil; +import org.junit.Assert; import org.junit.Ignore; import org.junit.Test; import org.mockito.ArgumentCaptor; @@ -920,6 +923,24 @@ public void testGetSSIThrowsExceptionOnSyncBlobStoreErrors() { checkpoint, storesToBackupOrRestore, false); } + @Test + public void testSerdeException() throws ExecutionException, InterruptedException { + final String blobId = "foo"; + + final BlobStoreManager testBlobStoreManager = new DeserTestBlobStoreManager(); + final BlobStoreUtil util = new BlobStoreUtil(testBlobStoreManager, Executors.newSingleThreadExecutor(), blobStoreConfig, null, null); + + final CompletableFuture future = util.getSnapshotIndex(blobId, mock(Metadata.class), true) + .handle((snapshotIndex, throwable) -> { + if (throwable != null) { + // assert if expected error is thrown during deserialization of a bad SnapshotIndex blob + Assert.assertEquals(throwable.getMessage(), String.format("Unable to deserialize SnapshotIndex bytes for blob ID: %s", blobId)); + } + return snapshotIndex; + }); + future.get(); + } + @Test public void testGetSSIThrowsExceptionIfAnyNonIgnoredAsyncBlobStoreErrors() { String store = "storeName1"; @@ -1045,4 +1066,57 @@ private CheckpointV2 createCheckpointV2(String stateBackendFactory, Map get(String id, OutputStream outputStream, Metadata metadata, boolean getDeletedBlob) { + final String randBlob = "foobar"; + final byte[] byteArray = randBlob.getBytes(StandardCharsets.UTF_8); + try { + outputStream.write(byteArray); + } catch (IOException e) { + throw new RuntimeException(e); + } + return CompletableFuture.completedFuture(null); + } + } + + /** + * Test BlobStoreManager for unit tests. + * */ + private static class TestBlobStoreManager implements BlobStoreManager { + @Override + public void init() { + } + + @Override + public CompletionStage put(InputStream inputStream, Metadata metadata) { + return null; + } + + @Override + public CompletionStage get(String id, OutputStream outputStream, Metadata metadata, boolean getDeletedBlob) { + return null; + } + + @Override + public CompletionStage delete(String id, Metadata metadata) { + return null; + } + + @Override + public CompletionStage removeTTL(String blobId, Metadata metadata) { + return null; + } + + @Override + public void close() { + } + } }