Skip to content

Commit be8065e

Browse files
committed
add kafka consumer distributed traces
1 parent 76029c1 commit be8065e

File tree

4 files changed

+142
-0
lines changed

4 files changed

+142
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
2+
dependencies {
3+
implementation(project(":agent-bridge"))
4+
implementation("org.apache.kafka:kafka-clients:0.11.0.0")
5+
}
6+
7+
jar {
8+
manifest { attributes 'Implementation-Title': 'com.newrelic.instrumentation.kafka-clients-spans-consumer-0.11.0.0', 'Enabled': 'false',
9+
'Implementation-Title-Alias': 'kafka-clients-spans-consumer' }
10+
}
11+
12+
verifyInstrumentation {
13+
passes 'org.apache.kafka:kafka-clients:[0.11.0.0,)'
14+
}
15+
16+
site {
17+
title 'Kafka'
18+
type 'Messaging'
19+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package com.nr.instrumentation.kafka;
2+
3+
import com.newrelic.api.agent.HeaderType;
4+
import com.newrelic.api.agent.Headers;
5+
import org.apache.kafka.common.header.Header;
6+
7+
import java.util.ArrayList;
8+
import java.util.Collection;
9+
import java.util.HashSet;
10+
import java.util.Iterator;
11+
import java.util.Objects;
12+
13+
public class HeadersWrapper implements Headers {
14+
15+
private final org.apache.kafka.common.header.Headers delegate;
16+
17+
public HeadersWrapper(org.apache.kafka.common.header.Headers headers) {
18+
this.delegate = headers;
19+
}
20+
21+
@Override
22+
public HeaderType getHeaderType() {
23+
return HeaderType.MESSAGE;
24+
}
25+
26+
@Override
27+
public String getHeader(String name) {
28+
String value = null;
29+
Iterator<Header> iterator = delegate.headers(name).iterator();
30+
if (iterator.hasNext()) {
31+
byte[] bytes = iterator.next().value();
32+
if (bytes != null) {
33+
value = new String(bytes);
34+
}
35+
}
36+
return value;
37+
}
38+
39+
@Override
40+
public Collection<String> getHeaders(String name) {
41+
Collection<String> headers = new ArrayList<>();
42+
Iterator<Header> iterator = delegate.headers(name).iterator();
43+
while (iterator.hasNext()) {
44+
byte[] bytes = iterator.next().value();
45+
if (bytes != null) {
46+
headers.add(new String(bytes));
47+
}
48+
}
49+
return headers;
50+
}
51+
52+
@Override
53+
public void setHeader(String name, String value) {
54+
delegate.remove(name);
55+
delegate.add(name, value.getBytes());
56+
}
57+
58+
@Override
59+
public void addHeader(String name, String value) {
60+
delegate.add(name, value.getBytes());
61+
}
62+
63+
@Override
64+
public Collection<String> getHeaderNames() {
65+
Collection<String> headerNames = new HashSet<>();
66+
for(Header header : delegate) {
67+
headerNames.add(header.key());
68+
}
69+
return headerNames;
70+
}
71+
72+
@Override
73+
public boolean containsHeader(String name) {
74+
for(Header header : delegate) {
75+
if (Objects.equals(name,header.key())) {
76+
return true;
77+
}
78+
}
79+
return false;
80+
}
81+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package org.apache.kafka.clients.consumer;
2+
3+
import com.newrelic.agent.bridge.AgentBridge;
4+
import com.newrelic.api.agent.Headers;
5+
import com.newrelic.api.agent.NewRelic;
6+
import com.newrelic.api.agent.TransportType;
7+
import com.newrelic.api.agent.weaver.Weave;
8+
import com.newrelic.api.agent.weaver.Weaver;
9+
import com.nr.instrumentation.kafka.HeadersWrapper;
10+
11+
import java.time.Duration;
12+
import java.util.logging.Level;
13+
14+
@Weave(originalName = "org.apache.kafka.clients.consumer.KafkaConsumer")
15+
public class KafkaConsumer_Instrumentation<K, V> {
16+
17+
public ConsumerRecords<K, V> poll(final Duration timeout) {
18+
final ConsumerRecords<K, V> records = Weaver.callOriginal();
19+
nrAcceptDtHeaders(records);
20+
return records;
21+
}
22+
23+
public ConsumerRecords<K, V> poll(final long timeoutMs) {
24+
final ConsumerRecords<K, V> records = Weaver.callOriginal();
25+
nrAcceptDtHeaders(records);
26+
return records;
27+
}
28+
29+
private void nrAcceptDtHeaders(ConsumerRecords<K, V> records) {
30+
AgentBridge.getAgent().getLogger().log(Level.INFO, "nrAcceptDtHeaders");
31+
if (AgentBridge.getAgent().getTransaction(false) != null) {
32+
AgentBridge.getAgent().getLogger().log(Level.INFO, "nrAcceptDtHeaders getTransaction exists");
33+
for (ConsumerRecord<?, ?> record : records) {
34+
AgentBridge.getAgent().getLogger().log(Level.INFO, "nrAcceptDtHeaders acceptDistributedTraceHeaders");
35+
Headers dtHeaders = new HeadersWrapper(record.headers());
36+
NewRelic.getAgent().getTransaction().acceptDistributedTraceHeaders(TransportType.Kafka, dtHeaders);
37+
break;
38+
}
39+
}
40+
}
41+
}

Diff for: settings.gradle

+1
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,7 @@ include 'instrumentation:kafka-clients-node-metrics-2.3.0'
255255
include 'instrumentation:kafka-clients-node-metrics-3.7.0'
256256
include 'instrumentation:kafka-clients-node-metrics-3.9.0'
257257
include 'instrumentation:kafka-clients-spans-0.11.0.0'
258+
include 'instrumentation:kafka-clients-spans-consumer-0.11.0.0'
258259
include 'instrumentation:kafka-connect-metrics-1.0.0'
259260
include 'instrumentation:kafka-connect-spans-2.0.0'
260261
include 'instrumentation:kafka-connect-spans-3.3.0'

0 commit comments

Comments
 (0)