Skip to content

CLI Define and Submit job commands #280

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Aug 10, 2023
3 changes: 2 additions & 1 deletion src/codeflare_sdk/cli/codeflare_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ def get_command(self, ctx, name):
@click.command(cls=CodeflareCLI)
@click.pass_context
def cli(ctx):
    # Root command callback: runs before every subcommand is dispatched.
    # (Kept as comments rather than a docstring, because click would surface
    # a docstring as the command's --help text.)
    #
    # Saved credentials are only loaded for subcommands other than "login"
    # and "logout". An unconditional load_auth() ahead of this guard would
    # make the guard pointless, so the guarded call is the only one.
    if ctx.invoked_subcommand not in ("login", "logout"):
        load_auth()
    ctx.obj = CodeflareContext()  # Ran on every command

Expand Down
54 changes: 44 additions & 10 deletions src/codeflare_sdk/cli/commands/define.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import click
import pickle

from codeflare_sdk.cluster.cluster import Cluster
from codeflare_sdk.cluster.config import ClusterConfiguration
from codeflare_sdk.cli.cli_utils import PythonLiteralOption
from codeflare_sdk.job.jobs import DDPJobDefinition


@click.group()
Expand All @@ -15,25 +17,57 @@ def cli():
@click.pass_context
@click.option("--name", type=str, required=True)
@click.option("--namespace", "-n", type=str)
@click.option("--head_info", cls=PythonLiteralOption, type=list)
@click.option("--machine_types", cls=PythonLiteralOption, type=list)
@click.option("--min_cpus", type=int)
@click.option("--max_cpus", type=int)
@click.option("--min_worker", type=int)
@click.option("--max_worker", type=int)
@click.option("--min_memory", type=int)
@click.option("--max_memory", type=int)
@click.option("--head-info", cls=PythonLiteralOption, type=list)
@click.option("--machine-types", cls=PythonLiteralOption, type=list)
@click.option("--min-cpus", type=int)
@click.option("--max-cpus", type=int)
@click.option("--min-worker", type=int)
@click.option("--max-worker", type=int)
@click.option("--min-memory", type=int)
@click.option("--max-memory", type=int)
@click.option("--gpu", type=int)
@click.option("--template", type=str)
@click.option("--instascale", type=bool)
@click.option("--envs", cls=PythonLiteralOption, type=dict)
@click.option("--image", type=str)
@click.option("--local_interactive", type=bool)
@click.option("--image_pull_secrets", cls=PythonLiteralOption, type=list)
@click.option("--local-interactive", type=bool)
@click.option("--image-pull-secrets", cls=PythonLiteralOption, type=list)
def raycluster(ctx, **kwargs):
"""Define a RayCluster with parameter specifications"""
filtered_kwargs = {k: v for k, v in kwargs.items() if v is not None}
if "namespace" not in filtered_kwargs.keys():
filtered_kwargs["namespace"] = ctx.obj.current_namespace
clusterConfig = ClusterConfiguration(**filtered_kwargs)
Cluster(clusterConfig) # Creates yaml file


@cli.command()
@click.pass_context
@click.option("--script", type=str, required=True)
@click.option("--m", type=str)
@click.option("--script-args", cls=PythonLiteralOption, type=list)
@click.option("--name", type=str, required=True)
@click.option("--cpu", type=int)
@click.option("--gpu", type=int)
@click.option("--memMB", type=int)
@click.option("--h", type=str)
@click.option("--j", type=str)
@click.option("--env", cls=PythonLiteralOption, type=dict)
@click.option("--max-retries", type=int)
@click.option("--mounts", cls=PythonLiteralOption, type=list)
@click.option("--rdzv-port", type=int)
@click.option("--rdzv-backend", type=str)
@click.option("--scheduler-args", cls=PythonLiteralOption, type=dict)
@click.option("--image", type=str)
@click.option("--workspace", type=str)
def job(ctx, **kwargs):
    """Define a job with specified resources"""
    # Forward only the options the user actually supplied, so that
    # DDPJobDefinition falls back to its own defaults for the rest.
    supplied = {}
    for option, value in kwargs.items():
        if value is None:
            continue
        supplied[option] = value

    # The "--memMB" flag arrives here under the key "memmb"; restore the
    # mixed-case keyword that DDPJobDefinition is called with.
    if "memmb" in supplied:
        supplied["memMB"] = supplied.pop("memmb")

    definition = DDPJobDefinition(**supplied)

    # The definition is pickled to a file named after the job inside the
    # CLI's working directory so that a later command can load it by name.
    destination = ctx.obj.codeflare_path + f"/{definition.name}"
    with open(destination, "wb") as handle:
        pickle.dump(definition, handle)
    click.echo(f"Job definition saved to {destination}")
