Skip to content

Commit 55280fe

Browse files
committed
WIP
1 parent 24062f4 commit 55280fe

20 files changed

+980
-47
lines changed

Diff for: README.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -107,4 +107,5 @@ Add the environment variable `JAVA_TOOL_OPTIONS` to your Lambda functions and se
107107

108108
- Aws SDK V1
109109
- Aws SDK V2
110-
- Apache HTTP Client
110+
- Apache HTTP Client
111+
- Apache Kafka

Diff for: findbugs/findbugs-exclude.xml

+9
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,13 @@
22
<Match>
33
<Bug pattern="DMI_HARDCODED_ABSOLUTE_FILENAME"/>
44
</Match>
5+
<Match>
6+
<Bug pattern="EI_EXPOSE_REP2" />
7+
</Match>
8+
<Match>
9+
<Bug pattern="EI_EXPOSE_REP" />
10+
</Match>
11+
<Match>
12+
<Bug pattern="DLS_DEAD_LOCAL_STORE" />
13+
</Match>
514
</FindBugsFilter>

Diff for: pom.xml

+8-1
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@
6868
<dependency>
6969
<groupId>com.amazonaws</groupId>
7070
<artifactId>aws-lambda-java-events</artifactId>
71-
<version>2.2.6</version>
71+
<version>3.11.5</version>
7272
</dependency>
7373
<dependency>
7474
<groupId>com.amazonaws</groupId>
@@ -133,6 +133,13 @@
133133
<version>2.25.45</version>
134134
</dependency>
135135

136+
<!-- Kafka dependencies -->
137+
<dependency>
138+
<groupId>org.apache.kafka</groupId>
139+
<artifactId>kafka-clients</artifactId>
140+
<version>3.1.0</version>
141+
</dependency>
142+
136143
<!-- Tracer dependencies -->
137144
<dependency>
138145
<groupId>com.fasterxml.jackson.core</groupId>

Diff for: src/main/java/io/lumigo/core/SpansContainer.java

