Skip to content

Commit ea7615d

Browse files
committed
Fix get future exception message
1 parent d6ffeeb commit ea7615d

File tree

2 files changed

+75
-1
lines changed

2 files changed

+75
-1
lines changed

Diff for: samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ public CompletableFuture<SnapshotIndex> getSnapshotIndex(String blobId, Metadata
185185
.thenApplyAsync(f -> snapshotIndexSerde.fromBytes(indexBlobStream.toByteArray()), executor)
186186
.handle((snapshotIndex, ex) -> {
187187
if (ex != null) {
188-
throw new SamzaException(String.format("Unable to deserialize SnapshotIndex bytes for blob ID: %s", blobId), ex);
188+
throw new SamzaException(String.format("Unable to get SnapshotIndex blob. The blob ID is : %s", blobId), ex);
189189
}
190190
return snapshotIndex;
191191
});

Diff for: samza-core/src/test/java/org/apache/samza/storage/blobstore/util/TestBlobStoreUtil.java

+74
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.io.IOException;
2929
import java.io.InputStream;
3030
import java.io.OutputStream;
31+
import java.nio.charset.StandardCharsets;
3132
import java.nio.file.Files;
3233
import java.nio.file.Path;
3334
import java.nio.file.Paths;
@@ -50,6 +51,7 @@
5051
import java.util.concurrent.CompletionStage;
5152
import java.util.concurrent.ExecutionException;
5253
import java.util.concurrent.ExecutorService;
54+
import java.util.concurrent.Executors;
5355
import java.util.concurrent.TimeUnit;
5456
import java.util.concurrent.TimeoutException;
5557
import java.util.zip.CRC32;
@@ -78,6 +80,7 @@
7880
import org.apache.samza.storage.blobstore.index.SnapshotMetadata;
7981
import org.apache.samza.util.FileUtil;
8082
import org.apache.samza.util.FutureUtil;
83+
import org.junit.Assert;
8184
import org.junit.Ignore;
8285
import org.junit.Test;
8386
import org.mockito.ArgumentCaptor;
@@ -920,6 +923,24 @@ public void testGetSSIThrowsExceptionOnSyncBlobStoreErrors() {
920923
checkpoint, storesToBackupOrRestore, false);
921924
}
922925

926+
@Test
927+
public void testSerdeException() throws ExecutionException, InterruptedException {
928+
final String blobId = "foo";
929+
930+
final BlobStoreManager testBlobStoreManager = new DeserTestBlobStoreManager();
931+
final BlobStoreUtil util = new BlobStoreUtil(testBlobStoreManager, Executors.newSingleThreadExecutor(), blobStoreConfig, null, null);
932+
933+
final CompletableFuture<SnapshotIndex> future = util.getSnapshotIndex(blobId, mock(Metadata.class), true)
934+
.handle((snapshotIndex, throwable) -> {
935+
if (throwable != null) {
936+
Assert.assertEquals(throwable.getMessage(), String.format("Unable to get SnapshotIndex blob. The blob ID is : %s", blobId));
937+
Assert.assertEquals(throwable.getCause().getMessage(), "org.apache.samza.SamzaException: Exception in deserializing SnapshotIndex bytes foobar");
938+
}
939+
return snapshotIndex;
940+
});
941+
future.get();
942+
}
943+
923944
@Test
924945
public void testGetSSIThrowsExceptionIfAnyNonIgnoredAsyncBlobStoreErrors() {
925946
String store = "storeName1";
@@ -1045,4 +1066,57 @@ private CheckpointV2 createCheckpointV2(String stateBackendFactory, Map<String,
10451066
factoryStoreSCMs.put(stateBackendFactory, storeSCMs);
10461067
return new CheckpointV2(checkpointId, ImmutableMap.of(), factoryStoreSCMs);
10471068
}
1069+
1070+
/**
1071+
* Test {@link BlobStoreManager} to be used to assert SnapshotIndex deserialization failure
1072+
* exception message.
1073+
* We write a dummy string's bytes to the OutputStream parameter of get method instead of a SnapshotIndex
1074+
* blob. The OutputStream is used by SnapshotIndexSerde which will fail during deserialization.
1075+
* */
1076+
private static class DeserTestBlobStoreManager extends TestBlobStoreManager {
1077+
@Override
1078+
public CompletionStage<Void> get(String id, OutputStream outputStream, Metadata metadata, boolean getDeletedBlob) {
1079+
final String randBlob = "foobar";
1080+
final byte[] byteArray = randBlob.getBytes(StandardCharsets.UTF_8);
1081+
try {
1082+
outputStream.write(byteArray);
1083+
} catch (IOException e) {
1084+
throw new RuntimeException(e);
1085+
}
1086+
return CompletableFuture.completedFuture(null);
1087+
}
1088+
}
1089+
1090+
/**
1091+
* Test BlobStoreManager for unit tests.
1092+
* */
1093+
private static class TestBlobStoreManager implements BlobStoreManager {
1094+
@Override
1095+
public void init() {
1096+
}
1097+
1098+
@Override
1099+
public CompletionStage<String> put(InputStream inputStream, Metadata metadata) {
1100+
return null;
1101+
}
1102+
1103+
@Override
1104+
public CompletionStage<Void> get(String id, OutputStream outputStream, Metadata metadata, boolean getDeletedBlob) {
1105+
return null;
1106+
}
1107+
1108+
@Override
1109+
public CompletionStage<Void> delete(String id, Metadata metadata) {
1110+
return null;
1111+
}
1112+
1113+
@Override
1114+
public CompletionStage<Void> removeTTL(String blobId, Metadata metadata) {
1115+
return null;
1116+
}
1117+
1118+
@Override
1119+
public void close() {
1120+
}
1121+
}
10481122
}

0 commit comments

Comments
 (0)