diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInfoInstrumentation.java b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInfoInstrumentation.java index b1b5607149cc..685c7161e002 100644 --- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInfoInstrumentation.java +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInfoInstrumentation.java @@ -76,42 +76,31 @@ public void methodAdvice(MethodTransformer transformer) { .and(takesArgument(0, named("org.apache.kafka.clients.consumer.ConsumerConfig"))) .and(takesArgument(1, named("org.apache.kafka.common.serialization.Deserializer"))) .and(takesArgument(2, named("org.apache.kafka.common.serialization.Deserializer"))), - KafkaConsumerInfoInstrumentation.class.getName() + "$ConstructorAdvice"); + KafkaConsumerInfoInstrumentation.class.getName() + "$ConstructorAdviceBefore27"); transformer.applyAdvice( isConstructor() .and(takesArgument(0, Map.class)) .and(takesArgument(1, named("org.apache.kafka.common.serialization.Deserializer"))) .and(takesArgument(2, named("org.apache.kafka.common.serialization.Deserializer"))), - KafkaConsumerInfoInstrumentation.class.getName() + "$SecondConstructorAdvice"); + KafkaConsumerInfoInstrumentation.class.getName() + "$ConstructorAdvice27AndAfter"); transformer.applyAdvice( isMethod() .and(isPublic()) .and(named("poll")) .and(takesArguments(1)) - .and(takesArgument(0, long.class)) - .and(returns(named("org.apache.kafka.clients.consumer.ConsumerRecords"))), - KafkaConsumerInfoInstrumentation.class.getName() + "$RecordsAdvice"); - - transformer.applyAdvice( - isMethod() - .and(isPublic()) - .and(named("poll")) - .and(takesArguments(1)) - .and(takesArgument(0, named("java.time.Duration"))) .and(returns(named("org.apache.kafka.clients.consumer.ConsumerRecords"))), KafkaConsumerInfoInstrumentation.class.getName() + "$RecordsAdvice"); } - public static class ConstructorAdvice { + public static class ConstructorAdviceBefore27 { @Advice.OnMethodExit(suppress = Throwable.class) public static void captureGroup( @Advice.This KafkaConsumer consumer, @Advice.FieldValue("metadata") Metadata metadata, @Advice.FieldValue("coordinator") ConsumerCoordinator coordinator, @Advice.Argument(0) ConsumerConfig consumerConfig) { - System.out.println("constructor advice!!!"); String consumerGroup = consumerConfig.getString(ConsumerConfig.GROUP_ID_CONFIG); String normalizedConsumerGroup = consumerGroup != null && !consumerGroup.isEmpty() ? consumerGroup : null; @@ -149,40 +138,23 @@ public static void muzzleCheck(ConsumerRecord record) { } } - public static class SecondConstructorAdvice { + public static class ConstructorAdvice27AndAfter { @Advice.OnMethodExit(suppress = Throwable.class) public static void captureGroup( @Advice.This KafkaConsumer consumer, @Advice.FieldValue("metadata") Metadata metadata, @Advice.FieldValue("coordinator") ConsumerCoordinator coordinator, @Advice.Argument(0) Map consumerConfig) { - System.out.println("new constructor advice!!!"); - Object groupID = consumerConfig.get("group.id"); + Object groupID = consumerConfig.get(ConsumerConfig.GROUP_ID_CONFIG); String consumerGroup = groupID instanceof String ? (String) groupID : null; String normalizedConsumerGroup = consumerGroup != null && !consumerGroup.isEmpty() ? consumerGroup : null; - System.out.println("consume group " + normalizedConsumerGroup); String bootstrapServers = null; - Object bootstrapServersObj = consumerConfig.get("bootstrap.servers"); + Object bootstrapServersObj = consumerConfig.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG); if (bootstrapServersObj instanceof String) { bootstrapServers = (String) bootstrapServersObj; - System.out.println("bootstrap servers " + bootstrapServers); - } else { - System.out.println("it s not a string"); } - // if (bootstrapServersList != null && !bootstrapServersList.isEmpty()) { - // bootstrapServers = String.join(",", bootstrapServersList); - // } - // Object bootstrapServersObj = consumerConfig.get("bootstrap.servers"); - // if (bootstrapServersObj instanceof List) { - // List tempList = (List) bootstrapServersObj; - - // // Verify each element is a String - // if (!tempList.isEmpty() && tempList.stream().allMatch(element -> element instanceof String)) { - // bootstrapServers = String.join(",", (List) tempList); - // } - // } KafkaConsumerInfo kafkaConsumerInfo; if (Config.get().isDataStreamsEnabled()) { kafkaConsumerInfo = @@ -235,7 +207,6 @@ public static void captureGroup( @Advice.Enter final AgentScope scope, @Advice.This KafkaConsumer consumer, @Advice.Return ConsumerRecords records) { - System.out.println("polling!"); int recordsCount = 0; if (records != null) { KafkaConsumerInfo kafkaConsumerInfo = diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInstrumentation.java b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInstrumentation.java index 25a17113b2a7..6d488d3a759a 100644 --- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInstrumentation.java +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInstrumentation.java @@ -142,11 +142,9 @@ public static void wrap( KafkaConsumerInfo kafkaConsumerInfo = InstrumentationContext.get(ConsumerRecords.class, KafkaConsumerInfo.class).get(records); String group = KafkaConsumerInstrumentationHelper.extractGroup(kafkaConsumerInfo); - System.out.println("consumer info retrieved" + group); String clusterId = KafkaConsumerInstrumentationHelper.extractClusterId( kafkaConsumerInfo, InstrumentationContext.get(Metadata.class, String.class)); - System.out.println("cluster ID retrieved" + clusterId); String bootstrapServers = KafkaConsumerInstrumentationHelper.extractBootstrapServers(kafkaConsumerInfo); iterator =