Add static and runtime dag info, API to fetch ancestor and successor tasks #2124

Open
talsperre wants to merge 34 commits into master from dev/add-runtime-dag-info

Conversation

@talsperre (Collaborator) commented on Oct 31, 2024:

Add runtime DAG info so that we can query the ancestor and successor tasks for a given task easily.

Usage

from metaflow import Task, namespace
namespace(None)
task = Task('RuntimeDAGFlow/18/step_c/32076012', attempt=0)

To get the ancestor and successor tasks of a task, use the following API:

ancestors = task.ancestors
successors = task.successors

The output is a list of Metaflow Task objects.
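For illustration, a minimal sketch of inspecting the returned objects (assuming the flow, run, and task shown above exist):

from metaflow import Task, namespace

namespace(None)
task = Task('RuntimeDAGFlow/18/step_c/32076012', attempt=0)

# Each ancestor/successor is a regular client Task, so the usual
# properties (pathspec, finished, data, ...) are available.
for ancestor in task.ancestors:
    print(ancestor.pathspec, ancestor.finished)
for successor in task.successors:
    print(successor.pathspec, successor.finished)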

@talsperre force-pushed the dev/add-runtime-dag-info branch from 48c771d to ec43f14 on November 1, 2024 at 18:34
Comment on lines 675 to 690
@classmethod
def _filter_tasks_by_metadata(
cls, flow_id, run_id, query_step, field_name, field_value
):
raise NotImplementedError()

@classmethod
def filter_tasks_by_metadata(
cls, flow_id, run_id, query_step, field_name, field_value
):
# TODO: Do we need to do anything wrt to task attempt?
task_ids = cls._filter_tasks_by_metadata(
flow_id, run_id, query_step, field_name, field_value
)
return task_ids

Collaborator:

Is there a need for the private method, or could this simply be contained in the public-facing one? Right now it's not doing anything before calling the private one.
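A minimal sketch of the collapsed version this comment suggests, keeping the same signature as the snippet above (whether subclasses still need a separate override hook is the open question):

@classmethod
def filter_tasks_by_metadata(
    cls, flow_id, run_id, query_step, field_name, field_value
):
    # Collapsed version: backends would override this public classmethod
    # directly instead of a private _filter_tasks_by_metadata hook.
    raise NotImplementedError()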

Collaborator:

also, did you have an implementation of this for service.py yet?

def filter_tasks_by_metadata(
cls, flow_id, run_id, query_step, field_name, field_value
):
# TODO: Do we need to do anything wrt to task attempt?
Collaborator:

probably not, as the ancestors for task attempts should be identical, right? What about the immediate_siblings though, will they include or exclude attempts of the same task?

@talsperre force-pushed the dev/add-runtime-dag-info branch from ffbf68a to c6fb9ac on January 2, 2025 at 23:25
@talsperre changed the title from "Add static and runtime dag info, API to fetch ancestor tasks" to "Add static and runtime dag info, API to fetch ancestor and successor tasks" on Jan 7, 2025
@talsperre force-pushed the dev/add-runtime-dag-info branch 2 times, most recently from d66d32b to 7644058 on January 12, 2025 at 03:12
@romain-intel (Contributor) left a comment:

A few comments. I think it's pretty close though. I haven't looked at the metadata service changes. We may also want to raise a better error message if the service is not new enough?

run_id: str,
cur_foreach_stack_len: int,
steps: List[str],
query_type: str,
Contributor:

nit: I would just use a boolean. Something like is_ancestor. These are internal functions anyways and slightly more efficient to use bools :)
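A sketch of the boolean variant the nit suggests; the enclosing function name is not visible in the fragment above, so the one below is assumed:

from typing import List

def _get_related_task_info(
    self,
    run_id: str,
    cur_foreach_stack_len: int,
    steps: List[str],
    is_ancestor: bool,  # True when resolving ancestors, False for successors
):
    ...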

metaflow/client/core.py (outdated; resolved)
if query_foreach_stack_len == cur_foreach_stack_len:
# The successor or ancestor tasks belong to the same foreach stack level
field_name = "foreach-indices"
field_value = self.metadata_dict.get(field_name)
Contributor:

We don't currently cache metadata_dict, so we could either fix that or cache it here to avoid making multiple calls to the metadata service and the subsequent sorts. It would need to be cached across _get_related_tasks and this function.
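One way to cache it on the Task object, matching the shape the later diff settles on (a sketch; typing imports omitted, and the uncached property is assumed to rebuild the dict on every access today):

@property
def metadata_dict(self) -> Dict[str, str]:
    # Build the name -> value mapping once and reuse it across
    # _get_related_tasks and this helper.
    if getattr(self, "_metadata_dict", None) is None:
        self._metadata_dict = {
            m.name: m.value
            for m in sorted(self.metadata, key=lambda m: m.created_at)
        }
    return self._metadata_dict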

# Current Task: foreach-indices = [0, 1, 2], foreach-indices-truncated = [0, 1]
# Ancestor Task: foreach-indices = [0, 1], foreach-indices-truncated = [0]
# We will compare the foreach-indices value of ancestor task with the
# foreach-indices value of current task
Contributor:

nit: foreach-indices-truncated value of the current task

return field_name, field_value

def _get_related_tasks(self, relation_type: str) -> Dict[str, List[str]]:
start_time = time.time()
Contributor:

not used -- can strip or use.

@@ -248,8 +248,7 @@
# Default container registry
DEFAULT_CONTAINER_REGISTRY = from_conf("DEFAULT_CONTAINER_REGISTRY")
# Controls whether to include foreach stack information in metadata.
# TODO(Darin, 05/01/24): Remove this flag once we are confident with this feature.
- INCLUDE_FOREACH_STACK = from_conf("INCLUDE_FOREACH_STACK", False)
+ INCLUDE_FOREACH_STACK = from_conf("INCLUDE_FOREACH_STACK", True)
Contributor:

We should probably change this at some point and remove it to not make it optional anymore.

# Filter tasks based on metadata
for task in tasks:
task_id = task.get("task_id")
if not task_id:
Contributor:

when does this happen? Also, task_id of zero is valid iirc.

# and the artifact files are saved as: <attempt>_artifact__<artifact_name>.json
# We loop over all the JSON files in the directory and find the latest one
# that matches the field prefix.
json_files = glob.glob(os.path.join(path, "*.json"))
Contributor:

we should be able to do a more efficient globbing so we don't have to filter by field_prefix later on. Something like f"{field_prefix}*.json".
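A sketch of the narrower glob the comment suggests, with path and field_prefix as in the snippet above:

import glob
import os

# Glob only files that start with the field prefix instead of listing
# every JSON file and filtering afterwards.
json_files = glob.glob(os.path.join(path, f"{field_prefix}*.json"))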

metaflow/task.py (outdated)
type="foreach-indices-truncated",
tags=metadata_tags,
),
MetaDatum(
Contributor:

I believe this is only used in the siblings thing. If that's the case, we may be able to get rid of this when we refactor the siblings thing (if we do that). I am also a little confused as to why this is needed.

Collaborator (Author):

Will refactor the siblings function, mostly to return siblings irrespective of whether the task is in a foreach or not.

Contributor:

Cool -- I think we can now get rid of this metadatum then right?

metaflow/task.py (outdated)
tags=metadata_tags,
),
MetaDatum(
field="previous_steps",
Contributor:

nit: consistency here, e.g. between previous_steps and foreach-indices (underscores vs. hyphens).

}
url = ServiceMetadataProvider._obj_path(flow_id, run_id, query_step)
url = f"{url}/tasks?{urlencode(query_params)}"
return cls._request(cls._monitor, url, "GET")
Collaborator:

Getting an error with this: cls does not have _monitor. All other calls to _request pass in None.

Collaborator (Author):

Yes, I will simply pass in None.
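A sketch of the corrected call, mirroring the other _request call sites mentioned above:

url = ServiceMetadataProvider._obj_path(flow_id, run_id, query_step)
url = f"{url}/tasks?{urlencode(query_params)}"
# Pass None for the monitor, as the other classmethod call sites do;
# cls has no _monitor attribute here.
return cls._request(None, url, "GET")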

"query_step": query_step,
}
url = ServiceMetadataProvider._obj_path(flow_id, run_id, query_step)
url = f"{url}/tasks?{urlencode(query_params)}"
Collaborator:

missing import for urlencode. f-strings are probably fine by 2025, as we've gotten rid of the older tests that break with them.
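The missing import would come from the standard library:

from urllib.parse import urlencode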

Contributor:

Ya, I was wondering about f-strings but did check, and our official minimum version is 3.6, which supports them -- and yes, let's move at least a tad into the future :). I'm going to start using them too and the code will slowly migrate (and become infinitesimally faster :)).

@talsperre force-pushed the dev/add-runtime-dag-info branch from 17a4489 to 7cdfb41 on January 15, 2025 at 00:53
m.name: m.value
for m in sorted(self.metadata, key=lambda m: m.created_at)
}
return self._metadata_dict
Contributor:

Note: this slightly changes the semantics, since now if there is new metadata, the user won't get it. Should check if this impacts other operations, or scope the caching to just the functions that need it.


def _get_related_tasks(self, is_ancestor: bool) -> Dict[str, List[str]]:
flow_id, run_id, _, _ = self.path_components
steps = (
@saikonen (Collaborator) commented on Jan 16, 2025:

Data type problem here which leads to the queries not working correctly:
steps ends up being of type str on the OSS metadata service, so you end up iterating over characters instead of step names, e.g.:

/flows/SplitFlow/runs/63/steps/{/filtered_tasks?metadata_field_name=foreach-indices&metadata_field_value=%7B%7D&query_step=%7B
/flows/SplitFlow/runs/63/steps/e/filtered_tasks?metadata_field_name=foreach-indices&metadata_field_value=%7B%7D&query_step=e
/flows/SplitFlow/runs/63/steps/n/filtered_tasks?metadata_field_name=foreach-indices&metadata_field_value=%7B%7D&query_step=n
/flows/SplitFlow/runs/63/steps/d/filtered_tasks?metadata_field_name=foreach-indices&metadata_field_value=%7B%7D&query_step=d
/flows/SplitFlow/runs/63/steps/}/filtered_tasks?metadata_field_name=foreach-indices&metadata_field_value=%7B%7D&query_step=%7D
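A defensive sketch of one way to guard against this; the helper name is hypothetical, and the point is simply to guarantee a list of step names rather than a bare string before iterating:

from typing import List, Union

def _as_step_list(steps: Union[str, List[str]]) -> List[str]:
    # A single step name arriving as a bare str would otherwise be iterated
    # character by character, producing URLs like the ones above.
    return [steps] if isinstance(steps, str) else list(steps)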

@talsperre force-pushed the dev/add-runtime-dag-info branch from a8df33d to 7833e40 on January 22, 2025 at 08:30
@savingoyal (Collaborator) left a comment:

quick UX feedback - let me know if the newly proposed UX misses out on any use cases. I am reviewing the rest of the PR meanwhile.

}

@property
def immediate_ancestors(self) -> Dict[str, List[str]]:
Collaborator:

can we offer a property parents which simply returns a list of task pathspecs? it will return None for the start step. an open question is if we would want to also offer parent - maybe we can cross that bridge as a follow-up PR if needed.
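A minimal class-body sketch of the suggested parents property, assuming it can be layered on the immediate_ancestors mapping shown above (typing imports omitted):

@property
def parents(self) -> Optional[List[str]]:
    # Flatten {step_name: [task_pathspec, ...]} into a flat list of
    # pathspecs; the start step has no ancestors, so return None there.
    ancestors = self.immediate_ancestors
    if not ancestors:
        return None
    return [pathspec for specs in ancestors.values() for pathspec in specs]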

Collaborator:

what are the consistency guarantees offered by this property. do we expect that this property will return an immutable set of parents as soon as this task is registered?

Collaborator:

also - we are assuming that return values here point to the latest successful attempt. might be good to note this in the doc string.

return self._get_related_tasks(is_ancestor=True)

@property
def immediate_successors(self) -> Dict[str, List[str]]:
Collaborator:

can we offer a property children which simply returns a list of task pathspecs. it will return None for the end step. similar comment for child as for parent

Collaborator:

what is the expected behavior of this property? as soon as child tasks are registered, do we start updating the children property? also, when do we know that there are not going to be any more child tasks?

return self._get_related_tasks(is_ancestor=False)

@property
def siblings(self) -> Dict[str, List[str]]:
Collaborator:

can this be siblings(self, ancestors) - where ancestors is a list of task pathspecs and defaults to task.parents?

metaflow/task.py (outdated)
@@ -493,6 +512,36 @@ def run_step(
)
)

# Add runtime dag info - for a nested foreach this may look like:
Collaborator:

in terms of metadata that needs to be stored - we could simply stringify the stack and store it - step1:0,step2:1... - and that should be it, no? a prefix query should be fast enough
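A sketch of the stringified encoding the comment describes; the .step and .index attributes on the foreach stack frames are an assumption here:

def _foreach_execution_path(foreach_stack):
    # Encode the stack as "step1:0,step2:1,..." so related tasks can be
    # found with a single prefix query on one metadata field.
    return ",".join(f"{frame.step}:{frame.index}" for frame in foreach_stack)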

metaflow/client/core.py (two outdated, resolved comments)
return Step(f"{flow_id}/{run_id}/{query_step}", _namespace_check=False).task

@property
def _graph_info(self):
Collaborator:

do we need to expose _graph_info, ancestor_steps or successor_steps to the end user?

Collaborator:

is my understanding correct that _graph_info is introduced to support static dag info? if so - a better ux would be to introduce parent_steps and child_steps in the step object instead of exposing _graph_info as is. we treat _graph_info as a special escape hatch - so best to not make it formal at the moment and lose some much-needed flexibility around its structure.

metaflow/task.py (resolved)
metaflow/task.py (outdated)
@@ -493,6 +506,20 @@ def run_step(
)
)

# Add runtime dag info - for a nested foreach this may look like:
# foreach_indices: "step1:idx1,step2:idx2,step3:idx3"
foreach_indices = self._dynamic_runtime_metadata(foreach_stack)
Collaborator:

we could use a different term for this field - it's a stack of foreach_indices - maybe - foreach_execution_path

@@ -301,7 +303,7 @@ def __init__(
# distinguish between "attempt will happen" and "no such
# attempt exists".

- if pathspec:
+ if _use_pathspec and pathspec:
Collaborator:

task.ancestors[0].system_tags is unfortunately always empty and different from parent_task.system_tags - it might be better to just yield the full task objects instead

Contributor:

I think we could fetch it once for the whole run and use it?

ancestor_pathspecs = set([task.pathspec for task in ancestors])

# Compare with stored parent_task_pathspecs
task_pathspec = task.data.task_pathspec
Collaborator:

You should check not just the pathspec but that the contents of the Task object are the same.

List["Task"]
List of all ancestor tasks of the current task.
"""
return self._get_related_tasks(is_ancestor=True)
Collaborator:

        steps = self.ancestor_steps
        if not steps:
            return

        if len(steps) > 1:
            # Static join - use exact path matching
            pattern = self.metadata_dict.get("foreach-indices", ".*")
        else:
            # Foreach join - match tasks with shorter foreach path
            current_path = self.metadata_dict.get("foreach-indices", "")
            if not current_path:
                pattern = ".*"
            else:
                target_task = Step(f"{self.flow_id}/{self.run_id}/{steps[0]}", _namespace_check=False).task
                target_depth = len(target_task.metadata_dict.get("foreach-indices", "").split(","))
                pattern = ",".join(current_path.split(",")[:target_depth])
            
        yield from self._iter_matching_tasks(steps, pattern)


@property
def successors(self) -> List["Task"]:
"""
Collaborator:

        steps = self.parent.child_steps
        if not steps:
            return
            
        if len(steps) > 1:
            # Static split - use exact path matching
            pattern = self.metadata_dict.get("foreach-indices", ".*")
        else:
            # Foreach split - match tasks with longer foreach path
            current_path = self.metadata_dict.get("foreach-indices", "")
            pattern = f"{current_path},.*" if current_path else ".*"
            
        yield from self._iter_matching_tasks(steps, pattern)

_, _, step_name, _ = self.path_components
return self._graph_info[step_name]["next"]

def _get_metadata_query_vals(
Collaborator:

  def _iter_matching_tasks(self, steps, pattern):
        """
        Yield tasks from specified steps matching a foreach path pattern.

        Parameters
        ----------
        steps : List[str]
            List of step names to search for tasks
        pattern : str
            Regex pattern to match foreach-indices metadata

        Returns
        -------
        Iterator[Task]
            Tasks matching the foreach path pattern
        """
        flow_id, run_id, _, _ = self.path_components
        
        for step in steps:
            task_ids = self._metaflow.metadata.filter_tasks_by_metadata(
                flow_id, run_id, step, "foreach-indices", pattern
            )
            for task_id in task_ids:
                yield Task(
                    pathspec="%s/%s/%s/%s" % (flow_id, run_id, step, task_id),
                    _namespace_check=False
                )

Contributor:

Why couldn't the filter_task_by_metadata return all the info we need to form the task? It seems it would save a few RT calls right?

@romain-intel (Contributor) left a comment:

not a review -- just some comments.

@talsperre force-pushed the dev/add-runtime-dag-info branch from 75c1301 to bc9e456 on February 12, 2025 at 11:49
target_task = Step(
f"{flow_id}/{run_id}/{steps[0]}", _namespace_check=False
).task
target_path = target_task.metadata_dict.get("foreach-execution-path", "")
Collaborator:

nit - just target_task.metadata_dict.get("foreach-execution-path")

if not steps:
return

current_path = self.metadata_dict.get("foreach-execution-path", "")
Collaborator:

nit - just self.metadata_dict.get("foreach-execution-path") - just to guard against an eventuality where foreach-execution-path set to empty starts having a meaning.

yield Task(pathspec=task_pathspec, _namespace_check=False)

@property
def parent_tasks(self) -> List["Task"]:
Collaborator:

would be good to handle the case where the user is inspecting a task that ran using an old version of metaflow or is using an old version of the service...
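One possible guard for that case (a sketch; the helper name is hypothetical, and it assumes newer Metaflow versions always record the foreach-execution-path metadatum, even when empty, so its absence signals an old client or service):

from metaflow.exception import MetaflowException

def _ensure_lineage_metadata(task):
    # Tasks written by older Metaflow versions or an older metadata service
    # never carry this metadatum, so the lineage queries cannot work for them.
    if "foreach-execution-path" not in task.metadata_dict:
        raise MetaflowException(
            "Ancestor/successor information is not available for this task; "
            "it was likely produced by an older version of Metaflow or an "
            "older metadata service."
        )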

@@ -1123,6 +1123,139 @@ def _iter_filter(self, x):
# exclude private data artifacts
return x.id[0] != "_"

def _iter_matching_tasks(self, steps, pattern):
Collaborator:

minor nit - _iter_matching_tasks(self, steps, metadata_key, metadata_pattern)

def parent_tasks(self) -> List["Task"]:
"""
Returns a list of all parent tasks of the current task for the latest successful
attempt.
Collaborator:

parents should be the same across attempts?

cls,
flow_id: str,
run_id: str,
query_step: str,
Collaborator:

nit - step_name

if not task_id:
continue

task_name = task.get("task_name")
Collaborator:

curious - why task_name?

@@ -493,6 +504,19 @@ def run_step(
)
)

# Add runtime dag information to the metadata of the task
foreach_execution_path = self._dynamic_runtime_metadata(foreach_stack)
Collaborator:

this comment still applies

from metaflow_test import MetaflowTest, ExpectationFailed, steps


class ChildrenTest(MetaflowTest):
Collaborator:

you can combine these two tests together.

child_steps = task.parent.child_steps

for child_task in child_tasks:
assert task.pathspec in child_task.data.parent_pathspecs, (
Collaborator:

can you verify other properties as well?
