From 8e1ab2bcd89e9b0eacd4bd96635cc1ef07b49599 Mon Sep 17 00:00:00 2001 From: Andrea Marziali Date: Thu, 7 Nov 2024 08:40:24 +0100 Subject: [PATCH] Test support for reactor-kafka (#7886) * Test support for reactor-kafka * leftover * Run test forked * dispose kafka receivers at test end * Tests on 3.8 are just flaky --- .../kafka-clients-0.11/build.gradle | 5 + .../groovy/KafkaReactorForkedTest.groovy | 230 ++++++++++++++++++ .../test/groovy/KafkaClientTestBase.groovy | 43 ++-- .../test/groovy/KafkaReactorForkedTest.groovy | 218 +++++++++++++++++ 4 files changed, 475 insertions(+), 21 deletions(-) create mode 100644 dd-java-agent/instrumentation/kafka-clients-0.11/src/latestDepTest/groovy/KafkaReactorForkedTest.groovy create mode 100644 dd-java-agent/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaReactorForkedTest.groovy diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/build.gradle b/dd-java-agent/instrumentation/kafka-clients-0.11/build.gradle index 0168abcdd42..5560d1f6c4f 100644 --- a/dd-java-agent/instrumentation/kafka-clients-0.11/build.gradle +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/build.gradle @@ -19,10 +19,13 @@ dependencies { testImplementation group: 'org.apache.kafka', name: 'kafka-clients', version: '0.11.0.0' testImplementation group: 'org.springframework.kafka', name: 'spring-kafka', version: '1.3.3.RELEASE' testImplementation group: 'org.springframework.kafka', name: 'spring-kafka-test', version: '1.3.3.RELEASE' + testImplementation group: 'io.projectreactor.kafka', name: 'reactor-kafka', version: '1.0.0.RELEASE' testImplementation group: 'javax.xml.bind', name: 'jaxb-api', version: '2.2.3' testImplementation group: 'org.assertj', name: 'assertj-core', version: '2.9.+' testImplementation group: 'org.mockito', name: 'mockito-core', version: '2.19.0' testRuntimeOnly project(':dd-java-agent:instrumentation:spring-scheduling-3.1') + testRuntimeOnly project(':dd-java-agent:instrumentation:reactor-core-3.1') + 
testRuntimeOnly project(':dd-java-agent:instrumentation:reactive-streams') testImplementation(testFixtures(project(':dd-java-agent:agent-iast'))) @@ -38,6 +41,8 @@ dependencies { // This seems to help with jar compatibility hell. latestDepTestImplementation group: 'org.apache.kafka', name: 'kafka_2.13', version: '2.+' latestDepTestImplementation group: 'org.apache.kafka', name: 'kafka-clients', version: '2.+' + // latest depending to kafka client 2.x -> to be fixed when this instrumentation will test 3.x as well + latestDepTestImplementation group: 'io.projectreactor.kafka', name: 'reactor-kafka', version: '1.3.21' latestDepTestImplementation group: 'org.springframework.kafka', name: 'spring-kafka', version: '2.+' latestDepTestImplementation group: 'org.springframework.kafka', name: 'spring-kafka-test', version: '2.+' latestDepTestImplementation group: 'org.assertj', name: 'assertj-core', version: '3.19.+' diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/latestDepTest/groovy/KafkaReactorForkedTest.groovy b/dd-java-agent/instrumentation/kafka-clients-0.11/src/latestDepTest/groovy/KafkaReactorForkedTest.groovy new file mode 100644 index 00000000000..0930c836fe8 --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/latestDepTest/groovy/KafkaReactorForkedTest.groovy @@ -0,0 +1,230 @@ +import static datadog.trace.agent.test.utils.TraceUtils.basicSpan +import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace + +import datadog.trace.agent.test.AgentTestRunner +import datadog.trace.agent.test.asserts.TraceAssert +import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags +import datadog.trace.bootstrap.instrumentation.api.Tags +import datadog.trace.common.writer.ListWriter +import datadog.trace.core.DDSpan +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.clients.producer.ProducerConfig +import 
org.apache.kafka.clients.producer.ProducerRecord +import org.junit.Rule +import org.springframework.kafka.test.EmbeddedKafkaBroker +import org.springframework.kafka.test.rule.EmbeddedKafkaRule +import org.springframework.kafka.test.utils.KafkaTestUtils +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono +import reactor.core.scheduler.Schedulers +import reactor.kafka.receiver.KafkaReceiver +import reactor.kafka.receiver.ReceiverOptions +import reactor.kafka.sender.KafkaSender +import reactor.kafka.sender.SenderOptions +import reactor.kafka.sender.SenderRecord + +import java.util.concurrent.CountDownLatch +import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.TimeUnit + +class KafkaReactorForkedTest extends AgentTestRunner { + @Rule + EmbeddedKafkaRule kafkaRule = new EmbeddedKafkaRule(1, true, 4, KafkaClientTest.SHARED_TOPIC) + EmbeddedKafkaBroker embeddedKafka = kafkaRule.embeddedKafka + + + @Override + boolean useStrictTraceWrites() { + false + } + + def setup() { + TEST_WRITER.setFilter(new ListWriter.Filter() { + @Override + boolean accept(List trace) { + return !(trace.size() == 1 && + trace.get(0).getResourceName().toString().equals("kafka.poll")) + } + }) + } + + def "test reactive produce and consume"() { + setup: + def senderProps = KafkaTestUtils.producerProps(embeddedKafka) + if (isDataStreamsEnabled()) { + senderProps.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, 1000) + } + + def kafkaSender = KafkaSender.create(SenderOptions.create(senderProps)) + // set up the Kafka consumer properties + def consumerProperties = KafkaTestUtils.consumerProps("sender", "false", embeddedKafka) + def subscriptionReady = new CountDownLatch(embeddedKafka.getPartitionsPerTopic()) + + final KafkaReceiver kafkaReceiver = KafkaReceiver.create(ReceiverOptions. 
create(consumerProperties) + .subscription([KafkaClientTest.SHARED_TOPIC]) + .addAssignListener { + it.each { subscriptionReady.countDown() } + }) + + // create a thread safe queue to store the received message + def records = new LinkedBlockingQueue>() + kafkaReceiver.receive() + // publish on another thread to be sure we're propagating that receive span correctly + .publishOn(Schedulers.parallel()) + .flatMap { receiverRecord -> + { + records.add(receiverRecord) + receiverRecord.receiverOffset().commit() + } + }.subscribe() + + + // wait until the container has the required number of assigned partitions + subscriptionReady.await() + + when: + String greeting = "Hello Reactor Kafka Sender!" + runUnderTrace("parent") { + kafkaSender.send(Mono.just(SenderRecord.create(new ProducerRecord<>(KafkaClientTest.SHARED_TOPIC, greeting), null))) + .doOnError { ex -> runUnderTrace("producer exception: " + ex) {} } + .doOnNext { runUnderTrace("producer callback") {} } + .blockFirst() + blockUntilChildSpansFinished(2) + } + then: + // check that the message was received + def received = records.poll(5, TimeUnit.SECONDS) + received.value() == greeting + received.key() == null + + + assertTraces(2, SORT_TRACES_BY_START) { + trace(3) { + basicSpan(it, "parent") + basicSpan(it, "producer callback", span(0)) + producerSpan(it, senderProps, span(0)) + } + trace(1) { + consumerSpan(it, consumerProperties, trace(0)[2]) + } + } + } + + def "test reactive 100 msg produce and consume have only one parent"() { + setup: + def senderProps = KafkaTestUtils.producerProps(embeddedKafka) + if (isDataStreamsEnabled()) { + senderProps.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, 1000) + } + + def kafkaSender = KafkaSender.create(SenderOptions.create(senderProps)) + // set up the Kafka consumer properties + def consumerProperties = KafkaTestUtils.consumerProps("sender", "false", embeddedKafka) + def subscriptionReady = new CountDownLatch(embeddedKafka.getPartitionsPerTopic()) + + final KafkaReceiver 
kafkaReceiver = KafkaReceiver.create(ReceiverOptions. create(consumerProperties) + .subscription([KafkaClientTest.SHARED_TOPIC]) + .addAssignListener { + it.each { subscriptionReady.countDown() } + }) + + // create a thread safe queue to store the received message + kafkaReceiver.receive() + // publish on another thread to be sure we're propagating that receive span correctly + .publishOn(Schedulers.parallel()) + .flatMap { receiverRecord -> + { + receiverRecord.receiverOffset().commit() + } + } + .subscribeOn(Schedulers.parallel()) + .subscribe() + + + // wait until the container has the required number of assigned partitions + subscriptionReady.await() + + when: + String greeting = "Hello Reactor Kafka Sender!" + Flux.range(0, 100) + .flatMap { kafkaSender.send(Mono.just(SenderRecord.create(new ProducerRecord<>(KafkaClientTest.SHARED_TOPIC, greeting), null))) } + .publishOn(Schedulers.parallel()) + .subscribe() + then: + // check that the all the consume (100) and the send (100) are reported + TEST_WRITER.waitForTraces(200) + Map> traces = TEST_WRITER.inject([:]) { map, entry -> + def key = entry.get(0).getTraceId().toString() + map[key] = (map[key] ?: []) + entry + return map + } + traces.values().each { + assert it.size() == 2 + int produceIndex = 0 + int consumeIndex = 1 + if ("kafka.produce".contentEquals(it.get(1).getOperationName().toString())) { + produceIndex = 1 + consumeIndex = 0 + } + //assert that the consumer has the producer as parent and that the producer is root + assert it.get(consumeIndex).getParentId() == it.get(produceIndex).getSpanId() + assert it.get(produceIndex).getParentId() == 0 + } + } + + def producerSpan( + TraceAssert trace, + Map config, + DDSpan parentSpan = null) { + trace.span { + serviceName "kafka" + operationName "kafka.produce" + resourceName "Produce Topic $KafkaClientTest.SHARED_TOPIC" + spanType "queue" + errored false + measured true + if (parentSpan) { + childOf parentSpan + } else { + parent() + } + tags { + 
"$Tags.COMPONENT" "java-kafka" + "$Tags.SPAN_KIND" Tags.SPAN_KIND_PRODUCER + "$InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS" config.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) + peerServiceFrom(InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS) + defaultTags() + } + } + } + + def consumerSpan( + TraceAssert trace, + Map config, + DDSpan parentSpan = null) { + trace.span { + serviceName "kafka" + operationName "kafka.consume" + resourceName "Consume Topic $KafkaClientTest.SHARED_TOPIC" + spanType "queue" + errored false + measured true + if (parentSpan) { + childOf parentSpan + } else { + parent() + } + tags { + "$Tags.COMPONENT" "java-kafka" + "$Tags.SPAN_KIND" Tags.SPAN_KIND_CONSUMER + "$InstrumentationTags.PARTITION" { it >= 0 } + "$InstrumentationTags.OFFSET" { Integer } + "$InstrumentationTags.CONSUMER_GROUP" "sender" + "$InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS" config.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG) + "$InstrumentationTags.RECORD_QUEUE_TIME_MS" { it >= 0 } + defaultTags(true) + } + } + } +} diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy b/dd-java-agent/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy index f17c4ad8fb5..cecf73f26d4 100644 --- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy @@ -1,5 +1,3 @@ -import datadog.trace.common.writer.ListWriter - import static datadog.trace.agent.test.utils.TraceUtils.basicSpan import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeScope @@ -11,6 +9,7 @@ import datadog.trace.api.Config import datadog.trace.api.DDTags import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags import datadog.trace.bootstrap.instrumentation.api.Tags +import datadog.trace.common.writer.ListWriter import 
datadog.trace.core.DDSpan import datadog.trace.core.datastreams.StatsGroup import datadog.trace.test.util.Flaky @@ -34,6 +33,7 @@ import org.springframework.kafka.listener.MessageListener import org.springframework.kafka.test.rule.KafkaEmbedded import org.springframework.kafka.test.utils.ContainerTestUtils import org.springframework.kafka.test.utils.KafkaTestUtils +import spock.lang.Shared import java.util.concurrent.ExecutionException import java.util.concurrent.Future @@ -56,7 +56,8 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { public static final LinkedHashMap PRODUCER_PATHWAY_EDGE_TAGS // filter out Kafka poll, since the function is called in a loop, giving inconsistent results - final ListWriter.Filter dropKafkaPoll = new ListWriter.Filter() { + @Shared + static final ListWriter.Filter DROP_KAFKA_POLL = new ListWriter.Filter() { @Override boolean accept(List trace) { return !(trace.size() == 1 && @@ -103,7 +104,7 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { } def setup() { - TEST_WRITER.setFilter(dropKafkaPoll) + TEST_WRITER.setFilter(DROP_KAFKA_POLL) } @Override @@ -299,15 +300,15 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { } List produce = [ "kafka_cluster_id:$clusterId", - "partition:"+received.partition(), - "topic:"+SHARED_TOPIC, + "partition:" + received.partition(), + "topic:" + SHARED_TOPIC, "type:kafka_produce" ] List commit = [ "consumer_group:sender", "kafka_cluster_id:$clusterId", - "partition:"+received.partition(), - "topic:"+SHARED_TOPIC, + "partition:" + received.partition(), + "topic:" + SHARED_TOPIC, "type:kafka_commit" ] verifyAll(TEST_DATA_STREAMS_WRITER.backlogs) { @@ -452,15 +453,15 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { } List produce = [ "kafka_cluster_id:$clusterId".toString(), - "partition:"+received.partition(), - "topic:"+SHARED_TOPIC, + "partition:" + received.partition(), + "topic:" + SHARED_TOPIC, "type:kafka_produce" 
] List commit = [ "consumer_group:sender", "kafka_cluster_id:$clusterId".toString(), - "partition:"+received.partition(), - "topic:"+SHARED_TOPIC, + "partition:" + received.partition(), + "topic:" + SHARED_TOPIC, "type:kafka_commit" ] verifyAll(TEST_DATA_STREAMS_WRITER.backlogs) { @@ -1014,7 +1015,7 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { def producerSpan( TraceAssert trace, - Map config, + Map config, DDSpan parentSpan = null, boolean partitioned = true, boolean tombstone = false, @@ -1042,7 +1043,7 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { if (tombstone) { "$InstrumentationTags.TOMBSTONE" true } - if ({isDataStreamsEnabled()}) { + if ({ isDataStreamsEnabled() }) { "$DDTags.PATHWAY_HASH" { String } if (schema != null) { "$DDTags.SCHEMA_DEFINITION" schema @@ -1063,7 +1064,7 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { DDSpan parentSpan = null ) { trace.span { - serviceName splitByDestination() ? "$SHARED_TOPIC" : serviceForTimeInQueue() + serviceName splitByDestination() ? 
"$SHARED_TOPIC" : serviceForTimeInQueue() operationName "kafka.deliver" resourceName "$SHARED_TOPIC" spanType "queue" @@ -1084,7 +1085,7 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { def consumerSpan( TraceAssert trace, - Map config, + Map config, DDSpan parentSpan = null, Range offset = 0..0, boolean tombstone = false, @@ -1114,7 +1115,7 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { if (tombstone) { "$InstrumentationTags.TOMBSTONE" true } - if ({isDataStreamsEnabled()}) { + if ({ isDataStreamsEnabled() }) { "$DDTags.PATHWAY_HASH" { String } } defaultTags(distributedRootSpan) @@ -1146,9 +1147,9 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { def waitForKafkaMetadataUpdate(KafkaTemplate kafkaTemplate) { kafkaTemplate.flush() Producer wrappedProducer = kafkaTemplate.getTheProducer() - assert(wrappedProducer instanceof DefaultKafkaProducerFactory.CloseSafeProducer) + assert (wrappedProducer instanceof DefaultKafkaProducerFactory.CloseSafeProducer) Producer producer = wrappedProducer.delegate - assert(producer instanceof KafkaProducer) + assert (producer instanceof KafkaProducer) String clusterId = producer.metadata.cluster.clusterResource().clusterId() while (clusterId == null || clusterId.isEmpty()) { Thread.sleep(1500) @@ -1265,12 +1266,12 @@ abstract class KafkaClientLegacyTracingForkedTest extends KafkaClientTestBase { } } -class KafkaClientLegacyTracingV0ForkedTest extends KafkaClientLegacyTracingForkedTest{ +class KafkaClientLegacyTracingV0ForkedTest extends KafkaClientLegacyTracingForkedTest { } -class KafkaClientLegacyTracingV1ForkedTest extends KafkaClientLegacyTracingForkedTest{ +class KafkaClientLegacyTracingV1ForkedTest extends KafkaClientLegacyTracingForkedTest { @Override int version() { diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaReactorForkedTest.groovy 
b/dd-java-agent/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaReactorForkedTest.groovy new file mode 100644 index 00000000000..da6eaf91644 --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaReactorForkedTest.groovy @@ -0,0 +1,218 @@ +import static datadog.trace.agent.test.utils.TraceUtils.basicSpan +import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace + +import datadog.trace.agent.test.AgentTestRunner +import datadog.trace.agent.test.asserts.TraceAssert +import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags +import datadog.trace.bootstrap.instrumentation.api.Tags +import datadog.trace.core.DDSpan +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.clients.producer.ProducerConfig +import org.apache.kafka.clients.producer.ProducerRecord +import org.junit.Rule +import org.springframework.kafka.test.rule.KafkaEmbedded +import org.springframework.kafka.test.utils.KafkaTestUtils +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono +import reactor.core.scheduler.Schedulers +import reactor.kafka.receiver.KafkaReceiver +import reactor.kafka.receiver.ReceiverOptions +import reactor.kafka.sender.KafkaSender +import reactor.kafka.sender.SenderOptions +import reactor.kafka.sender.SenderRecord + +import java.util.concurrent.CountDownLatch +import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.TimeUnit + +class KafkaReactorForkedTest extends AgentTestRunner { + @Rule + // create 4 partitions for more parallelism + KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, 4, KafkaClientTestBase.SHARED_TOPIC) + + @Override + boolean useStrictTraceWrites() { + false + } + + def setup() { + TEST_WRITER.setFilter(KafkaClientTestBase.DROP_KAFKA_POLL) + } + + def "test reactive produce and consume"() { + setup: + def senderProps = 
KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString()) + + def kafkaSender = KafkaSender.create(SenderOptions.create(senderProps)) + // set up the Kafka consumer properties + def consumerProperties = KafkaTestUtils.consumerProps("sender", "false", embeddedKafka) + def subscriptionReady = new CountDownLatch(embeddedKafka.getPartitionsPerTopic()) + + final KafkaReceiver kafkaReceiver = KafkaReceiver.create(ReceiverOptions. create(consumerProperties) + .subscription([KafkaClientTestBase.SHARED_TOPIC]) + .addAssignListener { + it.each { subscriptionReady.countDown() } + }) + + // create a thread safe queue to store the received message + def records = new LinkedBlockingQueue>() + kafkaReceiver.receive() + // publish on another thread to be sure we're propagating that receive span correctly + .publishOn(Schedulers.parallel()) + .flatMap { receiverRecord -> + { + records.add(receiverRecord) + receiverRecord.receiverOffset().commit() + } + }.subscribe() + + + // wait until the container has the required number of assigned partitions + subscriptionReady.await() + + when: + String greeting = "Hello Reactor Kafka Sender!" 
+ runUnderTrace("parent") { + kafkaSender.send(Mono.just(SenderRecord.create(new ProducerRecord<>(KafkaClientTestBase.SHARED_TOPIC, greeting), null))) + .doOnError { ex -> runUnderTrace("producer exception: " + ex) {} } + .doOnNext { runUnderTrace("producer callback") {} } + .blockFirst() + blockUntilChildSpansFinished(2) + } + then: + // check that the message was received + def received = records.poll(5, TimeUnit.SECONDS) + received.value() == greeting + received.key() == null + + + assertTraces(2, SORT_TRACES_BY_START) { + trace(3) { + basicSpan(it, "parent") + basicSpan(it, "producer callback", span(0)) + producerSpan(it, senderProps, span(0)) + } + trace(1) { + consumerSpan(it, consumerProperties, trace(0)[2]) + } + } + } + + def "test reactive 100 msg produce and consume have only one parent"() { + setup: + def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString()) + if (isDataStreamsEnabled()) { + senderProps.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, 1000) + } + + def kafkaSender = KafkaSender.create(SenderOptions.create(senderProps)) + // set up the Kafka consumer properties + def consumerProperties = KafkaTestUtils.consumerProps("sender", "false", embeddedKafka) + def subscriptionReady = new CountDownLatch(embeddedKafka.getPartitionsPerTopic()) + + final KafkaReceiver kafkaReceiver = KafkaReceiver.create(ReceiverOptions. 
create(consumerProperties) + .subscription([KafkaClientTestBase.SHARED_TOPIC]) + .addAssignListener { + it.each { subscriptionReady.countDown() } + }) + + // create a thread safe queue to store the received message + kafkaReceiver.receive() + // publish on another thread to be sure we're propagating that receive span correctly + .publishOn(Schedulers.parallel()) + .flatMap { receiverRecord -> + { + receiverRecord.receiverOffset().commit() + } + } + .subscribeOn(Schedulers.parallel()) + .subscribe() + + + // wait until the container has the required number of assigned partitions + subscriptionReady.await() + + when: + String greeting = "Hello Reactor Kafka Sender!" + Flux.range(0, 100) + .flatMap { kafkaSender.send(Mono.just(SenderRecord.create(new ProducerRecord<>(KafkaClientTestBase.SHARED_TOPIC, greeting), null))) } + .publishOn(Schedulers.parallel()) + .subscribe() + then: + // check that the all the consume (100) and the send (100) are reported + TEST_WRITER.waitForTraces(200) + Map> traces = TEST_WRITER.inject([:]) { map, entry -> + def key = entry.get(0).getTraceId().toString() + map[key] = (map[key] ?: []) + entry + return map + } + traces.values().each { + assert it.size() == 2 + int produceIndex = 0 + int consumeIndex = 1 + if ("kafka.produce".contentEquals(it.get(1).getOperationName().toString())) { + produceIndex = 1 + consumeIndex = 0 + } + //assert that the consumer has the producer as parent and that the producer is root + assert it.get(consumeIndex).getParentId() == it.get(produceIndex).getSpanId() + assert it.get(produceIndex).getParentId() == 0 + } + } + + def producerSpan( + TraceAssert trace, + Map config, + DDSpan parentSpan = null) { + trace.span { + serviceName "kafka" + operationName "kafka.produce" + resourceName "Produce Topic $KafkaClientTestBase.SHARED_TOPIC" + spanType "queue" + errored false + measured true + if (parentSpan) { + childOf parentSpan + } else { + parent() + } + tags { + "$Tags.COMPONENT" "java-kafka" + "$Tags.SPAN_KIND" 
Tags.SPAN_KIND_PRODUCER + "$InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS" config.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) + peerServiceFrom(InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS) + defaultTags() + } + } + } + + def consumerSpan( + TraceAssert trace, + Map config, + DDSpan parentSpan = null) { + trace.span { + serviceName "kafka" + operationName "kafka.consume" + resourceName "Consume Topic $KafkaClientTestBase.SHARED_TOPIC" + spanType "queue" + errored false + measured true + if (parentSpan) { + childOf parentSpan + } else { + parent() + } + tags { + "$Tags.COMPONENT" "java-kafka" + "$Tags.SPAN_KIND" Tags.SPAN_KIND_CONSUMER + "$InstrumentationTags.PARTITION" { it >= 0 } + "$InstrumentationTags.OFFSET" { Integer } + "$InstrumentationTags.CONSUMER_GROUP" "sender" + "$InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS" config.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG) + "$InstrumentationTags.RECORD_QUEUE_TIME_MS" { it >= 0 } + defaultTags(true) + } + } + } +}