Skip to content

Commit 09bf2e5

Browse files
authored
Rd 12757 java tracer support kafka instrumentation (#67)
1 parent 24062f4 commit 09bf2e5

25 files changed

+1228
-83
lines changed

README.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -107,4 +107,5 @@ Add the environment variable `JAVA_TOOL_OPTIONS` to your Lambda functions and se
107107

108108
- Aws SDK V1
109109
- Aws SDK V2
110-
- Apache HTTP Client
110+
- Apache HTTP Client
111+
- Apache Kafka

findbugs/findbugs-exclude.xml

+9
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,13 @@
22
<Match>
33
<Bug pattern="DMI_HARDCODED_ABSOLUTE_FILENAME"/>
44
</Match>
5+
<Match>
6+
<Bug pattern="EI_EXPOSE_REP2" />
7+
</Match>
8+
<Match>
9+
<Bug pattern="EI_EXPOSE_REP" />
10+
</Match>
11+
<Match>
12+
<Bug pattern="DLS_DEAD_LOCAL_STORE" />
13+
</Match>
514
</FindBugsFilter>

pom.xml

+8-1
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@
6868
<dependency>
6969
<groupId>com.amazonaws</groupId>
7070
<artifactId>aws-lambda-java-events</artifactId>
71-
<version>2.2.6</version>
71+
<version>3.11.5</version>
7272
</dependency>
7373
<dependency>
7474
<groupId>com.amazonaws</groupId>
@@ -133,6 +133,13 @@
133133
<version>2.25.45</version>
134134
</dependency>
135135

136+
<!-- Kafka dependencies -->
137+
<dependency>
138+
<groupId>org.apache.kafka</groupId>
139+
<artifactId>kafka-clients</artifactId>
140+
<version>3.1.0</version>
141+
</dependency>
142+
136143
<!-- Tracer dependencies -->
137144
<dependency>
138145
<groupId>com.fasterxml.jackson.core</groupId>

src/main/java/io/lumigo/core/SpansContainer.java

+59-26
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,25 @@
1313
import io.lumigo.core.utils.JsonUtils;
1414
import io.lumigo.core.utils.SecretScrubber;
1515
import io.lumigo.core.utils.StringUtils;
16+
import io.lumigo.models.*;
1617
import io.lumigo.models.HttpSpan;
17-
import io.lumigo.models.Reportable;
1818
import io.lumigo.models.Span;
1919
import java.io.*;
2020
import java.util.*;
2121
import java.util.concurrent.Callable;
22+
import lombok.Getter;
2223
import org.apache.http.Header;
2324
import org.apache.http.HttpEntity;
2425
import org.apache.http.HttpResponse;
2526
import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
2627
import org.apache.http.client.methods.HttpUriRequest;
28+
import org.apache.kafka.clients.consumer.ConsumerRecords;
29+
import org.apache.kafka.clients.consumer.KafkaConsumer;
30+
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
31+
import org.apache.kafka.clients.producer.ProducerRecord;
32+
import org.apache.kafka.clients.producer.RecordMetadata;
33+
import org.apache.kafka.clients.producer.internals.ProducerMetadata;
34+
import org.apache.kafka.common.serialization.Serializer;
2735
import org.pmw.tinylog.Logger;
2836
import software.amazon.awssdk.awscore.AwsResponse;
2937
import software.amazon.awssdk.core.SdkResponse;
@@ -41,14 +49,16 @@ public class SpansContainer {
4149
private static final String AMZN_TRACE_ID = "_X_AMZN_TRACE_ID";
4250
private static final String FUNCTION_SPAN_TYPE = "function";
4351
private static final String HTTP_SPAN_TYPE = "http";
44-
private static final SecretScrubber secretScrubber = new SecretScrubber(new EnvUtil().getEnv());
52+
public static final String KAFKA_SPAN_TYPE = "kafka";
4553

4654
private Span baseSpan;
47-
private Span startFunctionSpan;
55+
@Getter private Span startFunctionSpan;
4856
private Long rttDuration;
4957
private Span endFunctionSpan;
5058
private Reporter reporter;
51-
private List<HttpSpan> httpSpans = new LinkedList<>();
59+
private SecretScrubber secretScrubber = new SecretScrubber(new EnvUtil().getEnv());
60+
@Getter private List<BaseSpan> spans = new LinkedList<>();
61+
5262
private static final SpansContainer ourInstance = new SpansContainer();
5363

5464
public static SpansContainer getInstance() {
@@ -63,14 +73,15 @@ public void clear() {
6373
rttDuration = null;
6474
endFunctionSpan = null;
6575
reporter = null;
66-
httpSpans = new LinkedList<>();
76+
spans = new LinkedList<>();
6777
}
6878

6979
private SpansContainer() {}
7080

7181
public void init(Map<String, String> env, Reporter reporter, Context context, Object event) {
7282
this.clear();
7383
this.reporter = reporter;
84+
this.secretScrubber = new SecretScrubber(new EnvUtil().getEnv());
7485

7586
int javaVersion = AwsUtils.parseJavaVersion(System.getProperty("java.version"));
7687
if (javaVersion > 11) {
@@ -81,6 +92,7 @@ public void init(Map<String, String> env, Reporter reporter, Context context, Ob
8192
Logger.debug("awsTracerId {}", awsTracerId);
8293

8394
AwsUtils.TriggeredBy triggeredBy = AwsUtils.extractTriggeredByFromEvent(event);
95+
8496
long startTime = System.currentTimeMillis();
8597
this.baseSpan =
8698
Span.builder()
@@ -166,8 +178,7 @@ public void start() {
166178
.build();
167179

168180
try {
169-
rttDuration =
170-
reporter.reportSpans(prepareToSend(startFunctionSpan, false), MAX_REQUEST_SIZE);
181+
rttDuration = reporter.reportSpans(prepareToSend(startFunctionSpan), MAX_REQUEST_SIZE);
171182
} catch (Throwable e) {
172183
Logger.error(e, "Failed to send start span");
173184
}
@@ -214,25 +225,17 @@ private void end(Span endFunctionSpan) throws IOException {
214225
MAX_REQUEST_SIZE);
215226
}
216227

217-
public Span getStartFunctionSpan() {
218-
return startFunctionSpan;
219-
}
220-
221-
public List<Reportable> getAllCollectedSpans() {
222-
List<Reportable> spans = new LinkedList<>();
228+
public List<BaseSpan> getAllCollectedSpans() {
229+
List<BaseSpan> spans = new LinkedList<>();
223230
spans.add(endFunctionSpan);
224-
spans.addAll(httpSpans);
231+
spans.addAll(this.spans);
225232
return spans;
226233
}
227234

228235
public Span getEndSpan() {
229236
return endFunctionSpan;
230237
}
231238

232-
public List<HttpSpan> getHttpSpans() {
233-
return httpSpans;
234-
}
235-
236239
private String getStackTrace(Throwable throwable) {
237240
StringWriter sw = new StringWriter();
238241
PrintWriter pw = new PrintWriter(sw, true);
@@ -307,7 +310,7 @@ public void addHttpSpan(Long startTime, HttpUriRequest request, HttpResponse res
307310
response.getStatusLine().getStatusCode())
308311
.build())
309312
.build());
310-
httpSpans.add(httpSpan);
313+
this.spans.add(httpSpan);
311314
}
312315

313316
public void addHttpSpan(Long startTime, Request<?> request, Response<?> response) {
@@ -366,7 +369,7 @@ public void addHttpSpan(Long startTime, Request<?> request, Response<?> response
366369
.build());
367370
AwsSdkV1ParserFactory.getParser(request.getServiceName())
368371
.safeParse(httpSpan, request, response);
369-
httpSpans.add(httpSpan);
372+
this.spans.add(httpSpan);
370373
}
371374

372375
public void addHttpSpan(
@@ -435,7 +438,37 @@ public void addHttpSpan(
435438
executionAttributes.getAttribute(SdkExecutionAttribute.SERVICE_NAME))
436439
.safeParse(httpSpan, context);
437440

438-
httpSpans.add(httpSpan);
441+
this.spans.add(httpSpan);
442+
}
443+
444+
public <K, V> void addKafkaProduceSpan(
445+
Long startTime,
446+
Serializer<K> keySerializer,
447+
Serializer<V> valueSerializer,
448+
ProducerMetadata producerMetadata,
449+
ProducerRecord<K, V> record,
450+
RecordMetadata recordMetadata,
451+
Exception exception) {
452+
this.spans.add(
453+
KafkaSpanFactory.createProduce(
454+
this.baseSpan,
455+
startTime,
456+
keySerializer,
457+
valueSerializer,
458+
producerMetadata,
459+
record,
460+
recordMetadata,
461+
exception));
462+
}
463+
464+
public void addKafkaConsumeSpan(
465+
Long startTime,
466+
KafkaConsumer<?, ?> consumer,
467+
ConsumerMetadata consumerMetadata,
468+
ConsumerRecords<?, ?> consumerRecords) {
469+
this.spans.add(
470+
KafkaSpanFactory.createConsume(
471+
this.baseSpan, startTime, consumer, consumerMetadata, consumerRecords));
439472
}
440473

441474
private static String extractHeaders(Map<String, String> headers) {
@@ -522,18 +555,18 @@ protected static <T> T callIfVerbose(Callable<T> method) {
522555
}
523556
}
524557

525-
private Reportable prepareToSend(Reportable span, boolean hasError) {
526-
return reduceSpanSize(span.scrub(secretScrubber), hasError);
558+
private BaseSpan prepareToSend(BaseSpan span) {
559+
return reduceSpanSize(span.scrub(secretScrubber), false);
527560
}
528561

529-
private List<Reportable> prepareToSend(List<Reportable> spans, boolean hasError) {
530-
for (Reportable span : spans) {
562+
private List<BaseSpan> prepareToSend(List<BaseSpan> spans, boolean hasError) {
563+
for (BaseSpan span : spans) {
531564
reduceSpanSize(span.scrub(secretScrubber), hasError);
532565
}
533566
return spans;
534567
}
535568

536-
public Reportable reduceSpanSize(Reportable span, boolean hasError) {
569+
public BaseSpan reduceSpanSize(BaseSpan span, boolean hasError) {
537570
int maxFieldSize =
538571
hasError
539572
? Configuration.getInstance().maxSpanFieldSizeWhenError()

src/main/java/io/lumigo/core/configuration/Configuration.java

+9
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ public class Configuration {
2828
public static final String LUMIGO_MAX_SIZE_FOR_REQUEST = "LUMIGO_MAX_SIZE_FOR_REQUEST";
2929
public static final String LUMIGO_INSTRUMENTATION = "LUMIGO_INSTRUMENTATION";
3030
public static final String LUMIGO_SECRET_MASKING_REGEX = "LUMIGO_SECRET_MASKING_REGEX";
31+
public static final String LUMIGO_MAX_BATCH_MESSAGE_IDS = "LUMIGO_MAX_BATCH_MESSAGE_IDS";
3132

3233
private static Configuration instance;
3334
private LumigoConfiguration inlineConf;
@@ -137,4 +138,12 @@ public int maxRequestSize() {
137138
LUMIGO_MAX_SIZE_FOR_REQUEST,
138139
envUtil.getIntegerEnv(LUMIGO_MAX_RESPONSE_SIZE, 1024 * 500));
139140
}
141+
142+
public int maxBatchMessageIds() {
143+
int value = envUtil.getIntegerEnv(LUMIGO_MAX_BATCH_MESSAGE_IDS, 20);
144+
if (value == 0) {
145+
value = 20;
146+
}
147+
return value;
148+
}
140149
}

src/main/java/io/lumigo/core/instrumentation/agent/Loader.java

+22-5
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,7 @@
33
import static net.bytebuddy.matcher.ElementMatchers.nameStartsWith;
44
import static net.bytebuddy.matcher.ElementMatchers.not;
55

6-
import io.lumigo.core.instrumentation.impl.AmazonHttpClientInstrumentation;
7-
import io.lumigo.core.instrumentation.impl.AmazonHttpClientV2Instrumentation;
8-
import io.lumigo.core.instrumentation.impl.ApacheHttpInstrumentation;
6+
import io.lumigo.core.instrumentation.impl.*;
97
import net.bytebuddy.agent.builder.AgentBuilder;
108
import org.pmw.tinylog.Logger;
119

@@ -17,6 +15,10 @@ public static void instrument(java.lang.instrument.Instrumentation inst) {
1715
new AmazonHttpClientInstrumentation();
1816
AmazonHttpClientV2Instrumentation amazonHttpClientV2Instrumentation =
1917
new AmazonHttpClientV2Instrumentation();
18+
ApacheKafkaProducerInstrumentation apacheKafkaInstrumentation =
19+
new ApacheKafkaProducerInstrumentation();
20+
ApacheKafkaConsumerInstrumentation apacheKafkaConsumerInstrumentation =
21+
new ApacheKafkaConsumerInstrumentation();
2022
AgentBuilder builder =
2123
new AgentBuilder.Default()
2224
.disableClassFormatChanges()
@@ -27,13 +29,28 @@ public static void instrument(java.lang.instrument.Instrumentation inst) {
2729
.and(
2830
not(
2931
nameStartsWith(
30-
"software.amazon.awssdk.core.client.builder.SdkDefaultClientBuilder"))))
32+
AmazonHttpClientV2Instrumentation
33+
.INSTRUMENTATION_PACKAGE_PREFIX)))
34+
.and(
35+
not(
36+
nameStartsWith(
37+
ApacheKafkaProducerInstrumentation
38+
.INSTRUMENTATION_PACKAGE_PREFIX)))
39+
.and(
40+
not(
41+
nameStartsWith(
42+
ApacheKafkaConsumerInstrumentation
43+
.INSTRUMENTATION_PACKAGE_PREFIX))))
3144
.type(apacheHttpInstrumentation.getTypeMatcher())
3245
.transform(apacheHttpInstrumentation.getTransformer())
3346
.type(amazonHttpClientInstrumentation.getTypeMatcher())
3447
.transform(amazonHttpClientInstrumentation.getTransformer())
3548
.type(amazonHttpClientV2Instrumentation.getTypeMatcher())
36-
.transform(amazonHttpClientV2Instrumentation.getTransformer());
49+
.transform(amazonHttpClientV2Instrumentation.getTransformer())
50+
.type(apacheKafkaInstrumentation.getTypeMatcher())
51+
.transform(apacheKafkaInstrumentation.getTransformer())
52+
.type(apacheKafkaConsumerInstrumentation.getTypeMatcher())
53+
.transform(apacheKafkaConsumerInstrumentation.getTransformer());
3754

3855
builder.installOn(inst);
3956
Logger.debug("Finish Instrumentation");

src/main/java/io/lumigo/core/instrumentation/impl/AmazonHttpClientV2Instrumentation.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,13 @@
1818
import software.amazon.awssdk.http.SdkHttpRequest;
1919

2020
public class AmazonHttpClientV2Instrumentation implements LumigoInstrumentationApi {
21+
22+
public static final String INSTRUMENTATION_PACKAGE_PREFIX =
23+
"software.amazon.awssdk.core.client.builder.SdkDefaultClientBuilder";
24+
2125
@Override
2226
public ElementMatcher<TypeDescription> getTypeMatcher() {
23-
return named("software.amazon.awssdk.core.client.builder.SdkDefaultClientBuilder");
27+
return named(INSTRUMENTATION_PACKAGE_PREFIX);
2428
}
2529

2630
@Override
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package io.lumigo.core.instrumentation.impl;
2+
3+
import static net.bytebuddy.matcher.ElementMatchers.*;
4+
5+
import io.lumigo.core.SpansContainer;
6+
import io.lumigo.core.instrumentation.LumigoInstrumentationApi;
7+
import io.lumigo.core.instrumentation.agent.Loader;
8+
import io.lumigo.core.utils.LRUCache;
9+
import net.bytebuddy.agent.builder.AgentBuilder;
10+
import net.bytebuddy.asm.Advice;
11+
import net.bytebuddy.description.type.TypeDescription;
12+
import net.bytebuddy.matcher.ElementMatcher;
13+
import org.apache.kafka.clients.consumer.ConsumerRecords;
14+
import org.apache.kafka.clients.consumer.KafkaConsumer;
15+
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
16+
import org.pmw.tinylog.Logger;
17+
18+
public class ApacheKafkaConsumerInstrumentation implements LumigoInstrumentationApi {
19+
20+
public static final String INSTRUMENTATION_PACKAGE_PREFIX = "org.apache.kafka.clients.consumer";
21+
22+
@Override
23+
public ElementMatcher<TypeDescription> getTypeMatcher() {
24+
return named("org.apache.kafka.clients.consumer.KafkaConsumer");
25+
}
26+
27+
@Override
28+
public AgentBuilder.Transformer.ForAdvice getTransformer() {
29+
return new AgentBuilder.Transformer.ForAdvice()
30+
.include(Loader.class.getClassLoader())
31+
.advice(
32+
isMethod()
33+
.and(isPublic())
34+
.and(named("poll"))
35+
.and(takesArguments(1))
36+
.and(
37+
returns(
38+
named(
39+
"org.apache.kafka.clients.consumer.ConsumerRecords"))),
40+
ApacheKafkaConsumerAdvice.class.getName());
41+
}
42+
43+
public static class ApacheKafkaConsumerAdvice {
44+
public static final SpansContainer spansContainer = SpansContainer.getInstance();
45+
public static final LRUCache<String, Long> startTimeMap = new LRUCache<>(1000);
46+
47+
@Advice.OnMethodEnter(suppress = Throwable.class)
48+
public static void methodEnter(@Advice.FieldValue("clientId") String clientId) {
49+
try {
50+
startTimeMap.put(clientId, System.currentTimeMillis());
51+
} catch (Exception e) {
52+
Logger.error(e);
53+
}
54+
}
55+
56+
@Advice.OnMethodExit(suppress = Throwable.class)
57+
public static void methodExit(
58+
@Advice.This KafkaConsumer<?, ?> consumer,
59+
@Advice.FieldValue("metadata") ConsumerMetadata metadata,
60+
@Advice.FieldValue("clientId") String clientId,
61+
@Advice.Return ConsumerRecords<?, ?> consumerRecords) {
62+
try {
63+
Logger.info("Handling kafka request {}", consumerRecords.hashCode());
64+
spansContainer.addKafkaConsumeSpan(
65+
startTimeMap.get(clientId), consumer, metadata, consumerRecords);
66+
} catch (Throwable error) {
67+
Logger.error(error, "Failed to add kafka span");
68+
}
69+
}
70+
}
71+
}

0 commit comments

Comments
 (0)