From bdae2e0284e610b5f01c4b6fac57ed65cde12b26 Mon Sep 17 00:00:00 2001 From: Ajo Thomas Date: Fri, 13 Sep 2024 14:19:21 -0700 Subject: [PATCH] Log bob deserialization exception for snapshot blobs --- .../samza/storage/blobstore/util/BlobStoreUtil.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java b/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java index 327eb7f33c..bd78248cb9 100644 --- a/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java +++ b/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java @@ -182,7 +182,13 @@ public CompletableFuture getSnapshotIndex(String blobId, Metadata return FutureUtil.executeAsyncWithRetries(opName, () -> { ByteArrayOutputStream indexBlobStream = new ByteArrayOutputStream(); // no need to close ByteArrayOutputStream return blobStoreManager.get(blobId, indexBlobStream, metadata, getDeleted).toCompletableFuture() - .thenApplyAsync(f -> snapshotIndexSerde.fromBytes(indexBlobStream.toByteArray()), executor); + .thenApplyAsync(f -> snapshotIndexSerde.fromBytes(indexBlobStream.toByteArray()), executor) + .handle((snapshotIndex, ex) -> { + if (ex != null) { + throw new SamzaException(String.format("Unable to deserialize SnapshotIndex bytes for blob ID: %s", blobId), ex); + } + return snapshotIndex; + }); }, isCauseNonRetriable(), executor, retryPolicyConfig); }