Skip to content

Commit

Permalink
fix(sdk): Apply node_selector in PipelineConf (#918)
Browse files Browse the repository at this point in the history
* fix(sdk): Apply node_selector in PipelineConf

Apply the node_selector in PipelineConf to spec.podTemplate
instead of taskPodTemplate of each task.

Signed-off-by: Yihong Wang <[email protected]>

* update Makefile under pipelineloop

when running cli, it should do update target first

Signed-off-by: Yihong Wang <[email protected]>
  • Loading branch information
yhwang authored Apr 13, 2022
1 parent 51c5cdd commit b14f952
Show file tree
Hide file tree
Showing 7 changed files with 164 additions and 12 deletions.
12 changes: 6 additions & 6 deletions sdk/python/kfp_tekton/compiler/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1264,11 +1264,7 @@ def get_when_task(input_task_when, depended_conditions):
task_spec["taskPodTemplate"]["affinity"] = convert_k8s_obj_to_json(op.affinity)
if op.tolerations:
task_spec["taskPodTemplate"]['tolerations'] = op.tolerations
# process pipeline level first
if pipeline_conf and hasattr(pipeline_conf, 'default_pod_node_selector') \
and len(pipeline_conf.default_pod_node_selector) > 0:
task_spec["taskPodTemplate"]['nodeSelector'] = copy.deepcopy(pipeline_conf.default_pod_node_selector)
# process op level and it may oeverride the pipeline level conf
# process op level node_selector
if op.node_selector:
if task_spec["taskPodTemplate"].get('nodeSelector'):
task_spec["taskPodTemplate"]['nodeSelector'].update(op.node_selector)
Expand All @@ -1291,7 +1287,11 @@ def get_when_task(input_task_when, depended_conditions):
pipeline_run['spec']['podTemplate'] = pipeline_run['spec'].get('podTemplate', {})
pipeline_run['spec']['podTemplate']['imagePullSecrets'] = [
{"name": s.name} for s in pipeline.conf.image_pull_secrets]

# process pipeline level node_selector
if pipeline_conf and hasattr(pipeline_conf, 'default_pod_node_selector') \
and len(pipeline_conf.default_pod_node_selector) > 0:
pipeline_run['spec']['podTemplate'] = pipeline_run['spec'].get('podTemplate', {})
pipeline_run['spec']['podTemplate']['nodeSelector'] = copy.deepcopy(pipeline_conf.default_pod_node_selector)
workflow = pipeline_run

return workflow
Expand Down
7 changes: 7 additions & 0 deletions sdk/python/tests/compiler/compiler_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,13 @@ def test_imagepullsecrets_workflow(self):
from .testdata.imagepullsecrets import imagepullsecrets_pipeline
self._test_pipeline_workflow(imagepullsecrets_pipeline, 'imagepullsecrets.yaml', skip_noninlined=True)

def test_imagepullsecrets_with_node_selector_workflow(self):
"""
Test compiling a imagepullsecrets and node_selector workflow.
"""
from .testdata.imagepullsecrets_with_node_selector import imagepullsecrets_pipeline
self._test_pipeline_workflow(imagepullsecrets_pipeline, 'imagepullsecrets_with_node_selector.yaml', skip_noninlined=True)

def test_basic_no_decorator(self):
"""
Test compiling a basic workflow with no pipeline decorator
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# Copyright 2020 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Example demonstrating how to specify imagepullsecrets to access protected
container registry.
"""

from kfp import dsl, components
from kubernetes import client as k8s_client

GET_FREQUENT_WORD_STR = """
name: get-frequent
description: A get frequent word class representing a component in ML Pipelines
inputs:
- {name: message, type: String}
outputs:
- {name: word, type: String}
implementation:
container:
image: python:3.6-jessie
command:
- sh
- -c
args:
- |
python -c "from collections import Counter; \
text = '$0'; print('Input: ' + text); words = Counter(text.split()); \
print(max(words, key=words.get))" \
| tee $1
- {inputValue: message}
- {outputPath: word}
"""

get_frequent_word_op = components.load_component_from_text(GET_FREQUENT_WORD_STR)


@dsl.pipeline(
name='save-most-frequent',
description='Get Most Frequent Word and Save to GCS'
)
def imagepullsecrets_pipeline(
message: str = "When flies fly behind flies, then flies are following flies."):
"""A pipeline function describing the orchestration of the workflow."""

counter = get_frequent_word_op(message=message)
# Call set_image_pull_secrets after get_pipeline_conf().
dsl.get_pipeline_conf() \
.set_image_pull_secrets([k8s_client.V1ObjectReference(name="secretA")])
# also set node_selector
dsl.get_pipeline_conf().set_default_pod_node_selector('kubernetes.io/os', 'linux')


if __name__ == '__main__':
from kfp_tekton.compiler import TektonCompiler
TektonCompiler().compile(imagepullsecrets_pipeline, __file__.replace('.py', '.yaml'))
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
# Copyright 2021 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: tekton.dev/v1beta1
kind: PipelineRun
metadata:
name: save-most-frequent
annotations:
tekton.dev/output_artifacts: '{"get-frequent": [{"key": "artifacts/$PIPELINERUN/get-frequent/word.tgz",
"name": "get-frequent-word", "path": "/tmp/outputs/word/data"}]}'
tekton.dev/input_artifacts: '{}'
tekton.dev/artifact_bucket: mlpipeline
tekton.dev/artifact_endpoint: minio-service.kubeflow:9000
tekton.dev/artifact_endpoint_scheme: http://
tekton.dev/artifact_items: '{"get-frequent": [["word", "$(results.word.path)"]]}'
sidecar.istio.io/inject: "false"
pipelines.kubeflow.org/big_data_passing_format: $(workspaces.$TASK_NAME.path)/artifacts/$ORIG_PR_NAME/$TASKRUN_NAME/$TASK_PARAM_NAME
pipelines.kubeflow.org/pipeline_spec: '{"description": "Get Most Frequent Word
and Save to GCS", "inputs": [{"default": "When flies fly behind flies, then
flies are following flies.", "name": "message", "optional": true, "type": "String"}],
"name": "save-most-frequent"}'
spec:
params:
- name: message
value: When flies fly behind flies, then flies are following flies.
pipelineSpec:
params:
- name: message
default: When flies fly behind flies, then flies are following flies.
tasks:
- name: get-frequent
params:
- name: message
value: $(params.message)
taskSpec:
steps:
- name: main
args:
- |
python -c "from collections import Counter; text = '$0'; print('Input: ' + text); words = Counter(text.split()); print(max(words, key=words.get))" | tee $1
- $(inputs.params.message)
- $(results.word.path)
command:
- sh
- -c
image: python:3.6-jessie
params:
- name: message
results:
- name: word
description: /tmp/outputs/word/data
metadata:
labels:
pipelines.kubeflow.org/pipelinename: ''
pipelines.kubeflow.org/generation: ''
pipelines.kubeflow.org/cache_enabled: "true"
annotations:
pipelines.kubeflow.org/component_spec_digest: '{"name": "get-frequent",
"outputs": [{"name": "word", "type": "String"}], "version": "get-frequent@sha256=6ee1acb749583ceffd098100e3e83c4c369aa6b5f295c8bda202b32c853ca5db"}'
tekton.dev/template: ''
timeout: 525600m
timeout: 525600m
podTemplate:
imagePullSecrets:
- name: secretA
nodeSelector:
kubernetes.io/os: linux
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,7 @@ spec:
[], "version": "echo@sha256=e351a00f59eb0eb8d614bb41816020b768770ba7513a5b3c0c7ea5c2efa9c6d6"}'
tekton.dev/template: ''
timeout: 525600m
taskRunSpecs:
- pipelineTaskName: echo
taskPodTemplate:
nodeSelector:
kubernetes.io/os: linux
timeout: 525600m
podTemplate:
nodeSelector:
kubernetes.io/os: linux
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,6 @@ spec:
nodeSelector:
kubernetes.io/os: windows
timeout: 525600m
podTemplate:
nodeSelector:
kubernetes.io/os: linux
2 changes: 1 addition & 1 deletion tekton-catalog/pipeline-loops/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ init:
mkdir -p ${BIN_DIR}

.PHONY: cli
cli: init $(info cli: build validation cli )
cli: update init $(info cli: build validation cli )
go build -o=${BIN_DIR}/pipelineloop-cli ./cmd/cli

.PHONY: validate-testdata-python-sdk
Expand Down

0 comments on commit b14f952

Please sign in to comment.