diff --git a/src/codeflare_sdk/cli/codeflare_cli.py b/src/codeflare_sdk/cli/codeflare_cli.py index 2731ac0b7..c9f17a6dc 100644 --- a/src/codeflare_sdk/cli/codeflare_cli.py +++ b/src/codeflare_sdk/cli/codeflare_cli.py @@ -44,7 +44,8 @@ def get_command(self, ctx, name): @click.command(cls=CodeflareCLI) @click.pass_context def cli(ctx): - load_auth() + if ctx.invoked_subcommand != "login" and ctx.invoked_subcommand != "logout": + load_auth() ctx.obj = CodeflareContext() # Ran on every command pass diff --git a/src/codeflare_sdk/cli/commands/define.py b/src/codeflare_sdk/cli/commands/define.py index 4db177f3b..7c8d4476f 100644 --- a/src/codeflare_sdk/cli/commands/define.py +++ b/src/codeflare_sdk/cli/commands/define.py @@ -1,8 +1,10 @@ import click +import pickle from codeflare_sdk.cluster.cluster import Cluster from codeflare_sdk.cluster.config import ClusterConfiguration from codeflare_sdk.cli.cli_utils import PythonLiteralOption +from codeflare_sdk.job.jobs import DDPJobDefinition @click.group() @@ -15,21 +17,21 @@ def cli(): @click.pass_context @click.option("--name", type=str, required=True) @click.option("--namespace", "-n", type=str) -@click.option("--head_info", cls=PythonLiteralOption, type=list) -@click.option("--machine_types", cls=PythonLiteralOption, type=list) -@click.option("--min_cpus", type=int) -@click.option("--max_cpus", type=int) -@click.option("--min_worker", type=int) -@click.option("--max_worker", type=int) -@click.option("--min_memory", type=int) -@click.option("--max_memory", type=int) +@click.option("--head-info", cls=PythonLiteralOption, type=list) +@click.option("--machine-types", cls=PythonLiteralOption, type=list) +@click.option("--min-cpus", type=int) +@click.option("--max-cpus", type=int) +@click.option("--min-worker", type=int) +@click.option("--max-worker", type=int) +@click.option("--min-memory", type=int) +@click.option("--max-memory", type=int) @click.option("--gpu", type=int) @click.option("--template", type=str) @click.option("--instascale", type=bool) @click.option("--envs", cls=PythonLiteralOption, type=dict) @click.option("--image", type=str) -@click.option("--local_interactive", type=bool) -@click.option("--image_pull_secrets", cls=PythonLiteralOption, type=list) +@click.option("--local-interactive", type=bool) +@click.option("--image-pull-secrets", cls=PythonLiteralOption, type=list) def raycluster(ctx, **kwargs): """Define a RayCluster with parameter specifications""" filtered_kwargs = {k: v for k, v in kwargs.items() if v is not None} @@ -37,3 +39,35 @@ def raycluster(ctx, **kwargs): filtered_kwargs["namespace"] = ctx.obj.current_namespace clusterConfig = ClusterConfiguration(**filtered_kwargs) Cluster(clusterConfig) # Creates yaml file + + +@cli.command() +@click.pass_context +@click.option("--script", type=str, required=True) +@click.option("--m", type=str) +@click.option("--script-args", cls=PythonLiteralOption, type=list) +@click.option("--name", type=str, required=True) +@click.option("--cpu", type=int) +@click.option("--gpu", type=int) +@click.option("--memMB", type=int) +@click.option("--h", type=str) +@click.option("--j", type=str) +@click.option("--env", cls=PythonLiteralOption, type=dict) +@click.option("--max-retries", type=int) +@click.option("--mounts", cls=PythonLiteralOption, type=list) +@click.option("--rdzv-port", type=int) +@click.option("--rdzv-backend", type=str) +@click.option("--scheduler-args", cls=PythonLiteralOption, type=dict) +@click.option("--image", type=str) +@click.option("--workspace", type=str) +def job(ctx, **kwargs): + """Define a job with specified resources""" + filtered_kwargs = {k: v for k, v in kwargs.items() if v is not None} + if "memmb" in filtered_kwargs: + filtered_kwargs["memMB"] = filtered_kwargs["memmb"] + del filtered_kwargs["memmb"] + job_def = DDPJobDefinition(**filtered_kwargs) + job_file_path = ctx.obj.codeflare_path + f"/{job_def.name}" + with open(job_file_path, "wb") as file: + pickle.dump(job_def, file) + click.echo("Job definition saved to " + job_file_path) diff --git a/src/codeflare_sdk/cli/commands/login.py b/src/codeflare_sdk/cli/commands/login.py index 288607a89..56df911b2 100644 --- a/src/codeflare_sdk/cli/commands/login.py +++ b/src/codeflare_sdk/cli/commands/login.py @@ -25,7 +25,7 @@ ) def cli(ctx, server, token, insecure_skip_tls_verify, certificate_authority): """ - Login to your Kubernetes cluster and save login for subsequent use + Login to your Kubernetes cluster and save login for later use """ auth = TokenAuthentication( token, server, insecure_skip_tls_verify, certificate_authority diff --git a/src/codeflare_sdk/cli/commands/submit.py b/src/codeflare_sdk/cli/commands/submit.py index 8a476d602..7c6760cae 100644 --- a/src/codeflare_sdk/cli/commands/submit.py +++ b/src/codeflare_sdk/cli/commands/submit.py @@ -1,6 +1,11 @@ import click +import os from codeflare_sdk.cluster.cluster import Cluster +import pickle +from torchx.runner import get_runner + +from codeflare_sdk.cluster.cluster import get_cluster @click.group() @@ -30,3 +35,40 @@ def raycluster(name, wait): return cluster.up() cluster.wait_ready() + + +@cli.command() +@click.pass_context +@click.argument("name", type=str) +@click.option("--cluster-name", type=str) +@click.option("--namespace", type=str) +def job(ctx, name, cluster_name, namespace): + """ + Submit a defined job to the Kubernetes cluster or a RayCluster + """ + runner = get_runner() + job_path = ctx.obj.codeflare_path + f"/{name}" + if not os.path.isfile(job_path): + click.echo( + f"Error submitting job. Make sure the job is defined before submitting it" + ) + return + with open(job_path, "rb") as file: + job_def = pickle.load(file) + if not cluster_name: + job = job_def.submit() + submission_id = runner.describe(job._app_handle).name.split(":")[1] + click.echo(f"Job {submission_id} submitted successfully") + return + namespace = namespace or ctx.obj.current_namespace + try: + cluster = get_cluster(cluster_name, namespace) + except FileNotFoundError: + click.echo(f"Cluster {name} not found in {namespace} namespace") + return + job = job_def.submit(cluster) + full_name = runner.describe(job._app_handle).name + submission_id = full_name[full_name.rfind(name) :] + click.echo( + f"Job {submission_id} submitted onto {cluster_name} RayCluster successfully\nView dashboard: {cluster.cluster_dashboard_uri()}" + ) diff --git a/tests/unit_test.py b/tests/unit_test.py index a9258017f..fc5cc1db2 100644 --- a/tests/unit_test.py +++ b/tests/unit_test.py @@ -95,16 +95,16 @@ def test_cluster_definition_cli(mocker): define raycluster --name=cli-test-cluster --namespace=default - --min_worker=1 - --max_worker=2 - --min_cpus=3 - --max_cpus=4 - --min_memory=5 - --max_memory=6 + --min-worker=1 + --max-worker=2 + --min-cpus=3 + --max-cpus=4 + --min-memory=5 + --max-memory=6 --gpu=7 --instascale=True - --machine_types='["cpu.small", "gpu.large"]' - --image_pull_secrets='["cli-test-pull-secret"]' + --machine-types='["cpu.small", "gpu.large"]' + --image-pull-secrets='["cli-test-pull-secret"]' """ result = runner.invoke(cli, define_cluster_command) assert ( @@ -165,15 +165,6 @@ def test_login_tls_cli(mocker): ) -def test_logout_cli(mocker): - runner = CliRunner() - mocker.patch.object(client, "ApiClient") - k8s_logout_command = "logout" - logout_result = runner.invoke(cli, k8s_logout_command) - assert "Successfully logged out of 'testserver:6443'\n" in logout_result.output - assert not os.path.exists(os.path.expanduser("~/.codeflare/auth")) - - def test_load_auth(): load_auth() assert sdk_auth.api_client is not None @@ -243,6 +234,63 @@ def test_list_clusters_all_namespaces(mocker, capsys): ) +def test_job_definition_cli(): + runner = CliRunner() + define_job_command = """ + define job + --script=test-script.py + --script-args='["arg1", "arg2"]' + --memMB=2 + --image=test-image + --name=test + """ + result = runner.invoke(cli, define_job_command) + file_path = os.path.expanduser("~") + "/.codeflare/test" + assert result.output == "Job definition saved to " + file_path + "\n" + try: + with open(file_path, "rb") as file: + job = pickle.load(file) + except Exception as e: + print("Error opening file: ", e) + assert 0 == 1 + assert job.script == "test-script.py" + assert job.script_args == ["arg1", "arg2"] + assert job.memMB == 2 + assert job.image == "test-image" + assert job.name == "test" + + +def test_job_submission_cli(mocker): + mocker.patch.object(client, "ApiClient") + mocker.patch( + "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object", + side_effect=get_ray_obj, + ) + mocker.patch( + "codeflare_sdk.cluster.cluster.Cluster.cluster_dashboard_uri", + return_value="test-url.com", + ) + mocker.patch( + "codeflare_sdk.job.jobs.torchx_runner.schedule", + return_value="test-url.com", + ) + mocker.patch("torchx.runner.Runner.describe", return_value=AppDef(name="test-1234")) + runner = CliRunner() + submit_job_command = """ + submit job + test + --cluster-name=quicktest + --namespace=default + """ + result = runner.invoke(cli, submit_job_command) + assert ( + result.output + == "Written to: quicktest.yaml\n" + + "test-1234 submitted onto quicktest RayCluster successfully\n" + + "View dashboard: test-url.com\n" + ) + + def test_raycluster_details_cli(mocker): runner = CliRunner() mocker.patch( @@ -368,6 +416,16 @@ def test_raycluster_list_cli(mocker): ) in result.output +# Keep this test at the end of CLI tests +def test_logout_cli(mocker): + runner = CliRunner() + mocker.patch.object(client, "ApiClient") + k8s_logout_command = "logout" + logout_result = runner.invoke(cli, k8s_logout_command) + assert logout_result.output == "Successfully logged out of 'testserver:6443'\n" + assert not os.path.exists(os.path.expanduser("~/.codeflare/auth")) + + # For mocking openshift client results fake_res = openshift.Result("fake")