Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use flyteremote to access subnodes information of array nodes #3152

Merged
merged 22 commits into from
Feb 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions flytekit/models/admin/task_execution.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from flyteidl.admin import task_execution_pb2 as _task_execution_pb2

from flytekit.models import common as _common
from flytekit.models import event as _event
from flytekit.models.core import execution as _execution
from flytekit.models.core import identifier as _identifier

Expand All @@ -16,6 +17,7 @@ def __init__(
updated_at,
output_uri=None,
error=None,
metadata=None,
):
"""
:param int phase: Enum value from flytekit.models.core.execution.TaskExecutionPhase
Expand All @@ -28,6 +30,7 @@ def __init__(
literals.
:param flytekit.models.core.execution.ExecutionError error: If task has failed and in terminal state, this will
be set to the error encountered.
:param flytekit.models.event.TaskExecutionMetadata metadata: Metadata associated with the task execution.
"""
self._phase = phase
self._logs = logs
Expand All @@ -37,6 +40,7 @@ def __init__(
self._updated_at = updated_at
self._output_uri = output_uri
self._error = error
self._metadata = metadata

@property
def phase(self):
Expand Down Expand Up @@ -95,6 +99,13 @@ def error(self):
"""
return self._error

@property
def metadata(self):
"""
:rtype: flytekit.models.event.TaskExecutionMetadata
"""
return self._metadata

def to_flyte_idl(self):
"""
:rtype: flyteidl.admin.task_execution_pb2.TaskExecutionClosure
Expand All @@ -104,6 +115,7 @@ def to_flyte_idl(self):
logs=[l.to_flyte_idl() for l in self.logs],
output_uri=self.output_uri,
error=self.error.to_flyte_idl() if self.error is not None else None,
metadata=self.metadata.to_flyte_idl() if self.metadata is not None else None,
)
p.started_at.FromDatetime(self.started_at)
p.created_at.FromDatetime(self.created_at)
Expand All @@ -126,6 +138,7 @@ def from_flyte_idl(cls, p):
created_at=p.created_at.ToDatetime(),
updated_at=p.updated_at.ToDatetime(),
duration=p.duration.ToTimedelta(),
metadata=_event.TaskExecutionMetadata.from_flyte_idl(p.metadata) if p.HasField("metadata") else None,
)


Expand Down
37 changes: 37 additions & 0 deletions flytekit/models/event.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from flyteidl.event import event_pb2 as _event_pb2

from flytekit.models import common as _common


class TaskExecutionMetadata(_common.FlyteIdlEntity):
"""
:param google.protobuf.internal.containers.RepeatedCompositeFieldContainer external_resources:
"""

def __init__(self, external_resources=None):
self._external_resources = external_resources

@property
def external_resources(self):
"""
:rtype: google.protobuf.internal.containers.RepeatedCompositeFieldContainer
"""
return self._external_resources

def to_flyte_idl(self):
"""
:rtype: flyteidl.event.TaskExecutionMetadata
"""
return _event_pb2.TaskExecutionMetadata(
external_resources=self._external_resources,
)

@classmethod
def from_flyte_idl(cls, proto):
"""
:param flyteidl.event.event_pb2.TaskExecutionMetadata proto:
:rtype: TaskExecutionMetadata
"""
return cls(
external_resources=proto.external_resources,
)
6 changes: 6 additions & 0 deletions tests/flytekit/integration/remote/test_remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,9 @@ def test_execute_workflow_with_maptask(register):
)
assert execution.outputs["o0"] == [4, 5, 6]
assert len(execution.node_executions["n0"].task_executions) == 1
assert len(execution.node_executions["n0"].task_executions[0].closure.metadata.external_resources) == len(d)
for i in range(len(d)):
assert execution.node_executions["n0"].task_executions[0].closure.metadata.external_resources[i].phase == 3 # SUCCEEDED

def test_execution_workflow_with_maptask_in_dynamic(register):
remote = FlyteRemote(Config.auto(config_file=CONFIG), PROJECT, DOMAIN)
Expand All @@ -680,6 +683,9 @@ def test_execution_workflow_with_maptask_in_dynamic(register):
assert execution.node_executions["n0"].subworkflow_node_executions is not None
assert "n0-0-dn0" in execution.node_executions["n0"].subworkflow_node_executions
assert len(execution.node_executions["n0"].subworkflow_node_executions["n0-0-dn0"].task_executions) == 1
assert len(execution.node_executions["n0"].subworkflow_node_executions["n0-0-dn0"].task_executions[0].closure.metadata.external_resources) == len(d)
for i in range(len(d)):
assert execution.node_executions["n0"].subworkflow_node_executions["n0-0-dn0"].task_executions[0].closure.metadata.external_resources[i].phase == 3 # SUCCEEDED


def test_executes_nested_workflow_dictating_interruptible(register):
Expand Down
Loading