Skip to content

Update muzzleCheck for kafka #7912

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ muzzle {
group = "org.apache.kafka"
module = "kafka-clients"
versions = "[3.8.0,)"
assertInverse = false
assertInverse = true
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.internals.ConsumerDelegate;
import org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker;

Expand Down Expand Up @@ -54,9 +53,8 @@ public static void captureGroup(
}
}

public static void muzzleCheck(ConsumerRecord record) {
// KafkaConsumerInstrumentation only applies for kafka versions with headers
// Make an explicit call so KafkaConsumerGroupInstrumentation does the same
record.headers();
public static void muzzleCheck(OffsetCommitCallbackInvoker invoker) {
// Only applies for kafka versions with OffsetCommitCallbackInvoker
invoker.executeCallbacks();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
import java.util.Map;
import net.bytebuddy.asm.Advice;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;
import org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker;
import org.apache.kafka.clients.consumer.internals.RequestFuture;
import org.apache.kafka.common.TopicPartition;

Expand Down Expand Up @@ -66,9 +66,8 @@ public static void trackCommitOffset(
}
}

public static void muzzleCheck(ConsumerRecord record) {
// KafkaConsumerInstrumentation only applies for kafka versions with headers
// Make an explicit call so ConsumerCoordinatorInstrumentation does the same
record.headers();
public static void muzzleCheck(OffsetCommitCallbackInvoker invoker) {
// Only applies for kafka versions with OffsetCommitCallbackInvoker
invoker.executeCallbacks();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker;

public class IterableAdvice {

Expand All @@ -31,4 +32,9 @@ public static void wrap(
bootstrapServers);
}
}

public static void muzzleCheck(OffsetCommitCallbackInvoker invoker) {
// Only applies for kafka versions with OffsetCommitCallbackInvoker
invoker.executeCallbacks();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker;

public class IteratorAdvice {

Expand All @@ -32,4 +33,9 @@ public static void wrap(
bootstrapServers);
}
}

public static void muzzleCheck(OffsetCommitCallbackInvoker invoker) {
// Only applies for kafka versions with OffsetCommitCallbackInvoker
invoker.executeCallbacks();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import datadog.trace.api.Config;
import datadog.trace.bootstrap.ContextStore;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker;

public class KafkaConsumerInstrumentationHelper {
public static String extractGroup(KafkaConsumerInfo kafkaConsumerInfo) {
Expand All @@ -26,4 +27,9 @@ public static String extractClusterId(
public static String extractBootstrapServers(KafkaConsumerInfo kafkaConsumerInfo) {
return kafkaConsumerInfo == null ? null : kafkaConsumerInfo.getBootstrapServers().get();
}

public static void muzzleCheck(OffsetCommitCallbackInvoker invoker) {
// Only applies for kafka versions with OffsetCommitCallbackInvoker
invoker.executeCallbacks();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;
import org.apache.kafka.clients.consumer.internals.ConsumerDelegate;
import org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker;

public class LegacyConstructorAdvice {
// new - capture the ConsumerDelegate instead of KafkaConsumer
Expand Down Expand Up @@ -46,9 +46,8 @@ public static void captureGroup(
}
}

public static void muzzleCheck(ConsumerRecord record) {
// KafkaConsumerInstrumentation only applies for kafka versions with headers
// Make an explicit call so KafkaConsumerGroupInstrumentation does the same
record.headers();
public static void muzzleCheck(OffsetCommitCallbackInvoker invoker) {
// Only applies for kafka versions with OffsetCommitCallbackInvoker
invoker.executeCallbacks();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker;

public class ListAdvice {

Expand All @@ -30,4 +31,9 @@ public static void wrap(
iterable, KAFKA_CONSUME, CONSUMER_DECORATE, group, clusterId, bootstrapServers);
}
}

public static void muzzleCheck(OffsetCommitCallbackInvoker invoker) {
// Only applies for kafka versions with OffsetCommitCallbackInvoker
invoker.executeCallbacks();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import datadog.trace.bootstrap.InstrumentationContext;
import net.bytebuddy.asm.Advice;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker;
import org.apache.kafka.common.requests.MetadataResponse;

public class MetadataUpdate22AndAfterAdvice {
Expand All @@ -15,9 +15,8 @@ public static void onEnter(
}
}

public static void muzzleCheck(ConsumerRecord record) {
// KafkaConsumerInstrumentation only applies for kafka versions with headers
// Make an explicit call so MetadataInstrumentation does the same
record.headers();
public static void muzzleCheck(OffsetCommitCallbackInvoker invoker) {
// Only applies for kafka versions with OffsetCommitCallbackInvoker
invoker.executeCallbacks();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import datadog.trace.bootstrap.InstrumentationContext;
import net.bytebuddy.asm.Advice;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker;
import org.apache.kafka.common.Cluster;

public class MetadataUpdateBefore22Advice {
Expand All @@ -16,9 +16,8 @@ public static void onEnter(
}
}

public static void muzzleCheck(ConsumerRecord record) {
// KafkaConsumerInstrumentation only applies for kafka versions with headers
// Make an explicit call so MetadataInstrumentation does the same
record.headers();
public static void muzzleCheck(OffsetCommitCallbackInvoker invoker) {
// Only applies for kafka versions with OffsetCommitCallbackInvoker
invoker.executeCallbacks();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,9 @@ public static void onEnqueue(
.get(callbackInvoker);
callback = new DDOffsetCommitCallback(callback, kafkaConsumerInfo);
}

public static void muzzleCheck(OffsetCommitCallbackInvoker invoker) {
// Only applies for kafka versions with OffsetCommitCallbackInvoker
invoker.executeCallbacks();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import datadog.trace.bootstrap.instrumentation.api.StatsPoint;
import net.bytebuddy.asm.Advice;
import org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker;

public class PayloadSizeAdvice {

Expand Down Expand Up @@ -32,4 +33,9 @@ public static void onEnter(@Advice.Argument(value = 0) int estimatedPayloadSize)
AgentTracer.get().getDataStreamsMonitoring().add(updated);
}
}

public static void muzzleCheck(OffsetCommitCallbackInvoker invoker) {
// Only applies for kafka versions with OffsetCommitCallbackInvoker
invoker.executeCallbacks();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import net.bytebuddy.asm.Advice;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
Expand Down Expand Up @@ -114,4 +115,9 @@ public static void stopSpan(
PRODUCER_DECORATE.beforeFinish(scope);
scope.close();
}

public static void muzzleCheck(OffsetCommitCallbackInvoker invoker) {
// Only applies for kafka versions with OffsetCommitCallbackInvoker
invoker.executeCallbacks();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import net.bytebuddy.asm.Advice;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.internals.ConsumerDelegate;
import org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker;

/**
* this method transfers the consumer group from the KafkaConsumer class key to the ConsumerRecords
Expand Down Expand Up @@ -60,4 +61,9 @@ public static void captureGroup(
span.finish();
scope.close();
}

public static void muzzleCheck(OffsetCommitCallbackInvoker invoker) {
// Only applies for kafka versions with OffsetCommitCallbackInvoker
invoker.executeCallbacks();
}
}