Skip to content

Commit 9091388

Browse files
committed
unit tests
1 parent 1b1bba6 commit 9091388

File tree

6 files changed

+457
-199
lines changed

6 files changed

+457
-199
lines changed

Diff for: src/main/java/io/lumigo/core/SpansContainer.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import io.lumigo.core.utils.StringUtils;
1414
import io.lumigo.models.HttpSpan;
1515
import io.lumigo.models.KafkaSpan;
16+
import io.lumigo.models.KafkaSpanFactory;
1617
import io.lumigo.models.Span;
1718
import java.io.*;
1819
import java.util.*;
@@ -455,7 +456,7 @@ public <K, V> void addKafkaProduceSpan(
455456
RecordMetadata recordMetadata,
456457
Exception exception) {
457458
this.kafkaSpans.add(
458-
KafkaSpan.createProduce(
459+
KafkaSpanFactory.createProduce(
459460
this.baseSpan,
460461
startTime,
461462
keySerializer,
@@ -472,7 +473,7 @@ public void addKafkaConsumeSpan(
472473
ConsumerMetadata consumerMetadata,
473474
ConsumerRecords<?, ?> consumerRecords) {
474475
this.kafkaSpans.add(
475-
KafkaSpan.createConsume(
476+
KafkaSpanFactory.createConsume(
476477
this.baseSpan, startTime, consumer, consumerMetadata, consumerRecords));
477478
}
478479

Diff for: src/main/java/io/lumigo/core/instrumentation/impl/ApacheKafkaConsumerInstrumentation.java

+5-11
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,11 @@ public class ApacheKafkaConsumerInstrumentation implements LumigoInstrumentation
2121

2222
@Override
2323
public ElementMatcher<TypeDescription> getTypeMatcher() {
24-
System.out.println("Inside ApacheKafkaConsumerInstrumentation.getTypeMatcher()");
2524
return named("org.apache.kafka.clients.consumer.KafkaConsumer");
2625
}
2726

2827
@Override
2928
public AgentBuilder.Transformer.ForAdvice getTransformer() {
30-
System.out.println("Inside ApacheKafkaConsumerInstrumentation.getTransformer()");
3129
return new AgentBuilder.Transformer.ForAdvice()
3230
.include(Loader.class.getClassLoader())
3331
.advice(
@@ -44,15 +42,12 @@ public AgentBuilder.Transformer.ForAdvice getTransformer() {
4442

4543
public static class ApacheKafkaConsumerAdvice {
4644
public static final SpansContainer spansContainer = SpansContainer.getInstance();
47-
public static final LRUCache<Integer, Boolean> handled = new LRUCache<>(1000);
48-
public static final LRUCache<Integer, Long> startTimeMap = new LRUCache<>(1000);
45+
public static final LRUCache<String, Long> startTimeMap = new LRUCache<>(1000);
4946

5047
@Advice.OnMethodEnter(suppress = Throwable.class)
51-
public static void methodEnter() {
48+
public static void methodEnter(@Advice.FieldValue("clientId") String clientId) {
5249
try {
53-
System.out.println("Inside ApacheKafkaConsumerAdvice.methodEnter()");
54-
// TODO fix start time
55-
// startTimeMap.put(record.hashCode(), System.currentTimeMillis());
50+
startTimeMap.put(clientId, System.currentTimeMillis());
5651
} catch (Exception e) {
5752
Logger.error(e);
5853
}
@@ -62,13 +57,12 @@ public static void methodEnter() {
6257
public static void methodExit(
6358
@Advice.This KafkaConsumer<?, ?> consumer,
6459
@Advice.FieldValue("metadata") ConsumerMetadata metadata,
60+
@Advice.FieldValue("clientId") String clientId,
6561
@Advice.Return ConsumerRecords<?, ?> consumerRecords) {
6662
try {
67-
System.out.println("Inside ApacheKafkaConsumerAdvice.methodExit()");
6863
Logger.info("Handling kafka request {}", consumerRecords.hashCode());
6964
spansContainer.addKafkaConsumeSpan(
70-
System.currentTimeMillis(), consumer, metadata, consumerRecords);
71-
handled.put(consumerRecords.hashCode(), true);
65+
startTimeMap.get(clientId), consumer, metadata, consumerRecords);
7266
} catch (Throwable error) {
7367
Logger.error(error, "Failed to add kafka span");
7468
}

Diff for: src/main/java/io/lumigo/core/instrumentation/impl/ApacheKafkaProducerInstrumentation.java

+1-6
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,11 @@ public class ApacheKafkaProducerInstrumentation implements LumigoInstrumentation
2525

2626
@Override
2727
public ElementMatcher<TypeDescription> getTypeMatcher() {
28-
System.out.println("Inside ApacheKafkaProducerInstrumentation.getTypeMatcher()");
2928
return named("org.apache.kafka.clients.producer.KafkaProducer");
3029
}
3130

3231
@Override
3332
public AgentBuilder.Transformer.ForAdvice getTransformer() {
34-
System.out.println("Inside ApacheKafkaProducerInstrumentation.getTransformer()");
3533
return new AgentBuilder.Transformer.ForAdvice()
3634
.include(Loader.class.getClassLoader())
3735
.advice(
@@ -63,7 +61,6 @@ public static <K, V> void methodEnter(
6361
@Advice.Argument(value = 0, readOnly = false) ProducerRecord<K, V> record,
6462
@Advice.Argument(value = 1, readOnly = false) Callback callback) {
6563
try {
66-
System.out.println("Inside ApacheKafkaProducerAdvice.methodEnter()");
6764
callback =
6865
new KafkaProducerCallback<>(
6966
callback,
@@ -98,9 +95,7 @@ public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
9895
if (callback != null) {
9996
callback.onCompletion(recordMetadata, exception);
10097
}
101-
System.out.println("Inside KafkaProducerCallback.onCompletion()");
102-
103-
Logger.info("Handling kafka request {} from host {}", record.hashCode());
98+
Logger.info("Handling kafka request {}", record.hashCode());
10499
spansContainer.addKafkaProduceSpan(
105100
startTime,
106101
keySerializer,

Diff for: src/main/java/io/lumigo/models/KafkaSpan.java

+2-180
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,11 @@
11
package io.lumigo.models;
22

3-
import static io.lumigo.core.SpansContainer.KAFKA_SPAN_TYPE;
4-
53
import com.fasterxml.jackson.annotation.JsonProperty;
64
import java.util.*;
7-
import java.util.stream.Collectors;
85
import lombok.AllArgsConstructor;
96
import lombok.Builder;
10-
import lombok.Data;
11-
import org.apache.kafka.clients.consumer.ConsumerRecords;
12-
import org.apache.kafka.clients.consumer.KafkaConsumer;
13-
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
14-
import org.apache.kafka.clients.producer.ProducerRecord;
15-
import org.apache.kafka.clients.producer.RecordMetadata;
16-
import org.apache.kafka.clients.producer.internals.ProducerMetadata;
17-
import org.apache.kafka.common.header.Header;
18-
import org.apache.kafka.common.header.Headers;
19-
import org.apache.kafka.common.serialization.Serializer;
207

21-
@Data
228
@Builder(toBuilder = true)
23-
@AllArgsConstructor
249
public class KafkaSpan {
2510
private static final String KAFKA_PRODUCER_TYPE = "PRODUCER";
2611
private static final String KAFKA_CONSUMER_TYPE = "CONSUMER";
@@ -37,7 +22,6 @@ public class KafkaSpan {
3722
private Info info;
3823

3924
@Builder(toBuilder = true)
40-
@Data(staticConstructor = "of")
4125
public static class Info {
4226
private KafkaSpan.Tracer tracer;
4327
private KafkaSpan.TraceId traceId;
@@ -50,35 +34,29 @@ public static class Info {
5034

5135
@AllArgsConstructor
5236
@Builder(toBuilder = true)
53-
@Data(staticConstructor = "of")
5437
public static class TraceId {
5538
@JsonProperty("Root")
5639
private String root;
5740
}
5841

5942
@AllArgsConstructor
6043
@Builder(toBuilder = true)
61-
@Data(staticConstructor = "of")
6244
public static class Tracer {
6345
private String version;
6446
}
6547

6648
public interface KafkaInfo {}
6749

68-
@AllArgsConstructor
6950
@Builder(toBuilder = true)
70-
@Data(staticConstructor = "of")
7151
public static class KafkaProducerInfo implements KafkaInfo {
72-
private final String kafkaInfoType = KAFKA_PRODUCER_TYPE;
52+
private static final String kafkaInfoType = KAFKA_PRODUCER_TYPE;
7353
private List<String> bootstrapServers;
7454
private String topic;
7555
private KafkaSpan.KafkaProducerRecord record;
7656
private KafkaSpan.KafkaProducerResponse response;
7757
}
7858

79-
@AllArgsConstructor
8059
@Builder(toBuilder = true)
81-
@Data(staticConstructor = "of")
8260
public static class KafkaProducerRecord {
8361
private byte[] key;
8462
private byte[] value;
@@ -87,36 +65,28 @@ public static class KafkaProducerRecord {
8765

8866
public interface KafkaProducerResponse {}
8967

90-
@AllArgsConstructor
9168
@Builder(toBuilder = true)
92-
@Data(staticConstructor = "of")
9369
public static class KafkaProducerSuccessResponse implements KafkaProducerResponse {
9470
private Integer partition;
9571
private Long offset;
9672
}
9773

98-
@AllArgsConstructor
9974
@Builder(toBuilder = true)
100-
@Data(staticConstructor = "of")
10175
public static class KafkaProducerErrorResponse implements KafkaProducerResponse {
10276
private String errorMessage;
10377
}
10478

105-
@AllArgsConstructor
10679
@Builder(toBuilder = true)
107-
@Data(staticConstructor = "of")
10880
public static class KafkaConsumerInfo implements KafkaInfo {
109-
private final String kafkaInfoType = KAFKA_CONSUMER_TYPE;
81+
private static final String kafkaInfoType = KAFKA_CONSUMER_TYPE;
11082
private List<String> bootstrapServers;
11183
private String consumerGroupId;
11284
private Integer recordsCount;
11385
private List<String> topics;
11486
private List<KafkaSpan.KafkaConsumerRecord> records;
11587
}
11688

117-
@AllArgsConstructor
11889
@Builder(toBuilder = true)
119-
@Data(staticConstructor = "of")
12090
public static class KafkaConsumerRecord {
12191
private String topic;
12292
private Integer partition;
@@ -125,152 +95,4 @@ public static class KafkaConsumerRecord {
12595
private String value;
12696
private Map<String, byte[]> headers;
12797
}
128-
129-
public static <K, V> KafkaSpan createProduce(
130-
Span baseSpan,
131-
Long startTime,
132-
Serializer<K> keySerializer,
133-
Serializer<V> valueSerializer,
134-
ProducerMetadata producerMetadata,
135-
ProducerRecord<K, V> record,
136-
RecordMetadata recordMetadata,
137-
Exception exception) {
138-
List<String> bootstrapServers =
139-
producerMetadata.fetch().nodes().stream()
140-
.map(node -> node.host() + ":" + node.port())
141-
.collect(Collectors.toList());
142-
String topic = record.topic();
143-
KafkaProducerRecord producerRecord =
144-
KafkaProducerRecord.builder()
145-
.key(
146-
keySerializer.serialize(
147-
record.topic(), record.headers(), record.key()))
148-
.value(
149-
valueSerializer.serialize(
150-
record.topic(), record.headers(), record.value()))
151-
.headers(extractHeaders(record.headers()))
152-
.build();
153-
154-
KafkaInfo info;
155-
if (exception == null) {
156-
info =
157-
KafkaSpan.KafkaProducerInfo.builder()
158-
.bootstrapServers(bootstrapServers)
159-
.topic(topic)
160-
.record(producerRecord)
161-
.response(
162-
KafkaProducerSuccessResponse.builder()
163-
.partition(recordMetadata.partition())
164-
.offset(recordMetadata.offset())
165-
.build())
166-
.build();
167-
} else {
168-
info =
169-
KafkaProducerInfo.builder()
170-
.bootstrapServers(bootstrapServers)
171-
.topic(topic)
172-
.record(producerRecord)
173-
.response(
174-
KafkaProducerErrorResponse.builder()
175-
.errorMessage(exception.getMessage())
176-
.build())
177-
.build();
178-
}
179-
180-
return new KafkaSpanBuilder()
181-
.id(UUID.randomUUID().toString())
182-
.started(startTime)
183-
.ended(System.currentTimeMillis())
184-
.type(KAFKA_SPAN_TYPE)
185-
.transactionId(baseSpan.getTransactionId())
186-
.account(baseSpan.getAccount())
187-
.region(baseSpan.getRegion())
188-
.token(baseSpan.getToken())
189-
.parentId(baseSpan.getId())
190-
.info(
191-
KafkaSpan.Info.builder()
192-
.tracer(
193-
KafkaSpan.Tracer.builder()
194-
.version(
195-
baseSpan.getInfo().getTracer().getVersion())
196-
.build())
197-
.traceId(
198-
KafkaSpan.TraceId.builder()
199-
.root(baseSpan.getInfo().getTraceId().getRoot())
200-
.build())
201-
.messageId(
202-
new String(
203-
record.headers()
204-
.lastHeader("lumigoMessageId")
205-
.value()))
206-
.kafkaInfo(info)
207-
.build())
208-
.build();
209-
}
210-
211-
public static KafkaSpan createConsume(
212-
Span baseSpan,
213-
Long startTime,
214-
KafkaConsumer<?, ?> consumer,
215-
ConsumerMetadata consumerMetadata,
216-
ConsumerRecords<?, ?> consumerRecords) {
217-
List<String> messageIds = new ArrayList<>();
218-
List<String> bootstrapServers =
219-
consumerMetadata.fetch().nodes().stream()
220-
.map(node -> node.host() + ":" + node.port())
221-
.collect(Collectors.toList());
222-
List<String> topics = new ArrayList<>(consumer.subscription());
223-
List<KafkaConsumerRecord> records = new ArrayList<>();
224-
consumerRecords.forEach(
225-
record -> {
226-
messageIds.add(
227-
new String(record.headers().lastHeader("lumigoMessageId").value()));
228-
records.add(
229-
KafkaConsumerRecord.builder()
230-
.topic(record.topic())
231-
.partition(record.partition())
232-
.offset(record.offset())
233-
.key(record.key().toString())
234-
.value(record.value().toString())
235-
.headers(extractHeaders(record.headers()))
236-
.build());
237-
});
238-
return KafkaSpan.builder()
239-
.id(UUID.randomUUID().toString())
240-
.started(startTime)
241-
.ended(System.currentTimeMillis())
242-
.type(KAFKA_SPAN_TYPE)
243-
.transactionId(baseSpan.getTransactionId())
244-
.account(baseSpan.getAccount())
245-
.region(baseSpan.getRegion())
246-
.token(baseSpan.getToken())
247-
.parentId(baseSpan.getId())
248-
.info(
249-
Info.builder()
250-
.tracer(
251-
KafkaSpan.Tracer.builder()
252-
.version(
253-
baseSpan.getInfo().getTracer().getVersion())
254-
.build())
255-
.traceId(
256-
KafkaSpan.TraceId.builder()
257-
.root(baseSpan.getInfo().getTraceId().getRoot())
258-
.build())
259-
.messageIds(messageIds)
260-
.kafkaInfo(
261-
KafkaSpan.KafkaConsumerInfo.builder()
262-
.bootstrapServers(bootstrapServers)
263-
.consumerGroupId(consumer.groupMetadata().groupId())
264-
.topics(topics)
265-
.recordsCount(consumerRecords.count())
266-
.records(records)
267-
.build())
268-
.build())
269-
.build();
270-
}
271-
272-
private static Map<String, byte[]> extractHeaders(Headers headers) {
273-
return Arrays.stream(headers.toArray())
274-
.collect(Collectors.toMap(Header::key, Header::value));
275-
}
27698
}

0 commit comments

Comments (0)