Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix sync for map over lp in dynamic #3155

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 62 additions & 64 deletions flytekit/remote/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -483,63 +483,10 @@
wf_templates.extend([swf.template for swf in compiled_wf.sub_workflows])

node_launch_plans = {}

def find_launch_plan(
lp_ref: id_models, node_launch_plans: Dict[id_models, launch_plan_models.LaunchPlanSpec]
) -> None:
if lp_ref not in node_launch_plans:
admin_launch_plan = self.client.get_launch_plan(lp_ref)
node_launch_plans[lp_ref] = admin_launch_plan.spec

for wf_template in wf_templates:
for node in FlyteWorkflow.get_non_system_nodes(wf_template.nodes):
if node.workflow_node is not None and node.workflow_node.launchplan_ref is not None:
lp_ref = node.workflow_node.launchplan_ref
find_launch_plan(lp_ref, node_launch_plans)

# Inspect array nodes for launch plans
if (
node.array_node is not None
and node.array_node.node.workflow_node is not None
and node.array_node.node.workflow_node.launchplan_ref is not None
):
lp_ref = node.array_node.node.workflow_node.launchplan_ref
find_launch_plan(lp_ref, node_launch_plans)

# Inspect conditional branch nodes for launch plans
def get_launch_plan_from_branch(
branch_node: BranchNode, node_launch_plans: Dict[id_models, launch_plan_models.LaunchPlanSpec]
) -> None:
def get_launch_plan_from_then_node(
child_then_node: Node, node_launch_plans: Dict[id_models, launch_plan_models.LaunchPlanSpec]
) -> None:
# then_node could have nested branch_node or be a normal then_node
if child_then_node.branch_node:
get_launch_plan_from_branch(child_then_node.branch_node, node_launch_plans)
elif child_then_node.workflow_node and child_then_node.workflow_node.launchplan_ref:
lp_ref = child_then_node.workflow_node.launchplan_ref
find_launch_plan(lp_ref, node_launch_plans)

if branch_node and branch_node.if_else:
branch = branch_node.if_else
if branch.case and branch.case.then_node:
child_then_node = branch.case.then_node
get_launch_plan_from_then_node(child_then_node, node_launch_plans)
if branch.other:
for o in branch.other:
if o.then_node:
child_then_node = o.then_node
get_launch_plan_from_then_node(child_then_node, node_launch_plans)
if branch.else_node:
# else_node could have nested conditional branch_node
if branch.else_node.branch_node:
get_launch_plan_from_branch(branch.else_node.branch_node, node_launch_plans)
elif branch.else_node.workflow_node and branch.else_node.workflow_node.launchplan_ref:
lp_ref = branch.else_node.workflow_node.launchplan_ref
find_launch_plan(lp_ref, node_launch_plans)

if node.branch_node:
get_launch_plan_from_branch(node.branch_node, node_launch_plans)
self.find_launch_plan_for_node(node, node_launch_plans)

flyte_workflow = FlyteWorkflow.promote_from_closure(compiled_wf, node_launch_plans)
flyte_workflow.template._id = workflow_id
return flyte_workflow
Expand All @@ -556,6 +503,65 @@
flyte_lp._flyte_workflow = workflow
return flyte_lp

def find_launch_plan(
self, lp_ref: id_models, node_launch_plans: Dict[id_models, launch_plan_models.LaunchPlanSpec]
) -> None:
if lp_ref not in node_launch_plans:
admin_launch_plan = self.client.get_launch_plan(lp_ref)
node_launch_plans[lp_ref] = admin_launch_plan.spec

def find_launch_plan_for_node(
self, node: Node, node_launch_plans: Dict[id_models, launch_plan_models.LaunchPlanSpec]
):
Comment on lines +513 to +515
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider simplifying complex nested logic

Consider breaking down the find_launch_plan_for_node method into smaller, more focused methods. The current implementation has multiple nested functions and complex branching logic which could be simplified for better maintainability. Consider extracting the branch node handling logic into a separate method.

Code suggestion
Check the AI-generated fix before applying
Suggested change
def find_launch_plan_for_node(
self, node: Node, node_launch_plans: Dict[id_models, launch_plan_models.LaunchPlanSpec]
):
def _handle_then_node(
self, child_then_node: Node, node_launch_plans: Dict[id_models, launch_plan_models.LaunchPlanSpec]
) -> None:
if child_then_node.branch_node:
self._handle_branch_node(child_then_node.branch_node, node_launch_plans)
elif child_then_node.workflow_node and child_then_node.workflow_node.launchplan_ref:
lp_ref = child_then_node.workflow_node.launchplan_ref
self.find_launch_plan(lp_ref, node_launch_plans)
def _handle_branch_node(
self, branch_node: BranchNode, node_launch_plans: Dict[id_models, launch_plan_models.LaunchPlanSpec]
) -> None:
if branch_node and branch_node.if_else:
branch = branch_node.if_else
if branch.case and branch.case.then_node:
self._handle_then_node(branch.case.then_node, node_launch_plans)
if branch.other:
for o in branch.other:
if o.then_node:
self._handle_then_node(o.then_node, node_launch_plans)
if branch.else_node:
if branch.else_node.branch_node:
self._handle_branch_node(branch.else_node.branch_node, node_launch_plans)
elif branch.else_node.workflow_node and branch.else_node.workflow_node.launchplan_ref:
lp_ref = branch.else_node.workflow_node.launchplan_ref
self.find_launch_plan(lp_ref, node_launch_plans)
def find_launch_plan_for_node(
self, node: Node, node_launch_plans: Dict[id_models, launch_plan_models.LaunchPlanSpec]
):

