
Argo compiler emits duplicate DAGTask for foreach matching_join #3196

@igmcdowell

Note: I used Claude to put together this analysis after hitting an edge case in Argo template generation. I've hand-verified the repro and fix, though the seen tracking is pretty hairy and I don't claim to have as full an understanding of the root cause as Claude implies.

============================ Begin Claude analysis ============================

ArgoWorkflows._dag_templates can emit two DAG tasks with the same name (the matching_join of a foreach) inside a single Argo template. Argo then rejects the WorkflowTemplate at submit time with:

templates.<flow> sorting failed: duplicated nodeName <step>

The duplicate appears when a split-switch step's matching_conditional_join resolves to a node that is also the matching_join of a foreach descendant.

Reproduction

Verified on metaflow master (commit 6d509db, version 2.19.29).

Flow

"""Minimum repro: duplicate DAGTask for foreach matching_join."""
from metaflow import FlowSpec, project, step


@project(name="foreach_join_dedup")
class ForeachJoinDedupFlow(FlowSpec):
    @step
    def start(self):
        # Both branches converge at fan_gate.
        self.optional_mode = "run"
        self.next(
            {"skip": self.fan_gate, "run": self.optional_step},
            condition="optional_mode",
        )

    @step
    def optional_step(self):
        self.next(self.fan_gate)

    @step
    def fan_gate(self):
        self.fan_mode = "run"
        self.next(
            {"skip": self.end, "run": self.fan_out},
            condition="fan_mode",
        )

    @step
    def fan_out(self):
        self.scan_items = [1, 2]
        self.next(self.fan_step, foreach="scan_items")

    @step
    def fan_step(self):
        # `join_step` is fan_out's matching_join — the bug victim.
        self.next(self.join_step)

    @step
    def join_step(self, inputs):
        self.next(self.end)

    @step
    def end(self):
        pass


if __name__ == "__main__":
    ForeachJoinDedupFlow()

The two ingredients:

  1. start and optional_step both feed fan_gate, so start's two branches converge before reaching the foreach. This puts both of start's out_funcs into fan_step.conditional_branches, which is the precondition for join_step to be a closing candidate for start in _parse_conditional_branches.
  2. Step names are chosen so that join_step (the matching_join) sorts last alphabetically among nodes that close start. The compiler's second pass overwrites matching_conditional_join_dict[start] on every closing candidate, so whichever node iterates last wins. With these names, matching_conditional_join_dict[start] == "join_step".
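A toy model of the last-writer-wins overwrite described in item 2, assuming candidates are visited in alphabetical order as the step-name choice implies. The function name and the exact candidate set are hypothetical; this is not Metaflow's _parse_conditional_branches.

```python
def resolve_conditional_join(split, closing_candidates):
    """Toy model (hypothetical): record the conditional join for `split`.

    Assumes candidates are visited in sorted (alphabetical) order and that
    every candidate unconditionally overwrites the entry, so the last one wins.
    """
    matching_conditional_join_dict = {}
    for candidate in sorted(closing_candidates):
        # No "already set?" check: each candidate replaces the previous one.
        matching_conditional_join_dict[split] = candidate
    return matching_conditional_join_dict[split]


# The candidate set for `start` is an assumption for illustration.
print(resolve_conditional_join("start", ["fan_gate", "end", "join_step"]))
# -> join_step
```

In this model, renaming join_step to something that sorts first (say a_join) makes fan_gate win instead, which is why the repro depends on the step names.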

Compile

python foreach_join_dedup_flow.py --no-pylint --datastore=s3 argo-workflows create --only-json

Output

$ ... | jq '
    [.spec.templates[]
    | select(.dag)
    | {tpl: .name,
        dups: ([.dag.tasks[].name]
                | group_by(.) | map(select(length>1)
                | {name: .[0], count: length}))}]
    | map(select(.dups | length > 0))'

[
  {
    "tpl": "ForeachJoinDedupFlow",
    "dups": [{"name": "join-step", "count": 2}]
  }
]
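For a regression test it may be easier to run the same duplicate check in Python than in jq. A minimal sketch, assuming the --only-json output has already been parsed into a dict; the function name is hypothetical.

```python
from collections import Counter


def duplicate_dag_task_names(workflow_template):
    """Return {template name: {task name: count}} for duplicated DAG tasks.

    Mirrors the jq filter: only templates that have a "dag" entry are
    inspected, and only task names occurring more than once are reported.
    """
    report = {}
    for template in workflow_template.get("spec", {}).get("templates", []):
        dag = template.get("dag")
        if dag is None:
            continue  # non-DAG templates (e.g. container templates) are skipped
        counts = Counter(task["name"] for task in dag.get("tasks", []))
        dups = {name: n for name, n in counts.items() if n > 1}
        if dups:
            report[template["name"]] = dups
    return report
```

For example, json.load the output of the compile command into a dict and assert that duplicate_dag_task_names(...) == {}. On the unpatched compiler, the repro flow should instead yield {"ForeachJoinDedupFlow": {"join-step": 2}}, matching the jq output.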

The two join-step DAG tasks differ in shape, matching the two compiler code paths:

depends: fan-out-foreach-None.Succeeded      (foreach handler emission)
parameters: ['input-paths', 'split-cardinality']
---
depends: ((fan-step.Succeeded))              (regular-join emission)
parameters: ['input-paths']

Root cause

In metaflow/plugins/argo/argo_workflows.py, inside ArgoWorkflows._dag_templates._visit:

  • The seen list (line 1232) dedupes _visit calls: line 1240 returns early if node.name in seen. Every DAGTask emission site does seen.append(node.name) except the problematic branch.
  • The foreach branch (lines ~1873–1918) appends a DAGTask whose name is self.graph[node.matching_join].name (line 1874), but does not seen.append(...) that matching-join name.
  • The foreach branch's tail call at line 1919 jumps over the matching_join to the matching_join's out_funcs[0], so the matching_join is never visited via the foreach subtree.

If a later traversal arrives at the matching_join with exit_node != matching_join, line 1240 doesn't fire and the regular join path (line 1568) appends a second DAGTask.

That later traversal happens via the split-switch handler at lines 1685–1701. After visiting each child of a split-switch S, it tail-calls:

return _visit(
    self.graph[self._matching_conditional_join(node)],
    exit_node,
    ...
)

If _matching_conditional_join(S) resolves to J, and J is also a foreach's matching_join that was emitted (without seen-tracking) earlier in the traversal of one of S's children, this tail call walks into J as a regular join and emits a second DAGTask.

In the repro: matching_conditional_join_dict[start] == "join_step", and join_step is also fan_out.matching_join.
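A self-contained toy model of this interaction. The graph encoding, the function names, and the simplified guard (which folds the exit-node check and the seen check into one line) are assumptions for illustration, not Metaflow's actual _visit. It models only the repro from fan_out onward, plus the split-switch tail call into join_step.

```python
# Toy graph for the repro from fan_out onward (hypothetical encoding):
# name -> (kind, next node or foreach body, matching join for a foreach)
GRAPH = {
    "fan_out": ("foreach", "fan_step", "join_step"),
    "fan_step": ("linear", "join_step", None),
    "join_step": ("join", "end", None),
    "end": ("end", None, None),
}


def compile_dag_tasks(apply_fix):
    """Return the (task name, emission path) pairs a toy _visit emits."""
    seen, tasks = [], []

    def emit(name, path):
        tasks.append((name, path))
        seen.append(name)  # every normal emission site records the name

    def visit(name, exit_node):
        # Simplified guard: stop at the exit node or at anything in seen
        # (cf. the early return on line 1240).
        if name is None or name == exit_node or name in seen:
            return
        kind, nxt, join = GRAPH[name]
        if kind == "foreach":
            emit(name, "step")
            visit(nxt, join)  # the body stops at the matching join
            tasks.append((join, "foreach-handler"))  # emitted, not recorded
            if apply_fix:
                seen.append(join)  # the proposed one-line fix
            # Tail call jumps over the join to its out_funcs[0].
            return visit(GRAPH[join][1], exit_node)
        emit(name, "regular-join" if kind == "join" else "step")
        return visit(nxt, exit_node)

    visit("fan_out", None)    # traversal of one split-switch child
    visit("join_step", None)  # split-switch tail call to its conditional join
    return tasks


for fix in (False, True):
    names = [name for name, _ in compile_dag_tasks(apply_fix=fix)]
    print(f"apply_fix={fix}: join_step emitted {names.count('join_step')}x")
# apply_fix=False: join_step emitted 2x
# apply_fix=True: join_step emitted 1x
```

Without the fix, the toy emits join_step once per code path, mirroring the two task shapes in the Output section. With the fix, the guard catches the second visit and only the foreach-handler task remains.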

Proposed fix

One line, immediately after dag_tasks.append(join_foreach_task) (line 1918):

                 dag_tasks.append(join_foreach_task)
+                seen.append(self.graph[node.matching_join].name)
                 return _visit(
                     self.graph[self.graph[node.matching_join].out_funcs[0]],

Verified: with this one-line change, the repro flow compiles to a single join-step DAGTask (with the foreach-handler shape), and test/unit/test_argo_workflows_cli.py still passes 16/16.
============================ End Claude analysis ============================

Happy to put up a PR with the patch and a regression test if useful.
