
Commit d49a7af

[RayJob][Status][6/n] Redefine JobDeploymentStatusComplete and clean up K8s Job after TTL (#1762)
1 parent cfa1203 commit d49a7af

File tree

11 files changed: +73 −91 lines changed


apiserver/pkg/model/converter.go

Lines changed: 1 addition & 3 deletions
@@ -439,9 +439,7 @@ func FromCrdToApiJob(job *rayv1api.RayJob) (pbJob *api.RayJob) {
         pbJob.ClusterSpec = PopulateRayClusterSpec(*job.Spec.RayClusterSpec)
     }
 
-    if job.Spec.TTLSecondsAfterFinished != nil {
-        pbJob.TtlSecondsAfterFinished = *job.Spec.TTLSecondsAfterFinished
-    }
+    pbJob.TtlSecondsAfterFinished = job.Spec.TTLSecondsAfterFinished
 
     if job.DeletionTimestamp != nil {
         pbJob.DeleteAt = &timestamp.Timestamp{Seconds: job.DeletionTimestamp.Unix()}

apiserver/pkg/model/converter_test.go

Lines changed: 3 additions & 3 deletions
@@ -285,7 +285,7 @@ var JobNewClusterTest = rayv1api.RayJob{
             "job_submission_id": "123",
         },
         RuntimeEnvYAML: "mytest yaml",
-        TTLSecondsAfterFinished: &secondsValue,
+        TTLSecondsAfterFinished: secondsValue,
         RayClusterSpec: &ClusterSpecTest.Spec,
     },
 }
@@ -301,7 +301,7 @@ var JobExistingClusterTest = rayv1api.RayJob{
     Spec: rayv1api.RayJobSpec{
         Entrypoint: "python /home/ray/samples/sample_code.py",
         RuntimeEnvYAML: "mytest yaml",
-        TTLSecondsAfterFinished: &secondsValue,
+        TTLSecondsAfterFinished: secondsValue,
         ClusterSelector: map[string]string{
             util.RayClusterUserLabelKey: "test",
         },
@@ -319,7 +319,7 @@ var JobExistingClusterSubmitterTest = rayv1api.RayJob{
     Spec: rayv1api.RayJobSpec{
         Entrypoint: "python /home/ray/samples/sample_code.py",
         RuntimeEnvYAML: "mytest yaml",
-        TTLSecondsAfterFinished: &secondsValue,
+        TTLSecondsAfterFinished: secondsValue,
         ClusterSelector: map[string]string{
             util.RayClusterUserLabelKey: "test",
         },

apiserver/pkg/util/job.go

Lines changed: 1 addition & 1 deletion
@@ -30,7 +30,7 @@ func NewRayJob(apiJob *api.RayJob, computeTemplateMap map[string]*api.ComputeTem
         Metadata: apiJob.Metadata,
         RuntimeEnvYAML: apiJob.RuntimeEnv,
         ShutdownAfterJobFinishes: apiJob.ShutdownAfterJobFinishes,
-        TTLSecondsAfterFinished: &apiJob.TtlSecondsAfterFinished,
+        TTLSecondsAfterFinished: apiJob.TtlSecondsAfterFinished,
         JobId: apiJob.JobId,
         RayClusterSpec: nil,
         ClusterSelector: apiJob.ClusterSelector,

helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default.

ray-operator/apis/ray/v1/rayjob_types.go

Lines changed: 2 additions & 1 deletion
@@ -58,7 +58,8 @@ type RayJobSpec struct {
     ShutdownAfterJobFinishes bool `json:"shutdownAfterJobFinishes,omitempty"`
     // TTLSecondsAfterFinished is the TTL to clean up RayCluster.
     // It's only working when ShutdownAfterJobFinishes set to true.
-    TTLSecondsAfterFinished *int32 `json:"ttlSecondsAfterFinished,omitempty"`
+    // +kubebuilder:default:=0
+    TTLSecondsAfterFinished int32 `json:"ttlSecondsAfterFinished,omitempty"`
     // RayClusterSpec is the cluster template to run the job
     RayClusterSpec *RayClusterSpec `json:"rayClusterSpec,omitempty"`
     // clusterSelector is used to select running rayclusters by labels
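
Note on the field change above: making TTLSecondsAfterFinished a plain int32 with a kubebuilder default of 0 removes the nil checks callers previously needed; an omitted TTL now simply reads as 0, i.e. no extra delay once ShutdownAfterJobFinishes applies. A small illustrative sketch, using a trimmed-down stand-in for rayv1.RayJobSpec rather than the real type:

package main

import "fmt"

// jobSpec is a simplified stand-in for rayv1.RayJobSpec, showing only the
// fields touched by this commit.
type jobSpec struct {
	ShutdownAfterJobFinishes bool
	TTLSecondsAfterFinished  int32 // was *int32; zero now means "no extra delay"
}

func main() {
	// Before: callers had to guard against a nil pointer, e.g.
	//   if spec.TTLSecondsAfterFinished != nil { ttl = *spec.TTLSecondsAfterFinished }
	// After: the value can be read directly, and an omitted field is just 0.
	spec := jobSpec{ShutdownAfterJobFinishes: true}
	fmt.Println("ttl seconds:", spec.TTLSecondsAfterFinished) // prints 0
}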

ray-operator/apis/ray/v1/zz_generated.deepcopy.go

Lines changed: 0 additions & 5 deletions
Some generated files are not rendered by default.

ray-operator/apis/ray/v1alpha1/rayjob_types.go

Lines changed: 2 additions & 1 deletion
@@ -58,7 +58,8 @@ type RayJobSpec struct {
     ShutdownAfterJobFinishes bool `json:"shutdownAfterJobFinishes,omitempty"`
     // TTLSecondsAfterFinished is the TTL to clean up RayCluster.
     // It's only working when ShutdownAfterJobFinishes set to true.
-    TTLSecondsAfterFinished *int32 `json:"ttlSecondsAfterFinished,omitempty"`
+    // +kubebuilder:default:=0
+    TTLSecondsAfterFinished int32 `json:"ttlSecondsAfterFinished,omitempty"`
     // RayClusterSpec is the cluster template to run the job
     RayClusterSpec *RayClusterSpec `json:"rayClusterSpec,omitempty"`
     // clusterSelector is used to select running rayclusters by labels

ray-operator/apis/ray/v1alpha1/zz_generated.deepcopy.go

Lines changed: 0 additions & 5 deletions
Some generated files are not rendered by default.

ray-operator/config/crd/bases/ray.io_rayjobs.yaml

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default.

ray-operator/controllers/ray/rayjob_controller.go

Lines changed: 48 additions & 66 deletions
@@ -15,7 +15,6 @@ import (
     "k8s.io/utils/pointer"
     "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
     "sigs.k8s.io/controller-runtime/pkg/manager"
-    "sigs.k8s.io/controller-runtime/pkg/reconcile"
 
     "github.com/ray-project/kuberay/ray-operator/controllers/ray/common"
     "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
@@ -123,9 +122,33 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
         return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
     }
 
-    // Do not reconcile the RayJob if the deployment status is marked as Complete
-    if rayJobInstance.Status.JobDeploymentStatus == rayv1.JobDeploymentStatusComplete {
-        r.Log.Info("rayjob is complete, skip reconciliation", "rayjob", rayJobInstance.Name)
+    r.Log.Info("RayJob", "name", rayJobInstance.Name, "namespace", rayJobInstance.Namespace, "JobStatus", rayJobInstance.Status.JobStatus, "JobDeploymentStatus", rayJobInstance.Status.JobDeploymentStatus)
+    switch rayJobInstance.Status.JobDeploymentStatus {
+    case rayv1.JobDeploymentStatusComplete:
+        // If this RayJob uses an existing RayCluster (i.e., ClusterSelector is set), we should not delete the RayCluster.
+        r.Log.Info("JobDeploymentStatusComplete", "RayJob", rayJobInstance.Name, "ShutdownAfterJobFinishes", rayJobInstance.Spec.ShutdownAfterJobFinishes, "ClusterSelector", rayJobInstance.Spec.ClusterSelector)
+        if rayJobInstance.Spec.ShutdownAfterJobFinishes && len(rayJobInstance.Spec.ClusterSelector) == 0 {
+            // TODO (kevin85421): Revisit EndTime and ensure it will always be set after the job is completed.
+            ttlSeconds := rayJobInstance.Spec.TTLSecondsAfterFinished
+            nowTime := time.Now()
+            shutdownTime := rayJobInstance.Status.EndTime.Add(time.Duration(ttlSeconds) * time.Second)
+            r.Log.Info(
+                "RayJob is completed",
+                "shutdownAfterJobFinishes", rayJobInstance.Spec.ShutdownAfterJobFinishes,
+                "ttlSecondsAfterFinished", ttlSeconds,
+                "Status.endTime", rayJobInstance.Status.EndTime,
+                "Now", nowTime,
+                "ShutdownTime", shutdownTime)
+            if shutdownTime.After(nowTime) {
+                delta := int32(time.Until(shutdownTime.Add(2 * time.Second)).Seconds())
+                r.Log.Info(fmt.Sprintf("shutdownTime not reached, requeue this RayJob for %d seconds", delta))
+                return ctrl.Result{RequeueAfter: time.Duration(delta) * time.Second}, nil
+            } else {
+                if err = r.releaseComputeResources(ctx, rayJobInstance); err != nil {
+                    return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
+                }
+            }
+        }
         return ctrl.Result{}, nil
     }
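
Note on the Complete branch above: the TTL handling reduces to simple time arithmetic. Add the TTL to Status.EndTime to get the shutdown time; if that moment is still in the future, requeue for the remaining duration plus a 2-second buffer so RequeueAfter is never zero; otherwise release the compute resources. A standalone sketch of that calculation (the requeueAfter helper below is illustrative, not part of the controller):

package main

import (
	"fmt"
	"time"
)

// requeueAfter returns how long the reconciler should wait before revisiting
// a completed RayJob, or 0 if the TTL has already expired and the RayCluster
// can be released now. The 2-second pad mirrors the buffer the controller
// uses so the requeue duration stays positive while the deadline is ahead.
func requeueAfter(endTime time.Time, ttlSeconds int32, now time.Time) time.Duration {
	shutdownTime := endTime.Add(time.Duration(ttlSeconds) * time.Second)
	if shutdownTime.After(now) {
		return shutdownTime.Add(2 * time.Second).Sub(now)
	}
	return 0 // TTL expired: release compute resources instead of requeueing
}

func main() {
	end := time.Now().Add(-10 * time.Second)       // pretend the job finished 10s ago
	fmt.Println(requeueAfter(end, 60, time.Now())) // roughly 52s until shutdown
	fmt.Println(requeueAfter(end, 5, time.Now()))  // 0s: the TTL has already passed
}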

@@ -135,28 +158,10 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
     // include STOPPED which is also a terminal status because `suspend` requires to stop the Ray job gracefully before
     // delete the RayCluster.
     if isJobSucceedOrFail(rayJobInstance.Status.JobStatus) {
-        // If the function `updateState` updates the JobStatus to Complete successfully, we can skip the reconciliation.
-        rayClusterInstance := &rayv1.RayCluster{}
-        rayClusterNamespacedName := types.NamespacedName{
-            Namespace: rayJobInstance.Namespace,
-            Name:      rayJobInstance.Status.RayClusterName,
-        }
-        if err := r.Get(ctx, rayClusterNamespacedName, rayClusterInstance); err != nil {
-            if !errors.IsNotFound(err) {
-                return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
-            }
-            if err = r.updateState(ctx, rayJobInstance, nil, rayJobInstance.Status.JobStatus, rayv1.JobDeploymentStatusComplete); err != nil {
-                return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
-            }
-            return ctrl.Result{}, nil
-        }
-
-        if rayClusterInstance.DeletionTimestamp != nil {
-            if err = r.updateState(ctx, rayJobInstance, nil, rayJobInstance.Status.JobStatus, rayv1.JobDeploymentStatusComplete); err != nil {
-                return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
-            }
-            return ctrl.Result{}, nil
+        if err = r.updateState(ctx, rayJobInstance, nil, rayJobInstance.Status.JobStatus, rayv1.JobDeploymentStatusComplete); err != nil {
+            return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
         }
+        return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
     }
 
     // Set `Status.JobDeploymentStatus` to `JobDeploymentStatusInitializing`, and initialize `Status.JobId`
@@ -246,8 +251,7 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
             return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
         }
 
-        _, err = r.deleteCluster(ctx, rayJobInstance)
-        if err != nil && !errors.IsNotFound(err) {
+        if err = r.releaseComputeResources(ctx, rayJobInstance); err != nil {
             return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
         }
         // Since RayCluster instance is gone, remove it status also
@@ -268,39 +272,7 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
         }
     }
 
-    // Let's use rayJobInstance.Status.JobStatus to make sure we only delete cluster after the CR is updated.
-    if isJobSucceedOrFail(rayJobInstance.Status.JobStatus) && rayJobInstance.Status.JobDeploymentStatus == rayv1.JobDeploymentStatusRunning {
-        if rayJobInstance.Spec.ShutdownAfterJobFinishes && len(rayJobInstance.Spec.ClusterSelector) == 0 {
-            // the RayJob is submitted against the RayCluster created by THIS job, so we can tear that
-            // RayCluster down.
-            if rayJobInstance.Spec.TTLSecondsAfterFinished != nil {
-                r.Log.V(3).Info("TTLSecondsAfterSetting", "end_time", rayJobInstance.Status.EndTime.Time, "now", time.Now(), "ttl", *rayJobInstance.Spec.TTLSecondsAfterFinished)
-                ttlDuration := time.Duration(*rayJobInstance.Spec.TTLSecondsAfterFinished) * time.Second
-                if rayJobInstance.Status.EndTime.Time.Add(ttlDuration).After(time.Now()) {
-                    // time.Until prints duration until target time. We add additional 2 seconds to make sure we have buffer and requeueAfter is not 0.
-                    delta := int32(time.Until(rayJobInstance.Status.EndTime.Time.Add(ttlDuration).Add(2 * time.Second)).Seconds())
-                    r.Log.Info("TTLSecondsAfterFinish not reached, requeue it after", "RayJob", rayJobInstance.Name, "time(s)", delta)
-                    return ctrl.Result{RequeueAfter: time.Duration(delta) * time.Second}, nil
-                }
-            }
-            r.Log.Info("shutdownAfterJobFinishes set to true, we will delete cluster",
-                "RayJob", rayJobInstance.Name, "clusterName", fmt.Sprintf("%s/%s", rayJobInstance.Namespace, rayJobInstance.Status.RayClusterName))
-            _, err = r.deleteCluster(ctx, rayJobInstance)
-            if err != nil && !errors.IsNotFound(err) {
-                return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
-            }
-        }
-    }
-
-    // TODO (kevin85421): Use the source of truth `jobInfo.JobStatus` instead.
-    if isJobPendingOrRunning(jobInfo.JobStatus) {
-        // Requeue the RayJob to poll its status from the running Ray job
-        r.Log.Info("Requeue the RayJob because the Ray job is not in a terminal state", "RayJob", rayJobInstance.Name, "JobStatus", jobInfo.JobStatus)
-        return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
-    }
-    // Otherwise only reconcile the RayJob upon new events for watched resources
-    // to avoid infinite reconciliation.
-    return ctrl.Result{}, nil
+    return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
 }
 
 // createK8sJobIfNeed creates a Kubernetes Job for the RayJob if it doesn't exist.
@@ -388,6 +360,12 @@ func (r *RayJobReconciler) createNewK8sJob(ctx context.Context, rayJobInstance *
         },
     }
 
+    // Without TTLSecondsAfterFinished, the job has a default deletion policy of `orphanDependents` causing
+    // Pods created by an unmanaged Job to be left around after that Job is fully deleted.
+    if rayJobInstance.Spec.ShutdownAfterJobFinishes {
+        job.Spec.TTLSecondsAfterFinished = pointer.Int32(rayJobInstance.Spec.TTLSecondsAfterFinished)
+    }
+
     // Set the ownership in order to do the garbage collection by k8s.
     if err := ctrl.SetControllerReference(rayJobInstance, job, r.Scheme); err != nil {
         r.Log.Error(err, "failed to set controller reference")
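
Note on the hunk above: cleanup of the submitter Job is delegated to Kubernetes itself. batch/v1 Jobs support spec.ttlSecondsAfterFinished, and the TTL-after-finished controller deletes the Job, and its Pods through ownership, once the TTL expires. A minimal sketch of a Job object with that field populated from the RayJob TTL; the name, namespace, image, and command below are placeholders, not values from the commit:

package main

import (
	"fmt"

	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/utils/pointer"
)

func main() {
	ttl := int32(30) // e.g. copied from RayJob.Spec.TTLSecondsAfterFinished

	job := &batchv1.Job{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "rayjob-sample-submitter", // placeholder name
			Namespace: "default",
		},
		Spec: batchv1.JobSpec{
			// Kubernetes deletes this Job (and its Pods) ttl seconds after it finishes.
			TTLSecondsAfterFinished: pointer.Int32(ttl),
			Template: corev1.PodTemplateSpec{
				Spec: corev1.PodSpec{
					RestartPolicy: corev1.RestartPolicyNever,
					Containers: []corev1.Container{{
						Name:    "ray-job-submitter",
						Image:   "rayproject/ray:latest", // placeholder image
						Command: []string{"ray", "job", "submit"},
					}},
				},
			},
		},
	}

	fmt.Println(job.Name, *job.Spec.TTLSecondsAfterFinished)
}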
@@ -404,31 +382,35 @@ func (r *RayJobReconciler) createNewK8sJob(ctx context.Context, rayJobInstance *
     return nil
 }
 
-func (r *RayJobReconciler) deleteCluster(ctx context.Context, rayJobInstance *rayv1.RayJob) (reconcile.Result, error) {
+// Delete the RayCluster associated with the RayJob to release the compute resources.
+// In the future, we may also need to delete other Kubernetes resources. Note that
+// this function doesn't delete the Kubernetes Job. Instead, we use the built-in
+// TTL mechanism of the Kubernetes Job for deletion.
+func (r *RayJobReconciler) releaseComputeResources(ctx context.Context, rayJobInstance *rayv1.RayJob) error {
     clusterIdentifier := types.NamespacedName{
         Name:      rayJobInstance.Status.RayClusterName,
         Namespace: rayJobInstance.Namespace,
     }
     cluster := rayv1.RayCluster{}
     if err := r.Get(ctx, clusterIdentifier, &cluster); err != nil {
         if !errors.IsNotFound(err) {
-            return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
+            return err
         }
+        // If the cluster is not found, it means the cluster has been already deleted.
+        // Don't return error to make this function idempotent.
         r.Log.Info("The associated cluster has been already deleted and it can not be found", "RayCluster", clusterIdentifier)
-        return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
     } else {
         if cluster.DeletionTimestamp != nil {
             r.Log.Info("The cluster deletion is ongoing.", "rayjob", rayJobInstance.Name, "raycluster", cluster.Name)
         } else {
             if err := r.Delete(ctx, &cluster); err != nil {
-                return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
+                return err
             }
             r.Log.Info("The associated cluster is deleted", "RayCluster", clusterIdentifier)
             r.Recorder.Eventf(rayJobInstance, corev1.EventTypeNormal, "Deleted", "Deleted cluster %s", rayJobInstance.Status.RayClusterName)
-            return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
         }
     }
-    return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
+    return nil
 }
 
 // isJobSucceedOrFail indicates whether the job comes into end status.
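
Note on releaseComputeResources above: the function is deliberately idempotent, so repeated reconciles of a completed RayJob do not fail once the RayCluster is gone. A NotFound error from the Get is logged and swallowed, and an in-progress deletion is only logged. A framework-free sketch of that delete-if-present pattern (errNotFound and deleteIfPresent are stand-ins for illustration; the controller checks errors.IsNotFound from apimachinery):

package main

import (
	"errors"
	"fmt"
)

// errNotFound stands in for a Kubernetes NotFound API error.
var errNotFound = errors.New("not found")

// deleteIfPresent treats "already gone" as success so repeated calls are safe.
func deleteIfPresent(del func() error) error {
	if err := del(); err != nil {
		if errors.Is(err, errNotFound) {
			fmt.Println("resource already deleted; nothing to do")
			return nil
		}
		return err
	}
	fmt.Println("resource deleted")
	return nil
}

func main() {
	_ = deleteIfPresent(func() error { return nil })         // first call deletes
	_ = deleteIfPresent(func() error { return errNotFound }) // second call is a no-op
}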
