code review changes
deepanker13 committed Jan 4, 2024
1 parent 7802b2d commit bf04ddc
Showing 5 changed files with 79 additions and 28 deletions.
10 changes: 10 additions & 0 deletions manifests/overlays/kubeflow/kubeflow-training-roles.yaml
@@ -47,6 +47,16 @@ rules:
- paddlejobs/status
verbs:
- get
+- apiGroups:
+  - ""
+  resources:
+  - persistentvolumeclaims
+  verbs:
+  - create
+  - delete
+  - get
+  - list
+  - watch

---
apiVersion: rbac.authorization.k8s.io/v1
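These added verbs back the PersistentVolumeClaim that the SDK's train() now provisions for the storage initializer. A minimal sketch of the equivalent raw call with the kubernetes Python client; the PVC name matches TRAINER_PVC_NAME below, while the namespace, size, and access mode are illustrative assumptions:

from kubernetes import client, config

config.load_kube_config()
core_api = client.CoreV1Api()

# Roughly what train() does with these permissions: create a PVC whose size
# comes from storage_config (name mirrors TRAINER_PVC_NAME = "storage-initializer").
pvc = client.V1PersistentVolumeClaim(
    metadata=client.V1ObjectMeta(name="storage-initializer"),
    spec=client.V1PersistentVolumeClaimSpec(
        access_modes=["ReadWriteOnce"],  # assumption; the SDK's pvc spec may differ
        resources=client.V1ResourceRequirements(requests={"storage": "10Gi"}),
    ),
)
core_api.create_namespaced_persistent_volume_claim(namespace="kubeflow", body=pvc)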
78 changes: 53 additions & 25 deletions sdk/python/kubeflow/training/api/training_client.py
@@ -30,7 +30,9 @@
HuggingFaceModelParams,
HuggingFaceTrainParams,
HfDatasetParams,
+INIT_CONTAINER_MOUNT_PATH,
)
+import os

logger = logging.getLogger(__name__)

@@ -95,45 +97,51 @@ def train(
namespace: str = None,
num_workers: int = 1,
num_procs_per_worker: int = 1,
-storage_config: Dict[Literal["size", "storage_class"], str] = None,
+storage_config: Dict[Literal["size", "storage_class"], str] = {"size": "10Gi"},
model_provider_parameters: HuggingFaceModelParams = None,
dataset_provider_parameters: Union[HfDatasetParams, S3DatasetParams] = None,
train_parameters: HuggingFaceTrainParams = None,
-resources_per_worker: Dict[Literal["gpu", "cpu", "memory"], any] = None,
+resources_per_worker: Union[dict, client.V1ResourceRequirements, None] = {
+    "cpu": 1,
+    "memory": "2Gi",
+},
+# Dict[Literal["gpu", "cpu", "memory"], any] = None,
):
"""
Higher level train api
"""
if (
not name
-or not storage_config
or not model_provider_parameters
or not dataset_provider_parameters
or not train_parameters
-or not resources_per_worker
):
raise ValueError("One of the required parameters is None")

namespace = namespace or self.namespace

if "cpu" not in resources_per_worker or "memory" not in resources_per_worker:
raise ValueError("cpu and memory resources not specified")
else:
limits = {
"cpu": resources_per_worker["cpu"],
"memory": resources_per_worker["memory"],
}
if isinstance(resources_per_worker, dict):
if "gpu" in resources_per_worker:
resources_per_worker["nvidia.com/gpu"] = resources_per_worker.pop("gpu")

-if (
-    resources_per_worker["gpu"] is not None
-    and num_procs_per_worker > resources_per_worker["gpu"]
-) or (resources_per_worker["gpu"] is None and num_procs_per_worker != 0):
-    raise ValueError("Insufficient gpu resources allocated to the container.")
+    if (
+        resources_per_worker["gpu"] is not None
+        and num_procs_per_worker > resources_per_worker["gpu"]
+    ) or (resources_per_worker["gpu"] is None and num_procs_per_worker != 0):
+        raise ValueError(
+            "Insufficient gpu resources allocated to the container."
+        )

if "gpu" in resources_per_worker:
limits["nvidia.com/gpu"] = resources_per_worker["gpu"]
if (
"cpu" not in resources_per_worker
or "memory" not in resources_per_worker
):
raise ValueError("cpu and memory resources not specified")

-requests = limits.copy()
+    resources_per_worker = client.V1ResourceRequirements(
+        requests=resources_per_worker,
+        limits=resources_per_worker,
+    )
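# A hedged illustration (not part of this diff) of the dict normalization above:
example = {"gpu": 2, "cpu": 8, "memory": "16Gi"}
if "gpu" in example:
    example["nvidia.com/gpu"] = example.pop("gpu")
# example == {"cpu": 8, "memory": "16Gi", "nvidia.com/gpu": 2}; train() then uses
# it for both the requests and the limits of the job containers.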

try:
self.core_api.create_namespaced_persistent_volume_claim(
@@ -146,7 +154,7 @@
),
)
except Exception as e:
-    print(e)
+    raise RuntimeError("failed to create pvc")

if isinstance(model_provider_parameters, HuggingFaceModelParams):
mp = "hf"
@@ -171,19 +179,40 @@
json.dumps(dataset_provider_parameters.__dict__),
],
volume_mounts=[
-models.V1VolumeMount(name=constants.TRAINER_PV, mount_path="/workspace")
+models.V1VolumeMount(
+    name=constants.TRAINER_PV,
+    mount_path=INIT_CONTAINER_MOUNT_PATH,
+)
],
)

# create app container spec
container_spec = utils.get_container_spec(
name=constants.JOB_PARAMETERS[constants.PYTORCHJOB_KIND]["container"],
image=constants.TRAINER_TRANSFORMER_IMAGE,
args=["--train_parameters", json.dumps(train_parameters.__dict__)],
args=[
"--model_uri",
model_provider_parameters.model_uri,
"--transformer_type",
model_provider_parameters.transformer_type,
"--model_dir",
model_provider_parameters.download_dir,
"--dataset_dir",
dataset_provider_parameters.download_dir,
"--dataset_name",
dataset_provider_parameters.repo_id,
"--lora_config",
json.dumps(train_parameters.lora_config.__dict__, cls=utils.SetEncoder),
"--training_parameters",
json.dumps(train_parameters.training_parameters.to_dict()),
],
volume_mounts=[
-models.V1VolumeMount(name=constants.TRAINER_PV, mount_path="/workspace")
+models.V1VolumeMount(
+    name=constants.TRAINER_PV,
+    mount_path=constants.TRAINER_CONTAINER_MOUNT_PATH,
+)
],
-resources=models.V1ResourceRequirements(requests=requests, limits=limits),
+resources=resources_per_worker,
)

# create worker pod spec
@@ -221,7 +250,6 @@
worker_pod_template_spec=worker_pod_template_spec,
num_worker_replicas=num_workers - 1,
num_procs_per_worker=num_procs_per_worker,
-elastic_policy=models.KubeflowOrgV1ElasticPolicy(rdzv_backend="c10d"),
)

self.create_job(job, namespace=namespace)
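For orientation, a hedged sketch of invoking the reworked entrypoint. Only keyword arguments visible in the signature above are used, and since the constructors of HuggingFaceModelParams, HfDatasetParams/S3DatasetParams, and HuggingFaceTrainParams are not shown in this diff, the parameter objects are assumed to be built by the caller rather than invented here:

from kubeflow.training import TrainingClient

def launch_fine_tune(model_params, dataset_params, train_params):
    # Sketch only: name, namespace, sizes, and resource values are illustrative.
    TrainingClient().train(
        name="llm-fine-tune",
        namespace="kubeflow",
        num_workers=1,
        num_procs_per_worker=0,  # per the gpu check above, 0 procs when no GPUs are requested
        resources_per_worker={"cpu": 4, "memory": "8Gi"},
        storage_config={"size": "20Gi", "storage_class": "standard"},
        model_provider_parameters=model_params,
        dataset_provider_parameters=dataset_params,
        train_parameters=train_params,
    )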
4 changes: 3 additions & 1 deletion sdk/python/kubeflow/training/constants/constants.py
@@ -80,7 +80,7 @@
PYTORCHJOB_CONTAINER = "pytorch"
PYTORCHJOB_REPLICA_TYPES = (REPLICA_TYPE_MASTER.lower(), REPLICA_TYPE_WORKER.lower())
PYTORCHJOB_BASE_IMAGE = "docker.io/pytorch/pytorch:1.12.1-cuda11.3-cudnn8-runtime"
STORAGE_CONTAINER = "storage"
STORAGE_CONTAINER = "storage-initializer"
STORAGE_CONTAINER_IMAGE = "quay.io/deepanker_gupta/storage:v1"
TRAINER_TRANSFORMER_IMAGE = "quay.io/deepanker_gupta/trainer:v1"
TRAINER_PVC_NAME = "storage-initializer"
@@ -168,3 +168,5 @@
models.KubeflowOrgV1MPIJob,
models.KubeflowOrgV1PaddleJob,
]

+TRAINER_CONTAINER_MOUNT_PATH = "/workspace"
12 changes: 10 additions & 2 deletions sdk/python/kubeflow/training/utils/utils.py
@@ -131,7 +131,7 @@ def get_container_spec(
raise ValueError("container name or image cannot be none")

container_spec = models.V1Container(name=name, image=image)

+container_spec.image_pull_policy = "Always"
if args:
container_spec.args = args

@@ -175,7 +175,8 @@ def get_pod_template_spec(
name=constants.JOB_PARAMETERS[job_kind]["container"],
image=base_image,
)
-]
+],
+image_pull_secrets=[models.V1LocalObjectReference(name="regcred")],
),
)

@@ -365,3 +366,10 @@ def get_pvc_spec(
pvc_spec.spec.storage_class_name = storage_class

return pvc_spec


+class SetEncoder(json.JSONEncoder):
+    def default(self, obj):
+        if isinstance(obj, set):
+            return list(obj)
+        return json.JSONEncoder.default(self, obj)
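A quick, illustrative use of the new encoder; it is what lets the --lora_config argument above serialize set-valued fields (for example LoRA target modules). The import path follows the SDK layout, and element order in the resulting list may vary:

import json

from kubeflow.training.utils import utils

payload = {"target_modules": {"q_proj", "v_proj"}, "r": 8}
print(json.dumps(payload, cls=utils.SetEncoder))
# e.g. {"target_modules": ["q_proj", "v_proj"], "r": 8}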
3 changes: 3 additions & 0 deletions sdk/python/setup.py
@@ -29,6 +29,9 @@
"urllib3>=1.15.1",
"kubernetes>=23.6.0",
"retrying>=1.3.3",
"boto3>=1.33.9",
"transformers>=4.35.2",
"peft>=0.7.0",
]

setuptools.setup(
