forked from aws/aws-lambda-java-libs
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathMSKFirehoseEventHandler.java
39 lines (33 loc) · 1.51 KB
/
MSKFirehoseEventHandler.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/*
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package example;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.MSKFirehoseResponse;
import com.amazonaws.services.lambda.runtime.events.MSKFirehoseEvent;
import org.json.JSONObject;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
/**
 * A sample MSKFirehoseEvent handler.
 *
 * <p>Each incoming record value is decoded as UTF-8 text, parsed as a JSON object, serialized
 * back to a string, and returned under the same record id with an {@code Ok} result.
 *
 * <p>For more information see the
 * <a href="https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html">Firehose
 * data transformation developer guide</a>.
 */
public class MSKFirehoseEventHandler implements RequestHandler<MSKFirehoseEvent, MSKFirehoseResponse> {

    /**
     * Transforms every record of the incoming batch and reports each one as {@code Ok}.
     *
     * @param event the batch of records to transform
     * @param context the Lambda invocation context (not used by this sample)
     * @return a response containing one output record per input record, in input order
     * @throws org.json.JSONException if a record value cannot be parsed as a JSON object
     */
    @Override
    public MSKFirehoseResponse handleRequest(MSKFirehoseEvent event, Context context) {
        List<MSKFirehoseResponse.Record> records = new ArrayList<>();
        for (MSKFirehoseEvent.Record record : event.getRecords()) {
            String recordData = decode(record.getKafkaRecordValue());

            // Your business logic
            JSONObject jsonObject = new JSONObject(recordData);

            records.add(new MSKFirehoseResponse.Record(
                    record.getRecordId(),
                    MSKFirehoseResponse.Result.Ok,
                    encode(jsonObject.toString())));
        }
        return new MSKFirehoseResponse(records);
    }

    /**
     * Decodes the remaining bytes of {@code buffer} as UTF-8 text.
     *
     * <p>Decoding goes through a duplicate so the caller's buffer position is left untouched, and
     * it avoids {@link ByteBuffer#array()}, which ignores position/limit/offset and is unavailable
     * on read-only or direct buffers.
     *
     * @param buffer the bytes to decode
     * @return the decoded text
     */
    private static String decode(ByteBuffer buffer) {
        return StandardCharsets.UTF_8.decode(buffer.duplicate()).toString();
    }

    /**
     * Encodes {@code content} as UTF-8 bytes, independent of the JVM's default charset.
     *
     * @param content the text to encode
     * @return a buffer wrapping the UTF-8 encoding of {@code content}
     */
    private static ByteBuffer encode(String content) {
        return ByteBuffer.wrap(content.getBytes(StandardCharsets.UTF_8));
    }
}