Skip to content

Commit 4b1538d

Browse files
committed
Works
1 parent 3b42361 commit 4b1538d

File tree

10 files changed

+145
-27
lines changed

10 files changed

+145
-27
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@
8080
<dependency>
8181
<groupId>com.amazonaws</groupId>
8282
<artifactId>aws-lambda-java-events</artifactId>
83-
<version>2.2.6</version>
83+
<version>3.11.5</version>
8484
</dependency>
8585
<dependency>
8686
<groupId>com.amazonaws</groupId>

src/main/java/io/lumigo/core/SpansContainer.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.kafka.clients.producer.RecordMetadata;
3636
import org.apache.kafka.clients.producer.internals.ProducerMetadata;
3737
import org.apache.kafka.clients.producer.internals.Sender;
38+
import org.apache.kafka.common.serialization.Serializer;
3839
import org.pmw.tinylog.Logger;
3940
import software.amazon.awssdk.awscore.AwsResponse;
4041
import software.amazon.awssdk.core.SdkResponse;
@@ -452,8 +453,8 @@ public void addHttpSpan(
452453
httpSpans.add(httpSpan);
453454
}
454455

    /**
     * Records a Kafka produce operation as a span on this container.
     *
     * <p>Delegates to {@link KafkaSpan#createProduce} with this container's base span,
     * passing the producer's key/value serializers so the record key and value can be
     * captured in their wire (serialized) form.
     *
     * @param startTime       epoch millis when the send was initiated
     * @param keySerializer   serializer the producer uses for record keys
     * @param valueSerializer serializer the producer uses for record values
     * @param producerMetadata producer cluster metadata (bootstrap servers, nodes)
     * @param record          the record that was sent
     * @param recordMetadata  broker acknowledgement metadata (may carry offset/partition)
     * @param exception       send failure, or null on success
     */
    public <K, V> void addKafkaProduceSpan(Long startTime, Serializer<K> keySerializer, Serializer<V> valueSerializer, ProducerMetadata producerMetadata, ProducerRecord<K, V> record, RecordMetadata recordMetadata, Exception exception) {
        this.kafkaSpans.add(KafkaSpan.createProduce(this.baseSpan, startTime, keySerializer, valueSerializer, producerMetadata, record, recordMetadata, exception));
    }
458459