+49-10
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,23 @@
1616
import io.lumigo.models.HttpSpan;
1717
import io.lumigo.models.Reportable;
1818
import io.lumigo.models.Span;
19+
import io.lumigo.models.*;
1920
import java.io.*;
2021
import java.util.*;
2122
import java.util.concurrent.Callable;
23+
import lombok.Getter;
2224
import org.apache.http.Header;
2325
import org.apache.http.HttpEntity;
2426
import org.apache.http.HttpResponse;
2527
import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
2628
import org.apache.http.client.methods.HttpUriRequest;
29+
import org.apache.kafka.clients.consumer.ConsumerRecords;
30+
import org.apache.kafka.clients.consumer.KafkaConsumer;
31+
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
32+
import org.apache.kafka.clients.producer.ProducerRecord;
33+
import org.apache.kafka.clients.producer.RecordMetadata;
34+
import org.apache.kafka.clients.producer.internals.ProducerMetadata;
35+
import org.apache.kafka.common.serialization.Serializer;
2736
import org.pmw.tinylog.Logger;
2837
import software.amazon.awssdk.awscore.AwsResponse;
2938
import software.amazon.awssdk.core.SdkResponse;
@@ -41,14 +50,17 @@ public class SpansContainer {
4150
private static final String AMZN_TRACE_ID = "_X_AMZN_TRACE_ID";
4251
private static final String FUNCTION_SPAN_TYPE = "function";
4352
private static final String HTTP_SPAN_TYPE = "http";
53+
public static final String KAFKA_SPAN_TYPE = "kafka";
4454
private static final SecretScrubber secretScrubber = new SecretScrubber(new EnvUtil().getEnv());
4555

4656
private Span baseSpan;
4757
private Span startFunctionSpan;
4858
private Long rttDuration;
4959
private Span endFunctionSpan;
5060
private Reporter reporter;
51-
private List<HttpSpan> httpSpans = new LinkedList<>();
61+
62+
@Getter private List<BaseSpan> spans = new LinkedList<>();
63+
5264
private static final SpansContainer ourInstance = new SpansContainer();
5365

5466
public static SpansContainer getInstance() {
@@ -63,7 +75,7 @@ public void clear() {
6375
rttDuration = null;
6476
endFunctionSpan = null;
6577
reporter = null;
66-
httpSpans = new LinkedList<>();
78+
spans = new LinkedList<>();
6779
}
6880

6981
private SpansContainer() {}
@@ -81,6 +93,7 @@ public void init(Map<String, String> env, Reporter reporter, Context context, Ob
8193
Logger.debug("awsTracerId {}", awsTracerId);
8294

8395
AwsUtils.TriggeredBy triggeredBy = AwsUtils.extractTriggeredByFromEvent(event);
96+
8497
long startTime = System.currentTimeMillis();
8598
this.baseSpan =
8699
Span.builder()
@@ -221,18 +234,14 @@ public Span getStartFunctionSpan() {
221234
public List<Reportable> getAllCollectedSpans() {
222235
List<Reportable> spans = new LinkedList<>();
223236
spans.add(endFunctionSpan);
224-
spans.addAll(httpSpans);
237+
spans.addAll(this.spans);
225238
return spans;
226239
}
227240

228241
public Span getEndSpan() {
229242
return endFunctionSpan;
230243
}
231244

232-
public List<HttpSpan> getHttpSpans() {
233-
return httpSpans;
234-
}
235-
236245
private String getStackTrace(Throwable throwable) {
237246
StringWriter sw = new StringWriter();
238247
PrintWriter pw = new PrintWriter(sw, true);
@@ -307,7 +316,7 @@ public void addHttpSpan(Long startTime, HttpUriRequest request, HttpResponse res
307316
response.getStatusLine().getStatusCode())
308317
.build())
309318
.build());
310-
httpSpans.add(httpSpan);
319+
this.spans.add(httpSpan);
311320
}
312321

313322
public void addHttpSpan(Long startTime, Request<?> request, Response<?> response) {
@@ -366,7 +375,7 @@ public void addHttpSpan(Long startTime, Request<?> request, Response<?> response
366375
.build());
367376
AwsSdkV1ParserFactory.getParser(request.getServiceName())
368377
.safeParse(httpSpan, request, response);
369-
httpSpans.add(httpSpan);
378+
this.spans.add(httpSpan);
370379
}
371380

372381
public void addHttpSpan(
@@ -435,7 +444,37 @@ public void addHttpSpan(
435444
executionAttributes.getAttribute(SdkExecutionAttribute.SERVICE_NAME))
436445
.safeParse(httpSpan, context);
437446

438-
httpSpans.add(httpSpan);
447+
this.spans.add(httpSpan);
448+
}
449+
450+
public <K, V> void addKafkaProduceSpan(
451+
Long startTime,
452+
Serializer<K> keySerializer,
453+
Serializer<V> valueSerializer,
454+
ProducerMetadata producerMetadata,
455+
ProducerRecord<K, V> record,
456+
RecordMetadata recordMetadata,
457+
Exception exception) {
458+
this.spans.add(
459+
KafkaSpanFactory.createProduce(
460+
this.baseSpan,
461+
startTime,
462+
keySerializer,
463+
valueSerializer,
464+
producerMetadata,
465+
record,
466+
recordMetadata,
467+
exception));
468+
}
469+
470+
public void addKafkaConsumeSpan(
471+
Long startTime,
472+
KafkaConsumer<?, ?> consumer,
473+
ConsumerMetadata consumerMetadata,
474+
ConsumerRecords<?, ?> consumerRecords) {
475+
this.spans.add(
476+
KafkaSpanFactory.createConsume(
477+
this.baseSpan, startTime, consumer, consumerMetadata, consumerRecords));
439478
}
440479

441480
private static String extractHeaders(Map<String, String> headers) {

Diff for: src/main/java/io/lumigo/core/configuration/Configuration.java

+9
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ public class Configuration {
2828
public static final String LUMIGO_MAX_SIZE_FOR_REQUEST = "LUMIGO_MAX_SIZE_FOR_REQUEST";
2929
public static final String LUMIGO_INSTRUMENTATION = "LUMIGO_INSTRUMENTATION";
3030
public static final String LUMIGO_SECRET_MASKING_REGEX = "LUMIGO_SECRET_MASKING_REGEX";
31+
public static final String LUMIGO_MAX_BATCH_MESSAGE_IDS = "LUMIGO_MAX_BATCH_MESSAGE_IDS";
3132

3233
private static Configuration instance;
3334
private LumigoConfiguration inlineConf;
@@ -137,4 +138,12 @@ public int maxRequestSize() {
137138
LUMIGO_MAX_SIZE_FOR_REQUEST,
138139
envUtil.getIntegerEnv(LUMIGO_MAX_RESPONSE_SIZE, 1024 * 500));
139140
}
141+
142+
public int maxBatchMessageIds() {
143+
int value = envUtil.getIntegerEnv(LUMIGO_MAX_BATCH_MESSAGE_IDS, 20);
144+
if (value == 0) {
145+
value = 20;
146+
}
147+
return value;
148+
}
140149
}

Diff for: src/main/java/io/lumigo/core/instrumentation/agent/Loader.java

+22-5
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,7 @@
33
import static net.bytebuddy.matcher.ElementMatchers.nameStartsWith;
44
import static net.bytebuddy.matcher.ElementMatchers.not;
55

6-
import io.lumigo.core.instrumentation.impl.AmazonHttpClientInstrumentation;
7-
import io.lumigo.core.instrumentation.impl.AmazonHttpClientV2Instrumentation;
8-
import io.lumigo.core.instrumentation.impl.ApacheHttpInstrumentation;
6+
import io.lumigo.core.instrumentation.impl.*;
97
import net.bytebuddy.agent.builder.AgentBuilder;
108
import org.pmw.tinylog.Logger;
119

@@ -17,6 +15,10 @@ public static void instrument(java.lang.instrument.Instrumentation inst) {
1715
new AmazonHttpClientInstrumentation();
1816
AmazonHttpClientV2Instrumentation amazonHttpClientV2Instrumentation =
1917
new AmazonHttpClientV2Instrumentation();
18+
ApacheKafkaProducerInstrumentation apacheKafkaInstrumentation =
19+
new ApacheKafkaProducerInstrumentation();
20+
ApacheKafkaConsumerInstrumentation apacheKafkaConsumerInstrumentation =
21+
new ApacheKafkaConsumerInstrumentation();
2022
AgentBuilder builder =
2123
new AgentBuilder.Default()
2224
.disableClassFormatChanges()
@@ -27,13 +29,28 @@ public static void instrument(java.lang.instrument.Instrumentation inst) {
2729
.and(
2830
not(
2931
nameStartsWith(
30-
"software.amazon.awssdk.core.client.builder.SdkDefaultClientBuilder"))))
32+
AmazonHttpClientV2Instrumentation
33+
.INSTRUMENTATION_PACKAGE_PREFIX)))
34+
.and(
35+
not(
36+
nameStartsWith(
37+
ApacheKafkaProducerInstrumentation
38+
.INSTRUMENTATION_PACKAGE_PREFIX)))
39+
.and(
40+
not(
41+
nameStartsWith(
42+
ApacheKafkaConsumerInstrumentation
43+
.INSTRUMENTATION_PACKAGE_PREFIX))))
3144
.type(apacheHttpInstrumentation.getTypeMatcher())
3245
.transform(apacheHttpInstrumentation.getTransformer())
3346
.type(amazonHttpClientInstrumentation.getTypeMatcher())
3447
.transform(amazonHttpClientInstrumentation.getTransformer())
3548
.type(amazonHttpClientV2Instrumentation.getTypeMatcher())
36-
.transform(amazonHttpClientV2Instrumentation.getTransformer());
49+
.transform(amazonHttpClientV2Instrumentation.getTransformer())
50+
.type(apacheKafkaInstrumentation.getTypeMatcher())
51+
.transform(apacheKafkaInstrumentation.getTransformer())
52+
.type(apacheKafkaConsumerInstrumentation.getTypeMatcher())
53+
.transform(apacheKafkaConsumerInstrumentation.getTransformer());
3754

