From bf04ddc6d0fceacedd6b1f256ce1b1132193bcc4 Mon Sep 17 00:00:00 2001 From: deepanker13 Date: Fri, 5 Jan 2024 00:23:15 +0530 Subject: [PATCH] code review changes --- .../kubeflow/kubeflow-training-roles.yaml | 10 +++ .../kubeflow/training/api/training_client.py | 78 +++++++++++++------ .../kubeflow/training/constants/constants.py | 4 +- sdk/python/kubeflow/training/utils/utils.py | 12 ++- sdk/python/setup.py | 3 + 5 files changed, 79 insertions(+), 28 deletions(-) diff --git a/manifests/overlays/kubeflow/kubeflow-training-roles.yaml b/manifests/overlays/kubeflow/kubeflow-training-roles.yaml index 2ea5740f92..434fcb723c 100644 --- a/manifests/overlays/kubeflow/kubeflow-training-roles.yaml +++ b/manifests/overlays/kubeflow/kubeflow-training-roles.yaml @@ -47,6 +47,16 @@ rules: - paddlejobs/status verbs: - get + - apiGroups: + - "" + resources: + - persistentvolumeclaims + verbs: + - create + - delete + - get + - list + - watch --- apiVersion: rbac.authorization.k8s.io/v1 diff --git a/sdk/python/kubeflow/training/api/training_client.py b/sdk/python/kubeflow/training/api/training_client.py index 38af8d7969..5b1440751d 100644 --- a/sdk/python/kubeflow/training/api/training_client.py +++ b/sdk/python/kubeflow/training/api/training_client.py @@ -30,7 +30,9 @@ HuggingFaceModelParams, HuggingFaceTrainParams, HfDatasetParams, + INIT_CONTAINER_MOUNT_PATH, ) +import os logger = logging.getLogger(__name__) @@ -95,45 +97,51 @@ def train( namespace: str = None, num_workers: int = 1, num_procs_per_worker: int = 1, - storage_config: Dict[Literal["size", "storage_class"], str] = None, + storage_config: Dict[Literal["size", "storage_class"], str] = {"size": "10Gi"}, model_provider_parameters: HuggingFaceModelParams = None, dataset_provider_parameters: Union[HfDatasetParams, S3DatasetParams] = None, train_parameters: HuggingFaceTrainParams = None, - resources_per_worker: Dict[Literal["gpu", "cpu", "memory"], any] = None, + resources_per_worker: Union[dict, client.V1ResourceRequirements, None] = { + "cpu": 1, + "memory": "2Gi", + }, + # Dict[Literal["gpu", "cpu", "memory"], any] = None, ): """ Higher level train api """ if ( not name - or not storage_config or not model_provider_parameters or not dataset_provider_parameters or not train_parameters - or not resources_per_worker ): raise ValueError("One of the required parameters is None") namespace = namespace or self.namespace - if "cpu" not in resources_per_worker or "memory" not in resources_per_worker: - raise ValueError("cpu and memory resources not specified") - else: - limits = { - "cpu": resources_per_worker["cpu"], - "memory": resources_per_worker["memory"], - } + if isinstance(resources_per_worker, dict): + if "gpu" in resources_per_worker: + resources_per_worker["nvidia.com/gpu"] = resources_per_worker.pop("gpu") - if ( - resources_per_worker["gpu"] is not None - and num_procs_per_worker > resources_per_worker["gpu"] - ) or (resources_per_worker["gpu"] is None and num_procs_per_worker != 0): - raise ValueError("Insufficient gpu resources allocated to the container.") + if ( + resources_per_worker["gpu"] is not None + and num_procs_per_worker > resources_per_worker["gpu"] + ) or (resources_per_worker["gpu"] is None and num_procs_per_worker != 0): + raise ValueError( + "Insufficient gpu resources allocated to the container." + ) - if "gpu" in resources_per_worker: - limits["nvidia.com/gpu"] = resources_per_worker["gpu"] + if ( + "cpu" not in resources_per_worker + or "memory" not in resources_per_worker + ): + raise ValueError("cpu and memory resources not specified") - requests = limits.copy() + resources_per_worker = client.V1ResourceRequirements( + requests=resources_per_worker, + limits=resources_per_worker, + ) try: self.core_api.create_namespaced_persistent_volume_claim( @@ -146,7 +154,7 @@ def train( ), ) except Exception as e: - print(e) + raise RuntimeError("failed to create pvc") if isinstance(model_provider_parameters, HuggingFaceModelParams): mp = "hf" @@ -171,7 +179,10 @@ def train( json.dumps(dataset_provider_parameters.__dict__), ], volume_mounts=[ - models.V1VolumeMount(name=constants.TRAINER_PV, mount_path="/workspace") + models.V1VolumeMount( + name=constants.TRAINER_PV, + mount_path=INIT_CONTAINER_MOUNT_PATH, + ) ], ) @@ -179,11 +190,29 @@ def train( container_spec = utils.get_container_spec( name=constants.JOB_PARAMETERS[constants.PYTORCHJOB_KIND]["container"], image=constants.TRAINER_TRANSFORMER_IMAGE, - args=["--train_parameters", json.dumps(train_parameters.__dict__)], + args=[ + "--model_uri", + model_provider_parameters.model_uri, + "--transformer_type", + model_provider_parameters.transformer_type, + "--model_dir", + model_provider_parameters.download_dir, + "--dataset_dir", + dataset_provider_parameters.download_dir, + "--dataset_name", + dataset_provider_parameters.repo_id, + "--lora_config", + json.dumps(train_parameters.lora_config.__dict__, cls=utils.SetEncoder), + "--training_parameters", + json.dumps(train_parameters.training_parameters.to_dict()), + ], volume_mounts=[ - models.V1VolumeMount(name=constants.TRAINER_PV, mount_path="/workspace") + models.V1VolumeMount( + name=constants.TRAINER_PV, + mount_path=constants.TRAINER_CONTAINER_MOUNT_PATH, + ) ], - resources=models.V1ResourceRequirements(requests=requests, limits=limits), + resources=resources_per_worker, ) # create worker pod spec @@ -221,7 +250,6 @@ def train( worker_pod_template_spec=worker_pod_template_spec, num_worker_replicas=num_workers - 1, num_procs_per_worker=num_procs_per_worker, - elastic_policy=models.KubeflowOrgV1ElasticPolicy(rdzv_backend="c10d"), ) self.create_job(job, namespace=namespace) diff --git a/sdk/python/kubeflow/training/constants/constants.py b/sdk/python/kubeflow/training/constants/constants.py index bfcc86c4f2..0219c1bd2b 100644 --- a/sdk/python/kubeflow/training/constants/constants.py +++ b/sdk/python/kubeflow/training/constants/constants.py @@ -80,7 +80,7 @@ PYTORCHJOB_CONTAINER = "pytorch" PYTORCHJOB_REPLICA_TYPES = (REPLICA_TYPE_MASTER.lower(), REPLICA_TYPE_WORKER.lower()) PYTORCHJOB_BASE_IMAGE = "docker.io/pytorch/pytorch:1.12.1-cuda11.3-cudnn8-runtime" -STORAGE_CONTAINER = "storage" +STORAGE_CONTAINER = "storage-initializer" STORAGE_CONTAINER_IMAGE = "quay.io/deepanker_gupta/storage:v1" TRAINER_TRANSFORMER_IMAGE = "quay.io/deepanker_gupta/trainer:v1" TRAINER_PVC_NAME = "storage-initializer" @@ -168,3 +168,5 @@ models.KubeflowOrgV1MPIJob, models.KubeflowOrgV1PaddleJob, ] + +TRAINER_CONTAINER_MOUNT_PATH = "/workspace" diff --git a/sdk/python/kubeflow/training/utils/utils.py b/sdk/python/kubeflow/training/utils/utils.py index 5ea53ec6c9..026da2c546 100644 --- a/sdk/python/kubeflow/training/utils/utils.py +++ b/sdk/python/kubeflow/training/utils/utils.py @@ -131,7 +131,7 @@ def get_container_spec( raise ValueError("container name or image cannot be none") container_spec = models.V1Container(name=name, image=image) - + container_spec.image_pull_policy = "Always" if args: container_spec.args = args @@ -175,7 +175,8 @@ def get_pod_template_spec( name=constants.JOB_PARAMETERS[job_kind]["container"], image=base_image, ) - ] + ], + image_pull_secrets=[models.V1LocalObjectReference(name="regcred")], ), ) @@ -365,3 +366,10 @@ def get_pvc_spec( pvc_spec.spec.storage_class_name = storage_class return pvc_spec + + +class SetEncoder(json.JSONEncoder): + def default(self, obj): + if isinstance(obj, set): + return list(obj) + return json.JSONEncoder.default(self, obj) diff --git a/sdk/python/setup.py b/sdk/python/setup.py index c584fa38ce..e92f3ff06e 100644 --- a/sdk/python/setup.py +++ b/sdk/python/setup.py @@ -29,6 +29,9 @@ "urllib3>=1.15.1", "kubernetes>=23.6.0", "retrying>=1.3.3", + "boto3>=1.33.9", + "transformers>=4.35.2", + "peft>=0.7.0", ] setuptools.setup(