Skip to content

Commit

Permalink
[SDK] Adding env vars (#2285)
Browse files Browse the repository at this point in the history
* adding env vars

Signed-off-by: Tarek Abouzeid <[email protected]>

using same implementation in katib

Signed-off-by: Tarek Abouzeid <[email protected]>

updating docs

Signed-off-by: Tarek Abouzeid <[email protected]>

* applying suggestions and adding unit test

Signed-off-by: Tarek Abouzeid <[email protected]>

* reorder and fix unit test

Signed-off-by: Tarek Abouzeid <[email protected]>

* fix unit test

Signed-off-by: Tarek Abouzeid <[email protected]>

---------

Signed-off-by: Tarek Abouzeid <[email protected]>
  • Loading branch information
tarekabouzeid authored Dec 24, 2024
1 parent d7f69e8 commit 80e2097
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 0 deletions.
33 changes: 33 additions & 0 deletions sdk/python/kubeflow/training/api/training_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,12 @@ def train(
model_provider_parameters=None,
dataset_provider_parameters=None,
trainer_parameters=None,
init_env_vars: Optional[
Union[Dict[str, str], List[Union[models.V1EnvVar, models.V1EnvVar]]]
] = None,
env_vars: Optional[
Union[Dict[str, str], List[Union[models.V1EnvVar, models.V1EnvVar]]]
] = None,
storage_config: Dict[str, Optional[Union[str, List[str]]]] = {
"size": constants.PVC_DEFAULT_SIZE,
"storage_class": None,
Expand Down Expand Up @@ -164,6 +170,20 @@ def train(
and HuggingFace training arguments like optimizer or number of training epochs.
This argument must be the type of
`kubeflow.storage_initializer.HuggingFaceTrainerParams`
init_env_vars: Environment variable(s) to be attached to init container.
You can specify a dictionary as a mapping object representing the environment
variables. Otherwise, you can specify a list, in which the element can either
be a kubernetes.client.models.V1EnvVar (documented here:
https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1EnvVar.md)
or a kubernetes.client.models.V1EnvFromSource (documented here:
https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1EnvFromSource.md)
env_vars: Environment variable(s) to be attached to training container.
You can specify a dictionary as a mapping object representing the environment
variables. Otherwise, you can specify a list, in which the element can either
be a kubernetes.client.models.V1EnvVar (documented here:
https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1EnvVar.md)
or a kubernetes.client.models.V1EnvFromSource (documented here:
https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1EnvFromSource.md)
storage_config: Configuration for Storage Initializer PVC to download pre-trained model
and dataset. You can configure PVC size and storage class name in this argument.
"""
Expand Down Expand Up @@ -254,6 +274,7 @@ def train(
json.dumps(dataset_provider_parameters.__dict__),
],
volume_mounts=[constants.STORAGE_INITIALIZER_VOLUME_MOUNT],
env_vars=init_env_vars,
)

# create app container spec
Expand All @@ -280,6 +301,7 @@ def train(
],
volume_mounts=[constants.STORAGE_INITIALIZER_VOLUME_MOUNT],
resources=resources_per_worker,
env_vars=env_vars,
)

storage_initializer_volume = models.V1Volume(
Expand Down Expand Up @@ -329,6 +351,9 @@ def create_job(
num_ps_replicas: Optional[int] = None,
packages_to_install: Optional[List[str]] = None,
pip_index_url: str = constants.DEFAULT_PIP_INDEX_URL,
env_vars: Optional[
Union[Dict[str, str], List[Union[models.V1EnvVar, models.V1EnvVar]]]
] = None,
):
"""Create the Training Job.
Job can be created using one of the following options:
Expand Down Expand Up @@ -386,6 +411,13 @@ def create_job(
to the base image packages if `train_func` parameter is set.
These packages are installed before executing the objective function.
pip_index_url: The PyPI url from which to install Python packages.
env_vars: Environment variable(s) to be attached to training container.
You can specify a dictionary as a mapping object representing the environment
variables. Otherwise, you can specify a list, in which the element can either
be a kubernetes.client.models.V1EnvVar (documented here:
https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1EnvVar.md)
or a kubernetes.client.models.V1EnvFromSource (documented here:
https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1EnvFromSource.md)
Raises:
ValueError: Invalid input parameters.
Expand Down Expand Up @@ -463,6 +495,7 @@ def create_job(
command=command,
args=args,
resources=resources_per_worker,
env_vars=env_vars,
)

# Get Pod template spec using the above container.
Expand Down
38 changes: 38 additions & 0 deletions sdk/python/kubeflow/training/api/training_client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from kubernetes.client import (
ApiClient,
V1Container,
V1EnvVar,
V1ObjectMeta,
V1PodSpec,
V1PodTemplateSpec,
Expand Down Expand Up @@ -140,12 +141,23 @@ def create_job(
command=None,
args=None,
num_workers=2,
env_vars=None,
):
# Handle env_vars as either a dict or a list
if env_vars:
if isinstance(env_vars, dict):
env_vars = [V1EnvVar(name=k, value=v) for k, v in env_vars.items()]
elif isinstance(env_vars, list):
env_vars = [
v if isinstance(v, V1EnvVar) else V1EnvVar(**v) for v in env_vars
]

container = V1Container(
name=constants.PYTORCHJOB_CONTAINER,
image=TEST_IMAGE,
command=command,
args=args,
env=env_vars,
)

master = KubeflowOrgV1ReplicaSpec(
Expand Down Expand Up @@ -492,6 +504,32 @@ def __init__(self):
RuntimeError,
None,
),
(
"valid flow with env_vars as dict",
{
"name": TEST_NAME,
"namespace": TEST_NAME,
"env_vars": {"ENV_VAR": "env_value"},
"base_image": TEST_IMAGE,
"num_workers": 1,
},
SUCCESS,
create_job(env_vars={"ENV_VAR": "env_value"}, num_workers=1),
),
(
"valid flow with env_vars as list",
{
"name": TEST_NAME,
"namespace": TEST_NAME,
"env_vars": [V1EnvVar(name="ENV_VAR", value="env_value")],
"base_image": TEST_IMAGE,
"num_workers": 2,
},
SUCCESS,
create_job(
env_vars=[V1EnvVar(name="ENV_VAR", value="env_value")], num_workers=2
),
),
]

test_data_get_job_pods = [
Expand Down
14 changes: 14 additions & 0 deletions sdk/python/kubeflow/training/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,9 @@ def get_container_spec(
args: Optional[List[str]] = None,
resources: Union[dict, models.V1ResourceRequirements, None] = None,
volume_mounts: Optional[List[models.V1VolumeMount]] = None,
env_vars: Optional[
Union[Dict[str, str], List[Union[models.V1EnvVar, models.V1EnvVar]]]
] = None,
) -> models.V1Container:
"""
Get container spec for the given parameters.
Expand All @@ -203,13 +206,24 @@ def get_container_spec(
if name is None or base_image is None:
raise ValueError("Container name or base image cannot be none")

# Handle env_vars as either a dict or a list
if env_vars:
if isinstance(env_vars, dict):
env_vars = [models.V1EnvVar(name=k, value=v) for k, v in env_vars.items()]
elif isinstance(env_vars, list):
env_vars = [
v if isinstance(v, models.V1EnvVar) else models.V1EnvVar(**v)
for v in env_vars
]

# Create initial container spec.
container_spec = models.V1Container(
name=name,
image=base_image,
command=command,
args=args,
volume_mounts=volume_mounts,
env=env_vars,
)

# Convert dict to the Kubernetes container resources if that is required.
Expand Down

0 comments on commit 80e2097

Please sign in to comment.