diff --git a/dd-java-agent/instrumentation/kafka-clients-3.1/build.gradle b/dd-java-agent/instrumentation/kafka-clients-3.1/build.gradle
new file mode 100644
index 00000000000..793b79b5612
--- /dev/null
+++ b/dd-java-agent/instrumentation/kafka-clients-3.1/build.gradle
@@ -0,0 +1,59 @@
+muzzle {
+  pass {
+    group = "org.apache.kafka"
+    module = "kafka-clients"
+    versions = "[3.1.0,)"
+    assertInverse = false
+  }
+}
+
+apply from: "$rootDir/gradle/java.gradle"
+
+addTestSuite('latestDepTest')
+addTestSuite('iastLatestDepTest3')
+
+java {
+  toolchain {
+    languageVersion.set(JavaLanguageVersion.of(17))
+  }
+}
+
+dependencies {
+  compileOnly group: 'org.apache.kafka', name: 'kafka-clients', version: '3.1.0'
+  implementation project(':dd-java-agent:instrumentation:kafka-common')
+
+  testImplementation group: 'org.apache.kafka', name: 'kafka-clients', version: '3.1.0'
+  testImplementation group: 'org.springframework.kafka', name: 'spring-kafka', version: '3.1.0'
+  testImplementation group: 'org.springframework.kafka', name: 'spring-kafka-test', version: '3.1.0'
+  testImplementation group: 'org.testcontainers', name: 'kafka', version: '1.17.0'
+  testImplementation group: 'javax.xml.bind', name: 'jaxb-api', version: '2.2.3'
+  testImplementation group: 'org.assertj', name: 'assertj-core', version: '2.9.+'
+  testImplementation group: 'org.mockito', name: 'mockito-core', version: '2.19.0'
+  testRuntimeOnly project(':dd-java-agent:instrumentation:spring-scheduling-3.1')
+  testImplementation(testFixtures(project(':dd-java-agent:agent-iast')))
+
+  // IAST
+  testRuntimeOnly project(':dd-java-agent:instrumentation:iast-instrumenter')
+  testRuntimeOnly project(':dd-java-agent:instrumentation:java-lang')
+  testRuntimeOnly project(':dd-java-agent:instrumentation:java-io')
+  testRuntimeOnly project(':dd-java-agent:instrumentation:jackson-core')
+  testImplementation(group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.9.10')
+
+  // Include the latest version of kafka itself along with the latest version of the client libs.
+  // This helps avoid jar compatibility issues between the broker and the clients.
+  latestDepTestImplementation group: 'org.apache.kafka', name: 'kafka_2.13', version: '3.+'
+  latestDepTestImplementation group: 'org.apache.kafka', name: 'kafka-clients', version: '3.+'
+  latestDepTestImplementation group: 'org.springframework.kafka', name: 'spring-kafka', version: '3.+'
+  latestDepTestImplementation group: 'org.springframework.kafka', name: 'spring-kafka-test', version: '3.+'
+  latestDepTestImplementation group: 'org.assertj', name: 'assertj-core', version: '3.19.+'
+  latestDepTestImplementation libs.guava
+}
+
+configurations.testRuntimeClasspath {
+  // spock-core depends on an assertj version that is not compatible with kafka-clients
+  resolutionStrategy.force 'org.assertj:assertj-core:2.9.1'
+}
diff --git a/dd-java-agent/instrumentation/kafka-clients-3.1/src/main/java/datadog/trace/instrumentation/kafka_clients38/KafkaConsumerInfo.java b/dd-java-agent/instrumentation/kafka-clients-3.1/src/main/java/datadog/trace/instrumentation/kafka_clients38/KafkaConsumerInfo.java
new file mode 100644
index 00000000000..2bca41b48ec
--- /dev/null
+++ b/dd-java-agent/instrumentation/kafka-clients-3.1/src/main/java/datadog/trace/instrumentation/kafka_clients38/KafkaConsumerInfo.java
@@ -0,0 +1,54 @@
+package datadog.trace.instrumentation.kafka_clients38;
+
+import java.util.Objects;
+import javax.annotation.Nullable;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
+
+public class KafkaConsumerInfo {
+  private final String consumerGroup;
+  private final ConsumerGroupMetadata clientMetadata;
+  private final String bootstrapServers;
+
+  public KafkaConsumerInfo(
+      String consumerGroup, ConsumerGroupMetadata clientMetadata, String bootstrapServers) {
+    this.consumerGroup = consumerGroup;
+    this.clientMetadata = clientMetadata;
+    this.bootstrapServers = bootstrapServers;
+  }
+
+  public KafkaConsumerInfo(String consumerGroup, String bootstrapServers) {
+    this.consumerGroup = consumerGroup;
+    this.clientMetadata = null;
+    this.bootstrapServers = bootstrapServers;
+  }
+
+  @Nullable
+  public String getConsumerGroup() {
+    return consumerGroup;
+  }
+
+  @Nullable
+  public ConsumerGroupMetadata getClientMetadata() {
+    return clientMetadata;
+  }
+
+  @Nullable
+  public String getBootstrapServers() {
+    return bootstrapServers;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    KafkaConsumerInfo consumerInfo = (KafkaConsumerInfo) o;
+    return Objects.equals(consumerGroup, consumerInfo.consumerGroup)
+        && Objects.equals(clientMetadata, consumerInfo.clientMetadata);
+  }
+
+  @Override
+  public int hashCode() {
+    return 31 * (null == consumerGroup ? 0 : consumerGroup.hashCode())
+        + (null == clientMetadata ? 0 : clientMetadata.hashCode());
+  }
+}
diff --git a/dd-java-agent/instrumentation/kafka-clients-3.1/src/main/java/datadog/trace/instrumentation/kafka_clients38/KafkaConsumerInfoInstrumentation.java b/dd-java-agent/instrumentation/kafka-clients-3.1/src/main/java/datadog/trace/instrumentation/kafka_clients38/KafkaConsumerInfoInstrumentation.java
new file mode 100644
index 00000000000..dd06c20c38d
--- /dev/null
+++ b/dd-java-agent/instrumentation/kafka-clients-3.1/src/main/java/datadog/trace/instrumentation/kafka_clients38/KafkaConsumerInfoInstrumentation.java
@@ -0,0 +1,191 @@
+package datadog.trace.instrumentation.kafka_clients38;
+
+import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.extendsClass;
+import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.nameStartsWith;
+import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
+import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
+import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan;
+import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
+import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.KAFKA_RECORDS_COUNT;
+import static datadog.trace.instrumentation.kafka_clients38.KafkaDecorator.KAFKA_POLL;
+import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
+import static net.bytebuddy.matcher.ElementMatchers.isMethod;
+import static net.bytebuddy.matcher.ElementMatchers.isPublic;
+import static net.bytebuddy.matcher.ElementMatchers.returns;
+import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+
+import com.google.auto.service.AutoService;
+import datadog.trace.agent.tooling.Instrumenter;
+import datadog.trace.agent.tooling.InstrumenterModule;
+import datadog.trace.api.Config;
+import datadog.trace.bootstrap.InstrumentationContext;
+import datadog.trace.bootstrap.instrumentation.api.AgentScope;
+import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import net.bytebuddy.asm.Advice;
+import net.bytebuddy.description.type.TypeDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+
+/**
+ * This instrumentation saves additional information from the KafkaConsumer, such as consumer group
+ * and cluster ID, in the context store for later use.
+ */
+@AutoService(InstrumenterModule.class)
+public final class KafkaConsumerInfoInstrumentation extends InstrumenterModule.Tracing
+    implements Instrumenter.ForTypeHierarchy {
+
+  public KafkaConsumerInfoInstrumentation() {
+    super("kafka");
+  }
+
+  @Override
+  public Map<String, String> contextStore() {
+    Map<String, String> contextStores = new HashMap<>();
+    contextStores.put("org.apache.kafka.clients.Metadata", "java.lang.String");
+    contextStores.put(
+        "org.apache.kafka.clients.consumer.ConsumerRecords", KafkaConsumerInfo.class.getName());
+    contextStores.put(
+        "org.apache.kafka.clients.consumer.internals.ConsumerCoordinator",
+        KafkaConsumerInfo.class.getName());
+    contextStores.put(
+        "org.apache.kafka.clients.consumer.KafkaConsumer", KafkaConsumerInfo.class.getName());
+    return contextStores;
+  }
+
+  @Override
+  public String hierarchyMarkerType() {
+    return "org.apache.kafka.clients.consumer.KafkaConsumer";
+  }
+
+  @Override
+  public ElementMatcher<TypeDescription> hierarchyMatcher() {
+    return extendsClass(nameStartsWith(hierarchyMarkerType()));
+  }
+
+  @Override
+  public String[] helperClassNames() {
+    return new String[] {
+      packageName + ".KafkaDecorator", packageName + ".KafkaConsumerInfo",
+    };
+  }
+
+  @Override
+  public void methodAdvice(MethodTransformer transformer) {
+    transformer.applyAdvice(
+        isConstructor()
+            .and(takesArgument(0, named("org.apache.kafka.clients.consumer.ConsumerConfig")))
+            .and(takesArgument(1, named("org.apache.kafka.common.serialization.Deserializer")))
+            .and(takesArgument(2, named("org.apache.kafka.common.serialization.Deserializer"))),
+        KafkaConsumerInfoInstrumentation.class.getName() + "$ConstructorAdvice");
+
+    transformer.applyAdvice(
+        isMethod()
+            .and(isPublic())
+            .and(named("poll"))
+            .and(takesArguments(1))
+            .and(returns(named("org.apache.kafka.clients.consumer.ConsumerRecords"))),
+        KafkaConsumerInfoInstrumentation.class.getName() + "$RecordsAdvice");
+  }
+
+  public static class ConstructorAdvice {
+    @Advice.OnMethodExit(suppress = Throwable.class)
+    public static void captureGroup(
+        @Advice.This KafkaConsumer consumer, @Advice.Argument(0) ConsumerConfig consumerConfig) {
+      ConsumerGroupMetadata groupMetadata = consumer.groupMetadata();
+
+      String consumerGroup = consumerConfig.getString(ConsumerConfig.GROUP_ID_CONFIG);
+      String normalizedConsumerGroup =
+          consumerGroup != null && !consumerGroup.isEmpty() ? consumerGroup : null;
+
+      if (normalizedConsumerGroup == null) {
+        if (groupMetadata != null) {
+          normalizedConsumerGroup = groupMetadata.groupId();
+        }
+      }
+      List<String> bootstrapServersList =
+          consumerConfig.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
+      String bootstrapServers = null;
+      if (bootstrapServersList != null && !bootstrapServersList.isEmpty()) {
+        bootstrapServers = String.join(",", bootstrapServersList);
+      }
+
+      KafkaConsumerInfo kafkaConsumerInfo;
+      if (Config.get().isDataStreamsEnabled()) {
+        kafkaConsumerInfo =
+            new KafkaConsumerInfo(normalizedConsumerGroup, groupMetadata, bootstrapServers);
+      } else {
+        kafkaConsumerInfo = new KafkaConsumerInfo(normalizedConsumerGroup, bootstrapServers);
+      }
+
+      if (kafkaConsumerInfo.getConsumerGroup() != null
+          || kafkaConsumerInfo.getClientMetadata() != null) {
+        InstrumentationContext.get(KafkaConsumer.class, KafkaConsumerInfo.class)
+            .put(consumer, kafkaConsumerInfo);
+        // if (coordinator != null) {
+        //   InstrumentationContext.get(ConsumerCoordinator.class, KafkaConsumerInfo.class)
+        //       .put(coordinator, kafkaConsumerInfo);
+        // }
+      }
+    }
+
+    public static void muzzleCheck(ConsumerRecord record) {
+      // KafkaConsumerInstrumentation only applies to kafka versions with headers.
+      // Make an explicit call so KafkaConsumerGroupInstrumentation does the same.
+      record.headers();
+    }
+  }
+
+  /**
+   * This advice transfers the consumer group from the KafkaConsumer context-store key to the
+   * ConsumerRecords key. This is necessary because in the poll method we don't have access to the
+   * KafkaConsumer instance.
+   */
+  public static class RecordsAdvice {
+    @Advice.OnMethodEnter(suppress = Throwable.class)
+    public static AgentScope onEnter() {
+      boolean dataStreamsEnabled;
+      if (activeSpan() != null) {
+        dataStreamsEnabled = activeSpan().traceConfig().isDataStreamsEnabled();
+      } else {
+        dataStreamsEnabled = Config.get().isDataStreamsEnabled();
+      }
+      if (dataStreamsEnabled) {
+        final AgentSpan span = startSpan(KAFKA_POLL);
+        return activateSpan(span);
+      }
+      return null;
+    }
+
+    @Advice.OnMethodExit(suppress = Throwable.class)
+    public static void captureGroup(
+        @Advice.Enter final AgentScope scope,
+        @Advice.This KafkaConsumer consumer,
+        @Advice.Return ConsumerRecords records) {
+      int recordsCount = 0;
+      if (records != null) {
+        KafkaConsumerInfo kafkaConsumerInfo =
+            InstrumentationContext.get(KafkaConsumer.class, KafkaConsumerInfo.class).get(consumer);
+        if (kafkaConsumerInfo != null) {
+          InstrumentationContext.get(ConsumerRecords.class, KafkaConsumerInfo.class)
+              .put(records, kafkaConsumerInfo);
+        }
+        recordsCount = records.count();
+      }
+      if (scope == null) {
+        return;
+      }
+      AgentSpan span = scope.span();
+      span.setTag(KAFKA_RECORDS_COUNT, recordsCount);
+      span.finish();
+      scope.close();
+    }
+  }
+}
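To make the handoff above concrete: the constructor advice keys the captured `KafkaConsumerInfo` by the consumer instance, and the poll advice re-keys it by the returned batch. Below is a minimal sketch of that two-step flow, not part of the patch, using plain maps as hypothetical stand-ins for the agent's `InstrumentationContext` stores:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of the two context stores used above:
// consumer -> KafkaConsumerInfo (filled at construction time),
// records  -> KafkaConsumerInfo (filled at poll() time).
final class ConsumerInfoHandoffSketch {
  static final Map<Object, Object> consumerStore = new ConcurrentHashMap<>();
  static final Map<Object, Object> recordsStore = new ConcurrentHashMap<>();

  // ConstructorAdvice: capture group/bootstrap servers once per consumer instance.
  static void onConsumerCreated(Object consumer, Object consumerInfo) {
    consumerStore.put(consumer, consumerInfo);
  }

  // RecordsAdvice: poll() has no access to the consumer's config, so the info
  // captured at construction is re-keyed by the ConsumerRecords batch it returned.
  static void onPollReturned(Object consumer, Object records) {
    Object info = consumerStore.get(consumer);
    if (info != null) {
      recordsStore.put(records, info);
    }
  }
}
```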
diff --git a/dd-java-agent/instrumentation/kafka-clients-3.1/src/main/java/datadog/trace/instrumentation/kafka_clients38/KafkaConsumerInstrumentation.java b/dd-java-agent/instrumentation/kafka-clients-3.1/src/main/java/datadog/trace/instrumentation/kafka_clients38/KafkaConsumerInstrumentation.java
new file mode 100644
index 00000000000..a9f25c3bebc
--- /dev/null
+++ b/dd-java-agent/instrumentation/kafka-clients-3.1/src/main/java/datadog/trace/instrumentation/kafka_clients38/KafkaConsumerInstrumentation.java
@@ -0,0 +1,159 @@
+package datadog.trace.instrumentation.kafka_clients38;
+
+import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
+import static datadog.trace.instrumentation.kafka_clients38.KafkaDecorator.CONSUMER_DECORATE;
+import static datadog.trace.instrumentation.kafka_clients38.KafkaDecorator.KAFKA_CONSUME;
+import static net.bytebuddy.matcher.ElementMatchers.isMethod;
+import static net.bytebuddy.matcher.ElementMatchers.isPublic;
+import static net.bytebuddy.matcher.ElementMatchers.returns;
+import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+
+import com.google.auto.service.AutoService;
+import datadog.trace.agent.tooling.Instrumenter;
+import datadog.trace.agent.tooling.InstrumenterModule;
+import datadog.trace.bootstrap.InstrumentationContext;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import net.bytebuddy.asm.Advice;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+
+@AutoService(InstrumenterModule.class)
+public final class KafkaConsumerInstrumentation extends InstrumenterModule.Tracing
+    implements Instrumenter.ForSingleType {
+
+  public KafkaConsumerInstrumentation() {
+    super("kafka");
+  }
+
+  @Override
+  public Map<String, String> contextStore() {
+    Map<String, String> contextStores = new HashMap<>(2);
+    contextStores.put("org.apache.kafka.clients.Metadata", "java.lang.String");
+    contextStores.put(
+        "org.apache.kafka.clients.consumer.ConsumerRecords", KafkaConsumerInfo.class.getName());
+    return contextStores;
+  }
+
+  @Override
+  public String instrumentedType() {
+    return "org.apache.kafka.clients.consumer.ConsumerRecords";
+  }
+
+  @Override
+  public String[] helperClassNames() {
+    return new String[] {
+      packageName + ".TextMapInjectAdapterInterface",
+      packageName + ".KafkaConsumerInfo",
+      packageName + ".KafkaConsumerInstrumentationHelper",
+      packageName + ".KafkaDecorator",
+      packageName + ".TextMapExtractAdapter",
+      packageName + ".TracingIterableDelegator",
+      packageName + ".TracingIterable",
+      packageName + ".TracingIterator",
+      packageName + ".TracingList",
+      packageName + ".TracingListIterator",
+      packageName + ".TextMapInjectAdapter",
+      "datadog.trace.instrumentation.kafka_common.Utils",
+      "datadog.trace.instrumentation.kafka_common.StreamingContext",
+    };
+  }
+
+  @Override
+  public void methodAdvice(MethodTransformer transformer) {
+    transformer.applyAdvice(
+        isMethod()
+            .and(isPublic())
+            .and(named("records"))
+            .and(takesArgument(0, String.class))
+            .and(returns(Iterable.class)),
+        KafkaConsumerInstrumentation.class.getName() + "$IterableAdvice");
+    transformer.applyAdvice(
+        isMethod()
+            .and(isPublic())
+            .and(named("records"))
+            .and(takesArgument(0, named("org.apache.kafka.common.TopicPartition")))
+            .and(returns(List.class)),
+        KafkaConsumerInstrumentation.class.getName() + "$ListAdvice");
+    transformer.applyAdvice(
+        isMethod()
+            .and(isPublic())
+            .and(named("iterator"))
+            .and(takesArguments(0))
+            .and(returns(Iterator.class)),
+        KafkaConsumerInstrumentation.class.getName() + "$IteratorAdvice");
+  }
+
+  public static class IterableAdvice {
+
+    @Advice.OnMethodExit(suppress = Throwable.class)
+    public static void wrap(
+        @Advice.Return(readOnly = false) Iterable<ConsumerRecord<?, ?>> iterable,
+        @Advice.This ConsumerRecords records) {
+      if (iterable != null) {
+        KafkaConsumerInfo kafkaConsumerInfo =
+            InstrumentationContext.get(ConsumerRecords.class, KafkaConsumerInfo.class).get(records);
+        String group = KafkaConsumerInstrumentationHelper.extractGroup(kafkaConsumerInfo);
+        String clusterId =
+            KafkaConsumerInstrumentationHelper.extractClusterId(
+                kafkaConsumerInfo,
+                InstrumentationContext.get(ConsumerGroupMetadata.class, String.class));
+        String bootstrapServers =
+            KafkaConsumerInstrumentationHelper.extractBootstrapServers(kafkaConsumerInfo);
+        iterable =
+            new TracingIterable(
+                iterable, KAFKA_CONSUME, CONSUMER_DECORATE, group, clusterId, bootstrapServers);
+      }
+    }
+  }
+
+  public static class ListAdvice {
+
+    @Advice.OnMethodExit(suppress = Throwable.class)
+    public static void wrap(
+        @Advice.Return(readOnly = false) List<ConsumerRecord<?, ?>> iterable,
+        @Advice.This ConsumerRecords records) {
+      if (iterable != null) {
+        KafkaConsumerInfo kafkaConsumerInfo =
+            InstrumentationContext.get(ConsumerRecords.class, KafkaConsumerInfo.class).get(records);
+        String group = KafkaConsumerInstrumentationHelper.extractGroup(kafkaConsumerInfo);
+        String clusterId =
+            KafkaConsumerInstrumentationHelper.extractClusterId(
+                kafkaConsumerInfo,
+                InstrumentationContext.get(ConsumerGroupMetadata.class, String.class));
+        String bootstrapServers =
+            KafkaConsumerInstrumentationHelper.extractBootstrapServers(kafkaConsumerInfo);
+        iterable =
+            new TracingList(
+                iterable, KAFKA_CONSUME, CONSUMER_DECORATE, group, clusterId, bootstrapServers);
+      }
+    }
+  }
+
+  public static class IteratorAdvice {
+
+    @Advice.OnMethodExit(suppress = Throwable.class)
+    public static void wrap(
+        @Advice.Return(readOnly = false) Iterator<ConsumerRecord<?, ?>> iterator,
+        @Advice.This ConsumerRecords records) {
+      if (iterator != null) {
+        KafkaConsumerInfo kafkaConsumerInfo =
+            InstrumentationContext.get(ConsumerRecords.class, KafkaConsumerInfo.class).get(records);
+        String group = KafkaConsumerInstrumentationHelper.extractGroup(kafkaConsumerInfo);
+        String clusterId =
+            KafkaConsumerInstrumentationHelper.extractClusterId(
+                kafkaConsumerInfo,
+                InstrumentationContext.get(ConsumerGroupMetadata.class, String.class));
+        String bootstrapServers =
+            KafkaConsumerInstrumentationHelper.extractBootstrapServers(kafkaConsumerInfo);
+        iterator =
+            new TracingIterator(
+                iterator, KAFKA_CONSUME, CONSUMER_DECORATE, group, clusterId, bootstrapServers);
+      }
+    }
+  }
+}
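From the application's point of view, the three advices above cover every way a batch is read: `ConsumerRecords.iterator()`, `records(String)`, and `records(TopicPartition)`. A minimal sketch of a poll loop that would hit the wrapped paths (broker address and topic name are illustrative only):

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PollLoopSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker address
    props.put("group.id", "demo-group");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("demo-topic"));
      ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
      // The for-each below goes through records.iterator(), one of the three
      // entry points wrapped above, so each record's processing runs inside
      // its own kafka.consume span once the agent is attached.
      for (ConsumerRecord<String, String> record : records) {
        System.out.printf(
            "%s-%d@%d: %s%n", record.topic(), record.partition(), record.offset(), record.value());
      }
    }
  }
}
```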
diff --git a/dd-java-agent/instrumentation/kafka-clients-3.1/src/main/java/datadog/trace/instrumentation/kafka_clients38/KafkaConsumerInstrumentationHelper.java b/dd-java-agent/instrumentation/kafka-clients-3.1/src/main/java/datadog/trace/instrumentation/kafka_clients38/KafkaConsumerInstrumentationHelper.java
new file mode 100644
index 00000000000..82853bc4891
--- /dev/null
+++ b/dd-java-agent/instrumentation/kafka-clients-3.1/src/main/java/datadog/trace/instrumentation/kafka_clients38/KafkaConsumerInstrumentationHelper.java
@@ -0,0 +1,30 @@
+package datadog.trace.instrumentation.kafka_clients38;
+
+import datadog.trace.api.Config;
+import datadog.trace.bootstrap.ContextStore;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
+
+public class KafkaConsumerInstrumentationHelper {
+  public static String extractGroup(KafkaConsumerInfo kafkaConsumerInfo) {
+    if (kafkaConsumerInfo != null) {
+      return kafkaConsumerInfo.getConsumerGroup();
+    }
+    return null;
+  }
+
+  public static String extractClusterId(
+      KafkaConsumerInfo kafkaConsumerInfo,
+      ContextStore<ConsumerGroupMetadata, String> metadataContextStore) {
+    if (Config.get().isDataStreamsEnabled() && kafkaConsumerInfo != null) {
+      ConsumerGroupMetadata consumerMetadata = kafkaConsumerInfo.getClientMetadata();
+      if (consumerMetadata != null) {
+        return metadataContextStore.get(consumerMetadata);
+      }
+    }
+    return null;
+  }
+
+  public static String extractBootstrapServers(KafkaConsumerInfo kafkaConsumerInfo) {
+    return kafkaConsumerInfo == null ? null : kafkaConsumerInfo.getBootstrapServers();
+  }
+}
diff --git a/dd-java-agent/instrumentation/kafka-clients-3.1/src/main/java/datadog/trace/instrumentation/kafka_clients38/KafkaDecorator.java b/dd-java-agent/instrumentation/kafka-clients-3.1/src/main/java/datadog/trace/instrumentation/kafka_clients38/KafkaDecorator.java
new file mode 100644
index 00000000000..16ff81ab572
--- /dev/null
+++ b/dd-java-agent/instrumentation/kafka-clients-3.1/src/main/java/datadog/trace/instrumentation/kafka_clients38/KafkaDecorator.java
@@ -0,0 +1,165 @@
+package datadog.trace.instrumentation.kafka_clients38;
+
+import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.CONSUMER_GROUP;
+import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS;
+import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.OFFSET;
+import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.PARTITION;
+import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.RECORD_QUEUE_TIME_MS;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+import datadog.trace.api.Config;
+import datadog.trace.api.Functions;
+import datadog.trace.api.cache.DDCache;
+import datadog.trace.api.cache.DDCaches;
+import datadog.trace.api.naming.SpanNaming;
+import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
+import datadog.trace.bootstrap.instrumentation.api.InternalSpanTypes;
+import datadog.trace.bootstrap.instrumentation.api.Tags;
+import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
+import datadog.trace.bootstrap.instrumentation.decorator.MessagingClientDecorator;
+import java.util.function.Function;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.record.TimestampType;
+
+public class KafkaDecorator extends MessagingClientDecorator {
+  private static final String KAFKA = "kafka";
+  public static final CharSequence JAVA_KAFKA = UTF8BytesString.create("java-kafka");
+  public static final CharSequence KAFKA_CONSUME =
+      UTF8BytesString.create(
+          SpanNaming.instance().namingSchema().messaging().inboundOperation(KAFKA));
+
+  public static final CharSequence KAFKA_POLL = UTF8BytesString.create("kafka.poll");
+  public static final CharSequence KAFKA_PRODUCE =
+      UTF8BytesString.create(
+          SpanNaming.instance().namingSchema().messaging().outboundOperation(KAFKA));
+  public static final CharSequence KAFKA_DELIVER = UTF8BytesString.create("kafka.deliver");
+  public static final boolean KAFKA_LEGACY_TRACING = Config.get().isKafkaLegacyTracingEnabled();
+  public static final boolean TIME_IN_QUEUE_ENABLED =
+      Config.get().isTimeInQueueEnabled(!KAFKA_LEGACY_TRACING, KAFKA);
+  public static final String KAFKA_PRODUCED_KEY = "x_datadog_kafka_produced";
+  private final String spanKind;
+  private final CharSequence spanType;
+  private final String serviceName;
+
+  private static final DDCache<CharSequence, CharSequence> PRODUCER_RESOURCE_NAME_CACHE =
+      DDCaches.newFixedSizeCache(32);
+  private static final Functions.Prefix PRODUCER_PREFIX = new Functions.Prefix("Produce Topic ");
+  private static final DDCache<CharSequence, CharSequence> CONSUMER_RESOURCE_NAME_CACHE =
+      DDCaches.newFixedSizeCache(32);
+  private static final DDCache<ProducerConfig, String> PRODUCER_BOOTSTRAP_SERVERS_CACHE =
+      DDCaches.newFixedSizeWeakKeyCache(16);
+  private static final Function<ProducerConfig, String> BOOTSTRAP_SERVERS_JOINER =
+      pc -> String.join(",", pc.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
+  private static final Functions.Prefix CONSUMER_PREFIX = new Functions.Prefix("Consume Topic ");
+
+  public static final KafkaDecorator PRODUCER_DECORATE =
+      new KafkaDecorator(
+          Tags.SPAN_KIND_PRODUCER,
+          InternalSpanTypes.MESSAGE_PRODUCER,
+          SpanNaming.instance()
+              .namingSchema()
+              .messaging()
+              .outboundService(KAFKA, KAFKA_LEGACY_TRACING));
+
+  public static final KafkaDecorator CONSUMER_DECORATE =
+      new KafkaDecorator(
+          Tags.SPAN_KIND_CONSUMER,
+          InternalSpanTypes.MESSAGE_CONSUMER,
+          SpanNaming.instance()
+              .namingSchema()
+              .messaging()
+              .inboundService(KAFKA, KAFKA_LEGACY_TRACING));
+
+  public static final KafkaDecorator BROKER_DECORATE =
+      new KafkaDecorator(
+          Tags.SPAN_KIND_BROKER,
+          InternalSpanTypes.MESSAGE_BROKER,
+          SpanNaming.instance().namingSchema().messaging().timeInQueueService(KAFKA));
+
+  protected KafkaDecorator(String spanKind, CharSequence spanType, String serviceName) {
+    this.spanKind = spanKind;
+    this.spanType = spanType;
+    this.serviceName = serviceName;
+  }
+
+  @Override
+  protected CharSequence spanType() {
+    return spanType;
+  }
+
+  @Override
+  protected String[] instrumentationNames() {
+    return new String[] {"kafka"};
+  }
+
+  @Override
+  protected String service() {
+    return serviceName;
+  }
+
+  @Override
+  protected CharSequence component() {
+    return JAVA_KAFKA;
+  }
+
+  @Override
+  protected String spanKind() {
+    return spanKind;
+  }
+
+  public void onConsume(
+      final AgentSpan span,
+      final ConsumerRecord record,
+      String consumerGroup,
+      String bootstrapServers) {
+    if (record != null) {
+      final String topic = record.topic() == null ? "kafka" : record.topic();
+      span.setResourceName(CONSUMER_RESOURCE_NAME_CACHE.computeIfAbsent(topic, CONSUMER_PREFIX));
+      span.setTag(PARTITION, record.partition());
+      span.setTag(OFFSET, record.offset());
+      if (consumerGroup != null) {
+        span.setTag(CONSUMER_GROUP, consumerGroup);
+      }
+
+      if (bootstrapServers != null) {
+        span.setTag(KAFKA_BOOTSTRAP_SERVERS, bootstrapServers);
+      }
+      // TODO - do we really need both? This mechanism already adds a lot of... baggage.
+      // check to not record a duration if the message was sent from an old Kafka client
+      if (record.timestampType() != TimestampType.NO_TIMESTAMP_TYPE) {
+        long consumeTime = NANOSECONDS.toMillis(span.getStartTime());
+        final long produceTime = record.timestamp();
+        span.setTag(RECORD_QUEUE_TIME_MS, Math.max(0L, consumeTime - produceTime));
+      }
+    }
+  }
+
+  public void onTimeInQueue(final AgentSpan span, final ConsumerRecord record) {
+    if (record != null) {
+      String topic = record.topic() == null ? "kafka" : record.topic();
+      span.setResourceName(topic);
+      if (Config.get().isMessageBrokerSplitByDestination()) {
+        span.setServiceName(topic);
+      }
+    }
+  }
+
+  public void onProduce(
+      final AgentSpan span, final ProducerRecord record, final ProducerConfig producerConfig) {
+    if (record != null) {
+      if (record.partition() != null) {
+        span.setTag(PARTITION, record.partition());
+      }
+      if (producerConfig != null) {
+        span.setTag(
+            KAFKA_BOOTSTRAP_SERVERS,
+            PRODUCER_BOOTSTRAP_SERVERS_CACHE.computeIfAbsent(
+                producerConfig, BOOTSTRAP_SERVERS_JOINER));
+      }
+      final String topic = record.topic() == null ? "kafka" : record.topic();
+      span.setResourceName(PRODUCER_RESOURCE_NAME_CACHE.computeIfAbsent(topic, PRODUCER_PREFIX));
+    }
+  }
+}
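As a worked example of the `RECORD_QUEUE_TIME_MS` tag computed in `onConsume` above: the span start time is in nanoseconds, the record timestamp is producer-assigned epoch milliseconds, and the difference is clamped at zero. A small self-contained sketch with made-up timestamps:

```java
import static java.util.concurrent.TimeUnit.NANOSECONDS;

public class QueueTimeSketch {
  public static void main(String[] args) {
    // record.timestamp() is producer-assigned epoch millis; span start time is nanos.
    long produceTimeMs = 1_700_000_000_000L; // hypothetical record.timestamp()
    long spanStartNanos = 1_700_000_000_250L * 1_000_000L; // hypothetical span.getStartTime()

    long consumeTimeMs = NANOSECONDS.toMillis(spanStartNanos);
    // Clamped at zero so clock skew between producer and consumer hosts
    // never yields a negative queue time.
    long recordQueueTimeMs = Math.max(0L, consumeTimeMs - produceTimeMs);
    System.out.println(recordQueueTimeMs); // prints 250
  }
}
```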
"kafka" : record.topic(); + span.setResourceName(PRODUCER_RESOURCE_NAME_CACHE.computeIfAbsent(topic, PRODUCER_PREFIX)); + } + } +} diff --git a/dd-java-agent/instrumentation/kafka-clients-3.1/src/main/java/datadog/trace/instrumentation/kafka_clients38/NoopTextMapInjectAdapter.java b/dd-java-agent/instrumentation/kafka-clients-3.1/src/main/java/datadog/trace/instrumentation/kafka_clients38/NoopTextMapInjectAdapter.java new file mode 100644 index 00000000000..bad09090337 --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-clients-3.1/src/main/java/datadog/trace/instrumentation/kafka_clients38/NoopTextMapInjectAdapter.java @@ -0,0 +1,16 @@ +package datadog.trace.instrumentation.kafka_clients38; + +import org.apache.kafka.common.header.Headers; + +public class NoopTextMapInjectAdapter implements TextMapInjectAdapterInterface { + + public static final NoopTextMapInjectAdapter NOOP_SETTER = new NoopTextMapInjectAdapter(); + + @Override + public void set(final Headers headers, final String key, final String value) {} + + @Override + public void set(Headers headers, String key, byte[] value) {} + + public void injectTimeInQueue(Headers headers) {} +} diff --git a/dd-java-agent/instrumentation/kafka-clients-3.1/src/main/java/datadog/trace/instrumentation/kafka_clients38/TextMapExtractAdapter.java b/dd-java-agent/instrumentation/kafka-clients-3.1/src/main/java/datadog/trace/instrumentation/kafka_clients38/TextMapExtractAdapter.java new file mode 100644 index 00000000000..1b78fc59466 --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-clients-3.1/src/main/java/datadog/trace/instrumentation/kafka_clients38/TextMapExtractAdapter.java @@ -0,0 +1,74 @@ +package datadog.trace.instrumentation.kafka_clients38; + +import static datadog.trace.instrumentation.kafka_clients38.KafkaDecorator.KAFKA_PRODUCED_KEY; +import static java.nio.charset.StandardCharsets.UTF_8; + +import datadog.trace.api.Config; +import datadog.trace.bootstrap.instrumentation.api.AgentPropagation; +import java.nio.ByteBuffer; +import java.util.Base64; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TextMapExtractAdapter + implements AgentPropagation.ContextVisitor, + AgentPropagation.BinaryContextVisitor { + + private static final Logger log = LoggerFactory.getLogger(TextMapExtractAdapter.class); + + public static final TextMapExtractAdapter GETTER = + new TextMapExtractAdapter(Config.get().isKafkaClientBase64DecodingEnabled()); + + private final Base64.Decoder base64; + + public TextMapExtractAdapter(boolean base64DecodeHeaders) { + this.base64 = base64DecodeHeaders ? Base64.getDecoder() : null; + } + + @Override + public void forEachKey(Headers carrier, AgentPropagation.KeyClassifier classifier) { + for (Header header : carrier) { + String key = header.key(); + byte[] value = header.value(); + if (null != value) { + String string = + base64 != null + ? 
new String(base64.decode(header.value()), UTF_8) + : new String(header.value(), UTF_8); + if (!classifier.accept(key, string)) { + return; + } + } + } + } + + @Override + public void forEachKey(Headers carrier, AgentPropagation.BinaryKeyClassifier classifier) { + for (Header header : carrier) { + String key = header.key(); + byte[] value = header.value(); + if (null != value) { + if (!classifier.accept(key, value)) { + return; + } + } + } + } + + public long extractTimeInQueueStart(Headers carrier) { + Header header = carrier.lastHeader(KAFKA_PRODUCED_KEY); + if (null != header) { + try { + ByteBuffer buf = ByteBuffer.allocate(8); + buf.put(base64 != null ? base64.decode(header.value()) : header.value()); + buf.flip(); + return buf.getLong(); + } catch (Exception e) { + log.debug("Unable to get kafka produced time", e); + } + } + return 0; + } +} diff --git a/dd-java-agent/instrumentation/kafka-clients-3.1/src/main/java/datadog/trace/instrumentation/kafka_clients38/TextMapInjectAdapter.java b/dd-java-agent/instrumentation/kafka-clients-3.1/src/main/java/datadog/trace/instrumentation/kafka_clients38/TextMapInjectAdapter.java new file mode 100644 index 00000000000..9bc92b4f2ba --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-clients-3.1/src/main/java/datadog/trace/instrumentation/kafka_clients38/TextMapInjectAdapter.java @@ -0,0 +1,27 @@ +package datadog.trace.instrumentation.kafka_clients38; + +import static datadog.trace.instrumentation.kafka_clients38.KafkaDecorator.KAFKA_PRODUCED_KEY; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import org.apache.kafka.common.header.Headers; + +public class TextMapInjectAdapter implements TextMapInjectAdapterInterface { + public static final TextMapInjectAdapter SETTER = new TextMapInjectAdapter(); + + @Override + public void set(final Headers headers, final String key, final String value) { + headers.remove(key).add(key, value.getBytes(StandardCharsets.UTF_8)); + } + + @Override + public void set(Headers headers, String key, byte[] value) { + headers.remove(key).add(key, value); + } + + public void injectTimeInQueue(Headers headers) { + ByteBuffer buf = ByteBuffer.allocate(8); + buf.putLong(System.currentTimeMillis()); + headers.add(KAFKA_PRODUCED_KEY, buf.array()); + } +} diff --git a/dd-java-agent/instrumentation/kafka-clients-3.1/src/main/java/datadog/trace/instrumentation/kafka_clients38/TextMapInjectAdapterInterface.java b/dd-java-agent/instrumentation/kafka-clients-3.1/src/main/java/datadog/trace/instrumentation/kafka_clients38/TextMapInjectAdapterInterface.java new file mode 100644 index 00000000000..e911c1a6df9 --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-clients-3.1/src/main/java/datadog/trace/instrumentation/kafka_clients38/TextMapInjectAdapterInterface.java @@ -0,0 +1,8 @@ +package datadog.trace.instrumentation.kafka_clients38; + +import datadog.trace.bootstrap.instrumentation.api.AgentPropagation; +import org.apache.kafka.common.header.Headers; + +public interface TextMapInjectAdapterInterface extends AgentPropagation.BinarySetter { + public void injectTimeInQueue(Headers headers); +} diff --git a/dd-java-agent/instrumentation/kafka-clients-3.1/src/main/java/datadog/trace/instrumentation/kafka_clients38/TracingIterable.java b/dd-java-agent/instrumentation/kafka-clients-3.1/src/main/java/datadog/trace/instrumentation/kafka_clients38/TracingIterable.java new file mode 100644 index 00000000000..59b53b1c363 --- /dev/null +++ 
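A minimal sketch, not part of the patch, of the round trip the two adapters above perform with the `x_datadog_kafka_produced` header: the producer side writes the current epoch millis as 8 big-endian bytes (`injectTimeInQueue`), and the consumer side reads them back (`extractTimeInQueueStart`), ignoring the optional Base64 layer here:

```java
import java.nio.ByteBuffer;

public class ProducedHeaderSketch {
  public static void main(String[] args) {
    // Inject side: epoch millis -> 8 bytes, stored as the header value.
    long produced = System.currentTimeMillis();
    byte[] headerValue = ByteBuffer.allocate(8).putLong(produced).array();

    // Extract side: 8 bytes -> epoch millis, mirroring extractTimeInQueueStart.
    ByteBuffer buf = ByteBuffer.allocate(8);
    buf.put(headerValue);
    buf.flip();
    long roundTripped = buf.getLong();

    System.out.println(roundTripped == produced); // prints true
  }
}
```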
diff --git a/dd-java-agent/instrumentation/kafka-clients-3.1/src/main/java/datadog/trace/instrumentation/kafka_clients38/TracingIterable.java b/dd-java-agent/instrumentation/kafka-clients-3.1/src/main/java/datadog/trace/instrumentation/kafka_clients38/TracingIterable.java
new file mode 100644
index 00000000000..59b53b1c363
--- /dev/null
+++ b/dd-java-agent/instrumentation/kafka-clients-3.1/src/main/java/datadog/trace/instrumentation/kafka_clients38/TracingIterable.java
@@ -0,0 +1,40 @@
+package datadog.trace.instrumentation.kafka_clients38;
+
+import java.util.Iterator;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+public class TracingIterable
+    implements Iterable<ConsumerRecord<?, ?>>, TracingIterableDelegator {
+  private final Iterable<ConsumerRecord<?, ?>> delegate;
+  private final CharSequence operationName;
+  private final KafkaDecorator decorator;
+  private final String group;
+  private final String clusterId;
+  private final String bootstrapServers;
+
+  public TracingIterable(
+      final Iterable<ConsumerRecord<?, ?>> delegate,
+      final CharSequence operationName,
+      final KafkaDecorator decorator,
+      String group,
+      String clusterId,
+      String bootstrapServers) {
+    this.delegate = delegate;
+    this.operationName = operationName;
+    this.decorator = decorator;
+    this.group = group;
+    this.clusterId = clusterId;
+    this.bootstrapServers = bootstrapServers;
+  }
+
+  @Override
+  public Iterator<ConsumerRecord<?, ?>> iterator() {
+    // every iteration will add spans, not only the very first one
+    return new TracingIterator(
+        delegate.iterator(), operationName, decorator, group, clusterId, bootstrapServers);
+  }
+
+  @Override
+  public Iterable<ConsumerRecord<?, ?>> getDelegate() {
+    return delegate;
+  }
+}
diff --git a/dd-java-agent/instrumentation/kafka-clients-3.1/src/main/java/datadog/trace/instrumentation/kafka_clients38/TracingIterableDelegator.java b/dd-java-agent/instrumentation/kafka-clients-3.1/src/main/java/datadog/trace/instrumentation/kafka_clients38/TracingIterableDelegator.java
new file mode 100644
index 00000000000..8b9f94a3e4f
--- /dev/null
+++ b/dd-java-agent/instrumentation/kafka-clients-3.1/src/main/java/datadog/trace/instrumentation/kafka_clients38/TracingIterableDelegator.java
@@ -0,0 +1,8 @@
+package datadog.trace.instrumentation.kafka_clients38;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+public interface TracingIterableDelegator {
+  // Used by the streams instrumentation to unwrap (disable) the iteration advice.
+  Iterable<ConsumerRecord<?, ?>> getDelegate();
+}
diff --git a/dd-java-agent/instrumentation/kafka-clients-3.1/src/main/java/datadog/trace/instrumentation/kafka_clients38/TracingIterator.java b/dd-java-agent/instrumentation/kafka-clients-3.1/src/main/java/datadog/trace/instrumentation/kafka_clients38/TracingIterator.java
new file mode 100644
index 00000000000..7f6c18c1b7f
--- /dev/null
+++ b/dd-java-agent/instrumentation/kafka-clients-3.1/src/main/java/datadog/trace/instrumentation/kafka_clients38/TracingIterator.java
@@ -0,0 +1,146 @@
+package datadog.trace.instrumentation.kafka_clients38;
+
+import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateNext;
+import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.closePrevious;
+import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.propagate;
+import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
+import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_IN;
+import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_TAG;
+import static datadog.trace.core.datastreams.TagsProcessor.GROUP_TAG;
+import static datadog.trace.core.datastreams.TagsProcessor.KAFKA_CLUSTER_ID_TAG;
+import static datadog.trace.core.datastreams.TagsProcessor.TOPIC_TAG;
+import static datadog.trace.core.datastreams.TagsProcessor.TYPE_TAG;
+import static datadog.trace.instrumentation.kafka_clients38.KafkaDecorator.BROKER_DECORATE;
+import static datadog.trace.instrumentation.kafka_clients38.KafkaDecorator.KAFKA_DELIVER;
+import static datadog.trace.instrumentation.kafka_clients38.KafkaDecorator.TIME_IN_QUEUE_ENABLED;
+import static datadog.trace.instrumentation.kafka_clients38.TextMapExtractAdapter.GETTER;
+import static datadog.trace.instrumentation.kafka_clients38.TextMapInjectAdapter.SETTER;
+import static datadog.trace.instrumentation.kafka_common.StreamingContext.STREAMING_CONTEXT;
+import static datadog.trace.instrumentation.kafka_common.Utils.computePayloadSizeBytes;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+import datadog.trace.api.Config;
+import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
+import datadog.trace.bootstrap.instrumentation.api.AgentSpan.Context;
+import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
+import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TracingIterator implements Iterator<ConsumerRecord<?, ?>> {
+
+  private static final Logger log = LoggerFactory.getLogger(TracingIterator.class);
+
+  private final Iterator<ConsumerRecord<?, ?>> delegateIterator;
+  private final CharSequence operationName;
+  private final KafkaDecorator decorator;
+  private final String group;
+  private final String clusterId;
+  private final String bootstrapServers;
+
+  public TracingIterator(
+      final Iterator<ConsumerRecord<?, ?>> delegateIterator,
+      final CharSequence operationName,
+      final KafkaDecorator decorator,
+      String group,
+      String clusterId,
+      String bootstrapServers) {
+    this.delegateIterator = delegateIterator;
+    this.operationName = operationName;
+    this.decorator = decorator;
+    this.group = group;
+    this.clusterId = clusterId;
+    this.bootstrapServers = bootstrapServers;
+  }
+
+  @Override
+  public boolean hasNext() {
+    boolean moreRecords = delegateIterator.hasNext();
+    if (!moreRecords) {
+      // no more records, use this as a signal to close the last iteration scope
+      closePrevious(true);
+    }
+    return moreRecords;
+  }
+
+  @Override
+  public ConsumerRecord<?, ?> next() {
+    final ConsumerRecord<?, ?> next = delegateIterator.next();
+    startNewRecordSpan(next);
+    return next;
+  }
+
+  protected void startNewRecordSpan(ConsumerRecord<?, ?> val) {
+    try {
+      closePrevious(true);
+      AgentSpan span, queueSpan = null;
+      if (val != null) {
+        if (!Config.get().isKafkaClientPropagationDisabledForTopic(val.topic())) {
+          final Context spanContext = propagate().extract(val.headers(), GETTER);
+          long timeInQueueStart = GETTER.extractTimeInQueueStart(val.headers());
+          if (timeInQueueStart == 0 || !TIME_IN_QUEUE_ENABLED) {
+            span = startSpan(operationName, spanContext);
+          } else {
+            queueSpan =
+                startSpan(KAFKA_DELIVER, spanContext, MILLISECONDS.toMicros(timeInQueueStart));
+            BROKER_DECORATE.afterStart(queueSpan);
+            BROKER_DECORATE.onTimeInQueue(queueSpan, val);
+            span = startSpan(operationName, queueSpan.context());
+            BROKER_DECORATE.beforeFinish(queueSpan);
+            // The queueSpan will be finished after the inner span has been activated to ensure
+            // that spans are written out together by TraceStructureWriter when running in strict
+            // mode
+          }
+
+          LinkedHashMap<String, String> sortedTags = new LinkedHashMap<>();
+          sortedTags.put(DIRECTION_TAG, DIRECTION_IN);
+          sortedTags.put(GROUP_TAG, group);
+          if (clusterId != null) {
+            sortedTags.put(KAFKA_CLUSTER_ID_TAG, clusterId);
+          }
+          sortedTags.put(TOPIC_TAG, val.topic());
+          sortedTags.put(TYPE_TAG, "kafka");
+
+          final long payloadSize =
+              span.traceConfig().isDataStreamsEnabled() ? computePayloadSizeBytes(val) : 0;
+          if (STREAMING_CONTEXT.isDisabledForTopic(val.topic())) {
+            AgentTracer.get()
+                .getDataStreamsMonitoring()
+                .setCheckpoint(span, sortedTags, val.timestamp(), payloadSize);
+          } else {
+            // when we're in a streaming context we want to consume only from source topics
+            if (STREAMING_CONTEXT.isSourceTopic(val.topic())) {
+              // We have to inject the context into headers here, since the data received from
+              // the source may leave the topology on some other instance of the application,
+              // breaking the context propagation for DSM users
+              propagate()
+                  .injectPathwayContext(
+                      span, val.headers(), SETTER, sortedTags, val.timestamp(), payloadSize);
+            }
+          }
+        } else {
+          span = startSpan(operationName, null);
+        }
+        if (val.value() == null) {
+          span.setTag(InstrumentationTags.TOMBSTONE, true);
+        }
+        decorator.afterStart(span);
+        decorator.onConsume(span, val, group, bootstrapServers);
+        activateNext(span);
+        if (null != queueSpan) {
+          queueSpan.finish();
+        }
+      }
+    } catch (final Exception e) {
+      log.debug("Error starting new record span", e);
+    }
+  }
+
+  @Override
+  public void remove() {
+    delegateIterator.remove();
+  }
+}
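The `closePrevious`/`activateNext` pairing above gives each record its own span: `next()` finishes the previous record's span before activating a new one, and exhaustion (`hasNext()` returning false) closes the last one. Below is a simplified, hypothetical model of that lifecycle — not the agent's actual scope machinery — runnable on its own:

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical model of the close-previous/activate-next pattern: each next()
// ends the previous element's "span" and starts a new one; the final span is
// closed when hasNext() first returns false.
final class ScopePerElementIterator<T> implements Iterator<T> {
  private final Iterator<T> delegate;
  private T active; // stands in for the currently activated span

  ScopePerElementIterator(Iterator<T> delegate) {
    this.delegate = delegate;
  }

  private void closePrevious() {
    if (active != null) {
      System.out.println("finish span for " + active);
      active = null;
    }
  }

  @Override
  public boolean hasNext() {
    boolean more = delegate.hasNext();
    if (!more) {
      closePrevious(); // exhaustion closes the last per-record span
    }
    return more;
  }

  @Override
  public T next() {
    closePrevious();
    T value = delegate.next();
    System.out.println("start span for " + value);
    active = value;
    return value;
  }

  public static void main(String[] args) {
    Iterable<String> records =
        () -> new ScopePerElementIterator<>(List.of("r1", "r2").iterator());
    for (String r : records) {
      System.out.println("process " + r); // runs "inside" the span for r
    }
  }
}
```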
diff --git a/dd-java-agent/instrumentation/kafka-clients-3.1/src/main/java/datadog/trace/instrumentation/kafka_clients38/TracingList.java b/dd-java-agent/instrumentation/kafka-clients-3.1/src/main/java/datadog/trace/instrumentation/kafka_clients38/TracingList.java
new file mode 100644
index 00000000000..24cd318d175
--- /dev/null
+++ b/dd-java-agent/instrumentation/kafka-clients-3.1/src/main/java/datadog/trace/instrumentation/kafka_clients38/TracingList.java
@@ -0,0 +1,161 @@
+package datadog.trace.instrumentation.kafka_clients38;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+public class TracingList implements List<ConsumerRecord<?, ?>>, TracingIterableDelegator {
+
+  private final List<ConsumerRecord<?, ?>> delegate;
+  private final CharSequence operationName;
+  private final KafkaDecorator decorator;
+  private final String group;
+  private final String clusterId;
+  private final String bootstrapServers;
+
+  public TracingList(
+      final List<ConsumerRecord<?, ?>> delegate,
+      final CharSequence operationName,
+      final KafkaDecorator decorator,
+      String group,
+      String clusterId,
+      String bootstrapServers) {
+    this.operationName = operationName;
+    this.decorator = decorator;
+    this.delegate = delegate;
+    this.group = group;
+    this.clusterId = clusterId;
+    this.bootstrapServers = bootstrapServers;
+  }
+
+  @Override
+  public int size() {
+    return delegate.size();
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return delegate.isEmpty();
+  }
+
+  @Override
+  public boolean contains(final Object o) {
+    return delegate.contains(o);
+  }
+
+  @Override
+  public Iterator<ConsumerRecord<?, ?>> iterator() {
+    return listIterator(0);
+  }
+
+  @Override
+  public Object[] toArray() {
+    return delegate.toArray();
+  }
+
+  @Override
+  public <T> T[] toArray(final T[] a) {
+    return delegate.toArray(a);
+  }
+
+  @Override
+  public boolean add(final ConsumerRecord<?, ?> consumerRecord) {
+    return delegate.add(consumerRecord);
+  }
+
+  @Override
+  public boolean remove(final Object o) {
+    return delegate.remove(o);
+  }
+
+  @Override
+  public boolean containsAll(final Collection<?> c) {
+    return delegate.containsAll(c);
+  }
+
+  @Override
+  public boolean addAll(final Collection<? extends ConsumerRecord<?, ?>> c) {
+    return delegate.addAll(c);
+  }
+
+  @Override
+  public boolean addAll(final int index, final Collection<? extends ConsumerRecord<?, ?>> c) {
+    return delegate.addAll(index, c);
+  }
+
+  @Override
+  public boolean removeAll(final Collection<?> c) {
+    return delegate.removeAll(c);
+  }
+
+  @Override
+  public boolean retainAll(final Collection<?> c) {
+    return delegate.retainAll(c);
+  }
+
+  @Override
+  public void clear() {
+    delegate.clear();
+  }
+
+  @Override
+  public ConsumerRecord<?, ?> get(final int index) {
+    // TODO: should this be instrumented as well?
+    return delegate.get(index);
+  }
+
+  @Override
+  public ConsumerRecord<?, ?> set(final int index, final ConsumerRecord<?, ?> element) {
+    return delegate.set(index, element);
+  }
+
+  @Override
+  public void add(final int index, final ConsumerRecord<?, ?> element) {
+    delegate.add(index, element);
+  }
+
+  @Override
+  public ConsumerRecord<?, ?> remove(final int index) {
+    return delegate.remove(index);
+  }
+
+  @Override
+  public int indexOf(final Object o) {
+    return delegate.indexOf(o);
+  }
+
+  @Override
+  public int lastIndexOf(final Object o) {
+    return delegate.lastIndexOf(o);
+  }
+
+  @Override
+  public ListIterator<ConsumerRecord<?, ?>> listIterator() {
+    return listIterator(0);
+  }
+
+  @Override
+  public ListIterator<ConsumerRecord<?, ?>> listIterator(final int index) {
+    // every iteration will add spans, not only the very first one
+    return new TracingListIterator(
+        delegate.listIterator(index), operationName, decorator, group, clusterId, bootstrapServers);
+  }
+
+  @Override
+  public List<ConsumerRecord<?, ?>> subList(final int fromIndex, final int toIndex) {
+    return new TracingList(
+        delegate.subList(fromIndex, toIndex),
+        operationName,
+        decorator,
+        group,
+        clusterId,
+        bootstrapServers);
+  }
+
+  @Override
+  public List<ConsumerRecord<?, ?>> getDelegate() {
+    return delegate;
+  }
+}
diff --git a/dd-java-agent/instrumentation/kafka-clients-3.1/src/main/java/datadog/trace/instrumentation/kafka_clients38/TracingListIterator.java b/dd-java-agent/instrumentation/kafka-clients-3.1/src/main/java/datadog/trace/instrumentation/kafka_clients38/TracingListIterator.java
new file mode 100644
index 00000000000..9014ff51966
--- /dev/null
+++ b/dd-java-agent/instrumentation/kafka-clients-3.1/src/main/java/datadog/trace/instrumentation/kafka_clients38/TracingListIterator.java
@@ -0,0 +1,65 @@
+package datadog.trace.instrumentation.kafka_clients38;
+
+import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.closePrevious;
+
+import java.util.ListIterator;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+public class TracingListIterator extends TracingIterator
+    implements ListIterator<ConsumerRecord<?, ?>> {
+
+  private final ListIterator<ConsumerRecord<?, ?>> delegateIterator;
+
+  public TracingListIterator(
+      ListIterator<ConsumerRecord<?, ?>> delegateIterator,
+      CharSequence operationName,
+      KafkaDecorator decorator,
+      String group,
+      String clusterId,
+      String bootstrapServers) {
+    super(delegateIterator, operationName, decorator, group, clusterId, bootstrapServers);
+    this.delegateIterator = delegateIterator;
+  }
+
+  @Override
+  public boolean hasPrevious() {
+    boolean moreRecords = delegateIterator.hasPrevious();
+    if (!moreRecords) {
+      // no more records, use this as a signal to close the last iteration scope
+      closePrevious(true);
+    }
+    return moreRecords;
+  }
+
+  @Override
+  public ConsumerRecord<?, ?> previous() {
+    final ConsumerRecord<?, ?> prev = delegateIterator.previous();
+    startNewRecordSpan(prev);
+    return prev;
+  }
+
+  @Override
+  public int nextIndex() {
+    return delegateIterator.nextIndex();
+  }
+
+  @Override
+  public int previousIndex() {
+    return delegateIterator.previousIndex();
+  }
+
+  /*
+   * org.apache.kafka.clients.consumer.ConsumerRecords::records(TopicPartition) always returns
+   * an UnmodifiableList, so modifying operations will lead to an exception
+   */
+
+  @Override
+  public void set(ConsumerRecord<?, ?> consumerRecord) {
+    delegateIterator.set(consumerRecord);
+  }
+
+  @Override
+  public void add(ConsumerRecord<?, ?> consumerRecord) {
+    delegateIterator.add(consumerRecord);
+  }
+}
diff --git a/dd-java-agent/instrumentation/kafka-clients-3.1/src/test/groovy/AvroMock.java b/dd-java-agent/instrumentation/kafka-clients-3.1/src/test/groovy/AvroMock.java
new file mode 100644
index 00000000000..5bea93a1875
--- /dev/null
+++ b/dd-java-agent/instrumentation/kafka-clients-3.1/src/test/groovy/AvroMock.java
@@ -0,0 +1,7 @@
+public class AvroMock {
+  private final String schema;
+
+  AvroMock(String schema) {
+    this.schema = schema;
+  }
+}
diff --git a/dd-java-agent/instrumentation/kafka-clients-3.1/src/test/groovy/AvroMockSerializer.java b/dd-java-agent/instrumentation/kafka-clients-3.1/src/test/groovy/AvroMockSerializer.java
new file mode 100644
index 00000000000..244359cae98
--- /dev/null
+++ b/dd-java-agent/instrumentation/kafka-clients-3.1/src/test/groovy/AvroMockSerializer.java
@@ -0,0 +1,15 @@
+import java.util.Map;
+import org.apache.kafka.common.serialization.Serializer;
+
+class AvroMockSerializer implements Serializer<AvroMock> {
+  @Override
+  public void configure(Map<String, ?> configs, boolean isKey) {}
+
+  @Override
+  public byte[] serialize(String topic, AvroMock data) {
+    return new byte[0];
+  }
+
+  @Override
+  public void close() {}
+}
diff --git a/dd-java-agent/instrumentation/kafka-clients-3.1/src/test/groovy/UpdatedKafkaClientTestBase.groovy b/dd-java-agent/instrumentation/kafka-clients-3.1/src/test/groovy/UpdatedKafkaClientTestBase.groovy
new file mode 100644
index 00000000000..47fff00fb72
--- /dev/null
+++ b/dd-java-agent/instrumentation/kafka-clients-3.1/src/test/groovy/UpdatedKafkaClientTestBase.groovy
@@ -0,0 +1,337 @@
+import datadog.trace.agent.test.naming.VersionedNamingTestBase
+import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags
+import datadog.trace.common.writer.ListWriter
+import datadog.trace.core.DDSpan
+import datadog.trace.core.datastreams.StatsGroup
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.serialization.StringSerializer
+import org.junit.Rule
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory
+import org.springframework.kafka.listener.KafkaMessageListenerContainer
+import org.springframework.kafka.listener.MessageListener
+import org.springframework.kafka.test.utils.ContainerTestUtils
+import org.springframework.kafka.test.utils.KafkaTestUtils
+import org.testcontainers.containers.KafkaContainer
+import org.testcontainers.utility.DockerImageName
+
+import java.util.concurrent.LinkedBlockingQueue
+import java.util.concurrent.TimeUnit
+
+import static datadog.trace.agent.test.utils.TraceUtils.basicSpan
+import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace
+import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeScope
+
+abstract class UpdatedKafkaClientTestBase extends VersionedNamingTestBase {
+  static final List<String> SHARED_TOPIC = List.of("topic1", "topic2", "topic3", "topic4")
+  static final String MESSAGE = "Testing without headers for certain topics"
+
+  static final dataTable() {
+    [
+      ["topic1,topic2,topic3,topic4", false, false, false, false],
+      ["topic1,topic2", false, false, true, true],
+      ["topic1", false, true, true, true],
+      ["", true, true, true, true],
+      ["randomTopic", true, true, true, true]
+    ]
+  }
+
+  @Override
+  boolean useStrictTraceWrites() {
+    // TODO fix this by making sure that spans get closed properly
+    return false
+  }
+
+  @Rule
+  public KafkaContainer embeddedKafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"))
+  .withEmbeddedZookeeper()
+
+  @Override
+  void configurePreAgent() {
+    super.configurePreAgent()
+
+    injectSysConfig("dd.kafka.e2e.duration.enabled", "true")
+  }
+
+  public static final LinkedHashMap<String, String> PRODUCER_PATHWAY_EDGE_TAGS
+
+  // filter out Kafka poll, since the function is called in a loop, giving inconsistent results
+  final ListWriter.Filter dropKafkaPoll = new ListWriter.Filter() {
+    @Override
+    boolean accept(List<DDSpan> trace) {
+      return !(trace.size() == 1 &&
+        trace.get(0).getResourceName().toString().equals("kafka.poll"))
+    }
+  }
+
+  final ListWriter.Filter dropEmptyKafkaPoll = new ListWriter.Filter() {
+    @Override
+    boolean accept(List<DDSpan> trace) {
+      return !(trace.size() == 1 &&
+        trace.get(0).getResourceName().toString().equals("kafka.poll") &&
+        trace.get(0).getTag(InstrumentationTags.KAFKA_RECORDS_COUNT).equals(0))
+    }
+  }
+
+  // Trace IDs, start times and names change based on the configuration, so override the sort
+  // to give consistent test results
+  private static class SortKafkaTraces implements Comparator<List<DDSpan>> {
+    @Override
+    int compare(List<DDSpan> o1, List<DDSpan> o2) {
+      return rootSpanTrace(o1) - rootSpanTrace(o2)
+    }
+
+    int rootSpanTrace(List<DDSpan> trace) {
+      assert !trace.isEmpty()
+      def rootSpan = trace.get(0).localRootSpan
+      switch (rootSpan.operationName.toString()) {
+        case "parent":
+          return 3
+        case "kafka.poll":
+          return 2
+        default:
+          return 1
+      }
+    }
+  }
+
+  static {
+    PRODUCER_PATHWAY_EDGE_TAGS = new LinkedHashMap<>(3)
+    PRODUCER_PATHWAY_EDGE_TAGS.put("direction", "out")
+    PRODUCER_PATHWAY_EDGE_TAGS.put("topic", SHARED_TOPIC)
+    PRODUCER_PATHWAY_EDGE_TAGS.put("type", "kafka")
+  }
+
+  def setup() {
+    TEST_WRITER.setFilter(dropKafkaPoll)
+  }
+
+  @Override
+  int version() {
+    0
+  }
+
+  @Override
+  String operation() {
+    return null
+  }
+
+  String operationForProducer() {
+    "kafka.produce"
+  }
+
+  String operationForConsumer() {
+    "kafka.consume"
+  }
+
+  String serviceForTimeInQueue() {
+    "kafka"
+  }
+
+  abstract boolean hasQueueSpan()
+
+  abstract boolean splitByDestination()
+
+  @Override
+  protected boolean isDataStreamsEnabled() {
+    return true
+  }
+
+  def "test kafka produce and consume"() {
+    setup:
+    // Create and start a Kafka container using Testcontainers
+    KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"))
+    kafkaContainer.start()
+
+    def senderProps = KafkaTestUtils.senderProps(kafkaContainer.getBootstrapServers())
+    if (isDataStreamsEnabled()) {
+      senderProps.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, 1000)
+    }
+    TEST_WRITER.setFilter(dropEmptyKafkaPoll)
+    KafkaProducer<String, String> producer = new KafkaProducer<>(senderProps, new StringSerializer(), new StringSerializer())
+    String clusterId = ""
+    if (isDataStreamsEnabled()) {
+      producer.flush()
+      clusterId = producer.metadata.cluster.clusterResource().clusterId()
+      while (clusterId == null || clusterId.isEmpty()) {
+        Thread.sleep(1500)
+        clusterId = producer.metadata.cluster.clusterResource().clusterId()
+      }
+    }
+
+    // set up the Kafka consumer properties
+    def consumerProperties = KafkaTestUtils.consumerProps("sender", "false", kafkaContainer.getBootstrapServers())
+    // create a Kafka consumer factory
+    def consumerFactory = new DefaultKafkaConsumerFactory<String, String>(consumerProperties)
+    // set the topic that needs to be consumed
+    def containerProperties = containerProperties()
+    // create a Kafka MessageListenerContainer
+    def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties)
+    // create a thread-safe queue to store the received messages
+    def records = new LinkedBlockingQueue<ConsumerRecord<String, String>>()
+    // set up a Kafka message listener
+    container.setupMessageListener(new MessageListener<String, String>() {
+      @Override
+      void onMessage(ConsumerRecord<String, String> record) {
+        TEST_WRITER.waitForTraces(1)
+        // ensure consistent ordering of traces
+        records.add(record)
+      }
+    })
+    // start the container and underlying message listener
+    container.start()
+    // wait until the container has the required number of assigned partitions
+    ContainerTestUtils.waitForAssignment(container, kafkaContainer.getNumberOfPartitions())
+
+    when:
+    String greeting = "Hello Spring Kafka Sender!"
+    runUnderTrace("parent") {
+      producer.send(new ProducerRecord<>(SHARED_TOPIC, greeting)) { meta, ex ->
+        assert activeScope().isAsyncPropagating()
+        if (ex == null) {
+          runUnderTrace("producer callback") {}
+        } else {
+          runUnderTrace("producer exception: " + ex) {}
+        }
+      }
+      blockUntilChildSpansFinished(2)
+    }
+    if (isDataStreamsEnabled()) {
+      TEST_DATA_STREAMS_WRITER.waitForGroups(2)
+      // wait for produce offset 0, commit offset 0 on partition 0 and 1, and commit offset 1 on 1 partition
+    }
+
+    then:
+    // check that the message was received
+    def received = records.poll(5, TimeUnit.SECONDS)
+    received.value() == greeting
+    received.key() == null
+    int nTraces = isDataStreamsEnabled() ? 3 : 2
+    int produceTraceIdx = nTraces - 1
+    TEST_WRITER.waitForTraces(nTraces)
+    def traces = (Arrays.asList(TEST_WRITER.toArray()) as List<List<DDSpan>>)
+    Collections.sort(traces, new SortKafkaTraces())
+    assertTraces(nTraces, new SortKafkaTraces()) {
+      if (hasQueueSpan()) {
+        trace(2) {
+          consumerSpan(it, consumerProperties, span(1))
+          queueSpan(it, trace(produceTraceIdx)[2])
+        }
+      } else {
+        trace(1) {
+          consumerSpan(it, consumerProperties, trace(produceTraceIdx)[2])
+        }
+      }
+      if (isDataStreamsEnabled()) {
+        trace(1, { pollSpan(it) })
+      }
+      trace(3) {
+        basicSpan(it, "parent")
+        basicSpan(it, "producer callback", span(0))
+        producerSpan(it, senderProps, span(0), false)
+      }
+    }
+    def headers = received.headers()
+    headers.iterator().hasNext()
+    new String(headers.headers("x-datadog-trace-id").iterator().next().value()) == "${traces[produceTraceIdx][2].traceId}"
+    new String(headers.headers("x-datadog-parent-id").iterator().next().value()) == "${traces[produceTraceIdx][2].spanId}"
+    if (isDataStreamsEnabled()) {
+      StatsGroup first = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == 0 }
+      verifyAll(first) {
+        edgeTags == ["direction:out", "kafka_cluster_id:$clusterId", "topic:$SHARED_TOPIC".toString(), "type:kafka"]
+        edgeTags.size() == 4
+      }
+      StatsGroup second = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == first.hash }
+      verifyAll(second) {
+        edgeTags == [
+          "direction:in",
+          "group:sender",
+          "kafka_cluster_id:$clusterId",
+          "topic:$SHARED_TOPIC".toString(),
+          "type:kafka"
+        ]
+        edgeTags.size() == 5
+      }
+      List<String> produce = [
+        "kafka_cluster_id:$clusterId",
+        "partition:" + received.partition(),
+        "topic:" + SHARED_TOPIC,
+        "type:kafka_produce"
+      ]
+      List<String> commit = [
+        "consumer_group:sender",
+        "kafka_cluster_id:$clusterId",
+        "partition:" + received.partition(),
+        "topic:$SHARED_TOPIC",
+        "type:kafka_commit"
+      ]
+      verifyAll(TEST_DATA_STREAMS_WRITER.backlogs) {
+        contains(new AbstractMap.SimpleEntry<List<String>, Long>(commit, 1).toString())
+        contains(new AbstractMap.SimpleEntry<List<String>, Long>(produce, 0).toString())
+      }
+    }
+
+    cleanup:
+    producer.close()
+    container?.stop()
+    kafkaContainer.stop()
+  }
+}
+
+abstract class UpdatedKafkaClientForkedTest extends UpdatedKafkaClientTestBase {
+  @Override
+  void configurePreAgent() {
+    super.configurePreAgent()
+    injectSysConfig("dd.kafka.legacy.tracing.enabled", "false")
+    injectSysConfig("dd.service", "KafkaClientTest")
+  }
+
+  @Override
+  boolean hasQueueSpan() {
+    return true
+  }
+
+  @Override
+  boolean splitByDestination() {
+    return false
+  }
+}
+
+class UKafkaClientV0ForkedTest extends UpdatedKafkaClientForkedTest {
+  @Override
+  String service() {
+    return "KafkaClientTest"
+  }
+}
+
+class UKafkaClientV1ForkedTest extends UpdatedKafkaClientForkedTest {
+  @Override
+  int version() {
+    1
+  }
+
+  @Override
+  String service() {
+    return "KafkaClientTest"
+  }
+
+  @Override
+  String operationForProducer() {
+    "kafka.send"
+  }
+
+  @Override
+  String operationForConsumer() {
+    return "kafka.process"
+  }
+
+  @Override
+  String serviceForTimeInQueue() {
+    "kafka-queue"
+  }
+
+  @Override
+  boolean hasQueueSpan() {
+    false
+  }
+}
diff --git a/dd-java-agent/instrumentation/spring-kafka-3.8/build.gradle b/dd-java-agent/instrumentation/spring-kafka-3.8/build.gradle
new file mode 100644
index 00000000000..75e19a6bdf8
--- /dev/null
+++ b/dd-java-agent/instrumentation/spring-kafka-3.8/build.gradle
@@ -0,0 +1,19 @@
+plugins {
+  id("java")
+}
+
+group = "com.datadoghq"
+version = "1.39.0-SNAPSHOT"
+
+repositories {
+  mavenCentral()
+}
+
+dependencies {
+  testImplementation(platform("org.junit:junit-bom:5.10.0"))
+  testImplementation("org.junit.jupiter:junit-jupiter")
+}
+
+tasks.test {
+  useJUnitPlatform()
+}
\ No newline at end of file
diff --git a/settings.gradle b/settings.gradle
index daa3070c2eb..3424d61f528 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -480,3 +480,4 @@ include ':dd-java-agent:benchmark'
 include ':dd-java-agent:benchmark-integration'
 include ':dd-java-agent:benchmark-integration:jetty-perftest'
 include ':dd-java-agent:benchmark-integration:play-perftest'
+include ':dd-java-agent:instrumentation:kafka-clients-3.1'
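Assuming the standard dd-trace-java Gradle setup, and given the project path registered in settings.gradle and the `latestDepTest` suite added in the module's build.gradle, the new module's tests could be run with something like:

```sh
# regular test suite for the pinned kafka-clients 3.1.0 dependency
./gradlew :dd-java-agent:instrumentation:kafka-clients-3.1:test

# test suite against the latest 3.x client libs
./gradlew :dd-java-agent:instrumentation:kafka-clients-3.1:latestDepTest
```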