Skip to content

Commit

Permalink
fix(sdk): fix nested loop global param bug (#917)
Browse files Browse the repository at this point in the history
* fix nested loop global param bug

* fix nested loop global param bug

* add license

* fix recursion bug
  • Loading branch information
Tomcli authored Apr 13, 2022
1 parent eac02f0 commit 51c5cdd
Show file tree
Hide file tree
Showing 9 changed files with 496 additions and 43 deletions.
34 changes: 34 additions & 0 deletions sdk/python/kfp_tekton/compiler/_tekton_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,19 @@ def _handle_tekton_custom_task(custom_task: dict, workflow: dict, recursive_task
custom_task_cr_again['spec']['pipelineSpec']['params'] = sorted(
custom_task_cr_again['spec']['pipelineSpec']['params'], key=lambda k: k['name'])
# add children params to the root tasks
global_task_values = set()
all_nested_loop = []
nested_task = nested_custom_task['nested_custom_task']
while nested_task:
all_nested_loop.append(nested_task)
has_nested_task = False
for n_task in nested_custom_tasks:
if n_task['father_ct'] == nested_task:
nested_task = n_task['nested_custom_task']
has_nested_task = True
break
if not has_nested_task:
break
for task in tasks:
if task['name'] == nested_custom_task['root_ct']:
task['params'].extend(copy.deepcopy(nested_custom_task_special_params))
Expand All @@ -344,6 +357,27 @@ def _handle_tekton_custom_task(custom_task: dict, workflow: dict, recursive_task
task['params'].extend(nested_custom_task_special_params)
if task.get('params') is not None:
task['params'] = sorted(task['params'], key=lambda k: k['name'])
if task['name'] in all_nested_loop:
for param in task['params']:
if '$(params.' in param['value'] and 'subvar-' not in param['value']:
global_task_values.add(param['value'])
# Add any pipeline global params to the nested loop layers
all_params = []
for custom_param in custom_task_cr['spec']['pipelineSpec']['params']:
all_params.append(''.join(['$(params.', custom_param['name'], ')']))
for global_task_value in global_task_values:
if global_task_value not in all_params:
all_params.append(global_task_value)
custom_task_cr['spec']['pipelineSpec']['params'].append(
{'name': re.findall('\$\(params.([^ \t\n.:,;\{\}]+)\)', global_task_value)[0],
'type': 'string'}
)
for task in tasks:
if task['name'] == nested_custom_task['father_ct']:
task['params'].append(
{'name': re.findall('\$\(params.([^ \t\n.:,;\{\}]+)\)', global_task_value)[0],
'value': global_task_value}
)
for special_param in nested_custom_task_special_params:
for nested_param in nested_custom_task_spec['params']:
if nested_param['name'] == special_param['name']:
Expand Down
7 changes: 7 additions & 0 deletions sdk/python/tests/compiler/compiler_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,13 @@ def test_loop_with_step_workflow(self):
from .testdata.loop_with_step import pipeline
self._test_pipeline_workflow(pipeline, 'loop_with_step.yaml')

def test_nested_loop_global_param_workflow(self):
"""
Test compiling a nested loop with global parameters in workflow.
"""
from .testdata.nested_loop_global_param import nested_loop
self._test_pipeline_workflow(nested_loop, 'nested_loop_global_param.yaml')

def test_cond_recur_workflow(self):
"""
Test compiling a conditional recursive workflow.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,12 @@ spec:
params:
- name: loop-item-param-1
value: '[{"a": 1, "b": 2}, {"a": 10, "b": 20}]'
- name: my_pipe_param
value: $(params.my_pipe_param)
- name: my_pipe_param-loop-item
value: $(params.my_pipe_param)
- name: my_pipe_param3
value: $(params.my_pipe_param3)
- name: my_pipe_param3-loop-item
value: $(params.my_pipe_param3)
taskSpec:
Expand All @@ -62,8 +66,12 @@ spec:
type: string
- name: loop-item-param-1-subvar-b
type: string
- name: my_pipe_param
type: string
- name: my_pipe_param-loop-item
type: string
- name: my_pipe_param3
type: string
- name: my_pipe_param3-loop-item
type: string
tasks:
Expand Down Expand Up @@ -140,6 +148,8 @@ spec:
value: $(params.loop-item-param-1-subvar-b)
- name: my_pipe_param-loop-item
value: $(params.my_pipe_param-loop-item)
- name: my_pipe_param3
value: $(params.my_pipe_param3)
- name: my_pipe_param3-loop-item
value: $(params.my_pipe_param3-loop-item)
taskSpec:
Expand All @@ -156,6 +166,8 @@ spec:
type: string
- name: my_pipe_param-loop-item
type: string
- name: my_pipe_param3
type: string
- name: my_pipe_param3-loop-item
type: string
tasks:
Expand Down Expand Up @@ -201,6 +213,8 @@ spec:
value: $(params.loop-item-param-1-subvar-b)
- name: loop-item-param-5
value: '[4, 5]'
- name: my_pipe_param3
value: $(params.my_pipe_param3)
- name: my_pipe_param3-loop-item
value: $(params.my_pipe_param3-loop-item)
taskSpec:
Expand All @@ -217,6 +231,8 @@ spec:
type: string
- name: loop-item-param-5
type: string
- name: my_pipe_param3
type: string
- name: my_pipe_param3-loop-item
type: string
tasks:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,13 @@ metadata:
"spec": {"iterateParam": "loop-item-param-1", "iterationNumberParam": "iteration-number-3",
"pipelineSpec": {"params": [{"name": "iteration-number-3", "type": "string"},
{"name": "loop-item-param-1-subvar-a", "type": "string"}, {"name": "loop-item-param-1-subvar-b",
"type": "string"}, {"name": "my_pipe_param-loop-item", "type": "string"}, {"name":
"my_pipe_param3-loop-item", "type": "string"}], "tasks": [{"name": "my-in-coop1",
"params": [{"name": "iteration-number-3", "value": "$(params.iteration-number-3)"},
{"name": "loop-item-param-1-subvar-a", "value": "$(params.loop-item-param-1-subvar-a)"}],
"taskSpec": {"metadata": {"annotations": {"pipelines.kubeflow.org/component_spec_digest":
"{\"name\": \"my-in-coop1\", \"outputs\": [], \"version\": \"my-in-coop1@sha256=a3500c74823f34974bad0f19c6ebb9c48b9457d687021e9d6c874f840a7cd9b0\"}",
"type": "string"}, {"name": "my_pipe_param", "type": "string"}, {"name": "my_pipe_param-loop-item",
"type": "string"}, {"name": "my_pipe_param3", "type": "string"}, {"name": "my_pipe_param3-loop-item",
"type": "string"}], "tasks": [{"name": "my-in-coop1", "params": [{"name": "iteration-number-3",
"value": "$(params.iteration-number-3)"}, {"name": "loop-item-param-1-subvar-a",
"value": "$(params.loop-item-param-1-subvar-a)"}], "taskSpec": {"metadata":
{"annotations": {"pipelines.kubeflow.org/component_spec_digest": "{\"name\":
\"my-in-coop1\", \"outputs\": [], \"version\": \"my-in-coop1@sha256=a3500c74823f34974bad0f19c6ebb9c48b9457d687021e9d6c874f840a7cd9b0\"}",
"tekton.dev/template": ""}, "labels": {"pipelines.kubeflow.org/cache_enabled":
"true", "pipelines.kubeflow.org/generation": "", "pipelines.kubeflow.org/pipelinename":
""}}, "params": [{"name": "iteration-number-3", "type": "string"}, {"name":
Expand All @@ -60,14 +61,15 @@ metadata:
{"name": "loop-item-param-1-subvar-a", "value": "$(params.loop-item-param-1-subvar-a)"},
{"name": "loop-item-param-1-subvar-b", "value": "$(params.loop-item-param-1-subvar-b)"},
{"name": "my_pipe_param-loop-item", "value": "$(params.my_pipe_param-loop-item)"},
{"name": "my_pipe_param3-loop-item", "value": "$(params.my_pipe_param3-loop-item)"}],
"taskRef": {"apiVersion": "custom.tekton.dev/v1alpha1", "kind": "PipelineLoop",
"name": "withitem-multiple-nesting-pipeline-for-loop-4"}}]}}}, {"apiVersion":
"custom.tekton.dev/v1alpha1", "kind": "PipelineLoop", "metadata": {"name": "withitem-multiple-nesting-pipeline-for-loop-4"},
"spec": {"iterateParam": "my_pipe_param-loop-item", "pipelineSpec": {"params":
[{"name": "iteration-number-3", "type": "string"}, {"name": "loop-item-param-1-subvar-a",
"type": "string"}, {"name": "loop-item-param-1-subvar-b", "type": "string"},
{"name": "my_pipe_param-loop-item", "type": "string"}, {"name": "my_pipe_param3-loop-item",
{"name": "my_pipe_param3", "value": "$(params.my_pipe_param3)"}, {"name": "my_pipe_param3-loop-item",
"value": "$(params.my_pipe_param3-loop-item)"}], "taskRef": {"apiVersion": "custom.tekton.dev/v1alpha1",
"kind": "PipelineLoop", "name": "withitem-multiple-nesting-pipeline-for-loop-4"}}]}}},
{"apiVersion": "custom.tekton.dev/v1alpha1", "kind": "PipelineLoop", "metadata":
{"name": "withitem-multiple-nesting-pipeline-for-loop-4"}, "spec": {"iterateParam":
"my_pipe_param-loop-item", "pipelineSpec": {"params": [{"name": "iteration-number-3",
"type": "string"}, {"name": "loop-item-param-1-subvar-a", "type": "string"},
{"name": "loop-item-param-1-subvar-b", "type": "string"}, {"name": "my_pipe_param-loop-item",
"type": "string"}, {"name": "my_pipe_param3", "type": "string"}, {"name": "my_pipe_param3-loop-item",
"type": "string"}], "tasks": [{"name": "my-1st-inner-coop", "params": [{"name":
"loop-item-param-1-subvar-a", "value": "$(params.loop-item-param-1-subvar-a)"},
{"name": "my_pipe_param-loop-item", "value": "$(params.my_pipe_param-loop-item)"}],
Expand All @@ -83,21 +85,22 @@ metadata:
"params": [{"name": "iteration-number-3", "value": "$(params.iteration-number-3)"},
{"name": "loop-item-param-1-subvar-b", "value": "$(params.loop-item-param-1-subvar-b)"},
{"name": "loop-item-param-5", "value": "[4, 5]"}, {"name": "my_pipe_param3-loop-item",
"value": "$(params.my_pipe_param3-loop-item)"}], "taskRef": {"apiVersion": "custom.tekton.dev/v1alpha1",
"value": "$(params.my_pipe_param3-loop-item)"}, {"name": "my_pipe_param3", "value":
"$(params.my_pipe_param3)"}], "taskRef": {"apiVersion": "custom.tekton.dev/v1alpha1",
"kind": "PipelineLoop", "name": "withitem-multiple-nesting-pipeline-for-loop-6"}}]}}},
{"apiVersion": "custom.tekton.dev/v1alpha1", "kind": "PipelineLoop", "metadata":
{"name": "withitem-multiple-nesting-pipeline-for-loop-6"}, "spec": {"iterateParam":
"loop-item-param-5", "iterationNumberParam": "iteration-number-7", "pipelineSpec":
{"params": [{"name": "iteration-number-3", "type": "string"}, {"name": "iteration-number-7",
"type": "string"}, {"name": "loop-item-param-1-subvar-b", "type": "string"},
{"name": "loop-item-param-5", "type": "string"}, {"name": "my_pipe_param3-loop-item",
"type": "string"}], "tasks": [{"name": "my-2nd-inner-coop", "params": [{"name":
"iteration-number-3", "value": "$(params.iteration-number-3)"}, {"name": "iteration-number-7",
"value": "$(params.iteration-number-7)"}, {"name": "loop-item-param-1-subvar-b",
"value": "$(params.loop-item-param-1-subvar-b)"}, {"name": "loop-item-param-5",
"value": "$(params.loop-item-param-5)"}], "taskSpec": {"metadata": {"annotations":
{"pipelines.kubeflow.org/component_spec_digest": "{\"name\": \"my-2nd-inner-coop\",
\"outputs\": [], \"version\": \"my-2nd-inner-coop@sha256=446981377175c0bb9fd796c392d60faeb066a8ed375deb664fe6caae385220a4\"}",
{"name": "loop-item-param-5", "type": "string"}, {"name": "my_pipe_param3",
"type": "string"}, {"name": "my_pipe_param3-loop-item", "type": "string"}],
"tasks": [{"name": "my-2nd-inner-coop", "params": [{"name": "iteration-number-3",
"value": "$(params.iteration-number-3)"}, {"name": "iteration-number-7", "value":
"$(params.iteration-number-7)"}, {"name": "loop-item-param-1-subvar-b", "value":
"$(params.loop-item-param-1-subvar-b)"}, {"name": "loop-item-param-5", "value":
"$(params.loop-item-param-5)"}], "taskSpec": {"metadata": {"annotations": {"pipelines.kubeflow.org/component_spec_digest":
"{\"name\": \"my-2nd-inner-coop\", \"outputs\": [], \"version\": \"my-2nd-inner-coop@sha256=446981377175c0bb9fd796c392d60faeb066a8ed375deb664fe6caae385220a4\"}",
"tekton.dev/template": ""}, "labels": {"pipelines.kubeflow.org/cache_enabled":
"true", "pipelines.kubeflow.org/generation": "", "pipelines.kubeflow.org/pipelinename":
""}}, "params": [{"name": "iteration-number-3", "type": "string"}, {"name":
Expand Down Expand Up @@ -147,8 +150,12 @@ spec:
params:
- name: loop-item-param-1
value: '[{"a": 1, "b": 2}, {"a": 10, "b": 20}]'
- name: my_pipe_param
value: $(params.my_pipe_param)
- name: my_pipe_param-loop-item
value: $(params.my_pipe_param)
- name: my_pipe_param3
value: $(params.my_pipe_param3)
- name: my_pipe_param3-loop-item
value: $(params.my_pipe_param3)
timeout: 525600m
68 changes: 68 additions & 0 deletions sdk/python/tests/compiler/testdata/nested_loop_global_param.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# Copyright 2022 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from kfp import dsl
from kfp.components import load_component_from_text
from kfp_tekton.compiler import TektonCompiler


class Coder:
def empty(self):
return ""


TektonCompiler._get_unique_id_code = Coder.empty


def PrintOp(name: str, msg: str = None):
if msg is None:
msg = name
print_op = load_component_from_text(
"""
name: %s
inputs:
- {name: input_text, type: String, description: 'Represents an input parameter.'}
outputs:
- {name: output_value, type: String, description: 'Represents an output paramter.'}
implementation:
container:
image: alpine:3.6
command:
- sh
- -c
- |
set -e
echo $0 > $1
- {inputValue: input_text}
- {outputPath: output_value}
""" % (name)
)
return print_op(msg)


@dsl.pipeline("empty-loop")
def nested_loop(param: list = ["a", "b", "c"]):
# param of the inner loop is used inner-most --- works fine
with dsl.ParallelFor(param):
with dsl.ParallelFor(param):
PrintOp('print-0', f"print {param}")

# param of the inner loop is not used inner-most --- fails
with dsl.ParallelFor(param):
with dsl.ParallelFor(param):
PrintOp('print-1', "print")


if __name__ == '__main__':
TektonCompiler().compile(nested_loop, __file__.replace('.py', '.yaml'))
Loading

0 comments on commit 51c5cdd

Please sign in to comment.