2 changes: 1 addition & 1 deletion src/codeflare_sdk/cli/commands/login.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
)
def cli(ctx, server, token, insecure_skip_tls_verify, certificate_authority):
"""
Login to your Kubernetes cluster and save login for subsequent use
Login to your Kubernetes cluster and save login for later use
"""
auth = TokenAuthentication(
token, server, insecure_skip_tls_verify, certificate_authority
Expand Down
42 changes: 42 additions & 0 deletions src/codeflare_sdk/cli/commands/submit.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import click
import os

from codeflare_sdk.cluster.cluster import Cluster
import pickle
from torchx.runner import get_runner

from codeflare_sdk.cluster.cluster import get_cluster


@click.group()
Expand Down Expand Up @@ -30,3 +35,40 @@ def raycluster(name, wait):
return
cluster.up()
cluster.wait_ready()


@cli.command()
@click.pass_context
@click.argument("name", type=str)
@click.option("--cluster-name", type=str)
@click.option("--namespace", type=str)
def job(ctx, name, cluster_name, namespace):
    """
    Submit a defined job to the Kubernetes cluster or a RayCluster
    """
    # name:         name of a previously defined job (file under codeflare_path)
    # cluster_name: optional RayCluster to submit onto; when omitted the job
    #               definition's submit() is called without a cluster
    # namespace:    namespace used to look up the cluster; defaults to the
    #               context's current namespace
    runner = get_runner()
    job_path = ctx.obj.codeflare_path + f"/{name}"
    if not os.path.isfile(job_path):
        click.echo(
            "Error submitting job. Make sure the job is defined before submitting it"
        )
        return

    # NOTE(review): pickle.load can execute arbitrary code embedded in the
    # file. This is only safe as long as the definition files are written
    # exclusively by this tool for the same user - confirm.
    with open(job_path, "rb") as file:
        job_def = pickle.load(file)

    if not cluster_name:
        submitted = job_def.submit()
        submission_id = runner.describe(submitted._app_handle).name.split(":")[1]
        click.echo(f"Job {submission_id} submitted successfully")
        return

    namespace = namespace or ctx.obj.current_namespace
    try:
        cluster = get_cluster(cluster_name, namespace)
    except FileNotFoundError:
        # Report the cluster that was looked up, not the job name.
        click.echo(f"Cluster {cluster_name} not found in {namespace} namespace")
        return

    submitted = job_def.submit(cluster)
    full_name = runner.describe(submitted._app_handle).name
    # Keep the part of the app name starting at the job name. If the job name
    # does not occur at all, rfind() returns -1 and slicing would keep only
    # the last character, so fall back to the full name instead.
    start = full_name.rfind(name)
    submission_id = full_name[start:] if start != -1 else full_name
    click.echo(
        f"Job {submission_id} submitted onto {cluster_name} RayCluster successfully\nView dashboard: {cluster.cluster_dashboard_uri()}"
    )
92 changes: 75 additions & 17 deletions tests/unit_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,16 +95,16 @@ def test_cluster_definition_cli(mocker):
define raycluster
--name=cli-test-cluster
--namespace=default
--min_worker=1
--max_worker=2
--min_cpus=3
--max_cpus=4
--min_memory=5
--max_memory=6
--min-worker=1
--max-worker=2
--min-cpus=3
--max-cpus=4
--min-memory=5
--max-memory=6
--gpu=7
--instascale=True
--machine_types='["cpu.small", "gpu.large"]'
--image_pull_secrets='["cli-test-pull-secret"]'
--machine-types='["cpu.small", "gpu.large"]'
--image-pull-secrets='["cli-test-pull-secret"]'
"""
result = runner.invoke(cli, define_cluster_command)
assert (
Expand Down Expand Up @@ -165,15 +165,6 @@ def test_login_tls_cli(mocker):
)


def test_logout_cli(mocker):
runner = CliRunner()
mocker.patch.object(client, "ApiClient")
k8s_logout_command = "logout"
logout_result = runner.invoke(cli, k8s_logout_command)
assert "Successfully logged out of 'testserver:6443'\n" in logout_result.output
assert not os.path.exists(os.path.expanduser("~/.codeflare/auth"))


def test_load_auth():
load_auth()
assert sdk_auth.api_client is not None
Expand Down Expand Up @@ -243,6 +234,63 @@ def test_list_clusters_all_namespaces(mocker, capsys):
)


def test_job_definition_cli():
    # Define a job through the CLI, then unpickle the saved definition and
    # verify that every supplied option round-trips onto the job object.
    runner = CliRunner()
    define_job_command = """
                        define job
                        --script=test-script.py
                        --script-args='["arg1", "arg2"]'
                        --memMB=2
                        --image=test-image
                        --name=test
                        """
    result = runner.invoke(cli, define_job_command)
    file_path = os.path.expanduser("~") + "/.codeflare/test"
    assert result.output == "Job definition saved to " + file_path + "\n"

    # Let any open/unpickle error propagate: pytest then reports the real
    # exception and traceback instead of an uninformative `assert 0 == 1`.
    with open(file_path, "rb") as file:
        job = pickle.load(file)

    assert job.script == "test-script.py"
    assert job.script_args == ["arg1", "arg2"]
    assert job.memMB == 2
    assert job.image == "test-image"
    assert job.name == "test"


def test_job_submission_cli(mocker):
    # Submit the previously defined "test" job onto a mocked RayCluster and
    # check the exact message printed by the `submit job` command.
    mocker.patch.object(client, "ApiClient")
    mocker.patch(
        "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object",
        side_effect=get_ray_obj,
    )
    mocker.patch(
        "codeflare_sdk.cluster.cluster.Cluster.cluster_dashboard_uri",
        return_value="test-url.com",
    )
    mocker.patch(
        "codeflare_sdk.job.jobs.torchx_runner.schedule",
        return_value="test-url.com",
    )
    mocker.patch("torchx.runner.Runner.describe", return_value=AppDef(name="test-1234"))
    runner = CliRunner()
    submit_job_command = """
                        submit job
                        test
                        --cluster-name=quicktest
                        --namespace=default
                        """
    result = runner.invoke(cli, submit_job_command)
    # The submit command echoes "Job <id> submitted onto ...", so the
    # expected text must carry the "Job " prefix as well.
    assert (
        result.output
        == "Written to: quicktest.yaml\n"
        + "Job test-1234 submitted onto quicktest RayCluster successfully\n"
        + "View dashboard: test-url.com\n"
    )


def test_raycluster_details_cli(mocker):
runner = CliRunner()
mocker.patch(
Expand Down Expand Up @@ -368,6 +416,16 @@ def test_raycluster_list_cli(mocker):
) in result.output


# Keep this test at the end of CLI tests
# (it asserts that the saved auth file is gone afterwards; presumably CLI
# tests that run later would no longer find credentials to load)
def test_logout_cli(mocker):
    mocker.patch.object(client, "ApiClient")
    cli_runner = CliRunner()
    outcome = cli_runner.invoke(cli, "logout")
    auth_file = os.path.expanduser("~/.codeflare/auth")
    assert outcome.output == "Successfully logged out of 'testserver:6443'\n"
    assert not os.path.exists(auth_file)


# For mocking openshift client results
fake_res = openshift.Result("fake")

Expand Down