|
1 | 1 | package com.signalfx.tracing.examples.opentracing.kafka; |
2 | 2 |
|
| 3 | +import java.util.Properties; |
| 4 | + |
| 5 | +import org.apache.kafka.clients.producer.KafkaProducer; |
| 6 | +import org.apache.kafka.clients.producer.Producer; |
| 7 | +import org.apache.kafka.clients.producer.ProducerConfig; |
| 8 | +import org.apache.kafka.clients.producer.ProducerRecord; |
| 9 | +import org.apache.kafka.common.serialization.LongSerializer; |
| 10 | +import org.apache.kafka.common.serialization.StringSerializer; |
| 11 | + |
| 12 | +import io.opentracing.Scope; |
| 13 | +import io.opentracing.Tracer; |
| 14 | +import io.opentracing.contrib.kafka.TracingKafkaProducer; |
| 15 | +import io.opentracing.util.GlobalTracer; |
| 16 | +import okhttp3.Request; |
| 17 | +import zipkin2.reporter.AsyncReporter; |
| 18 | +import zipkin2.reporter.okhttp3.OkHttpSender; |
| 19 | + |
3 | 20 | public class ProducerApp { |
4 | 21 |
|
| 22 | + private static final String NAME = "signalfx-opentracing-kafka-java-producer-example"; |
| 23 | + |
5 | 24 | public static void main(String[] args) { |
| 25 | + // Create a single instance of a Jaeger tracer that will be used throughout the application. |
| 26 | + // If you are using a DI framework, you should rely on that as much as possible to provide |
| 27 | + // this instance. Here we are defining the tracer as an OpenTracing tracer, since it implements |
| 28 | + // that interface. You should generally use the OpenTracing interface where possible to make |
| 29 | + // it potentially easier to swap out tracers in the future. |
| 30 | + Tracer tracer = createTracer(); |
| 31 | + |
| 32 | + Producer<Long, String> producer = createKafkaProducer(tracer); |
| 33 | + |
| 34 | + String kafkaTopic = System.getProperty("kafkaTopic"); |
| 35 | + |
| 36 | + try (Scope scope = tracer.buildSpan("root").startActive(true)) { |
| 37 | + System.out.printf("Sending message on Kafka topic %s...%n", kafkaTopic); |
| 38 | + producer.send(new ProducerRecord<>(kafkaTopic, 42L, "Hello, world!"), (r, e) -> { |
| 39 | + if (e != null) { |
| 40 | + System.err.printf("Failed to send Kafka message on %s: %s%n", kafkaTopic, e |
| 41 | + .getMessage()); |
| 42 | + } else { |
| 43 | + System.out.printf("Sent Kafka message on %s.%n", kafkaTopic); |
| 44 | + } |
| 45 | + }); |
| 46 | + } |
| 47 | + } |
| 48 | + |
| 49 | + /** |
| 50 | + * Create a Kafka producer that is configured to send to the brokers specified in the |
| 51 | + * kafkaBrokers system property. |
| 52 | + */ |
| 53 | + private static Producer<Long, String> createKafkaProducer(Tracer tracer) { |
| 54 | + Properties properties = new Properties(); |
| 55 | + properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, System.getProperty("kafkaBrokers")); |
| 56 | + properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName()); |
| 57 | + properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); |
| 58 | + properties.put(ProducerConfig.CLIENT_ID_CONFIG, NAME); |
| 59 | + return new TracingKafkaProducer<>(new KafkaProducer<>(properties), tracer); |
| 60 | + } |
| 61 | + |
| 62 | + /** |
| 63 | + * Create a Jaeger tracer instance that is configured to send span data to SignalFx. This is |
| 64 | + * intended to be called once. If you are using a DI framework, this logic would be used by |
| 65 | + * that to create a single instance of the tracer and inject it to every class that needs it. |
| 66 | + */ |
| 67 | + private static Tracer createTracer() { |
6 | 68 | String ingestUrl = System.getProperty("ingestUrl", "https://ingest.signalfx.com"); |
7 | 69 | String accessToken = System.getProperty("accessToken"); |
8 | | - String kafkaBrokers = System.getProperty("kafkaBrokers"); |
9 | | - String kafkaTopic = System.getProperty("kafkaTopic"); |
10 | 70 |
|
11 | | - System.out.printf("Producer: url=%s token=%s brokers=%s topic=%s%n", |
12 | | - ingestUrl, accessToken, kafkaBrokers, kafkaTopic); |
| 71 | + // Build the sender that does the HTTP request containing spans to our ingest server. |
| 72 | + OkHttpSender.Builder senderBuilder = OkHttpSender.newBuilder() |
| 73 | + .compressionEnabled(true) |
| 74 | + .endpoint(ingestUrl + "/v1/trace"); |
| 75 | + |
| 76 | + // Add an interceptor to inject the SignalFx X-SF-Token auth header. |
| 77 | + senderBuilder.clientBuilder().addInterceptor(chain -> { |
| 78 | + Request request = chain.request().newBuilder() |
| 79 | + .addHeader("X-SF-Token", accessToken) |
| 80 | + .build(); |
| 81 | + |
| 82 | + return chain.proceed(request); |
| 83 | + }); |
| 84 | + |
| 85 | + OkHttpSender sender = senderBuilder.build(); |
| 86 | + |
| 87 | + // Build the Jaeger Tracer instance, which implements the opentracing Tracer interface. |
| 88 | + io.opentracing.Tracer tracer = new io.jaegertracing.Tracer.Builder(NAME) |
| 89 | + // This configures the tracer to send all spans, but you will probably want to use |
| 90 | + // something less verbose. |
| 91 | + .withSampler(new ConstSampler(true)) |
| 92 | + // Configure the tracer to send spans in the Zipkin V2 JSON format instead of the |
| 93 | + // default Jaeger protocol, which we do not support. |
| 94 | + .withReporter(new Zipkin2Reporter(AsyncReporter.create(sender))) |
| 95 | + .build(); |
| 96 | + |
| 97 | + // It is considered best practice to at least register the GlobalTracer instance, even if you |
| 98 | + // don't generally use it. |
| 99 | + GlobalTracer.register(tracer); |
| 100 | + |
| 101 | + return tracer; |
13 | 102 | } |
14 | 103 | } |
0 commit comments