Skip to content

Commit 48f4ab9

Browse files
committed
opentracing-kafka-java: ConsumerApp implementation
Use brave-opentracing and a Zipkin tracer since the Jaeger tracer that can report Zipkin v2 spans isn't available yet.
1 parent 31b9d21 commit 48f4ab9

File tree

5 files changed

+218
-54
lines changed

5 files changed

+218
-54
lines changed

opentracing-kafka-java/consumer/pom.xml

-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
<relativePath>../pom.xml</relativePath>
1010
</parent>
1111

12-
<groupId>com.signalfx.tracing.examples</groupId>
1312
<artifactId>opentracing-kafka-java-consumer-example</artifactId>
1413
<packaging>jar</packaging>
1514
<version>1.0-SNAPSHOT</version>
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,141 @@
11
package com.signalfx.tracing.examples.opentracing.kafka;
22

3+
import java.io.Closeable;
4+
import java.util.Collections;
5+
import java.util.Properties;
6+
7+
import org.apache.kafka.clients.consumer.Consumer;
8+
import org.apache.kafka.clients.consumer.ConsumerConfig;
9+
import org.apache.kafka.clients.consumer.ConsumerRecord;
10+
import org.apache.kafka.clients.consumer.ConsumerRecords;
11+
import org.apache.kafka.clients.consumer.KafkaConsumer;
12+
import org.apache.kafka.common.serialization.LongDeserializer;
13+
import org.apache.kafka.common.serialization.StringDeserializer;
14+
15+
import brave.Tracing;
16+
import brave.opentracing.BraveTracer;
17+
import brave.sampler.CountingSampler;
18+
import io.opentracing.Scope;
19+
import io.opentracing.Tracer;
20+
import io.opentracing.contrib.kafka.TracingKafkaConsumer;
21+
import io.opentracing.contrib.kafka.TracingKafkaUtils;
22+
import okhttp3.Request;
23+
import zipkin2.reporter.AsyncReporter;
24+
import zipkin2.reporter.okhttp3.OkHttpSender;
25+
326
public class ConsumerApp {
427

28+
private static final String NAME = "signalfx-opentracing-kafka-java-consumer-example";
29+
530
public static void main(String[] args) {
6-
String ingestUrl = System.getProperty("ingestUrl", "https://ingest.signalfx.com");
7-
String accessToken = System.getProperty("accessToken");
8-
String kafkaBrokers = System.getProperty("kafkaBrokers");
31+
// Here we instantiate our TracingHelper class and get the tracer from it. Normally this would
32+
// be done by your DI framework and the resulting tracer injected to each class that needs it.
33+
TracingHelper tracingHelper = new TracingHelper();
34+
Tracer tracer = tracingHelper.getTracer();
35+
36+
Consumer<Long, String> consumer = createKafkaConsumer(tracer);
937
String kafkaTopic = System.getProperty("kafkaTopic");
1038

11-
System.out.printf("Consumer: url=%s token=%s brokers=%s topic=%s%n",
12-
ingestUrl, accessToken, kafkaBrokers, kafkaTopic);
39+
System.out.printf("Subscribing to Kafka topic %s...%n", kafkaTopic);
40+
consumer.subscribe(Collections.singletonList(kafkaTopic));
41+
42+
System.out.printf("Polling for messages on %s (5s)...%n", kafkaTopic);
43+
ConsumerRecords<Long, String> records = consumer.poll(5000);
44+
if (!records.isEmpty()) {
45+
for (ConsumerRecord<Long, String> record : records) {
46+
try (Scope scope = tracer.buildSpan("consumer.handle_greeting")
47+
.asChildOf(TracingKafkaUtils.extractSpanContext(record.headers(), tracer))
48+
.startActive(true)) {
49+
String greeting = record.value();
50+
System.out.printf("Producer said '%s'%n", greeting);
51+
scope.span().setTag("greeting", greeting);
52+
}
53+
}
54+
} else {
55+
System.err.printf("No messages seen on Kafka topic %s!%n", kafkaTopic);
56+
}
57+
58+
tracingHelper.close();
59+
consumer.close();
60+
System.out.println("Done.");
61+
}
62+
63+
private static Consumer<Long, String> createKafkaConsumer(Tracer tracer) {
64+
Properties properties = new Properties();
65+
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, System.getProperty("kafkaBrokers"));
66+
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
67+
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
68+
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, NAME);
69+
properties.put(ConsumerConfig.GROUP_ID_CONFIG, NAME);
70+
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
71+
return new TracingKafkaConsumer<>(new KafkaConsumer<>(properties), tracer);
72+
}
73+
74+
/**
75+
* A helper class that encapsulates all of the Brave objects that need to be created and cleaned up
76+
* upon shutdown. If you are using a DI framework, this logic would be used by that
77+
* to create a single instance of the tracer and inject it to every class that needs it. Be sure
78+
* to incorporate the logic in the close method to your DI framework's shutdown logic.
79+
*/
80+
private static class TracingHelper implements Closeable {
81+
82+
// We need to keep references to all of these components because they have to be closed upon
83+
// shutdown in a certain order to avoid losing spans.
84+
private AsyncReporter<zipkin2.Span> reporter;
85+
private Tracer tracer;
86+
private OkHttpSender sender;
87+
88+
TracingHelper() {
89+
// The ingest url is where the span data will be sent, which can normally just be the default
90+
// value of this property.
91+
String ingestUrl = System.getProperty("ingestUrl", "https://ingest.signalfx.com");
92+
// This would be your organization's SignalFx access token, accessed in whatever manner most
93+
// appropriate to your environment.
94+
String accessToken = System.getProperty("accessToken");
95+
96+
// Build the sender that does the HTTP request containing spans to our ingest server.
97+
OkHttpSender.Builder senderBuilder = OkHttpSender.newBuilder()
98+
.compressionEnabled(true)
99+
.endpoint(ingestUrl + "/v1/trace");
100+
101+
// Add an interceptor to inject the SignalFx X-SF-Token auth header.
102+
senderBuilder.clientBuilder().addInterceptor(chain -> {
103+
Request request = chain.request().newBuilder()
104+
.addHeader("X-SF-Token", accessToken)
105+
.build();
106+
return chain.proceed(request);
107+
});
108+
109+
this.sender = senderBuilder.build();
110+
this.reporter = AsyncReporter.create(sender);
111+
112+
// Create the Tracing instance from which we obtain the tracer instance
113+
this.tracer = BraveTracer.create(
114+
Tracing.newBuilder()
115+
// This sets the name of the local application and will be fairly prominent in the Zipkin UI.
116+
.localServiceName(NAME)
117+
.spanReporter(reporter)
118+
// Use a sampler that always reports spans. You can swap this out for other samplers.
119+
.sampler(CountingSampler.create(1.0f))
120+
.build());
121+
}
122+
123+
/**
124+
* Return the tracer instance from the Tracing object. This is what spans are created through.
125+
*/
126+
public Tracer getTracer() {
127+
return tracer;
128+
}
129+
130+
/**
131+
* This might be part of the shutdown logic if using a DI framework. It should be called one way
132+
* or another though.
133+
*/
134+
@Override
135+
public void close() {
136+
reporter.flush();
137+
reporter.close();
138+
sender.close();
139+
}
13140
}
14141
}

