diff --git a/.github/workflows/e2e-test-tune-api.yaml b/.github/workflows/e2e-test-tune-api.yaml index 4778a866049..998ced775f9 100644 --- a/.github/workflows/e2e-test-tune-api.yaml +++ b/.github/workflows/e2e-test-tune-api.yaml @@ -21,11 +21,12 @@ jobs: uses: ./.github/workflows/template-setup-e2e-test with: kubernetes-version: ${{ matrix.kubernetes-version }} - + - name: Run e2e test with tune API uses: ./.github/workflows/template-e2e-test with: tune-api: true + training-operator: true strategy: fail-fast: false diff --git a/.gitignore b/.gitignore index 3e41b2b726a..a1984c4203f 100644 --- a/.gitignore +++ b/.gitignore @@ -78,3 +78,6 @@ $RECYCLE.BIN/ ## Vendor dir vendor + +# Jupyter Notebooks. +**/.ipynb_checkpoints diff --git a/hack/gen-python-sdk/post_gen.py b/hack/gen-python-sdk/post_gen.py index f61f6c8d227..3c3fc037a9b 100644 --- a/hack/gen-python-sdk/post_gen.py +++ b/hack/gen-python-sdk/post_gen.py @@ -42,9 +42,7 @@ def _rewrite_helper(input_file, output_file, rewrite_rules): lines.append("# Import Katib API client.\n") lines.append("from kubeflow.katib.api.katib_client import KatibClient\n") lines.append("# Import Katib TrainerResources class.\n") - lines.append( - "from kubeflow.katib.types.trainer_resources import TrainerResources\n" - ) + lines.append("from kubeflow.katib.types.types import TrainerResources\n") lines.append("# Import Katib report metrics functions\n") lines.append("from kubeflow.katib.api.report_metrics import report_metrics\n") lines.append("# Import Katib helper functions.\n") diff --git a/pkg/apis/v1beta1/swagger.json b/pkg/apis/v1beta1/swagger.json index bd2dfa25eab..1018de4d26c 100644 --- a/pkg/apis/v1beta1/swagger.json +++ b/pkg/apis/v1beta1/swagger.json @@ -132,7 +132,7 @@ "$ref": "#/definitions/v1beta1.AlgorithmSetting" }, "x-kubernetes-list-map-keys": [ - "Name" + "name" ], "x-kubernetes-list-type": "map" }, @@ -148,7 +148,7 @@ "$ref": "#/definitions/.v1beta1.SuggestionCondition" }, "x-kubernetes-list-map-keys": [ - "Type" + "type" ], "x-kubernetes-list-type": "map" }, @@ -173,7 +173,7 @@ "$ref": "#/definitions/.v1beta1.TrialAssignment" }, "x-kubernetes-list-map-keys": [ - "Name" + "name" ], "x-kubernetes-list-type": "map" } @@ -217,7 +217,7 @@ "$ref": "#/definitions/v1beta1.EarlyStoppingRule" }, "x-kubernetes-list-map-keys": [ - "Name" + "name" ], "x-kubernetes-list-type": "map" }, @@ -241,7 +241,7 @@ "$ref": "#/definitions/v1beta1.ParameterAssignment" }, "x-kubernetes-list-map-keys": [ - "Name" + "name" ], "x-kubernetes-list-type": "map" } @@ -323,7 +323,7 @@ "$ref": "#/definitions/v1beta1.EarlyStoppingRule" }, "x-kubernetes-list-map-keys": [ - "Name" + "name" ], "x-kubernetes-list-type": "map" }, @@ -356,7 +356,7 @@ "$ref": "#/definitions/v1beta1.ParameterAssignment" }, "x-kubernetes-list-map-keys": [ - "Name" + "name" ], "x-kubernetes-list-type": "map" }, @@ -402,7 +402,7 @@ "$ref": "#/definitions/.v1beta1.TrialCondition" }, "x-kubernetes-list-map-keys": [ - "Type" + "type" ], "x-kubernetes-list-type": "map" }, @@ -450,7 +450,7 @@ "$ref": "#/definitions/v1beta1.AlgorithmSetting" }, "x-kubernetes-list-map-keys": [ - "Name" + "name" ], "x-kubernetes-list-type": "map" } @@ -539,7 +539,7 @@ "$ref": "#/definitions/v1beta1.EarlyStoppingSetting" }, "x-kubernetes-list-map-keys": [ - "Name" + "name" ], "x-kubernetes-list-type": "map" } @@ -681,7 +681,7 @@ "$ref": "#/definitions/v1beta1.ParameterSpec" }, "x-kubernetes-list-map-keys": [ - "Name" + "name" ], "x-kubernetes-list-type": "map" }, @@ -711,7 +711,7 @@ "$ref": 
"#/definitions/v1beta1.ExperimentCondition" }, "x-kubernetes-list-map-keys": [ - "Type" + "type" ], "x-kubernetes-list-type": "map" }, @@ -968,7 +968,7 @@ "$ref": "#/definitions/v1beta1.Operation" }, "x-kubernetes-list-map-keys": [ - "OperationType" + "operationType" ], "x-kubernetes-list-type": "map" } @@ -1000,7 +1000,7 @@ "$ref": "#/definitions/v1beta1.MetricStrategy" }, "x-kubernetes-list-map-keys": [ - "Name" + "name" ], "x-kubernetes-list-type": "map" }, @@ -1025,7 +1025,7 @@ "$ref": "#/definitions/v1beta1.Metric" }, "x-kubernetes-list-map-keys": [ - "Name" + "name" ], "x-kubernetes-list-type": "map" } @@ -1045,7 +1045,7 @@ "$ref": "#/definitions/v1beta1.ParameterSpec" }, "x-kubernetes-list-map-keys": [ - "Name" + "name" ], "x-kubernetes-list-type": "map" } @@ -1072,7 +1072,7 @@ "$ref": "#/definitions/v1beta1.ParameterAssignment" }, "x-kubernetes-list-map-keys": [ - "Name" + "name" ], "x-kubernetes-list-type": "map" } @@ -1193,7 +1193,7 @@ "$ref": "#/definitions/v1beta1.TrialParameterSpec" }, "x-kubernetes-list-map-keys": [ - "Name" + "name" ], "x-kubernetes-list-type": "map" }, diff --git a/pkg/apis/v1beta1/zz_generated.openapi.go b/pkg/apis/v1beta1/zz_generated.openapi.go index 57d147b29db..6942a57603c 100644 --- a/pkg/apis/v1beta1/zz_generated.openapi.go +++ b/pkg/apis/v1beta1/zz_generated.openapi.go @@ -117,7 +117,7 @@ func schema_apis_controller_common_v1beta1_AlgorithmSpec(ref common.ReferenceCal VendorExtensible: spec.VendorExtensible{ Extensions: spec.Extensions{ "x-kubernetes-list-map-keys": []interface{}{ - "Name", + "name", }, "x-kubernetes-list-type": "map", }, @@ -255,7 +255,7 @@ func schema_apis_controller_common_v1beta1_EarlyStoppingSpec(ref common.Referenc VendorExtensible: spec.VendorExtensible{ Extensions: spec.Extensions{ "x-kubernetes-list-map-keys": []interface{}{ - "Name", + "name", }, "x-kubernetes-list-type": "map", }, @@ -479,7 +479,7 @@ func schema_apis_controller_common_v1beta1_ObjectiveSpec(ref common.ReferenceCal VendorExtensible: spec.VendorExtensible{ Extensions: spec.Extensions{ "x-kubernetes-list-map-keys": []interface{}{ - "Name", + "name", }, "x-kubernetes-list-type": "map", }, @@ -515,7 +515,7 @@ func schema_apis_controller_common_v1beta1_Observation(ref common.ReferenceCallb VendorExtensible: spec.VendorExtensible{ Extensions: spec.Extensions{ "x-kubernetes-list-map-keys": []interface{}{ - "Name", + "name", }, "x-kubernetes-list-type": "map", }, @@ -796,7 +796,7 @@ func schema_apis_controller_experiments_v1beta1_ExperimentSpec(ref common.Refere VendorExtensible: spec.VendorExtensible{ Extensions: spec.Extensions{ "x-kubernetes-list-map-keys": []interface{}{ - "Name", + "name", }, "x-kubernetes-list-type": "map", }, @@ -914,7 +914,7 @@ func schema_apis_controller_experiments_v1beta1_ExperimentStatus(ref common.Refe VendorExtensible: spec.VendorExtensible{ Extensions: spec.Extensions{ "x-kubernetes-list-map-keys": []interface{}{ - "Type", + "type", }, "x-kubernetes-list-type": "map", }, @@ -1272,7 +1272,7 @@ func schema_apis_controller_experiments_v1beta1_NasConfig(ref common.ReferenceCa VendorExtensible: spec.VendorExtensible{ Extensions: spec.Extensions{ "x-kubernetes-list-map-keys": []interface{}{ - "OperationType", + "operationType", }, "x-kubernetes-list-type": "map", }, @@ -1314,7 +1314,7 @@ func schema_apis_controller_experiments_v1beta1_Operation(ref common.ReferenceCa VendorExtensible: spec.VendorExtensible{ Extensions: spec.Extensions{ "x-kubernetes-list-map-keys": []interface{}{ - "Name", + "name", }, "x-kubernetes-list-type": "map", }, 
@@ -1357,7 +1357,7 @@ func schema_apis_controller_experiments_v1beta1_OptimalTrial(ref common.Referenc VendorExtensible: spec.VendorExtensible{ Extensions: spec.Extensions{ "x-kubernetes-list-map-keys": []interface{}{ - "Name", + "name", }, "x-kubernetes-list-type": "map", }, @@ -1513,7 +1513,7 @@ func schema_apis_controller_experiments_v1beta1_TrialTemplate(ref common.Referen VendorExtensible: spec.VendorExtensible{ Extensions: spec.Extensions{ "x-kubernetes-list-map-keys": []interface{}{ - "Name", + "name", }, "x-kubernetes-list-type": "map", }, @@ -1782,7 +1782,7 @@ func schema_apis_controller_suggestions_v1beta1_SuggestionStatus(ref common.Refe VendorExtensible: spec.VendorExtensible{ Extensions: spec.Extensions{ "x-kubernetes-list-map-keys": []interface{}{ - "Name", + "name", }, "x-kubernetes-list-type": "map", }, @@ -1811,7 +1811,7 @@ func schema_apis_controller_suggestions_v1beta1_SuggestionStatus(ref common.Refe VendorExtensible: spec.VendorExtensible{ Extensions: spec.Extensions{ "x-kubernetes-list-map-keys": []interface{}{ - "Name", + "name", }, "x-kubernetes-list-type": "map", }, @@ -1851,7 +1851,7 @@ func schema_apis_controller_suggestions_v1beta1_SuggestionStatus(ref common.Refe VendorExtensible: spec.VendorExtensible{ Extensions: spec.Extensions{ "x-kubernetes-list-map-keys": []interface{}{ - "Type", + "type", }, "x-kubernetes-list-type": "map", }, @@ -1888,7 +1888,7 @@ func schema_apis_controller_suggestions_v1beta1_TrialAssignment(ref common.Refer VendorExtensible: spec.VendorExtensible{ Extensions: spec.Extensions{ "x-kubernetes-list-map-keys": []interface{}{ - "Name", + "name", }, "x-kubernetes-list-type": "map", }, @@ -1917,7 +1917,7 @@ func schema_apis_controller_suggestions_v1beta1_TrialAssignment(ref common.Refer VendorExtensible: spec.VendorExtensible{ Extensions: spec.Extensions{ "x-kubernetes-list-map-keys": []interface{}{ - "Name", + "name", }, "x-kubernetes-list-type": "map", }, @@ -2130,7 +2130,7 @@ func schema_apis_controller_trials_v1beta1_TrialSpec(ref common.ReferenceCallbac VendorExtensible: spec.VendorExtensible{ Extensions: spec.Extensions{ "x-kubernetes-list-map-keys": []interface{}{ - "Name", + "name", }, "x-kubernetes-list-type": "map", }, @@ -2152,7 +2152,7 @@ func schema_apis_controller_trials_v1beta1_TrialSpec(ref common.ReferenceCallbac VendorExtensible: spec.VendorExtensible{ Extensions: spec.Extensions{ "x-kubernetes-list-map-keys": []interface{}{ - "Name", + "name", }, "x-kubernetes-list-type": "map", }, @@ -2280,7 +2280,7 @@ func schema_apis_controller_trials_v1beta1_TrialStatus(ref common.ReferenceCallb VendorExtensible: spec.VendorExtensible{ Extensions: spec.Extensions{ "x-kubernetes-list-map-keys": []interface{}{ - "Type", + "type", }, "x-kubernetes-list-type": "map", }, diff --git a/sdk/python/v1beta1/kubeflow/katib/__init__.py b/sdk/python/v1beta1/kubeflow/katib/__init__.py index c6ca7dda3b9..3a146f997bd 100644 --- a/sdk/python/v1beta1/kubeflow/katib/__init__.py +++ b/sdk/python/v1beta1/kubeflow/katib/__init__.py @@ -72,7 +72,7 @@ # Import Katib API client. from kubeflow.katib.api.katib_client import KatibClient # Import Katib TrainerResources class. -from kubeflow.katib.types.trainer_resources import TrainerResources +from kubeflow.katib.types.types import TrainerResources # Import Katib report metrics functions from kubeflow.katib.api.report_metrics import report_metrics # Import Katib helper functions. 
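A minimal usage sketch of the relocated class (illustrative only, not taken from the patched sources): TrainerResources now lives in kubeflow.katib.types.types as a dataclass and is re-exported from the kubeflow.katib package, so it can be imported and constructed directly; the field values below are assumptions for illustration.

from kubeflow.katib import TrainerResources  # re-exported via kubeflow/katib/__init__.py

# num_workers is the total number of PyTorchJob replicas (1 Master + N-1 Workers),
# num_procs_per_worker maps to the PyTorchJob nproc_per_node setting, and
# resources_per_worker is the per-replica container resources dict.
resources_per_trial = TrainerResources(
    num_workers=2,
    num_procs_per_worker=2,
    resources_per_worker={"cpu": "100m"},
)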
diff --git a/sdk/python/v1beta1/kubeflow/katib/api/katib_client.py b/sdk/python/v1beta1/kubeflow/katib/api/katib_client.py index caf2a45aaca..32a8f0dace5 100644 --- a/sdk/python/v1beta1/kubeflow/katib/api/katib_client.py +++ b/sdk/python/v1beta1/kubeflow/katib/api/katib_client.py @@ -24,8 +24,20 @@ from kubeflow.katib import models from kubeflow.katib.api_client import ApiClient from kubeflow.katib.constants import constants -from kubeflow.katib.types.trainer_resources import TrainerResources +from kubeflow.katib.types.types import TrainerResources from kubeflow.katib.utils import utils +from kubeflow.training.constants.constants import ( + DEFAULT_COMMAND, + ENTRYPOINT_PYTHON, + ENTRYPOINT_TORCH, + JOB_PARAMETERS, + PYTORCHJOB_KIND, + STORAGE_INITIALIZER, + STORAGE_INITIALIZER_IMAGE, + STORAGE_INITIALIZER_VOLUME_MOUNT, + TRAINER_TRANSFORMER_IMAGE, +) +from kubeflow.training.utils import utils as training_utils from kubernetes import client, config logger = logging.getLogger(__name__) @@ -180,7 +192,7 @@ def tune( "access_modes": constants.PVC_DEFAULT_ACCESS_MODES, }, objective: Optional[Callable] = None, - base_image: Optional[str] = constants.BASE_IMAGE_TENSORFLOW, + base_image: str = constants.BASE_IMAGE_PYTORCH, parameters: Optional[Dict[str, Any]] = None, namespace: Optional[str] = None, env_per_trial: Optional[ @@ -311,8 +323,10 @@ class name in this argument. GPU, pass in a V1ResourceRequirement instance instead, since it's more flexible. This parameter is optional and defaults to None. - For external models and datasets, you can specify a TrainerResources object, - which includes `num_workers`, `num_procs_per_worker`, and `resources_per_worker`. + You should specify TrainerResources as the Trial resources if you use PyTorchJob as + the Katib Trial for distributed training. This is a mandatory parameter if you use + LLM `trainer_parameters`. The TrainerResources includes `num_workers`, + `num_procs_per_worker`, and `resources_per_worker`. For example: ``` resources_per_trial = TrainerResources( @@ -428,28 +442,46 @@ class name in this argument. ) # Iterate over input parameters and do substitutions. - experiment_params = [] - trial_params = [] + experiment_parameters = [] + trial_parameters = [] input_params = utils.get_trial_substitutions_from_dict( - parameters, experiment_params, trial_params + parameters, experiment_parameters, trial_parameters ) + # For distributed training the entrypoint is `torchrun`, otherwise it is `python -u`. + if isinstance(resources_per_trial, TrainerResources) and ( + resources_per_trial.num_workers > 1 + or resources_per_trial.num_procs_per_worker > 1 + ): + entrypoint = ENTRYPOINT_TORCH + else: + entrypoint = ENTRYPOINT_PYTHON # Get the execution script from the objective function. exec_script = utils.get_exec_script_from_objective( - objective, input_params, packages_to_install, pip_index_url + objective, + entrypoint, + input_params, + packages_to_install, + pip_index_url, ) - if isinstance(resources_per_trial, dict): - if "gpu" in resources_per_trial: - resources_per_trial["nvidia.com/gpu"] = resources_per_trial.pop( - "gpu" - ) - - resources_per_trial = client.V1ResourceRequirements( - requests=resources_per_trial, - limits=resources_per_trial, - ) - + # Generate container spec for PyTorchJob or Job.
+ container_spec = training_utils.get_container_spec( + name=( + JOB_PARAMETERS[PYTORCHJOB_KIND]["container"] + if isinstance(resources_per_trial, TrainerResources) + else constants.DEFAULT_PRIMARY_CONTAINER_NAME + ), + base_image=base_image, + command=DEFAULT_COMMAND, + args=[exec_script], + resources=( + resources_per_trial.resources_per_worker + if isinstance(resources_per_trial, TrainerResources) + else resources_per_trial + ), + ) + # TODO (andreyvelich): get_container_spec should support EnvFromSource envs. env = [] env_from = [] if isinstance(env_per_trial, dict): @@ -468,40 +500,25 @@ class name in this argument. f"Incorrect value for env_per_trial: {env_per_trial}" ) - # Create Trial specification. - trial_spec = client.V1Job( - api_version="batch/v1", - kind="Job", - spec=client.V1JobSpec( - template=client.V1PodTemplateSpec( - metadata=models.V1ObjectMeta( - annotations={"sidecar.istio.io/inject": "false"} - ), - spec=client.V1PodSpec( - restart_policy="Never", - containers=[ - client.V1Container( - name=constants.DEFAULT_PRIMARY_CONTAINER_NAME, - image=base_image, - command=["bash", "-c"], - args=[exec_script], - env=env if env else None, - env_from=env_from if env_from else None, - resources=resources_per_trial, - ) - ], - ), - ) - ), - ) - - # Create Trial template. - trial_template = models.V1beta1TrialTemplate( - primary_container_name=constants.DEFAULT_PRIMARY_CONTAINER_NAME, - retain=retain_trials, - trial_parameters=trial_params, - trial_spec=trial_spec, - ) + container_spec.env = env if env else None + container_spec.env_from = env_from if env_from else None + + # Trial uses PyTorchJob for distributed training if TrainerResources is set. + if isinstance(resources_per_trial, TrainerResources): + trial_template = utils.get_trial_template_with_pytorchjob( + retain_trials, + trial_parameters, + resources_per_trial, + training_utils.get_pod_template_spec(containers=[container_spec]), + training_utils.get_pod_template_spec(containers=[container_spec]), + ) + # Otherwise, Trial uses Job for model training. + else: + trial_template = utils.get_trial_template_with_job( + retain_trials, + trial_parameters, + training_utils.get_pod_template_spec(containers=[container_spec]), + ) # If users choose to use external models and datasets. else: @@ -509,6 +526,7 @@ class name in this argument. not model_provider_parameters or not dataset_provider_parameters or not trainer_parameters + or not isinstance(resources_per_trial, TrainerResources) ): raise ValueError("One of the required parameters is None") @@ -523,16 +541,6 @@ class name in this argument. HuggingFaceTrainerParams, ) from kubeflow.storage_initializer.s3 import S3DatasetParams - from kubeflow.training import models as training_models - from kubeflow.training.constants.constants import ( - JOB_PARAMETERS, - PYTORCHJOB_KIND, - STORAGE_INITIALIZER, - STORAGE_INITIALIZER_IMAGE, - STORAGE_INITIALIZER_VOLUME_MOUNT, - TRAINER_TRANSFORMER_IMAGE, - ) - from kubeflow.training.utils import utils as training_utils except ImportError: raise ImportError( "LLM dependencies for Tune API are not installed. " @@ -603,13 +611,15 @@ class name in this argument. ) # Iterate over input parameters and do substitutions. 
- experiment_params = [] - trial_params = [] + experiment_parameters = [] + trial_parameters = [] training_args = utils.get_trial_substitutions_from_trainer( - trainer_parameters.training_parameters, experiment_params, trial_params + trainer_parameters.training_parameters, + experiment_parameters, + trial_parameters, ) lora_config = utils.get_trial_substitutions_from_trainer( - trainer_parameters.lora_config, experiment_params, trial_params + trainer_parameters.lora_config, experiment_parameters, trial_parameters ) # Create the init and the primary container. @@ -653,7 +663,7 @@ class name in this argument. volume_mounts=[STORAGE_INITIALIZER_VOLUME_MOUNT], resources=( resources_per_trial.resources_per_worker - if resources_per_trial + if isinstance(resources_per_trial, TrainerResources) else None ), ) @@ -677,51 +687,17 @@ class name in this argument. volumes=[storage_initializer_volume], ) - # Create PyTorchJob. - pytorchjob = training_models.KubeflowOrgV1PyTorchJob( - api_version="kubeflow.org/v1", - kind="PyTorchJob", - spec=training_models.KubeflowOrgV1PyTorchJobSpec( - run_policy=training_models.KubeflowOrgV1RunPolicy( - clean_pod_policy=None - ), - pytorch_replica_specs={}, - ), - ) - - if ( - resources_per_trial is not None - and resources_per_trial.num_procs_per_worker - ): - pytorchjob.spec.nproc_per_node = str( - resources_per_trial.num_procs_per_worker - ) - - pytorchjob.spec.pytorch_replica_specs["Master"] = ( - training_models.KubeflowOrgV1ReplicaSpec( - replicas=1, - template=master_pod_template_spec, - ) - ) - - if resources_per_trial is not None and resources_per_trial.num_workers > 1: - pytorchjob.spec.pytorch_replica_specs["Worker"] = ( - training_models.KubeflowOrgV1ReplicaSpec( - replicas=resources_per_trial.num_workers - 1, - template=worker_pod_template_spec, - ) - ) - - # Create Trial template. - trial_template = models.V1beta1TrialTemplate( - primary_container_name=JOB_PARAMETERS[PYTORCHJOB_KIND]["container"], - retain=retain_trials, - trial_parameters=trial_params, - trial_spec=pytorchjob, + # Generate Trial template using the PyTorchJob. + trial_template = utils.get_trial_template_with_pytorchjob( + retain_trials, + trial_parameters, + resources_per_trial, + worker_pod_template_spec, + master_pod_template_spec, ) # Add parameters to the Katib Experiment. - experiment.spec.parameters = experiment_params + experiment.spec.parameters = experiment_parameters # Add Trial template to the Katib Experiment. 
experiment.spec.trial_template = trial_template diff --git a/sdk/python/v1beta1/kubeflow/katib/api/katib_client_test.py b/sdk/python/v1beta1/kubeflow/katib/api/katib_client_test.py index f6a17017c34..db0af56f8c0 100644 --- a/sdk/python/v1beta1/kubeflow/katib/api/katib_client_test.py +++ b/sdk/python/v1beta1/kubeflow/katib/api/katib_client_test.py @@ -18,12 +18,14 @@ V1beta1TrialTemplate, ) from kubeflow.katib.constants import constants +from kubeflow.katib.types import types from kubeflow.storage_initializer.hugging_face import ( HuggingFaceDatasetParams, HuggingFaceModelParams, HuggingFaceTrainerParams, ) -from kubernetes.client import V1ObjectMeta +from kubeflow.training.models import KubeflowOrgV1PyTorchJob +from kubernetes.client import V1Job, V1ObjectMeta PVC_FAILED = "pvc creation failed" @@ -476,16 +478,37 @@ def create_experiment( learning_rate=katib.search.double(min=1e-05, max=5e-05), ), ), + "resources_per_trial": types.TrainerResources( + num_workers=2, + num_procs_per_worker=2, + resources_per_worker={"gpu": "2"}, + ), }, RuntimeError, ), ( - "valid flow with custom objective tuning", + "valid flow with custom objective function and Job as Trial", { "name": "tune_test", "objective": lambda x: print(f"a={x}"), "parameters": {"a": katib.search.int(min=10, max=100)}, "objective_metric_name": "a", + "resources_per_trial": {"gpu": "2"}, + }, + TEST_RESULT_SUCCESS, + ), + ( + "valid flow with custom objective function and PyTorchJob as Trial", + { + "name": "tune_test", + "objective": lambda x: print(f"a={x}"), + "parameters": {"a": katib.search.int(min=10, max=100)}, + "objective_metric_name": "a", + "resources_per_trial": types.TrainerResources( + num_workers=2, + num_procs_per_worker=2, + resources_per_worker={"gpu": "2"}, + ), }, TEST_RESULT_SUCCESS, ), @@ -508,6 +531,11 @@ def create_experiment( learning_rate=katib.search.double(min=1e-05, max=5e-05), ), ), + "resources_per_trial": types.TrainerResources( + num_workers=2, + num_procs_per_worker=2, + resources_per_worker={"gpu": "2"}, + ), "objective_metric_name": "train_loss", "objective_type": "minimize", }, @@ -597,7 +625,10 @@ def test_tune(katib_client, test_name, kwargs, expected_output): call_args = mock_create_experiment.call_args experiment = call_args[0][0] - if test_name == "valid flow with custom objective tuning": + if ( + test_name + == "valid flow with custom objective function and Job as Trial" + ): # Verify input_params args_content = "".join( experiment.spec.trial_template.trial_spec.spec.template.spec.containers[ @@ -623,6 +654,18 @@ objective_metric_name="a", additional_metric_names=[], ) + # Verify Trial spec + assert isinstance(experiment.spec.trial_template.trial_spec, V1Job) + + elif ( + test_name + == "valid flow with custom objective function and PyTorchJob as Trial" + ): + # Verify Trial spec + assert isinstance( + experiment.spec.trial_template.trial_spec, + KubeflowOrgV1PyTorchJob, + ) elif test_name == "valid flow with external model tuning": # Verify input_params diff --git a/sdk/python/v1beta1/kubeflow/katib/types/trainer_resources.py b/sdk/python/v1beta1/kubeflow/katib/types/trainer_resources.py deleted file mode 100644 index 87bbbbf67fc..00000000000 --- a/sdk/python/v1beta1/kubeflow/katib/types/trainer_resources.py +++ /dev/null @@ -1,10 +0,0 @@ -class TrainerResources(object): - def __init__( - self, - num_workers=None, - num_procs_per_worker=None, - resources_per_worker=None, - ): - self.num_workers = num_workers -
self.num_procs_per_worker = num_procs_per_worker - self.resources_per_worker = resources_per_worker diff --git a/sdk/python/v1beta1/kubeflow/katib/types/types.py b/sdk/python/v1beta1/kubeflow/katib/types/types.py new file mode 100644 index 00000000000..d4c4bd2237a --- /dev/null +++ b/sdk/python/v1beta1/kubeflow/katib/types/types.py @@ -0,0 +1,10 @@ +from dataclasses import dataclass +from typing import Dict + + +# Trainer resources for distributed training. +@dataclass +class TrainerResources: + num_workers: int + num_procs_per_worker: int + resources_per_worker: Dict[str, str] diff --git a/sdk/python/v1beta1/kubeflow/katib/utils/utils.py b/sdk/python/v1beta1/kubeflow/katib/utils/utils.py index 28f3126bbfa..1c0784fa292 100644 --- a/sdk/python/v1beta1/kubeflow/katib/utils/utils.py +++ b/sdk/python/v1beta1/kubeflow/katib/utils/utils.py @@ -22,6 +22,14 @@ from kubeflow.katib import models from kubeflow.katib.constants import constants +from kubeflow.katib.types import types +from kubeflow.training import models as training_models +from kubeflow.training.constants.constants import ( + API_VERSION, + JOB_PARAMETERS, + PYTORCHJOB_KIND, +) +from kubernetes import client logger = logging.getLogger(__name__) @@ -145,8 +153,8 @@ def default(self, obj): def get_trial_substitutions_from_dict( parameters: Dict[str, Any], - experiment_params: List[models.V1beta1ParameterSpec], - trial_params: List[models.V1beta1TrialParameterSpec], + experiment_parameters: List[models.V1beta1ParameterSpec], + trial_parameters: List[models.V1beta1TrialParameterSpec], ) -> Dict[str, str]: for p_name, p_value in parameters.items(): # If input parameter value is Katib Experiment parameter sample. @@ -156,10 +164,10 @@ def get_trial_substitutions_from_dict( # Add value to the Katib Experiment parameters. p_value.name = p_name - experiment_params.append(p_value) + experiment_parameters.append(p_value) # Add value to the Katib Experiment's Trial parameters. - trial_params.append( + trial_parameters.append( models.V1beta1TrialParameterSpec(name=p_name, reference=p_name) ) else: @@ -217,7 +225,8 @@ def get_trial_substitutions_from_trainer( def get_exec_script_from_objective( objective: Callable, - input_params: Dict[str, Any] = None, + entrypoint: str, + input_params: Dict[str, Any], packages_to_install: Optional[List[str]] = None, pip_index_url: str = "https://pypi.org/simple", ) -> str: @@ -247,16 +256,18 @@ def get_exec_script_from_objective( # Prepare execute script template. exec_script = textwrap.dedent( """ - program_path=$(mktemp -d) - read -r -d '' SCRIPT << EOM\n - {objective_code} - EOM - printf "%s" "$SCRIPT" > $program_path/ephemeral_objective.py - python3 -u $program_path/ephemeral_objective.py""" + program_path=$(mktemp -d) + read -r -d '' SCRIPT << EOM\n + {objective_code} + EOM + printf "%s" \"$SCRIPT\" > \"$program_path/ephemeral_script.py\" + {entrypoint} \"$program_path/ephemeral_script.py\"""" ) # Add objective code to the execute script. - exec_script = exec_script.format(objective_code=objective_code) + exec_script = exec_script.format( + objective_code=objective_code, entrypoint=entrypoint + ) # Install Python packages if that is required. if packages_to_install is not None: @@ -267,3 +278,76 @@ def get_exec_script_from_objective( # Return executable script to execute objective function. 
return exec_script + + +def get_trial_template_with_job( + retain_trials: bool, + trial_parameters: List[models.V1beta1TrialParameterSpec], + pod_template_spec: client.V1PodTemplateSpec, +) -> models.V1beta1TrialTemplate: + """ + Get Trial template with Job as a Trial's Worker + """ + + # Restart policy must be set for the Job. + pod_template_spec.spec.restart_policy = "Never" # type: ignore + + # Use Job as a Trial spec. + job = client.V1Job( + api_version="batch/v1", + kind="Job", + spec=client.V1JobSpec(template=pod_template_spec), + ) + + trial_template = models.V1beta1TrialTemplate( + primary_container_name=constants.DEFAULT_PRIMARY_CONTAINER_NAME, + retain=retain_trials, + trial_parameters=trial_parameters, + trial_spec=job, + ) + return trial_template + + +def get_trial_template_with_pytorchjob( + retain_trials: bool, + trial_parameters: List[models.V1beta1TrialParameterSpec], + resources_per_trial: types.TrainerResources, + master_pod_template_spec: models.V1PodTemplateSpec, + worker_pod_template_spec: models.V1PodTemplateSpec, +) -> models.V1beta1TrialTemplate: + """ + Get Trial template with PyTorchJob as a Trial's Worker + """ + + # Use PyTorchJob as a Trial spec. + pytorchjob = training_models.KubeflowOrgV1PyTorchJob( + api_version=API_VERSION, + kind=PYTORCHJOB_KIND, + spec=training_models.KubeflowOrgV1PyTorchJobSpec( + run_policy=training_models.KubeflowOrgV1RunPolicy(clean_pod_policy=None), + nproc_per_node=str(resources_per_trial.num_procs_per_worker), + pytorch_replica_specs={ + "Master": training_models.KubeflowOrgV1ReplicaSpec( + replicas=1, + template=master_pod_template_spec, + ) + }, + ), + ) + + # Add Worker replica if number of workers > 1 + if resources_per_trial.num_workers > 1: + pytorchjob.spec.pytorch_replica_specs["Worker"] = ( + training_models.KubeflowOrgV1ReplicaSpec( + replicas=resources_per_trial.num_workers - 1, + template=worker_pod_template_spec, + ) + ) + + trial_template = models.V1beta1TrialTemplate( + primary_container_name=JOB_PARAMETERS[PYTORCHJOB_KIND]["container"], + retain=retain_trials, + trial_parameters=trial_parameters, + trial_spec=pytorchjob, + ) + return trial_template diff --git a/sdk/python/v1beta1/setup.py b/sdk/python/v1beta1/setup.py index 04a79bcf0d4..bbdc6342413 100644 --- a/sdk/python/v1beta1/setup.py +++ b/sdk/python/v1beta1/setup.py @@ -25,6 +25,7 @@ "kubernetes>=27.2.0", "grpcio>=1.64.1", "protobuf>=4.21.12,<5", + "kubeflow-training==1.9.0", ] katib_grpc_api_file = "../../../pkg/apis/manager/v1beta1/python/api_pb2.py" @@ -86,6 +87,6 @@ ], install_requires=REQUIRES, extras_require={ - "huggingface": ["kubeflow-training[huggingface]==1.8.1"], + "huggingface": ["kubeflow-training[huggingface]==1.9.0"], }, ) diff --git a/test/e2e/v1beta1/scripts/gh-actions/run-e2e-tune-api.py b/test/e2e/v1beta1/scripts/gh-actions/run-e2e-tune-api.py index c9d1cb2ee43..4f9b297458d 100644 --- a/test/e2e/v1beta1/scripts/gh-actions/run-e2e-tune-api.py +++ b/test/e2e/v1beta1/scripts/gh-actions/run-e2e-tune-api.py @@ -2,6 +2,7 @@ import logging from kubeflow.katib import KatibClient, search +from kubeflow.katib.types.types import TrainerResources from kubernetes import client from verify import verify_experiment_results @@ -19,21 +20,19 @@ def run_e2e_experiment_create_by_tune( ): # Create Katib Experiment and wait until it is finished. logging.debug("Creating Experiment: {}/{}".format(exp_namespace, exp_name)) - + # Use the test case from get-started tutorial. 
# https://www.kubeflow.org/docs/components/katib/getting-started/#getting-started-with-katib-python-sdk # [1] Create an objective function. def objective(parameters): import time + time.sleep(5) result = 4 * int(parameters["a"]) - float(parameters["b"]) ** 2 print(f"result={result}") - + # [2] Create hyperparameter search space. - parameters = { - "a": search.int(min=10, max=20), - "b": search.double(min=0.1, max=0.2) - } + parameters = {"a": search.int(min=10, max=20), "b": search.double(min=0.1, max=0.2)} # [3] Create Katib Experiment with 4 Trials and 2 CPUs per Trial. # And Wait until Experiment reaches Succeeded condition. @@ -44,7 +43,7 @@ def objective(parameters): parameters=parameters, objective_metric_name="result", max_trial_count=4, - resources_per_trial={"cpu": "2"}, + resources_per_trial={"cpu": "100m"}, ) experiment = katib_client.wait_for_experiment_condition( exp_name, exp_namespace, timeout=EXPERIMENT_TIMEOUT @@ -58,13 +57,80 @@ def objective(parameters): logging.debug(katib_client.get_suggestion(exp_name, exp_namespace)) +def run_e2e_experiment_create_by_tune_pytorchjob( + katib_client: KatibClient, + exp_name: str, + exp_namespace: str, +): + # Create Katib Experiment and wait until it is finished. + logging.debug("Creating Experiment: {}/{}".format(exp_namespace, exp_name)) + + # Verify distributed training with the PyTorchJob. + def objective(parameters): + import os + import time + + import torch.distributed as dist + + # Setup PyTorch distributed. + dist.init_process_group(backend="gloo") + + print( + "PyTorch Dist. WORLD_SIZE: {}, RANK: {}, LOCAL_RANK: {}".format( + dist.get_world_size(), dist.get_rank(), os.getenv("LOCAL_RANK") + ) + ) + + time.sleep(5) + # Only get results from the process with RANK=0. + if dist.get_rank() == 0: + result = 4 * int(parameters["a"]) - float(parameters["b"]) ** 2 + print(f"result={result}") + dist.destroy_process_group() + + # Create Katib Experiment with 3 Trials. Every Trial runs PyTorchJob with 2 workers. + katib_client.tune( + name=exp_name, + namespace=exp_namespace, + objective=objective, + parameters={ + "a": search.int(min=10, max=20), + "b": search.double(min=0.1, max=0.2), + }, + objective_metric_name="result", + max_trial_count=3, + parallel_trial_count=2, + resources_per_trial=TrainerResources( + num_workers=2, + num_procs_per_worker=2, + resources_per_worker={"cpu": "100m"}, + ), + ) + + experiment = katib_client.wait_for_experiment_condition( + exp_name, exp_namespace, timeout=EXPERIMENT_TIMEOUT + ) + + # Verify the Experiment results. + verify_experiment_results(katib_client, experiment, exp_name, exp_namespace) + + # Print the Experiment and Suggestion.
+ logging.debug(katib_client.get_experiment(exp_name, exp_namespace)) + logging.debug(katib_client.get_suggestion(exp_name, exp_namespace)) + + if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument( - "--namespace", type=str, required=True, help="Namespace for the Katib E2E test", + "--namespace", + type=str, + required=True, + help="Namespace for the Katib E2E test", ) parser.add_argument( - "--verbose", action="store_true", help="Verbose output for the Katib E2E test", + "--verbose", + action="store_true", + help="Verbose output for the Katib E2E test", ) args = parser.parse_args() @@ -74,9 +140,11 @@ def objective(parameters): katib_client = KatibClient() namespace_labels = client.CoreV1Api().read_namespace(args.namespace).metadata.labels - if 'katib.kubeflow.org/metrics-collector-injection' not in namespace_labels: - namespace_labels['katib.kubeflow.org/metrics-collector-injection'] = 'enabled' - client.CoreV1Api().patch_namespace(args.namespace, {'metadata': {'labels': namespace_labels}}) + if "katib.kubeflow.org/metrics-collector-injection" not in namespace_labels: + namespace_labels["katib.kubeflow.org/metrics-collector-injection"] = "enabled" + client.CoreV1Api().patch_namespace( + args.namespace, {"metadata": {"labels": namespace_labels}} + ) # Test with run_e2e_experiment_create_by_tune exp_name = "tune-example" @@ -84,10 +152,37 @@ def objective(parameters): try: run_e2e_experiment_create_by_tune(katib_client, exp_name, exp_namespace) logging.info("---------------------------------------------------------------") - logging.info(f"E2E is succeeded for Experiment created by tune: {exp_namespace}/{exp_name}") + logging.info( + f"E2E is succeeded for Experiment created by tune: {exp_namespace}/{exp_name}" + ) + except Exception as e: + logging.info("---------------------------------------------------------------") + logging.info( + f"E2E is failed for Experiment created by tune: {exp_namespace}/{exp_name}" + ) + raise e + finally: + # Delete the Experiment. + logging.info("---------------------------------------------------------------") + logging.info("---------------------------------------------------------------") + katib_client.delete_experiment(exp_name, exp_namespace) + + # Test with run_e2e_experiment_create_by_tune_pytorchjob + exp_name = "tune-example-pytorchjob" + exp_namespace = args.namespace + try: + run_e2e_experiment_create_by_tune_pytorchjob( + katib_client, exp_name, exp_namespace + ) + logging.info("---------------------------------------------------------------") + logging.info( + f"E2E is succeeded for Experiment created by tune with PyTorchJob: {exp_namespace}/{exp_name}" + ) except Exception as e: logging.info("---------------------------------------------------------------") - logging.info(f"E2E is failed for Experiment created by tune: {exp_namespace}/{exp_name}") + logging.info( + f"E2E is failed for Experiment created by tune with PyTorchJob: {exp_namespace}/{exp_name}" + ) raise e finally: # Delete the Experiment. 
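A usage sketch mirroring the e2e tests above (experiment names and values are illustrative assumptions, not taken from the patch): after this change, resources_per_trial accepts either a plain dict, which keeps the Trial as a batch/v1 Job, or a TrainerResources instance, which turns the Trial into a PyTorchJob and switches the generated entrypoint to torchrun when more than one worker or process per worker is requested.

from kubeflow.katib import KatibClient, TrainerResources, search

katib_client = KatibClient()

def objective(parameters):
    result = 4 * int(parameters["a"]) - float(parameters["b"]) ** 2
    print(f"result={result}")

# Plain dict -> the Trial runs as a Kubernetes Job with the `python -u` entrypoint.
katib_client.tune(
    name="tune-job-example",
    objective=objective,
    parameters={"a": search.int(min=10, max=20), "b": search.double(min=0.1, max=0.2)},
    objective_metric_name="result",
    resources_per_trial={"cpu": "100m"},
)

# TrainerResources -> the Trial runs as a PyTorchJob (1 Master + 1 Worker, 2 processes each)
# and the objective is launched via `torchrun`.
katib_client.tune(
    name="tune-pytorchjob-example",
    objective=objective,
    parameters={"a": search.int(min=10, max=20), "b": search.double(min=0.1, max=0.2)},
    objective_metric_name="result",
    resources_per_trial=TrainerResources(
        num_workers=2,
        num_procs_per_worker=2,
        resources_per_worker={"cpu": "100m"},
    ),
)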
diff --git a/test/e2e/v1beta1/scripts/gh-actions/setup-katib.sh b/test/e2e/v1beta1/scripts/gh-actions/setup-katib.sh index d0b05caf712..056ff9ab54f 100755 --- a/test/e2e/v1beta1/scripts/gh-actions/setup-katib.sh +++ b/test/e2e/v1beta1/scripts/gh-actions/setup-katib.sh @@ -25,7 +25,7 @@ DEPLOY_TRAINING_OPERATOR=${2:-false} WITH_DATABASE_TYPE=${3:-mysql} E2E_TEST_IMAGE_TAG="e2e-test" -TRAINING_OPERATOR_VERSION="v1.6.0-rc.0" +TRAINING_OPERATOR_VERSION="v1.9.0" echo "Start to install Katib" @@ -58,7 +58,7 @@ cat ../../../../../manifests/v1beta1/installs/katib-standalone/katib-config.yaml # If the user wants to deploy training operator, then use the kustomization file for training operator. if "$DEPLOY_TRAINING_OPERATOR"; then echo "Deploying Training Operator $TRAINING_OPERATOR_VERSION" - kustomize build "github.com/kubeflow/training-operator/manifests/overlays/standalone?ref=$TRAINING_OPERATOR_VERSION" | kubectl apply -f - + kubectl apply --server-side -k "github.com/kubeflow/training-operator/manifests/overlays/standalone?ref=$TRAINING_OPERATOR_VERSION" fi echo "Deploying Katib" diff --git a/test/unit/v1beta1/requirements.txt b/test/unit/v1beta1/requirements.txt index e6cc18aa541..440f28297e1 100644 --- a/test/unit/v1beta1/requirements.txt +++ b/test/unit/v1beta1/requirements.txt @@ -1,3 +1,3 @@ grpcio-testing==1.64.1 pytest==7.2.0 -kubeflow-training[huggingface]==1.8.1 +kubeflow-training[huggingface]==1.9.0