Commit 8854cab ("update")

1 parent 59503c6 commit 8854cab

File tree: 2 files changed (+33, -213 lines)

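Judging by the two diffs below, the terse commit message ("update") covers a real behavior change: the RayJob controller's getOrCreateRayClusterInstance no longer propagates replica edits from a RayJob onto its existing RayCluster; it now only fetches the cluster, or creates it when missing. The test changes are the matching cleanup, deleting the specs (and the retry import they used) that verified replica propagation with and without in-tree autoscaling.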

ray-operator/controllers/ray/rayjob_controller.go

Lines changed: 33 additions & 88 deletions

@@ -515,101 +515,46 @@ func (r *RayJobReconciler) getOrCreateRayClusterInstance(ctx context.Context, ra
 	}
 
 	rayClusterInstance := &rayv1.RayCluster{}
-	err := r.Get(ctx, rayClusterNamespacedName, rayClusterInstance)
-	if err == nil {
-		r.Log.Info("Found associated RayCluster for RayJob", "rayjob", rayJobInstance.Name, "raycluster", rayClusterNamespacedName)
-
-		// Case1: The job is submitted to an existing ray cluster, simply return the rayClusterInstance.
-		// We do not use rayJobInstance.Spec.RayClusterSpec == nil to check if the cluster selector mode is activated.
-		// This is because a user might set both RayClusterSpec and ClusterSelector. with rayJobInstance.Spec.RayClusterSpec == nil,
-		// though the RayJob controller will still use ClusterSelector, but it's now able to update the replica.
-		// this could result in a conflict as both the RayJob controller and the autoscaler in the existing RayCluster might try to update replicas simultaneously.
-		if len(rayJobInstance.Spec.ClusterSelector) != 0 {
-			r.Log.Info("ClusterSelector is being used to select an existing RayCluster. RayClusterSpec will be disregarded", "raycluster", rayClusterNamespacedName)
-			return rayClusterInstance, nil
-		}
-
-		// Note, unlike the RayService, which creates new Ray clusters if any spec is changed,
-		// RayJob only supports changing the replicas. Changes to other specs may lead to
-		// unexpected behavior. Therefore, the following code focuses solely on updating replicas.
-
-		// Case2: In-tree autoscaling is enabled, only the autoscaler should update replicas to prevent race conditions
-		// between user updates and autoscaler decisions. RayJob controller should not modify the replica. Consider this scenario:
-		// 1. The autoscaler updates replicas to 10 based on the current workload.
-		// 2. The user updates replicas to 15 in the RayJob YAML file.
-		// 3. Both RayJob controller and the autoscaler attempt to update replicas, causing worker pods to be repeatedly created and terminated.
-		if rayJobInstance.Spec.RayClusterSpec.EnableInTreeAutoscaling != nil && *rayJobInstance.Spec.RayClusterSpec.EnableInTreeAutoscaling {
-			// Note, currently, there is no method to verify if the user has updated the RayJob since the last reconcile.
-			// In future, we could utilize annotation that stores the hash of the RayJob since last reconcile to compare.
-			// For now, we just log a warning message to remind the user regadless whether user has updated RayJob.
-			r.Log.Info("Since in-tree autoscaling is enabled, any adjustments made to the RayJob will be disregarded and will not be propagated to the RayCluster.")
-			return rayClusterInstance, nil
-		}
-
-		// Case3: In-tree autoscaling is disabled, respect the user's replicas setting.
-		// Loop over all worker groups and update replicas.
-		areReplicasIdentical := true
-		for i := range rayJobInstance.Spec.RayClusterSpec.WorkerGroupSpecs {
-			if *rayClusterInstance.Spec.WorkerGroupSpecs[i].Replicas != *rayJobInstance.Spec.RayClusterSpec.WorkerGroupSpecs[i].Replicas {
-				areReplicasIdentical = false
-				*rayClusterInstance.Spec.WorkerGroupSpecs[i].Replicas = *rayJobInstance.Spec.RayClusterSpec.WorkerGroupSpecs[i].Replicas
+	if err := r.Get(ctx, rayClusterNamespacedName, rayClusterInstance); err != nil {
+		if errors.IsNotFound(err) {
+			// TODO: If both ClusterSelector and RayClusterSpec are not set, we should avoid retrieving a RayCluster instance.
+			// Consider moving this logic to a more appropriate location.
+			if len(rayJobInstance.Spec.ClusterSelector) == 0 && rayJobInstance.Spec.RayClusterSpec == nil {
+				err := fmt.Errorf("one of ClusterSelector or RayClusterSpec must be set, but both are undefined, err: %v", err)
+				return nil, err
 			}
-		}
-
-		// Other specs rather than replicas are changed, warn the user that the RayJob supports replica changes only.
-		if !utils.CompareJsonStruct(rayClusterInstance.Spec, *rayJobInstance.Spec.RayClusterSpec) {
-			r.Log.Info("RayJob supports replica changes only. Adjustments made to other specs will be disregarded as they may cause unexpected behavior")
-		}
-
-		// Avoid updating the RayCluster's replicas if it's identical to the RayJob's replicas.
-		if areReplicasIdentical {
-			return rayClusterInstance, nil
-		}
-
-		r.Log.Info("Update RayCluster replica", "RayCluster", rayClusterNamespacedName)
-		if err := r.Update(ctx, rayClusterInstance); err != nil {
-			r.Log.Error(err, "Fail to update RayCluster replica!", "RayCluster", rayClusterNamespacedName)
-			// Error updating the RayCluster object.
-			return nil, client.IgnoreNotFound(err)
-		}
-
-	} else if errors.IsNotFound(err) {
-		// TODO: If both ClusterSelector and RayClusterSpec are not set, we should avoid retrieving a RayCluster instance.
-		// Consider moving this logic to a more appropriate location.
-		if len(rayJobInstance.Spec.ClusterSelector) == 0 && rayJobInstance.Spec.RayClusterSpec == nil {
-			err := fmt.Errorf("one of ClusterSelector or RayClusterSpec must be set, but both are undefined, err: %v", err)
-			return nil, err
-		}
 
-		if len(rayJobInstance.Spec.ClusterSelector) != 0 {
-			err := fmt.Errorf("we have choosed the cluster selector mode, failed to find the cluster named %v, err: %v", rayClusterInstanceName, err)
-			return nil, err
-		}
+			if len(rayJobInstance.Spec.ClusterSelector) != 0 {
+				err := fmt.Errorf("we have choosed the cluster selector mode, failed to find the cluster named %v, err: %v", rayClusterInstanceName, err)
+				return nil, err
+			}
 
-		// special case: don't create a cluster instance and don't return an error if the suspend flag of the job is true
-		if rayJobInstance.Spec.Suspend {
-			return nil, nil
-		}
+			// special case: don't create a cluster instance and don't return an error if the suspend flag of the job is true
+			if rayJobInstance.Spec.Suspend {
+				return nil, nil
+			}
 
-		r.Log.Info("RayCluster not found, creating RayCluster!", "raycluster", rayClusterNamespacedName)
-		rayClusterInstance, err = r.constructRayClusterForRayJob(rayJobInstance, rayClusterInstanceName)
-		if err != nil {
-			r.Log.Error(err, "unable to construct a new rayCluster")
-			// Error construct the RayCluster object - requeue the request.
-			return nil, err
-		}
-		if err := r.Create(ctx, rayClusterInstance); err != nil {
-			r.Log.Error(err, "unable to create rayCluster for rayJob", "rayCluster", rayClusterInstance)
-			// Error creating the RayCluster object - requeue the request.
+			r.Log.Info("RayCluster not found, creating RayCluster!", "raycluster", rayClusterNamespacedName)
+			rayClusterInstance, err = r.constructRayClusterForRayJob(rayJobInstance, rayClusterInstanceName)
+			if err != nil {
+				r.Log.Error(err, "unable to construct a new rayCluster")
+				// Error construct the RayCluster object - requeue the request.
+				return nil, err
+			}
+			if err := r.Create(ctx, rayClusterInstance); err != nil {
+				r.Log.Error(err, "unable to create rayCluster for rayJob", "rayCluster", rayClusterInstance)
+				// Error creating the RayCluster object - requeue the request.
+				return nil, err
+			}
+			r.Log.Info("created rayCluster for rayJob", "rayCluster", rayClusterInstance)
+			r.Recorder.Eventf(rayJobInstance, corev1.EventTypeNormal, "Created", "Created cluster %s", rayJobInstance.Status.RayClusterName)
+		} else {
+			r.Log.Error(err, "Fail to get RayCluster!")
+			// Error reading the RayCluster object - requeue the request.
 			return nil, err
 		}
-		r.Log.Info("created rayCluster for rayJob", "rayCluster", rayClusterInstance)
-		r.Recorder.Eventf(rayJobInstance, corev1.EventTypeNormal, "Created", "Created cluster %s", rayJobInstance.Status.RayClusterName)
-	} else {
-		r.Log.Error(err, "Get rayCluster instance error!")
-		// Error reading the RayCluster object - requeue the request.
-		return nil, err
 	}
+	r.Log.Info("Found associated RayCluster for RayJob", "RayJob", rayJobInstance.Name, "RayCluster", rayClusterNamespacedName)
 
 	return rayClusterInstance, nil
 }
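
The net control flow is easier to read flattened than as a diff. The sketch below is assembled from the + and context lines of the hunk above; indentation is approximate and intermediate log lines are elided, so it is not the verbatim file. After this commit the function only fetches or creates the cluster and never writes replica counts back:

	rayClusterInstance := &rayv1.RayCluster{}
	if err := r.Get(ctx, rayClusterNamespacedName, rayClusterInstance); err != nil {
		if errors.IsNotFound(err) {
			// Misconfiguration: neither a selector to find a cluster nor a spec to build one.
			if len(rayJobInstance.Spec.ClusterSelector) == 0 && rayJobInstance.Spec.RayClusterSpec == nil {
				return nil, fmt.Errorf("one of ClusterSelector or RayClusterSpec must be set, but both are undefined, err: %v", err)
			}
			// Cluster-selector mode: the referenced cluster must already exist, so NotFound is fatal.
			if len(rayJobInstance.Spec.ClusterSelector) != 0 {
				return nil, fmt.Errorf("we have choosed the cluster selector mode, failed to find the cluster named %v, err: %v", rayClusterInstanceName, err)
			}
			// Suspended jobs get neither a cluster nor an error.
			if rayJobInstance.Spec.Suspend {
				return nil, nil
			}
			// Otherwise construct and create the RayCluster for this job.
			rayClusterInstance, err = r.constructRayClusterForRayJob(rayJobInstance, rayClusterInstanceName)
			if err != nil {
				return nil, err // construction failed; requeue
			}
			if err := r.Create(ctx, rayClusterInstance); err != nil {
				return nil, err // creation failed; requeue
			}
			r.Recorder.Eventf(rayJobInstance, corev1.EventTypeNormal, "Created", "Created cluster %s", rayJobInstance.Status.RayClusterName)
		} else {
			// Any other read error is surfaced so the request is requeued.
			return nil, err
		}
	}
	r.Log.Info("Found associated RayCluster for RayJob", "RayJob", rayJobInstance.Name, "RayCluster", rayClusterNamespacedName)
	return rayClusterInstance, nil

Note that the trailing log line now runs on the create path too, so a freshly created cluster is also reported as "Found associated RayCluster".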

ray-operator/controllers/ray/rayjob_controller_test.go

Lines changed: 0 additions & 125 deletions

@@ -33,7 +33,6 @@ import (
 	"k8s.io/utils/pointer"
 
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/client-go/util/retry"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	// +kubebuilder:scaffold:imports
 )
@@ -203,35 +202,6 @@ var _ = Context("Inside the default namespace", func() {
 				time.Second*15, time.Millisecond*500).Should(Equal(int(*myRayJob.Spec.RayClusterSpec.WorkerGroupSpecs[0].Replicas)), fmt.Sprintf("workerGroup %v", workerPods.Items))
 		})
 
-		// If in-tree autoscaling is disabled, the user should be able to update the replica settings.
-		// This test verifies that the RayJob controller correctly updates the replica settings and increases the number of workers in this scenario.
-		It("should increase number of worker pods by updating RayJob", func() {
-			Eventually(
-				getRayClusterNameForRayJob(ctx, myRayJob),
-				time.Second*15, time.Millisecond*500).Should(Not(BeEmpty()), "My RayCluster name = %v", myRayJob.Status.RayClusterName)
-
-			myRayCluster := &rayv1.RayCluster{}
-			Eventually(
-				getResourceFunc(ctx, client.ObjectKey{Name: myRayJob.Status.RayClusterName, Namespace: "default"}, myRayCluster),
-				time.Second*3, time.Millisecond*500).Should(BeNil(), "My myRayCluster = %v", myRayCluster.Name)
-
-			newReplicas := *myRayCluster.Spec.WorkerGroupSpecs[0].Replicas + 1
-
-			// simulate updating the RayJob directly.
-			err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
-				*myRayJob.Spec.RayClusterSpec.WorkerGroupSpecs[0].Replicas = newReplicas
-				return k8sClient.Update(ctx, myRayJob)
-			})
-			Expect(err).NotTo(HaveOccurred(), "failed to update RayJob")
-
-			// confirm the number of worker pods increased.
-			filterLabels := client.MatchingLabels{utils.RayClusterLabelKey: myRayJob.Status.RayClusterName, utils.RayNodeGroupLabelKey: myRayJob.Spec.RayClusterSpec.WorkerGroupSpecs[0].GroupName}
-			workerPods := corev1.PodList{}
-			Eventually(
-				listResourceFunc(ctx, &workerPods, filterLabels, &client.ListOptions{Namespace: "default"}),
-				time.Second*15, time.Millisecond*500).Should(Equal(int(newReplicas)), fmt.Sprintf("workerGroup %v", workerPods.Items))
-		})
-
 		It("should be able to update all Pods to Running", func() {
 			myRayCluster := &rayv1.RayCluster{}
 			Eventually(
@@ -305,101 +275,6 @@ var _ = Context("Inside the default namespace", func() {
 		})
 	})
 })
 
-var _ = Context("Inside the default namespace with autoscaler", func() {
-	ctx := context.TODO()
-	myRayJob := myRayJob.DeepCopy()
-	myRayJob.Name = "rayjob-test-with-autoscaler"
-	upscalingMode := rayv1.UpscalingMode("Default")
-	imagePullPolicy := corev1.PullPolicy("IfNotPresent")
-	myRayJob.Spec.RayClusterSpec.EnableInTreeAutoscaling = pointer.Bool(true)
-	myRayJob.Spec.RayClusterSpec.AutoscalerOptions = &rayv1.AutoscalerOptions{
-		UpscalingMode:      &upscalingMode,
-		IdleTimeoutSeconds: pointer.Int32(1),
-		ImagePullPolicy:    &imagePullPolicy,
-	}
-
-	Describe("When creating a rayjob", func() {
-		It("should create a rayjob object", func() {
-			err := k8sClient.Create(ctx, myRayJob)
-			Expect(err).NotTo(HaveOccurred(), "failed to create test RayJob resource")
-		})
-
-		It("should see a rayjob object", func() {
-			Eventually(
-				getResourceFunc(ctx, client.ObjectKey{Name: myRayJob.Name, Namespace: "default"}, myRayJob),
-				time.Second*3, time.Millisecond*500).Should(BeNil(), "My myRayJob = %v", myRayJob.Name)
-		})
-
-		// if In-tree autoscaling is enabled, the autoscaler should adjust the number of replicas based on the workload.
-		// This test emulates the behavior of the autoscaler by directly updating the RayCluster and verifying if the number of worker pods increases accordingly.
-		It("should create new worker since autoscaler increases the replica", func() {
-			Eventually(
-				getRayClusterNameForRayJob(ctx, myRayJob),
-				time.Second*15, time.Millisecond*500).Should(Not(BeEmpty()), "My RayCluster name = %v", myRayJob.Status.RayClusterName)
-
-			myRayCluster := &rayv1.RayCluster{}
-			Eventually(
-				getResourceFunc(ctx, client.ObjectKey{Name: myRayJob.Status.RayClusterName, Namespace: "default"}, myRayCluster),
-				time.Second*3, time.Millisecond*500).Should(BeNil(), "My myRayCluster = %v", myRayCluster.Name)
-
-			// simulate autoscaler by updating the RayCluster directly. Note that the autoscaler
-			// will not update the RayJob directly.
-			err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
-				Eventually(
-					getResourceFunc(ctx, client.ObjectKey{Name: myRayJob.Status.RayClusterName, Namespace: "default"}, myRayCluster),
-					time.Second*3, time.Millisecond*500).Should(BeNil(), "Active RayCluster = %v", myRayCluster.Name)
-				*myRayCluster.Spec.WorkerGroupSpecs[0].Replicas = *myRayJob.Spec.RayClusterSpec.WorkerGroupSpecs[0].Replicas + 1
-				return k8sClient.Update(ctx, myRayCluster)
-			})
-			Expect(err).NotTo(HaveOccurred(), "failed to update RayCluster replica")
-
-			// confirm a new worker pod is created.
-			filterLabels := client.MatchingLabels{utils.RayClusterLabelKey: myRayJob.Status.RayClusterName, utils.RayNodeGroupLabelKey: myRayJob.Spec.RayClusterSpec.WorkerGroupSpecs[0].GroupName}
-			workerPods := corev1.PodList{}
-			Eventually(
-				listResourceFunc(ctx, &workerPods, filterLabels, &client.ListOptions{Namespace: "default"}),
-				time.Second*15, time.Millisecond*500).Should(Equal(int(*myRayJob.Spec.RayClusterSpec.WorkerGroupSpecs[0].Replicas)+1), fmt.Sprintf("workerGroup %v", workerPods.Items))
-			// confirm RayJob controller does not revert the number of workers.
-			Consistently(
-				listResourceFunc(ctx, &workerPods, filterLabels, &client.ListOptions{Namespace: "default"}),
-				time.Second*5, time.Millisecond*500).Should(Equal(int(*myRayJob.Spec.RayClusterSpec.WorkerGroupSpecs[0].Replicas)+1), fmt.Sprintf("workerGroup %v", workerPods.Items))
-		})
-
-		// if In-tree autoscaling is enabled, only the autoscaler should update replicas to prevent race conditions
-		// between user updates and autoscaler decisions. RayJob controller should not modify the replica. Consider this scenario:
-		// 1. The autoscaler updates replicas to 10 based on the current workload.
-		// 2. The user updates replicas to 15 in the RayJob YAML file.
-		// 3. Both RayJob controller and the autoscaler attempt to update replicas, causing worker pods to be repeatedly created and terminated.
-		// This test emulates a user attempting to update the replica and verifies that the number of worker pods remains unaffected by this change.
-		It("should not increase number of workers by updating RayJob", func() {
-			Eventually(
-				getRayClusterNameForRayJob(ctx, myRayJob),
-				time.Second*15, time.Millisecond*500).Should(Not(BeEmpty()), "My RayCluster name = %v", myRayJob.Status.RayClusterName)
-
-			myRayCluster := &rayv1.RayCluster{}
-			Eventually(
-				getResourceFunc(ctx, client.ObjectKey{Name: myRayJob.Status.RayClusterName, Namespace: "default"}, myRayCluster),
-				time.Second*3, time.Millisecond*500).Should(BeNil(), "My myRayCluster = %v", myRayCluster.Name)
-
-			oldReplicas := *myRayCluster.Spec.WorkerGroupSpecs[0].Replicas
-
-			// simulate updating the RayJob directly.
-			err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
-				*myRayJob.Spec.RayClusterSpec.WorkerGroupSpecs[0].Replicas = *myRayJob.Spec.RayClusterSpec.WorkerGroupSpecs[0].Replicas + 1
-				return k8sClient.Update(ctx, myRayJob)
-			})
-			Expect(err).NotTo(HaveOccurred(), "failed to update RayJob")
-
-			// confirm the number of worker pods is not changed.
-			filterLabels := client.MatchingLabels{utils.RayClusterLabelKey: myRayJob.Status.RayClusterName, utils.RayNodeGroupLabelKey: myRayJob.Spec.RayClusterSpec.WorkerGroupSpecs[0].GroupName}
-			workerPods := corev1.PodList{}
-			Consistently(
-				listResourceFunc(ctx, &workerPods, filterLabels, &client.ListOptions{Namespace: "default"}),
-				time.Second*5, time.Millisecond*500).Should(Equal(int(oldReplicas)), fmt.Sprintf("workerGroup %v", workerPods.Items))
-		})
-	})
-})
-
 func getRayClusterNameForRayJob(ctx context.Context, rayJob *rayv1.RayJob) func() (string, error) {
 	return func() (string, error) {
 		if err := k8sClient.Get(ctx, client.ObjectKey{Name: rayJob.Name, Namespace: "default"}, rayJob); err != nil {
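
For reference, the deleted specs were the only users of the now-removed k8s.io/client-go/util/retry import. They drove replica updates through client-go's conflict-retry helper, a pattern worth keeping in mind for envtest-style suites. A minimal sketch using identifiers from the deleted code (nothing like it remains in the tree after this commit):

	// Mutate-and-update under RetryOnConflict: if the API server rejects the
	// Update with a 409 Conflict, the closure is retried with default backoff.
	// (The deleted autoscaler test also re-fetched the object inside the
	// closure before mutating, so each retry saw a fresh resourceVersion.)
	err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
		*myRayJob.Spec.RayClusterSpec.WorkerGroupSpecs[0].Replicas = *myRayJob.Spec.RayClusterSpec.WorkerGroupSpecs[0].Replicas + 1
		return k8sClient.Update(ctx, myRayJob)
	})
	Expect(err).NotTo(HaveOccurred(), "failed to update RayJob")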
