Skip to content

fix tracing decorator for span context and baggages #146

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import java.util.Objects;
import java.util.stream.Collectors;

import io.opentelemetry.api.baggage.Baggage;
import io.quarkiverse.kafkastreamsprocessor.propagation.KafkaTextMapSetter;
import jakarta.annotation.Priority;
import jakarta.decorator.Decorator;
import jakarta.enterprise.context.Dependent;
Expand Down Expand Up @@ -58,6 +60,7 @@
import io.quarkiverse.kafkastreamsprocessor.impl.configuration.TopologyConfigurationImpl;
import io.quarkiverse.kafkastreamsprocessor.impl.protocol.KafkaStreamsProcessorHeaders;
import io.quarkiverse.kafkastreamsprocessor.propagation.KafkaTextMapGetter;
import io.quarkiverse.kafkastreamsprocessor.propagation.KafkaTextMapSetter;
import lombok.extern.slf4j.Slf4j;

/**
Expand All @@ -77,7 +80,10 @@ public class TracingDecorator extends AbstractProcessorDecorator {
* The {@link OpenTelemetry} configured by Quarkus
*/
private final OpenTelemetry openTelemetry;

/**
* Injects Context into the Kafka headers of a message
*/
private final KafkaTextMapSetter textMapSetter;
/**
* Extracts Context from the Kafka headers of a message
*/
Expand Down Expand Up @@ -116,16 +122,20 @@ public class TracingDecorator extends AbstractProcessorDecorator {
* The TopologyConfiguration after customization.
*/
@Inject
public TracingDecorator(OpenTelemetry openTelemetry, KafkaTextMapGetter textMapGetter, Tracer tracer,
public TracingDecorator(OpenTelemetry openTelemetry, KafkaTextMapGetter textMapGetter,
KafkaTextMapSetter textMapSetter,
Tracer tracer,
TopologyConfigurationImpl configuration) {
this(openTelemetry, textMapGetter, tracer, configuration.getProcessorPayloadType().getName(),
this(openTelemetry, textMapGetter, textMapSetter, tracer, configuration.getProcessorPayloadType().getName(),
JsonFormat.printer());
}

public TracingDecorator(OpenTelemetry openTelemetry, KafkaTextMapGetter textMapGetter,
Tracer tracer, String applicationName, JsonFormat.Printer jsonPrinter) {
KafkaTextMapSetter textMapSetter, Tracer tracer, String applicationName,
JsonFormat.Printer jsonPrinter) {
this.openTelemetry = openTelemetry;
this.textMapGetter = textMapGetter;
this.textMapSetter = textMapSetter;
this.tracer = tracer;
this.applicationName = applicationName;
this.jsonPrinter = jsonPrinter;
Expand Down Expand Up @@ -157,7 +167,7 @@ public void process(Record record) {
SpanBuilder spanBuilder = tracer.spanBuilder(applicationName);
final TextMapPropagator propagator = openTelemetry.getPropagators().getTextMapPropagator();
Scope parentScope = null;

Context extractedContext = null;
try {
// going through all propagation field names defined in the OTel configuration
// we look if any of them has been set with a non-null value in the headers of the incoming message
Expand All @@ -167,7 +177,7 @@ public void process(Record record) {
.anyMatch(Objects::nonNull)) {
// if that is the case, let's extract a Context initialized with the parent trace id, span id
// and baggage present as headers in the incoming message
Context extractedContext = propagator.extract(Context.current(), record.headers(), textMapGetter);
extractedContext = propagator.extract(Context.current(), record.headers(), textMapGetter);
// use the context as parent span for the built span
spanBuilder.setParent(extractedContext);
// we clean the headers to avoid their propagation in any outgoing message (knowing that by
Expand All @@ -179,8 +189,12 @@ public void process(Record record) {
Span span = spanBuilder.startSpan();
// baggage need to be explicitly set as current otherwise it is not propagated (baggage is independent of span
// in opentelemetry) and actually lost as kafka headers are cleaned
try (Scope ignored = span.makeCurrent()) {
try (Scope ignored = (extractedContext != null)
? Baggage.fromContext(extractedContext).makeCurrent()
: Scope.noop();
Scope scope = span.makeCurrent()) {
try {
propagator.inject(Context.current(), record.headers(), this.textMapSetter);
getDelegate().process(record);
span.setStatus(StatusCode.OK);
} catch (KafkaException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ public void setUp() {
rootLogger.setLevel(Level.DEBUG);
when(topologyConfiguration.getProcessorPayloadType()).thenReturn((Class) MockType.class);
decorator = new TracingDecorator(otel.getOpenTelemetry(), kafkaTextMapGetter,
tracer, topologyConfiguration.getProcessorPayloadType().getName(), jsonPrinter);
kafkaTextMapSetter, tracer, topologyConfiguration.getProcessorPayloadType().getName(),
jsonPrinter);
decorator.setDelegate(kafkaProcessor);
decorator.init(processorContext);
}
Expand All @@ -147,9 +148,9 @@ public void shouldSetMDCFromUberTraceId() {
try (Scope parentScope = parentSpan.makeCurrent()) {
Headers headers = new RecordHeaders();
otel.getOpenTelemetry()
.getPropagators()
.getTextMapPropagator()
.inject(Context.current(), headers, kafkaTextMapSetter);
.getPropagators()
.getTextMapPropagator()
.inject(Context.current(), headers, kafkaTextMapSetter);
Record<String, Ping> record = new Record<>(null, null, 0L, headers);

decorator.process(record);
Expand All @@ -166,13 +167,13 @@ public void shouldSetMDCFromUberTraceId() {
public void shouldStartAndFinishSpan() {
// manually build parent span to inject some TraceState and test the state is well recorded in the created span
Span parentSpan = Span.wrap(SpanContext.create(IdGenerator.random().generateTraceId(), IdGenerator.random()
.generateSpanId(), TraceFlags.getSampled(), TraceState.builder().put("state1", "value2").build()));
.generateSpanId(), TraceFlags.getSampled(), TraceState.builder().put("state1", "value2").build()));
try (Scope parentScope = parentSpan.makeCurrent()) {
RecordHeaders headers = new RecordHeaders();
otel.getOpenTelemetry()
.getPropagators()
.getTextMapPropagator()
.inject(Context.current(), headers, kafkaTextMapSetter);
.getPropagators()
.getTextMapPropagator()
.inject(Context.current(), headers, kafkaTextMapSetter);
Record<String, Ping> record = new Record<>(null, null, 0L, headers);

decorator.process(record);
Expand All @@ -181,12 +182,12 @@ public void shouldStartAndFinishSpan() {
}

assertThat(otel.getSpans())
.hasTracesSatisfyingExactly(
trace -> trace.hasSpansSatisfyingExactly(
span -> span.hasTraceId(parentSpan.getSpanContext().getTraceId())
.hasName(PROCESSOR_NAME)
.hasParentSpanId(parentSpan.getSpanContext().getSpanId())
.hasTraceState(TraceState.builder().put("state1", "value2").build())));
.hasTracesSatisfyingExactly(
trace -> trace.hasSpansSatisfyingExactly(
span -> span.hasTraceId(parentSpan.getSpanContext().getTraceId())
.hasName(PROCESSOR_NAME)
.hasParentSpanId(parentSpan.getSpanContext().getSpanId())
.hasTraceState(TraceState.builder().put("state1", "value2").build())));
}

@Test
Expand All @@ -195,15 +196,16 @@ public void shouldCleanMDCAndScopeInCaseOfException() {
try (Scope parentScope = parentSpan.makeCurrent()) {
Headers headers = new RecordHeaders();
otel.getOpenTelemetry()
.getPropagators()
.getTextMapPropagator()
.inject(Context.current(), headers, kafkaTextMapSetter);
.getPropagators()
.getTextMapPropagator()
.inject(Context.current(), headers, kafkaTextMapSetter);
Record<String, Ping> record = new Record<>(null, Ping.newBuilder()
.setMessage("blabla")
.build(), 0L, headers);
.setMessage("blabla")
.build(), 0L, headers);

decorator = new TracingDecorator(otel.getOpenTelemetry(), kafkaTextMapGetter,
tracer, topologyConfiguration.getProcessorPayloadType().getName(), jsonPrinter);
kafkaTextMapSetter,
tracer, topologyConfiguration.getProcessorPayloadType().getName(), jsonPrinter);
decorator.setDelegate(new ThrowExceptionProcessor());
decorator.init(processorContext);

Expand All @@ -215,12 +217,12 @@ public void shouldCleanMDCAndScopeInCaseOfException() {
assertNull(MDC.get("traceId"));

assertThat(otel.getSpans())
.hasTracesSatisfyingExactly(trace -> trace.hasSpansSatisfyingExactly(
span -> span.hasSpanId(parentSpan.getSpanContext().getSpanId()),
span -> span.hasTraceId(parentSpan.getSpanContext().getTraceId())
.hasName(PROCESSOR_NAME)
.hasStatusSatisfying(status -> status.hasCode(StatusCode.ERROR))
.hasException(new TestException())));
.hasTracesSatisfyingExactly(trace -> trace.hasSpansSatisfyingExactly(
span -> span.hasSpanId(parentSpan.getSpanContext().getSpanId()),
span -> span.hasTraceId(parentSpan.getSpanContext().getTraceId())
.hasName(PROCESSOR_NAME)
.hasStatusSatisfying(status -> status.hasCode(StatusCode.ERROR))
.hasException(new TestException())));
}

@Test
Expand Down Expand Up @@ -257,7 +259,7 @@ void shouldManageRuntimeException() throws Throwable {
decorator.process(new Record<>("key", inputMessage, 0L));

assertThat(getLogs(), hasItem(allOf(containsString("ERROR"),
containsString("Runtime error caught while processing the message"), containsString(exception.getMessage()))));
containsString("Runtime error caught while processing the message"), containsString(exception.getMessage()))));
assertThat(getLogs(), hasItem(allOf(containsString("DEBUG"), containsString("marshalled"))));
}

Expand All @@ -269,7 +271,7 @@ private static List<String> getLogs() {
void shouldLetBubbleUpKafkaExceptionAndLogMessage() {
doThrow(new KafkaException()).when(kafkaProcessor).process(any());
Assertions.assertThrows(KafkaException.class,
() -> decorator.process(new Record<>("key", inputMessage, 0L)));
() -> decorator.process(new Record<>("key", inputMessage, 0L)));
}

@Test
Expand All @@ -296,7 +298,7 @@ void shouldLogMetadataEvenIfValueMarshallingToJSONFails() throws Throwable {
decorator.process(new Record<>("key", inputMessage, 0L));

assertThat(getLogs(),
hasItem(allOf(containsString("ERROR"), containsString(protocolBufferException.getMessage()))));
hasItem(allOf(containsString("ERROR"), containsString(protocolBufferException.getMessage()))));
assertThat(getLogs(), hasItem(allOf(containsString("DEBUG"), containsString("value=null"))));
}

Expand All @@ -305,7 +307,8 @@ void shouldLogRawToStringValueIfNotProtobuf() throws Throwable {
Processor<String, String, String, String> kafkaProcessor = mock(Processor.class);
ProcessorContext<String, String> processorContext = mock(ProcessorContext.class);
TracingDecorator decorator = new TracingDecorator(GlobalOpenTelemetry.get(), kafkaTextMapGetter,
tracer, topologyConfiguration.getProcessorPayloadType().getName(), jsonPrinter);
kafkaTextMapSetter, tracer, topologyConfiguration.getProcessorPayloadType().getName(),
jsonPrinter);
decorator.setDelegate(kafkaProcessor);
decorator.init(processorContext);

Expand All @@ -322,10 +325,10 @@ void shouldLogRawToStringValueIfNotProtobuf() throws Throwable {
void shouldPropagateOpentelemetryW3CBaggage() {
// header value format here: https://www.w3.org/TR/baggage/#baggage-http-header-format
Headers headers = new RecordHeaders().add(W3C_TRACE_ID, TRACE_PARENT.getBytes())
.add(W3C_BAGGAGE, "key1=value1,key2=value2".getBytes());
.add(W3C_BAGGAGE, "key1=value1,key2=value2".getBytes());
Record<String, Ping> record = new Record<>(null, Ping.newBuilder().setMessage("blabla").build(), 0L, headers);
decorator = new TracingDecorator(otel.getOpenTelemetry(), kafkaTextMapGetter,
tracer, topologyConfiguration.getProcessorPayloadType().getName(), jsonPrinter);
decorator = new TracingDecorator(otel.getOpenTelemetry(), kafkaTextMapGetter, kafkaTextMapSetter,
tracer, topologyConfiguration.getProcessorPayloadType().getName(), jsonPrinter);
decorator.setDelegate(new LogOpentelemetryBaggageProcessor());
decorator.init(processorContext);

Expand Down Expand Up @@ -363,7 +366,7 @@ public void process(Record<String, Ping> record) {

public static String w3cHeader(String traceId, String spanId) {
return String.format("00-%s-%s-01", StringUtils.leftPad(traceId, TraceId.getLength(), '0'),
StringUtils.leftPad(spanId, SpanId.getLength(), '0'));
StringUtils.leftPad(spanId, SpanId.getLength(), '0'));
}

public static class MockType {
Expand Down