Skip to content

Commit 718fa0d

Browse files
Mateusz Rzeszutektrask
Mateusz Rzeszutek
andauthored
Implement HTTP resend spec for Reactor Netty (excl CONNECT spans) (open-telemetry#8111)
Co-authored-by: Trask Stalnaker <[email protected]>
1 parent bd8ddf4 commit 718fa0d

File tree

15 files changed

+285
-252
lines changed

15 files changed

+285
-252
lines changed

instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/DecoratorFunctions.java

+9-2
Original file line numberDiff line numberDiff line change
@@ -74,12 +74,19 @@ public void accept(M message, Throwable throwable) {
7474
@Nullable
7575
private static Context getChannelContext(
7676
ContextView contextView, PropagatedContext propagatedContext) {
77+
78+
InstrumentationContexts contexts =
79+
contextView.getOrDefault(ReactorContextKeys.CONTEXTS_HOLDER_KEY, null);
80+
if (contexts == null) {
81+
return null;
82+
}
83+
7784
Context context = null;
7885
if (propagatedContext.useClientContext) {
79-
context = contextView.getOrDefault(ReactorContextKeys.CLIENT_CONTEXT_KEY, null);
86+
context = contexts.getClientContext();
8087
}
8188
if (context == null) {
82-
context = contextView.getOrDefault(ReactorContextKeys.CLIENT_PARENT_CONTEXT_KEY, null);
89+
context = contexts.getParentContext();
8390
}
8491
return context;
8592
}

instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpResponseReceiverInstrumenter.java

+59-108
Original file line numberDiff line numberDiff line change
@@ -5,22 +5,17 @@
55

66
package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0;
77

8-
import static io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0.ReactorContextKeys.CLIENT_CONTEXT_KEY;
9-
import static io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0.ReactorContextKeys.CLIENT_PARENT_CONTEXT_KEY;
10-
import static io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0.ReactorNettySingletons.instrumenter;
8+
import static io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0.ReactorContextKeys.CONTEXTS_HOLDER_KEY;
119

12-
import io.opentelemetry.api.GlobalOpenTelemetry;
1310
import io.opentelemetry.context.Context;
11+
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpClientResend;
1412
import io.opentelemetry.instrumentation.netty.v4_1.NettyClientTelemetry;
15-
import io.opentelemetry.instrumentation.reactor.v3_1.ContextPropagationOperator;
16-
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
1713
import java.util.function.BiConsumer;
1814
import java.util.function.Function;
1915
import javax.annotation.Nullable;
2016
import reactor.core.publisher.Mono;
2117
import reactor.netty.Connection;
2218
import reactor.netty.http.client.HttpClient;
23-
import reactor.netty.http.client.HttpClientConfig;
2419
import reactor.netty.http.client.HttpClientRequest;
2520
import reactor.netty.http.client.HttpClientResponse;
2621

@@ -29,25 +24,26 @@ public final class HttpResponseReceiverInstrumenter {
2924
// this method adds several stateful listeners that execute the instrumenter lifecycle during HTTP
3025
// request processing
3126
// it should be used just before one of the response*() methods is called - after this point the
32-
// HTTP
33-
// request is no longer modifiable by the user
27+
// HTTP request is no longer modifiable by the user
3428
@Nullable
3529
public static HttpClient.ResponseReceiver<?> instrument(HttpClient.ResponseReceiver<?> receiver) {
3630
// receiver should always be an HttpClientFinalizer, which both extends HttpClient and
3731
// implements ResponseReceiver
3832
if (receiver instanceof HttpClient) {
3933
HttpClient client = (HttpClient) receiver;
40-
HttpClientConfig config = client.configuration();
4134

42-
ContextHolder contextHolder = new ContextHolder();
35+
InstrumentationContexts instrumentationContexts = new InstrumentationContexts();
4336

4437
HttpClient modified =
4538
client
46-
.mapConnect(new StartOperation(contextHolder, config))
47-
.doOnRequest(new PropagateContext(contextHolder))
48-
.doOnRequestError(new EndOperationWithRequestError(contextHolder, config))
49-
.doOnResponseError(new EndOperationWithResponseError(contextHolder, config))
50-
.doAfterResponseSuccess(new EndOperationWithSuccess(contextHolder, config));
39+
.mapConnect(new CaptureParentContext(instrumentationContexts))
40+
.doOnRequestError(new EndOperationWithRequestError(instrumentationContexts))
41+
.doOnRequest(new StartOperation(instrumentationContexts))
42+
.doOnResponseError(new EndOperationWithResponseError(instrumentationContexts))
43+
.doAfterResponseSuccess(new EndOperationWithSuccess(instrumentationContexts))
44+
// end the current span on redirects; StartOperation will start another one for the
45+
// next resend
46+
.doOnRedirect(new EndOperationWithSuccess(instrumentationContexts));
5147

5248
// modified should always be an HttpClientFinalizer too
5349
if (modified instanceof HttpClient.ResponseReceiver) {
@@ -58,151 +54,106 @@ public static HttpClient.ResponseReceiver<?> instrument(HttpClient.ResponseRecei
5854
return null;
5955
}
6056

61-
static final class ContextHolder {
62-
63-
private static final AtomicReferenceFieldUpdater<ContextHolder, Context> contextUpdater =
64-
AtomicReferenceFieldUpdater.newUpdater(ContextHolder.class, Context.class, "context");
65-
66-
volatile Context parentContext;
67-
volatile Context context;
68-
69-
void setContext(Context context) {
70-
contextUpdater.set(this, context);
71-
}
72-
73-
Context getAndRemoveContext() {
74-
return contextUpdater.getAndSet(this, null);
75-
}
76-
}
77-
78-
static final class StartOperation
57+
private static final class CaptureParentContext
7958
implements Function<Mono<? extends Connection>, Mono<? extends Connection>> {
8059

81-
private final ContextHolder contextHolder;
82-
private final HttpClientConfig config;
60+
private final InstrumentationContexts instrumentationContexts;
8361

84-
StartOperation(ContextHolder contextHolder, HttpClientConfig config) {
85-
this.contextHolder = contextHolder;
86-
this.config = config;
62+
CaptureParentContext(InstrumentationContexts instrumentationContexts) {
63+
this.instrumentationContexts = instrumentationContexts;
8764
}
8865

8966
@Override
9067
public Mono<? extends Connection> apply(Mono<? extends Connection> mono) {
9168
return Mono.defer(
9269
() -> {
9370
Context parentContext = Context.current();
94-
contextHolder.parentContext = parentContext;
95-
if (!instrumenter().shouldStart(parentContext, config)) {
96-
// make context accessible via the reactor ContextView - the doOn* callbacks
97-
// instrumentation uses this to set the proper context for callbacks
98-
return mono.contextWrite(
99-
ctx -> ctx.put(CLIENT_PARENT_CONTEXT_KEY, parentContext));
100-
}
101-
102-
Context context = instrumenter().start(parentContext, config);
103-
contextHolder.setContext(context);
104-
return ContextPropagationOperator.runWithContext(mono, context)
105-
// make contexts accessible via the reactor ContextView - the doOn* callbacks
106-
// instrumentation uses the parent context to set the proper context for
107-
// callbacks
108-
.contextWrite(ctx -> ctx.put(CLIENT_PARENT_CONTEXT_KEY, parentContext))
109-
.contextWrite(ctx -> ctx.put(CLIENT_CONTEXT_KEY, context));
71+
instrumentationContexts.initialize(parentContext);
72+
// make contexts accessible via the reactor ContextView - the doOn* callbacks
73+
// instrumentation uses this to set the proper context for callbacks
74+
return mono.contextWrite(
75+
ctx -> ctx.put(CONTEXTS_HOLDER_KEY, instrumentationContexts));
11076
})
111-
.doOnCancel(
112-
() -> {
113-
Context context = contextHolder.getAndRemoveContext();
114-
if (context == null) {
115-
return;
116-
}
117-
instrumenter().end(context, config, null, null);
118-
});
77+
// if there's still any span in flight, end it
78+
.doOnCancel(() -> instrumentationContexts.endClientSpan(null, null));
11979
}
12080
}
12181

122-
static final class PropagateContext implements BiConsumer<HttpClientRequest, Connection> {
82+
private static final class StartOperation implements BiConsumer<HttpClientRequest, Connection> {
12383

124-
private final ContextHolder contextHolder;
84+
private final InstrumentationContexts instrumentationContexts;
12585

126-
PropagateContext(ContextHolder contextHolder) {
127-
this.contextHolder = contextHolder;
86+
StartOperation(InstrumentationContexts instrumentationContexts) {
87+
this.instrumentationContexts = instrumentationContexts;
12888
}
12989

13090
@Override
131-
public void accept(HttpClientRequest httpClientRequest, Connection connection) {
132-
Context context = contextHolder.context;
133-
if (context != null) {
134-
GlobalOpenTelemetry.getPropagators()
135-
.getTextMapPropagator()
136-
.inject(context, httpClientRequest, HttpClientRequestHeadersSetter.INSTANCE);
137-
}
91+
public void accept(HttpClientRequest request, Connection connection) {
92+
Context context = instrumentationContexts.startClientSpan(request);
13893

13994
// also propagate the context to the underlying netty instrumentation
14095
// if this span was suppressed and context is null, propagate parentContext - this will allow
14196
// netty spans to be suppressed too
142-
Context nettyParentContext = context == null ? contextHolder.parentContext : context;
97+
Context nettyParentContext =
98+
context == null ? instrumentationContexts.getParentContext() : context;
14399
NettyClientTelemetry.setChannelContext(connection.channel(), nettyParentContext);
144100
}
145101
}
146102

147-
static final class EndOperationWithRequestError
103+
private static final class EndOperationWithRequestError
148104
implements BiConsumer<HttpClientRequest, Throwable> {
149105

150-
private final ContextHolder contextHolder;
151-
private final HttpClientConfig config;
106+
private final InstrumentationContexts instrumentationContexts;
152107

153-
EndOperationWithRequestError(ContextHolder contextHolder, HttpClientConfig config) {
154-
this.contextHolder = contextHolder;
155-
this.config = config;
108+
EndOperationWithRequestError(InstrumentationContexts instrumentationContexts) {
109+
this.instrumentationContexts = instrumentationContexts;
156110
}
157111

158112
@Override
159-
public void accept(HttpClientRequest httpClientRequest, Throwable error) {
160-
Context context = contextHolder.getAndRemoveContext();
161-
if (context == null) {
162-
return;
113+
public void accept(HttpClientRequest request, Throwable error) {
114+
instrumentationContexts.endClientSpan(null, error);
115+
116+
if (HttpClientResend.get(instrumentationContexts.getParentContext()) == 0) {
117+
// TODO: emit connection error span
118+
119+
// FIXME: this branch requires lots of changes around the NettyConnectionInstrumenter
120+
// currently it also creates that connection error span (when the connection telemetry is
121+
// turned off), but without HTTP semantics - it does not have access to any HTTP information
122+
// after all
123+
// it should be possible to completely disable it, and just start and end the span here
124+
// this requires lots of refactoring and pretty uninteresting changes in the netty code, so
125+
// I'll do that in a separate PR - for better readability
163126
}
164-
instrumenter().end(context, config, null, error);
165127
}
166128
}
167129

168-
static final class EndOperationWithResponseError
130+
private static final class EndOperationWithResponseError
169131
implements BiConsumer<HttpClientResponse, Throwable> {
170132

171-
private final ContextHolder contextHolder;
172-
private final HttpClientConfig config;
133+
private final InstrumentationContexts instrumentationContexts;
173134

174-
EndOperationWithResponseError(ContextHolder contextHolder, HttpClientConfig config) {
175-
this.contextHolder = contextHolder;
176-
this.config = config;
135+
EndOperationWithResponseError(InstrumentationContexts instrumentationContexts) {
136+
this.instrumentationContexts = instrumentationContexts;
177137
}
178138

179139
@Override
180140
public void accept(HttpClientResponse response, Throwable error) {
181-
Context context = contextHolder.getAndRemoveContext();
182-
if (context == null) {
183-
return;
184-
}
185-
instrumenter().end(context, config, response, error);
141+
instrumentationContexts.endClientSpan(response, error);
186142
}
187143
}
188144

189-
static final class EndOperationWithSuccess implements BiConsumer<HttpClientResponse, Connection> {
145+
private static final class EndOperationWithSuccess
146+
implements BiConsumer<HttpClientResponse, Connection> {
190147

191-
private final ContextHolder contextHolder;
192-
private final HttpClientConfig config;
148+
private final InstrumentationContexts instrumentationContexts;
193149

194-
EndOperationWithSuccess(ContextHolder contextHolder, HttpClientConfig config) {
195-
this.contextHolder = contextHolder;
196-
this.config = config;
150+
EndOperationWithSuccess(InstrumentationContexts instrumentationContexts) {
151+
this.instrumentationContexts = instrumentationContexts;
197152
}
198153

199154
@Override
200155
public void accept(HttpClientResponse response, Connection connection) {
201-
Context context = contextHolder.getAndRemoveContext();
202-
if (context == null) {
203-
return;
204-
}
205-
instrumenter().end(context, config, response, null);
156+
instrumentationContexts.endClientSpan(response, null);
206157
}
207158
}
208159

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0;
7+
8+
import static io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0.ReactorNettySingletons.instrumenter;
9+
10+
import io.opentelemetry.context.Context;
11+
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpClientResend;
12+
import java.util.Queue;
13+
import java.util.concurrent.ArrayBlockingQueue;
14+
import java.util.logging.Level;
15+
import java.util.logging.Logger;
16+
import javax.annotation.Nullable;
17+
import reactor.netty.http.client.HttpClientRequest;
18+
import reactor.netty.http.client.HttpClientResponse;
19+
20+
final class InstrumentationContexts {
21+
22+
private static final Logger logger = Logger.getLogger(InstrumentationContexts.class.getName());
23+
24+
private volatile Context parentContext;
25+
// on retries, reactor-netty starts the next resend attempt before it ends the previous one (i.e.
26+
// it calls the callback functions in that order); thus for a short moment there can be 2
27+
// coexisting HTTP client spans
28+
private final Queue<RequestAndContext> clientContexts = new ArrayBlockingQueue<>(2, true);
29+
30+
void initialize(Context parentContext) {
31+
this.parentContext = HttpClientResend.initialize(parentContext);
32+
}
33+
34+
Context getParentContext() {
35+
return parentContext;
36+
}
37+
38+
@Nullable
39+
Context getClientContext() {
40+
RequestAndContext requestAndContext = clientContexts.peek();
41+
return requestAndContext == null ? null : requestAndContext.context;
42+
}
43+
44+
@Nullable
45+
Context startClientSpan(HttpClientRequest request) {
46+
Context parentContext = this.parentContext;
47+
Context context = null;
48+
if (instrumenter().shouldStart(parentContext, request)) {
49+
context = instrumenter().start(parentContext, request);
50+
if (!clientContexts.offer(new RequestAndContext(request, context))) {
51+
// should not ever happen in reality
52+
String message =
53+
"Could not instrument HTTP client request; not enough space in the request queue";
54+
logger.log(Level.FINE, message);
55+
instrumenter().end(context, request, null, new IllegalStateException(message));
56+
}
57+
}
58+
return context;
59+
}
60+
61+
void endClientSpan(@Nullable HttpClientResponse response, @Nullable Throwable error) {
62+
RequestAndContext requestAndContext = clientContexts.poll();
63+
if (requestAndContext != null) {
64+
instrumenter().end(requestAndContext.context, requestAndContext.request, response, error);
65+
}
66+
}
67+
68+
static final class RequestAndContext {
69+
final HttpClientRequest request;
70+
final Context context;
71+
72+
RequestAndContext(HttpClientRequest request, Context context) {
73+
this.request = request;
74+
this.context = context;
75+
}
76+
}
77+
}

instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorContextKeys.java

+2-4
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,8 @@
77

88
public final class ReactorContextKeys {
99

10-
public static final String CLIENT_PARENT_CONTEXT_KEY =
11-
ReactorContextKeys.class.getName() + ".client-parent-context";
12-
public static final String CLIENT_CONTEXT_KEY =
13-
ReactorContextKeys.class.getName() + ".client-context";
10+
public static final String CONTEXTS_HOLDER_KEY =
11+
ReactorContextKeys.class.getName() + ".contexts-holder";
1412

1513
private ReactorContextKeys() {}
1614
}

0 commit comments

Comments
 (0)