|
40 | 40 | logger = get_logger(__name__)
|
41 | 41 |
|
42 | 42 |
|
| 43 | +def _import_pipeline(source: str) -> Pipeline: |
| 44 | + """Import a pipeline. |
| 45 | +
|
| 46 | + Args: |
| 47 | + source: The pipeline source. |
| 48 | +
|
| 49 | + Returns: |
| 50 | + The pipeline. |
| 51 | + """ |
| 52 | + try: |
| 53 | + pipeline_instance = source_utils.load(source) |
| 54 | + except ModuleNotFoundError as e: |
| 55 | + source_root = source_utils.get_source_root() |
| 56 | + cli_utils.error( |
| 57 | + f"Unable to import module `{e.name}`. Make sure the source path is " |
| 58 | + f"relative to your source root `{source_root}`." |
| 59 | + ) |
| 60 | + except AttributeError as e: |
| 61 | + cli_utils.error("Unable to load attribute from module: " + str(e)) |
| 62 | + |
| 63 | + if not isinstance(pipeline_instance, Pipeline): |
| 64 | + cli_utils.error( |
| 65 | + f"The given source path `{source}` does not resolve to a pipeline " |
| 66 | + "object." |
| 67 | + ) |
| 68 | + |
| 69 | + return pipeline_instance |
| 70 | + |
| 71 | + |
43 | 72 | @cli.group(cls=TagGroup, tag=CliCategories.MANAGEMENT_TOOLS)
|
44 | 73 | def pipeline() -> None:
|
45 | 74 | """Interact with pipelines, runs and schedules."""
|
@@ -85,22 +114,7 @@ def register_pipeline(
|
85 | 114 | "source code root."
|
86 | 115 | )
|
87 | 116 |
|
88 |
| - try: |
89 |
| - pipeline_instance = source_utils.load(source) |
90 |
| - except ModuleNotFoundError as e: |
91 |
| - source_root = source_utils.get_source_root() |
92 |
| - cli_utils.error( |
93 |
| - f"Unable to import module `{e.name}`. Make sure the source path is " |
94 |
| - f"relative to your source root `{source_root}`." |
95 |
| - ) |
96 |
| - except AttributeError as e: |
97 |
| - cli_utils.error("Unable to load attribute from module: " + str(e)) |
98 |
| - |
99 |
| - if not isinstance(pipeline_instance, Pipeline): |
100 |
| - cli_utils.error( |
101 |
| - f"The given source path `{source}` does not resolve to a pipeline " |
102 |
| - "object." |
103 |
| - ) |
| 117 | + pipeline_instance = _import_pipeline(source=source) |
104 | 118 |
|
105 | 119 | parameters: Dict[str, Any] = {}
|
106 | 120 | if parameters_path:
|
@@ -176,24 +190,9 @@ def build_pipeline(
|
176 | 190 | "your source code root."
|
177 | 191 | )
|
178 | 192 |
|
179 |
| - try: |
180 |
| - pipeline_instance = source_utils.load(source) |
181 |
| - except ModuleNotFoundError as e: |
182 |
| - source_root = source_utils.get_source_root() |
183 |
| - cli_utils.error( |
184 |
| - f"Unable to import module `{e.name}`. Make sure the source path is " |
185 |
| - f"relative to your source root `{source_root}`." |
186 |
| - ) |
187 |
| - except AttributeError as e: |
188 |
| - cli_utils.error("Unable to load attribute from module: " + str(e)) |
189 |
| - |
190 |
| - if not isinstance(pipeline_instance, Pipeline): |
191 |
| - cli_utils.error( |
192 |
| - f"The given source path `{source}` does not resolve to a pipeline " |
193 |
| - "object." |
194 |
| - ) |
195 |
| - |
196 | 193 | with cli_utils.temporary_active_stack(stack_name_or_id=stack_name_or_id):
|
| 194 | + pipeline_instance = _import_pipeline(source=source) |
| 195 | + |
197 | 196 | pipeline_instance = pipeline_instance.with_options(
|
198 | 197 | config_path=config_path
|
199 | 198 | )
|
@@ -277,36 +276,21 @@ def run_pipeline(
|
277 | 276 | "your source code root."
|
278 | 277 | )
|
279 | 278 |
|
280 |
| - try: |
281 |
| - pipeline_instance = source_utils.load(source) |
282 |
| - except ModuleNotFoundError as e: |
283 |
| - source_root = source_utils.get_source_root() |
284 |
| - cli_utils.error( |
285 |
| - f"Unable to import module `{e.name}`. Make sure the source path is " |
286 |
| - f"relative to your source root `{source_root}`." |
287 |
| - ) |
288 |
| - except AttributeError as e: |
289 |
| - cli_utils.error("Unable to load attribute from module: " + str(e)) |
290 |
| - |
291 |
| - if not isinstance(pipeline_instance, Pipeline): |
292 |
| - cli_utils.error( |
293 |
| - f"The given source path `{source}` does not resolve to a pipeline " |
294 |
| - "object." |
295 |
| - ) |
296 |
| - |
297 |
| - build: Union[str, PipelineBuildBase, None] = None |
298 |
| - if build_path_or_id: |
299 |
| - if uuid_utils.is_valid_uuid(build_path_or_id): |
300 |
| - build = build_path_or_id |
301 |
| - elif os.path.exists(build_path_or_id): |
302 |
| - build = PipelineBuildBase.from_yaml(build_path_or_id) |
303 |
| - else: |
304 |
| - cli_utils.error( |
305 |
| - f"The specified build {build_path_or_id} is not a valid UUID " |
306 |
| - "or file path." |
307 |
| - ) |
308 |
| - |
309 | 279 | with cli_utils.temporary_active_stack(stack_name_or_id=stack_name_or_id):
|
| 280 | + pipeline_instance = _import_pipeline(source=source) |
| 281 | + |
| 282 | + build: Union[str, PipelineBuildBase, None] = None |
| 283 | + if build_path_or_id: |
| 284 | + if uuid_utils.is_valid_uuid(build_path_or_id): |
| 285 | + build = build_path_or_id |
| 286 | + elif os.path.exists(build_path_or_id): |
| 287 | + build = PipelineBuildBase.from_yaml(build_path_or_id) |
| 288 | + else: |
| 289 | + cli_utils.error( |
| 290 | + f"The specified build {build_path_or_id} is not a valid UUID " |
| 291 | + "or file path." |
| 292 | + ) |
| 293 | + |
310 | 294 | pipeline_instance = pipeline_instance.with_options(
|
311 | 295 | config_path=config_path,
|
312 | 296 | build=build,
|
@@ -369,24 +353,9 @@ def create_run_template(
|
369 | 353 | "init` at your source code root."
|
370 | 354 | )
|
371 | 355 |
|
372 |
| - try: |
373 |
| - pipeline_instance = source_utils.load(source) |
374 |
| - except ModuleNotFoundError as e: |
375 |
| - source_root = source_utils.get_source_root() |
376 |
| - cli_utils.error( |
377 |
| - f"Unable to import module `{e.name}`. Make sure the source path is " |
378 |
| - f"relative to your source root `{source_root}`." |
379 |
| - ) |
380 |
| - except AttributeError as e: |
381 |
| - cli_utils.error("Unable to load attribute from module: " + str(e)) |
382 |
| - |
383 |
| - if not isinstance(pipeline_instance, Pipeline): |
384 |
| - cli_utils.error( |
385 |
| - f"The given source path `{source}` does not resolve to a pipeline " |
386 |
| - "object." |
387 |
| - ) |
388 |
| - |
389 | 356 | with cli_utils.temporary_active_stack(stack_name_or_id=stack_name_or_id):
|
| 357 | + pipeline_instance = _import_pipeline(source=source) |
| 358 | + |
390 | 359 | pipeline_instance = pipeline_instance.with_options(
|
391 | 360 | config_path=config_path
|
392 | 361 | )
|
|
0 commit comments