[feat][kubectl-plugin] Implement kubectl ray job submit with Deletion Policy API for RayJob Cleanup #3064

Open
wants to merge 7 commits into master
Conversation

@win5923 (Contributor) commented on Feb 14, 2025

Why are these changes needed?

Adds the shutdown-after-job-finishes, deletion-policy, and ttl-seconds-after-finished flags to kubectl ray job submit so users can manage RayJob CR cleanup more easily by configuring deletion policies.

This behavior relies on the ray-operator having the RayJobDeletionPolicy feature gate enabled.
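
At a high level, the three flags just populate the matching fields on the generated RayJob spec. The snippet below is a minimal sketch, not the plugin's actual code: the submitOptions struct and applyCleanupOptions function are hypothetical, and it assumes the rayv1 spec exposes ShutdownAfterJobFinishes, TTLSecondsAfterFinished, and a string-based DeletionPolicy pointer as in rayjob_types.go.

package job

import (
	"fmt"

	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
)

// submitOptions holds the cleanup-related flag values (hypothetical struct).
type submitOptions struct {
	shutdownAfterJobFinishes bool
	ttlSecondsAfterFinished  int32
	deletionPolicy           string
}

// applyCleanupOptions copies the cleanup flag values onto the RayJob spec.
func applyCleanupOptions(spec *rayv1.RayJobSpec, o submitOptions) error {
	if o.ttlSecondsAfterFinished > 0 && !o.shutdownAfterJobFinishes {
		return fmt.Errorf("TTLSecondsAfterFinished only works when shutdown-after-job-finishes is set to true")
	}
	spec.ShutdownAfterJobFinishes = o.shutdownAfterJobFinishes
	spec.TTLSecondsAfterFinished = o.ttlSecondsAfterFinished
	if o.deletionPolicy != "" {
		// DeletionPolicy is assumed to be a string-based type in rayjob_types.go;
		// using it requires the RayJobDeletionPolicy feature gate on the operator side.
		policy := rayv1.DeletionPolicy(o.deletionPolicy)
		spec.DeletionPolicy = &policy
	}
	return nil
}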

If the user has not enabled the RayJobDeletionPolicy feature gate:

$ kubectl ray job submit --name ray-job-sample --deletion-policy DeleteCluster --working-dir ~/workdir --runtime-env ~/workdir/runtimeEnv.yaml -- python sample_code.py
Submitted RayJob ray-job-sample.
Deleting RayJob...
Cleaned Up RayJob: ray-job-sample
2025/02/16 15:06:14 The RayJob spec is invalid default/ray-job-sample: RayJobDeletionPolicy feature gate must be enabled to use the DeletionPolicy feature

  • Set ttl-seconds-after-finished without setting shutdown-after-job-finishes
$ kubectl ray job submit --name ray-job-sample --ttl-seconds-after-finished 10  --working-dir ~/workdir --runtime-env ~/workdir/runtimeEnv.yaml  -- python sample_code.py
Error: TTLSecondsAfterFinished only works when shutdown-after-job-finishes is set to true
  • Set ttl-seconds-after-finished and shutdown-after-job-finishes
$ kubectl ray job submit --name ray-job-sample --shutdown-after-job-finishes --ttl-seconds-after-finished 10  --working-dir ~/workdir --runtime-env ~/workdir/runtimeEnv.yaml -- python sample_code.py
Submitted RayJob ray-job-sample.

$ k get rayjob
NAME             JOB STATUS   DEPLOYMENT STATUS   RAY CLUSTER NAME                  START TIME             END TIME               AGE
ray-job-sample   SUCCEEDED    Complete            ray-job-sample-raycluster-5lvzq   2025-02-15T04:06:52Z   2025-02-15T04:07:36Z   62s

$ k get raycluster
No resources found in default namespace.
  • Set deletion-policy to DeleteSelf
$ kubectl ray job submit --name ray-job-sample --deletion-policy DeleteSelf  --working-dir ~/workdir --runtime-env ~/workdir/runtimeEnv.yaml -- python sample_code.py
Submitted RayJob ray-job-sample.

$ k get raycluster
No resources found in default namespace.

$ k get rayjob
No resources found in default namespace.
  • Set deletion-policy to DeleteCluster
$ kubectl ray job submit --name ray-job-sample --deletion-policy DeleteCluster  --working-dir ~/workdir --runtime-env ~/workdir/runtimeEnv.yaml -- python sample_code.py
Submitted RayJob ray-job-sample.

$ k get rayjob
NAME             JOB STATUS   DEPLOYMENT STATUS   RAY CLUSTER NAME                  START TIME             END TIME               AGE
ray-job-sample   SUCCEEDED    Complete            ray-job-sample-raycluster-krkv5   2025-02-15T04:14:04Z   2025-02-15T04:14:51Z   66s

$ k get raycluster
No resources found in default namespace.
  • Set deletion-policy to DeleteWorkers
$ kubectl ray job submit --name ray-job-sample --deletion-policy DeleteWorkers  --working-dir ~/workdir --runtime-env ~/workdir/runtimeEnv.yaml -- python sample_code.py
Submitted RayJob ray-job-sample.

$ k get rayjob
NAME             JOB STATUS   DEPLOYMENT STATUS   RAY CLUSTER NAME                  START TIME             END TIME               AGE
ray-job-sample   SUCCEEDED    Complete            ray-job-sample-raycluster-w5j8m   2025-02-15T04:16:23Z   2025-02-15T04:17:11Z   80s

$ k get raycluster
NAME                              DESIRED WORKERS   AVAILABLE WORKERS   CPUS   MEMORY   GPUS   STATUS   AGE
ray-job-sample-raycluster-w5j8m                                         2      4Gi      0      ready    66s

$ k get pod
NAME                                         READY   STATUS    RESTARTS   AGE
kuberay-operator-f95d56cc6-txxwd             1/1     Running   0          101m
ray-job-sample-raycluster-w5j8m-head-qhdn7   1/1     Running   0          97s
  • Set deletion-policy to DeleteNone
$ kubectl ray job submit --name ray-job-sample --deletion-policy DeleteNone  --working-dir ~/workdir --runtime-env ~/workdir/runtimeEnv.yaml -- python sample_code.py
Submitted RayJob ray-job-sample.

$ k get rayjob
NAME             JOB STATUS   DEPLOYMENT STATUS   RAY CLUSTER NAME                  START TIME             END TIME               AGE
ray-job-sample   SUCCEEDED    Complete            ray-job-sample-raycluster-rdx6h   2025-02-15T05:06:04Z   2025-02-15T05:06:47Z   52s

$ k get rayclusters
NAME                              DESIRED WORKERS   AVAILABLE WORKERS   CPUS   MEMORY   GPUS   STATUS   AGE
ray-job-sample-raycluster-rdx6h   1                 1                   4      8Gi      0      ready    56s

$ k get pod
NAME                                                         READY   STATUS    RESTARTS   AGE
kuberay-operator-f95d56cc6-l8b26                             1/1     Running   0          3m12s
ray-job-sample-raycluster-rdx6h-default-group-worker-xx9qt   1/1     Running   0          59s
ray-job-sample-raycluster-rdx6h-head-2h4l9                   1/1     Running   0          59s

Related issue number

Checks

  • I've made sure the tests are passing.
  • Testing Strategy
    • Unit tests
    • Manual tests
    • This PR is not tested :(

@win5923 changed the title from "[feat][kubectl-plugin] add --cleanup flag to kubectl plugin for RayJob cleanup" to "[feat][kubectl-plugin] add --cleanup flag to ray job submit for RayJob cleanup" on Feb 14, 2025
@@ -161,6 +162,7 @@ func NewJobSubmitCommand(streams genericclioptions.IOStreams) *cobra.Command {
cmd.Flags().StringVar(&options.workerMemory, "worker-memory", "4Gi", "amount of memory in each worker group replica")
cmd.Flags().StringVar(&options.workerGPU, "worker-gpu", "0", "number of GPUs in each worker group replica")
cmd.Flags().BoolVar(&options.dryRun, "dry-run", false, "print the generated YAML instead of creating the cluster. Only works when filename is not provided")
cmd.Flags().BoolVar(&options.cleanupJob, "cleanup", false, "Delete the Ray job after job completion")
Collaborator:

Alternatively, we can have this be a --deletion-policy that maps to this: https://github.com/ray-project/kuberay/blob/master/ray-operator/apis/ray/v1/rayjob_types.go#L112

For compatibility maybe we need a flag for both shutdownAfterJobFinishes and deletionPolicy

@win5923 (Contributor, Author) replied on Feb 15, 2025:


I have rewritten the code and introduced the shutdown-after-job-finishes, deletion-policy, and ttl-seconds-after-finished flags for cleanup.
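
For illustration, the registrations (in the same style as the existing flags shown in the diff above) might look roughly like this; the options field names are hypothetical, but the flag names and constraints are the ones described in this PR:

cmd.Flags().BoolVar(&options.shutdownAfterJobFinishes, "shutdown-after-job-finishes", false, "shut down the Ray cluster after the job finishes")
cmd.Flags().StringVar(&options.deletionPolicy, "deletion-policy", "", "cleanup policy after the job finishes: DeleteCluster, DeleteWorkers, DeleteSelf, or DeleteNone (requires the RayJobDeletionPolicy feature gate)")
cmd.Flags().Int32Var(&options.ttlSecondsAfterFinished, "ttl-seconds-after-finished", 0, "seconds to keep the Ray cluster after the job finishes; only valid together with --shutdown-after-job-finishes")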

@win5923 marked this pull request as draft on February 14, 2025 17:27
@win5923 changed the title from "[feat][kubectl-plugin] add --cleanup flag to ray job submit for RayJob cleanup" to "[feat][kubectl-plugin] add shutdown-after-job-finishes and deletion-policy flag to ray job submit for cleanup" on Feb 15, 2025
@win5923 changed the title from "[feat][kubectl-plugin] add shutdown-after-job-finishes and deletion-policy flag to ray job submit for cleanup" to "[feat][kubectl-plugin] Implement kubectl ray job submit with Deletion Policy API for RayJob Cleanup" on Feb 15, 2025
@win5923 marked this pull request as ready for review on February 15, 2025 04:59
@win5923 requested a review from andrewsykim on February 15, 2025 09:27
@win5923 marked this pull request as draft on February 17, 2025 14:22
dashboardAddr = "http://localhost:8265"
clusterTimeout = 120.0
portforwardtimeout = 60.0
rayjobDeletionTimeout = 30.0
@win5923 (Contributor, Author) commented:

This judges, within 30 seconds, whether the "RayJobDeletionPolicy feature gate must be enabled to use the DeletionPolicy feature" event has been received.

@win5923 marked this pull request as ready for review on February 17, 2025 16:22
Comment on lines +192 to +226
func (c *k8sClient) WaitRayJobDeletionPolicyEnabled(ctx context.Context, namespace, name string, startTime time.Time, timeout time.Duration) error {
	timeoutCtx, cancel := context.WithTimeout(ctx, timeout*time.Second)
	defer cancel()

	watcher, err := c.KubernetesClient().CoreV1().Events(namespace).Watch(ctx, metav1.ListOptions{
		FieldSelector: fmt.Sprintf("involvedObject.name=%s", name),
	})
	if err != nil {
		return fmt.Errorf("failed to watch events for RayJob %s in namespace %s: %w", name, namespace, err)
	}
	defer watcher.Stop()

	for {
		select {
		case <-timeoutCtx.Done():
			// If the RayJobDeletionPolicy feature event is not received within the timeout period, it is considered enabled.
			return nil
		case event := <-watcher.ResultChan():
			if event.Type == watch.Error {
				return fmt.Errorf("error watching events: %v", event.Object)
			}

			e, ok := event.Object.(*corev1.Event)
			if !ok {
				continue
			}

			if strings.Contains(e.Message, "RayJobDeletionPolicy feature gate must be enabled to use the DeletionPolicy feature") {
				if e.FirstTimestamp.Time.After(startTime) || e.LastTimestamp.Time.After(startTime) {
					return fmt.Errorf("%s", e.Message)
				}
			}
		}
	}
}
@win5923 (Contributor, Author) commented:

This will block for up to 30 seconds until the RayJobDeletionPolicy feature-gate event is found. If the event is not found within 30 seconds, it assumes that the user has enabled RayJobDeletionPolicy.

There might be a better way, but I haven't thought of one yet.
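
For context, a call site might look roughly like the sketch below, which reproduces the submit-then-clean-up flow from the first example above. Everything here except WaitRayJobDeletionPolicyEnabled is hypothetical (the helper name, the waiter interface, and the assumption that the kuberay generated clientset is used for the delete call).

package job

import (
	"context"
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	rayclient "github.com/ray-project/kuberay/ray-operator/pkg/client/clientset/versioned"
)

// deletionPolicyWaiter matches the method added in this PR.
type deletionPolicyWaiter interface {
	WaitRayJobDeletionPolicyEnabled(ctx context.Context, namespace, name string, startTime time.Time, timeout time.Duration) error
}

// cleanUpIfDeletionPolicyDisabled waits for the feature-gate warning event and,
// if it shows up, deletes the RayJob that was just submitted (hypothetical helper).
func cleanUpIfDeletionPolicyDisabled(ctx context.Context, waiter deletionPolicyWaiter, rayClient rayclient.Interface, namespace, name string, startTime time.Time) error {
	// The method multiplies the timeout by time.Second internally, so 30 means
	// 30 seconds (matching the rayjobDeletionTimeout constant above).
	err := waiter.WaitRayJobDeletionPolicyEnabled(ctx, namespace, name, startTime, 30)
	if err == nil {
		return nil // no warning event within the timeout: assume the feature gate is enabled
	}
	fmt.Println("Deleting RayJob...")
	if delErr := rayClient.RayV1().RayJobs(namespace).Delete(ctx, name, metav1.DeleteOptions{}); delErr != nil {
		return delErr
	}
	fmt.Printf("Cleaned Up RayJob: %s\n", name)
	return err
}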
