-
Notifications
You must be signed in to change notification settings - Fork 239
Added event class MskFirehoseEvent.java for Firehose Lambda transformation when MSK is the source #490
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added event class MskFirehoseEvent.java for Firehose Lambda transformation when MSK is the source #490
Changes from 13 commits
ec8dca0
7df3645
f6f4a60
51b2640
7d689bb
eb08bbe
5eb7042
a720f87
df1e0e4
57a9a81
5ba689e
f240a62
5abcb06
f29f105
24e1b10
feec28f
6fa478e
243253b
4505f6d
cb37072
8239c4a
5769d76
e12da26
6fd7282
4c14fec
b3b5f61
62d58c1
641b3f4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
package com.amazonaws.services.lambda.runtime.events; | ||
|
||
import java.nio.ByteBuffer; | ||
import java.util.List; | ||
import java.util.Map; | ||
|
||
import lombok.AllArgsConstructor; | ||
import lombok.Builder; | ||
import lombok.Data; | ||
import lombok.NoArgsConstructor; | ||
|
||
/** | ||
* Created by vermshas on 6/28/24. | ||
* { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please remove this example and link the docs There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Removed the example, I am not able to find any document stating the specific event. |
||
* "invocationId": "", | ||
* "sourceMSKArn": "", | ||
* "deliveryStreamArn": "", | ||
* "region": "us-east-1", | ||
* "records": [ | ||
* { | ||
* "recordId": "00000000000000000000000000000000000000000000000000000000000000", | ||
* "approximateArrivalTimestamp": 1716369573887, | ||
* "mskRecordMetadata": { | ||
* "offset": "0", | ||
* "partitionId": "1", | ||
* "approximateArrivalTimestamp": 1716369573887 | ||
* }, | ||
* "kafkaRecordValue": "" | ||
* } | ||
* ] | ||
* } | ||
*/ | ||
|
||
@Data | ||
@Builder(setterPrefix = "with") | ||
@NoArgsConstructor | ||
@AllArgsConstructor | ||
|
||
public class MSKFirehoseEvent { | ||
|
||
private String invocationId; | ||
|
||
private String deliveryStreamArn; | ||
|
||
private String sourceMSKArn; | ||
|
||
private String region; | ||
|
||
private List<Record> records; | ||
|
||
@Data | ||
@Builder(setterPrefix = "with") | ||
@NoArgsConstructor | ||
@AllArgsConstructor | ||
public static class Record { | ||
|
||
private ByteBuffer kafkaRecordValue; | ||
|
||
private String recordId; | ||
|
||
private Long approximateArrivalEpoch; | ||
|
||
private Long approximateArrivalTimestamp; | ||
|
||
private Map<String, String> mskRecordMetadata; | ||
|
||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
package com.amazonaws.services.lambda.runtime.events; | ||
|
||
import java.nio.ByteBuffer; | ||
import java.util.List; | ||
|
||
import lombok.*; | ||
msailes marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
/** | ||
* Response model for Amazon Data Firehose Lambda transformation with MSK as a source. | ||
msailes marked this conversation as resolved.
Show resolved
Hide resolved
|
||
*/ | ||
|
||
@Data | ||
@Builder(setterPrefix = "with") | ||
@NoArgsConstructor | ||
@AllArgsConstructor | ||
|
||
public class MSKFirehoseResponse { | ||
|
||
public enum Result { | ||
|
||
/** | ||
msailes marked this conversation as resolved.
Show resolved
Hide resolved
|
||
* Indicates that processing of this item succeeded. | ||
*/ | ||
Ok, | ||
|
||
/** | ||
* Indicate that the processing of this item failed | ||
*/ | ||
ProcessingFailed, | ||
|
||
/** | ||
* Indicates that this item should be silently dropped | ||
*/ | ||
Dropped | ||
} | ||
public List<Record> records; | ||
|
||
@Data | ||
@Builder(setterPrefix = "with") | ||
@NoArgsConstructor | ||
|
||
public static class Record { | ||
public String recordId; | ||
public Result result; | ||
public ByteBuffer kafkaRecordValue; | ||
|
||
public Record(String recordId, Result result, ByteBuffer kafkaRecordValue) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you need this constructor? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Removed the constructor by using Lombok. |
||
super(); | ||
this.recordId = recordId; | ||
this.result = result; | ||
this.kafkaRecordValue = kafkaRecordValue; | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
{ | ||
msailes marked this conversation as resolved.
Show resolved
Hide resolved
|
||
"invocationId": "12345621-4787-0000-a418-36e56Example", | ||
"sourceMSKArn": "", | ||
"deliveryStreamArn": "", | ||
"region": "us-east-1", | ||
"records": [ | ||
{ | ||
"recordId": "00000000000000000000000000000000000000000000000000000000000000", | ||
"approximateArrivalTimestamp": 1716369573887, | ||
"mskRecordMetadata": { | ||
"offset": "0", | ||
"partitionId": "1", | ||
"approximateArrivalTimestamp": 1716369573887 | ||
}, | ||
"kafkaRecordValue": "eyJOYW1lIjoiSGVsbG8gV29ybGQifQ==" | ||
} | ||
] | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
package example; | ||
|
||
import com.amazonaws.services.lambda.runtime.Context; | ||
import com.amazonaws.services.lambda.runtime.RequestHandler; | ||
import model.MSKFirehoseResponse; | ||
import events.MSKFirehoseEvent; | ||
import org.json.JSONObject; | ||
|
||
import java.nio.ByteBuffer; | ||
import java.util.ArrayList; | ||
import java.util.List; | ||
|
||
/** | ||
* A sample MSKFirehoseEvent handler | ||
* For more information see the developer guide - <a href="https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html">...</a> | ||
*/ | ||
public class Sample implements RequestHandler<MSKFirehoseEvent, MSKFirehoseResponse> { | ||
|
||
@Override | ||
public MSKFirehoseResponse handleRequest(MSKFirehoseEvent MSKFirehoseEvent, Context context) { | ||
List<MSKFirehoseResponse.Record> records = new ArrayList<>(); | ||
|
||
for (MSKFirehoseEvent.Record record : MSKFirehoseEvent.getRecords()) { | ||
String recordData = new String(record.getKafkaRecordValue().array()); | ||
// Your business logic | ||
JSONObject jsonObject = new JSONObject(recordData); | ||
records.add(new MSKFirehoseResponse.Record(record.getRecordId(), MSKFirehoseResponse.Result.Ok, encode(jsonObject.toString()))); | ||
} | ||
return new MSKFirehoseResponse(records); | ||
} | ||
private ByteBuffer encode(String content) { | ||
return ByteBuffer.wrap(content.getBytes()); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
package example; | ||
|
||
import com.amazonaws.services.lambda.runtime.Context; | ||
import com.amazonaws.services.lambda.runtime.tests.annotations.Event; | ||
import com.amazonaws.services.lambda.runtime.events.MSKFirehoseEvent; | ||
import com.amazonaws.services.lambda.runtime.events.MSKFirehoseResponse; | ||
import org.junit.jupiter.api.Assertions; | ||
import org.junit.jupiter.params.ParameterizedTest; | ||
|
||
import static java.nio.charset.StandardCharsets.UTF_8; | ||
|
||
public class MSKFirehoseEventHandlerTest { | ||
|
||
private Context context; // intentionally null as it's not used in the test | ||
|
||
@ParameterizedTest | ||
@Event(value = "event.json", type = MSKFirehoseEvent.class) | ||
public void testEventHandler(MSKFirehoseEvent event) { | ||
Sample Sample = new Sample(); | ||
MSKFirehoseResponse response = Sample.handleRequest(event, context); | ||
|
||
String expectedString = "{\"Name\":\"Hello World\"}"; | ||
MSKFirehoseResponse.Record firstRecord = response.getRecords().get(0); | ||
Assertions.assertEquals(expectedString, UTF_8.decode(firstRecord.getKafkaRecordValue()).toString()); | ||
Assertions.assertEquals(MSKFirehoseResponse.Result.Ok, firstRecord.getResult()); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
{ | ||
"invocationId": "12345621-4787-0000-a418-36e56Example", | ||
"sourceMSKArn": "", | ||
"deliveryStreamArn": "", | ||
"region": "us-east-1", | ||
"records": [ | ||
{ | ||
"recordId": "00000000000000000000000000000000000000000000000000000000000000", | ||
"approximateArrivalTimestamp": 1716369573887, | ||
"mskRecordMetadata": { | ||
"offset": "0", | ||
"partitionId": "1", | ||
"approximateArrivalTimestamp": 1716369573887 | ||
}, | ||
"kafkaRecordValue": "eyJOYW1lIjoiSGVsbG8gV29ybGQifQ==" | ||
} | ||
] | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why is it needed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed saw it added to some classes and thought its required. Added copyright/license text to MSKFirehoseEvent.java and MSKFirehoseResponse.java. Do we need to add license text to sample as well ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, to all source code, please
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added license text to samples as well.