Commit 1b1bba6 (parent 4b1538d)

11 files changed: +223 −140 lines
src/main/java/io/lumigo/core/SpansContainer.java

Lines changed: 28 additions & 13 deletions
@@ -17,24 +17,17 @@
 import java.io.*;
 import java.util.*;
 import java.util.concurrent.Callable;
-
-import net.bytebuddy.asm.Advice;
 import org.apache.http.Header;
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpResponse;
 import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
 import org.apache.http.client.methods.HttpUriRequest;
-import org.apache.kafka.clients.ApiVersions;
-import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.clients.producer.internals.ProducerMetadata;
-import org.apache.kafka.clients.producer.internals.Sender;
 import org.apache.kafka.common.serialization.Serializer;
 import org.pmw.tinylog.Logger;
 import software.amazon.awssdk.awscore.AwsResponse;
@@ -453,12 +446,34 @@ public void addHttpSpan(
         httpSpans.add(httpSpan);
     }

-    public <K, V> void addKafkaProduceSpan(Long startTime, Serializer<K> keySerializer, Serializer<V> valueSerializer, ProducerMetadata producerMetadata, ProducerRecord<K, V> record, RecordMetadata recordMetadata, Exception exception) {
-        this.kafkaSpans.add(KafkaSpan.createProduce(this.baseSpan, startTime, keySerializer, valueSerializer, producerMetadata, record, recordMetadata, exception));
-    }
-
-    public void addKafkaConsumeSpan(Long startTime, KafkaConsumer<?, ?> consumer, ConsumerMetadata consumerMetadata, ConsumerRecords<?, ?> consumerRecords) {
-        this.kafkaSpans.add(KafkaSpan.createConsume(this.baseSpan, startTime, consumer, consumerMetadata, consumerRecords));
+    public <K, V> void addKafkaProduceSpan(
+            Long startTime,
+            Serializer<K> keySerializer,
+            Serializer<V> valueSerializer,
+            ProducerMetadata producerMetadata,
+            ProducerRecord<K, V> record,
+            RecordMetadata recordMetadata,
+            Exception exception) {
+        this.kafkaSpans.add(
+                KafkaSpan.createProduce(
+                        this.baseSpan,
+                        startTime,
+                        keySerializer,
+                        valueSerializer,
+                        producerMetadata,
+                        record,
+                        recordMetadata,
+                        exception));
+    }
+
+    public void addKafkaConsumeSpan(
+            Long startTime,
+            KafkaConsumer<?, ?> consumer,
+            ConsumerMetadata consumerMetadata,
+            ConsumerRecords<?, ?> consumerRecords) {
+        this.kafkaSpans.add(
+                KafkaSpan.createConsume(
+                        this.baseSpan, startTime, consumer, consumerMetadata, consumerRecords));
     }

     private static String extractHeaders(Map<String, String> headers) {
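
The two new methods take the raw Kafka client objects handed over by the ByteBuddy advice and turn them into entries on the container's kafkaSpans list. Below is a minimal, self-contained sketch of that bookkeeping pattern; SimpleKafkaSpanStore and its Entry fields are illustrative stand-ins for the real KafkaSpan model, and the only assumption is that kafka-clients is on the classpath.

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.TopicPartition;

    // Hypothetical, simplified stand-in for the span bookkeeping done in SpansContainer.
    public class SimpleKafkaSpanStore {

        // Minimal span entry: what happened, on which topic, when, and whether it failed.
        public static final class Entry {
            public final String kind; // "produce" or "consume"
            public final String topic;
            public final long startTime;
            public final long endTime;
            public final String error;

            Entry(String kind, String topic, long startTime, long endTime, String error) {
                this.kind = kind;
                this.topic = topic;
                this.startTime = startTime;
                this.endTime = endTime;
                this.error = error;
            }
        }

        private final List<Entry> entries = new ArrayList<>();

        // Called from the producer callback once the broker acknowledged (or rejected) the record.
        public void addProduce(
                long startTime,
                ProducerRecord<?, ?> record,
                RecordMetadata recordMetadata,
                Exception exception) {
            String topic = recordMetadata != null ? recordMetadata.topic() : record.topic();
            entries.add(
                    new Entry(
                            "produce",
                            topic,
                            startTime,
                            System.currentTimeMillis(),
                            exception == null ? null : exception.getMessage()));
        }

        // Called after KafkaConsumer.poll() returns: one entry per topic that delivered records.
        public void addConsume(long startTime, ConsumerRecords<?, ?> consumerRecords) {
            for (TopicPartition partition : consumerRecords.partitions()) {
                entries.add(
                        new Entry(
                                "consume",
                                partition.topic(),
                                startTime,
                                System.currentTimeMillis(),
                                null));
            }
        }

        public List<Entry> entries() {
            return entries;
        }
    }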

src/main/java/io/lumigo/core/instrumentation/agent/Loader.java

Lines changed: 20 additions & 7 deletions
@@ -15,19 +15,32 @@ public static void instrument(java.lang.instrument.Instrumentation inst) {
                 new AmazonHttpClientInstrumentation();
         AmazonHttpClientV2Instrumentation amazonHttpClientV2Instrumentation =
                 new AmazonHttpClientV2Instrumentation();
-        ApacheKafkaProducerInstrumentation apacheKafkaInstrumentation = new ApacheKafkaProducerInstrumentation();
-        ApacheKafkaConsumerInstrumentation apacheKafkaConsumerInstrumentation = new ApacheKafkaConsumerInstrumentation();
+        ApacheKafkaProducerInstrumentation apacheKafkaInstrumentation =
+                new ApacheKafkaProducerInstrumentation();
+        ApacheKafkaConsumerInstrumentation apacheKafkaConsumerInstrumentation =
+                new ApacheKafkaConsumerInstrumentation();
         AgentBuilder builder =
                 new AgentBuilder.Default()
                         .disableClassFormatChanges()
                         .with(AgentBuilder.RedefinitionStrategy.RETRANSFORMATION)
                         .ignore(
                                 not(nameStartsWith("com.amazonaws.http.AmazonHttpClient"))
-                                        .and(not(nameStartsWith("org.apache.http.impl.client")))
-                                        .and(not(nameStartsWith(AmazonHttpClientV2Instrumentation.INSTRUMENTATION_PACKAGE_PREFIX)))
-                                        .and(not(nameStartsWith(ApacheKafkaProducerInstrumentation.INSTRUMENTATION_PACKAGE_PREFIX)))
-                                        .and(not(nameStartsWith(ApacheKafkaConsumerInstrumentation.INSTRUMENTATION_PACKAGE_PREFIX)))
-                        )
+                                        .and(not(nameStartsWith("org.apache.http.impl.client")))
+                                        .and(
+                                                not(
+                                                        nameStartsWith(
+                                                                AmazonHttpClientV2Instrumentation
+                                                                        .INSTRUMENTATION_PACKAGE_PREFIX)))
+                                        .and(
+                                                not(
+                                                        nameStartsWith(
+                                                                ApacheKafkaProducerInstrumentation
+                                                                        .INSTRUMENTATION_PACKAGE_PREFIX)))
+                                        .and(
+                                                not(
+                                                        nameStartsWith(
+                                                                ApacheKafkaConsumerInstrumentation
+                                                                        .INSTRUMENTATION_PACKAGE_PREFIX))))
                         .type(apacheHttpInstrumentation.getTypeMatcher())
                         .transform(apacheHttpInstrumentation.getTransformer())
                         .type(amazonHttpClientInstrumentation.getTypeMatcher())
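
Loader chains every instrumentation into one ByteBuddy AgentBuilder: the ignore matcher is widened so the Kafka client packages are not skipped, and each instrumentation contributes a type matcher plus an advice transformer. A stripped-down sketch of the same wiring, limited to the producer and using hypothetical MiniKafkaAgent and SendAdvice names:

    import static net.bytebuddy.matcher.ElementMatchers.*;

    import java.lang.instrument.Instrumentation;
    import net.bytebuddy.agent.builder.AgentBuilder;
    import net.bytebuddy.asm.Advice;

    // Illustrative agent wiring with the same shape as Loader.instrument:
    // ignore everything except the package we care about, then attach advice to KafkaProducer.send.
    public final class MiniKafkaAgent {

        public static void premain(String args, Instrumentation inst) {
            new AgentBuilder.Default()
                    .disableClassFormatChanges()
                    .with(AgentBuilder.RedefinitionStrategy.RETRANSFORMATION)
                    .ignore(not(nameStartsWith("org.apache.kafka.clients.producer")))
                    .type(named("org.apache.kafka.clients.producer.KafkaProducer"))
                    .transform(
                            new AgentBuilder.Transformer.ForAdvice()
                                    .include(MiniKafkaAgent.class.getClassLoader())
                                    .advice(named("send"), SendAdvice.class.getName()))
                    .installOn(inst);
        }

        public static class SendAdvice {
            @Advice.OnMethodEnter(suppress = Throwable.class)
            public static void enter() {
                System.out.println("KafkaProducer.send intercepted");
            }
        }
    }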

src/main/java/io/lumigo/core/instrumentation/impl/AmazonHttpClientV2Instrumentation.java

Lines changed: 2 additions & 1 deletion
@@ -19,7 +19,8 @@

 public class AmazonHttpClientV2Instrumentation implements LumigoInstrumentationApi {

-    public static final String INSTRUMENTATION_PACKAGE_PREFIX = "software.amazon.awssdk.core.client.builder.SdkDefaultClientBuilder";
+    public static final String INSTRUMENTATION_PACKAGE_PREFIX =
+            "software.amazon.awssdk.core.client.builder.SdkDefaultClientBuilder";

     @Override
     public ElementMatcher<TypeDescription> getTypeMatcher() {

src/main/java/io/lumigo/core/instrumentation/impl/ApacheKafkaConsumerInstrumentation.java

Lines changed: 11 additions & 16 deletions
@@ -1,28 +1,20 @@
 package io.lumigo.core.instrumentation.impl;

-import com.amazonaws.Response;
+import static net.bytebuddy.matcher.ElementMatchers.*;
+
 import io.lumigo.core.SpansContainer;
 import io.lumigo.core.instrumentation.LumigoInstrumentationApi;
 import io.lumigo.core.instrumentation.agent.Loader;
 import io.lumigo.core.utils.LRUCache;
-import lombok.AllArgsConstructor;
 import net.bytebuddy.agent.builder.AgentBuilder;
 import net.bytebuddy.asm.Advice;
 import net.bytebuddy.description.type.TypeDescription;
 import net.bytebuddy.matcher.ElementMatcher;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
 import org.pmw.tinylog.Logger;

-import java.time.Duration;
-import java.util.UUID;
-
-import static net.bytebuddy.matcher.ElementMatchers.*;
-
 public class ApacheKafkaConsumerInstrumentation implements LumigoInstrumentationApi {

     public static final String INSTRUMENTATION_PACKAGE_PREFIX = "org.apache.kafka.clients.consumer";
@@ -43,7 +35,10 @@ public AgentBuilder.Transformer.ForAdvice getTransformer() {
                         .and(isPublic())
                         .and(named("poll"))
                         .and(takesArguments(1))
-                        .and(returns(named("org.apache.kafka.clients.consumer.ConsumerRecords"))),
+                        .and(
+                                returns(
+                                        named(
+                                                "org.apache.kafka.clients.consumer.ConsumerRecords"))),
                 ApacheKafkaConsumerAdvice.class.getName());
     }

@@ -52,27 +47,27 @@ public static class ApacheKafkaConsumerAdvice {
         public static final LRUCache<Integer, Boolean> handled = new LRUCache<>(1000);
         public static final LRUCache<Integer, Long> startTimeMap = new LRUCache<>(1000);

-
-        @Advice.OnMethodEnter(suppress = Throwable.class)
+        @Advice.OnMethodEnter(suppress = Throwable.class)
         public static void methodEnter() {
             try {
                 System.out.println("Inside ApacheKafkaConsumerAdvice.methodEnter()");
                 // TODO fix start time
-                // startTimeMap.put(record.hashCode(), System.currentTimeMillis());
+                // startTimeMap.put(record.hashCode(), System.currentTimeMillis());
             } catch (Exception e) {
                 Logger.error(e);
             }
         }

-        @Advice.OnMethodExit(suppress = Throwable.class)
+        @Advice.OnMethodExit(suppress = Throwable.class)
         public static void methodExit(
                 @Advice.This KafkaConsumer<?, ?> consumer,
                 @Advice.FieldValue("metadata") ConsumerMetadata metadata,
                 @Advice.Return ConsumerRecords<?, ?> consumerRecords) {
             try {
                 System.out.println("Inside ApacheKafkaConsumerAdvice.methodExit()");
                 Logger.info("Handling kafka request {}", consumerRecords.hashCode());
-                spansContainer.addKafkaConsumeSpan(System.currentTimeMillis(), consumer, metadata, consumerRecords);
+                spansContainer.addKafkaConsumeSpan(
+                        System.currentTimeMillis(), consumer, metadata, consumerRecords);
                 handled.put(consumerRecords.hashCode(), true);
             } catch (Throwable error) {
                 Logger.error(error, "Failed to add kafka span");
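
The enter advice still carries a TODO for the start time, since poll() gives it no record to key startTimeMap on. One way to close that gap (a sketch of a standard ByteBuddy idiom, not something this commit does) is to let the enter advice return the timestamp and have it bound into the exit advice with @Advice.Enter:

    import net.bytebuddy.asm.Advice;
    import org.apache.kafka.clients.consumer.ConsumerRecords;

    // Sketch only: the enter advice returns the timestamp taken before poll() blocks,
    // and ByteBuddy passes that value to the exit advice, so no hashCode-keyed cache is needed.
    public class PollTimingAdvice {

        @Advice.OnMethodEnter(suppress = Throwable.class)
        public static long enter() {
            return System.currentTimeMillis(); // captured before poll() runs
        }

        @Advice.OnMethodExit(suppress = Throwable.class)
        public static void exit(
                @Advice.Enter long startTime,
                @Advice.Return ConsumerRecords<?, ?> records) {
            long durationMs = System.currentTimeMillis() - startTime;
            System.out.println(
                    "poll() returned " + records.count() + " records in " + durationMs + " ms");
        }
    }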

src/main/java/io/lumigo/core/instrumentation/impl/ApacheKafkaProducerInstrumentation.java

Lines changed: 33 additions & 9 deletions
@@ -1,9 +1,12 @@
 package io.lumigo.core.instrumentation.impl;

+import static net.bytebuddy.matcher.ElementMatchers.*;
+
 import io.lumigo.core.SpansContainer;
 import io.lumigo.core.instrumentation.LumigoInstrumentationApi;
 import io.lumigo.core.instrumentation.agent.Loader;
 import io.lumigo.core.utils.LRUCache;
+import java.util.UUID;
 import lombok.AllArgsConstructor;
 import net.bytebuddy.agent.builder.AgentBuilder;
 import net.bytebuddy.asm.Advice;
@@ -16,10 +19,6 @@
 import org.apache.kafka.common.serialization.Serializer;
 import org.pmw.tinylog.Logger;

-import java.util.UUID;
-
-import static net.bytebuddy.matcher.ElementMatchers.*;
-
 public class ApacheKafkaProducerInstrumentation implements LumigoInstrumentationApi {

     public static final String INSTRUMENTATION_PACKAGE_PREFIX = "org.apache.kafka.clients.producer";
@@ -39,8 +38,16 @@ public AgentBuilder.Transformer.ForAdvice getTransformer() {
                 isMethod()
                         .and(isPublic())
                         .and(named("send"))
-                        .and(takesArgument(0, named("org.apache.kafka.clients.producer.ProducerRecord"))
-                                .and(takesArgument(1, named("org.apache.kafka.clients.producer.Callback")))),
+                        .and(
+                                takesArgument(
+                                                0,
+                                                named(
+                                                        "org.apache.kafka.clients.producer.ProducerRecord"))
+                                        .and(
+                                                takesArgument(
+                                                        1,
+                                                        named(
+                                                                "org.apache.kafka.clients.producer.Callback")))),
                 ApacheKafkaProducerAdvice.class.getName());
     }

@@ -57,10 +64,20 @@ public static <K, V> void methodEnter(
             @Advice.Argument(value = 1, readOnly = false) Callback callback) {
         try {
             System.out.println("Inside ApacheKafkaProducerAdvice.methodEnter()");
-            callback = new KafkaProducerCallback<>(callback, keySerializer, valueSerializer, metadata, record, System.currentTimeMillis());
+            callback =
+                    new KafkaProducerCallback<>(
+                            callback,
+                            keySerializer,
+                            valueSerializer,
+                            metadata,
+                            record,
+                            System.currentTimeMillis());

             // Try to inject correlation id to the kafka record headers
-            record.headers().add("lumigoMessageId", UUID.randomUUID().toString().substring(0, 10).getBytes());
+            record.headers()
+                    .add(
+                            "lumigoMessageId",
+                            UUID.randomUUID().toString().substring(0, 10).getBytes());
         } catch (Exception e) {
             Logger.error(e);
         }
@@ -84,7 +101,14 @@ public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
             System.out.println("Inside KafkaProducerCallback.onCompletion()");

             Logger.info("Handling kafka request {} from host {}", record.hashCode());
-            spansContainer.addKafkaProduceSpan(startTime, keySerializer, valueSerializer, producerMetadata, record, recordMetadata, exception);
+            spansContainer.addKafkaProduceSpan(
+                    startTime,
+                    keySerializer,
+                    valueSerializer,
+                    producerMetadata,
+                    record,
+                    recordMetadata,
+                    exception);
             handled.put(record.hashCode(), true);
         } catch (Throwable error) {
             Logger.error(error, "Failed to add kafka span");
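
methodEnter swaps the caller's Callback for a KafkaProducerCallback that captures the serializers, producer metadata, record and start time, and it also injects a lumigoMessageId header so the message can be correlated downstream. The delegating-callback shape, reduced to its essentials (DelegatingCallback is an illustrative name, not the class used by this commit):

    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.RecordMetadata;

    // Illustrative wrapper: record the span timing, then hand control back to whatever
    // callback the application passed to send(), which may legitimately be null.
    public class DelegatingCallback implements Callback {

        private final Callback delegate; // the application's original callback, possibly null
        private final long startTime;

        public DelegatingCallback(Callback delegate, long startTime) {
            this.delegate = delegate;
            this.startTime = startTime;
        }

        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            long durationMs = System.currentTimeMillis() - startTime;
            System.out.println(
                    "send completed in " + durationMs + " ms"
                            + (exception != null ? " with error: " + exception : ""));
            if (delegate != null) {
                delegate.onCompletion(metadata, exception); // preserve application behaviour
            }
        }
    }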

src/main/java/io/lumigo/core/parsers/event/EventParserFactory.java

Lines changed: 1 addition & 1 deletion
@@ -1,9 +1,9 @@
 package io.lumigo.core.parsers.event;

 import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyRequestEvent;
+import com.amazonaws.services.lambda.runtime.events.KafkaEvent;
 import com.amazonaws.services.lambda.runtime.events.SNSEvent;
 import com.amazonaws.services.lambda.runtime.events.SQSEvent;
-import com.amazonaws.services.lambda.runtime.events.KafkaEvent;
 import org.pmw.tinylog.Logger;

 public interface EventParserFactory {

src/main/java/io/lumigo/core/parsers/event/KafkaEventParser.java

Lines changed: 2 additions & 2 deletions
@@ -1,7 +1,6 @@
 package io.lumigo.core.parsers.event;

 import com.amazonaws.services.lambda.runtime.events.KafkaEvent;
-
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -10,7 +9,8 @@ public class KafkaEventParser implements IEventParser<KafkaEvent> {
     @Override
     public Object parse(KafkaEvent event) {
         List<ParsedKafkaEvent.Record> records = new ArrayList<>();
-        for (Map.Entry<String, List<KafkaEvent.KafkaEventRecord>> entry : event.getRecords().entrySet()) {
+        for (Map.Entry<String, List<KafkaEvent.KafkaEventRecord>> entry :
+                event.getRecords().entrySet()) {
             String messageId = null;
             for (KafkaEvent.KafkaEventRecord record : entry.getValue()) {
                 for (Map<String, byte[]> headers : record.getHeaders()) {
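
On the consuming side, the parser walks each KafkaEventRecord's headers, which aws-lambda-java-events models as a list of name-to-byte-array maps, looking for the correlation id written by the producer advice. A hypothetical standalone helper showing that lookup (KafkaHeaderReader is not part of the tracer):

    import com.amazonaws.services.lambda.runtime.events.KafkaEvent;
    import java.nio.charset.StandardCharsets;
    import java.util.List;
    import java.util.Map;

    // Illustrative helper: pull the lumigoMessageId header (written by the producer advice)
    // back out of a Lambda KafkaEvent record.
    public class KafkaHeaderReader {

        public static String messageId(KafkaEvent.KafkaEventRecord record) {
            List<Map<String, byte[]>> headers = record.getHeaders();
            if (headers == null) {
                return null;
            }
            for (Map<String, byte[]> header : headers) {
                byte[] value = header.get("lumigoMessageId");
                if (value != null) {
                    return new String(value, StandardCharsets.UTF_8);
                }
            }
            return null;
        }
    }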

src/main/java/io/lumigo/core/parsers/event/ParsedKafkaEvent.java

Lines changed: 2 additions & 3 deletions
@@ -1,12 +1,11 @@
 package io.lumigo.core.parsers.event;

+import java.util.List;
+import java.util.Map;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;

-import java.util.List;
-import java.util.Map;
-
 @AllArgsConstructor
 @Builder(toBuilder = true)
 @Data(staticConstructor = "of")

src/main/java/io/lumigo/core/utils/AwsUtils.java

Lines changed: 5 additions & 4 deletions
@@ -6,7 +6,6 @@
 import io.lumigo.core.parsers.event.KafkaEventParser;
 import io.lumigo.core.parsers.event.ParsedKafkaEvent;
 import io.lumigo.models.Span;
-
 import java.util.*;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -15,8 +14,6 @@
 import lombok.NoArgsConstructor;
 import org.pmw.tinylog.Logger;

-import static io.lumigo.core.utils.StringUtils.buildMd5Hash;
-
 public class AwsUtils {

     public static final String COLD_START_KEY = "LUMIGO_COLD_START_KEY";
@@ -160,7 +157,11 @@ public static TriggeredBy extractTriggeredByFromEvent(Object event) {
         } else if (event instanceof KafkaEvent) {
             triggeredBy.setTriggeredBy("kafka");
             Object parsed = new KafkaEventParser().parse((KafkaEvent) event);
-            triggeredBy.setMessageIds(((ParsedKafkaEvent) parsed).getRecords().stream().map(ParsedKafkaEvent.Record::getMessageId).collect(Collectors.toList()));
+            triggeredBy.setMessageIds(
+                    ((ParsedKafkaEvent) parsed)
+                            .getRecords().stream()
+                                    .map(ParsedKafkaEvent.Record::getMessageId)
+                                    .collect(Collectors.toList()));
         } else {
             Logger.info(
                     "Failed to found relevant triggered by found for event {} ",
