Skip to content

Commit

Permalink
Add API for getting closest siblings
Browse files Browse the repository at this point in the history
  • Loading branch information
talsperre committed Oct 31, 2024
1 parent 829f544 commit 48c771d
Showing 1 changed file with 31 additions and 11 deletions.
42 changes: 31 additions & 11 deletions metaflow/client/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1241,17 +1241,37 @@ def _successor_task(flow_id, run_id, successor_step):
)
return successor_iters

# def closest_siblings(self) -> Iterator["Task"]:
# """
# Returns an iterator over the closest siblings of this task.
#
# Returns
# -------
# Iterator[Task]
# Iterator over the closest siblings of this task
# """
# flow_id, run_id, step_name, task_id = self.path_components
# print(f"flow_id: {flow_id}, run_id: {run_id}, step_name: {step_name}, task_id: {task_id}")
def closest_siblings(self) -> Dict[str, List[str]]:
"""
Returns a dictionary of closest siblings of this task for each step.
Returns
-------
Dict[str, List[str]]
Dictionary of closest siblings of this task. The keys are the
names of the current step and the values are the corresponding
task ids of the siblings.
"""
flow_id, run_id, step_name, task_id = self.path_components

foreach_stack = self.metadata_dict.get("foreach-stack", [])
foreach_step_names = self.metadata_dict.get("foreach-step-names", [])
if len(foreach_stack) == 0:
raise MetaflowInternalError("Task is not part of any foreach split")
elif step_name != foreach_step_names[-1]:
raise MetaflowInternalError(
f"Step {step_name} does not have any direct siblings since it is not part "
f"of a new foreach split."
)

field_name = "foreach-indices-truncated"
field_value = self.metadata_dict.get("foreach-indices-truncated")
# We find all tasks of the same step that have the same foreach-indices-truncated value
return {
step_name: self._metaflow.metadata.filter_tasks_by_metadata(
flow_id, run_id, step_name, step_name, field_name, field_value
)
}

@property
def metadata(self) -> List[Metadata]:
Expand Down

0 comments on commit 48c771d

Please sign in to comment.