Skip to content

Commit 59f1f93

Browse files
committed
fix cr comments
1 parent 1054ea6 commit 59f1f93

File tree

10 files changed

+61
-63
lines changed

10 files changed

+61
-63
lines changed

src/main/java/io/lumigo/core/SpansContainer.java

+11-19
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,11 @@
1111
import io.lumigo.core.utils.AwsUtils;
1212
import io.lumigo.core.utils.JsonUtils;
1313
import io.lumigo.core.utils.StringUtils;
14-
import io.lumigo.models.HttpSpan;
15-
import io.lumigo.models.KafkaSpan;
16-
import io.lumigo.models.KafkaSpanFactory;
17-
import io.lumigo.models.Span;
14+
import io.lumigo.models.*;
1815
import java.io.*;
1916
import java.util.*;
2017
import java.util.concurrent.Callable;
18+
import lombok.Getter;
2119
import org.apache.http.Header;
2220
import org.apache.http.HttpEntity;
2321
import org.apache.http.HttpResponse;
@@ -54,8 +52,8 @@ public class SpansContainer {
5452
private Long rttDuration;
5553
private Span endFunctionSpan;
5654
private Reporter reporter;
57-
private List<HttpSpan> httpSpans = new LinkedList<>();
58-
private List<KafkaSpan> kafkaSpans = new LinkedList<>();
55+
56+
@Getter private List<BaseSpan> spans = new LinkedList<>();
5957

6058
private static final SpansContainer ourInstance = new SpansContainer();
6159

@@ -71,8 +69,7 @@ public void clear() {
7169
rttDuration = null;
7270
endFunctionSpan = null;
7371
reporter = null;
74-
httpSpans = new LinkedList<>();
75-
kafkaSpans = new LinkedList<>();
72+
spans = new LinkedList<>();
7673
}
7774

7875
private SpansContainer() {}
@@ -230,19 +227,14 @@ public Span getStartFunctionSpan() {
230227
public List<Object> getAllCollectedSpans() {
231228
List<Object> spans = new LinkedList<>();
232229
spans.add(endFunctionSpan);
233-
spans.addAll(httpSpans);
234-
spans.addAll(kafkaSpans);
230+
spans.addAll(this.spans);
235231
return spans;
236232
}
237233

238234
public Span getEndSpan() {
239235
return endFunctionSpan;
240236
}
241237

242-
public List<HttpSpan> getHttpSpans() {
243-
return httpSpans;
244-
}
245-
246238
private String getStackTrace(Throwable throwable) {
247239
StringWriter sw = new StringWriter();
248240
PrintWriter pw = new PrintWriter(sw, true);
@@ -317,7 +309,7 @@ public void addHttpSpan(Long startTime, HttpUriRequest request, HttpResponse res
317309
response.getStatusLine().getStatusCode())
318310
.build())
319311
.build());
320-
httpSpans.add(httpSpan);
312+
this.spans.add(httpSpan);
321313
}
322314

323315
public void addHttpSpan(Long startTime, Request<?> request, Response<?> response) {
@@ -376,7 +368,7 @@ public void addHttpSpan(Long startTime, Request<?> request, Response<?> response
376368
.build());
377369
AwsSdkV1ParserFactory.getParser(request.getServiceName())
378370
.safeParse(httpSpan, request, response);
379-
httpSpans.add(httpSpan);
371+
this.spans.add(httpSpan);
380372
}
381373

382374
public void addHttpSpan(
@@ -445,7 +437,7 @@ public void addHttpSpan(
445437
executionAttributes.getAttribute(SdkExecutionAttribute.SERVICE_NAME))
446438
.safeParse(httpSpan, context);
447439

448-
httpSpans.add(httpSpan);
440+
this.spans.add(httpSpan);
449441
}
450442