opentracing-kafka-java/pom.xml

+5
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,11 @@
2727
<artifactId>zipkin-sender-okhttp3</artifactId>
2828
<version>2.6.0</version>
2929
</dependency>
30+
<dependency>
31+
<groupId>io.opentracing.brave</groupId>
32+
<artifactId>brave-opentracing</artifactId>
33+
<version>0.29.0</version>
34+
</dependency>
3035
<dependency>
3136
<groupId>io.opentracing.contrib</groupId>
3237
<artifactId>opentracing-kafka-client</artifactId>

opentracing-kafka-java/producer/pom.xml

-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
<relativePath>../pom.xml</relativePath>
1010
</parent>
1111

12-
<groupId>com.signalfx.tracing.examples</groupId>
1312
<artifactId>opentracing-kafka-java-producer-example</artifactId>
1413
<packaging>jar</packaging>
1514
<version>1.0-SNAPSHOT</version>
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package com.signalfx.tracing.examples.opentracing.kafka;
22

3+
import java.io.Closeable;
34
import java.util.Properties;
5+
import java.util.concurrent.CountDownLatch;
46

57
import org.apache.kafka.clients.producer.KafkaProducer;
68
import org.apache.kafka.clients.producer.Producer;
@@ -9,10 +11,12 @@
911
import org.apache.kafka.common.serialization.LongSerializer;
1012
import org.apache.kafka.common.serialization.StringSerializer;
1113

