Skip to content

Commit fa061a2

Browse files
author
daniela
committed
IWF-936: fix FAIL_WORKFLOW_ON_FAILURE state option not working
1 parent 53858c3 commit fa061a2

File tree

4 files changed

+154
-13
lines changed

4 files changed

+154
-13
lines changed

iwf/tests/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
from iwf.tests.workflows.state_options_override_workflow import (
2929
StateOptionsOverrideWorkflow,
3030
)
31+
from iwf.tests.workflows.state_options_workflow import (StateOptionsWorkflow1,
32+
StateOptionsWorkflow2)
3133
from iwf.tests.workflows.timer_workflow import TimerWorkflow
3234
from iwf.tests.workflows.wait_for_state_with_state_execution_id_workflow import (
3335
WaitForStateWithStateExecutionIdWorkflow,
@@ -57,6 +59,8 @@
5759
registry.add_workflow(RpcMemoWorkflow())
5860
registry.add_workflow(RPCWorkflow())
5961
registry.add_workflow(StateOptionsOverrideWorkflow())
62+
registry.add_workflow(StateOptionsWorkflow1())
63+
registry.add_workflow(StateOptionsWorkflow2())
6064
registry.add_workflow(TimerWorkflow())
6165
registry.add_workflow(WaitForStateWithStateExecutionIdWorkflow())
6266
registry.add_workflow(WaitForStateWithWaitForKeyWorkflow())

iwf/tests/test_workflow_state_options.py

Lines changed: 72 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,28 @@
1+
import inspect
2+
import time
13
import unittest
24

5+
from iwf.client import Client
6+
from iwf.iwf_api.models import IDReusePolicy
37
from iwf.iwf_api.models import (
48
PersistenceLoadingPolicy,
59
PersistenceLoadingType,
6-
WorkflowStateOptions as IdlWorkflowStateOptions,
10+
WorkflowStateOptions as IdlWorkflowStateOptions, RetryPolicy,
11+
WaitUntilApiFailurePolicy,
712
)
8-
13+
from iwf.tests.worker_server import registry
14+
from iwf.tests.workflows.state_options_workflow import (StateOptionsWorkflow1,
15+
StateOptionsWorkflow2)
16+
from iwf.workflow_options import WorkflowOptions
917
from iwf.workflow_state_options import WorkflowStateOptions, _to_idl_state_options
18+
from ..errors import WorkflowFailed
1019

1120

1221
class TestWorkflowStateOptions(unittest.TestCase):
22+
@classmethod
23+
def setUpClass(cls):
24+
cls.client = Client(registry)
25+
1326
def test_convert_to_idl(self):
1427
empty_idl = IdlWorkflowStateOptions()
1528
assert empty_idl == _to_idl_state_options(False, None, {})
@@ -29,3 +42,60 @@ def test_convert_to_idl(self):
2942
assert non_empty_idl == _to_idl_state_options(True, non_empty, {})
3043
non_empty.state_id = "state-id-2"
3144
assert non_empty.state_id == "state-id-2"
45+
46+
"""Test that proceed_to_execute_when_wait_until_retry_exhausted correctly handles both enum values."""
47+
def test_proceed_to_execute_when_wait_until_retry_exhausted(self):
48+
retry_policy = RetryPolicy(maximum_attempts=1)
49+
50+
# Test PROCEED_ON_FAILURE
51+
options_proceed = WorkflowStateOptions(
52+
proceed_to_execute_when_wait_until_retry_exhausted=WaitUntilApiFailurePolicy.PROCEED_ON_FAILURE,
53+
wait_until_api_retry_policy=retry_policy,
54+
)
55+
result_proceed = _to_idl_state_options(False, options_proceed, {})
56+
assert result_proceed.wait_until_api_failure_policy == WaitUntilApiFailurePolicy.PROCEED_ON_FAILURE
57+
58+
# Test FAIL_WORKFLOW_ON_FAILURE
59+
options_fail = WorkflowStateOptions(
60+
proceed_to_execute_when_wait_until_retry_exhausted=WaitUntilApiFailurePolicy.FAIL_WORKFLOW_ON_FAILURE,
61+
wait_until_api_retry_policy=retry_policy,
62+
)
63+
result_fail = _to_idl_state_options(False, options_fail, {})
64+
assert result_fail.wait_until_api_failure_policy == WaitUntilApiFailurePolicy.FAIL_WORKFLOW_ON_FAILURE
65+
66+
# Test with None/unset value
67+
options = WorkflowStateOptions()
68+
result = _to_idl_state_options(False, options, {})
69+
# By default, wait_until_api_failure_policy should not be set when proceed_to_execute_when_wait_until_retry_exhausted is None
70+
# The IWF service will use FAIL_WORKFLOW_ON_FAILURE by default
71+
from iwf.iwf_api.types import Unset
72+
self.assertTrue(isinstance(result.wait_until_api_failure_policy, Unset))
73+
74+
def test_proceed_on_failure(self):
75+
wf_id = f"{inspect.currentframe().f_code.co_name}-{time.time_ns()}"
76+
self.client.start_workflow(
77+
StateOptionsWorkflow1,
78+
wf_id,
79+
10,
80+
"input",
81+
WorkflowOptions(workflow_id_reuse_policy=IDReusePolicy.DISALLOW_REUSE),
82+
)
83+
output = self.client.wait_for_workflow_completion(wf_id)
84+
85+
assert (
86+
output
87+
== "InitState1_execute_completed"
88+
)
89+
90+
def test_fail_workflow_on_failure(self):
91+
wf_id = f"{inspect.currentframe().f_code.co_name}-{time.time_ns()}"
92+
self.client.start_workflow(
93+
StateOptionsWorkflow2,
94+
wf_id,
95+
10,
96+
"input",
97+
WorkflowOptions(workflow_id_reuse_policy=IDReusePolicy.DISALLOW_REUSE),
98+
)
99+
100+
with self.assertRaises(WorkflowFailed):
101+
self.client.wait_for_workflow_completion(wf_id, str)
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
from iwf.command_request import CommandRequest
2+
from iwf.command_results import CommandResults
3+
from iwf.communication import Communication
4+
5+
from iwf.iwf_api.models import RetryPolicy, WaitUntilApiFailurePolicy
6+
from iwf.persistence import Persistence
7+
from iwf.state_decision import StateDecision
8+
from iwf.state_schema import StateSchema
9+
from iwf.workflow import ObjectWorkflow
10+
from iwf.workflow_context import WorkflowContext
11+
from iwf.workflow_state import T, WorkflowState
12+
from iwf.workflow_state_options import WorkflowStateOptions
13+
14+
15+
class InitState1(WorkflowState[str]):
16+
def get_state_options(self) -> WorkflowStateOptions:
17+
return WorkflowStateOptions(
18+
wait_until_api_retry_policy=RetryPolicy(maximum_attempts=1),
19+
proceed_to_execute_when_wait_until_retry_exhausted=WaitUntilApiFailurePolicy.PROCEED_ON_FAILURE,
20+
)
21+
22+
def wait_until(
23+
self,
24+
ctx: WorkflowContext,
25+
input: T,
26+
persistence: Persistence,
27+
communication: Communication,
28+
) -> CommandRequest:
29+
raise RuntimeError("test failure")
30+
31+
def execute(
32+
self,
33+
ctx: WorkflowContext,
34+
input: T,
35+
command_results: CommandResults,
36+
persistence: Persistence,
37+
communication: Communication,
38+
) -> StateDecision:
39+
data = "InitState1_execute_completed"
40+
return StateDecision.graceful_complete_workflow(data)
41+
42+
43+
class InitState2(WorkflowState[str]):
44+
def get_state_options(self) -> WorkflowStateOptions:
45+
return WorkflowStateOptions(
46+
wait_until_api_retry_policy=RetryPolicy(maximum_attempts=1),
47+
proceed_to_execute_when_wait_until_retry_exhausted=WaitUntilApiFailurePolicy.FAIL_WORKFLOW_ON_FAILURE,
48+
)
49+
50+
def wait_until(
51+
self,
52+
ctx: WorkflowContext,
53+
input: T,
54+
persistence: Persistence,
55+
communication: Communication,
56+
) -> CommandRequest:
57+
raise RuntimeError("test failure")
58+
59+
def execute(
60+
self,
61+
ctx: WorkflowContext,
62+
input: T,
63+
command_results: CommandResults,
64+
persistence: Persistence,
65+
communication: Communication,
66+
) -> StateDecision:
67+
data = "InitState2_execute_completed"
68+
return StateDecision.graceful_complete_workflow(data)
69+
70+
71+
class StateOptionsWorkflow1(ObjectWorkflow):
72+
def get_workflow_states(self) -> StateSchema:
73+
return StateSchema.with_starting_state(InitState1())
74+
75+
class StateOptionsWorkflow2(ObjectWorkflow):
76+
def get_workflow_states(self) -> StateSchema:
77+
return StateSchema.with_starting_state(InitState2())
78+

iwf/workflow_state_options.py

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -108,17 +108,6 @@ def _to_idl_state_options(
108108
)
109109
if options.wait_until_api_retry_policy is not None:
110110
res.wait_until_api_retry_policy = options.wait_until_api_retry_policy
111-
if options.proceed_to_execute_when_wait_until_retry_exhausted is not None:
112-
if options.proceed_to_execute_when_wait_until_retry_exhausted:
113-
res.wait_until_api_failure_policy = (
114-
WaitUntilApiFailurePolicy.PROCEED_ON_FAILURE
115-
)
116-
else:
117-
res.wait_until_api_failure_policy = (
118-
WaitUntilApiFailurePolicy.FAIL_WORKFLOW_ON_FAILURE
119-
)
120-
121-
pass
122111
if options.wait_until_api_timeout_seconds is not None:
123112
res.wait_until_api_timeout_seconds = options.wait_until_api_timeout_seconds
124113
if options.execute_api_retry_policy is not None:

0 commit comments

Comments
 (0)