Skip to content

Commit 44cc1af

Browse files
committed
bug(backend): Pipeline Version Link Returns "Cannot retrieve pipeline version" Error
Fix race condition in upload_pipeline() that caused "Cannot retrieve pipeline version" errors when creating new pipelines. The previous code created a pipeline (which auto-creates a default version), then created a second version with a random name, then tried to delete the default version by sorting versions by created_at timestamp. If both versions had the same timestamp (common with second-precision timestamps), the sort order was undefined and could delete the wrong version - the one we wanted to keep. The returned version ID then pointed to a deleted version. Simplified the flow to just use the default version that upload_pipeline() creates, eliminating the race condition entirely: - New pipelines: use the default version created by upload_pipeline() - Existing pipelines: upload a new version with random name (unchanged) Signed-off-by: Eder Ignatowicz <ignatowicz@gmail.com>
1 parent 0b32cb4 commit 44cc1af

File tree

1 file changed

+10
-19
lines changed

1 file changed

+10
-19
lines changed

backend/kale/common/kfputils.py

Lines changed: 10 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -127,37 +127,28 @@ def upload_pipeline(
127127
client = _get_kfp_client(host)
128128
log.info("Uploading pipeline '%s'...", pipeline_name)
129129
pipeline_id = get_pipeline_id(pipeline_name, host=host)
130-
version_name = utils.random_string()
131130
if not pipeline_id:
132-
# The first version of the pipeline is set to the pipeline name value.
133-
# To work around this, upload the first pipeline, then another one
134-
# with a proper version name. Finally delete the original pipeline.
131+
# New pipeline: upload_pipeline creates a default version automatically
135132
upp = client.upload_pipeline(
136133
pipeline_package_path=pipeline_package_path, pipeline_name=pipeline_name
137134
)
138135
pipeline_id = upp.pipeline_id
139-
log.info("Uploaded Pipeline '%s' id: %s", pipeline_name, pipeline_id)
140-
upv = client.upload_pipeline_version(
141-
pipeline_package_path=pipeline_package_path,
142-
pipeline_version_name=version_name,
143-
pipeline_id=pipeline_id,
144-
)
145-
# delete the first version which has the same name as the pipeline
136+
log.info("Uploaded pipeline '%s' with id: %s", pipeline_name, pipeline_id)
137+
# Get the default version that was created with the pipeline
146138
versions = client.list_pipeline_versions(pipeline_id=pipeline_id)
147-
sorted_versions = sorted(versions.pipeline_versions, key=lambda v: v.created_at)
148-
delete_vid = sorted_versions[0].pipeline_version_id
149-
client.delete_pipeline_version(pipeline_id=pipeline_id, pipeline_version_id=delete_vid)
150-
log.info(
151-
"Deleted pipeline version with name '%s' and ID: %s", pipeline_name, upp.pipeline_id
152-
)
139+
version_id = versions.pipeline_versions[0].pipeline_version_id
153140
else:
141+
# Existing pipeline: upload a new version
142+
version_name = utils.random_string()
154143
upv = client.upload_pipeline_version(
155144
pipeline_package_path=pipeline_package_path,
156145
pipeline_version_name=version_name,
157146
pipeline_id=pipeline_id,
158147
)
159-
log.info("Successfully uploaded version '%s' for pipeline '%s'.", version_name, pipeline_name)
160-
return pipeline_id, upv.pipeline_version_id
148+
version_id = upv.pipeline_version_id
149+
log.info("Uploaded version '%s' for pipeline '%s'.",
150+
version_name, pipeline_name)
151+
return pipeline_id, version_id
161152

162153

163154
def run_pipeline(

0 commit comments

Comments
 (0)