451443
public <K, V> void addKafkaProduceSpan(
@@ -456,7 +448,7 @@ public <K, V> void addKafkaProduceSpan(
456448
ProducerRecord<K, V> record,
457449
RecordMetadata recordMetadata,
458450
Exception exception) {
459-
this.kafkaSpans.add(
451+
this.spans.add(
460452
KafkaSpanFactory.createProduce(
461453
this.baseSpan,
462454
startTime,
@@ -473,7 +465,7 @@ public void addKafkaConsumeSpan(
473465
KafkaConsumer<?, ?> consumer,
474466
ConsumerMetadata consumerMetadata,
475467
ConsumerRecords<?, ?> consumerRecords) {
476-
this.kafkaSpans.add(
468+
this.spans.add(
477469
KafkaSpanFactory.createConsume(
478470
this.baseSpan, startTime, consumer, consumerMetadata, consumerRecords));
479471
}

src/main/java/io/lumigo/core/configuration/Configuration.java

+5
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ public class Configuration {
2727
public static final String LUMIGO_MAX_RESPONSE_SIZE = "LUMIGO_MAX_RESPONSE_SIZE";
2828
public static final String LUMIGO_MAX_SIZE_FOR_REQUEST = "LUMIGO_MAX_SIZE_FOR_REQUEST";
2929
public static final String LUMIGO_INSTRUMENTATION = "LUMIGO_INSTRUMENTATION";
30+
public static final String LUMIGO_MAX_BATCH_MESSAGE_IDS = "LUMIGO_MAX_BATCH_MESSAGE_IDS";
3031

3132
private static Configuration instance;
3233
private LumigoConfiguration inlineConf;
@@ -136,4 +137,8 @@ public int maxRequestSize() {
136137
LUMIGO_MAX_SIZE_FOR_REQUEST,
137138
envUtil.getIntegerEnv(LUMIGO_MAX_RESPONSE_SIZE, 1024 * 500));
138139
}
140+
141+
public int maxBatchMessageIds() {
142+
return envUtil.getIntegerEnv(LUMIGO_MAX_BATCH_MESSAGE_IDS, 20);
143+
}
139144
}

src/main/java/io/lumigo/core/instrumentation/impl/ApacheKafkaProducerInstrumentation.java

-3
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import io.lumigo.core.SpansContainer;
66
import io.lumigo.core.instrumentation.LumigoInstrumentationApi;
77
import io.lumigo.core.instrumentation.agent.Loader;
8-
import io.lumigo.core.utils.LRUCache;
98
import io.lumigo.models.KafkaSpan;
109
import java.nio.charset.StandardCharsets;
1110
import java.util.UUID;
@@ -53,7 +52,6 @@ public AgentBuilder.Transformer.ForAdvice getTransformer() {
5352

5453
public static class ApacheKafkaProducerAdvice {
5554
public static final SpansContainer spansContainer = SpansContainer.getInstance();
56-
public static final LRUCache<Integer, Boolean> handled = new LRUCache<>(1000);
5755

5856
@Advice.OnMethodEnter
5957
public static <K, V> void methodEnter(
@@ -109,7 +107,6 @@ public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
109107
record,
110108
recordMetadata,
111109
exception);
112-
handled.put(record.hashCode(), true);
113110
} catch (Throwable error) {
114111
Logger.error(error, "Failed to add kafka span");
115112
}

src/main/java/io/lumigo/core/utils/AwsUtils.java

+19-11
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.amazonaws.services.lambda.runtime.events.*;
44
import com.amazonaws.services.lambda.runtime.events.models.dynamodb.AttributeValue;
55
import com.fasterxml.jackson.annotation.JsonInclude;
6+
import io.lumigo.core.configuration.Configuration;
67
import io.lumigo.models.KafkaSpan;
78
import io.lumigo.models.Span;
89
import java.nio.charset.StandardCharsets;
@@ -158,21 +159,28 @@ public static TriggeredBy extractTriggeredByFromEvent(Object event) {
158159
triggeredBy.setTriggeredBy("kafka");
159160
triggeredBy.setArn(((KafkaEvent) event).getEventSourceArn());
160161
List<String> messageIds = new ArrayList<>();
161-
for (Map.Entry<String, List<KafkaEvent.KafkaEventRecord>> entry :
162-
((KafkaEvent) event).getRecords().entrySet()) {
163-
for (KafkaEvent.KafkaEventRecord record : entry.getValue()) {
164-
for (Map<String, byte[]> headers : record.getHeaders()) {
165-
if (headers.containsKey(KafkaSpan.LUMIGO_MESSAGE_ID_KEY)) {
166-
messageIds.add(
167-
new String(
168-
headers.get(KafkaSpan.LUMIGO_MESSAGE_ID_KEY),
169-
StandardCharsets.UTF_8));
170-
break;
162+
if (((KafkaEvent) event).getRecords() != null) {
163+
for (Map.Entry<String, List<KafkaEvent.KafkaEventRecord>> entry :
164+
((KafkaEvent) event).getRecords().entrySet()) {
165+
for (KafkaEvent.KafkaEventRecord record : entry.getValue()) {
166+
for (Map<String, byte[]> headers : record.getHeaders()) {
167+
if (headers.containsKey(KafkaSpan.LUMIGO_MESSAGE_ID_KEY)) {
168+
messageIds.add(
169+
new String(
170+
headers.get(KafkaSpan.LUMIGO_MESSAGE_ID_KEY),
171+
StandardCharsets.UTF_8));
172+
break;
173+
}
171174
}
172175
}
173176
}
174177
}
175-
triggeredBy.setMessageIds(messageIds);
178+
triggeredBy.setMessageIds(
179+
messageIds.subList(
180+
0,
181+
Math.min(
182+
messageIds.size(),
183+
Configuration.getInstance().maxBatchMessageIds())));
176184
} else {
177185
Logger.info(
178186
"Failed to found relevant triggered by found for event {} ",
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
package io.lumigo.models;
2+
3+
public interface BaseSpan {}

src/main/java/io/lumigo/models/HttpSpan.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
@AllArgsConstructor
1111
@Builder(toBuilder = true)
1212
@Data(staticConstructor = "of")
13-
public class HttpSpan {
13+
public class HttpSpan implements BaseSpan {
1414
private Long started;
1515
private Long ended;
1616
private String id;

src/main/java/io/lumigo/models/KafkaSpan.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
@Getter
1010
@Builder(toBuilder = true)
1111
@AllArgsConstructor
12-
public class KafkaSpan {
12+
public class KafkaSpan implements BaseSpan {
1313
public static final String LUMIGO_MESSAGE_ID_KEY = "lumigoMessageId";
1414
public static final String KAFKA_PRODUCER_TYPE = "PRODUCER";
1515
public static final String KAFKA_CONSUMER_TYPE = "CONSUMER";

src/main/java/io/lumigo/models/KafkaSpanFactory.java

+16-23
Original file line numberDiff line numberDiff line change
@@ -43,31 +43,17 @@ public static <K, V> KafkaSpan createProduce(
4343
.headers(extractHeaders(record.headers()))
4444
.build();
4545

46-
KafkaSpan.KafkaInfo info;
46+
KafkaSpan.KafkaProducerResponse response;
4747
if (exception == null) {
48-
info =
49-
KafkaSpan.KafkaProducerInfo.builder()
50-
.kafkaInfoType(KafkaSpan.KAFKA_PRODUCER_TYPE)
51-
.bootstrapServers(bootstrapServers)
52-
.topic(topic)
53-
.record(producerRecord)
54-
.response(
55-
KafkaSpan.KafkaProducerSuccessResponse.builder()
56-
.partition(recordMetadata.partition())
57-
.offset(recordMetadata.offset())
58-
.build())
48+
response =
49+
KafkaSpan.KafkaProducerSuccessResponse.builder()
50+
.partition(recordMetadata.partition())
51+
.offset(recordMetadata.offset())
5952
.build();
6053
} else {
61-
info =
62-
KafkaSpan.KafkaProducerInfo.builder()
63-
.kafkaInfoType(KafkaSpan.KAFKA_PRODUCER_TYPE)
64-
.bootstrapServers(bootstrapServers)
65-
.topic(topic)
66-
.record(producerRecord)
67-
.response(
68-
KafkaSpan.KafkaProducerErrorResponse.builder()
69-
.errorMessage(exception.getMessage())
70-
.build())
54+
response =
55+
KafkaSpan.KafkaProducerErrorResponse.builder()
56+
.errorMessage(exception.getMessage())
7157
.build();
7258
}
7359

@@ -99,7 +85,14 @@ public static <K, V> KafkaSpan createProduce(
9985
.root(baseSpan.getInfo().getTraceId().getRoot())
10086
.build())
10187
.messageId(messageId)
102-
.kafkaInfo(info)
88+
.kafkaInfo(
89+
KafkaSpan.KafkaProducerInfo.builder()
90+
.kafkaInfoType(KafkaSpan.KAFKA_PRODUCER_TYPE)
91+
.bootstrapServers(bootstrapServers)
92+
.topic(topic)
93+
.record(producerRecord)
94+
.response(response)
95+
.build())
10396
.build())
10497
.build();
10598
}

src/test/java/io/lumigo/core/SpansContainerTest.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,7 @@ void add_http_span() throws Exception {
315315
long startTime = System.currentTimeMillis();
316316
spansContainer.addHttpSpan(startTime, httpRequest, httpResponse);
317317

318-
HttpSpan actualSpan = spansContainer.getHttpSpans().get(0);
318+
HttpSpan actualSpan = (HttpSpan) spansContainer.getSpans().get(0);
319319
String expectedSpan =
320320
"{\n"
321321
+ " \"started\":1559127760071,\n"
@@ -379,7 +379,7 @@ void add_aws_http_span_with_spnid_from_header_amzn() throws Exception {
379379
long startTime = System.currentTimeMillis();
380380
spansContainer.addHttpSpan(startTime, awsRequest, awsResponse);
381381

382-
HttpSpan actualSpan = spansContainer.getHttpSpans().get(0);
382+
HttpSpan actualSpan = (HttpSpan) spansContainer.getSpans().get(0);
383383
String expectedSpan =
384384
"{\n"
385385
+ " \"started\":1559127760071,\n"
@@ -445,7 +445,7 @@ void add_aws_http_span_with_spnid_from_header_amz() throws Exception {
445445
long startTime = System.currentTimeMillis();
446446
spansContainer.addHttpSpan(startTime, awsRequest, awsResponse);
447447

448-
HttpSpan actualSpan = spansContainer.getHttpSpans().get(0);
448+
HttpSpan actualSpan = (HttpSpan) spansContainer.getSpans().get(0);
449449
String expectedSpan =
450450
"{\n"
451451
+ " \"started\":1559127760071,\n"
@@ -531,7 +531,7 @@ void add_aws_sdk_v2_http_span() throws Exception {
531531

532532
spansContainer.addHttpSpan(startTime, requestContext, executionAttributes);
533533

534-
HttpSpan actualSpan = spansContainer.getHttpSpans().get(0);
534+
HttpSpan actualSpan = (HttpSpan) spansContainer.getSpans().get(0);
535535
String expectedSpan =
536536
"{\n"
537537
+ " \"started\":1559127760071,\n"

src/test/java/io/lumigo/core/instrumentation/impl/ApacheHttpInstrumentationTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ public void handling_exit_response_create_new_span() throws Exception {
9292

9393
ApacheHttpInstrumentation.ApacheHttpAdvice.methodExit(request, response);
9494

95-
assertEquals(1, SpansContainer.getInstance().getHttpSpans().size());
95+
assertEquals(1, SpansContainer.getInstance().getSpans().size());
9696
assertNotNull(ApacheHttpInstrumentation.ApacheHttpAdvice.handled.get(request.hashCode()));
9797
}
9898

0 commit comments

Comments
 (0)