Skip to content

Commit 91a46c9

Browse files
Merge pull request #293 from carsonmh/cli-view-delete-jobs
View and delete jobs functionality in the CLI
2 parents e3bf2c0 + 2ce8c4f commit 91a46c9

File tree

6 files changed

+286
-19
lines changed

6 files changed

+286
-19
lines changed

Diff for: src/codeflare_sdk/cli/cli_utils.py

+99-1
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,14 @@
33
from kubernetes import client, config
44
import pickle
55
import os
6+
from ray.job_submission import JobSubmissionClient
7+
from torchx.runner import get_runner
8+
from rich.table import Table
9+
from rich import print
610

7-
from codeflare_sdk.cluster.auth import _create_api_client_config
11+
from codeflare_sdk.cluster.cluster import list_clusters_all_namespaces, get_cluster
12+
from codeflare_sdk.cluster.model import RayCluster
13+
from codeflare_sdk.cluster.auth import _create_api_client_config, config_check
814
from codeflare_sdk.utils.kube_api_helpers import _kube_api_error_handling
915
import codeflare_sdk.cluster.auth as sdk_auth
1016

@@ -73,3 +79,95 @@ def resolve_command(self, ctx, args):
7379
# always return the full command name
7480
_, cmd, args = super().resolve_command(ctx, args)
7581
return cmd.name, cmd, args
82+
83+
84+
def print_jobs(jobs):
    """Print job records as a table on the console.

    Args:
        jobs: iterable of mappings; each one is indexed by the five column
            titles listed below. Any additional keys are not displayed.
    """
    columns = ("Submission ID", "Job ID", "RayCluster", "Namespace", "Status")
    table = Table(show_header=True)
    for title in columns:
        table.add_column(title)
    for record in jobs:
        # One cell per column, in column order.
        cells = [record[title] for title in columns]
        table.add_row(*cells)
    # `print` is the rich print imported at the top of this module.
    print(table)
92+
93+
94+
def list_all_kubernetes_jobs(print_to_console=True):
    """List jobs reported by the torchx ``kubernetes_mcad`` scheduler.

    Entries whose namespace and name match a RayCluster returned by
    ``list_clusters_all_namespaces`` are left out of the result.

    Args:
        print_to_console: when true, the result is also rendered with
            ``print_jobs``.

    Returns:
        A list of dicts with the keys "Submission ID", "Job ID",
        "RayCluster", "Namespace", "Status" and "App Handle". "Job ID" and
        "RayCluster" are always "N/A" for these jobs.
    """
    k8s_jobs = []
    runner = get_runner()
    jobs = runner.list(scheduler="kubernetes_mcad")
    # Key on (namespace, name): matching on the bare name alone would hide a
    # job whenever a RayCluster with the same name exists in a *different*
    # namespace.
    rayclusters = {
        (raycluster.namespace, raycluster.name)
        for raycluster in list_clusters_all_namespaces(False)
    }
    for job in jobs:
        # app_id is "<namespace>:<name>"; split on the first colon only.
        namespace, name = job.app_id.split(":", 1)
        if (namespace, name) in rayclusters:
            continue
        k8s_jobs.append(
            {
                "Submission ID": name,
                "Job ID": "N/A",
                "RayCluster": "N/A",
                "Namespace": namespace,
                "Status": str(job.state),
                "App Handle": job.app_handle,
            }
        )
    if print_to_console:
        print_jobs(k8s_jobs)
    return k8s_jobs
118+
119+
120+
def list_all_jobs(print_to_console=True):
    """Return every known job: RayCluster jobs first, then kubernetes_mcad jobs.

    Args:
        print_to_console: when true, the combined list is also rendered with
            ``print_jobs``.

    Returns:
        A new list holding the records from ``list_all_raycluster_jobs``
        followed by those from ``list_all_kubernetes_jobs``.
    """
    # Both helpers are asked not to print; printing happens once, below.
    kube_jobs = list_all_kubernetes_jobs(False)
    ray_jobs = list_all_raycluster_jobs(False)
    combined = [*ray_jobs, *kube_jobs]
    if print_to_console:
        print_jobs(combined)
    return combined
