Skip to content

Commit 1594e88

Browse files
authored
[RayJob][Status][5/n] Refactor getOrCreateK8sJob (#1750)
1 parent 62bbc13 commit 1594e88

File tree

2 files changed

+21
-51
lines changed

2 files changed

+21
-51
lines changed

ray-operator/controllers/ray/rayjob_controller.go

Lines changed: 18 additions & 42 deletions
Original file line number | Diff line number | Diff line change
@@ -212,28 +212,7 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
212212
rayDashboardClient.InitClient(rayJobInstance.Status.DashboardURL)
213213

214214
// Ensure k8s job has been created
215-
jobName, wasJobCreated, err := r.getOrCreateK8sJob(ctx, rayJobInstance, rayClusterInstance)
216-
if err != nil {
217-
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
218-
}
219-
220-
if wasJobCreated {
221-
r.Log.Info("K8s job successfully created", "RayJob", rayJobInstance.Name, "jobId", jobName)
222-
r.Recorder.Eventf(rayJobInstance, corev1.EventTypeNormal, "Created", "Created k8s job %s", jobName)
223-
} else {
224-
r.Log.Info("K8s job successfully retrieved", "RayJob", rayJobInstance.Name, "jobId", jobName)
225-
}
226-
227-
// Check the status of the k8s job and update the RayJobInstance status accordingly.
228-
// Get the k8s job
229-
k8sJob := &batchv1.Job{}
230-
err = r.Client.Get(ctx, types.NamespacedName{Name: jobName, Namespace: rayJobInstance.Namespace}, k8sJob)
231-
if err != nil {
232-
if errors.IsNotFound(err) {
233-
r.Log.Info("Job not found", "RayJob", rayJobInstance.Name, "jobId", jobName)
234-
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
235-
}
236-
r.Log.Error(err, "failed to get k8s job")
215+
if err := r.createK8sJobIfNeed(ctx, rayJobInstance, rayClusterInstance); err != nil {
237216
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
238217
}
239218

@@ -262,17 +241,13 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
262241
// the RayJob is submitted against the RayCluster created by THIS job, then
263242
// try to gracefully stop the Ray job and delete (suspend) the cluster
264243
if rayJobInstance.Spec.Suspend && len(rayJobInstance.Spec.ClusterSelector) == 0 {
265-
info, err := rayDashboardClient.GetJobInfo(ctx, rayJobInstance.Status.JobId)
266-
if err != nil {
267-
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
268-
}
269-
if !rayv1.IsJobTerminal(info.JobStatus) {
244+
if !rayv1.IsJobTerminal(jobInfo.JobStatus) {
270245
err := rayDashboardClient.StopJob(ctx, rayJobInstance.Status.JobId, &r.Log)
271246
if err != nil {
272247
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
273248
}
274249
}
275-
if info.JobStatus != rayv1.JobStatusStopped {
250+
if jobInfo.JobStatus != rayv1.JobStatusStopped {
276251
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
277252
}
278253

@@ -325,6 +300,7 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
325300
}
326301
}
327302

303+
// TODO (kevin85421): Use the source of truth `jobInfo.JobStatus` instead.
328304
if isJobPendingOrRunning(rayJobInstance.Status.JobStatus) {
329305
// Requeue the RayJob to poll its status from the running Ray job
330306
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
@@ -334,8 +310,8 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
334310
return ctrl.Result{}, nil
335311
}
336312

337-
// getOrCreateK8sJob creates a Kubernetes Job for the Ray Job if it doesn't exist, otherwise returns the existing one. It returns the Job name and a boolean indicating whether the Job was created.
338-
func (r *RayJobReconciler) getOrCreateK8sJob(ctx context.Context, rayJobInstance *rayv1.RayJob, rayClusterInstance *rayv1.RayCluster) (string, bool, error) {
313+
// createK8sJobIfNeed creates a Kubernetes Job for the RayJob if it doesn't exist.
314+
func (r *RayJobReconciler) createK8sJobIfNeed(ctx context.Context, rayJobInstance *rayv1.RayJob, rayClusterInstance *rayv1.RayCluster) error {
339315
jobName := rayJobInstance.Name
340316
jobNamespace := rayJobInstance.Namespace
341317

@@ -346,18 +322,18 @@ func (r *RayJobReconciler) getOrCreateK8sJob(ctx context.Context, rayJobInstance
346322
submitterTemplate, err := r.getSubmitterTemplate(rayJobInstance, rayClusterInstance)
347323
if err != nil {
348324
r.Log.Error(err, "failed to get submitter template")
349-
return "", false, err
325+
return err
350326
}
351327
return r.createNewK8sJob(ctx, rayJobInstance, submitterTemplate)
352328
}
353329

354330
// Some other error occurred while trying to get the Job
355-
r.Log.Error(err, "failed to get k8s Job")
356-
return "", false, err
331+
r.Log.Error(err, "failed to get Kubernetes Job")
332+
return err
357333
}
358334

359-
// Job already exists, instead of returning an error we return a "success"
360-
return jobName, false, nil
335+
r.Log.Info("Kubernetes Job already exists", "RayJob", rayJobInstance.Name, "Kubernetes Job", job.Name)
336+
return nil
361337
}
362338

363339
// getSubmitterTemplate builds the submitter pod template for the Ray job.
@@ -399,8 +375,8 @@ func (r *RayJobReconciler) getSubmitterTemplate(rayJobInstance *rayv1.RayJob, ra
399375
return submitterTemplate, nil
400376
}
401377

402-
// createNewK8sJob creates a new Kubernetes Job. It returns the Job's name and a boolean indicating whether a new Job was created.
403-
func (r *RayJobReconciler) createNewK8sJob(ctx context.Context, rayJobInstance *rayv1.RayJob, submitterTemplate corev1.PodTemplateSpec) (string, bool, error) {
378+
// createNewK8sJob creates a new Kubernetes Job. It returns an error.
379+
func (r *RayJobReconciler) createNewK8sJob(ctx context.Context, rayJobInstance *rayv1.RayJob, submitterTemplate corev1.PodTemplateSpec) error {
404380
job := &batchv1.Job{
405381
ObjectMeta: metav1.ObjectMeta{
406382
Name: rayJobInstance.Name,
@@ -422,17 +398,17 @@ func (r *RayJobReconciler) createNewK8sJob(ctx context.Context, rayJobInstance *
422398
// Set the ownership in order to do the garbage collection by k8s.
423399
if err := ctrl.SetControllerReference(rayJobInstance, job, r.Scheme); err != nil {
424400
r.Log.Error(err, "failed to set controller reference")
425-
return "", false, err
401+
return err
426402
}
427403

428404
// Create the Kubernetes Job
429405
if err := r.Client.Create(ctx, job); err != nil {
430406
r.Log.Error(err, "failed to create k8s Job")
431-
return "", false, err
407+
return err
432408
}
433-
434-
// Return the Job's name and true indicating a new job was created
435-
return job.Name, true, nil
409+
r.Log.Info("Kubernetes Job created", "RayJob", rayJobInstance.Name, "Kubernetes Job", job.Name)
410+
r.Recorder.Eventf(rayJobInstance, corev1.EventTypeNormal, "Created", "Created Kubernetes Job %s", job.Name)
411+
return nil
436412
}
437413

438414
func (r *RayJobReconciler) deleteCluster(ctx context.Context, rayJobInstance *rayv1.RayJob) (reconcile.Result, error) {

ray-operator/controllers/ray/rayjob_controller_unit_test.go

Lines changed: 3 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -16,7 +16,7 @@ import (
1616
clientFake "sigs.k8s.io/controller-runtime/pkg/client/fake"
1717
)
1818

19-
func TestGetOrCreateK8sJob(t *testing.T) {
19+
func TestCreateK8sJobIfNeed(t *testing.T) {
2020
newScheme := runtime.NewScheme()
2121
_ = rayv1.AddToScheme(newScheme)
2222
_ = batchv1.AddToScheme(newScheme)
@@ -67,21 +67,15 @@ func TestGetOrCreateK8sJob(t *testing.T) {
6767
Recorder: &record.FakeRecorder{},
6868
}
6969

70-
retrievedJobName, wasCreated, err := rayJobReconciler.getOrCreateK8sJob(ctx, rayJob, rayCluster)
71-
70+
err := rayJobReconciler.createK8sJobIfNeed(ctx, rayJob, rayCluster)
7271
assert.NoError(t, err)
73-
assert.False(t, wasCreated)
74-
assert.Equal(t, "test-rayjob", retrievedJobName)
7572

7673
// Test 2: Create a new k8s job if it does not already exist
7774
fakeClient = clientFake.NewClientBuilder().WithScheme(newScheme).WithRuntimeObjects(rayCluster, rayJob).Build()
7875
rayJobReconciler.Client = fakeClient
7976

80-
retrievedJobName, wasCreated, err = rayJobReconciler.getOrCreateK8sJob(ctx, rayJob, rayCluster)
81-
77+
err = rayJobReconciler.createK8sJobIfNeed(ctx, rayJob, rayCluster)
8278
assert.NoError(t, err)
83-
assert.True(t, wasCreated)
84-
assert.Equal(t, "test-rayjob", retrievedJobName)
8579
}
8680

8781
func TestGetSubmitterTemplate(t *testing.T) {

0 commit comments

Comments
 (0)