Skip to content

Commit 90e5125

Browse files
Fix NDE caused by removing Workflow.getVersion with a succeeding Work… (#2370)
* Fix NDE caused by removing Workflow.getVersion with a succeeding Workflow.sideEffect * Add comment
1 parent b593b35 commit 90e5125

File tree

6 files changed

+178
-1
lines changed

6 files changed

+178
-1
lines changed

temporal-sdk/src/main/java/io/temporal/internal/history/MarkerUtils.java

+16
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@
2020

2121
package io.temporal.internal.history;
2222

23+
import io.temporal.api.command.v1.Command;
24+
import io.temporal.api.command.v1.RecordMarkerCommandAttributes;
2325
import io.temporal.api.common.v1.Payloads;
26+
import io.temporal.api.enums.v1.CommandType;
2427
import io.temporal.api.enums.v1.EventType;
2528
import io.temporal.api.history.v1.HistoryEvent;
2629
import io.temporal.api.history.v1.MarkerRecordedEventAttributes;
@@ -42,6 +45,19 @@ public static boolean verifyMarkerName(HistoryEvent event, String markerName) {
4245
return markerName.equals(attributes.getMarkerName());
4346
}
4447

48+
/**
49+
* @param command {@code Command} to inspect
50+
* @param markerName expected marker name
51+
* @return true if the command has a correct structure for a marker and an expected marker name
52+
*/
53+
public static boolean verifyMarkerName(Command command, String markerName) {
54+
if (!CommandType.COMMAND_TYPE_RECORD_MARKER.equals(command.getCommandType())) {
55+
return false;
56+
}
57+
RecordMarkerCommandAttributes attributes = command.getRecordMarkerCommandAttributes();
58+
return markerName.equals(attributes.getMarkerName());
59+
}
60+
4561
/**
4662
* This method should be used to extract values from the marker persisted by the SDK itself. These
4763
* values are converted using standard data converter to be always accessible by the SDK.

temporal-sdk/src/main/java/io/temporal/internal/history/VersionMarkerUtils.java

+9
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
package io.temporal.internal.history;
2222

2323
import com.google.common.base.Preconditions;
24+
import io.temporal.api.command.v1.Command;
2425
import io.temporal.api.command.v1.RecordMarkerCommandAttributes;
2526
import io.temporal.api.common.v1.Payloads;
2627
import io.temporal.api.history.v1.HistoryEvent;
@@ -56,6 +57,14 @@ public static boolean hasVersionMarkerStructure(HistoryEvent event) {
5657
return MarkerUtils.verifyMarkerName(event, MARKER_NAME);
5758
}
5859

60+
/**
61+
* @param command {@code Command} to inspect
62+
* @return true if the command has a correct structure for a version marker
63+
*/
64+
public static boolean hasVersionMarkerStructure(Command command) {
65+
return MarkerUtils.verifyMarkerName(command, MARKER_NAME);
66+
}
67+
5968
@Nullable
6069
public static String getChangeId(MarkerRecordedEventAttributes markerAttributes) {
6170
return MarkerUtils.getValueFromMarker(markerAttributes, MARKER_CHANGE_ID_KEY, String.class);

temporal-sdk/src/main/java/io/temporal/internal/statemachines/StateMachineCommandUtils.java

+5
Original file line numberDiff line numberDiff line change
@@ -34,4 +34,9 @@ public static Command createRecordMarker(RecordMarkerCommandAttributes attribute
3434
.setRecordMarkerCommandAttributes(attributes)
3535
.build();
3636
}
37+
38+
public static Command createFakeMarkerCommand(String markerName) {
39+
return createRecordMarker(
40+
RecordMarkerCommandAttributes.newBuilder().setMarkerName(markerName).build());
41+
}
3742
}

temporal-sdk/src/main/java/io/temporal/internal/statemachines/VersionStateMachine.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
package io.temporal.internal.statemachines;
2222

23+
import static io.temporal.internal.statemachines.StateMachineCommandUtils.createFakeMarkerCommand;
2324
import static io.temporal.internal.sync.WorkflowInternal.DEFAULT_VERSION;
2425

2526
import com.google.common.annotations.VisibleForTesting;
@@ -200,7 +201,7 @@ public void handleWorkflowTaskStarted() {
200201
}
201202

202203
void createFakeCommand() {
203-
addCommand(StateMachineCommandUtils.RECORD_MARKER_FAKE_COMMAND);
204+
addCommand(createFakeMarkerCommand(VersionMarkerUtils.MARKER_NAME));
204205
}
205206

206207
private void validateVersionAndThrow(boolean preloaded) {

temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java

+20
Original file line numberDiff line numberDiff line change
@@ -525,6 +525,26 @@ private void handleCommandEvent(HistoryEvent event) {
525525
continue;
526526
}
527527

528+
// This checks if the next event is a version marker, but the next command is not a version
529+
// marker. This can happen if a getVersion call was removed.
530+
if (VersionMarkerUtils.hasVersionMarkerStructure(event)
531+
&& !VersionMarkerUtils.hasVersionMarkerStructure(command.getCommand())) {
532+
if (handleNonMatchingVersionMarker(event)) {
533+
// This event is a version marker for a removed getVersion call.
534+
// Handle the version marker as unmatched and return even if there are no commands to match
535+
// it against.
536+
return;
537+
} else {
538+
throw new NonDeterministicException(
539+
"Event "
540+
+ event.getEventId()
541+
+ " of type "
542+
+ event.getEventType()
543+
+ " does not"
544+
+ " match command type "
545+
+ command.getCommandType());
546+
}
547+
}
528548
// Note that handleEvent can cause a command cancellation in case of
529549
// 1. MutableSideEffect
530550
// 2. Version State Machine during replay cancels the command and enters SKIPPED state
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.workflow.versionTests;
22+
23+
import static org.junit.Assert.*;
24+
25+
import io.temporal.activity.LocalActivityOptions;
26+
import io.temporal.testing.internal.SDKTestWorkflowRule;
27+
import io.temporal.worker.WorkerOptions;
28+
import io.temporal.workflow.Workflow;
29+
import io.temporal.workflow.shared.TestActivities;
30+
import io.temporal.workflow.shared.TestActivities.TestActivitiesImpl;
31+
import io.temporal.workflow.shared.TestWorkflows.TestWorkflow1;
32+
import io.temporal.workflow.unsafe.WorkflowUnsafe;
33+
import java.time.Duration;
34+
import org.junit.Rule;
35+
import org.junit.Test;
36+
37+
public class GetVersionRemovalBeforeMarkerTest {
38+
private static boolean hasReplayed;
39+
40+
@Rule
41+
public SDKTestWorkflowRule testWorkflowRule =
42+
SDKTestWorkflowRule.newBuilder()
43+
.setWorkflowTypes(TestGetVersionRemovalWorkflowImpl.class)
44+
.setActivityImplementations(new TestActivitiesImpl())
45+
// Forcing a replay. Full history arrived from a normal queue causing a replay.
46+
.setWorkerOptions(
47+
WorkerOptions.newBuilder()
48+
.setStickyQueueScheduleToStartTimeout(Duration.ZERO)
49+
.build())
50+
.build();
51+
52+
@Test
53+
public void testSideEffectAfterGetVersion() {
54+
TestWorkflow1 workflowStub =
55+
testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class);
56+
String result = workflowStub.execute("SideEffect");
57+
assertTrue(hasReplayed);
58+
assertEquals("side effect", result);
59+
}
60+
61+
@Test
62+
public void testMutableSideEffectAfterGetVersion() {
63+
TestWorkflow1 workflowStub =
64+
testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class);
65+
String result = workflowStub.execute("MutableSideEffect");
66+
assertTrue(hasReplayed);
67+
assertEquals("mutable side effect", result);
68+
}
69+
70+
@Test
71+
public void testGetVersionAfterGetVersion() {
72+
TestWorkflow1 workflowStub =
73+
testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class);
74+
String result = workflowStub.execute("GetVersion");
75+
assertTrue(hasReplayed);
76+
assertEquals("6", result);
77+
}
78+
79+
@Test
80+
public void testLocalActivityAfterGetVersion() {
81+
TestWorkflow1 workflowStub =
82+
testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class);
83+
String result = workflowStub.execute("LocalActivity");
84+
assertTrue(hasReplayed);
85+
assertEquals("activity", result);
86+
}
87+
88+
public static class TestGetVersionRemovalWorkflowImpl implements TestWorkflow1 {
89+
private final TestActivities.VariousTestActivities activities =
90+
Workflow.newLocalActivityStub(
91+
TestActivities.VariousTestActivities.class,
92+
LocalActivityOptions.newBuilder()
93+
.setStartToCloseTimeout(Duration.ofSeconds(5))
94+
.build());
95+
96+
@Override
97+
public String execute(String action) {
98+
// Test removing a version check in replaying code with an additional thread running.
99+
if (!WorkflowUnsafe.isReplaying()) {
100+
int version = Workflow.getVersion("changeId", 1, 2);
101+
assertEquals(version, 2);
102+
} else {
103+
hasReplayed = true;
104+
}
105+
String result = "";
106+
if (action.equals("SideEffect")) {
107+
result = Workflow.sideEffect(String.class, () -> "side effect");
108+
} else if (action.equals("MutableSideEffect")) {
109+
result =
110+
Workflow.mutableSideEffect(
111+
"mutable-side-effect-i",
112+
String.class,
113+
(a, b) -> !a.equals(b),
114+
() -> "mutable side effect");
115+
} else if (action.equals("GetVersion")) {
116+
int v = Workflow.getVersion("otherChangeId", 5, 6);
117+
result = String.valueOf(v);
118+
} else if (action.equals("LocalActivity")) {
119+
result = activities.activity();
120+
}
121+
// Sleep to trigger at lest one more workflow task
122+
Workflow.sleep(Duration.ofSeconds(1));
123+
return result;
124+
}
125+
}
126+
}

0 commit comments

Comments
 (0)