101
101
public class TracingDecoratorTest {
102
102
// header value format here: https://www.w3.org/TR/trace-context/#traceparent-header
103
103
private static final String TRACE_PARENT = w3cHeader ("1" , "9" );
104
- private static final String PROCESSOR_NAME = MockType .class .getName ();
104
+ private static final String PROCESSOR_NAME = io . quarkiverse . kafkastreamsprocessor . impl . decorator . processor . TracingDecoratorTest . MockType .class .getName ();
105
105
private static final Formatter LOG_FORMATTER = new PatternFormatter ("%p %s %e" );
106
106
private static final InMemoryLogHandler inMemoryLogHandler = new InMemoryLogHandler (record -> true );
107
107
private static final java .util .logging .Logger rootLogger = LogManager .getLogManager ().getLogger ("io.quarkiverse" );
@@ -115,7 +115,7 @@ public class TracingDecoratorTest {
115
115
JsonFormat .Printer jsonPrinter ;
116
116
117
117
@ Spy
118
- ReadMDCProcessor kafkaProcessor ;
118
+ io . quarkiverse . kafkastreamsprocessor . impl . decorator . processor . TracingDecoratorTest . ReadMDCProcessor kafkaProcessor ;
119
119
120
120
@ Mock
121
121
InternalProcessorContext <String , Ping > processorContext ;
@@ -134,9 +134,10 @@ public void setUp() {
134
134
inMemoryLogHandler .getRecords ().clear ();
135
135
rootLogger .addHandler (inMemoryLogHandler );
136
136
rootLogger .setLevel (Level .DEBUG );
137
- when (topologyConfiguration .getProcessorPayloadType ()).thenReturn ((Class ) MockType .class );
137
+ when (topologyConfiguration .getProcessorPayloadType ()).thenReturn ((Class ) io . quarkiverse . kafkastreamsprocessor . impl . decorator . processor . TracingDecoratorTest . MockType .class );
138
138
decorator = new TracingDecorator (otel .getOpenTelemetry (), kafkaTextMapGetter ,
139
- tracer , topologyConfiguration .getProcessorPayloadType ().getName (), jsonPrinter );
139
+ kafkaTextMapSetter , tracer , topologyConfiguration .getProcessorPayloadType ().getName (),
140
+ jsonPrinter );
140
141
decorator .setDelegate (kafkaProcessor );
141
142
decorator .init (processorContext );
142
143
}
@@ -147,9 +148,9 @@ public void shouldSetMDCFromUberTraceId() {
147
148
try (Scope parentScope = parentSpan .makeCurrent ()) {
148
149
Headers headers = new RecordHeaders ();
149
150
otel .getOpenTelemetry ()
150
- .getPropagators ()
151
- .getTextMapPropagator ()
152
- .inject (Context .current (), headers , kafkaTextMapSetter );
151
+ .getPropagators ()
152
+ .getTextMapPropagator ()
153
+ .inject (Context .current (), headers , kafkaTextMapSetter );
153
154
Record <String , Ping > record = new Record <>(null , null , 0L , headers );
154
155
155
156
decorator .process (record );
@@ -166,13 +167,13 @@ public void shouldSetMDCFromUberTraceId() {
166
167
public void shouldStartAndFinishSpan () {
167
168
// manually build parent span to inject some TraceState and test the state is well recorded in the created span
168
169
Span parentSpan = Span .wrap (SpanContext .create (IdGenerator .random ().generateTraceId (), IdGenerator .random ()
169
- .generateSpanId (), TraceFlags .getSampled (), TraceState .builder ().put ("state1" , "value2" ).build ()));
170
+ .generateSpanId (), TraceFlags .getSampled (), TraceState .builder ().put ("state1" , "value2" ).build ()));
170
171
try (Scope parentScope = parentSpan .makeCurrent ()) {
171
172
RecordHeaders headers = new RecordHeaders ();
172
173
otel .getOpenTelemetry ()
173
- .getPropagators ()
174
- .getTextMapPropagator ()
175
- .inject (Context .current (), headers , kafkaTextMapSetter );
174
+ .getPropagators ()
175
+ .getTextMapPropagator ()
176
+ .inject (Context .current (), headers , kafkaTextMapSetter );
176
177
Record <String , Ping > record = new Record <>(null , null , 0L , headers );
177
178
178
179
decorator .process (record );
@@ -181,12 +182,12 @@ public void shouldStartAndFinishSpan() {
181
182
}
182
183
183
184
assertThat (otel .getSpans ())
184
- .hasTracesSatisfyingExactly (
185
- trace -> trace .hasSpansSatisfyingExactly (
186
- span -> span .hasTraceId (parentSpan .getSpanContext ().getTraceId ())
187
- .hasName (PROCESSOR_NAME )
188
- .hasParentSpanId (parentSpan .getSpanContext ().getSpanId ())
189
- .hasTraceState (TraceState .builder ().put ("state1" , "value2" ).build ())));
185
+ .hasTracesSatisfyingExactly (
186
+ trace -> trace .hasSpansSatisfyingExactly (
187
+ span -> span .hasTraceId (parentSpan .getSpanContext ().getTraceId ())
188
+ .hasName (PROCESSOR_NAME )
189
+ .hasParentSpanId (parentSpan .getSpanContext ().getSpanId ())
190
+ .hasTraceState (TraceState .builder ().put ("state1" , "value2" ).build ())));
190
191
}
191
192
192
193
@ Test
@@ -195,16 +196,17 @@ public void shouldCleanMDCAndScopeInCaseOfException() {
195
196
try (Scope parentScope = parentSpan .makeCurrent ()) {
196
197
Headers headers = new RecordHeaders ();
197
198
otel .getOpenTelemetry ()
198
- .getPropagators ()
199
- .getTextMapPropagator ()
200
- .inject (Context .current (), headers , kafkaTextMapSetter );
199
+ .getPropagators ()
200
+ .getTextMapPropagator ()
201
+ .inject (Context .current (), headers , kafkaTextMapSetter );
201
202
Record <String , Ping > record = new Record <>(null , Ping .newBuilder ()
202
- .setMessage ("blabla" )
203
- .build (), 0L , headers );
203
+ .setMessage ("blabla" )
204
+ .build (), 0L , headers );
204
205
205
206
decorator = new TracingDecorator (otel .getOpenTelemetry (), kafkaTextMapGetter ,
206
- tracer , topologyConfiguration .getProcessorPayloadType ().getName (), jsonPrinter );
207
- decorator .setDelegate (new ThrowExceptionProcessor ());
207
+ kafkaTextMapSetter ,
208
+ tracer , topologyConfiguration .getProcessorPayloadType ().getName (), jsonPrinter );
209
+ decorator .setDelegate (new io .quarkiverse .kafkastreamsprocessor .impl .decorator .processor .TracingDecoratorTest .ThrowExceptionProcessor ());
208
210
decorator .init (processorContext );
209
211
210
212
assertDoesNotThrow (() -> decorator .process (record ));
@@ -215,12 +217,12 @@ public void shouldCleanMDCAndScopeInCaseOfException() {
215
217
assertNull (MDC .get ("traceId" ));
216
218
217
219
assertThat (otel .getSpans ())
218
- .hasTracesSatisfyingExactly (trace -> trace .hasSpansSatisfyingExactly (
219
- span -> span .hasSpanId (parentSpan .getSpanContext ().getSpanId ()),
220
- span -> span .hasTraceId (parentSpan .getSpanContext ().getTraceId ())
221
- .hasName (PROCESSOR_NAME )
222
- .hasStatusSatisfying (status -> status .hasCode (StatusCode .ERROR ))
223
- .hasException (new TestException ())));
220
+ .hasTracesSatisfyingExactly (trace -> trace .hasSpansSatisfyingExactly (
221
+ span -> span .hasSpanId (parentSpan .getSpanContext ().getSpanId ()),
222
+ span -> span .hasTraceId (parentSpan .getSpanContext ().getTraceId ())
223
+ .hasName (PROCESSOR_NAME )
224
+ .hasStatusSatisfying (status -> status .hasCode (StatusCode .ERROR ))
225
+ .hasException (new TestException ())));
224
226
}
225
227
226
228
@ Test
@@ -257,7 +259,7 @@ void shouldManageRuntimeException() throws Throwable {
257
259
decorator .process (new Record <>("key" , inputMessage , 0L ));
258
260
259
261
assertThat (getLogs (), hasItem (allOf (containsString ("ERROR" ),
260
- containsString ("Runtime error caught while processing the message" ), containsString (exception .getMessage ()))));
262
+ containsString ("Runtime error caught while processing the message" ), containsString (exception .getMessage ()))));
261
263
assertThat (getLogs (), hasItem (allOf (containsString ("DEBUG" ), containsString ("marshalled" ))));
262
264
}
263
265
@@ -269,7 +271,7 @@ private static List<String> getLogs() {
269
271
void shouldLetBubbleUpKafkaExceptionAndLogMessage () {
270
272
doThrow (new KafkaException ()).when (kafkaProcessor ).process (any ());
271
273
Assertions .assertThrows (KafkaException .class ,
272
- () -> decorator .process (new Record <>("key" , inputMessage , 0L )));
274
+ () -> decorator .process (new Record <>("key" , inputMessage , 0L )));
273
275
}
274
276
275
277
@ Test
@@ -296,7 +298,7 @@ void shouldLogMetadataEvenIfValueMarshallingToJSONFails() throws Throwable {
296
298
decorator .process (new Record <>("key" , inputMessage , 0L ));
297
299
298
300
assertThat (getLogs (),
299
- hasItem (allOf (containsString ("ERROR" ), containsString (protocolBufferException .getMessage ()))));
301
+ hasItem (allOf (containsString ("ERROR" ), containsString (protocolBufferException .getMessage ()))));
300
302
assertThat (getLogs (), hasItem (allOf (containsString ("DEBUG" ), containsString ("value=null" ))));
301
303
}
302
304
@@ -305,7 +307,8 @@ void shouldLogRawToStringValueIfNotProtobuf() throws Throwable {
305
307
Processor <String , String , String , String > kafkaProcessor = mock (Processor .class );
306
308
ProcessorContext <String , String > processorContext = mock (ProcessorContext .class );
307
309
TracingDecorator decorator = new TracingDecorator (GlobalOpenTelemetry .get (), kafkaTextMapGetter ,
308
- tracer , topologyConfiguration .getProcessorPayloadType ().getName (), jsonPrinter );
310
+ kafkaTextMapSetter , tracer , topologyConfiguration .getProcessorPayloadType ().getName (),
311
+ jsonPrinter );
309
312
decorator .setDelegate (kafkaProcessor );
310
313
decorator .init (processorContext );
311
314
@@ -322,11 +325,11 @@ void shouldLogRawToStringValueIfNotProtobuf() throws Throwable {
322
325
void shouldPropagateOpentelemetryW3CBaggage () {
323
326
// header value format here: https://www.w3.org/TR/baggage/#baggage-http-header-format
324
327
Headers headers = new RecordHeaders ().add (W3C_TRACE_ID , TRACE_PARENT .getBytes ())
325
- .add (W3C_BAGGAGE , "key1=value1,key2=value2" .getBytes ());
328
+ .add (W3C_BAGGAGE , "key1=value1,key2=value2" .getBytes ());
326
329
Record <String , Ping > record = new Record <>(null , Ping .newBuilder ().setMessage ("blabla" ).build (), 0L , headers );
327
- decorator = new TracingDecorator (otel .getOpenTelemetry (), kafkaTextMapGetter ,
328
- tracer , topologyConfiguration .getProcessorPayloadType ().getName (), jsonPrinter );
329
- decorator .setDelegate (new LogOpentelemetryBaggageProcessor ());
330
+ decorator = new TracingDecorator (otel .getOpenTelemetry (), kafkaTextMapGetter , kafkaTextMapSetter ,
331
+ tracer , topologyConfiguration .getProcessorPayloadType ().getName (), jsonPrinter );
332
+ decorator .setDelegate (new io . quarkiverse . kafkastreamsprocessor . impl . decorator . processor . TracingDecoratorTest . LogOpentelemetryBaggageProcessor ());
330
333
decorator .init (processorContext );
331
334
332
335
decorator .process (record );
@@ -363,7 +366,7 @@ public void process(Record<String, Ping> record) {
363
366
364
367
public static String w3cHeader (String traceId , String spanId ) {
365
368
return String .format ("00-%s-%s-01" , StringUtils .leftPad (traceId , TraceId .getLength (), '0' ),
366
- StringUtils .leftPad (spanId , SpanId .getLength (), '0' ));
369
+ StringUtils .leftPad (spanId , SpanId .getLength (), '0' ));
367
370
}
368
371
369
372
public static class MockType {
0 commit comments