diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/build.gradle b/dd-java-agent/instrumentation/kafka-clients-0.11/build.gradle index 5560d1f6c4f..4335e7a754d 100644 --- a/dd-java-agent/instrumentation/kafka-clients-0.11/build.gradle +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/build.gradle @@ -2,7 +2,7 @@ muzzle { pass { group = "org.apache.kafka" module = "kafka-clients" - versions = "[0.11.0.0,)" + versions = "[0.11.0.0,3.7.0)" assertInverse = true } } diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/ConsumerCoordinatorInstrumentation.java b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/ConsumerCoordinatorInstrumentation.java index 5a299a30bda..fdc012a5276 100644 --- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/ConsumerCoordinatorInstrumentation.java +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/ConsumerCoordinatorInstrumentation.java @@ -1,5 +1,6 @@ package datadog.trace.instrumentation.kafka_clients; +import static datadog.trace.agent.tooling.bytebuddy.matcher.ClassLoaderMatchers.hasClassNamed; import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; import static datadog.trace.core.datastreams.TagsProcessor.CONSUMER_GROUP_TAG; import static datadog.trace.core.datastreams.TagsProcessor.KAFKA_CLUSTER_ID_TAG; @@ -17,11 +18,13 @@ import java.util.LinkedHashMap; import java.util.Map; import net.bytebuddy.asm.Advice; +import net.bytebuddy.matcher.ElementMatcher; import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator; import org.apache.kafka.clients.consumer.internals.RequestFuture; +import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.TopicPartition; @AutoService(InstrumenterModule.class) @@ -32,6 +35,13 @@ public ConsumerCoordinatorInstrumentation() { super("kafka"); } + @Override + public ElementMatcher.Junction classLoaderMatcher() { + // Avoid matching kafka 3.8 which has its own instrumentation + return not( + hasClassNamed("org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker")); + } + @Override public Map contextStore() { Map contextStores = new HashMap<>(); @@ -107,10 +117,12 @@ public static void trackCommitOffset( } } - public static void muzzleCheck(ConsumerRecord record) { + public static void muzzleCheck(ConsumerRecord record, Producer producer) { // KafkaConsumerInstrumentation only applies for kafka versions with headers // Make an explicit call so ConsumerCoordinatorInstrumentation does the same record.headers(); + record.checksum(); + producer.close(2, java.util.concurrent.TimeUnit.SECONDS); } } } diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInfoInstrumentation.java b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInfoInstrumentation.java index 2ef11751691..cd747d641ae 100644 --- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInfoInstrumentation.java +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInfoInstrumentation.java @@ -1,5 +1,6 @@ package datadog.trace.instrumentation.kafka_clients; +import static datadog.trace.agent.tooling.bytebuddy.matcher.ClassLoaderMatchers.hasClassNamed; import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan; @@ -9,6 +10,7 @@ import static net.bytebuddy.matcher.ElementMatchers.isConstructor; import static net.bytebuddy.matcher.ElementMatchers.isMethod; import static net.bytebuddy.matcher.ElementMatchers.isPublic; +import static net.bytebuddy.matcher.ElementMatchers.not; import static net.bytebuddy.matcher.ElementMatchers.returns; import static net.bytebuddy.matcher.ElementMatchers.takesArgument; import static net.bytebuddy.matcher.ElementMatchers.takesArguments; @@ -24,12 +26,14 @@ import java.util.List; import java.util.Map; import net.bytebuddy.asm.Advice; +import net.bytebuddy.matcher.ElementMatcher; import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator; +import org.apache.kafka.clients.producer.Producer; /** * This instrumentation saves additional information from the KafkaConsumer, such as consumer group @@ -57,6 +61,13 @@ public Map contextStore() { return contextStores; } + @Override + public ElementMatcher.Junction classLoaderMatcher() { + // Avoid matching kafka 3.8 which has its own instrumentation + return not( + hasClassNamed("org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker")); + } + @Override public String instrumentedType() { return "org.apache.kafka.clients.consumer.KafkaConsumer"; @@ -176,4 +187,11 @@ public static void captureGroup( scope.close(); } } + + public static void muzzleCheck(ConsumerRecord record, Producer producer) { + // KafkaConsumerInstrumentation only applies for kafka versions with headers + // Make an explicit call so ConsumerCoordinatorInstrumentation does the same + record.headers(); + producer.close(2, java.util.concurrent.TimeUnit.SECONDS); + } } diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInstrumentation.java b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInstrumentation.java index 6d488d3a759..111727118fe 100644 --- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInstrumentation.java +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInstrumentation.java @@ -1,10 +1,12 @@ package datadog.trace.instrumentation.kafka_clients; +import static datadog.trace.agent.tooling.bytebuddy.matcher.ClassLoaderMatchers.hasClassNamed; import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; import static datadog.trace.instrumentation.kafka_clients.KafkaDecorator.CONSUMER_DECORATE; import static datadog.trace.instrumentation.kafka_clients.KafkaDecorator.KAFKA_CONSUME; import static net.bytebuddy.matcher.ElementMatchers.isMethod; import static net.bytebuddy.matcher.ElementMatchers.isPublic; +import static net.bytebuddy.matcher.ElementMatchers.not; import static net.bytebuddy.matcher.ElementMatchers.returns; import static net.bytebuddy.matcher.ElementMatchers.takesArgument; import static net.bytebuddy.matcher.ElementMatchers.takesArguments; @@ -18,9 +20,11 @@ import java.util.List; import java.util.Map; import net.bytebuddy.asm.Advice; +import net.bytebuddy.matcher.ElementMatcher; import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.producer.Producer; @AutoService(InstrumenterModule.class) public final class KafkaConsumerInstrumentation extends InstrumenterModule.Tracing @@ -39,6 +43,12 @@ public Map contextStore() { return contextStores; } + public ElementMatcher.Junction classLoaderMatcher() { + // Avoid matching kafka 3.8 which has its own instrumentation + return not( + hasClassNamed("org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker")); + } + @Override public String instrumentedType() { return "org.apache.kafka.clients.consumer.ConsumerRecords"; @@ -108,6 +118,14 @@ public static void wrap( iterable, KAFKA_CONSUME, CONSUMER_DECORATE, group, clusterId, bootstrapServers); } } + + public static void muzzleCheck(ConsumerRecord record, Producer producer) { + // KafkaConsumerInstrumentation only applies for kafka versions with headers + // Make an explicit call so ConsumerCoordinatorInstrumentation does the same + record.headers(); + record.checksum(); + producer.close(2, java.util.concurrent.TimeUnit.SECONDS); + } } public static class ListAdvice { @@ -130,6 +148,14 @@ public static void wrap( iterable, KAFKA_CONSUME, CONSUMER_DECORATE, group, clusterId, bootstrapServers); } } + + public static void muzzleCheck(ConsumerRecord record, Producer producer) { + // KafkaConsumerInstrumentation only applies for kafka versions with headers + // Make an explicit call so ConsumerCoordinatorInstrumentation does the same + record.headers(); + record.checksum(); + producer.close(2, java.util.concurrent.TimeUnit.SECONDS); + } } public static class IteratorAdvice { @@ -152,5 +178,13 @@ public static void wrap( iterator, KAFKA_CONSUME, CONSUMER_DECORATE, group, clusterId, bootstrapServers); } } + + public static void muzzleCheck(ConsumerRecord record, Producer producer) { + // KafkaConsumerInstrumentation only applies for kafka versions with headers + // Make an explicit call so ConsumerCoordinatorInstrumentation does the same + record.headers(); + record.checksum(); + producer.close(2, java.util.concurrent.TimeUnit.SECONDS); + } } } diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaDeserializerInstrumentation.java b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaDeserializerInstrumentation.java index 23690c63b8f..557705b3ee7 100644 --- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaDeserializerInstrumentation.java +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaDeserializerInstrumentation.java @@ -1,8 +1,10 @@ package datadog.trace.instrumentation.kafka_clients; +import static datadog.trace.agent.tooling.bytebuddy.matcher.ClassLoaderMatchers.hasClassNamed; import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.hasInterface; import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; import static java.util.Collections.singletonMap; +import static net.bytebuddy.matcher.ElementMatchers.not; import static net.bytebuddy.matcher.ElementMatchers.takesArgument; import static net.bytebuddy.matcher.ElementMatchers.takesArguments; @@ -20,6 +22,8 @@ import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.serialization.Deserializer; @AutoService(InstrumenterModule.class) @@ -44,6 +48,12 @@ public KafkaDeserializerInstrumentation() { super("kafka"); } + public ElementMatcher.Junction classLoaderMatcher() { + // Avoid matching kafka 3.8 which has its own instrumentation + return not( + hasClassNamed("org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker")); + } + @Override public String hierarchyMarkerType() { return DESERIALIZER_CLASS; @@ -95,6 +105,13 @@ public static void configure( InstrumentationContext.get(Deserializer.class, Boolean.class); KafkaIastHelper.configure(store, deserializer, isKey); } + + public static void muzzleCheck(ConsumerRecord record, Producer producer) { + // KafkaConsumerInstrumentation only applies for kafka versions with headers + // Make an explicit call so ConsumerCoordinatorInstrumentation does the same + record.headers(); + producer.close(2, java.util.concurrent.TimeUnit.SECONDS); + } } @SuppressWarnings("rawtypes") @@ -120,6 +137,13 @@ public static void afterDeserialize( InstrumentationContext.get(Deserializer.class, Boolean.class); KafkaIastHelper.afterDeserialize(ctx, store, deserializer, result); } + + public static void muzzleCheck(ConsumerRecord record, Producer producer) { + // KafkaConsumerInstrumentation only applies for kafka versions with headers + // Make an explicit call so ConsumerCoordinatorInstrumentation does the same + record.headers(); + producer.close(2, java.util.concurrent.TimeUnit.SECONDS); + } } @SuppressWarnings({"rawtypes", "JavaExistingMethodCanBeUsed"}) @@ -145,6 +169,13 @@ public static void afterDeserialize( InstrumentationContext.get(Deserializer.class, Boolean.class); KafkaIastHelper.afterDeserialize(ctx, store, deserializer, result); } + + public static void muzzleCheck(ConsumerRecord record, Producer producer) { + // KafkaConsumerInstrumentation only applies for kafka versions with headers + // Make an explicit call so ConsumerCoordinatorInstrumentation does the same + record.headers(); + producer.close(2, java.util.concurrent.TimeUnit.SECONDS); + } } @SuppressWarnings({"rawtypes", "JavaExistingMethodCanBeUsed"}) @@ -170,5 +201,12 @@ public static void afterDeserialize( InstrumentationContext.get(Deserializer.class, Boolean.class); KafkaIastHelper.afterDeserialize(ctx, store, deserializer, result); } + + public static void muzzleCheck(ConsumerRecord record, Producer producer) { + // KafkaConsumerInstrumentation only applies for kafka versions with headers + // Make an explicit call so ConsumerCoordinatorInstrumentation does the same + record.headers(); + producer.close(2, java.util.concurrent.TimeUnit.SECONDS); + } } } diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java index 4b09bd46af1..93d730fd81d 100644 --- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java @@ -1,5 +1,6 @@ package datadog.trace.instrumentation.kafka_clients; +import static datadog.trace.agent.tooling.bytebuddy.matcher.ClassLoaderMatchers.hasClassNamed; import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan; @@ -18,6 +19,7 @@ import static net.bytebuddy.matcher.ElementMatchers.isMethod; import static net.bytebuddy.matcher.ElementMatchers.isPrivate; import static net.bytebuddy.matcher.ElementMatchers.isPublic; +import static net.bytebuddy.matcher.ElementMatchers.not; import static net.bytebuddy.matcher.ElementMatchers.takesArgument; import com.google.auto.service.AutoService; @@ -33,9 +35,12 @@ import java.util.LinkedHashMap; import java.util.Map; import net.bytebuddy.asm.Advice; +import net.bytebuddy.matcher.ElementMatcher; import org.apache.kafka.clients.ApiVersions; import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.internals.Sender; @@ -49,6 +54,12 @@ public KafkaProducerInstrumentation() { super("kafka"); } + public ElementMatcher.Junction classLoaderMatcher() { + // Avoid matching kafka 3.8 which has its own instrumentation + return not( + hasClassNamed("org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker")); + } + @Override public String instrumentedType() { return "org.apache.kafka.clients.producer.KafkaProducer"; @@ -177,6 +188,13 @@ public static void stopSpan( PRODUCER_DECORATE.beforeFinish(scope); scope.close(); } + + public static void muzzleCheck(ConsumerRecord record, Producer producer) { + // KafkaConsumerInstrumentation only applies for kafka versions with headers + // Make an explicit call so ConsumerCoordinatorInstrumentation does the same + record.headers(); + producer.close(2, java.util.concurrent.TimeUnit.SECONDS); + } } public static class PayloadSizeAdvice { @@ -205,5 +223,12 @@ public static void onEnter(@Advice.Argument(value = 0) int estimatedPayloadSize) AgentTracer.get().getDataStreamsMonitoring().add(updated); } } + + public static void muzzleCheck(ConsumerRecord record, Producer producer) { + // KafkaConsumerInstrumentation only applies for kafka versions with headers + // Make an explicit call so ConsumerCoordinatorInstrumentation does the same + record.headers(); + producer.close(2, java.util.concurrent.TimeUnit.SECONDS); + } } } diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/MetadataInstrumentation.java b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/MetadataInstrumentation.java index e9a10abd035..e85fc38e546 100644 --- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/MetadataInstrumentation.java +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/MetadataInstrumentation.java @@ -1,9 +1,11 @@ package datadog.trace.instrumentation.kafka_clients; +import static datadog.trace.agent.tooling.bytebuddy.matcher.ClassLoaderMatchers.hasClassNamed; import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.extendsClass; import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; import static java.util.Collections.singletonMap; import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.not; import static net.bytebuddy.matcher.ElementMatchers.takesArgument; import com.google.auto.service.AutoService; @@ -16,6 +18,7 @@ import net.bytebuddy.matcher.ElementMatcher; import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.requests.MetadataResponse; @@ -27,6 +30,12 @@ public MetadataInstrumentation() { super("kafka"); } + public ElementMatcher.Junction classLoaderMatcher() { + // Avoid matching kafka 3.8 which has its own instrumentation + return not( + hasClassNamed("org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker")); + } + @Override public String hierarchyMarkerType() { return "org.apache.kafka.clients.Metadata"; @@ -88,10 +97,11 @@ public static void onEnter( } } - public static void muzzleCheck(ConsumerRecord record) { + public static void muzzleCheck(ConsumerRecord record, Producer producer) { // KafkaConsumerInstrumentation only applies for kafka versions with headers - // Make an explicit call so MetadataInstrumentation does the same + // Make an explicit call so ConsumerCoordinatorInstrumentation does the same record.headers(); + producer.close(2, java.util.concurrent.TimeUnit.SECONDS); } } } diff --git a/dd-java-agent/instrumentation/kafka-clients-3.8/build.gradle b/dd-java-agent/instrumentation/kafka-clients-3.8/build.gradle index c662f285fcd..fceec499038 100644 --- a/dd-java-agent/instrumentation/kafka-clients-3.8/build.gradle +++ b/dd-java-agent/instrumentation/kafka-clients-3.8/build.gradle @@ -6,7 +6,7 @@ muzzle { group = "org.apache.kafka" module = "kafka-clients" versions = "[3.8.0,)" - assertInverse = false + assertInverse = true } } diff --git a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ConstructorAdvice.java b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ConstructorAdvice.java index 9e17baae146..73d4bd3cb46 100644 --- a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ConstructorAdvice.java +++ b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ConstructorAdvice.java @@ -7,7 +7,6 @@ import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.internals.ConsumerDelegate; import org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker; @@ -54,9 +53,8 @@ public static void captureGroup( } } - public static void muzzleCheck(ConsumerRecord record) { - // KafkaConsumerInstrumentation only applies for kafka versions with headers - // Make an explicit call so KafkaConsumerGroupInstrumentation does the same - record.headers(); + public static void muzzleCheck(OffsetCommitCallbackInvoker invoker) { + // Only applies for kafka versions with OffsetCommitCallbackInvoker + invoker.executeCallbacks(); } } diff --git a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ConsumerCoordinatorAdvice.java b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ConsumerCoordinatorAdvice.java index ecec8bd1e77..6d6e999328f 100644 --- a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ConsumerCoordinatorAdvice.java +++ b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ConsumerCoordinatorAdvice.java @@ -12,9 +12,9 @@ import java.util.Map; import net.bytebuddy.asm.Advice; import org.apache.kafka.clients.Metadata; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator; +import org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker; import org.apache.kafka.clients.consumer.internals.RequestFuture; import org.apache.kafka.common.TopicPartition; @@ -66,9 +66,8 @@ public static void trackCommitOffset( } } - public static void muzzleCheck(ConsumerRecord record) { - // KafkaConsumerInstrumentation only applies for kafka versions with headers - // Make an explicit call so ConsumerCoordinatorInstrumentation does the same - record.headers(); + public static void muzzleCheck(OffsetCommitCallbackInvoker invoker) { + // Only applies for kafka versions with OffsetCommitCallbackInvoker + invoker.executeCallbacks(); } } diff --git a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/IterableAdvice.java b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/IterableAdvice.java index d9471fbbc8f..6c3684ecc00 100644 --- a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/IterableAdvice.java +++ b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/IterableAdvice.java @@ -5,6 +5,7 @@ import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker; public class IterableAdvice { @@ -31,4 +32,9 @@ public static void wrap( bootstrapServers); } } + + public static void muzzleCheck(OffsetCommitCallbackInvoker invoker) { + // Only applies for kafka versions with OffsetCommitCallbackInvoker + invoker.executeCallbacks(); + } } diff --git a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/IteratorAdvice.java b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/IteratorAdvice.java index e0021f2f024..d40f4abd459 100644 --- a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/IteratorAdvice.java +++ b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/IteratorAdvice.java @@ -6,6 +6,7 @@ import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker; public class IteratorAdvice { @@ -32,4 +33,9 @@ public static void wrap( bootstrapServers); } } + + public static void muzzleCheck(OffsetCommitCallbackInvoker invoker) { + // Only applies for kafka versions with OffsetCommitCallbackInvoker + invoker.executeCallbacks(); + } } diff --git a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/KafkaConsumerInstrumentationHelper.java b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/KafkaConsumerInstrumentationHelper.java index 1791aa7e308..6d09b45644f 100644 --- a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/KafkaConsumerInstrumentationHelper.java +++ b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/KafkaConsumerInstrumentationHelper.java @@ -3,6 +3,7 @@ import datadog.trace.api.Config; import datadog.trace.bootstrap.ContextStore; import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker; public class KafkaConsumerInstrumentationHelper { public static String extractGroup(KafkaConsumerInfo kafkaConsumerInfo) { @@ -26,4 +27,9 @@ public static String extractClusterId( public static String extractBootstrapServers(KafkaConsumerInfo kafkaConsumerInfo) { return kafkaConsumerInfo == null ? null : kafkaConsumerInfo.getBootstrapServers().get(); } + + public static void muzzleCheck(OffsetCommitCallbackInvoker invoker) { + // Only applies for kafka versions with OffsetCommitCallbackInvoker + invoker.executeCallbacks(); + } } diff --git a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/LegacyConstructorAdvice.java b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/LegacyConstructorAdvice.java index 811bd804698..3c48f8b4d88 100644 --- a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/LegacyConstructorAdvice.java +++ b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/LegacyConstructorAdvice.java @@ -6,9 +6,9 @@ import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator; import org.apache.kafka.clients.consumer.internals.ConsumerDelegate; +import org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker; public class LegacyConstructorAdvice { // new - capture the ConsumerDelegate instead of KafkaConsumer @@ -46,9 +46,8 @@ public static void captureGroup( } } - public static void muzzleCheck(ConsumerRecord record) { - // KafkaConsumerInstrumentation only applies for kafka versions with headers - // Make an explicit call so KafkaConsumerGroupInstrumentation does the same - record.headers(); + public static void muzzleCheck(OffsetCommitCallbackInvoker invoker) { + // Only applies for kafka versions with OffsetCommitCallbackInvoker + invoker.executeCallbacks(); } } diff --git a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ListAdvice.java b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ListAdvice.java index 8ac0cf4d307..d845aeaeb5d 100644 --- a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ListAdvice.java +++ b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ListAdvice.java @@ -9,6 +9,7 @@ import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker; public class ListAdvice { @@ -30,4 +31,9 @@ public static void wrap( iterable, KAFKA_CONSUME, CONSUMER_DECORATE, group, clusterId, bootstrapServers); } } + + public static void muzzleCheck(OffsetCommitCallbackInvoker invoker) { + // Only applies for kafka versions with OffsetCommitCallbackInvoker + invoker.executeCallbacks(); + } } diff --git a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/MetadataUpdate22AndAfterAdvice.java b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/MetadataUpdate22AndAfterAdvice.java index 0ed5c124a81..8d99a885d80 100644 --- a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/MetadataUpdate22AndAfterAdvice.java +++ b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/MetadataUpdate22AndAfterAdvice.java @@ -3,7 +3,7 @@ import datadog.trace.bootstrap.InstrumentationContext; import net.bytebuddy.asm.Advice; import org.apache.kafka.clients.Metadata; -import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker; import org.apache.kafka.common.requests.MetadataResponse; public class MetadataUpdate22AndAfterAdvice { @@ -15,9 +15,8 @@ public static void onEnter( } } - public static void muzzleCheck(ConsumerRecord record) { - // KafkaConsumerInstrumentation only applies for kafka versions with headers - // Make an explicit call so MetadataInstrumentation does the same - record.headers(); + public static void muzzleCheck(OffsetCommitCallbackInvoker invoker) { + // Only applies for kafka versions with OffsetCommitCallbackInvoker + invoker.executeCallbacks(); } } diff --git a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/MetadataUpdateBefore22Advice.java b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/MetadataUpdateBefore22Advice.java index 29847323a98..72b11d4585d 100644 --- a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/MetadataUpdateBefore22Advice.java +++ b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/MetadataUpdateBefore22Advice.java @@ -3,7 +3,7 @@ import datadog.trace.bootstrap.InstrumentationContext; import net.bytebuddy.asm.Advice; import org.apache.kafka.clients.Metadata; -import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker; import org.apache.kafka.common.Cluster; public class MetadataUpdateBefore22Advice { @@ -16,9 +16,8 @@ public static void onEnter( } } - public static void muzzleCheck(ConsumerRecord record) { - // KafkaConsumerInstrumentation only applies for kafka versions with headers - // Make an explicit call so MetadataInstrumentation does the same - record.headers(); + public static void muzzleCheck(OffsetCommitCallbackInvoker invoker) { + // Only applies for kafka versions with OffsetCommitCallbackInvoker + invoker.executeCallbacks(); } } diff --git a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/OffsetCommitCallbackInvokerAdvice.java b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/OffsetCommitCallbackInvokerAdvice.java index fc58fff319d..2047873c2d1 100644 --- a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/OffsetCommitCallbackInvokerAdvice.java +++ b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/OffsetCommitCallbackInvokerAdvice.java @@ -15,4 +15,9 @@ public static void onEnqueue( .get(callbackInvoker); callback = new DDOffsetCommitCallback(callback, kafkaConsumerInfo); } + + public static void muzzleCheck(OffsetCommitCallbackInvoker invoker) { + // Only applies for kafka versions with OffsetCommitCallbackInvoker + invoker.executeCallbacks(); + } } diff --git a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/PayloadSizeAdvice.java b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/PayloadSizeAdvice.java index 47a2e6d0201..630c1dc965f 100644 --- a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/PayloadSizeAdvice.java +++ b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/PayloadSizeAdvice.java @@ -5,6 +5,7 @@ import datadog.trace.bootstrap.instrumentation.api.AgentTracer; import datadog.trace.bootstrap.instrumentation.api.StatsPoint; import net.bytebuddy.asm.Advice; +import org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker; public class PayloadSizeAdvice { @@ -32,4 +33,9 @@ public static void onEnter(@Advice.Argument(value = 0) int estimatedPayloadSize) AgentTracer.get().getDataStreamsMonitoring().add(updated); } } + + public static void muzzleCheck(OffsetCommitCallbackInvoker invoker) { + // Only applies for kafka versions with OffsetCommitCallbackInvoker + invoker.executeCallbacks(); + } } diff --git a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ProducerAdvice.java b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ProducerAdvice.java index 0d245d16bf4..b67c750dea6 100644 --- a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ProducerAdvice.java +++ b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ProducerAdvice.java @@ -23,6 +23,7 @@ import net.bytebuddy.asm.Advice; import org.apache.kafka.clients.ApiVersions; import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; @@ -114,4 +115,9 @@ public static void stopSpan( PRODUCER_DECORATE.beforeFinish(scope); scope.close(); } + + public static void muzzleCheck(OffsetCommitCallbackInvoker invoker) { + // Only applies for kafka versions with OffsetCommitCallbackInvoker + invoker.executeCallbacks(); + } } diff --git a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/RecordsAdvice.java b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/RecordsAdvice.java index eafa5f3bfd8..d6ef50765e2 100644 --- a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/RecordsAdvice.java +++ b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/RecordsAdvice.java @@ -13,6 +13,7 @@ import net.bytebuddy.asm.Advice; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.internals.ConsumerDelegate; +import org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker; /** * this method transfers the consumer group from the KafkaConsumer class key to the ConsumerRecords @@ -60,4 +61,9 @@ public static void captureGroup( span.finish(); scope.close(); } + + public static void muzzleCheck(OffsetCommitCallbackInvoker invoker) { + // Only applies for kafka versions with OffsetCommitCallbackInvoker + invoker.executeCallbacks(); + } }