diff --git a/sdk/python/kubeflow/training/api/training_client.py b/sdk/python/kubeflow/training/api/training_client.py index 808f07065b..901a9e9028 100644 --- a/sdk/python/kubeflow/training/api/training_client.py +++ b/sdk/python/kubeflow/training/api/training_client.py @@ -102,6 +102,12 @@ def train( model_provider_parameters=None, dataset_provider_parameters=None, trainer_parameters=None, + init_env_vars: Optional[ + Union[Dict[str, str], List[Union[models.V1EnvVar, models.V1EnvVar]]] + ] = None, + env_vars: Optional[ + Union[Dict[str, str], List[Union[models.V1EnvVar, models.V1EnvVar]]] + ] = None, storage_config: Dict[str, Optional[Union[str, List[str]]]] = { "size": constants.PVC_DEFAULT_SIZE, "storage_class": None, @@ -164,6 +170,20 @@ def train( and HuggingFace training arguments like optimizer or number of training epochs. This argument must be the type of `kubeflow.storage_initializer.HuggingFaceTrainerParams` + init_env_vars: Environment variable(s) to be attached to init container. + You can specify a dictionary as a mapping object representing the environment + variables. Otherwise, you can specify a list, in which the element can either + be a kubernetes.client.models.V1EnvVar (documented here: + https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1EnvVar.md) + or a kubernetes.client.models.V1EnvFromSource (documented here: + https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1EnvFromSource.md) + env_vars: Environment variable(s) to be attached to training container. + You can specify a dictionary as a mapping object representing the environment + variables. Otherwise, you can specify a list, in which the element can either + be a kubernetes.client.models.V1EnvVar (documented here: + https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1EnvVar.md) + or a kubernetes.client.models.V1EnvFromSource (documented here: + https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1EnvFromSource.md) storage_config: Configuration for Storage Initializer PVC to download pre-trained model and dataset. You can configure PVC size and storage class name in this argument. """ @@ -254,6 +274,7 @@ def train( json.dumps(dataset_provider_parameters.__dict__), ], volume_mounts=[constants.STORAGE_INITIALIZER_VOLUME_MOUNT], + env_vars=init_env_vars, ) # create app container spec @@ -280,6 +301,7 @@ def train( ], volume_mounts=[constants.STORAGE_INITIALIZER_VOLUME_MOUNT], resources=resources_per_worker, + env_vars=env_vars, ) storage_initializer_volume = models.V1Volume( @@ -329,6 +351,9 @@ def create_job( num_ps_replicas: Optional[int] = None, packages_to_install: Optional[List[str]] = None, pip_index_url: str = constants.DEFAULT_PIP_INDEX_URL, + env_vars: Optional[ + Union[Dict[str, str], List[Union[models.V1EnvVar, models.V1EnvVar]]] + ] = None, ): """Create the Training Job. Job can be created using one of the following options: @@ -386,6 +411,13 @@ def create_job( to the base image packages if `train_func` parameter is set. These packages are installed before executing the objective function. pip_index_url: The PyPI url from which to install Python packages. + env_vars: Environment variable(s) to be attached to training container. + You can specify a dictionary as a mapping object representing the environment + variables. Otherwise, you can specify a list, in which the element can either + be a kubernetes.client.models.V1EnvVar (documented here: + https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1EnvVar.md) + or a kubernetes.client.models.V1EnvFromSource (documented here: + https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1EnvFromSource.md) Raises: ValueError: Invalid input parameters. @@ -463,6 +495,7 @@ def create_job( command=command, args=args, resources=resources_per_worker, + env_vars=env_vars, ) # Get Pod template spec using the above container. diff --git a/sdk/python/kubeflow/training/api/training_client_test.py b/sdk/python/kubeflow/training/api/training_client_test.py index 49b25d8a1e..bc5366f078 100644 --- a/sdk/python/kubeflow/training/api/training_client_test.py +++ b/sdk/python/kubeflow/training/api/training_client_test.py @@ -18,6 +18,7 @@ from kubernetes.client import ( ApiClient, V1Container, + V1EnvVar, V1ObjectMeta, V1PodSpec, V1PodTemplateSpec, @@ -140,12 +141,23 @@ def create_job( command=None, args=None, num_workers=2, + env_vars=None, ): + # Handle env_vars as either a dict or a list + if env_vars: + if isinstance(env_vars, dict): + env_vars = [V1EnvVar(name=k, value=v) for k, v in env_vars.items()] + elif isinstance(env_vars, list): + env_vars = [ + v if isinstance(v, V1EnvVar) else V1EnvVar(**v) for v in env_vars + ] + container = V1Container( name=constants.PYTORCHJOB_CONTAINER, image=TEST_IMAGE, command=command, args=args, + env=env_vars, ) master = KubeflowOrgV1ReplicaSpec( @@ -492,6 +504,32 @@ def __init__(self): RuntimeError, None, ), + ( + "valid flow with env_vars as dict", + { + "name": TEST_NAME, + "namespace": TEST_NAME, + "env_vars": {"ENV_VAR": "env_value"}, + "base_image": TEST_IMAGE, + "num_workers": 1, + }, + SUCCESS, + create_job(env_vars={"ENV_VAR": "env_value"}, num_workers=1), + ), + ( + "valid flow with env_vars as list", + { + "name": TEST_NAME, + "namespace": TEST_NAME, + "env_vars": [V1EnvVar(name="ENV_VAR", value="env_value")], + "base_image": TEST_IMAGE, + "num_workers": 2, + }, + SUCCESS, + create_job( + env_vars=[V1EnvVar(name="ENV_VAR", value="env_value")], num_workers=2 + ), + ), ] test_data_get_job_pods = [ diff --git a/sdk/python/kubeflow/training/utils/utils.py b/sdk/python/kubeflow/training/utils/utils.py index 4f2d9c97d7..5389f10baf 100644 --- a/sdk/python/kubeflow/training/utils/utils.py +++ b/sdk/python/kubeflow/training/utils/utils.py @@ -195,6 +195,9 @@ def get_container_spec( args: Optional[List[str]] = None, resources: Union[dict, models.V1ResourceRequirements, None] = None, volume_mounts: Optional[List[models.V1VolumeMount]] = None, + env_vars: Optional[ + Union[Dict[str, str], List[Union[models.V1EnvVar, models.V1EnvVar]]] + ] = None, ) -> models.V1Container: """ Get container spec for the given parameters. @@ -203,6 +206,16 @@ def get_container_spec( if name is None or base_image is None: raise ValueError("Container name or base image cannot be none") + # Handle env_vars as either a dict or a list + if env_vars: + if isinstance(env_vars, dict): + env_vars = [models.V1EnvVar(name=k, value=v) for k, v in env_vars.items()] + elif isinstance(env_vars, list): + env_vars = [ + v if isinstance(v, models.V1EnvVar) else models.V1EnvVar(**v) + for v in env_vars + ] + # Create initial container spec. container_spec = models.V1Container( name=name, @@ -210,6 +223,7 @@ def get_container_spec( command=command, args=args, volume_mounts=volume_mounts, + env=env_vars, ) # Convert dict to the Kubernetes container resources if that is required.