Skip to content

Commit 6bf9473

Browse files
ShashankAWSmsailes
authored andcommitted
Added event class MskFirehoseEvent.java for Firehose Lambda transformation when MSK is the source (aws#490)
* Create MskFirehoseEvent.java * Create MSKFirehoseResponse.java
1 parent a59b548 commit 6bf9473

File tree

9 files changed

+238
-0
lines changed

9 files changed

+238
-0
lines changed

aws-lambda-java-events/README.md

+2
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@
4646
* `KinesisFirehoseEvent`
4747
* `LambdaDestinationEvent`
4848
* `LexEvent`
49+
* `MSKFirehoseEvent`
50+
* `MSKFirehoseResponse`
4951
* `RabbitMQEvent`
5052
* `S3BatchEvent`
5153
* `S3BatchResponse`
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package com.amazonaws.services.lambda.runtime.events;
7+
8+
import java.nio.ByteBuffer;
9+
import java.util.List;
10+
import java.util.Map;
11+
12+
import lombok.AllArgsConstructor;
13+
import lombok.Builder;
14+
import lombok.Data;
15+
import lombok.NoArgsConstructor;
16+
17+
@Data
18+
@Builder(setterPrefix = "with")
19+
@NoArgsConstructor
20+
@AllArgsConstructor
21+
22+
public class MSKFirehoseEvent {
23+
24+
private String invocationId;
25+
26+
private String deliveryStreamArn;
27+
28+
private String sourceMSKArn;
29+
30+
private String region;
31+
32+
private List<Record> records;
33+
34+
@Data
35+
@Builder(setterPrefix = "with")
36+
@NoArgsConstructor
37+
@AllArgsConstructor
38+
public static class Record {
39+
40+
private ByteBuffer kafkaRecordValue;
41+
42+
private String recordId;
43+
44+
private Long approximateArrivalEpoch;
45+
46+
private Long approximateArrivalTimestamp;
47+
48+
private Map<String, String> mskRecordMetadata;
49+
50+
}
51+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package com.amazonaws.services.lambda.runtime.events;
7+
8+
import java.nio.ByteBuffer;
9+
import java.util.List;
10+
11+
import lombok.AllArgsConstructor;
12+
import lombok.Builder;
13+
import lombok.Data;
14+
import lombok.NoArgsConstructor;
15+
16+
/**
17+
* Response model for Amazon Data Firehose Lambda transformation with MSK as a source.
18+
* [+] Amazon Data Firehose Data Transformation - Data Transformation and Status Model - <a href="https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html#data-transformation-status-model">...</a>
19+
* OK : Indicates that processing of this item succeeded.
20+
* ProcessingFailed : Indicate that the processing of this item failed.
21+
* Dropped : Indicates that this item should be silently dropped
22+
*/
23+
24+
@Data
25+
@Builder(setterPrefix = "with")
26+
@NoArgsConstructor
27+
@AllArgsConstructor
28+
29+
public class MSKFirehoseResponse {
30+
31+
public enum Result {
32+
33+
/**
34+
* Indicates that processing of this item succeeded.
35+
*/
36+
Ok,
37+
38+
/**
39+
* Indicate that the processing of this item failed
40+
*/
41+
ProcessingFailed,
42+
43+
/**
44+
* Indicates that this item should be silently dropped
45+
*/
46+
Dropped
47+
}
48+
public List<Record> records;
49+
50+
@Data
51+
@NoArgsConstructor
52+
@Builder(setterPrefix = "with")
53+
@AllArgsConstructor
54+
55+
public static class Record {
56+
public String recordId;
57+
public Result result;
58+
public ByteBuffer kafkaRecordValue;
59+
60+
}
61+
}

aws-lambda-java-tests/src/main/java/com/amazonaws/services/lambda/runtime/tests/EventLoader.java

+4
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,10 @@ public static LexEvent loadLexEvent(String filename) {
9797
return loadEvent(filename, LexEvent.class);
9898
}
9999

100+
public static MSKFirehoseEvent loadMSKFirehoseEvent(String filename) {
101+
return loadEvent(filename, MSKFirehoseEvent.class);
102+
}
103+
100104
public static S3Event loadS3Event(String filename) {
101105
return loadEvent(filename, S3Event.class);
102106
}

aws-lambda-java-tests/src/test/java/com/amazonaws/services/lambda/runtime/tests/EventLoaderTest.java

+13
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,19 @@ public void testLoadKinesisFirehoseEvent() {
147147
assertThat(event.getRecords().get(0).getData().array()).asString().isEqualTo("Hello, this is a test 123.");
148148
}
149149

150+
@Test
151+
public void testLoadMSKFirehoseEvent() {
152+
MSKFirehoseEvent event = EventLoader.loadMSKFirehoseEvent("msk_firehose_event.json");
153+
154+
assertThat(event).isNotNull();
155+
assertThat(event.getSourceMSKArn()).isEqualTo("arn:aws:kafka:EXAMPLE");
156+
assertThat(event.getDeliveryStreamArn()).isEqualTo("arn:aws:firehose:EXAMPLE");
157+
assertThat(event.getRecords()).hasSize(1);
158+
assertThat(event.getRecords().get(0).getKafkaRecordValue().array()).asString().isEqualTo("{\"Name\":\"Hello World\"}");
159+
assertThat(event.getRecords().get(0).getApproximateArrivalTimestamp()).asString().isEqualTo("1716369573887");
160+
assertThat(event.getRecords().get(0).getMskRecordMetadata()).asString().isEqualTo("{offset=0, partitionId=1, approximateArrivalTimestamp=1716369573887}");
161+
}
162+
150163
@Test
151164
public void testLoadS3Event() {
152165
S3Event event = EventLoader.loadS3Event("s3_event.json");
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
{
2+
"invocationId": "12345621-4787-0000-a418-36e56Example",
3+
"sourceMSKArn": "arn:aws:kafka:EXAMPLE",
4+
"deliveryStreamArn": "arn:aws:firehose:EXAMPLE",
5+
"region": "us-east-1",
6+
"records": [
7+
{
8+
"recordId": "00000000000000000000000000000000000000000000000000000000000000",
9+
"approximateArrivalTimestamp": 1716369573887,
10+
"mskRecordMetadata": {
11+
"offset": "0",
12+
"partitionId": "1",
13+
"approximateArrivalTimestamp": 1716369573887
14+
},
15+
"kafkaRecordValue": "eyJOYW1lIjoiSGVsbG8gV29ybGQifQ=="
16+
}
17+
]
18+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package example;
7+
8+
import com.amazonaws.services.lambda.runtime.Context;
9+
import com.amazonaws.services.lambda.runtime.RequestHandler;
10+
import com.amazonaws.services.lambda.runtime.events.MSKFirehoseResponse;
11+
import com.amazonaws.services.lambda.runtime.events.MSKFirehoseEvent;
12+
import org.json.JSONObject;
13+
14+
import java.nio.ByteBuffer;
15+
import java.util.ArrayList;
16+
import java.util.List;
17+
18+
/**
19+
* A sample MSKFirehoseEvent handler
20+
* For more information see the developer guide - <a href="https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html">...</a>
21+
*/
22+
public class MSKFirehoseEventHandler implements RequestHandler<MSKFirehoseEvent, MSKFirehoseResponse> {
23+
24+
@Override
25+
public MSKFirehoseResponse handleRequest(MSKFirehoseEvent MSKFirehoseEvent, Context context) {
26+
List<MSKFirehoseResponse.Record> records = new ArrayList<>();
27+
28+
for (MSKFirehoseEvent.Record record : MSKFirehoseEvent.getRecords()) {
29+
String recordData = new String(record.getKafkaRecordValue().array());
30+
// Your business logic
31+
JSONObject jsonObject = new JSONObject(recordData);
32+
records.add(new MSKFirehoseResponse.Record(record.getRecordId(), MSKFirehoseResponse.Result.Ok, encode(jsonObject.toString())));
33+
}
34+
return new MSKFirehoseResponse(records);
35+
}
36+
private ByteBuffer encode(String content) {
37+
return ByteBuffer.wrap(content.getBytes());
38+
}
39+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package example;
7+
8+
import com.amazonaws.services.lambda.runtime.Context;
9+
import com.amazonaws.services.lambda.runtime.tests.annotations.Event;
10+
import com.amazonaws.services.lambda.runtime.events.MSKFirehoseEvent;
11+
import com.amazonaws.services.lambda.runtime.events.MSKFirehoseResponse;
12+
import org.junit.jupiter.api.Assertions;
13+
import org.junit.jupiter.params.ParameterizedTest;
14+
15+
import static java.nio.charset.StandardCharsets.UTF_8;
16+
17+
public class MSKFirehoseEventHandlerTest {
18+
19+
private Context context; // intentionally null as it's not used in the test
20+
21+
@ParameterizedTest
22+
@Event(value = "event.json", type = MSKFirehoseEvent.class)
23+
public void testEventHandler(MSKFirehoseEvent event) {
24+
MSKFirehoseEventHandler Sample = new MSKFirehoseEventHandler();
25+
MSKFirehoseResponse response = Sample.handleRequest(event, context);
26+
27+
String expectedString = "{\"Name\":\"Hello World\"}";
28+
MSKFirehoseResponse.Record firstRecord = response.getRecords().get(0);
29+
Assertions.assertEquals(expectedString, UTF_8.decode(firstRecord.getKafkaRecordValue()).toString());
30+
Assertions.assertEquals(MSKFirehoseResponse.Result.Ok, firstRecord.getResult());
31+
}
32+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
{
2+
"invocationId": "12345621-4787-0000-a418-36e56Example",
3+
"sourceMSKArn": "",
4+
"deliveryStreamArn": "",
5+
"region": "us-east-1",
6+
"records": [
7+
{
8+
"recordId": "00000000000000000000000000000000000000000000000000000000000000",
9+
"approximateArrivalTimestamp": 1716369573887,
10+
"mskRecordMetadata": {
11+
"offset": "0",
12+
"partitionId": "1",
13+
"approximateArrivalTimestamp": 1716369573887
14+
},
15+
"kafkaRecordValue": "eyJOYW1lIjoiSGVsbG8gV29ybGQifQ=="
16+
}
17+
]
18+
}

0 commit comments

Comments
 (0)