3855
builder.installOn(inst);
3956
Logger.debug("Finish Instrumentation");

Diff for: src/main/java/io/lumigo/core/instrumentation/impl/AmazonHttpClientV2Instrumentation.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,13 @@
1818
import software.amazon.awssdk.http.SdkHttpRequest;
1919

2020
public class AmazonHttpClientV2Instrumentation implements LumigoInstrumentationApi {
21+
22+
public static final String INSTRUMENTATION_PACKAGE_PREFIX =
23+
"software.amazon.awssdk.core.client.builder.SdkDefaultClientBuilder";
24+
2125
@Override
2226
public ElementMatcher<TypeDescription> getTypeMatcher() {
23-
return named("software.amazon.awssdk.core.client.builder.SdkDefaultClientBuilder");
27+
return named(INSTRUMENTATION_PACKAGE_PREFIX);
2428
}
2529

2630
@Override
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package io.lumigo.core.instrumentation.impl;
2+
3+
import static net.bytebuddy.matcher.ElementMatchers.*;
4+
5+
import io.lumigo.core.SpansContainer;
6+
import io.lumigo.core.instrumentation.LumigoInstrumentationApi;
7+
import io.lumigo.core.instrumentation.agent.Loader;
8+
import io.lumigo.core.utils.LRUCache;
9+
import net.bytebuddy.agent.builder.AgentBuilder;
10+
import net.bytebuddy.asm.Advice;
11+
import net.bytebuddy.description.type.TypeDescription;
12+
import net.bytebuddy.matcher.ElementMatcher;
13+
import org.apache.kafka.clients.consumer.ConsumerRecords;
14+
import org.apache.kafka.clients.consumer.KafkaConsumer;
15+
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
16+
import org.pmw.tinylog.Logger;
17+
18+
public class ApacheKafkaConsumerInstrumentation implements LumigoInstrumentationApi {
19+
20+
public static final String INSTRUMENTATION_PACKAGE_PREFIX = "org.apache.kafka.clients.consumer";
21+
22+
@Override
23+
public ElementMatcher<TypeDescription> getTypeMatcher() {
24+
return named("org.apache.kafka.clients.consumer.KafkaConsumer");
25+
}
26+
27+
@Override
28+
public AgentBuilder.Transformer.ForAdvice getTransformer() {
29+
return new AgentBuilder.Transformer.ForAdvice()
30+
.include(Loader.class.getClassLoader())
31+
.advice(
32+
isMethod()
33+
.and(isPublic())
34+
.and(named("poll"))
35+
.and(takesArguments(1))
36+
.and(
37+
returns(
38+
named(
39+
"org.apache.kafka.clients.consumer.ConsumerRecords"))),
40+
ApacheKafkaConsumerAdvice.class.getName());
41+
}
42+
43+
public static class ApacheKafkaConsumerAdvice {
44+
public static final SpansContainer spansContainer = SpansContainer.getInstance();
45+
public static final LRUCache<String, Long> startTimeMap = new LRUCache<>(1000);
46+
47+
@Advice.OnMethodEnter(suppress = Throwable.class)
48+
public static void methodEnter(@Advice.FieldValue("clientId") String clientId) {
49+
try {
50+
startTimeMap.put(clientId, System.currentTimeMillis());
51+
} catch (Exception e) {
52+
Logger.error(e);
53+
}
54+
}
55+
56+
@Advice.OnMethodExit(suppress = Throwable.class)
57+
public static void methodExit(
58+
@Advice.This KafkaConsumer<?, ?> consumer,
59+
@Advice.FieldValue("metadata") ConsumerMetadata metadata,
60+
@Advice.FieldValue("clientId") String clientId,
61+
@Advice.Return ConsumerRecords<?, ?> consumerRecords) {
62+
try {
63+
Logger.info("Handling kafka request {}", consumerRecords.hashCode());
64+
spansContainer.addKafkaConsumeSpan(
65+
startTimeMap.get(clientId), consumer, metadata, consumerRecords);
66+
} catch (Throwable error) {
67+
Logger.error(error, "Failed to add kafka span");
68+
}
69+
}
70+
}
71+
}

0 commit comments

Comments
 (0)