Skip to content

Commit

Permalink
remove debug logs
Browse files Browse the repository at this point in the history
  • Loading branch information
piochelepiotr committed Nov 12, 2024
1 parent 31cd5c9 commit 9c13633
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,42 +76,31 @@ public void methodAdvice(MethodTransformer transformer) {
.and(takesArgument(0, named("org.apache.kafka.clients.consumer.ConsumerConfig")))
.and(takesArgument(1, named("org.apache.kafka.common.serialization.Deserializer")))
.and(takesArgument(2, named("org.apache.kafka.common.serialization.Deserializer"))),
KafkaConsumerInfoInstrumentation.class.getName() + "$ConstructorAdvice");
KafkaConsumerInfoInstrumentation.class.getName() + "$ConstructorAdviceBefore27");

transformer.applyAdvice(
isConstructor()
.and(takesArgument(0, Map.class))
.and(takesArgument(1, named("org.apache.kafka.common.serialization.Deserializer")))
.and(takesArgument(2, named("org.apache.kafka.common.serialization.Deserializer"))),
KafkaConsumerInfoInstrumentation.class.getName() + "$SecondConstructorAdvice");
KafkaConsumerInfoInstrumentation.class.getName() + "$ConstructorAdvice27AndAfter");

transformer.applyAdvice(
isMethod()
.and(isPublic())
.and(named("poll"))
.and(takesArguments(1))
.and(takesArgument(0, long.class))
.and(returns(named("org.apache.kafka.clients.consumer.ConsumerRecords"))),
KafkaConsumerInfoInstrumentation.class.getName() + "$RecordsAdvice");

transformer.applyAdvice(
isMethod()
.and(isPublic())
.and(named("poll"))
.and(takesArguments(1))
.and(takesArgument(0, named("java.time.Duration")))
.and(returns(named("org.apache.kafka.clients.consumer.ConsumerRecords"))),
KafkaConsumerInfoInstrumentation.class.getName() + "$RecordsAdvice");
}

public static class ConstructorAdvice {
public static class ConstructorAdviceBefore27 {
@Advice.OnMethodExit(suppress = Throwable.class)
public static void captureGroup(
@Advice.This KafkaConsumer consumer,
@Advice.FieldValue("metadata") Metadata metadata,
@Advice.FieldValue("coordinator") ConsumerCoordinator coordinator,
@Advice.Argument(0) ConsumerConfig consumerConfig) {
System.out.println("constructor advice!!!");
String consumerGroup = consumerConfig.getString(ConsumerConfig.GROUP_ID_CONFIG);
String normalizedConsumerGroup =
consumerGroup != null && !consumerGroup.isEmpty() ? consumerGroup : null;
Expand Down Expand Up @@ -149,40 +138,23 @@ public static void muzzleCheck(ConsumerRecord record) {
}
}

public static class SecondConstructorAdvice {
public static class ConstructorAdvice27AndAfter {
@Advice.OnMethodExit(suppress = Throwable.class)
public static void captureGroup(
@Advice.This KafkaConsumer consumer,
@Advice.FieldValue("metadata") Metadata metadata,
@Advice.FieldValue("coordinator") ConsumerCoordinator coordinator,
@Advice.Argument(0) Map<String, Object> consumerConfig) {
System.out.println("new constructor advice!!!");
Object groupID = consumerConfig.get("group.id");
Object groupID = consumerConfig.get(ConsumerConfig.GROUP_ID_CONFIG);
String consumerGroup = groupID instanceof String ? (String) groupID : null;
String normalizedConsumerGroup =
consumerGroup != null && !consumerGroup.isEmpty() ? consumerGroup : null;
System.out.println("consume group " + normalizedConsumerGroup);

String bootstrapServers = null;
Object bootstrapServersObj = consumerConfig.get("bootstrap.servers");
Object bootstrapServersObj = consumerConfig.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
if (bootstrapServersObj instanceof String) {
bootstrapServers = (String) bootstrapServersObj;
System.out.println("bootstrap servers " + bootstrapServers);
} else {
System.out.println("it s not a string");
}
// if (bootstrapServersList != null && !bootstrapServersList.isEmpty()) {
// bootstrapServers = String.join(",", bootstrapServersList);
// }
// Object bootstrapServersObj = consumerConfig.get("bootstrap.servers");
// if (bootstrapServersObj instanceof List) {
// List<?> tempList = (List<?>) bootstrapServersObj;

// // Verify each element is a String
// if (!tempList.isEmpty() && tempList.stream().allMatch(element -> element instanceof String)) {
// bootstrapServers = String.join(",", (List<String>) tempList);
// }
// }
KafkaConsumerInfo kafkaConsumerInfo;
if (Config.get().isDataStreamsEnabled()) {
kafkaConsumerInfo =
Expand Down Expand Up @@ -235,7 +207,6 @@ public static void captureGroup(
@Advice.Enter final AgentScope scope,
@Advice.This KafkaConsumer consumer,
@Advice.Return ConsumerRecords records) {
System.out.println("polling!");
int recordsCount = 0;
if (records != null) {
KafkaConsumerInfo kafkaConsumerInfo =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,9 @@ public static void wrap(
KafkaConsumerInfo kafkaConsumerInfo =
InstrumentationContext.get(ConsumerRecords.class, KafkaConsumerInfo.class).get(records);
String group = KafkaConsumerInstrumentationHelper.extractGroup(kafkaConsumerInfo);
System.out.println("consumer info retrieved" + group);
String clusterId =
KafkaConsumerInstrumentationHelper.extractClusterId(
kafkaConsumerInfo, InstrumentationContext.get(Metadata.class, String.class));
System.out.println("cluster ID retrieved" + clusterId);
String bootstrapServers =
KafkaConsumerInstrumentationHelper.extractBootstrapServers(kafkaConsumerInfo);
iterator =
Expand Down

0 comments on commit 9c13633

Please sign in to comment.