Test support for reactor-kafka (#7886)
* Test support for reactor-kafka

* leftover

* Run test forked

* dispose kafka receivers at test end

* Tests on 3.8 are just flaky
amarziali authored Nov 7, 2024
1 parent a63909c commit 8e1ab2b
Showing 4 changed files with 475 additions and 21 deletions.
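For context, a minimal sketch of the reactor-kafka send/receive pattern the new test exercises (illustrative only, not part of the diff; producerProps, consumerProps, and the topic name are placeholders):

import org.apache.kafka.clients.producer.ProducerRecord
import reactor.core.publisher.Mono
import reactor.kafka.receiver.KafkaReceiver
import reactor.kafka.receiver.ReceiverOptions
import reactor.kafka.sender.KafkaSender
import reactor.kafka.sender.SenderOptions
import reactor.kafka.sender.SenderRecord

// send one record and block until it is acknowledged
def sender = KafkaSender.create(SenderOptions.create(producerProps))
sender.send(Mono.just(SenderRecord.create(new ProducerRecord<>("some-topic", "hello"), null)))
  .blockFirst()

// receive records, committing each offset as it is processed
def receiver = KafkaReceiver.create(ReceiverOptions.<String, String> create(consumerProps)
  .subscription(["some-topic"]))
def subscription = receiver.receive()
  .flatMap { record -> record.receiverOffset().commit() }
  .subscribe()

// per the commit notes, dispose receivers when the test ends
subscription.dispose()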
5 changes: 5 additions & 0 deletions dd-java-agent/instrumentation/kafka-clients-0.11/build.gradle
@@ -19,10 +19,13 @@ dependencies {
testImplementation group: 'org.apache.kafka', name: 'kafka-clients', version: '0.11.0.0'
testImplementation group: 'org.springframework.kafka', name: 'spring-kafka', version: '1.3.3.RELEASE'
testImplementation group: 'org.springframework.kafka', name: 'spring-kafka-test', version: '1.3.3.RELEASE'
testImplementation group: 'io.projectreactor.kafka', name: 'reactor-kafka', version: '1.0.0.RELEASE'
testImplementation group: 'javax.xml.bind', name: 'jaxb-api', version: '2.2.3'
testImplementation group: 'org.assertj', name: 'assertj-core', version: '2.9.+'
testImplementation group: 'org.mockito', name: 'mockito-core', version: '2.19.0'
testRuntimeOnly project(':dd-java-agent:instrumentation:spring-scheduling-3.1')
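// the two reactor instrumentations below are needed at test runtime so the trace context follows the reactive pipeline (rationale inferred, not stated in the commit)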
testRuntimeOnly project(':dd-java-agent:instrumentation:reactor-core-3.1')
testRuntimeOnly project(':dd-java-agent:instrumentation:reactive-streams')
testImplementation(testFixtures(project(':dd-java-agent:agent-iast')))


@@ -38,6 +41,8 @@ dependencies {
// This seems to help with jar compatibility hell.
latestDepTestImplementation group: 'org.apache.kafka', name: 'kafka_2.13', version: '2.+'
latestDepTestImplementation group: 'org.apache.kafka', name: 'kafka-clients', version: '2.+'
// latest version still depends on kafka-clients 2.x -> bump once this instrumentation also tests 3.x
latestDepTestImplementation group: 'io.projectreactor.kafka', name: 'reactor-kafka', version: '1.3.21'
latestDepTestImplementation group: 'org.springframework.kafka', name: 'spring-kafka', version: '2.+'
latestDepTestImplementation group: 'org.springframework.kafka', name: 'spring-kafka-test', version: '2.+'
latestDepTestImplementation group: 'org.assertj', name: 'assertj-core', version: '3.19.+'
@@ -0,0 +1,230 @@
import static datadog.trace.agent.test.utils.TraceUtils.basicSpan
import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace

import datadog.trace.agent.test.AgentTestRunner
import datadog.trace.agent.test.asserts.TraceAssert
import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags
import datadog.trace.bootstrap.instrumentation.api.Tags
import datadog.trace.common.writer.ListWriter
import datadog.trace.core.DDSpan
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.junit.Rule
import org.springframework.kafka.test.EmbeddedKafkaBroker
import org.springframework.kafka.test.rule.EmbeddedKafkaRule
import org.springframework.kafka.test.utils.KafkaTestUtils
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers
import reactor.kafka.receiver.KafkaReceiver
import reactor.kafka.receiver.ReceiverOptions
import reactor.kafka.sender.KafkaSender
import reactor.kafka.sender.SenderOptions
import reactor.kafka.sender.SenderRecord

import java.util.concurrent.CountDownLatch
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit

class KafkaReactorForkedTest extends AgentTestRunner {
@Rule
EmbeddedKafkaRule kafkaRule = new EmbeddedKafkaRule(1, true, 4, KafkaClientTest.SHARED_TOPIC)
EmbeddedKafkaBroker embeddedKafka = kafkaRule.embeddedKafka


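  // strict trace writes are disabled: spans in the reactive pipeline can finish out of order across threads (inferred rationale)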
@Override
boolean useStrictTraceWrites() {
false
}

def setup() {
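    // drop standalone kafka.poll traces so the assertions below only see produce/consume spans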
TEST_WRITER.setFilter(new ListWriter.Filter() {
@Override
boolean accept(List<DDSpan> trace) {
return !(trace.size() == 1 &&
trace.get(0).getResourceName().toString().equals("kafka.poll"))
}
})
}

def "test reactive produce and consume"() {
setup:
def senderProps = KafkaTestUtils.producerProps(embeddedKafka)
if (isDataStreamsEnabled()) {
senderProps.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, 1000)
}

def kafkaSender = KafkaSender.create(SenderOptions.create(senderProps))
// set up the Kafka consumer properties
def consumerProperties = KafkaTestUtils.consumerProps("sender", "false", embeddedKafka)
def subscriptionReady = new CountDownLatch(embeddedKafka.getPartitionsPerTopic())

final KafkaReceiver<String, String> kafkaReceiver = KafkaReceiver.create(ReceiverOptions.<String, String> create(consumerProperties)
.subscription([KafkaClientTest.SHARED_TOPIC])
.addAssignListener {
it.each { subscriptionReady.countDown() }
})

// create a thread safe queue to store the received message
def records = new LinkedBlockingQueue<ConsumerRecord<String, String>>()
kafkaReceiver.receive()
// publish on another thread to be sure we're propagating that receive span correctly
.publishOn(Schedulers.parallel())
      .flatMap { receiverRecord ->
        records.add(receiverRecord)
        // return the commit Mono so that flatMap subscribes to it and the offset is actually committed
        receiverRecord.receiverOffset().commit()
      }.subscribe()


// wait until the container has the required number of assigned partitions
subscriptionReady.await()

when:
String greeting = "Hello Reactor Kafka Sender!"
runUnderTrace("parent") {
kafkaSender.send(Mono.just(SenderRecord.create(new ProducerRecord<>(KafkaClientTest.SHARED_TOPIC, greeting), null)))
.doOnError { ex -> runUnderTrace("producer exception: " + ex) {} }
.doOnNext { runUnderTrace("producer callback") {} }
.blockFirst()
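      // the two children are the "producer callback" span and the kafka.produce span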
blockUntilChildSpansFinished(2)
}
then:
// check that the message was received
def received = records.poll(5, TimeUnit.SECONDS)
received.value() == greeting
received.key() == null


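    // expect two reported traces: parent + callback + produce, then the consume span parented to the produce span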
assertTraces(2, SORT_TRACES_BY_START) {
trace(3) {
basicSpan(it, "parent")
basicSpan(it, "producer callback", span(0))
producerSpan(it, senderProps, span(0))
}
trace(1) {
consumerSpan(it, consumerProperties, trace(0)[2])
}
}
}

def "test reactive 100 msg produce and consume have only one parent"() {
setup:
def senderProps = KafkaTestUtils.producerProps(embeddedKafka)
if (isDataStreamsEnabled()) {
senderProps.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, 1000)
}

def kafkaSender = KafkaSender.create(SenderOptions.create(senderProps))
// set up the Kafka consumer properties
def consumerProperties = KafkaTestUtils.consumerProps("sender", "false", embeddedKafka)
def subscriptionReady = new CountDownLatch(embeddedKafka.getPartitionsPerTopic())

final KafkaReceiver<String, String> kafkaReceiver = KafkaReceiver.create(ReceiverOptions.<String, String> create(consumerProperties)
.subscription([KafkaClientTest.SHARED_TOPIC])
.addAssignListener {
it.each { subscriptionReady.countDown() }
})

// create a thread safe queue to store the received message
kafkaReceiver.receive()
// publish on another thread to be sure we're propagating that receive span correctly
.publishOn(Schedulers.parallel())
      .flatMap { receiverRecord ->
        // return the commit Mono so that flatMap subscribes to it
        receiverRecord.receiverOffset().commit()
      }
.subscribeOn(Schedulers.parallel())
.subscribe()


// wait until the container has the required number of assigned partitions
subscriptionReady.await()

when:
String greeting = "Hello Reactor Kafka Sender!"
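      // send 100 records with no active parent trace: each produce should start its own trace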
Flux.range(0, 100)
.flatMap { kafkaSender.send(Mono.just(SenderRecord.create(new ProducerRecord<>(KafkaClientTest.SHARED_TOPIC, greeting), null))) }
.publishOn(Schedulers.parallel())
.subscribe()
then:
      // check that all 100 consumes and all 100 sends are reported
TEST_WRITER.waitForTraces(200)
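      // group the reported spans by trace id; each trace should contain exactly one produce and one consume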
Map<String, List<DDSpan>> traces = TEST_WRITER.inject([:]) { map, entry ->
def key = entry.get(0).getTraceId().toString()
map[key] = (map[key] ?: []) + entry
return map
}
traces.values().each {
assert it.size() == 2
int produceIndex = 0
int consumeIndex = 1
if ("kafka.produce".contentEquals(it.get(1).getOperationName().toString())) {
produceIndex = 1
consumeIndex = 0
}
      // assert that the consumer has the producer as parent and that the producer is root
assert it.get(consumeIndex).getParentId() == it.get(produceIndex).getSpanId()
assert it.get(produceIndex).getParentId() == 0
}
}

def producerSpan(
TraceAssert trace,
Map<String, ?> config,
DDSpan parentSpan = null) {
trace.span {
serviceName "kafka"
operationName "kafka.produce"
resourceName "Produce Topic $KafkaClientTest.SHARED_TOPIC"
spanType "queue"
errored false
measured true
if (parentSpan) {
childOf parentSpan
} else {
parent()
}
tags {
"$Tags.COMPONENT" "java-kafka"
"$Tags.SPAN_KIND" Tags.SPAN_KIND_PRODUCER
"$InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS" config.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)
peerServiceFrom(InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS)
defaultTags()
}
}
}

def consumerSpan(
TraceAssert trace,
Map<String, Object> config,
DDSpan parentSpan = null) {
trace.span {
serviceName "kafka"
operationName "kafka.consume"
resourceName "Consume Topic $KafkaClientTest.SHARED_TOPIC"
spanType "queue"
errored false
measured true
if (parentSpan) {
childOf parentSpan
} else {
parent()
}
tags {
"$Tags.COMPONENT" "java-kafka"
"$Tags.SPAN_KIND" Tags.SPAN_KIND_CONSUMER
"$InstrumentationTags.PARTITION" { it >= 0 }
"$InstrumentationTags.OFFSET" { Integer }
"$InstrumentationTags.CONSUMER_GROUP" "sender"
"$InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS" config.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)
"$InstrumentationTags.RECORD_QUEUE_TIME_MS" { it >= 0 }
defaultTags(true)
}
}
}
}
