6
6
"time"
7
7
8
8
"github.com/go-logr/logr"
9
- fmtErrors "github.com/pkg/errors"
10
9
batchv1 "k8s.io/api/batch/v1"
11
10
corev1 "k8s.io/api/core/v1"
12
11
"k8s.io/apimachinery/pkg/api/errors"
@@ -146,14 +145,14 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
146
145
if ! errors .IsNotFound (err ) {
147
146
return ctrl.Result {RequeueAfter : RayJobDefaultRequeueDuration }, err
148
147
}
149
- if err = r .updateState (ctx , rayJobInstance , nil , rayJobInstance .Status .JobStatus , rayv1 .JobDeploymentStatusComplete , nil ); err != nil {
148
+ if err = r .updateState (ctx , rayJobInstance , nil , rayJobInstance .Status .JobStatus , rayv1 .JobDeploymentStatusComplete ); err != nil {
150
149
return ctrl.Result {RequeueAfter : RayJobDefaultRequeueDuration }, err
151
150
}
152
151
return ctrl.Result {}, nil
153
152
}
154
153
155
154
if rayClusterInstance .DeletionTimestamp != nil {
156
- if err = r .updateState (ctx , rayJobInstance , nil , rayJobInstance .Status .JobStatus , rayv1 .JobDeploymentStatusComplete , nil ); err != nil {
155
+ if err = r .updateState (ctx , rayJobInstance , nil , rayJobInstance .Status .JobStatus , rayv1 .JobDeploymentStatusComplete ); err != nil {
157
156
return ctrl.Result {RequeueAfter : RayJobDefaultRequeueDuration }, err
158
157
}
159
158
return ctrl.Result {}, nil
@@ -168,7 +167,6 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
168
167
169
168
var rayClusterInstance * rayv1.RayCluster
170
169
if rayClusterInstance , err = r .getOrCreateRayClusterInstance (ctx , rayJobInstance ); err != nil {
171
- err = r .updateState (ctx , rayJobInstance , nil , rayJobInstance .Status .JobStatus , rayv1 .JobDeploymentStatusFailedToGetOrCreateRayCluster , err )
172
170
return ctrl.Result {RequeueAfter : RayJobDefaultRequeueDuration }, err
173
171
}
174
172
// If there is no cluster instance and no error suspend the job deployment
@@ -177,7 +175,7 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
177
175
if rayJobInstance .Status .JobDeploymentStatus == rayv1 .JobDeploymentStatusSuspended {
178
176
return ctrl.Result {RequeueAfter : RayJobDefaultRequeueDuration }, err
179
177
}
180
- err = r .updateState (ctx , rayJobInstance , nil , rayJobInstance .Status .JobStatus , rayv1 .JobDeploymentStatusSuspended , err )
178
+ err = r .updateState (ctx , rayJobInstance , nil , rayJobInstance .Status .JobStatus , rayv1 .JobDeploymentStatusSuspended )
181
179
if err != nil {
182
180
return ctrl.Result {RequeueAfter : RayJobDefaultRequeueDuration }, err
183
181
}
@@ -216,7 +214,6 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
216
214
// Ensure k8s job has been created
217
215
jobName , wasJobCreated , err := r .getOrCreateK8sJob (ctx , rayJobInstance , rayClusterInstance )
218
216
if err != nil {
219
- err = r .updateState (ctx , rayJobInstance , nil , rayJobInstance .Status .JobStatus , rayv1 .JobDeploymentStatusFailedJobDeploy , err )
220
217
return ctrl.Result {RequeueAfter : RayJobDefaultRequeueDuration }, err
221
218
}
222
219
@@ -234,27 +231,29 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
234
231
if err != nil {
235
232
if errors .IsNotFound (err ) {
236
233
r .Log .Info ("Job not found" , "RayJob" , rayJobInstance .Name , "jobId" , jobName )
237
- err = r .updateState (ctx , rayJobInstance , nil , rayJobInstance .Status .JobStatus , rayv1 .JobDeploymentStatusWaitForK8sJob , err )
238
234
return ctrl.Result {RequeueAfter : RayJobDefaultRequeueDuration }, err
239
235
}
240
236
r .Log .Error (err , "failed to get k8s job" )
241
- err = r .updateState (ctx , rayJobInstance , nil , rayJobInstance .Status .JobStatus , rayv1 .JobDeploymentStatusFailedToGetJobStatus , err )
242
237
return ctrl.Result {RequeueAfter : RayJobDefaultRequeueDuration }, err
243
238
}
244
239
245
240
// Check the current status of ray jobs
246
241
jobInfo , err := rayDashboardClient .GetJobInfo (ctx , rayJobInstance .Status .JobId )
247
242
if err != nil {
248
243
r .Log .Error (err , "failed to get job info" , "jobId" , rayJobInstance .Status .JobId )
249
- err = r .updateState (ctx , rayJobInstance , jobInfo , rayJobInstance .Status .JobStatus , rayv1 .JobDeploymentStatusFailedToGetJobStatus , err )
250
244
// Dashboard service in head pod takes time to start, it's possible we get connection refused error.
251
245
// Requeue after few seconds to avoid continuous connection errors.
252
246
return ctrl.Result {RequeueAfter : RayJobDefaultRequeueDuration }, err
253
247
}
254
248
255
- // Update RayJob.Status (Kubernetes CR) from Ray Job Status from Dashboard service
256
- if r .shouldUpdateJobStatus (rayJobInstance .Status .JobStatus , rayJobInstance .Status .JobDeploymentStatus , jobInfo ) {
257
- err = r .updateState (ctx , rayJobInstance , jobInfo , jobInfo .JobStatus , rayv1 .JobDeploymentStatusRunning , nil )
249
+ if jobInfo != nil {
250
+ // TODO (kevin85421): `GetJobInfo` should not return both JobInfo and error with nil values,
251
+ // but it does when the job is not found. This check is a workaround to avoid dereferencing
252
+ // a nil pointer.
253
+ err = r .updateState (ctx , rayJobInstance , jobInfo , jobInfo .JobStatus , rayv1 .JobDeploymentStatusRunning )
254
+ }
255
+
256
+ if err != nil {
258
257
return ctrl.Result {RequeueAfter : RayJobDefaultRequeueDuration }, err
259
258
}
260
259
@@ -288,7 +287,7 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
288
287
rayJobInstance .Status .DashboardURL = ""
289
288
rayJobInstance .Status .JobId = ""
290
289
rayJobInstance .Status .Message = ""
291
- err = r .updateState (ctx , rayJobInstance , jobInfo , rayv1 .JobStatusStopped , rayv1 .JobDeploymentStatusSuspended , nil )
290
+ err = r .updateState (ctx , rayJobInstance , jobInfo , rayv1 .JobStatusStopped , rayv1 .JobDeploymentStatusSuspended )
292
291
if err != nil {
293
292
return ctrl.Result {RequeueAfter : RayJobDefaultRequeueDuration }, err
294
293
}
@@ -520,22 +519,11 @@ func (r *RayJobReconciler) initRayJobStatusIfNeed(ctx context.Context, rayJob *r
520
519
rayJob .Status .JobStatus = rayv1 .JobStatusPending
521
520
}
522
521
523
- return r .updateState (ctx , rayJob , nil , rayJob .Status .JobStatus , rayv1 .JobDeploymentStatusInitializing , nil )
524
- }
525
-
526
- func (r * RayJobReconciler ) shouldUpdateJobStatus (oldJobStatus rayv1.JobStatus , oldJobDeploymentStatus rayv1.JobDeploymentStatus , jobInfo * utils.RayJobInfo ) bool {
527
- if jobInfo != nil {
528
- jobStatusChanged := (oldJobStatus != jobInfo .JobStatus )
529
- // If the status changed, or if we didn't have the status before and now we have it, update the status and deployment status.
530
- if jobStatusChanged || oldJobDeploymentStatus == rayv1 .JobDeploymentStatusFailedToGetJobStatus {
531
- return true
532
- }
533
- }
534
- return false
522
+ return r .updateState (ctx , rayJob , nil , rayJob .Status .JobStatus , rayv1 .JobDeploymentStatusInitializing )
535
523
}
536
524
537
525
// make sure the priority is correct
538
- func (r * RayJobReconciler ) updateState (ctx context.Context , rayJob * rayv1.RayJob , jobInfo * utils.RayJobInfo , jobStatus rayv1.JobStatus , jobDeploymentStatus rayv1.JobDeploymentStatus , err error ) error {
526
+ func (r * RayJobReconciler ) updateState (ctx context.Context , rayJob * rayv1.RayJob , jobInfo * utils.RayJobInfo , jobStatus rayv1.JobStatus , jobDeploymentStatus rayv1.JobDeploymentStatus ) error {
539
527
r .Log .Info ("UpdateState" , "oldJobStatus" , rayJob .Status .JobStatus , "newJobStatus" , jobStatus , "oldJobDeploymentStatus" , rayJob .Status .JobDeploymentStatus , "newJobDeploymentStatus" , jobDeploymentStatus )
540
528
541
529
// Let's skip update the APIServer if it's synced.
@@ -558,10 +546,10 @@ func (r *RayJobReconciler) updateState(ctx context.Context, rayJob *rayv1.RayJob
558
546
// TODO (kevin85421): ObservedGeneration should be used to determine whether update this CR or not.
559
547
rayJob .Status .ObservedGeneration = rayJob .ObjectMeta .Generation
560
548
561
- if errStatus := r .Status ().Update (ctx , rayJob ); errStatus != nil {
562
- return fmtErrors . Errorf ( "combined error: %v %v" , err , errStatus )
549
+ if err := r .Status ().Update (ctx , rayJob ); err != nil {
550
+ return err
563
551
}
564
- return err
552
+ return nil
565
553
}
566
554
567
555
// TODO: select existing rayclusters by ClusterSelector
@@ -645,11 +633,6 @@ func (r *RayJobReconciler) getOrCreateRayClusterInstance(ctx context.Context, ra
645
633
return nil , err
646
634
}
647
635
648
- // special case: is the job is complete status and cluster has been recycled.
649
- if isJobSucceedOrFail (rayJobInstance .Status .JobStatus ) && rayJobInstance .Status .JobDeploymentStatus == rayv1 .JobDeploymentStatusComplete {
650
- r .Log .Info ("The cluster has been recycled for the job, skip duplicate creation" , "rayjob" , rayJobInstance .Name )
651
- return nil , err
652
- }
653
636
// special case: don't create a cluster instance and don't return an error if the suspend flag of the job is true
654
637
if rayJobInstance .Spec .Suspend {
655
638
return nil , nil
0 commit comments