
Commit f4f66dd

Update Glue schema id to 36. Add some tests to increase coverage. Use Kafka ByteUtils also in unit tests.
1 parent 205d9e1 commit f4f66dd

10 files changed, +229 -89 lines changed

examples/powertools-examples-kafka/src/main/proto/KeyMessage.proto

Whitespace-only changes.

examples/powertools-examples-kafka/src/main/proto/ValueMessage.proto

Whitespace-only changes.

powertools-kafka/src/main/java/software/amazon/lambda/powertools/kafka/serializers/AbstractKafkaDeserializer.java

Lines changed: 8 additions & 10 deletions

@@ -41,12 +41,10 @@
 abstract class AbstractKafkaDeserializer implements PowertoolsDeserializer {
     protected static final ObjectMapper objectMapper = new ObjectMapper()
             .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-    private static final Integer GLUE_SCHEMA_ID_LENGTH = 16;
+    private static final Integer GLUE_SCHEMA_ID_LENGTH = 36;

     public enum SchemaRegistryType {
-        CONFLUENT,
-        GLUE,
-        NONE
+        CONFLUENT, GLUE, NONE
     }

     /**
@@ -197,7 +195,8 @@ private <K, V> ConsumerRecord<K, V> convertToConsumerRecord(
                 Optional.empty());
     }

-    private <T> T deserializeField(String encodedData, Class<T> type, String fieldName, SchemaRegistryType schemaRegistryType) {
+    private <T> T deserializeField(String encodedData, Class<T> type, String fieldName,
+            SchemaRegistryType schemaRegistryType) {
         if (encodedData == null) {
             return null;
         }
@@ -239,10 +238,8 @@ private String extractValueSchemaId(KafkaEvent.KafkaEventRecord eventRecord) {
         return null;
     }

-    // The Assumption is that there will always be only one schema registry used, either Glue or Confluent, for both key
-    // and value.
-    protected SchemaRegistryType extractSchemaRegistryType(KafkaEvent.KafkaEventRecord eventRecord) {
-
+    private SchemaRegistryType extractSchemaRegistryType(KafkaEvent.KafkaEventRecord eventRecord) {
+        // This method is used for both key and value, so we try to extract the schema id from both fields
         String schemaId = extractValueSchemaId(eventRecord);
         if (schemaId == null) {
             schemaId = extractKeySchemaId(eventRecord);
@@ -266,7 +263,8 @@ protected SchemaRegistryType extractSchemaRegistryType(KafkaEvent.KafkaEventRecord
      * @return The deserialized object
      * @throws IOException If deserialization fails
      */
-    protected abstract <T> T deserializeObject(byte[] data, Class<T> type, SchemaRegistryType schemaRegistryType) throws IOException;
+    protected abstract <T> T deserializeObject(byte[] data, Class<T> type, SchemaRegistryType schemaRegistryType)
+            throws IOException;

     /**
      * Main deserialize method that handles primitive types and delegates to subclasses for complex types and
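
Note on the constant change: bumping GLUE_SCHEMA_ID_LENGTH from 16 to 36 lines up with Glue Schema Registry schema version ids being UUID strings (36 characters, e.g. "12345678-1234-1234-1234-123456789012" as used in the new tests further down), while Confluent Schema Registry ids are plain integers. The body of extractSchemaRegistryType is not visible in this diff; the snippet below is only a minimal sketch of how an id's format could be mapped to a SchemaRegistryType inside AbstractKafkaDeserializer, not the library's actual implementation.

    // Hypothetical sketch only; the real extractSchemaRegistryType logic is not shown in this commit.
    // Assumes a 36-character id (a UUID) indicates Glue and any other non-null id indicates Confluent.
    private static SchemaRegistryType registryTypeFromSchemaId(String schemaId) {
        if (schemaId == null) {
            return SchemaRegistryType.NONE;               // record carries no schema registry metadata
        }
        if (schemaId.length() == GLUE_SCHEMA_ID_LENGTH) { // 36 characters, i.e. a UUID
            return SchemaRegistryType.GLUE;
        }
        return SchemaRegistryType.CONFLUENT;              // numeric id such as "123"
    }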

powertools-kafka/src/main/java/software/amazon/lambda/powertools/kafka/serializers/KafkaAvroDeserializer.java

Lines changed: 2 additions & 1 deletion

@@ -26,7 +26,8 @@
 public class KafkaAvroDeserializer extends AbstractKafkaDeserializer {

     @Override
-    protected <T> T deserializeObject(byte[] data, Class<T> type, SchemaRegistryType schemaRegistryType) throws IOException {
+    protected <T> T deserializeObject(byte[] data, Class<T> type, SchemaRegistryType schemaRegistryType)
+            throws IOException {
         // If no Avro generated class is passed we cannot deserialize using Avro
         if (SpecificRecordBase.class.isAssignableFrom(type)) {
             try {

powertools-kafka/src/main/java/software/amazon/lambda/powertools/kafka/serializers/KafkaJsonDeserializer.java

Lines changed: 2 additions & 2 deletions

@@ -14,15 +14,15 @@

 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
-import java.rmi.UnexpectedException;

 /**
  * Deserializer for Kafka records using JSON format.
  */
 public class KafkaJsonDeserializer extends AbstractKafkaDeserializer {

     @Override
-    protected <T> T deserializeObject(byte[] data, Class<T> type, SchemaRegistryType schemaRegistryType) throws IOException {
+    protected <T> T deserializeObject(byte[] data, Class<T> type, SchemaRegistryType schemaRegistryType)
+            throws IOException {
         String decodedStr = new String(data, StandardCharsets.UTF_8);

         return objectMapper.readValue(decodedStr, type);

powertools-kafka/src/main/java/software/amazon/lambda/powertools/kafka/serializers/KafkaProtobufDeserializer.java

Lines changed: 43 additions & 41 deletions

@@ -13,6 +13,7 @@
 package software.amazon.lambda.powertools.kafka.serializers;

 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
 import java.nio.ByteBuffer;

 import org.apache.kafka.common.utils.ByteUtils;
@@ -25,7 +26,8 @@

 /**
  * Deserializer for Kafka records using Protocol Buffers format.
- * Supports both standard protobuf serialization and Confluent Schema Registry serialization using messages indices.
+ * Supports both standard protobuf serialization and Confluent / Glue Schema Registry serialization using messages
+ * indices.
  *
  * For Confluent-serialized data, assumes the magic byte and schema ID have already been stripped
  * by the Kafka ESM, leaving only the message index (if present) and protobuf data.
@@ -35,19 +37,24 @@
 public class KafkaProtobufDeserializer extends AbstractKafkaDeserializer {

     private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProtobufDeserializer.class);
+    private static final String PROTOBUF_PARSER_METHOD = "parser";

     @Override
-    @SuppressWarnings("unchecked")
-    protected <T> T deserializeObject(byte[] data, Class<T> type, SchemaRegistryType schemaRegistryType) throws IOException {
+    protected <T> T deserializeObject(byte[] data, Class<T> type, SchemaRegistryType schemaRegistryType)
+            throws IOException {
         // If no Protobuf generated class is passed, we cannot deserialize using Protobuf
         if (Message.class.isAssignableFrom(type)) {
-            switch (schemaRegistryType) {
-                case GLUE:
-                    return glueDeserializer(data, type);
-                case CONFLUENT:
-                    return confluentDeserializer(data, type);
-                default:
-                    return defaultDeserializer(data, type);
+            try {
+                switch (schemaRegistryType) {
+                    case GLUE:
+                        return glueDeserializer(data, type);
+                    case CONFLUENT:
+                        return confluentDeserializer(data, type);
+                    default:
+                        return defaultDeserializer(data, type);
+                }
+            } catch (Exception e) {
+                throw new IOException("Failed to deserialize Protobuf data.", e);
             }
         } else {
             throw new IOException("Unsupported type for Protobuf deserialization: " + type.getName() + ". "
@@ -56,53 +63,48 @@ protected <T> T deserializeObject(byte[] data, Class<T> type, SchemaRegistryType
         }
     }

+    @SuppressWarnings("unchecked")
     private <T> T defaultDeserializer(byte[] data, Class<T> type) throws IOException {
         try {
             LOGGER.debug("Using default Protobuf deserializer");
-            Parser<Message> parser = (Parser<Message>) type.getMethod("parser").invoke(null);
+            Parser<Message> parser = (Parser<Message>) type.getMethod(PROTOBUF_PARSER_METHOD).invoke(null);
             Message message = parser.parseFrom(data);
             return type.cast(message);
         } catch (Exception e) {
             throw new IOException("Failed to deserialize Protobuf data.", e);
         }
     }

-    private <T> T confluentDeserializer(byte[] data, Class<T> type) throws IOException {
-        try {
-
-            LOGGER.debug("Using Confluent Deserializer");
-            Parser<Message> parser = (Parser<Message>) type.getMethod("parser").invoke(null);
-            ByteBuffer buffer = ByteBuffer.wrap(data);
-            int size = ByteUtils.readVarint(buffer);
+    @SuppressWarnings("unchecked")
+    private <T> T confluentDeserializer(byte[] data, Class<T> type)
+            throws IOException, IllegalAccessException, InvocationTargetException, NoSuchMethodException {
+        LOGGER.debug("Using Confluent Deserializer");
+        Parser<Message> parser = (Parser<Message>) type.getMethod(PROTOBUF_PARSER_METHOD).invoke(null);
+        ByteBuffer buffer = ByteBuffer.wrap(data);
+        int size = ByteUtils.readVarint(buffer);

-            // Only if the size is greater than zero, continue reading varInt. based on Glue Proto deserializer implementation
-            if (size > 0) {
-                for (int i = 0; i < size; i++) {
-                    ByteUtils.readVarint(buffer);
-                }
+        // Only if the size is greater than zero, continue reading varInt.
+        // https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format
+        if (size > 0) {
+            for (int i = 0; i < size; i++) {
+                ByteUtils.readVarint(buffer);
             }
-            Message message = parser.parseFrom(buffer);
-            return type.cast(message);
-        } catch (Exception e) {
-            throw new IOException("Failed to deserialize Protobuf data.", e);
         }
+        Message message = parser.parseFrom(buffer);
+        return type.cast(message);
     }

+    @SuppressWarnings("unchecked")
+    private <T> T glueDeserializer(byte[] data, Class<T> type)
+            throws IOException, IllegalAccessException, InvocationTargetException, NoSuchMethodException {
+        LOGGER.debug("Using Glue Deserializer");
+        CodedInputStream codedInputStream = CodedInputStream.newInstance(data);
+        Parser<Message> parser = (Parser<Message>) type.getMethod(PROTOBUF_PARSER_METHOD).invoke(null);

-    private <T> T glueDeserializer(byte[] data, Class<T> type) throws IOException {
-        try {
-
-            LOGGER.debug("Using Glue Deserializer");
-            CodedInputStream codedInputStream = CodedInputStream.newInstance(data);
-            Parser<Message> parser = (Parser<Message>) type.getMethod("parser").invoke(null);
-
-            // Seek one byte forward. Based on Glue Proto deserializer implementation
-            codedInputStream.readUInt32();
+        // Seek one byte forward. Based on Glue Proto deserializer implementation
+        codedInputStream.readUInt32();

-            Message message = parser.parseFrom(codedInputStream);
-            return type.cast(message);
-        } catch (Exception e) {
-            throw new IOException("Failed to deserialize Protobuf data.", e);
-        }
+        Message message = parser.parseFrom(codedInputStream);
+        return type.cast(message);
     }
 }
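
To make the framing handled by confluentDeserializer and glueDeserializer explicit: in the Confluent Protobuf wire format, after the Kafka ESM has stripped the magic byte and the schema id, the payload can still begin with a varint count of message indices followed by that many varints; the Glue path instead skips a single leading value via readUInt32(). Below is a small standalone sketch of the Confluent preamble handling using Kafka's ByteUtils. The class and method names are illustrative only, not part of the library.

    import java.nio.ByteBuffer;

    import org.apache.kafka.common.utils.ByteUtils;

    // Illustrative helper: position a buffer past the Confluent message-index preamble
    // (a varint count followed by that many varint indices) so that a generated protobuf
    // parser can consume the remaining bytes, e.g. parser.parseFrom(buffer).
    final class ConfluentProtobufFraming {

        private ConfluentProtobufFraming() {
        }

        static ByteBuffer skipMessageIndices(byte[] payload) {
            ByteBuffer buffer = ByteBuffer.wrap(payload);
            int count = ByteUtils.readVarint(buffer); // 0 means the first message type in the schema
            for (int i = 0; i < count; i++) {
                ByteUtils.readVarint(buffer);         // discard each message index
            }
            return buffer;                            // now positioned at the raw protobuf bytes
        }
    }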

powertools-kafka/src/test/java/software/amazon/lambda/powertools/kafka/serializers/AbstractKafkaDeserializerTest.java

Lines changed: 109 additions & 2 deletions

@@ -459,10 +459,117 @@ void shouldThrowExceptionWhenConvertingEmptyStringToChar(InputType inputType) {
         }
     }

+    @ParameterizedTest
+    @MethodSource("inputTypes")
+    void shouldHandleGlueSchemaMetadata(InputType inputType) throws IOException {
+        // Given
+        TestProductPojo product = new TestProductPojo(123, "Test Product", 99.99, null);
+        String productJson = objectMapper.writeValueAsString(product);
+        String base64Value = Base64.getEncoder().encodeToString(productJson.getBytes());
+
+        String kafkaJson = "{\n" +
+                "  \"eventSource\": \"aws:kafka\",\n" +
+                "  \"records\": {\n" +
+                "    \"test-topic-1\": [\n" +
+                "      {\n" +
+                "        \"topic\": \"test-topic-1\",\n" +
+                "        \"partition\": 0,\n" +
+                "        \"offset\": 15,\n" +
+                "        \"timestamp\": 1545084650987,\n" +
+                "        \"timestampType\": \"CREATE_TIME\",\n" +
+                "        \"key\": null,\n" +
+                "        \"value\": \"" + base64Value + "\",\n" +
+                "        \"headers\": [],\n" +
+                "        \"keySchemaMetadata\": {\n" +
+                "          \"schemaId\": \"12345678-1234-1234-1234-123456789012\",\n" +
+                "          \"dataFormat\": \"PROTOBUF\"\n" +
+                "        },\n" +
+                "        \"valueSchemaMetadata\": {\n" +
+                "          \"schemaId\": \"87654321-4321-4321-4321-210987654321\",\n" +
+                "          \"dataFormat\": \"PROTOBUF\"\n" +
+                "        }\n" +
+                "      }\n" +
+                "    ]\n" +
+                "  }\n" +
+                "}";
+        Type type = TestUtils.createConsumerRecordsType(String.class, TestProductPojo.class);
+
+        // When
+        ConsumerRecords<String, TestProductPojo> records;
+        if (inputType == InputType.INPUT_STREAM) {
+            ByteArrayInputStream inputStream = new ByteArrayInputStream(kafkaJson.getBytes());
+            records = deserializer.fromJson(inputStream, type);
+        } else {
+            records = deserializer.fromJson(kafkaJson, type);
+        }
+
+        // Then
+        assertThat(records).isNotNull();
+        TopicPartition tp = new TopicPartition("test-topic-1", 0);
+        List<ConsumerRecord<String, TestProductPojo>> topicRecords = records.records(tp);
+        assertThat(topicRecords).hasSize(1);
+
+        ConsumerRecord<String, TestProductPojo> consumerRecord = topicRecords.get(0);
+        assertThat(consumerRecord.value()).isNotNull();
+        assertThat(consumerRecord.value().getId()).isEqualTo(123);
+    }
+
+    @ParameterizedTest
+    @MethodSource("inputTypes")
+    void shouldHandleConfluentSchemaMetadata(InputType inputType) throws IOException {
+        // Given
+        TestProductPojo product = new TestProductPojo(456, "Confluent Product", 199.99, null);
+        String productJson = objectMapper.writeValueAsString(product);
+        String base64Value = Base64.getEncoder().encodeToString(productJson.getBytes());
+
+        String kafkaJson = "{\n" +
+                "  \"eventSource\": \"aws:kafka\",\n" +
+                "  \"records\": {\n" +
+                "    \"test-topic-1\": [\n" +
+                "      {\n" +
+                "        \"topic\": \"test-topic-1\",\n" +
+                "        \"partition\": 0,\n" +
+                "        \"offset\": 15,\n" +
+                "        \"timestamp\": 1545084650987,\n" +
+                "        \"timestampType\": \"CREATE_TIME\",\n" +
+                "        \"key\": null,\n" +
+                "        \"value\": \"" + base64Value + "\",\n" +
+                "        \"headers\": [],\n" +
+                "        \"keySchemaMetadata\": {\n" +
+                "          \"schemaId\": \"123\",\n" +
+                "          \"dataFormat\": \"PROTOBUF\"\n" +
+                "        }\n" +
+                "      }\n" +
+                "    ]\n" +
+                "  }\n" +
+                "}";
+        Type type = TestUtils.createConsumerRecordsType(String.class, TestProductPojo.class);
+
+        // When
+        ConsumerRecords<String, TestProductPojo> records;
+        if (inputType == InputType.INPUT_STREAM) {
+            ByteArrayInputStream inputStream = new ByteArrayInputStream(kafkaJson.getBytes());
+            records = deserializer.fromJson(inputStream, type);
+        } else {
+            records = deserializer.fromJson(kafkaJson, type);
+        }
+
+        // Then
+        assertThat(records).isNotNull();
+        TopicPartition tp = new TopicPartition("test-topic-1", 0);
+        List<ConsumerRecord<String, TestProductPojo>> topicRecords = records.records(tp);
+        assertThat(topicRecords).hasSize(1);
+
+        ConsumerRecord<String, TestProductPojo> consumerRecord = topicRecords.get(0);
+        assertThat(consumerRecord.value()).isNotNull();
+        assertThat(consumerRecord.value().getId()).isEqualTo(456);
+    }
+
     // Test implementation of AbstractKafkaDeserializer
-    private static class TestDeserializer extends AbstractKafkaDeserializer {
+    private static final class TestDeserializer extends AbstractKafkaDeserializer {
         @Override
-        protected <T> T deserializeObject(byte[] data, Class<T> type, SchemaRegistryType schemaRegistryType) throws IOException {
+        protected <T> T deserializeObject(byte[] data, Class<T> type, SchemaRegistryType schemaRegistryType)
+                throws IOException {
             return objectMapper.readValue(data, type);
         }
     }

powertools-kafka/src/test/java/software/amazon/lambda/powertools/kafka/serializers/KafkaAvroDeserializerTest.java

Lines changed: 10 additions & 7 deletions

@@ -38,9 +38,10 @@ void shouldThrowExceptionWhenTypeIsNotAvroSpecificRecord() {
         byte[] data = new byte[] { 1, 2, 3 };

         // When/Then
-        assertThatThrownBy(() -> deserializer.deserializeObject(data, String.class, AbstractKafkaDeserializer.SchemaRegistryType.NONE))
-                .isInstanceOf(IOException.class)
-                .hasMessageContaining("Unsupported type for Avro deserialization");
+        assertThatThrownBy(() -> deserializer.deserializeObject(data, String.class,
+                AbstractKafkaDeserializer.SchemaRegistryType.NONE))
+                        .isInstanceOf(IOException.class)
+                        .hasMessageContaining("Unsupported type for Avro deserialization");
     }

     @Test
@@ -50,7 +51,8 @@ void shouldDeserializeValidAvroData() throws IOException {
         byte[] avroData = serializeAvro(product);

         // When
-        TestProduct result = deserializer.deserializeObject(avroData, TestProduct.class, AbstractKafkaDeserializer.SchemaRegistryType.NONE);
+        TestProduct result = deserializer.deserializeObject(avroData, TestProduct.class,
+                AbstractKafkaDeserializer.SchemaRegistryType.NONE);

         // Then
         assertThat(result).isNotNull();
@@ -65,9 +67,10 @@ void shouldThrowExceptionWhenDeserializingInvalidAvroData() {
         byte[] invalidAvroData = new byte[] { 1, 2, 3, 4, 5 };

         // When/Then
-        assertThatThrownBy(() -> deserializer.deserializeObject(invalidAvroData, TestProduct.class, AbstractKafkaDeserializer.SchemaRegistryType.NONE))
-                .isInstanceOf(IOException.class)
-                .hasMessageContaining("Failed to deserialize Avro data");
+        assertThatThrownBy(() -> deserializer.deserializeObject(invalidAvroData, TestProduct.class,
+                AbstractKafkaDeserializer.SchemaRegistryType.NONE))
+                        .isInstanceOf(IOException.class)
+                        .hasMessageContaining("Failed to deserialize Avro data");
     }

 }

powertools-kafka/src/test/java/software/amazon/lambda/powertools/kafka/serializers/KafkaJsonDeserializerTest.java

Lines changed: 5 additions & 3 deletions

@@ -42,8 +42,9 @@ void shouldThrowExceptionWhenTypeIsNotSupportedForJson() {
         byte[] data = new byte[] { 1, 2, 3 };

         // When/Then
-        assertThatThrownBy(() -> deserializer.deserializeObject(data, Object.class, AbstractKafkaDeserializer.SchemaRegistryType.NONE))
-                .isInstanceOf(JsonParseException.class);
+        assertThatThrownBy(() -> deserializer.deserializeObject(data, Object.class,
+                AbstractKafkaDeserializer.SchemaRegistryType.NONE))
+                        .isInstanceOf(JsonParseException.class);
     }

     @Test
@@ -53,7 +54,8 @@ void shouldDeserializeValidJsonData() throws IOException {
         byte[] jsonData = objectMapper.writeValueAsBytes(product);

         // When
-        TestProductPojo result = deserializer.deserializeObject(jsonData, TestProductPojo.class, AbstractKafkaDeserializer.SchemaRegistryType.NONE);
+        TestProductPojo result = deserializer.deserializeObject(jsonData, TestProductPojo.class,
+                AbstractKafkaDeserializer.SchemaRegistryType.NONE);

         // Then
         assertThat(result).isNotNull();
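
The commit message also mentions using Kafka's ByteUtils in the unit tests; the corresponding Protobuf test changes are not included in this excerpt. As a rough idea of what producing a Confluent-framed test payload with ByteUtils could look like (the helper class and method names are purely illustrative, not taken from the repository):

    import java.nio.ByteBuffer;

    import org.apache.kafka.common.utils.ByteUtils;

    import com.google.protobuf.Message;

    // Illustrative test helper, assuming the framing the production code reads:
    // a varint message-index count of 0 (single message type) followed by the serialized protobuf bytes.
    final class ProtobufTestFraming {

        private ProtobufTestFraming() {
        }

        static byte[] confluentFramed(Message message) {
            byte[] body = message.toByteArray();
            ByteBuffer buffer = ByteBuffer.allocate(ByteUtils.sizeOfVarint(0) + body.length);
            ByteUtils.writeVarint(0, buffer); // index count 0: first message type defined in the schema
            buffer.put(body);
            return buffer.array();
        }
    }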
