Commit a6829ea

WIP testing tests
1 parent 235737f commit a6829ea

20 files changed, +1586 -0 lines changed
@@ -0,0 +1,59 @@
muzzle {
  pass {
    group = "org.apache.kafka"
    module = "kafka-clients"
    versions = "[3.1.0,)"
    assertInverse = false
  }
}

apply from: "$rootDir/gradle/java.gradle"

addTestSuite('latestDepTest')
addTestSuite('iastLatestDepTest3')

java {
  toolchain {
    languageVersion.set(JavaLanguageVersion.of(17))
  }
}

dependencies {
  compileOnly group: 'org.apache.kafka', name: 'kafka-clients', version: '3.1.0'
  implementation project(':dd-java-agent:instrumentation:kafka-common')

  testImplementation group: 'org.apache.kafka', name: 'kafka-clients', version: '3.1.0'
  testImplementation group: 'org.springframework.kafka', name: 'spring-kafka', version: '3.1.0'
  testImplementation group: 'org.springframework.kafka', name: 'spring-kafka-test', version: '3.1.0'
  testImplementation group: 'org.testcontainers', name: 'kafka', version: '1.17.0'
  testImplementation group: 'javax.xml.bind', name: 'jaxb-api', version: '2.2.3'
  testImplementation group: 'org.assertj', name: 'assertj-core', version: '2.9.+'
  testImplementation group: 'org.mockito', name: 'mockito-core', version: '2.19.0'
  testRuntimeOnly project(':dd-java-agent:instrumentation:spring-scheduling-3.1')
  testImplementation(testFixtures(project(':dd-java-agent:agent-iast')))

  // IAST
  testRuntimeOnly project(':dd-java-agent:instrumentation:iast-instrumenter')
  testRuntimeOnly project(':dd-java-agent:instrumentation:java-lang')
  testRuntimeOnly project(':dd-java-agent:instrumentation:java-io')
  testRuntimeOnly project(':dd-java-agent:instrumentation:jackson-core')
  testImplementation(group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.9.10')

  // Include the latest version of kafka itself along with the latest version of client libs.
  // This seems to help with jar compatibility hell.
  latestDepTestImplementation group: 'org.apache.kafka', name: 'kafka_2.13', version: '2.+'
  latestDepTestImplementation group: 'org.apache.kafka', name: 'kafka-clients', version: '2.+'
  latestDepTestImplementation group: 'org.springframework.kafka', name: 'spring-kafka', version: '3.+'
  latestDepTestImplementation group: 'org.springframework.kafka', name: 'spring-kafka-test', version: '3.+'
  latestDepTestImplementation group: 'org.assertj', name: 'assertj-core', version: '3.19.+'
  latestDepTestImplementation libs.guava
}

configurations.testRuntimeClasspath {
  // spock-core depends on an assertj version that is not compatible with kafka-clients
  resolutionStrategy.force 'org.assertj:assertj-core:2.9.1'
}
@@ -0,0 +1,54 @@
package datadog.trace.instrumentation.kafka_clients38;

import java.util.Objects;
import javax.annotation.Nullable;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;

public class KafkaConsumerInfo {
  private final String consumerGroup;
  private final ConsumerGroupMetadata clientMetadata;
  private final String bootstrapServers;

  public KafkaConsumerInfo(
      String consumerGroup, ConsumerGroupMetadata clientMetadata, String bootstrapServers) {
    this.consumerGroup = consumerGroup;
    this.clientMetadata = clientMetadata;
    this.bootstrapServers = bootstrapServers;
  }

  public KafkaConsumerInfo(String consumerGroup, String bootstrapServers) {
    this.consumerGroup = consumerGroup;
    this.clientMetadata = null;
    this.bootstrapServers = bootstrapServers;
  }

  @Nullable
  public String getConsumerGroup() {
    return consumerGroup;
  }

  @Nullable
  public ConsumerGroupMetadata getClientMetadata() {
    return clientMetadata;
  }

  @Nullable
  public String getBootstrapServers() {
    return bootstrapServers;
  }

  // Equality is based on consumer group and client metadata only; bootstrapServers is
  // not part of equals/hashCode.
  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (o == null || getClass() != o.getClass()) return false;
    KafkaConsumerInfo consumerInfo = (KafkaConsumerInfo) o;
    return Objects.equals(consumerGroup, consumerInfo.consumerGroup)
        && Objects.equals(clientMetadata, consumerInfo.clientMetadata);
  }

  @Override
  public int hashCode() {
    return 31 * (null == consumerGroup ? 0 : consumerGroup.hashCode())
        + (null == clientMetadata ? 0 : clientMetadata.hashCode());
  }
}
@@ -0,0 +1,191 @@
package datadog.trace.instrumentation.kafka_clients38;

import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.extendsClass;
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.nameStartsWith;
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.KAFKA_RECORDS_COUNT;
import static datadog.trace.instrumentation.kafka_clients38.KafkaDecorator.KAFKA_POLL;
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.returns;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.api.Config;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/**
 * This instrumentation saves additional information from the KafkaConsumer, such as consumer group
 * and cluster ID, in the context store for later use.
 */
@AutoService(InstrumenterModule.class)
public final class KafkaConsumerInfoInstrumentation extends InstrumenterModule.Tracing
    implements Instrumenter.ForTypeHierarchy {

  public KafkaConsumerInfoInstrumentation() {
    super("kafka");
  }

  @Override
  public Map<String, String> contextStore() {
    Map<String, String> contextStores = new HashMap<>();
    contextStores.put("org.apache.kafka.clients.Metadata", "java.lang.String");
    contextStores.put(
        "org.apache.kafka.clients.consumer.ConsumerRecords", KafkaConsumerInfo.class.getName());
    contextStores.put(
        "org.apache.kafka.clients.consumer.internals.ConsumerCoordinator",
        KafkaConsumerInfo.class.getName());
    contextStores.put(
        "org.apache.kafka.clients.consumer.KafkaConsumer", KafkaConsumerInfo.class.getName());
    return contextStores;
  }

  @Override
  public String hierarchyMarkerType() {
    return "org.apache.kafka.clients.consumer.KafkaConsumer";
  }

  @Override
  public ElementMatcher<TypeDescription> hierarchyMatcher() {
    return extendsClass(nameStartsWith(hierarchyMarkerType()));
  }

  @Override
  public String[] helperClassNames() {
    return new String[] {
      packageName + ".KafkaDecorator", packageName + ".KafkaConsumerInfo",
    };
  }

  @Override
  public void methodAdvice(MethodTransformer transformer) {
    transformer.applyAdvice(
        isConstructor()
            .and(takesArgument(0, named("org.apache.kafka.clients.consumer.ConsumerConfig")))
            .and(takesArgument(1, named("org.apache.kafka.common.serialization.Deserializer")))
            .and(takesArgument(2, named("org.apache.kafka.common.serialization.Deserializer"))),
        KafkaConsumerInfoInstrumentation.class.getName() + "$ConstructorAdvice");

    transformer.applyAdvice(
        isMethod()
            .and(isPublic())
            .and(named("poll"))
            .and(takesArguments(1))
            .and(returns(named("org.apache.kafka.clients.consumer.ConsumerRecords"))),
        KafkaConsumerInfoInstrumentation.class.getName() + "$RecordsAdvice");
  }

  public static class ConstructorAdvice {
    @Advice.OnMethodExit(suppress = Throwable.class)
    public static void captureGroup(
        @Advice.This KafkaConsumer consumer, @Advice.Argument(0) ConsumerConfig consumerConfig) {
      ConsumerGroupMetadata groupMetadata = consumer.groupMetadata();

      String consumerGroup = consumerConfig.getString(ConsumerConfig.GROUP_ID_CONFIG);
      String normalizedConsumerGroup =
          consumerGroup != null && !consumerGroup.isEmpty() ? consumerGroup : null;

      if (normalizedConsumerGroup == null) {
        if (groupMetadata != null) {
          normalizedConsumerGroup = groupMetadata.groupId();
        }
      }
      List<String> bootstrapServersList =
          consumerConfig.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
      String bootstrapServers = null;
      if (bootstrapServersList != null && !bootstrapServersList.isEmpty()) {
        bootstrapServers = String.join(",", bootstrapServersList);
      }

      KafkaConsumerInfo kafkaConsumerInfo;
      if (Config.get().isDataStreamsEnabled()) {
        kafkaConsumerInfo =
            new KafkaConsumerInfo(normalizedConsumerGroup, groupMetadata, bootstrapServers);
      } else {
        kafkaConsumerInfo = new KafkaConsumerInfo(normalizedConsumerGroup, bootstrapServers);
      }

      if (kafkaConsumerInfo.getConsumerGroup() != null
          || kafkaConsumerInfo.getClientMetadata() != null) {
        InstrumentationContext.get(KafkaConsumer.class, KafkaConsumerInfo.class)
            .put(consumer, kafkaConsumerInfo);
        // if (coordinator != null) {
        //   InstrumentationContext.get(ConsumerCoordinator.class, KafkaConsumerInfo.class)
        //       .put(coordinator, kafkaConsumerInfo);
        // }
      }
    }

    public static void muzzleCheck(ConsumerRecord record) {
      // KafkaConsumerInstrumentation only applies for kafka versions with headers
      // Make an explicit call so KafkaConsumerGroupInstrumentation does the same
      record.headers();
    }
  }

  /**
   * This advice transfers the consumer group from the KafkaConsumer context key to the
   * ConsumerRecords key. This is necessary because in the poll method we don't have access to the
   * KafkaConsumer class.
   */
  public static class RecordsAdvice {
    @Advice.OnMethodEnter(suppress = Throwable.class)
    public static AgentScope onEnter() {
      boolean dataStreamsEnabled;
      if (activeSpan() != null) {
        dataStreamsEnabled = activeSpan().traceConfig().isDataStreamsEnabled();
      } else {
        dataStreamsEnabled = Config.get().isDataStreamsEnabled();
      }
      if (dataStreamsEnabled) {
        final AgentSpan span = startSpan(KAFKA_POLL);
        return activateSpan(span);
      }
      return null;
    }

    @Advice.OnMethodExit(suppress = Throwable.class)
    public static void captureGroup(
        @Advice.Enter final AgentScope scope,
        @Advice.This KafkaConsumer consumer,
        @Advice.Return ConsumerRecords records) {
      int recordsCount = 0;
      if (records != null) {
        KafkaConsumerInfo kafkaConsumerInfo =
            InstrumentationContext.get(KafkaConsumer.class, KafkaConsumerInfo.class).get(consumer);
        if (kafkaConsumerInfo != null) {
          InstrumentationContext.get(ConsumerRecords.class, KafkaConsumerInfo.class)
              .put(records, kafkaConsumerInfo);
        }
        recordsCount = records.count();
      }
      if (scope == null) {
        return;
      }
      AgentSpan span = scope.span();
      span.setTag(KAFKA_RECORDS_COUNT, recordsCount);
      span.finish();
      scope.close();
    }
  }
}
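Note (not part of the diff): the two Javadoc comments above describe a hand-off through the context store. ConstructorAdvice keys the captured KafkaConsumerInfo by the KafkaConsumer instance, and RecordsAdvice re-keys it by the ConsumerRecords returned from poll(), so code that only sees the records can still recover the consumer group. Below is a minimal sketch of how a later advice might read that info back out; it assumes the same InstrumentationContext API used in this commit, and the class name and advice target are hypothetical, not something this commit adds.

package datadog.trace.instrumentation.kafka_clients38;

import datadog.trace.bootstrap.InstrumentationContext;
import net.bytebuddy.asm.Advice;
import org.apache.kafka.clients.consumer.ConsumerRecords;

// Hypothetical downstream consumer of the stored KafkaConsumerInfo; only the
// InstrumentationContext lookup mirrors code that actually exists in this commit.
public class RecordsConsumerAdviceSketch {
  @Advice.OnMethodExit(suppress = Throwable.class)
  public static void onExit(@Advice.This ConsumerRecords records) {
    // Read back the info that RecordsAdvice attached to this ConsumerRecords instance.
    KafkaConsumerInfo info =
        InstrumentationContext.get(ConsumerRecords.class, KafkaConsumerInfo.class).get(records);
    if (info == null) {
      return;
    }
    String group = info.getConsumerGroup();        // may be null
    String servers = info.getBootstrapServers();   // may be null
    // e.g. tag the processing span or feed data-streams checkpoints with group/servers
  }
}

Re-keying by ConsumerRecords is what lets record-level instrumentation recover the consumer group without ever holding a reference to the originating KafkaConsumer.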
