Skip to content

Commit b593b35

Browse files
authored
[OpenTracing] Wrap returned promises in OutboundCallsInterceptor (#2366)
This change allows the OpenTracing interceptor to transfer any active trace/span from the point of invocation to the point of promise callback execution, thus preserving the span within the callback. Without this, spans become disjoint across Promise.thenCompose, due to the nature of the deferring the execution of the .thenCompose to a callback thread. Fixes #1962 Signed-off-by: Greg Haskins <[email protected]>
1 parent 35e390e commit b593b35

File tree

2 files changed

+262
-5
lines changed

2 files changed

+262
-5
lines changed

temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowOutboundCallsInterceptor.java

+95-5
Original file line numberDiff line numberDiff line change
@@ -27,16 +27,91 @@
2727
import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor;
2828
import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptorBase;
2929
import io.temporal.opentracing.OpenTracingOptions;
30+
import io.temporal.workflow.Functions;
31+
import io.temporal.workflow.Promise;
3032
import io.temporal.workflow.Workflow;
3133
import io.temporal.workflow.WorkflowInfo;
3234
import io.temporal.workflow.unsafe.WorkflowUnsafe;
35+
import java.util.concurrent.TimeUnit;
36+
import java.util.concurrent.TimeoutException;
3337

3438
public class OpenTracingWorkflowOutboundCallsInterceptor
3539
extends WorkflowOutboundCallsInterceptorBase {
3640
private final SpanFactory spanFactory;
3741
private final Tracer tracer;
3842
private final ContextAccessor contextAccessor;
3943

44+
private class PromiseWrapper<R> implements Promise<R> {
45+
private final Span capturedSpan;
46+
private final Promise<R> delegate;
47+
48+
PromiseWrapper(Span capturedSpan, Promise<R> delegate) {
49+
this.capturedSpan = capturedSpan;
50+
this.delegate = delegate;
51+
}
52+
53+
private <O> O wrap(Functions.Func<O> fn) {
54+
Span activeSpan = tracer.scopeManager().activeSpan();
55+
if (activeSpan == null && capturedSpan != null) {
56+
try (Scope ignored = tracer.scopeManager().activate(capturedSpan)) {
57+
return fn.apply();
58+
}
59+
} else {
60+
return fn.apply();
61+
}
62+
}
63+
64+
@Override
65+
public boolean isCompleted() {
66+
return delegate.isCompleted();
67+
}
68+
69+
@Override
70+
public R get() {
71+
return delegate.get();
72+
}
73+
74+
@Override
75+
public R cancellableGet() {
76+
return delegate.cancellableGet();
77+
}
78+
79+
@Override
80+
public R get(long timeout, TimeUnit unit) throws TimeoutException {
81+
return delegate.get(timeout, unit);
82+
}
83+
84+
@Override
85+
public R cancellableGet(long timeout, TimeUnit unit) throws TimeoutException {
86+
return delegate.cancellableGet(timeout, unit);
87+
}
88+
89+
@Override
90+
public RuntimeException getFailure() {
91+
return delegate.getFailure();
92+
}
93+
94+
@Override
95+
public <U> Promise<U> thenApply(Functions.Func1<? super R, ? extends U> fn) {
96+
return delegate.thenApply((r) -> wrap(() -> fn.apply(r)));
97+
}
98+
99+
@Override
100+
public <U> Promise<U> handle(Functions.Func2<? super R, RuntimeException, ? extends U> fn) {
101+
return delegate.handle((r, e) -> wrap(() -> fn.apply(r, e)));
102+
}
103+
104+
@Override
105+
public <U> Promise<U> thenCompose(Functions.Func1<? super R, ? extends Promise<U>> fn) {
106+
return delegate.thenCompose((r) -> wrap(() -> fn.apply(r)));
107+
}
108+
109+
@Override
110+
public Promise<R> exceptionally(Functions.Func1<Throwable, ? extends R> fn) {
111+
return delegate.exceptionally((t) -> wrap(() -> fn.apply(t)));
112+
}
113+
}
114+
40115
public OpenTracingWorkflowOutboundCallsInterceptor(
41116
WorkflowOutboundCallsInterceptor next,
42117
OpenTracingOptions options,
@@ -51,13 +126,16 @@ public OpenTracingWorkflowOutboundCallsInterceptor(
51126
@Override
52127
public <R> ActivityOutput<R> executeActivity(ActivityInput<R> input) {
53128
if (!WorkflowUnsafe.isReplaying()) {
129+
Span capturedSpan = tracer.scopeManager().activeSpan();
54130
Span activityStartSpan =
55131
contextAccessor.writeSpanContextToHeader(
56132
() -> createActivityStartSpanBuilder(input.getActivityName()).start(),
57133
input.getHeader(),
58134
tracer);
59135
try (Scope ignored = tracer.scopeManager().activate(activityStartSpan)) {
60-
return super.executeActivity(input);
136+
ActivityOutput<R> output = super.executeActivity(input);
137+
return new ActivityOutput<>(
138+
output.getActivityId(), new PromiseWrapper<>(capturedSpan, output.getResult()));
61139
} finally {
62140
activityStartSpan.finish();
63141
}
@@ -69,13 +147,15 @@ public <R> ActivityOutput<R> executeActivity(ActivityInput<R> input) {
69147
@Override
70148
public <R> LocalActivityOutput<R> executeLocalActivity(LocalActivityInput<R> input) {
71149
if (!WorkflowUnsafe.isReplaying()) {
150+
Span capturedSpan = tracer.scopeManager().activeSpan();
72151
Span activityStartSpan =
73152
contextAccessor.writeSpanContextToHeader(
74153
() -> createActivityStartSpanBuilder(input.getActivityName()).start(),
75154
input.getHeader(),
76155
tracer);
77156
try (Scope ignored = tracer.scopeManager().activate(activityStartSpan)) {
78-
return super.executeLocalActivity(input);
157+
LocalActivityOutput<R> output = super.executeLocalActivity(input);
158+
return new LocalActivityOutput<>(new PromiseWrapper<>(capturedSpan, output.getResult()));
79159
} finally {
80160
activityStartSpan.finish();
81161
}
@@ -87,11 +167,15 @@ public <R> LocalActivityOutput<R> executeLocalActivity(LocalActivityInput<R> inp
87167
@Override
88168
public <R> ChildWorkflowOutput<R> executeChildWorkflow(ChildWorkflowInput<R> input) {
89169
if (!WorkflowUnsafe.isReplaying()) {
170+
Span capturedSpan = tracer.scopeManager().activeSpan();
90171
Span childWorkflowStartSpan =
91172
contextAccessor.writeSpanContextToHeader(
92173
() -> createChildWorkflowStartSpanBuilder(input).start(), input.getHeader(), tracer);
93174
try (Scope ignored = tracer.scopeManager().activate(childWorkflowStartSpan)) {
94-
return super.executeChildWorkflow(input);
175+
ChildWorkflowOutput<R> output = super.executeChildWorkflow(input);
176+
return new ChildWorkflowOutput<>(
177+
new PromiseWrapper<>(capturedSpan, output.getResult()),
178+
new PromiseWrapper<>(capturedSpan, output.getWorkflowExecution()));
95179
} finally {
96180
childWorkflowStartSpan.finish();
97181
}
@@ -104,13 +188,17 @@ public <R> ChildWorkflowOutput<R> executeChildWorkflow(ChildWorkflowInput<R> inp
104188
public <R> ExecuteNexusOperationOutput<R> executeNexusOperation(
105189
ExecuteNexusOperationInput<R> input) {
106190
if (!WorkflowUnsafe.isReplaying()) {
191+
Span capturedSpan = tracer.scopeManager().activeSpan();
107192
Span nexusOperationExecuteSpan =
108193
contextAccessor.writeSpanContextToHeader(
109194
() -> createStartNexusOperationSpanBuilder(input).start(),
110195
input.getHeaders(),
111196
tracer);
112197
try (Scope ignored = tracer.scopeManager().activate(nexusOperationExecuteSpan)) {
113-
return super.executeNexusOperation(input);
198+
ExecuteNexusOperationOutput<R> output = super.executeNexusOperation(input);
199+
return new ExecuteNexusOperationOutput<>(
200+
new PromiseWrapper<>(capturedSpan, output.getResult()),
201+
new PromiseWrapper<>(capturedSpan, output.getOperationExecution()));
114202
} finally {
115203
nexusOperationExecuteSpan.finish();
116204
}
@@ -122,6 +210,7 @@ public <R> ExecuteNexusOperationOutput<R> executeNexusOperation(
122210
@Override
123211
public SignalExternalOutput signalExternalWorkflow(SignalExternalInput input) {
124212
if (!WorkflowUnsafe.isReplaying()) {
213+
Span capturedSpan = tracer.scopeManager().activeSpan();
125214
WorkflowInfo workflowInfo = Workflow.getInfo();
126215
Span childWorkflowStartSpan =
127216
contextAccessor.writeSpanContextToHeader(
@@ -136,7 +225,8 @@ public SignalExternalOutput signalExternalWorkflow(SignalExternalInput input) {
136225
input.getHeader(),
137226
tracer);
138227
try (Scope ignored = tracer.scopeManager().activate(childWorkflowStartSpan)) {
139-
return super.signalExternalWorkflow(input);
228+
SignalExternalOutput output = super.signalExternalWorkflow(input);
229+
return new SignalExternalOutput(new PromiseWrapper<>(capturedSpan, output.getResult()));
140230
} finally {
141231
childWorkflowStartSpan.finish();
142232
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.opentracing;
22+
23+
import static org.junit.Assert.assertEquals;
24+
import static org.junit.Assert.assertTrue;
25+
26+
import io.opentracing.Scope;
27+
import io.opentracing.Span;
28+
import io.opentracing.mock.MockSpan;
29+
import io.opentracing.mock.MockTracer;
30+
import io.opentracing.util.ThreadLocalScopeManager;
31+
import io.temporal.activity.ActivityInterface;
32+
import io.temporal.activity.ActivityMethod;
33+
import io.temporal.activity.ActivityOptions;
34+
import io.temporal.client.WorkflowClient;
35+
import io.temporal.client.WorkflowClientOptions;
36+
import io.temporal.client.WorkflowOptions;
37+
import io.temporal.testing.internal.SDKTestWorkflowRule;
38+
import io.temporal.worker.WorkerFactoryOptions;
39+
import io.temporal.workflow.*;
40+
import java.time.Duration;
41+
import org.junit.After;
42+
import org.junit.Rule;
43+
import org.junit.Test;
44+
45+
public class CallbackContextTest {
46+
47+
private static final MockTracer mockTracer =
48+
new MockTracer(new ThreadLocalScopeManager(), MockTracer.Propagator.TEXT_MAP);
49+
50+
private final OpenTracingOptions OT_OPTIONS =
51+
OpenTracingOptions.newBuilder().setTracer(mockTracer).build();
52+
53+
@Rule
54+
public SDKTestWorkflowRule testWorkflowRule =
55+
SDKTestWorkflowRule.newBuilder()
56+
.setWorkflowClientOptions(
57+
WorkflowClientOptions.newBuilder()
58+
.setInterceptors(new OpenTracingClientInterceptor(OT_OPTIONS))
59+
.validateAndBuildWithDefaults())
60+
.setWorkerFactoryOptions(
61+
WorkerFactoryOptions.newBuilder()
62+
.setWorkerInterceptors(new OpenTracingWorkerInterceptor(OT_OPTIONS))
63+
.validateAndBuildWithDefaults())
64+
.setWorkflowTypes(WorkflowImpl.class)
65+
.setActivityImplementations(new ActivityImpl())
66+
.build();
67+
68+
@After
69+
public void tearDown() {
70+
mockTracer.reset();
71+
}
72+
73+
@ActivityInterface
74+
public interface TestActivity {
75+
@ActivityMethod
76+
boolean activity();
77+
}
78+
79+
@WorkflowInterface
80+
public interface TestWorkflow {
81+
@WorkflowMethod
82+
boolean workflow();
83+
}
84+
85+
public static class ActivityImpl implements TestActivity {
86+
@Override
87+
public boolean activity() {
88+
Span span = mockTracer.buildSpan("someWork").start();
89+
try (Scope ignored = mockTracer.scopeManager().activate(span)) {
90+
Thread.sleep(100);
91+
} catch (InterruptedException e) {
92+
throw new RuntimeException(e);
93+
} finally {
94+
span.finish();
95+
}
96+
return true;
97+
}
98+
}
99+
100+
public static class WorkflowImpl implements TestWorkflow {
101+
private final TestActivity activity =
102+
Workflow.newActivityStub(
103+
TestActivity.class,
104+
ActivityOptions.newBuilder()
105+
.setStartToCloseTimeout(Duration.ofMinutes(1))
106+
.validateAndBuildWithDefaults());
107+
108+
@Override
109+
public boolean workflow() {
110+
return Async.function(activity::activity)
111+
.thenCompose(
112+
result ->
113+
Promise.allOf(
114+
Async.function(activity::activity), Async.function(activity::activity)))
115+
.thenCompose(result -> Async.function(activity::activity))
116+
.get();
117+
}
118+
}
119+
120+
@Test
121+
public void testCallbackContext() {
122+
MockSpan span = mockTracer.buildSpan("ClientFunction").start();
123+
124+
WorkflowClient client = testWorkflowRule.getWorkflowClient();
125+
try (Scope scope = mockTracer.scopeManager().activate(span)) {
126+
TestWorkflow workflow =
127+
client.newWorkflowStub(
128+
TestWorkflow.class,
129+
WorkflowOptions.newBuilder()
130+
.setTaskQueue(testWorkflowRule.getTaskQueue())
131+
.validateBuildWithDefaults());
132+
assertTrue(workflow.workflow());
133+
} finally {
134+
span.finish();
135+
}
136+
137+
OpenTracingSpansHelper spansHelper = new OpenTracingSpansHelper(mockTracer.finishedSpans());
138+
139+
MockSpan clientSpan = spansHelper.getSpanByOperationName("ClientFunction");
140+
141+
MockSpan workflowStartSpan = spansHelper.getByParentSpan(clientSpan).get(0);
142+
assertEquals(clientSpan.context().spanId(), workflowStartSpan.parentId());
143+
assertEquals("StartWorkflow:TestWorkflow", workflowStartSpan.operationName());
144+
145+
MockSpan workflowRunSpan = spansHelper.getByParentSpan(workflowStartSpan).get(0);
146+
assertEquals(workflowStartSpan.context().spanId(), workflowRunSpan.parentId());
147+
assertEquals("RunWorkflow:TestWorkflow", workflowRunSpan.operationName());
148+
149+
assertEquals(4, spansHelper.getByParentSpan(workflowRunSpan).stream().count());
150+
151+
spansHelper
152+
.getByParentSpan(workflowRunSpan)
153+
.forEach(
154+
(activityStartSpan) -> {
155+
assertEquals(workflowRunSpan.context().spanId(), activityStartSpan.parentId());
156+
assertEquals("StartActivity:Activity", activityStartSpan.operationName());
157+
158+
MockSpan activityRunSpan = spansHelper.getByParentSpan(activityStartSpan).get(0);
159+
assertEquals(activityStartSpan.context().spanId(), activityRunSpan.parentId());
160+
assertEquals("RunActivity:Activity", activityRunSpan.operationName());
161+
162+
MockSpan activityWorkSpan = spansHelper.getByParentSpan(activityRunSpan).get(0);
163+
assertEquals(activityRunSpan.context().spanId(), activityWorkSpan.parentId());
164+
assertEquals("someWork", activityWorkSpan.operationName());
165+
});
166+
}
167+
}

0 commit comments

Comments
 (0)