Code Review Run #df4fb4


Should Bito avoid suggestions like this for future reviews? (Manage Rules)

  • Yes, avoid them

# Case 1: workflow node
if node.workflow_node is not None and node.workflow_node.launchplan_ref is not None:
lp_ref = node.workflow_node.launchplan_ref
self.find_launch_plan(lp_ref, node_launch_plans)

# Case 2: array node
if (
node.array_node is not None
and node.array_node.node.workflow_node is not None
and node.array_node.node.workflow_node.launchplan_ref is not None
):
lp_ref = node.array_node.node.workflow_node.launchplan_ref
self.find_launch_plan(lp_ref, node_launch_plans)

Check warning on line 528 in flytekit/remote/remote.py

View check run for this annotation

Codecov / codecov/patch

flytekit/remote/remote.py#L527-L528

Added lines #L527 - L528 were not covered by tests

# Case 3: branch node
def get_launch_plan_from_branch(
branch_node: BranchNode, node_launch_plans: Dict[id_models, launch_plan_models.LaunchPlanSpec]
) -> None:
def get_launch_plan_from_then_node(
child_then_node: Node, node_launch_plans: Dict[id_models, launch_plan_models.LaunchPlanSpec]
) -> None:
# then_node could have nested branch_node or be a normal then_node
if child_then_node.branch_node:
get_launch_plan_from_branch(child_then_node.branch_node, node_launch_plans)

Check warning on line 539 in flytekit/remote/remote.py

View check run for this annotation

Codecov / codecov/patch

flytekit/remote/remote.py#L539

Added line #L539 was not covered by tests
elif child_then_node.workflow_node and child_then_node.workflow_node.launchplan_ref:
lp_ref = child_then_node.workflow_node.launchplan_ref
self.find_launch_plan(lp_ref, node_launch_plans)

if branch_node and branch_node.if_else:
branch = branch_node.if_else
if branch.case and branch.case.then_node:
child_then_node = branch.case.then_node
get_launch_plan_from_then_node(child_then_node, node_launch_plans)
if branch.other:
for o in branch.other:
if o.then_node:
child_then_node = o.then_node
get_launch_plan_from_then_node(child_then_node, node_launch_plans)

Check warning on line 553 in flytekit/remote/remote.py

View check run for this annotation

Codecov / codecov/patch

flytekit/remote/remote.py#L552-L553

Added lines #L552 - L553 were not covered by tests
if branch.else_node:
# else_node could have nested conditional branch_node
if branch.else_node.branch_node:
get_launch_plan_from_branch(branch.else_node.branch_node, node_launch_plans)
elif branch.else_node.workflow_node and branch.else_node.workflow_node.launchplan_ref:
lp_ref = branch.else_node.workflow_node.launchplan_ref
self.find_launch_plan(lp_ref, node_launch_plans)

Check warning on line 560 in flytekit/remote/remote.py

View check run for this annotation

Codecov / codecov/patch

flytekit/remote/remote.py#L559-L560

Added lines #L559 - L560 were not covered by tests

if node.branch_node:
get_launch_plan_from_branch(node.branch_node, node_launch_plans)

def fetch_active_launchplan(
self, project: str = None, domain: str = None, name: str = None
) -> typing.Optional[FlyteLaunchPlan]:
Expand Down Expand Up @@ -2575,17 +2581,9 @@
if node_execution_get_data_response.dynamic_workflow is not None:
compiled_wf = node_execution_get_data_response.dynamic_workflow.compiled_workflow
node_launch_plans = {}
# TODO: Inspect branch nodes for launch plans
for template in [compiled_wf.primary.template] + [swf.template for swf in compiled_wf.sub_workflows]:
for node in FlyteWorkflow.get_non_system_nodes(template.nodes):
if (
node.workflow_node is not None
and node.workflow_node.launchplan_ref is not None
and node.workflow_node.launchplan_ref not in node_launch_plans
):
node_launch_plans[node.workflow_node.launchplan_ref] = self.client.get_launch_plan(
node.workflow_node.launchplan_ref
).spec
self.find_launch_plan_for_node(node, node_launch_plans)

Check warning on line 2586 in flytekit/remote/remote.py

View check run for this annotation

Codecov / codecov/patch

flytekit/remote/remote.py#L2586

Added line #L2586 was not covered by tests
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding error handling for launch plans

Consider adding error handling for find_launch_plan_for_node to handle potential exceptions when fetching launch plans. The method may fail silently if there are issues accessing the launch plans.

Code suggestion
Check the AI-generated fix before applying
Suggested change
self.find_launch_plan_for_node(node, node_launch_plans)
try:
self.find_launch_plan_for_node(node, node_launch_plans)
except FlyteEntityNotExistException as e:
logger.warning(f"Failed to fetch launch plan for node {node.id}: {str(e)}")

Code Review Run #df4fb4


Should Bito avoid suggestions like this for future reviews? (Manage Rules)

  • Yes, avoid them


dynamic_flyte_wf = FlyteWorkflow.promote_from_closure(compiled_wf, node_launch_plans)
execution._underlying_node_executions = [
Expand Down
Loading