29
29
import java .util .Objects ;
30
30
import java .util .stream .Collectors ;
31
31
32
+ import io .opentelemetry .api .baggage .Baggage ;
33
+ import io .quarkiverse .kafkastreamsprocessor .propagation .KafkaTextMapSetter ;
32
34
import jakarta .annotation .Priority ;
33
35
import jakarta .decorator .Decorator ;
34
36
import jakarta .enterprise .context .Dependent ;
58
60
import io .quarkiverse .kafkastreamsprocessor .impl .configuration .TopologyConfigurationImpl ;
59
61
import io .quarkiverse .kafkastreamsprocessor .impl .protocol .KafkaStreamsProcessorHeaders ;
60
62
import io .quarkiverse .kafkastreamsprocessor .propagation .KafkaTextMapGetter ;
63
+ import io .quarkiverse .kafkastreamsprocessor .propagation .KafkaTextMapSetter ;
61
64
import lombok .extern .slf4j .Slf4j ;
62
65
63
66
/**
@@ -77,7 +80,10 @@ public class TracingDecorator extends AbstractProcessorDecorator {
77
80
* The {@link OpenTelemetry} configured by Quarkus
78
81
*/
79
82
private final OpenTelemetry openTelemetry ;
80
-
83
+ /**
84
+ * Injects Context into the Kafka headers of a message
85
+ */
86
+ private final KafkaTextMapSetter textMapSetter ;
81
87
/**
82
88
* Extracts Context from the Kafka headers of a message
83
89
*/
@@ -116,16 +122,20 @@ public class TracingDecorator extends AbstractProcessorDecorator {
116
122
* The TopologyConfiguration after customization.
117
123
*/
118
124
@ Inject
119
- public TracingDecorator (OpenTelemetry openTelemetry , KafkaTextMapGetter textMapGetter , Tracer tracer ,
125
+ public TracingDecorator (OpenTelemetry openTelemetry , KafkaTextMapGetter textMapGetter ,
126
+ KafkaTextMapSetter textMapSetter ,
127
+ Tracer tracer ,
120
128
TopologyConfigurationImpl configuration ) {
121
- this (openTelemetry , textMapGetter , tracer , configuration .getProcessorPayloadType ().getName (),
129
+ this (openTelemetry , textMapGetter , textMapSetter , tracer , configuration .getProcessorPayloadType ().getName (),
122
130
JsonFormat .printer ());
123
131
}
124
132
125
133
public TracingDecorator (OpenTelemetry openTelemetry , KafkaTextMapGetter textMapGetter ,
126
- Tracer tracer , String applicationName , JsonFormat .Printer jsonPrinter ) {
134
+ KafkaTextMapSetter textMapSetter , Tracer tracer , String applicationName ,
135
+ JsonFormat .Printer jsonPrinter ) {
127
136
this .openTelemetry = openTelemetry ;
128
137
this .textMapGetter = textMapGetter ;
138
+ this .textMapSetter = textMapSetter ;
129
139
this .tracer = tracer ;
130
140
this .applicationName = applicationName ;
131
141
this .jsonPrinter = jsonPrinter ;
@@ -157,7 +167,7 @@ public void process(Record record) {
157
167
SpanBuilder spanBuilder = tracer .spanBuilder (applicationName );
158
168
final TextMapPropagator propagator = openTelemetry .getPropagators ().getTextMapPropagator ();
159
169
Scope parentScope = null ;
160
-
170
+ Context extractedContext = null ;
161
171
try {
162
172
// going through all propagation field names defined in the OTel configuration
163
173
// we look if any of them has been set with a non-null value in the headers of the incoming message
@@ -167,7 +177,7 @@ public void process(Record record) {
167
177
.anyMatch (Objects ::nonNull )) {
168
178
// if that is the case, let's extract a Context initialized with the parent trace id, span id
169
179
// and baggage present as headers in the incoming message
170
- Context extractedContext = propagator .extract (Context .current (), record .headers (), textMapGetter );
180
+ extractedContext = propagator .extract (Context .current (), record .headers (), textMapGetter );
171
181
// use the context as parent span for the built span
172
182
spanBuilder .setParent (extractedContext );
173
183
// we clean the headers to avoid their propagation in any outgoing message (knowing that by
@@ -179,8 +189,12 @@ public void process(Record record) {
179
189
Span span = spanBuilder .startSpan ();
180
190
// baggage need to be explicitly set as current otherwise it is not propagated (baggage is independent of span
181
191
// in opentelemetry) and actually lost as kafka headers are cleaned
182
- try (Scope ignored = span .makeCurrent ()) {
192
+ try (Scope ignored = (extractedContext != null )
193
+ ? Baggage .fromContext (extractedContext ).makeCurrent ()
194
+ : Scope .noop ();
195
+ Scope scope = span .makeCurrent ()) {
183
196
try {
197
+ propagator .inject (Context .current (), record .headers (), this .textMapSetter );
184
198
getDelegate ().process (record );
185
199
span .setStatus (StatusCode .OK );
186
200
} catch (KafkaException e ) {
0 commit comments