459460
public void addKafkaConsumeSpan(Long startTime, KafkaConsumer<?, ?> consumer, ConsumerMetadata consumerMetadata, ConsumerRecords<?, ?> consumerRecords) {

src/main/java/io/lumigo/core/instrumentation/impl/ApacheKafkaProducerInstrumentation.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,11 @@
99
import net.bytebuddy.asm.Advice;
1010
import net.bytebuddy.description.type.TypeDescription;
1111
import net.bytebuddy.matcher.ElementMatcher;
12-
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
1312
import org.apache.kafka.clients.producer.Callback;
1413
import org.apache.kafka.clients.producer.ProducerRecord;
1514
import org.apache.kafka.clients.producer.RecordMetadata;
1615
import org.apache.kafka.clients.producer.internals.ProducerMetadata;
16+
import org.apache.kafka.common.serialization.Serializer;
1717
import org.pmw.tinylog.Logger;
1818

1919
import java.util.UUID;
@@ -49,13 +49,15 @@ public static class ApacheKafkaProducerAdvice {
4949
public static final LRUCache<Integer, Boolean> handled = new LRUCache<>(1000);
5050

5151
@Advice.OnMethodEnter
52-
public static void methodEnter(
52+
public static <K, V> void methodEnter(
5353
@Advice.FieldValue("metadata") ProducerMetadata metadata,
54-
@Advice.Argument(value = 0, readOnly = false) ProducerRecord<?, ?> record,
54+
@Advice.FieldValue("keySerializer") Serializer<K> keySerializer,
55+
@Advice.FieldValue("valueSerializer") Serializer<V> valueSerializer,
56+
@Advice.Argument(value = 0, readOnly = false) ProducerRecord<K, V> record,
5557
@Advice.Argument(value = 1, readOnly = false) Callback callback) {
5658
try {
5759
System.out.println("Inside ApacheKafkaProducerAdvice.methodEnter()");
58-
callback = new KafkaProducerCallback(callback, metadata, record, System.currentTimeMillis());
60+
callback = new KafkaProducerCallback<>(callback, keySerializer, valueSerializer, metadata, record, System.currentTimeMillis());
5961

6062
// Try to inject correlation id to the kafka record headers
6163
record.headers().add("lumigoMessageId", UUID.randomUUID().toString().substring(0, 10).getBytes());
@@ -65,10 +67,12 @@ public static void methodEnter(
6567
}
6668

6769
@AllArgsConstructor
68-
public static class KafkaProducerCallback implements Callback {
70+
public static class KafkaProducerCallback<K, V> implements Callback {
6971
private final Callback callback;
72+
private final Serializer<K> keySerializer;
73+
private final Serializer<V> valueSerializer;
7074
private final ProducerMetadata producerMetadata;
71-
private final ProducerRecord<?, ?> record;
75+
private final ProducerRecord<K, V> record;
7276
private final long startTime;
7377

7478
@Override
@@ -80,7 +84,7 @@ public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
8084
System.out.println("Inside KafkaProducerCallback.onCompletion()");
8185

8286
Logger.info("Handling kafka request {} from host {}", record.hashCode());
83-
spansContainer.addKafkaProduceSpan(startTime, producerMetadata, record, recordMetadata, exception);
87+
spansContainer.addKafkaProduceSpan(startTime, keySerializer, valueSerializer, producerMetadata, record, recordMetadata, exception);
8488
handled.put(record.hashCode(), true);
8589
} catch (Throwable error) {
8690
Logger.error(error, "Failed to add kafka span");

src/main/java/io/lumigo/core/parsers/event/EventParserFactory.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyRequestEvent;
44
import com.amazonaws.services.lambda.runtime.events.SNSEvent;
55
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
6+
import com.amazonaws.services.lambda.runtime.events.KafkaEvent;
67
import org.pmw.tinylog.Logger;
78

89
public interface EventParserFactory {
@@ -14,6 +15,8 @@ static Object parseEvent(Object event) {
1415
return new SnsEventParser().parse((SNSEvent) event);
1516
} else if (event instanceof SQSEvent) {
1617
return new SqsEventParser().parse((SQSEvent) event);
18+
} else if (event instanceof KafkaEvent) {
19+
return new KafkaEventParser().parse((KafkaEvent) event);
1720
} else {
1821
return event;
1922
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package io.lumigo.core.parsers.event;
2+
3+
import com.amazonaws.services.lambda.runtime.events.KafkaEvent;
4+
5+
import java.util.ArrayList;
6+
import java.util.List;
7+
import java.util.Map;
8+
9+
public class KafkaEventParser implements IEventParser<KafkaEvent> {
10+
@Override
11+
public Object parse(KafkaEvent event) {
12+
List<ParsedKafkaEvent.Record> records = new ArrayList<>();
13+
for (Map.Entry<String, List<KafkaEvent.KafkaEventRecord>> entry : event.getRecords().entrySet()) {
14+
String messageId = null;
15+
for (KafkaEvent.KafkaEventRecord record : entry.getValue()) {
16+
for (Map<String, byte[]> headers : record.getHeaders()) {
17+
if (headers.containsKey("lumigoMessageId")) {
18+
messageId = new String(headers.get("lumigoMessageId"));
19+
break;
20+
}
21+
}
22+
23+
records.add(
24+
ParsedKafkaEvent.Record.builder()
25+
.topic(record.getTopic())
26+
.partition(record.getPartition())
27+
.offset(record.getOffset())
28+
.key(record.getKey())
29+
.value(record.getValue())
30+
.headers(record.getHeaders())
31+
.messageId(messageId)
32+
.build());
33+
}
34+
}
35+
36+
return ParsedKafkaEvent.builder()
37+
.bootstrapServers(event.getBootstrapServers())
38+
.records(records)
39+
.build();
40+
}
41+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package io.lumigo.core.parsers.event;
2+
3+
import lombok.AllArgsConstructor;
4+
import lombok.Builder;
5+
import lombok.Data;
6+
7+
import java.util.List;
8+
import java.util.Map;
9+
10+
@AllArgsConstructor
11+
@Builder(toBuilder = true)
12+
@Data(staticConstructor = "of")
13+
public class ParsedKafkaEvent {
14+
private String bootstrapServers;
15+
private List<ParsedKafkaEvent.Record> records;
16+
17+
@AllArgsConstructor
18+
@Builder(toBuilder = true)
19+
@Data(staticConstructor = "of")
20+
public static class Record {
21+
private String topic;
22+
private Integer partition;
23+
private Long offset;
24+
private String key;
25+
private String value;
26+
private List<Map<String, byte[]>> headers;
27+
private String messageId;
28+
}
29+
}

src/main/java/io/lumigo/core/utils/AwsSdkV2Utils.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,7 @@
1010
@UtilityClass
1111
public class AwsSdkV2Utils {
1212

13-
public String calculateItemHash(
14-
Map<String, software.amazon.awssdk.services.dynamodb.model.AttributeValue> item) {
13+
public String calculateItemHash(Map<String, AttributeValue> item) {
1514
Map<String, Object> simpleMap = AwsSdkV2Utils.convertAttributeMapToSimpleMap(item);
1615
return StringUtils.buildMd5Hash(JsonUtils.getObjectAsJsonString(simpleMap));
1716
}

src/main/java/io/lumigo/core/utils/AwsUtils.java

Lines changed: 47 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,22 @@
11
package io.lumigo.core.utils;
22

3-
import static io.lumigo.core.utils.StringUtils.dynamodbItemToHash;
4-
53
import com.amazonaws.services.lambda.runtime.events.*;
4+
import com.amazonaws.services.lambda.runtime.events.models.dynamodb.AttributeValue;
65
import com.fasterxml.jackson.annotation.JsonInclude;
6+
import io.lumigo.core.parsers.event.KafkaEventParser;
7+
import io.lumigo.core.parsers.event.ParsedKafkaEvent;
78
import io.lumigo.models.Span;
8-
import java.util.Collections;
9-
import java.util.List;
10-
import java.util.Objects;
9+
10+
import java.util.*;
1111
import java.util.regex.Matcher;
1212
import java.util.regex.Pattern;
1313
import java.util.stream.Collectors;
1414
import lombok.Data;
1515
import lombok.NoArgsConstructor;
1616
import org.pmw.tinylog.Logger;
1717

18+
import static io.lumigo.core.utils.StringUtils.buildMd5Hash;
19+
1820
public class AwsUtils {
1921

2022
public static final String COLD_START_KEY = "LUMIGO_COLD_START_KEY";
@@ -155,6 +157,10 @@ public static TriggeredBy extractTriggeredByFromEvent(Object event) {
155157
triggeredBy.setTriggeredBy("lex");
156158
} else if (event instanceof CognitoEvent) {
157159
triggeredBy.setTriggeredBy("cognito");
160+
} else if (event instanceof KafkaEvent) {
161+
triggeredBy.setTriggeredBy("kafka");
162+
Object parsed = new KafkaEventParser().parse((KafkaEvent) event);
163+
triggeredBy.setMessageIds(((ParsedKafkaEvent) parsed).getRecords().stream().map(ParsedKafkaEvent.Record::getMessageId).collect(Collectors.toList()));
158164
} else {
159165
Logger.info(
160166
"Failed to found relevant triggered by found for event {} ",
@@ -282,10 +288,44 @@ private static String extractMessageIdFromDynamodbRecord(
282288
DynamodbEvent.DynamodbStreamRecord record) {
283289
if (record.getEventName() == null) return null;
284290
if (record.getEventName().equals("INSERT")) {
285-
return dynamodbItemToHash(record.getDynamodb().getNewImage());
291+
return calculateItemHash(record.getDynamodb().getNewImage());
286292
} else if (record.getEventName().equals("MODIFY")
287293
|| record.getEventName().equals("REMOVE")) {
288-
return dynamodbItemToHash(record.getDynamodb().getKeys());
294+
return calculateItemHash(record.getDynamodb().getKeys());
295+
}
296+
return null;
297+
}
298+
299+
private static String calculateItemHash(Map<String, AttributeValue> item) {
300+
Map<String, Object> simpleMap = convertAttributeMapToSimpleMap(item);
301+
return StringUtils.buildMd5Hash(JsonUtils.getObjectAsJsonString(simpleMap));
302+
}
303+
304+
private static Map<String, Object> convertAttributeMapToSimpleMap(
305+
Map<String, AttributeValue> attributeValueMap) {
306+
Map<String, Object> simpleMap = new HashMap<>();
307+
attributeValueMap.forEach(
308+
(key, value) -> simpleMap.put(key, attributeValueToObject(value)));
309+
return simpleMap;
310+
}
311+
312+
private static Object attributeValueToObject(AttributeValue value) {
313+
if (value == null) {
314+
return null;
315+
} else if (value.getS() != null) {
316+
return value.getS();
317+
} else if (value.getN() != null) {
318+
return value.getN();
319+
} else if (value.getBOOL() != null) {
320+
return value.getBOOL();
321+
} else if (value.getL() != null && !value.getL().isEmpty()) {
322+
List<Object> list = new ArrayList<>();
323+
for (AttributeValue v : value.getL()) {
324+
list.add(attributeValueToObject(v));
325+
}
326+
return list;
327+
} else if (value.getM() != null && !value.getM().isEmpty()) {
328+
return convertAttributeMapToSimpleMap(value.getM());
289329
}
290330
return null;
291331
}

src/main/java/io/lumigo/models/KafkaSpan.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.apache.kafka.clients.producer.internals.ProducerMetadata;
1313
import org.apache.kafka.common.header.Header;
1414
import org.apache.kafka.common.header.Headers;
15+
import org.apache.kafka.common.serialization.Serializer;
1516

1617
import java.util.*;
1718
import java.util.stream.Collectors;
@@ -81,8 +82,8 @@ public static class KafkaProducerInfo implements KafkaInfo {
8182
@Builder(toBuilder = true)
8283
@Data(staticConstructor = "of")
8384
public static class KafkaProducerRecord {
84-
private String key;
85-
private String value;
85+
private byte[] key;
86+
private byte[] value;
8687
private Map<String, byte[]> headers;
8788
}
8889

@@ -127,12 +128,12 @@ public static class KafkaConsumerRecord {
127128
private Map<String, byte[]> headers;
128129
}
129130

130-
public static KafkaSpan createProduce(Span baseSpan, Long startTime, ProducerMetadata producerMetadata, ProducerRecord<?, ?> record, RecordMetadata recordMetadata, Exception exception) {
131+
public static <K, V> KafkaSpan createProduce(Span baseSpan, Long startTime, Serializer<K> keySerializer, Serializer<V> valueSerializer, ProducerMetadata producerMetadata, ProducerRecord<K, V> record, RecordMetadata recordMetadata, Exception exception) {
131132
List<String> bootstrapServers = producerMetadata.fetch().nodes().stream().map(node -> node.host() + ":" + node.port()).collect(Collectors.toList());
132133
String topic = record.topic();
133134
KafkaProducerRecord producerRecord = KafkaProducerRecord.builder()
134-
.key(record.key().toString()) // TODO USE serializer
135-
.value(record.value().toString()) // TODO USE serializer
135+
.key(keySerializer.serialize(record.topic(), record.headers(), record.key()))
136+
.value(valueSerializer.serialize(record.topic(), record.headers(), record.value()))
136137
.headers(extractHeaders(record.headers()))
137138
.build();
138139

src/test/java/infa/AwsLambdaEventGenerator.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ public S3Event s3Event() {
2525
when(s3EventNotificationRecord.getS3()).thenReturn(s3Entity);
2626
when(s3EventNotificationRecord.getS3().getBucket()).thenReturn(bucketEntity);
2727
when(s3EventNotificationRecord.getS3().getBucket().getArn()).thenReturn("s3-arn");
28-
when(s3Event.getRecords()).thenReturn(Collections.singletonList(s3EventNotificationRecord));
28+
// when(s3Event.getRecords()).thenReturn(Collections.singletonList(s3EventNotificationRecord));
2929
return s3Event;
3030
}
3131

@@ -55,12 +55,12 @@ public DynamodbEvent dynamodbEvent() {
5555
when(dynamodbEvent.getRecords()).thenReturn(List.of(record, record2));
5656
when(record.getEventSourceARN()).thenReturn("dynamodb-arn");
5757
when(record.getEventName()).thenReturn("INSERT");
58-
when(record.getDynamodb()).thenReturn(streamRecord);
58+
// when(record.getDynamodb()).thenReturn(streamRecord);
5959
when(streamRecord.getApproximateCreationDateTime()).thenReturn(new Date(769554000));
6060
when(streamRecord.getNewImage())
6161
.thenReturn(Collections.singletonMap("k", new AttributeValue("v")));
6262
when(record2.getEventName()).thenReturn("MODIFY");
63-
when(record2.getDynamodb()).thenReturn(streamRecord2);
63+
// when(record2.getDynamodb()).thenReturn(streamRecord2);
6464
when(streamRecord2.getKeys())
6565
.thenReturn(Collections.singletonMap("k2", new AttributeValue("v2")));
6666
return dynamodbEvent;

0 commit comments

Comments
 (0)