14+
import brave.Tracing;
15+
import brave.opentracing.BraveTracer;
16+
import brave.sampler.CountingSampler;
1217
import io.opentracing.Scope;
1318
import io.opentracing.Tracer;
1419
import io.opentracing.contrib.kafka.TracingKafkaProducer;
15-
import io.opentracing.util.GlobalTracer;
1620
import okhttp3.Request;
1721
import zipkin2.reporter.AsyncReporter;
1822
import zipkin2.reporter.okhttp3.OkHttpSender;
@@ -21,19 +25,18 @@ public class ProducerApp {
2125

2226
private static final String NAME = "signalfx-opentracing-kafka-java-producer-example";
2327

24-
public static void main(String[] args) {
25-
// Create a single instance of a Jaeger tracer that will be used throughout the application.
26-
// If you are using a DI framework, you should rely on that as much as possible to provide
27-
// this instance. Here we are defining the tracer as an OpenTracing tracer, since it implements
28-
// that interface. You should generally use the OpenTracing interface where possible to make
29-
// it potentially easier to swap out tracers in the future.
30-
Tracer tracer = createTracer();
28+
public static void main(String[] args) throws Exception {
29+
// Here we instantiate our TracingHelper class and get the tracer from it. Normally this would
30+
// be done by your DI framework and the resulting tracer injected to each class that needs it.
31+
TracingHelper tracingHelper = new TracingHelper();
32+
Tracer tracer = tracingHelper.getTracer();
3133

3234
Producer<Long, String> producer = createKafkaProducer(tracer);
3335

3436
String kafkaTopic = System.getProperty("kafkaTopic");
37+
CountDownLatch latch = new CountDownLatch(1);
3538

36-
try (Scope scope = tracer.buildSpan("root").startActive(true)) {
39+
try (Scope scope = tracer.buildSpan("producer.say_hi").startActive(true)) {
3740
System.out.printf("Sending message on Kafka topic %s...%n", kafkaTopic);
3841
producer.send(new ProducerRecord<>(kafkaTopic, 42L, "Hello, world!"), (r, e) -> {
3942
if (e != null) {
@@ -42,8 +45,13 @@ public static void main(String[] args) {
4245
} else {
4346
System.out.printf("Sent Kafka message on %s.%n", kafkaTopic);
4447
}
48+
latch.countDown();
4549
});
4650
}
51+
52+
latch.await();
53+
tracingHelper.close();
54+
System.out.println("Done.");
4755
}
4856

4957
/**
@@ -60,44 +68,70 @@ private static Producer<Long, String> createKafkaProducer(Tracer tracer) {
6068
}
6169

6270
/**
63-
* Create a Jaeger tracer instance that is configured to send span data to SignalFx. This is
64-
* intended to be called once. If you are using a DI framework, this logic would be used by
65-
* that to create a single instance of the tracer and inject it to every class that needs it.
71+
* A helper class that encapsulates all of the Brave objects that need to be created and cleaned up
72+
* upon shutdown. If you are using a DI framework, this logic would be used by that
73+
* to create a single instance of the tracer and inject it to every class that needs it. Be sure
74+
* to incorporate the logic in the close method to your DI framework's shutdown logic.
6675
*/
67-
private static Tracer createTracer() {
68-
String ingestUrl = System.getProperty("ingestUrl", "https://ingest.signalfx.com");
69-
String accessToken = System.getProperty("accessToken");
70-
71-
// Build the sender that does the HTTP request containing spans to our ingest server.
72-
OkHttpSender.Builder senderBuilder = OkHttpSender.newBuilder()
73-
.compressionEnabled(true)
74-
.endpoint(ingestUrl + "/v1/trace");
75-
76-
// Add an interceptor to inject the SignalFx X-SF-Token auth header.
77-
senderBuilder.clientBuilder().addInterceptor(chain -> {
78-
Request request = chain.request().newBuilder()
79-
.addHeader("X-SF-Token", accessToken)
80-
.build();
81-
82-
return chain.proceed(request);
83-
});
84-
85-
OkHttpSender sender = senderBuilder.build();
86-
87-
// Build the Jaeger Tracer instance, which implements the opentracing Tracer interface.
88-
io.opentracing.Tracer tracer = new io.jaegertracing.Tracer.Builder(NAME)
89-
// This configures the tracer to send all spans, but you will probably want to use
90-
// something less verbose.
91-
.withSampler(new ConstSampler(true))
92-
// Configure the tracer to send spans in the Zipkin V2 JSON format instead of the
93-
// default Jaeger protocol, which we do not support.
94-
.withReporter(new Zipkin2Reporter(AsyncReporter.create(sender)))
95-
.build();
96-
97-
// It is considered best practice to at least register the GlobalTracer instance, even if you
98-
// don't generally use it.
99-
GlobalTracer.register(tracer);
100-
101-
return tracer;
76+
private static class TracingHelper implements Closeable {
77+
78+
// We need to keep references to all of these components because they have to be closed upon
79+
// shutdown in a certain order to avoid losing spans.
80+
private AsyncReporter<zipkin2.Span> reporter;
81+
private Tracer tracer;
82+
private OkHttpSender sender;
83+
84+
TracingHelper() {
85+
// The ingest url is where the span data will be sent, which can normally just be the default
86+
// value of this property.
87+
String ingestUrl = System.getProperty("ingestUrl", "https://ingest.signalfx.com");
88+
// This would be your organization's SignalFx access token, accessed in whatever manner most
89+
// appropriate to your environment.
90+
String accessToken = System.getProperty("accessToken");
91+
92+
// Build the sender that does the HTTP request containing spans to our ingest server.
93+
OkHttpSender.Builder senderBuilder = OkHttpSender.newBuilder()
94+
.compressionEnabled(true)
95+
.endpoint(ingestUrl + "/v1/trace");
96+
97+
// Add an interceptor to inject the SignalFx X-SF-Token auth header.
98+
senderBuilder.clientBuilder().addInterceptor(chain -> {
99+
Request request = chain.request().newBuilder()
100+
.addHeader("X-SF-Token", accessToken)
101+
.build();
102+
return chain.proceed(request);
103+
});
104+
105+
this.sender = senderBuilder.build();
106+
this.reporter = AsyncReporter.create(sender);
107+
108+
// Create the Tracing instance from which we obtain the tracer instance
109+
this.tracer = BraveTracer.create(
110+
Tracing.newBuilder()
111+
// This sets the name of the local application and will be fairly prominent in the Zipkin UI.
112+
.localServiceName(NAME)
113+
.spanReporter(reporter)
114+
// Use a sampler that always reports spans. You can swap this out for other samplers.
115+
.sampler(CountingSampler.create(1.0f))
116+
.build());
117+
}
118+
119+
/**
120+
* Return the tracer instance from the Tracing object. This is what spans are created through.
121+
*/
122+
public Tracer getTracer() {
123+
return tracer;
124+
}
125+
126+
/**
127+
* This might be part of the shutdown logic if using a DI framework. It should be called one way
128+
* or another though.
129+
*/
130+
@Override
131+
public void close() {
132+
reporter.flush();
133+
reporter.close();
134+
sender.close();
135+
}
102136
}
103137
}

0 commit comments

Comments
 (0)