127+
128+
129+
def list_raycluster_jobs(cluster: RayCluster, print_to_console=True):
    """List the jobs reported by one RayCluster's job submission API.

    Args:
        cluster: the cluster to query; its ``dashboard`` attribute is passed
            to ``JobSubmissionClient`` and embedded in each app handle, and
            its ``name`` / ``namespace`` are copied into every record.
        print_to_console: when true, the result is also rendered with
            ``print_jobs``.

    Returns:
        A list of dicts with the keys "Submission ID", "Job ID",
        "RayCluster", "Namespace", "Status" and "App Handle". "App Handle" is
        "N/A" for a job that has no submission id.
    """
    rc_jobs = []
    # Named job_client so the module-level `kubernetes.client` import is not
    # shadowed inside this function.
    job_client = JobSubmissionClient(cluster.dashboard)
    for job in job_client.list_jobs():
        submission_id = job.submission_id
        if submission_id is None:
            # No submission id means no torchx handle can be built; without
            # this guard the string concatenation below raises TypeError and
            # the whole listing fails.
            app_handle = "N/A"
        else:
            app_handle = "ray://torchx/" + cluster.dashboard + "-" + submission_id
        rc_jobs.append(
            {
                "Submission ID": submission_id,
                "Job ID": job.job_id,
                "RayCluster": cluster.name,
                "Namespace": cluster.namespace,
                "Status": str(job.status),
                "App Handle": app_handle,
            }
        )
    if print_to_console:
        print_jobs(rc_jobs)
    return rc_jobs
146+
147+
148+
def list_all_raycluster_jobs(print_to_console=True):
    """Gather the jobs of every RayCluster across all namespaces.

    Args:
        print_to_console: when true, the gathered list is also rendered with
            ``print_jobs``.

    Returns:
        A flat list of the records returned by ``list_raycluster_jobs`` for
        each cluster, in cluster order.
    """
    collected = []
    for raycluster in list_clusters_all_namespaces(False):
        # The dashboard value is prefixed with a scheme before it is handed
        # to list_raycluster_jobs (note: this mutates the cluster object).
        raycluster.dashboard = "http://" + raycluster.dashboard
        collected.extend(list_raycluster_jobs(raycluster, False))
    if print_to_console:
        print_jobs(collected)
    return collected
157+
158+
159+
def get_job_app_handle(job_submission):
    """Return the "App Handle" of the job whose submission ID is ``job_submission``.

    Raises:
        FileNotFoundError: propagated from ``get_job_object`` when no job
            matches.
    """
    return get_job_object(job_submission)["App Handle"]
162+
163+
164+
def get_job_object(job_submission):
    """Find the job record whose "Submission ID" equals ``job_submission``.

    Searches the combined list from ``list_all_jobs`` and returns the first
    match.

    Raises:
        FileNotFoundError: when no record matches.
    """
    for candidate in list_all_jobs(False):
        if candidate["Submission ID"] != job_submission:
            continue
        return candidate
    raise FileNotFoundError(
        f"Job {job_submission} not found. Try using 'codeflare list --all' to see all jobs"
    )

Diff for: src/codeflare_sdk/cli/commands/cancel.py

+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
import click
2+
from torchx.runner import get_runner
3+
4+
5+
from codeflare_sdk.cli.cli_utils import get_job_app_handle
6+
7+
8+
@click.group()
def cli():
    """Cancel a resource"""
    # Command group only: subcommands attach via @cli.command(). The docstring
    # above is the help text click displays, so it is kept verbatim.
12+
13+
14+
@cli.command()
@click.pass_context
@click.argument("submission-id", type=str)
def job(ctx, submission_id):
    """Cancel a job"""
    # Resolve the submission ID to a torchx app handle, then ask the runner to
    # cancel it. Failures are reported on the console rather than raised.
    torchx_runner = get_runner()
    try:
        torchx_runner.cancel(app_handle=get_job_app_handle(submission_id))
        click.echo(f"{submission_id} cancelled successfully")
    except FileNotFoundError:
        # Raised by the lookup when no job has this submission ID.
        click.echo(f"Submission ID {submission_id} not found in Kubernetes Cluster")
    except Exception as err:
        click.echo("Error cancelling job: " + str(err))

Diff for: src/codeflare_sdk/cli/commands/list.py

+29-9
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
11
import click
2-
from kubernetes import client, config
32

43
from codeflare_sdk.cluster.cluster import (
54
list_clusters_all_namespaces,
65
list_all_clusters,
76
)
87
from codeflare_sdk.cli.cli_utils import PluralAlias
8+
from codeflare_sdk.cluster.cluster import get_cluster
9+
from codeflare_sdk.cluster.cluster import _copy_to_ray
10+
from codeflare_sdk.cli.cli_utils import list_all_jobs
11+
from codeflare_sdk.cli.cli_utils import list_all_kubernetes_jobs
12+
from codeflare_sdk.cli.cli_utils import list_raycluster_jobs
913

