From 8ba253ced99879a01445372d0fab7f574a1705de Mon Sep 17 00:00:00 2001 From: Troy Chiu Date: Tue, 18 Feb 2025 23:23:13 -0800 Subject: [PATCH 01/18] fix sync for map task in dynamic Signed-off-by: Troy Chiu --- flytekit/remote/entities.py | 49 ++++++++++++++++++++++++++++++++++--- flytekit/remote/remote.py | 5 ++-- 2 files changed, 47 insertions(+), 7 deletions(-) diff --git a/flytekit/remote/entities.py b/flytekit/remote/entities.py index fd78d4c3c4..f4f08921f9 100644 --- a/flytekit/remote/entities.py +++ b/flytekit/remote/entities.py @@ -348,13 +348,33 @@ def promote_from_model(cls, model: _workflow_model.GateNode): class FlyteArrayNode(_workflow_model.ArrayNode): + def __init__(self, node, parallelism, min_successes, min_success_ratio, flyte_task_node=None, flyte_workflow_node=None): + super().__init__(node, parallelism, min_successes, min_success_ratio) + self._flyte_task_node = flyte_task_node + self._flyte_workflow_node = flyte_workflow_node + + @property + def flyte_task_node(self): + return self._flyte_task_node + + @property + def flyte_workflow_node(self): + return self._flyte_workflow_node + @classmethod - def promote_from_model(cls, model: _workflow_model.ArrayNode): + def promote_from_model( + cls, + model: _workflow_model.ArrayNode, + flyte_task_node: Optional[FlyteTaskNode] = None, + flyte_workflow_node: Optional[FlyteWorkflowNode] = None, + ): return cls( node=model._node, parallelism=model._parallelism, min_successes=model._min_successes, min_success_ratio=model._min_success_ratio, + flyte_task_node=flyte_task_node, + flyte_workflow_node=flyte_workflow_node, ) @@ -406,7 +426,7 @@ def task_node(self) -> Optional[FlyteTaskNode]: return self._flyte_task_node @property - def flyte_entity(self) -> Union[FlyteTask, FlyteWorkflow, FlyteLaunchPlan, FlyteBranchNode]: + def flyte_entity(self) -> Union[FlyteTask, FlyteWorkflow, FlyteLaunchPlan, FlyteBranchNode, FlyteArrayNode]: return self._flyte_entity @classmethod @@ -477,8 +497,29 @@ def promote_from_model( elif model.gate_node is not None: flyte_gate_node = FlyteGateNode.promote_from_model(model.gate_node) elif model.array_node is not None: - flyte_array_node = FlyteArrayNode.promote_from_model(model.array_node) - # TODO: validate task in tasks + # map over task + if model.array_node.node.task_node is not None: + if model.array_node.node.task_node.reference_id not in tasks: + raise RuntimeError( + f"Remote Workflow closure does not have task with id {model.array_node.node.task_node.reference_id}." + ) + flyte_array_node = FlyteArrayNode.promote_from_model(model.array_node, flyte_task_node=cls._promote_task_node(tasks[model.array_node.node.task_node.reference_id])) + # map over launch plan + elif model.array_node.node.workflow_node is not None: + workflow_node, converted_sub_workflows = cls._promote_workflow_node( + model.array_node.node.workflow_node, + sub_workflows, + node_launch_plans, + tasks, + converted_sub_workflows, + ) + flyte_array_node = FlyteArrayNode.promote_from_model( + model.array_node, flyte_workflow_node=workflow_node + ) + else: + raise _system_exceptions.FlyteSystemException( + "Array node must have either task or workflow node specified" + ) else: raise _system_exceptions.FlyteSystemException( f"Bad Node model, neither task nor workflow detected, node: {model}" diff --git a/flytekit/remote/remote.py b/flytekit/remote/remote.py index 108755d7f2..06426f9efd 100644 --- a/flytekit/remote/remote.py +++ b/flytekit/remote/remote.py @@ -2615,10 +2615,9 @@ def sync_node_execution( if execution._node.array_node is None: logger.error("Array node not found") return execution - # if there's a task node underneath the array node, let's fetch the interface for it + # if there's a task node underneath the array node if execution._node.array_node.node.task_node is not None: - tid = execution._node.array_node.node.task_node.reference_id - t = self.fetch_task(tid.project, tid.domain, tid.name, tid.version) + t = execution._node.flyte_entity.flyte_task_node.flyte_task execution._task_executions = [ self.sync_task_execution(FlyteTaskExecution.promote_from_model(task_execution), t) for task_execution in iterate_task_executions(self.client, execution.id) From 8bc2096d54babeded4cacd0f970ea710eaad9713 Mon Sep 17 00:00:00 2001 From: Troy Chiu Date: Wed, 19 Feb 2025 10:11:43 -0800 Subject: [PATCH 02/18] remove unnecessary Signed-off-by: Troy Chiu --- flytekit/remote/entities.py | 21 +-------------------- 1 file changed, 1 insertion(+), 20 deletions(-) diff --git a/flytekit/remote/entities.py b/flytekit/remote/entities.py index f4f08921f9..00e9ba40de 100644 --- a/flytekit/remote/entities.py +++ b/flytekit/remote/entities.py @@ -348,25 +348,19 @@ def promote_from_model(cls, model: _workflow_model.GateNode): class FlyteArrayNode(_workflow_model.ArrayNode): - def __init__(self, node, parallelism, min_successes, min_success_ratio, flyte_task_node=None, flyte_workflow_node=None): + def __init__(self, node, parallelism, min_successes, min_success_ratio, flyte_task_node=None): super().__init__(node, parallelism, min_successes, min_success_ratio) self._flyte_task_node = flyte_task_node - self._flyte_workflow_node = flyte_workflow_node @property def flyte_task_node(self): return self._flyte_task_node - @property - def flyte_workflow_node(self): - return self._flyte_workflow_node - @classmethod def promote_from_model( cls, model: _workflow_model.ArrayNode, flyte_task_node: Optional[FlyteTaskNode] = None, - flyte_workflow_node: Optional[FlyteWorkflowNode] = None, ): return cls( node=model._node, @@ -374,7 +368,6 @@ def promote_from_model( min_successes=model._min_successes, min_success_ratio=model._min_success_ratio, flyte_task_node=flyte_task_node, - flyte_workflow_node=flyte_workflow_node, ) @@ -504,18 +497,6 @@ def promote_from_model( f"Remote Workflow closure does not have task with id {model.array_node.node.task_node.reference_id}." ) flyte_array_node = FlyteArrayNode.promote_from_model(model.array_node, flyte_task_node=cls._promote_task_node(tasks[model.array_node.node.task_node.reference_id])) - # map over launch plan - elif model.array_node.node.workflow_node is not None: - workflow_node, converted_sub_workflows = cls._promote_workflow_node( - model.array_node.node.workflow_node, - sub_workflows, - node_launch_plans, - tasks, - converted_sub_workflows, - ) - flyte_array_node = FlyteArrayNode.promote_from_model( - model.array_node, flyte_workflow_node=workflow_node - ) else: raise _system_exceptions.FlyteSystemException( "Array node must have either task or workflow node specified" From afdd73a03ad23871e28e211f628bb287236946ed Mon Sep 17 00:00:00 2001 From: Troy Chiu Date: Wed, 19 Feb 2025 13:48:40 -0800 Subject: [PATCH 03/18] remove raise error Signed-off-by: Troy Chiu --- flytekit/remote/entities.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/flytekit/remote/entities.py b/flytekit/remote/entities.py index 00e9ba40de..daaecd0911 100644 --- a/flytekit/remote/entities.py +++ b/flytekit/remote/entities.py @@ -498,9 +498,7 @@ def promote_from_model( ) flyte_array_node = FlyteArrayNode.promote_from_model(model.array_node, flyte_task_node=cls._promote_task_node(tasks[model.array_node.node.task_node.reference_id])) else: - raise _system_exceptions.FlyteSystemException( - "Array node must have either task or workflow node specified" - ) + flyte_array_node = FlyteArrayNode.promote_from_model(model.array_node) else: raise _system_exceptions.FlyteSystemException( f"Bad Node model, neither task nor workflow detected, node: {model}" From a3d71574286e21b9cac5e91c10a96848ae106639 Mon Sep 17 00:00:00 2001 From: Troy Chiu Date: Wed, 19 Feb 2025 14:23:54 -0800 Subject: [PATCH 04/18] add integration tests Signed-off-by: Troy Chiu --- tests/flytekit/integration/remote/test_remote.py | 14 ++++++++++++++ .../remote/workflows/basic/dynamic_array_map.py | 16 ++++++++++++++++ 2 files changed, 30 insertions(+) create mode 100644 tests/flytekit/integration/remote/workflows/basic/dynamic_array_map.py diff --git a/tests/flytekit/integration/remote/test_remote.py b/tests/flytekit/integration/remote/test_remote.py index 3babe86c45..d3f897b76f 100644 --- a/tests/flytekit/integration/remote/test_remote.py +++ b/tests/flytekit/integration/remote/test_remote.py @@ -616,6 +616,20 @@ def test_execute_workflow_with_maptask(register): assert execution.outputs["o0"] == [4, 5, 6] assert len(execution.node_executions["n0"].task_executions) == 1 +def test_execution_workflow_with_maptask_in_dynamic(register): + remote = FlyteRemote(Config.auto(config_file=CONFIG), PROJECT, DOMAIN) + d: typing.List[int] = [1, 2, 3] + flyte_launch_plan = remote.fetch_launch_plan(name="basic.dynamic_array_map.workflow_with_maptask_in_dynamic", version=VERSION) + execution = remote.execute( + flyte_launch_plan, + inputs={"data": d}, + version=VERSION, + wait=True, + ) + assert execution.outputs["o0"] == [2, 3, 4] + assert len(execution.node_executions["n0"].subworkflow_node_executions["n0-0-dn0"].task_executions) == 1 + + def test_executes_nested_workflow_dictating_interruptible(register): remote = FlyteRemote(Config.auto(config_file=CONFIG), PROJECT, DOMAIN) flyte_launch_plan = remote.fetch_launch_plan(name="basic.child_workflow.parent_wf", version=VERSION) diff --git a/tests/flytekit/integration/remote/workflows/basic/dynamic_array_map.py b/tests/flytekit/integration/remote/workflows/basic/dynamic_array_map.py new file mode 100644 index 0000000000..9c005c7e62 --- /dev/null +++ b/tests/flytekit/integration/remote/workflows/basic/dynamic_array_map.py @@ -0,0 +1,16 @@ +from flytekit import workflow, task, map_task, dynamic + + +@task +def fn(x: int) -> int: + return x + 1 + + +@dynamic +def dynamic(data: list[int]) -> list[int]: + return map_task(fn)(x=data) + + +@workflow +def workflow_with_maptask_in_dynamic(data: list[int]) -> list[int]: + return dynamic(data=data) \ No newline at end of file From 19b15622a3fa63f1003f29462c4db7d2331e659a Mon Sep 17 00:00:00 2001 From: Troy Chiu Date: Wed, 19 Feb 2025 14:24:39 -0800 Subject: [PATCH 05/18] lint Signed-off-by: Troy Chiu --- flytekit/remote/entities.py | 5 ++++- .../integration/remote/workflows/basic/dynamic_array_map.py | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/flytekit/remote/entities.py b/flytekit/remote/entities.py index daaecd0911..81026a173d 100644 --- a/flytekit/remote/entities.py +++ b/flytekit/remote/entities.py @@ -496,7 +496,10 @@ def promote_from_model( raise RuntimeError( f"Remote Workflow closure does not have task with id {model.array_node.node.task_node.reference_id}." ) - flyte_array_node = FlyteArrayNode.promote_from_model(model.array_node, flyte_task_node=cls._promote_task_node(tasks[model.array_node.node.task_node.reference_id])) + flyte_array_node = FlyteArrayNode.promote_from_model( + model.array_node, + flyte_task_node=cls._promote_task_node(tasks[model.array_node.node.task_node.reference_id]), + ) else: flyte_array_node = FlyteArrayNode.promote_from_model(model.array_node) else: diff --git a/tests/flytekit/integration/remote/workflows/basic/dynamic_array_map.py b/tests/flytekit/integration/remote/workflows/basic/dynamic_array_map.py index 9c005c7e62..12407b095a 100644 --- a/tests/flytekit/integration/remote/workflows/basic/dynamic_array_map.py +++ b/tests/flytekit/integration/remote/workflows/basic/dynamic_array_map.py @@ -13,4 +13,4 @@ def dynamic(data: list[int]) -> list[int]: @workflow def workflow_with_maptask_in_dynamic(data: list[int]) -> list[int]: - return dynamic(data=data) \ No newline at end of file + return dynamic(data=data) From f395792fd945678f6441c2ecfd612421f2de8c90 Mon Sep 17 00:00:00 2001 From: Troy Chiu Date: Wed, 19 Feb 2025 15:11:32 -0800 Subject: [PATCH 06/18] type hint Signed-off-by: Troy Chiu --- flytekit/remote/entities.py | 10 +++++++++- tests/flytekit/integration/remote/test_remote.py | 3 +++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/flytekit/remote/entities.py b/flytekit/remote/entities.py index 81026a173d..c8e8bd5442 100644 --- a/flytekit/remote/entities.py +++ b/flytekit/remote/entities.py @@ -348,7 +348,14 @@ def promote_from_model(cls, model: _workflow_model.GateNode): class FlyteArrayNode(_workflow_model.ArrayNode): - def __init__(self, node, parallelism, min_successes, min_success_ratio, flyte_task_node=None): + def __init__( + self, + node: _workflow_model.Node, + parallelism: int, + min_successes: int, + min_success_ratio: float, + flyte_task_node: Optional[FlyteTaskNode] = None, + ): super().__init__(node, parallelism, min_successes, min_success_ratio) self._flyte_task_node = flyte_task_node @@ -500,6 +507,7 @@ def promote_from_model( model.array_node, flyte_task_node=cls._promote_task_node(tasks[model.array_node.node.task_node.reference_id]), ) + # default case else: flyte_array_node = FlyteArrayNode.promote_from_model(model.array_node) else: diff --git a/tests/flytekit/integration/remote/test_remote.py b/tests/flytekit/integration/remote/test_remote.py index d3f897b76f..7c2a9b5c3d 100644 --- a/tests/flytekit/integration/remote/test_remote.py +++ b/tests/flytekit/integration/remote/test_remote.py @@ -627,6 +627,9 @@ def test_execution_workflow_with_maptask_in_dynamic(register): wait=True, ) assert execution.outputs["o0"] == [2, 3, 4] + assert "n0" in execution.node_executions + assert execution.node_executions["n0"].subworkflow_node_executions is not None + assert "n0-0-dn0" in execution.node_executions["n0"].subworkflow_node_executions assert len(execution.node_executions["n0"].subworkflow_node_executions["n0-0-dn0"].task_executions) == 1 From 7174381294c8a66cc8b5822fca65758992d1e722 Mon Sep 17 00:00:00 2001 From: Troy Chiu Date: Thu, 20 Feb 2025 12:47:58 -0800 Subject: [PATCH 07/18] wip Signed-off-by: Troy Chiu --- flytekit/remote/entities.py | 20 ++++++++++++++++++++ flytekit/remote/remote.py | 16 ++++++++++++++++ 2 files changed, 36 insertions(+) diff --git a/flytekit/remote/entities.py b/flytekit/remote/entities.py index c8e8bd5442..a4a8c8ec88 100644 --- a/flytekit/remote/entities.py +++ b/flytekit/remote/entities.py @@ -355,19 +355,26 @@ def __init__( min_successes: int, min_success_ratio: float, flyte_task_node: Optional[FlyteTaskNode] = None, + flyte_workflow_node: Optional[FlyteWorkflowNode] = None, ): super().__init__(node, parallelism, min_successes, min_success_ratio) self._flyte_task_node = flyte_task_node + self._flyte_workflow_node = flyte_workflow_node @property def flyte_task_node(self): return self._flyte_task_node + @property + def flyte_workflow_node(self): + return self._flyte_workflow_node + @classmethod def promote_from_model( cls, model: _workflow_model.ArrayNode, flyte_task_node: Optional[FlyteTaskNode] = None, + flyte_workflow_node: Optional[FlyteWorkflowNode] = None, ): return cls( node=model._node, @@ -375,6 +382,7 @@ def promote_from_model( min_successes=model._min_successes, min_success_ratio=model._min_success_ratio, flyte_task_node=flyte_task_node, + flyte_workflow_node=flyte_workflow_node, ) @@ -507,6 +515,18 @@ def promote_from_model( model.array_node, flyte_task_node=cls._promote_task_node(tasks[model.array_node.node.task_node.reference_id]), ) + # map over launch plan + elif model.array_node.node.workflow_node is not None: + workflow_node, converted_sub_workflows = cls._promote_workflow_node( + model.array_node.node.workflow_node, + sub_workflows, + node_launch_plans, + tasks, + converted_sub_workflows, + ) + flyte_array_node = FlyteArrayNode.promote_from_model( + model.array_node, flyte_workflow_node=workflow_node + ) # default case else: flyte_array_node = FlyteArrayNode.promote_from_model(model.array_node) diff --git a/flytekit/remote/remote.py b/flytekit/remote/remote.py index 06426f9efd..612974e807 100644 --- a/flytekit/remote/remote.py +++ b/flytekit/remote/remote.py @@ -497,6 +497,11 @@ def find_launch_plan( lp_ref = node.workflow_node.launchplan_ref find_launch_plan(lp_ref, node_launch_plans) + # Inspect array nodes for launch plans + if node.array_node is not None and node.array_node.node.workflow_node is not None and node.array_node.node.workflow_node.launchplan_ref is not None: + lp_ref = node.array_node.node.workflow_node.launchplan_ref + find_launch_plan(lp_ref, node_launch_plans) + # Inspect conditional branch nodes for launch plans def get_launch_plan_from_branch( branch_node: BranchNode, node_launch_plans: Dict[id_models, launch_plan_models.LaunchPlanSpec] @@ -2627,6 +2632,17 @@ def sync_node_execution( else: logger.error(f"Fetched map task does not have an interface, skipping i/o {t}") return execution + elif execution._node.array_node.node.workflow_node is not None: + breakpoint() + sub_flyte_workflow = execution._node.flyte_entity.flyte_workflow_node + launched_exec_id = execution.closure.workflow_node_metadata.execution_id + launched_exec = self.fetch_execution( + project=launched_exec_id.project, domain=launched_exec_id.domain, name=launched_exec_id.name + ) + self.sync_execution(launched_exec) + execution._workflow_executions.append(launched_exec) + execution._interface = launched_exec._flyte_workflow.interface + return execution else: logger.error("Array node not over task, skipping i/o") return execution From e5d6dd272fd008d0a63a8cc6c1b3daadb0f1eff5 Mon Sep 17 00:00:00 2001 From: Troy Chiu Date: Thu, 20 Feb 2025 13:32:18 -0800 Subject: [PATCH 08/18] use flyte node Signed-off-by: Troy Chiu --- flytekit/remote/entities.py | 43 +++++++++++++++++++------------------ flytekit/remote/remote.py | 2 +- 2 files changed, 23 insertions(+), 22 deletions(-) diff --git a/flytekit/remote/entities.py b/flytekit/remote/entities.py index c8e8bd5442..ae809d8264 100644 --- a/flytekit/remote/entities.py +++ b/flytekit/remote/entities.py @@ -350,31 +350,30 @@ def promote_from_model(cls, model: _workflow_model.GateNode): class FlyteArrayNode(_workflow_model.ArrayNode): def __init__( self, - node: _workflow_model.Node, + flyte_node: FlyteNode, parallelism: int, min_successes: int, min_success_ratio: float, - flyte_task_node: Optional[FlyteTaskNode] = None, + ): - super().__init__(node, parallelism, min_successes, min_success_ratio) - self._flyte_task_node = flyte_task_node + super().__init__(flyte_node, parallelism, min_successes, min_success_ratio) + self._flyte_node = flyte_node @property - def flyte_task_node(self): - return self._flyte_task_node + def flyte_node(self) -> FlyteNode: + return self._flyte_node @classmethod def promote_from_model( cls, model: _workflow_model.ArrayNode, - flyte_task_node: Optional[FlyteTaskNode] = None, + flyte_node: FlyteNode, ): return cls( - node=model._node, + flyte_node=flyte_node, parallelism=model._parallelism, min_successes=model._min_successes, min_success_ratio=model._min_success_ratio, - flyte_task_node=flyte_task_node, ) @@ -497,19 +496,21 @@ def promote_from_model( elif model.gate_node is not None: flyte_gate_node = FlyteGateNode.promote_from_model(model.gate_node) elif model.array_node is not None: - # map over task - if model.array_node.node.task_node is not None: - if model.array_node.node.task_node.reference_id not in tasks: - raise RuntimeError( - f"Remote Workflow closure does not have task with id {model.array_node.node.task_node.reference_id}." - ) - flyte_array_node = FlyteArrayNode.promote_from_model( - model.array_node, - flyte_task_node=cls._promote_task_node(tasks[model.array_node.node.task_node.reference_id]), + if model.array_node.node is None: + raise _system_exceptions.FlyteSystemException( + f"Bad Node model, array node detected but no node specified, node: {model}" ) - # default case - else: - flyte_array_node = FlyteArrayNode.promote_from_model(model.array_node) + flyte_node, converted_sub_workflows = cls.promote_from_model( + model.array_node.node, + sub_workflows, + node_launch_plans, + tasks, + converted_sub_workflows, + ) + flyte_array_node = FlyteArrayNode.promote_from_model( + model.array_node, + flyte_node, + ) else: raise _system_exceptions.FlyteSystemException( f"Bad Node model, neither task nor workflow detected, node: {model}" diff --git a/flytekit/remote/remote.py b/flytekit/remote/remote.py index 06426f9efd..2ff1a8bfb8 100644 --- a/flytekit/remote/remote.py +++ b/flytekit/remote/remote.py @@ -2617,7 +2617,7 @@ def sync_node_execution( return execution # if there's a task node underneath the array node if execution._node.array_node.node.task_node is not None: - t = execution._node.flyte_entity.flyte_task_node.flyte_task + t = execution._node.flyte_entity.flyte_node.task_node.flyte_task execution._task_executions = [ self.sync_task_execution(FlyteTaskExecution.promote_from_model(task_execution), t) for task_execution in iterate_task_executions(self.client, execution.id) From 32e4142102d3162e816eba4530b61e45499c50f4 Mon Sep 17 00:00:00 2001 From: Troy Chiu Date: Thu, 20 Feb 2025 13:44:05 -0800 Subject: [PATCH 09/18] lint Signed-off-by: Troy Chiu --- flytekit/remote/entities.py | 1 - 1 file changed, 1 deletion(-) diff --git a/flytekit/remote/entities.py b/flytekit/remote/entities.py index ae809d8264..7c6f72ec1b 100644 --- a/flytekit/remote/entities.py +++ b/flytekit/remote/entities.py @@ -354,7 +354,6 @@ def __init__( parallelism: int, min_successes: int, min_success_ratio: float, - ): super().__init__(flyte_node, parallelism, min_successes, min_success_ratio) self._flyte_node = flyte_node From b0406ffc0537d4b72d471894c2a43a5a4cdd6d4e Mon Sep 17 00:00:00 2001 From: Troy Chiu Date: Fri, 21 Feb 2025 14:12:02 -0800 Subject: [PATCH 10/18] wip Signed-off-by: Troy Chiu --- flytekit/models/admin/task_execution.py | 13 +++++++++ flytekit/models/event.py | 36 +++++++++++++++++++++++++ 2 files changed, 49 insertions(+) create mode 100644 flytekit/models/event.py diff --git a/flytekit/models/admin/task_execution.py b/flytekit/models/admin/task_execution.py index 3eecad795e..2730db11fc 100644 --- a/flytekit/models/admin/task_execution.py +++ b/flytekit/models/admin/task_execution.py @@ -1,6 +1,7 @@ from flyteidl.admin import task_execution_pb2 as _task_execution_pb2 from flytekit.models import common as _common +from flytekit.models import event as _event from flytekit.models.core import execution as _execution from flytekit.models.core import identifier as _identifier @@ -16,6 +17,7 @@ def __init__( updated_at, output_uri=None, error=None, + metadata=None, ): """ :param int phase: Enum value from flytekit.models.core.execution.TaskExecutionPhase @@ -28,6 +30,7 @@ def __init__( literals. :param flytekit.models.core.execution.ExecutionError error: If task has failed and in terminal state, this will be set to the error encountered. + :param flytekit.models.event.TaskExecutionMetadata metadata: Metadata associated with the task execution. """ self._phase = phase self._logs = logs @@ -37,6 +40,7 @@ def __init__( self._updated_at = updated_at self._output_uri = output_uri self._error = error + self._metadata = metadata @property def phase(self): @@ -95,6 +99,13 @@ def error(self): """ return self._error + @property + def metadata(self): + """ + :rtype: flytekit.models.event.TaskExecutionMetadata + """ + return self._metadata + def to_flyte_idl(self): """ :rtype: flyteidl.admin.task_execution_pb2.TaskExecutionClosure @@ -104,6 +115,7 @@ def to_flyte_idl(self): logs=[l.to_flyte_idl() for l in self.logs], output_uri=self.output_uri, error=self.error.to_flyte_idl() if self.error is not None else None, + metadata=self.metadata.to_flyte_idl() if self.metadata is not None else None, ) p.started_at.FromDatetime(self.started_at) p.created_at.FromDatetime(self.created_at) @@ -126,6 +138,7 @@ def from_flyte_idl(cls, p): created_at=p.created_at.ToDatetime(), updated_at=p.updated_at.ToDatetime(), duration=p.duration.ToTimedelta(), + metadata=_event.TaskExecutionMetadata.from_flyte_idl(p.metadata) if p.HasField("metadata") else None, ) diff --git a/flytekit/models/event.py b/flytekit/models/event.py new file mode 100644 index 0000000000..00d8f65108 --- /dev/null +++ b/flytekit/models/event.py @@ -0,0 +1,36 @@ +from flyteidl.event import event_pb2 as _event_pb2 + +from flytekit.models import common as _common + + +class TaskExecutionMetadata(_common.FlyteIdlEntity): + def __init__(self, external_resources): + """ + :param google.protobuf.internal.containers.RepeatedCompositeFieldContainer external_resources: + """ + self._external_resources = external_resources + + @property + def external_resources(self): + """ + :rtype: google.protobuf.internal.containers.RepeatedCompositeFieldContainer + """ + return self._external_resources + + def to_flyte_idl(self): + """ + :rtype: flyteidl.event.TaskExecutionMetadata + """ + return _event_pb2.TaskExecutionMetadata( + external_resources=self._external_resources, + ) + + @classmethod + def from_flyte_idl(cls, proto): + """ + :param flyteidl.event.TaskExecutionMetadata proto: + :rtype: TaskExecutionMetadata + """ + return cls( + external_resources=proto.external_resources, + ) From 76d159a5832cbd80abd93e4e4c4fd0693b1ce45f Mon Sep 17 00:00:00 2001 From: Troy Chiu Date: Fri, 21 Feb 2025 14:12:35 -0800 Subject: [PATCH 11/18] wip Signed-off-by: Troy Chiu --- flytekit/models/core/workflow.py | 3 ++- flytekit/remote/entities.py | 6 +----- flytekit/remote/remote.py | 35 +++++++++++++++++++------------- 3 files changed, 24 insertions(+), 20 deletions(-) diff --git a/flytekit/models/core/workflow.py b/flytekit/models/core/workflow.py index 4b9fd8d856..f9ed52e38e 100644 --- a/flytekit/models/core/workflow.py +++ b/flytekit/models/core/workflow.py @@ -12,6 +12,7 @@ from flytekit.models import interface as _interface from flytekit.models import types as type_models from flytekit.models.core import condition as _condition +from flytekit.models.core import identifier from flytekit.models.core import identifier as _identifier from flytekit.models.literals import Binding as _Binding from flytekit.models.literals import RetryStrategy as _RetryStrategy @@ -744,7 +745,7 @@ def __init__(self, launchplan_ref=None, sub_workflow_ref=None): self._sub_workflow_ref = sub_workflow_ref @property - def launchplan_ref(self): + def launchplan_ref(self) -> identifier.Identifier: """ [Optional] A globally unique identifier for the launch plan. Should map to Admin. diff --git a/flytekit/remote/entities.py b/flytekit/remote/entities.py index 9dde4291bf..16c16eedd0 100644 --- a/flytekit/remote/entities.py +++ b/flytekit/remote/entities.py @@ -362,16 +362,12 @@ def __init__( def flyte_node(self) -> FlyteNode: return self._flyte_node - @property - def flyte_workflow_node(self): - return self._flyte_workflow_node - @classmethod def promote_from_model( cls, model: _workflow_model.ArrayNode, flyte_node: FlyteNode, - ): + ) -> FlyteArrayNode: return cls( flyte_node=flyte_node, parallelism=model._parallelism, diff --git a/flytekit/remote/remote.py b/flytekit/remote/remote.py index 725cb585f0..4504656947 100644 --- a/flytekit/remote/remote.py +++ b/flytekit/remote/remote.py @@ -498,7 +498,11 @@ def find_launch_plan( find_launch_plan(lp_ref, node_launch_plans) # Inspect array nodes for launch plans - if node.array_node is not None and node.array_node.node.workflow_node is not None and node.array_node.node.workflow_node.launchplan_ref is not None: + if ( + node.array_node is not None + and node.array_node.node.workflow_node is not None + and node.array_node.node.workflow_node.launchplan_ref is not None + ): lp_ref = node.array_node.node.workflow_node.launchplan_ref find_launch_plan(lp_ref, node_launch_plans) @@ -2624,7 +2628,7 @@ def sync_node_execution( if execution._node.array_node.node.task_node is not None: t = execution._node.flyte_entity.flyte_node.task_node.flyte_task execution._task_executions = [ - self.sync_task_execution(FlyteTaskExecution.promote_from_model(task_execution), t) + self.sync_task_execution(FlyteTaskExecution.promote_from_model(task_execution), t.interface) for task_execution in iterate_task_executions(self.client, execution.id) ] if t.interface: @@ -2633,15 +2637,17 @@ def sync_node_execution( logger.error(f"Fetched map task does not have an interface, skipping i/o {t}") return execution elif execution._node.array_node.node.workflow_node is not None: - breakpoint() - sub_flyte_workflow = execution._node.flyte_entity.flyte_workflow_node - launched_exec_id = execution.closure.workflow_node_metadata.execution_id - launched_exec = self.fetch_execution( - project=launched_exec_id.project, domain=launched_exec_id.domain, name=launched_exec_id.name + launch_plan_id = execution._node.array_node.node.workflow_node.launchplan_ref + launch_plan = self.fetch_launch_plan( + launch_plan_id.project, launch_plan_id.domain, launch_plan_id.name, launch_plan_id.version ) - self.sync_execution(launched_exec) - execution._workflow_executions.append(launched_exec) - execution._interface = launched_exec._flyte_workflow.interface + execution._task_executions = [ + self.sync_task_execution( + FlyteTaskExecution.promote_from_model(task_execution), launch_plan.interface + ) + for task_execution in iterate_task_executions(self.client, execution.id) + ] + execution._interface = launch_plan.interface return execution else: logger.error("Array node not over task, skipping i/o") @@ -2656,7 +2662,7 @@ def sync_node_execution( else: execution._task_executions = [ self.sync_task_execution( - FlyteTaskExecution.promote_from_model(t), node_mapping[node_id].task_node.flyte_task + FlyteTaskExecution.promote_from_model(t), node_mapping[node_id].task_node.flyte_task.interface ) for t in iterate_task_executions(self.client, execution.id) ] @@ -2671,15 +2677,16 @@ def sync_node_execution( return execution def sync_task_execution( - self, execution: FlyteTaskExecution, entity_definition: typing.Optional[FlyteTask] = None + self, execution: FlyteTaskExecution, entity_interface: typing.Optional[TypedInterface] = None ) -> FlyteTaskExecution: """Sync a FlyteTaskExecution object with its corresponding remote state.""" execution._closure = self.client.get_task_execution(execution.id).closure execution_data = self.client.get_task_execution_data(execution.id) task_id = execution.id.task_id - if entity_definition is None: + if entity_interface is None: entity_definition = self.fetch_task(task_id.project, task_id.domain, task_id.name, task_id.version) - return self._assign_inputs_and_outputs(execution, execution_data, entity_definition.interface) + entity_interface = entity_definition.interface + return self._assign_inputs_and_outputs(execution, execution_data, entity_interface) ############################# # Terminate Execution State # From 686a7c972f410c58cade50eeb57180233aa165d3 Mon Sep 17 00:00:00 2001 From: Troy Chiu Date: Fri, 21 Feb 2025 15:23:52 -0800 Subject: [PATCH 12/18] use interface to list Signed-off-by: Troy Chiu --- flytekit/remote/remote.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/flytekit/remote/remote.py b/flytekit/remote/remote.py index 380badd98e..067ec5b779 100644 --- a/flytekit/remote/remote.py +++ b/flytekit/remote/remote.py @@ -2641,13 +2641,14 @@ def sync_node_execution( launch_plan = self.fetch_launch_plan( launch_plan_id.project, launch_plan_id.domain, launch_plan_id.name, launch_plan_id.version ) + task_execution_interface = launch_plan.interface.transform_interface_to_list() execution._task_executions = [ self.sync_task_execution( - FlyteTaskExecution.promote_from_model(task_execution), launch_plan.interface + FlyteTaskExecution.promote_from_model(task_execution), task_execution_interface ) for task_execution in iterate_task_executions(self.client, execution.id) ] - execution._interface = launch_plan.interface + execution._interface = task_execution_interface return execution else: logger.error("Array node not over task, skipping i/o") From e27ddae724d4908d574e7e405562661b011831a6 Mon Sep 17 00:00:00 2001 From: Troy Chiu Date: Sat, 22 Feb 2025 15:35:00 -0800 Subject: [PATCH 13/18] add tests Signed-off-by: Troy Chiu --- .../integration/remote/test_remote.py | 16 ++++++++++++++ .../remote/workflows/basic/array_map_lp.py | 21 +++++++++++++++++++ 2 files changed, 37 insertions(+) create mode 100644 tests/flytekit/integration/remote/workflows/basic/array_map_lp.py diff --git a/tests/flytekit/integration/remote/test_remote.py b/tests/flytekit/integration/remote/test_remote.py index 51e79ff0e9..8dfb6b4b4b 100644 --- a/tests/flytekit/integration/remote/test_remote.py +++ b/tests/flytekit/integration/remote/test_remote.py @@ -681,6 +681,22 @@ def test_execution_workflow_with_maptask_in_dynamic(register): assert "n0-0-dn0" in execution.node_executions["n0"].subworkflow_node_executions assert len(execution.node_executions["n0"].subworkflow_node_executions["n0-0-dn0"].task_executions) == 1 +def test_execution_workflow_with_maptask_over_lp(register): + remote = FlyteRemote(Config.auto(config_file=CONFIG), PROJECT, DOMAIN) + d: typing.List[int] = [1, 2, 3] + flyte_launch_plan = remote.fetch_launch_plan(name="basic.dynamic_array_map.workflow_with_map_over_lp", version=VERSION) + execution = remote.execute( + flyte_launch_plan, + inputs={"data": d}, + version=VERSION, + wait=True, + ) + assert execution.outputs["o0"] == [2, 3, 4] + assert "n0" in execution.node_executions + assert len(execution.node_executions["n0"].task_executions) == 1 + assert execution.node_executions["n0"].task_executions[0].inputs == {"data": d} + assert execution.node_executions["n0"].task_executions[0].outputs == {"o0": [2, 3, 4]} + def test_executes_nested_workflow_dictating_interruptible(register): remote = FlyteRemote(Config.auto(config_file=CONFIG), PROJECT, DOMAIN) diff --git a/tests/flytekit/integration/remote/workflows/basic/array_map_lp.py b/tests/flytekit/integration/remote/workflows/basic/array_map_lp.py new file mode 100644 index 0000000000..2c058ee0d7 --- /dev/null +++ b/tests/flytekit/integration/remote/workflows/basic/array_map_lp.py @@ -0,0 +1,21 @@ +import typing +from flytekit import map_task, task, workflow, LaunchPlan + + +@task +def fn(x: int) -> int: + return x + 1 + + +@workflow +def test_map_over_lp_wf(x: int) -> int: + return fn(x=x) + +lp = LaunchPlan.get_or_create( + workflow=test_map_over_lp_wf, + name="test_map_over_lp_wf_lp", +) + +@workflow +def workflow_with_map_over_lp(data: typing.List[int]) -> typing.List[int]: + return map_task(lp)(x=data) \ No newline at end of file From 337fb241b6d8c0cbcb42b0ab0eab2af3f7ec59b5 Mon Sep 17 00:00:00 2001 From: Troy Chiu Date: Sat, 22 Feb 2025 15:37:27 -0800 Subject: [PATCH 14/18] lint Signed-off-by: Troy Chiu --- .../flytekit/integration/remote/workflows/basic/array_map_lp.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/flytekit/integration/remote/workflows/basic/array_map_lp.py b/tests/flytekit/integration/remote/workflows/basic/array_map_lp.py index 2c058ee0d7..33af49261d 100644 --- a/tests/flytekit/integration/remote/workflows/basic/array_map_lp.py +++ b/tests/flytekit/integration/remote/workflows/basic/array_map_lp.py @@ -18,4 +18,4 @@ def test_map_over_lp_wf(x: int) -> int: @workflow def workflow_with_map_over_lp(data: typing.List[int]) -> typing.List[int]: - return map_task(lp)(x=data) \ No newline at end of file + return map_task(lp)(x=data) From 10a92e141016f8bebaa58b0750c845d000ba8711 Mon Sep 17 00:00:00 2001 From: Troy Chiu Date: Mon, 24 Feb 2025 10:47:24 -0800 Subject: [PATCH 15/18] nit Signed-off-by: Troy Chiu --- .../integration/remote/test_remote.py | 16 -------------- .../remote/workflows/basic/array_map_lp.py | 21 ------------------- 2 files changed, 37 deletions(-) delete mode 100644 tests/flytekit/integration/remote/workflows/basic/array_map_lp.py diff --git a/tests/flytekit/integration/remote/test_remote.py b/tests/flytekit/integration/remote/test_remote.py index 8dfb6b4b4b..51e79ff0e9 100644 --- a/tests/flytekit/integration/remote/test_remote.py +++ b/tests/flytekit/integration/remote/test_remote.py @@ -681,22 +681,6 @@ def test_execution_workflow_with_maptask_in_dynamic(register): assert "n0-0-dn0" in execution.node_executions["n0"].subworkflow_node_executions assert len(execution.node_executions["n0"].subworkflow_node_executions["n0-0-dn0"].task_executions) == 1 -def test_execution_workflow_with_maptask_over_lp(register): - remote = FlyteRemote(Config.auto(config_file=CONFIG), PROJECT, DOMAIN) - d: typing.List[int] = [1, 2, 3] - flyte_launch_plan = remote.fetch_launch_plan(name="basic.dynamic_array_map.workflow_with_map_over_lp", version=VERSION) - execution = remote.execute( - flyte_launch_plan, - inputs={"data": d}, - version=VERSION, - wait=True, - ) - assert execution.outputs["o0"] == [2, 3, 4] - assert "n0" in execution.node_executions - assert len(execution.node_executions["n0"].task_executions) == 1 - assert execution.node_executions["n0"].task_executions[0].inputs == {"data": d} - assert execution.node_executions["n0"].task_executions[0].outputs == {"o0": [2, 3, 4]} - def test_executes_nested_workflow_dictating_interruptible(register): remote = FlyteRemote(Config.auto(config_file=CONFIG), PROJECT, DOMAIN) diff --git a/tests/flytekit/integration/remote/workflows/basic/array_map_lp.py b/tests/flytekit/integration/remote/workflows/basic/array_map_lp.py deleted file mode 100644 index 33af49261d..0000000000 --- a/tests/flytekit/integration/remote/workflows/basic/array_map_lp.py +++ /dev/null @@ -1,21 +0,0 @@ -import typing -from flytekit import map_task, task, workflow, LaunchPlan - - -@task -def fn(x: int) -> int: - return x + 1 - - -@workflow -def test_map_over_lp_wf(x: int) -> int: - return fn(x=x) - -lp = LaunchPlan.get_or_create( - workflow=test_map_over_lp_wf, - name="test_map_over_lp_wf_lp", -) - -@workflow -def workflow_with_map_over_lp(data: typing.List[int]) -> typing.List[int]: - return map_task(lp)(x=data) From af9829e52c7a2c0194e087d1f676f71203c32039 Mon Sep 17 00:00:00 2001 From: Troy Chiu Date: Mon, 24 Feb 2025 11:37:00 -0800 Subject: [PATCH 16/18] expose external resources Signed-off-by: Troy Chiu --- flytekit/models/core/workflow.py | 3 +-- flytekit/models/event.py | 2 +- flytekit/remote/entities.py | 2 +- flytekit/remote/remote.py | 34 +++++--------------------------- 4 files changed, 8 insertions(+), 33 deletions(-) diff --git a/flytekit/models/core/workflow.py b/flytekit/models/core/workflow.py index f9ed52e38e..4b9fd8d856 100644 --- a/flytekit/models/core/workflow.py +++ b/flytekit/models/core/workflow.py @@ -12,7 +12,6 @@ from flytekit.models import interface as _interface from flytekit.models import types as type_models from flytekit.models.core import condition as _condition -from flytekit.models.core import identifier from flytekit.models.core import identifier as _identifier from flytekit.models.literals import Binding as _Binding from flytekit.models.literals import RetryStrategy as _RetryStrategy @@ -745,7 +744,7 @@ def __init__(self, launchplan_ref=None, sub_workflow_ref=None): self._sub_workflow_ref = sub_workflow_ref @property - def launchplan_ref(self) -> identifier.Identifier: + def launchplan_ref(self): """ [Optional] A globally unique identifier for the launch plan. Should map to Admin. diff --git a/flytekit/models/event.py b/flytekit/models/event.py index 00d8f65108..5cbf25d800 100644 --- a/flytekit/models/event.py +++ b/flytekit/models/event.py @@ -28,7 +28,7 @@ def to_flyte_idl(self): @classmethod def from_flyte_idl(cls, proto): """ - :param flyteidl.event.TaskExecutionMetadata proto: + :param flyteidl.event.event_pb2.TaskExecutionMetadata proto: :rtype: TaskExecutionMetadata """ return cls( diff --git a/flytekit/remote/entities.py b/flytekit/remote/entities.py index 16c16eedd0..7c6f72ec1b 100644 --- a/flytekit/remote/entities.py +++ b/flytekit/remote/entities.py @@ -367,7 +367,7 @@ def promote_from_model( cls, model: _workflow_model.ArrayNode, flyte_node: FlyteNode, - ) -> FlyteArrayNode: + ): return cls( flyte_node=flyte_node, parallelism=model._parallelism, diff --git a/flytekit/remote/remote.py b/flytekit/remote/remote.py index 067ec5b779..804326f79e 100644 --- a/flytekit/remote/remote.py +++ b/flytekit/remote/remote.py @@ -497,15 +497,6 @@ def find_launch_plan( lp_ref = node.workflow_node.launchplan_ref find_launch_plan(lp_ref, node_launch_plans) - # Inspect array nodes for launch plans - if ( - node.array_node is not None - and node.array_node.node.workflow_node is not None - and node.array_node.node.workflow_node.launchplan_ref is not None - ): - lp_ref = node.array_node.node.workflow_node.launchplan_ref - find_launch_plan(lp_ref, node_launch_plans) - # Inspect conditional branch nodes for launch plans def get_launch_plan_from_branch( branch_node: BranchNode, node_launch_plans: Dict[id_models, launch_plan_models.LaunchPlanSpec] @@ -2628,7 +2619,7 @@ def sync_node_execution( if execution._node.array_node.node.task_node is not None: t = execution._node.flyte_entity.flyte_node.task_node.flyte_task execution._task_executions = [ - self.sync_task_execution(FlyteTaskExecution.promote_from_model(task_execution), t.interface) + self.sync_task_execution(FlyteTaskExecution.promote_from_model(task_execution), t) for task_execution in iterate_task_executions(self.client, execution.id) ] if t.interface: @@ -2636,20 +2627,6 @@ def sync_node_execution( else: logger.error(f"Fetched map task does not have an interface, skipping i/o {t}") return execution - elif execution._node.array_node.node.workflow_node is not None: - launch_plan_id = execution._node.array_node.node.workflow_node.launchplan_ref - launch_plan = self.fetch_launch_plan( - launch_plan_id.project, launch_plan_id.domain, launch_plan_id.name, launch_plan_id.version - ) - task_execution_interface = launch_plan.interface.transform_interface_to_list() - execution._task_executions = [ - self.sync_task_execution( - FlyteTaskExecution.promote_from_model(task_execution), task_execution_interface - ) - for task_execution in iterate_task_executions(self.client, execution.id) - ] - execution._interface = task_execution_interface - return execution else: logger.error("Array node not over task, skipping i/o") return execution @@ -2663,7 +2640,7 @@ def sync_node_execution( else: execution._task_executions = [ self.sync_task_execution( - FlyteTaskExecution.promote_from_model(t), node_mapping[node_id].task_node.flyte_task.interface + FlyteTaskExecution.promote_from_model(t), node_mapping[node_id].task_node.flyte_task ) for t in iterate_task_executions(self.client, execution.id) ] @@ -2678,16 +2655,15 @@ def sync_node_execution( return execution def sync_task_execution( - self, execution: FlyteTaskExecution, entity_interface: typing.Optional[TypedInterface] = None + self, execution: FlyteTaskExecution, entity_definition: typing.Optional[FlyteTask] = None ) -> FlyteTaskExecution: """Sync a FlyteTaskExecution object with its corresponding remote state.""" execution._closure = self.client.get_task_execution(execution.id).closure execution_data = self.client.get_task_execution_data(execution.id) task_id = execution.id.task_id - if entity_interface is None: + if entity_definition is None: entity_definition = self.fetch_task(task_id.project, task_id.domain, task_id.name, task_id.version) - entity_interface = entity_definition.interface - return self._assign_inputs_and_outputs(execution, execution_data, entity_interface) + return self._assign_inputs_and_outputs(execution, execution_data, entity_definition.interface) ############################# # Terminate Execution State # From 2421361607c60c30c6e877ae012f1f1be3eee452 Mon Sep 17 00:00:00 2001 From: Troy Chiu Date: Mon, 24 Feb 2025 11:48:13 -0800 Subject: [PATCH 17/18] lint Signed-off-by: Troy Chiu --- flytekit/models/event.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/flytekit/models/event.py b/flytekit/models/event.py index 5cbf25d800..cb4b6e44fd 100644 --- a/flytekit/models/event.py +++ b/flytekit/models/event.py @@ -4,10 +4,11 @@ class TaskExecutionMetadata(_common.FlyteIdlEntity): - def __init__(self, external_resources): - """ - :param google.protobuf.internal.containers.RepeatedCompositeFieldContainer external_resources: - """ + """ + :param google.protobuf.internal.containers.RepeatedCompositeFieldContainer external_resources: + """ + + def __init__(self, external_resources=None): self._external_resources = external_resources @property From 14cb643ef9c6cb1eb1daf01edc39ca5dd120e5e1 Mon Sep 17 00:00:00 2001 From: Troy Chiu Date: Mon, 24 Feb 2025 11:57:41 -0800 Subject: [PATCH 18/18] add tests Signed-off-by: Troy Chiu --- tests/flytekit/integration/remote/test_remote.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/flytekit/integration/remote/test_remote.py b/tests/flytekit/integration/remote/test_remote.py index 51e79ff0e9..58eaa5b56b 100644 --- a/tests/flytekit/integration/remote/test_remote.py +++ b/tests/flytekit/integration/remote/test_remote.py @@ -664,6 +664,9 @@ def test_execute_workflow_with_maptask(register): ) assert execution.outputs["o0"] == [4, 5, 6] assert len(execution.node_executions["n0"].task_executions) == 1 + assert len(execution.node_executions["n0"].task_executions[0].closure.metadata.external_resources) == len(d) + for i in range(len(d)): + assert execution.node_executions["n0"].task_executions[0].closure.metadata.external_resources[i].phase == 3 # SUCCEEDED def test_execution_workflow_with_maptask_in_dynamic(register): remote = FlyteRemote(Config.auto(config_file=CONFIG), PROJECT, DOMAIN) @@ -680,6 +683,9 @@ def test_execution_workflow_with_maptask_in_dynamic(register): assert execution.node_executions["n0"].subworkflow_node_executions is not None assert "n0-0-dn0" in execution.node_executions["n0"].subworkflow_node_executions assert len(execution.node_executions["n0"].subworkflow_node_executions["n0-0-dn0"].task_executions) == 1 + assert len(execution.node_executions["n0"].subworkflow_node_executions["n0-0-dn0"].task_executions[0].closure.metadata.external_resources) == len(d) + for i in range(len(d)): + assert execution.node_executions["n0"].subworkflow_node_executions["n0-0-dn0"].task_executions[0].closure.metadata.external_resources[i].phase == 3 # SUCCEEDED def test_executes_nested_workflow_dictating_interruptible(register):