Skip to content

Commit d6ffeeb

Browse files
authored
Log bob deserialization exception for snapshot blobs (#1711)
1 parent 56c6267 commit d6ffeeb

File tree

1 file changed

+7
-1
lines changed

1 file changed

+7
-1
lines changed

samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,13 @@ public CompletableFuture<SnapshotIndex> getSnapshotIndex(String blobId, Metadata
182182
return FutureUtil.executeAsyncWithRetries(opName, () -> {
183183
ByteArrayOutputStream indexBlobStream = new ByteArrayOutputStream(); // no need to close ByteArrayOutputStream
184184
return blobStoreManager.get(blobId, indexBlobStream, metadata, getDeleted).toCompletableFuture()
185-
.thenApplyAsync(f -> snapshotIndexSerde.fromBytes(indexBlobStream.toByteArray()), executor);
185+
.thenApplyAsync(f -> snapshotIndexSerde.fromBytes(indexBlobStream.toByteArray()), executor)
186+
.handle((snapshotIndex, ex) -> {
187+
if (ex != null) {
188+
throw new SamzaException(String.format("Unable to deserialize SnapshotIndex bytes for blob ID: %s", blobId), ex);
189+
}
190+
return snapshotIndex;
191+
});
186192
}, isCauseNonRetriable(), executor, retryPolicyConfig);
187193
}
188194

0 commit comments

Comments
 (0)