Skip to content

Commit 9a3b491

Browse files
authored
IWF-936: fix FAIL_WORKFLOW_ON_FAILURE state option not working (#113)
1 parent 53858c3 commit 9a3b491

File tree

4 files changed

+163
-12
lines changed

4 files changed

+163
-12
lines changed

iwf/tests/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@
2828
from iwf.tests.workflows.state_options_override_workflow import (
2929
StateOptionsOverrideWorkflow,
3030
)
31+
from iwf.tests.workflows.state_options_workflow import (
32+
StateOptionsWorkflow1,
33+
StateOptionsWorkflow2,
34+
)
3135
from iwf.tests.workflows.timer_workflow import TimerWorkflow
3236
from iwf.tests.workflows.wait_for_state_with_state_execution_id_workflow import (
3337
WaitForStateWithStateExecutionIdWorkflow,
@@ -57,6 +61,8 @@
5761
registry.add_workflow(RpcMemoWorkflow())
5862
registry.add_workflow(RPCWorkflow())
5963
registry.add_workflow(StateOptionsOverrideWorkflow())
64+
registry.add_workflow(StateOptionsWorkflow1())
65+
registry.add_workflow(StateOptionsWorkflow2())
6066
registry.add_workflow(TimerWorkflow())
6167
registry.add_workflow(WaitForStateWithStateExecutionIdWorkflow())
6268
registry.add_workflow(WaitForStateWithWaitForKeyWorkflow())

iwf/tests/test_workflow_state_options.py

Lines changed: 79 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,31 @@
1+
import inspect
2+
import time
13
import unittest
24

5+
from iwf.client import Client
6+
from iwf.iwf_api.models import IDReusePolicy
37
from iwf.iwf_api.models import (
48
PersistenceLoadingPolicy,
59
PersistenceLoadingType,
610
WorkflowStateOptions as IdlWorkflowStateOptions,
11+
RetryPolicy,
12+
WaitUntilApiFailurePolicy,
713
)
8-
14+
from iwf.tests.worker_server import registry
15+
from iwf.tests.workflows.state_options_workflow import (
16+
StateOptionsWorkflow1,
17+
StateOptionsWorkflow2,
18+
)
19+
from iwf.workflow_options import WorkflowOptions
920
from iwf.workflow_state_options import WorkflowStateOptions, _to_idl_state_options
21+
from ..errors import WorkflowFailed
1022

1123

1224
class TestWorkflowStateOptions(unittest.TestCase):
25+
@classmethod
26+
def setUpClass(cls):
27+
cls.client = Client(registry)
28+
1329
def test_convert_to_idl(self):
1430
empty_idl = IdlWorkflowStateOptions()
1531
assert empty_idl == _to_idl_state_options(False, None, {})
@@ -29,3 +45,65 @@ def test_convert_to_idl(self):
2945
assert non_empty_idl == _to_idl_state_options(True, non_empty, {})
3046
non_empty.state_id = "state-id-2"
3147
assert non_empty.state_id == "state-id-2"
48+
49+
"""Test that proceed_to_execute_when_wait_until_retry_exhausted correctly handles both enum values."""
50+
51+
def test_proceed_to_execute_when_wait_until_retry_exhausted(self):
52+
retry_policy = RetryPolicy(maximum_attempts=1)
53+
54+
# Test PROCEED_ON_FAILURE
55+
options_proceed = WorkflowStateOptions(
56+
proceed_to_execute_when_wait_until_retry_exhausted=WaitUntilApiFailurePolicy.PROCEED_ON_FAILURE,
57+
wait_until_api_retry_policy=retry_policy,
58+
)
59+
result_proceed = _to_idl_state_options(False, options_proceed, {})
60+
assert (
61+
result_proceed.wait_until_api_failure_policy
62+
== WaitUntilApiFailurePolicy.PROCEED_ON_FAILURE
63+
)
64+
65+
# Test FAIL_WORKFLOW_ON_FAILURE
66+
options_fail = WorkflowStateOptions(
67+
proceed_to_execute_when_wait_until_retry_exhausted=WaitUntilApiFailurePolicy.FAIL_WORKFLOW_ON_FAILURE,
68+
wait_until_api_retry_policy=retry_policy,
69+
)
70+
result_fail = _to_idl_state_options(False, options_fail, {})
71+
assert (
72+
result_fail.wait_until_api_failure_policy
73+
== WaitUntilApiFailurePolicy.FAIL_WORKFLOW_ON_FAILURE
74+
)
75+
76+
# Test with None/unset value
77+
options = WorkflowStateOptions()
78+
result = _to_idl_state_options(False, options, {})
79+
# By default, wait_until_api_failure_policy should not be set when proceed_to_execute_when_wait_until_retry_exhausted is None
80+
# The IWF service will use FAIL_WORKFLOW_ON_FAILURE by default
81+
from iwf.iwf_api.types import Unset
82+
83+
self.assertTrue(isinstance(result.wait_until_api_failure_policy, Unset))
84+
85+
def test_proceed_on_failure(self):
86+
wf_id = f"{inspect.currentframe().f_code.co_name}-{time.time_ns()}"
87+
self.client.start_workflow(
88+
StateOptionsWorkflow1,
89+
wf_id,
90+
10,
91+
"input",
92+
WorkflowOptions(workflow_id_reuse_policy=IDReusePolicy.DISALLOW_REUSE),
93+
)
94+
output = self.client.wait_for_workflow_completion(wf_id)
95+
96+
assert output == "InitState1_execute_completed"
97+
98+
def test_fail_workflow_on_failure(self):
99+
wf_id = f"{inspect.currentframe().f_code.co_name}-{time.time_ns()}"
100+
self.client.start_workflow(
101+
StateOptionsWorkflow2,
102+
wf_id,
103+
10,
104+
"input",
105+
WorkflowOptions(workflow_id_reuse_policy=IDReusePolicy.DISALLOW_REUSE),
106+
)
107+
108+
with self.assertRaises(WorkflowFailed):
109+
self.client.wait_for_workflow_completion(wf_id, str)
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
from iwf.command_request import CommandRequest
2+
from iwf.command_results import CommandResults
3+
from iwf.communication import Communication
4+
5+
from iwf.iwf_api.models import RetryPolicy, WaitUntilApiFailurePolicy
6+
from iwf.persistence import Persistence
7+
from iwf.state_decision import StateDecision
8+
from iwf.state_schema import StateSchema
9+
from iwf.workflow import ObjectWorkflow
10+
from iwf.workflow_context import WorkflowContext
11+
from iwf.workflow_state import T, WorkflowState
12+
from iwf.workflow_state_options import WorkflowStateOptions
13+
14+
15+
class InitState1(WorkflowState[str]):
16+
def get_state_options(self) -> WorkflowStateOptions:
17+
return WorkflowStateOptions(
18+
wait_until_api_retry_policy=RetryPolicy(maximum_attempts=1),
19+
proceed_to_execute_when_wait_until_retry_exhausted=WaitUntilApiFailurePolicy.PROCEED_ON_FAILURE,
20+
)
21+
22+
def wait_until(
23+
self,
24+
ctx: WorkflowContext,
25+
input: T,
26+
persistence: Persistence,
27+
communication: Communication,
28+
) -> CommandRequest:
29+
raise RuntimeError("test failure")
30+
31+
def execute(
32+
self,
33+
ctx: WorkflowContext,
34+
input: T,
35+
command_results: CommandResults,
36+
persistence: Persistence,
37+
communication: Communication,
38+
) -> StateDecision:
39+
data = "InitState1_execute_completed"
40+
return StateDecision.graceful_complete_workflow(data)
41+
42+
43+
class InitState2(WorkflowState[str]):
44+
def get_state_options(self) -> WorkflowStateOptions:
45+
return WorkflowStateOptions(
46+
wait_until_api_retry_policy=RetryPolicy(maximum_attempts=1),
47+
proceed_to_execute_when_wait_until_retry_exhausted=WaitUntilApiFailurePolicy.FAIL_WORKFLOW_ON_FAILURE,
48+
)
49+
50+
def wait_until(
51+
self,
52+
ctx: WorkflowContext,
53+
input: T,
54+
persistence: Persistence,
55+
communication: Communication,
56+
) -> CommandRequest:
57+
raise RuntimeError("test failure")
58+
59+
def execute(
60+
self,
61+
ctx: WorkflowContext,
62+
input: T,
63+
command_results: CommandResults,
64+
persistence: Persistence,
65+
communication: Communication,
66+
) -> StateDecision:
67+
data = "InitState2_execute_completed"
68+
return StateDecision.graceful_complete_workflow(data)
69+
70+
71+
class StateOptionsWorkflow1(ObjectWorkflow):
72+
def get_workflow_states(self) -> StateSchema:
73+
return StateSchema.with_starting_state(InitState1())
74+
75+
76+
class StateOptionsWorkflow2(ObjectWorkflow):
77+
def get_workflow_states(self) -> StateSchema:
78+
return StateSchema.with_starting_state(InitState2())

iwf/workflow_state_options.py

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -108,17 +108,6 @@ def _to_idl_state_options(
108108
)
109109
if options.wait_until_api_retry_policy is not None:
110110
res.wait_until_api_retry_policy = options.wait_until_api_retry_policy
111-
if options.proceed_to_execute_when_wait_until_retry_exhausted is not None:
112-
if options.proceed_to_execute_when_wait_until_retry_exhausted:
113-
res.wait_until_api_failure_policy = (
114-
WaitUntilApiFailurePolicy.PROCEED_ON_FAILURE
115-
)
116-
else:
117-
res.wait_until_api_failure_policy = (
118-
WaitUntilApiFailurePolicy.FAIL_WORKFLOW_ON_FAILURE
119-
)
120-
121-
pass
122111
if options.wait_until_api_timeout_seconds is not None:
123112
res.wait_until_api_timeout_seconds = options.wait_until_api_timeout_seconds
124113
if options.execute_api_retry_policy is not None:

0 commit comments

Comments
 (0)