Skip to content

Commit 3b42361

Browse files
committed
WIP
1 parent 24ab0c1 commit 3b42361

File tree

5 files changed

+312
-50
lines changed

5 files changed

+312
-50
lines changed

src/main/java/io/lumigo/core/SpansContainer.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,14 @@
2626
import org.apache.http.client.methods.HttpUriRequest;
2727
import org.apache.kafka.clients.ApiVersions;
2828
import org.apache.kafka.clients.Metadata;
29+
import org.apache.kafka.clients.consumer.ConsumerRecords;
30+
import org.apache.kafka.clients.consumer.KafkaConsumer;
31+
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
2932
import org.apache.kafka.clients.producer.Callback;
3033
import org.apache.kafka.clients.producer.ProducerConfig;
3134
import org.apache.kafka.clients.producer.ProducerRecord;
35+
import org.apache.kafka.clients.producer.RecordMetadata;
36+
import org.apache.kafka.clients.producer.internals.ProducerMetadata;
3237
import org.apache.kafka.clients.producer.internals.Sender;
3338
import org.pmw.tinylog.Logger;
3439
import software.amazon.awssdk.awscore.AwsResponse;
@@ -47,6 +52,7 @@ public class SpansContainer {
4752
private static final String AMZN_TRACE_ID = "_X_AMZN_TRACE_ID";
4853
private static final String FUNCTION_SPAN_TYPE = "function";
4954
private static final String HTTP_SPAN_TYPE = "http";
55+
public static final String KAFKA_SPAN_TYPE = "kafka";
5056

5157
private Span baseSpan;
5258
private Span startFunctionSpan;
@@ -446,8 +452,12 @@ public void addHttpSpan(
446452
httpSpans.add(httpSpan);
447453
}
448454

449-
public void addKafkaProduceSpan(Long startTime, ProducerRecord record) {
450-
this.kafkaSpans.add(KafkaSpan.create(this.baseSpan, startTime, record));
455+
public void addKafkaProduceSpan(Long startTime, ProducerMetadata producerMetadata, ProducerRecord<?, ?> record, RecordMetadata recordMetadata, Exception exception) {
456+
this.kafkaSpans.add(KafkaSpan.createProduce(this.baseSpan, startTime, producerMetadata, record, recordMetadata, exception));
457+
}
458+
459+
public void addKafkaConsumeSpan(Long startTime, KafkaConsumer<?, ?> consumer, ConsumerMetadata consumerMetadata, ConsumerRecords<?, ?> consumerRecords) {
460+
this.kafkaSpans.add(KafkaSpan.createConsume(this.baseSpan, startTime, consumer, consumerMetadata, consumerRecords));
451461
}
452462

453463
private static String extractHeaders(Map<String, String> headers) {

src/main/java/io/lumigo/core/instrumentation/agent/Loader.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,7 @@
33
import static net.bytebuddy.matcher.ElementMatchers.nameStartsWith;
44
import static net.bytebuddy.matcher.ElementMatchers.not;
55

6-
import io.lumigo.core.instrumentation.impl.AmazonHttpClientInstrumentation;
7-
import io.lumigo.core.instrumentation.impl.AmazonHttpClientV2Instrumentation;
8-
import io.lumigo.core.instrumentation.impl.ApacheHttpInstrumentation;
9-
import io.lumigo.core.instrumentation.impl.ApacheKafkaProducerInstrumentation;
6+
import io.lumigo.core.instrumentation.impl.*;
107
import net.bytebuddy.agent.builder.AgentBuilder;
118
import org.pmw.tinylog.Logger;
129

@@ -19,6 +16,7 @@ public static void instrument(java.lang.instrument.Instrumentation inst) {
1916
AmazonHttpClientV2Instrumentation amazonHttpClientV2Instrumentation =
2017
new AmazonHttpClientV2Instrumentation();
2118
ApacheKafkaProducerInstrumentation apacheKafkaInstrumentation = new ApacheKafkaProducerInstrumentation();
19+
ApacheKafkaConsumerInstrumentation apacheKafkaConsumerInstrumentation = new ApacheKafkaConsumerInstrumentation();
2220
AgentBuilder builder =
2321
new AgentBuilder.Default()
2422
.disableClassFormatChanges()
@@ -28,6 +26,7 @@ public static void instrument(java.lang.instrument.Instrumentation inst) {
2826
.and(not(nameStartsWith("org.apache.http.impl.client")))
2927
.and(not(nameStartsWith(AmazonHttpClientV2Instrumentation.INSTRUMENTATION_PACKAGE_PREFIX)))
3028
.and(not(nameStartsWith(ApacheKafkaProducerInstrumentation.INSTRUMENTATION_PACKAGE_PREFIX)))
29+
.and(not(nameStartsWith(ApacheKafkaConsumerInstrumentation.INSTRUMENTATION_PACKAGE_PREFIX)))
3130
)
3231
.type(apacheHttpInstrumentation.getTypeMatcher())
3332
.transform(apacheHttpInstrumentation.getTransformer())
@@ -36,7 +35,9 @@ public static void instrument(java.lang.instrument.Instrumentation inst) {
3635
.type(amazonHttpClientV2Instrumentation.getTypeMatcher())
3736
.transform(amazonHttpClientV2Instrumentation.getTransformer())
3837
.type(apacheKafkaInstrumentation.getTypeMatcher())
39-
.transform(apacheKafkaInstrumentation.getTransformer());
38+
.transform(apacheKafkaInstrumentation.getTransformer())
39+
.type(apacheKafkaConsumerInstrumentation.getTypeMatcher())
40+
.transform(apacheKafkaConsumerInstrumentation.getTransformer());
4041

4142
builder.installOn(inst);
4243
Logger.debug("Finish Instrumentation");
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package io.lumigo.core.instrumentation.impl;
2+
3+
import com.amazonaws.Response;
4+
import io.lumigo.core.SpansContainer;
5+
import io.lumigo.core.instrumentation.LumigoInstrumentationApi;
6+
import io.lumigo.core.instrumentation.agent.Loader;
7+
import io.lumigo.core.utils.LRUCache;
8+
import lombok.AllArgsConstructor;
9+
import net.bytebuddy.agent.builder.AgentBuilder;
10+
import net.bytebuddy.asm.Advice;
11+
import net.bytebuddy.description.type.TypeDescription;
12+
import net.bytebuddy.matcher.ElementMatcher;
13+
import org.apache.kafka.clients.consumer.ConsumerRecords;
14+
import org.apache.kafka.clients.consumer.KafkaConsumer;
15+
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
16+
import org.apache.kafka.clients.producer.Callback;
17+
import org.apache.kafka.clients.producer.ProducerRecord;
18+
import org.apache.kafka.clients.producer.RecordMetadata;
19+
import org.pmw.tinylog.Logger;
20+
21+
import java.time.Duration;
22+
import java.util.UUID;
23+
24+
import static net.bytebuddy.matcher.ElementMatchers.*;
25+
26+
public class ApacheKafkaConsumerInstrumentation implements LumigoInstrumentationApi {
27+
28+
public static final String INSTRUMENTATION_PACKAGE_PREFIX = "org.apache.kafka.clients.consumer";
29+
30+
@Override
31+
public ElementMatcher<TypeDescription> getTypeMatcher() {
32+
System.out.println("Inside ApacheKafkaConsumerInstrumentation.getTypeMatcher()");
33+
return named("org.apache.kafka.clients.consumer.KafkaConsumer");
34+
}
35+
36+
@Override
37+
public AgentBuilder.Transformer.ForAdvice getTransformer() {
38+
System.out.println("Inside ApacheKafkaConsumerInstrumentation.getTransformer()");
39+
return new AgentBuilder.Transformer.ForAdvice()
40+
.include(Loader.class.getClassLoader())
41+
.advice(
42+
isMethod()
43+
.and(isPublic())
44+
.and(named("poll"))
45+
.and(takesArguments(1))
46+
.and(returns(named("org.apache.kafka.clients.consumer.ConsumerRecords"))),
47+
ApacheKafkaConsumerAdvice.class.getName());
48+
}
49+
50+
public static class ApacheKafkaConsumerAdvice {
51+
public static final SpansContainer spansContainer = SpansContainer.getInstance();
52+
public static final LRUCache<Integer, Boolean> handled = new LRUCache<>(1000);
53+
public static final LRUCache<Integer, Long> startTimeMap = new LRUCache<>(1000);
54+
55+
56+
@Advice.OnMethodEnter(suppress = Throwable.class)
57+
public static void methodEnter() {
58+
try {
59+
System.out.println("Inside ApacheKafkaConsumerAdvice.methodEnter()");
60+
// TODO fix start time
61+
// startTimeMap.put(record.hashCode(), System.currentTimeMillis());
62+
} catch (Exception e) {
63+
Logger.error(e);
64+
}
65+
}
66+
67+
@Advice.OnMethodExit(suppress = Throwable.class)
68+
public static void methodExit(
69+
@Advice.This KafkaConsumer<?, ?> consumer,
70+
@Advice.FieldValue("metadata") ConsumerMetadata metadata,
71+
@Advice.Return ConsumerRecords<?, ?> consumerRecords) {
72+
try {
73+
System.out.println("Inside ApacheKafkaConsumerAdvice.methodExit()");
74+
Logger.info("Handling kafka request {}", consumerRecords.hashCode());
75+
spansContainer.addKafkaConsumeSpan(System.currentTimeMillis(), consumer, metadata, consumerRecords);
76+
handled.put(consumerRecords.hashCode(), true);
77+
} catch (Throwable error) {
78+
Logger.error(error, "Failed to add kafka span");
79+
}
80+
}
81+
}
82+
}

src/main/java/io/lumigo/core/instrumentation/impl/ApacheKafkaProducerInstrumentation.java

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,11 @@
99
import net.bytebuddy.asm.Advice;
1010
import net.bytebuddy.description.type.TypeDescription;
1111
import net.bytebuddy.matcher.ElementMatcher;
12+
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
1213
import org.apache.kafka.clients.producer.Callback;
1314
import org.apache.kafka.clients.producer.ProducerRecord;
1415
import org.apache.kafka.clients.producer.RecordMetadata;
16+
import org.apache.kafka.clients.producer.internals.ProducerMetadata;
1517
import org.pmw.tinylog.Logger;
1618

1719
import java.util.UUID;
@@ -24,13 +26,13 @@ public class ApacheKafkaProducerInstrumentation implements LumigoInstrumentation
2426

2527
@Override
2628
public ElementMatcher<TypeDescription> getTypeMatcher() {
27-
System.out.println("Inside ApacheKafkaInstrumentation.getTypeMatcher()");
29+
System.out.println("Inside ApacheKafkaProducerInstrumentation.getTypeMatcher()");
2830
return named("org.apache.kafka.clients.producer.KafkaProducer");
2931
}
3032

3133
@Override
3234
public AgentBuilder.Transformer.ForAdvice getTransformer() {
33-
System.out.println("Inside ApacheKafkaInstrumentation.getTransformer()");
35+
System.out.println("Inside ApacheKafkaProducerInstrumentation.getTransformer()");
3436
return new AgentBuilder.Transformer.ForAdvice()
3537
.include(Loader.class.getClassLoader())
3638
.advice(
@@ -39,25 +41,23 @@ public AgentBuilder.Transformer.ForAdvice getTransformer() {
3941
.and(named("send"))
4042
.and(takesArgument(0, named("org.apache.kafka.clients.producer.ProducerRecord"))
4143
.and(takesArgument(1, named("org.apache.kafka.clients.producer.Callback")))),
42-
ApacheKafkaAdvice.class.getName());
44+
ApacheKafkaProducerAdvice.class.getName());
4345
}
4446

45-
public static class ApacheKafkaAdvice {
47+
public static class ApacheKafkaProducerAdvice {
4648
public static final SpansContainer spansContainer = SpansContainer.getInstance();
4749
public static final LRUCache<Integer, Boolean> handled = new LRUCache<>(1000);
48-
public static final LRUCache<Integer, Long> startTimeMap = new LRUCache<>(1000);
4950

5051
@Advice.OnMethodEnter
5152
public static void methodEnter(
52-
@Advice.Argument(value = 0, readOnly = false) ProducerRecord record,
53+
@Advice.FieldValue("metadata") ProducerMetadata metadata,
54+
@Advice.Argument(value = 0, readOnly = false) ProducerRecord<?, ?> record,
5355
@Advice.Argument(value = 1, readOnly = false) Callback callback) {
5456
try {
55-
System.out.println("Inside ApacheKafkaAdvice.methodEnter()");
56-
startTimeMap.put(record.hashCode(), System.currentTimeMillis());
57-
callback = new KafkaProducerCallback(callback, record);
57+
System.out.println("Inside ApacheKafkaProducerAdvice.methodEnter()");
58+
callback = new KafkaProducerCallback(callback, metadata, record, System.currentTimeMillis());
5859

5960
// Try to inject correlation id to the kafka record headers
60-
// TODO dd injecting time in queue to the record headers
6161
record.headers().add("lumigoMessageId", UUID.randomUUID().toString().substring(0, 10).getBytes());
6262
} catch (Exception e) {
6363
Logger.error(e);
@@ -67,7 +67,9 @@ public static void methodEnter(
6767
@AllArgsConstructor
6868
public static class KafkaProducerCallback implements Callback {
6969
private final Callback callback;
70-
private final ProducerRecord record;
70+
private final ProducerMetadata producerMetadata;
71+
private final ProducerRecord<?, ?> record;
72+
private final long startTime;
7173

7274
@Override
7375
public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
@@ -76,13 +78,10 @@ public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
7678
callback.onCompletion(recordMetadata, exception);
7779
}
7880
System.out.println("Inside KafkaProducerCallback.onCompletion()");
79-
if (handled.get(record.hashCode()) == null) {
80-
Logger.info("Handling kafka request {} from host {}", record.hashCode());
81-
spansContainer.addKafkaProduceSpan(startTimeMap.get(record.hashCode()), record);
82-
handled.put(record.hashCode(), true);
83-
} else {
84-
Logger.warn("Already handle kafka request {} for host {}", record.hashCode());
85-
}
81+
82+
Logger.info("Handling kafka request {} from host {}", record.hashCode());
83+
spansContainer.addKafkaProduceSpan(startTime, producerMetadata, record, recordMetadata, exception);
84+
handled.put(record.hashCode(), true);
8685
} catch (Throwable error) {
8786
Logger.error(error, "Failed to add kafka span");
8887
}

0 commit comments

Comments
 (0)