1014

1115
@click.group(cls=PluralAlias)
@@ -16,15 +20,31 @@ def cli():
1620

1721
@cli.command()
1822
@click.option("--namespace", type=str)
19-
@click.option("--all", is_flag=True)
2023
@click.pass_context
21-
def raycluster(ctx, namespace, all):
22-
"""List all rayclusters in a specified namespace"""
23-
if all and namespace:
24-
click.echo("--all and --namespace are mutually exclusive")
25-
return
26-
namespace = namespace or ctx.obj.current_namespace
27-
if not all:
24+
def raycluster(ctx, namespace):
25+
"""
26+
List all rayclusters
27+
"""
28+
if namespace:
2829
list_all_clusters(namespace)
2930
return
3031
list_clusters_all_namespaces()
32+
33+
34+
@cli.command()
@click.pass_context
@click.option("--cluster-name", "-c", type=str)
@click.option("--namespace", "-n", type=str)
@click.option("--kube-mcad-scheduler-only", is_flag=True)
def job(ctx, cluster_name, namespace, kube_mcad_scheduler_only):
    """
    List all jobs submitted
    """
    # Three mutually exclusive modes, checked in priority order:
    #   1. --cluster-name: jobs of that single RayCluster
    #   2. --kube-mcad-scheduler-only: kubernetes_mcad jobs only
    #   3. neither: every job
    if cluster_name:
        # Fall back to the context's current namespace when none is given.
        target_namespace = namespace or ctx.obj.current_namespace
        cluster = get_cluster(cluster_name, target_namespace)
        list_raycluster_jobs(_copy_to_ray(cluster), True)
    elif kube_mcad_scheduler_only:
        list_all_kubernetes_jobs(True)
    else:
        list_all_jobs(True)

Diff for: src/codeflare_sdk/cli/commands/logs.py

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
import click
2+
from torchx.runner import get_runner
3+
4+
from codeflare_sdk.cli.cli_utils import get_job_app_handle
5+
6+
7+
@click.group()
def cli():
    """Get the logs of a specified resource"""
    # Command group only: subcommands attach via @cli.command(). The docstring
    # above is the help text click displays, so it is kept verbatim.
11+
12+
13+
@cli.command()
@click.pass_context
@click.argument("submission-id", type=str)
def job(ctx, submission_id):
    """Get the logs of a specified job"""
    # Resolve the submission ID to a torchx app handle, then echo all log
    # lines joined into one string. Failures are reported on the console
    # rather than raised.
    torchx_runner = get_runner()
    try:
        handle = get_job_app_handle(submission_id)
        log_text = "".join(torchx_runner.log_lines(handle, None))
        click.echo(log_text)
    except FileNotFoundError:
        # Raised by the lookup when no job has this submission ID.
        click.echo(f"Submission ID {submission_id} not found in Kubernetes Cluster")
    except Exception as err:
        click.echo("Error getting job logs: " + str(err))

Diff for: src/codeflare_sdk/cli/commands/status.py

+17
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import click
2+
from torchx.runner import get_runner
23

34
from codeflare_sdk.cluster.cluster import get_cluster
5+
from codeflare_sdk.cli.cli_utils import get_job_app_handle
46

57

68
@click.group()
@@ -22,3 +24,18 @@ def raycluster(ctx, name, namespace):
2224
click.echo(f"Cluster {name} not found in {namespace} namespace")
2325
return
2426
cluster.status()
27+
28+
29+
@cli.command()
@click.pass_context
@click.argument("submission-id", type=str)
def job(ctx, submission_id):
    """Get the status of a specified job"""
    # Resolve the submission ID to a torchx app handle, then echo whatever the
    # runner reports as its status. Failures are reported on the console
    # rather than raised.
    torchx_runner = get_runner()
    try:
        handle = get_job_app_handle(submission_id)
        job_status = torchx_runner.status(app_handle=handle)
        click.echo(job_status)
    except FileNotFoundError:
        # Raised by the lookup when no job has this submission ID.
        click.echo(f"Submission ID {submission_id} not found in Kubernetes Cluster")
    except Exception as err:
        click.echo("Error getting job status: " + str(err))

0 commit comments

Comments
 (0)