From 534357deb3f48a0874183460c2ded8d473a77850 Mon Sep 17 00:00:00 2001 From: Archit Kulkarni Date: Mon, 11 Dec 2023 12:10:11 -0800 Subject: [PATCH 01/23] Add ClusterAction for active and pending cluster Signed-off-by: Archit Kulkarni --- .../controllers/ray/common/constant.go | 17 +- .../controllers/ray/rayservice_controller.go | 178 ++++++++++++++---- 2 files changed, 146 insertions(+), 49 deletions(-) diff --git a/ray-operator/controllers/ray/common/constant.go b/ray-operator/controllers/ray/common/constant.go index 21df7783e3d..76c5eaca68e 100644 --- a/ray-operator/controllers/ray/common/constant.go +++ b/ray-operator/controllers/ray/common/constant.go @@ -5,14 +5,15 @@ const ( // Default application name DefaultServeAppName = "default" // Belows used as label key - RayServiceLabelKey = "ray.io/service" - RayClusterLabelKey = "ray.io/cluster" - RayNodeTypeLabelKey = "ray.io/node-type" - RayNodeGroupLabelKey = "ray.io/group" - RayNodeLabelKey = "ray.io/is-ray-node" - RayIDLabelKey = "ray.io/identifier" - RayClusterServingServiceLabelKey = "ray.io/serve" - RayServiceClusterHashKey = "ray.io/cluster-hash" + RayServiceLabelKey = "ray.io/service" + RayClusterLabelKey = "ray.io/cluster" + RayNodeTypeLabelKey = "ray.io/node-type" + RayNodeGroupLabelKey = "ray.io/group" + RayNodeLabelKey = "ray.io/is-ray-node" + RayIDLabelKey = "ray.io/identifier" + RayClusterServingServiceLabelKey = "ray.io/serve" + HashWithoutReplicasAndWorkersToDeleteKey = "ray.io/hash-without-replicas-and-workers-to-delete" + HashWithoutWorkerGroupSpecKey = "ray.io/hash-without-worker-group-spec" // In KubeRay, the Ray container must be the first application container in a head or worker Pod. RayContainerIndex = 0 diff --git a/ray-operator/controllers/ray/rayservice_controller.go b/ray-operator/controllers/ray/rayservice_controller.go index ae93466aea7..13be70a2b50 100644 --- a/ray-operator/controllers/ray/rayservice_controller.go +++ b/ray-operator/controllers/ray/rayservice_controller.go @@ -358,7 +358,8 @@ func (r *RayServiceReconciler) reconcileRayCluster(ctx context.Context, rayServi return nil, nil, err } - if r.shouldPrepareNewRayCluster(rayServiceInstance, activeRayCluster) { + clusterAction := r.shouldPrepareNewRayCluster(rayServiceInstance, activeRayCluster) + if clusterAction == RolloutNew { // For LLM serving, some users might not have sufficient GPU resources to run two RayClusters simultaneously. // Therefore, KubeRay offers ENABLE_ZERO_DOWNTIME as a feature flag for zero-downtime upgrades. enableZeroDowntime := true @@ -366,11 +367,19 @@ func (r *RayServiceReconciler) reconcileRayCluster(ctx context.Context, rayServi enableZeroDowntime = false } if enableZeroDowntime || !enableZeroDowntime && activeRayCluster == nil { - r.markRestart(rayServiceInstance) + // Add a pending cluster name. In the next reconcile loop, shouldPrepareNewRayCluster will be false and we will + // actually create the pending RayCluster instance. + r.markRestartAndAddPendingClusterName(rayServiceInstance) } else { r.Log.Info("Zero-downtime upgrade is disabled (ENABLE_ZERO_DOWNTIME: false). Skip preparing a new RayCluster.") } return activeRayCluster, nil, nil + } else if clusterAction == Update { + // Update the active cluster. 
+ if err := r.updateRayClusterInstance(ctx, activeRayCluster); err != nil { + return nil, nil, err + } + return activeRayCluster, nil, nil } if pendingRayCluster, err = r.createRayClusterInstanceIfNeeded(ctx, rayServiceInstance, pendingRayCluster); err != nil { @@ -462,67 +471,114 @@ func (r *RayServiceReconciler) cleanUpServeConfigCache(rayServiceInstance *rayv1 } } +type ClusterAction int + +const ( + DoNothing ClusterAction = iota // value 0 + Update // value 1 + RolloutNew // value 2 +) + // shouldPrepareNewRayCluster checks if we need to generate a new pending cluster. -func (r *RayServiceReconciler) shouldPrepareNewRayCluster(rayServiceInstance *rayv1.RayService, activeRayCluster *rayv1.RayCluster) bool { +func (r *RayServiceReconciler) shouldPrepareNewRayCluster(rayServiceInstance *rayv1.RayService, activeRayCluster *rayv1.RayCluster) ClusterAction { // Prepare new RayCluster if: // 1. No active cluster and no pending cluster // 2. No pending cluster, and the active RayCluster has changed. if rayServiceInstance.Status.PendingServiceStatus.RayClusterName == "" { if activeRayCluster == nil { r.Log.Info("No active Ray cluster. RayService operator should prepare a new Ray cluster.") - return true + return RolloutNew } - activeClusterHash := activeRayCluster.ObjectMeta.Annotations[common.RayServiceClusterHashKey] - goalClusterHash, err := generateRayClusterJsonHash(rayServiceInstance.Spec.RayClusterSpec) + // TODO (Archit): Maybe factor out the above stuff + + // Case 1: If everything is identical except for the Replicas and WorkersToDelete of + // each WorkerGroup, then do nothing. + activeClusterHash := activeRayCluster.ObjectMeta.Annotations[common.HashWithoutReplicasAndWorkersToDeleteKey] + goalClusterHash, err := generateHashWithoutReplicasAndWorkersToDelete(rayServiceInstance.Spec.RayClusterSpec) + errContextFailedToSerialize := "Failed to serialize new RayCluster config. " + + "Manual config updates will NOT be tracked accurately. " + + "Please manually tear down the cluster and apply a new config." if err != nil { - errContext := "Failed to serialize new RayCluster config. " + - "Manual config updates will NOT be tracked accurately. " + - "Please manually tear down the cluster and apply a new config." - r.Log.Error(err, errContext) - return true + r.Log.Error(err, errContextFailedToSerialize) + return RolloutNew } - if activeClusterHash != goalClusterHash { - r.Log.Info("Active RayCluster config doesn't match goal config. " + - "RayService operator should prepare a new Ray cluster.\n" + - "* Active RayCluster config hash: " + activeClusterHash + "\n" + - "* Goal RayCluster config hash: " + goalClusterHash) - } else { - r.Log.Info("Active Ray cluster config matches goal config.") + if activeClusterHash == goalClusterHash { + r.Log.Info("Active Ray cluster config matches goal config. No need to update RayCluster.") + return DoNothing + } + + // Case 2: Otherwise, if everything is identical except some change in WorkerGroupSpecs, + // then Update. + + activeClusterHash = activeRayCluster.ObjectMeta.Annotations[common.HashWithoutWorkerGroupSpecKey] + goalClusterHash, err = generateHashWithoutWorkerGroupSpec(rayServiceInstance.Spec.RayClusterSpec) + if err != nil { + r.Log.Error(err, errContextFailedToSerialize) + return RolloutNew } - return activeClusterHash != goalClusterHash + if activeClusterHash == goalClusterHash { + r.Log.Info("Active Ray cluster config matches goal config, except for WorkerGroupSpecs. 
Updating RayCluster.") + return Update + } + + // Case 3: Otherwise, rollout a new cluster. + r.Log.Info("Active RayCluster config doesn't match goal config. " + + "RayService operator should prepare a new Ray cluster.\n" + + "* Active RayCluster config hash: " + activeClusterHash + "\n" + + "* Goal RayCluster config hash: " + goalClusterHash) + return RolloutNew } - return false + return DoNothing } // createRayClusterInstanceIfNeeded checks if we need to create a new RayCluster instance. If so, create one. func (r *RayServiceReconciler) createRayClusterInstanceIfNeeded(ctx context.Context, rayServiceInstance *rayv1.RayService, pendingRayCluster *rayv1.RayCluster) (*rayv1.RayCluster, error) { + // Early return if no pending RayCluster needs to be created. if rayServiceInstance.Status.PendingServiceStatus.RayClusterName == "" { - // No exist pending RayCluster and no need to create one. return nil, nil } - // Create a new RayCluster if: - // 1. No RayCluster pending. - // 2. Config update for the pending cluster. - equal, err := compareRayClusterJsonHash(pendingRayCluster.Spec, rayServiceInstance.Spec.RayClusterSpec) - if err != nil { - r.Log.Error(err, "Fail to generate hash for RayClusterSpec") - return nil, err - } + var clusterAction ClusterAction + var err error - if pendingRayCluster == nil || !equal { - pendingRayCluster, err = r.createRayClusterInstance(ctx, rayServiceInstance, rayServiceInstance.Status.PendingServiceStatus.RayClusterName) + if pendingRayCluster == nil { + clusterAction = RolloutNew + } else { + clusterAction, err = getClusterAction(pendingRayCluster.Spec, rayServiceInstance.Spec.RayClusterSpec) if err != nil { + r.Log.Error(err, "Fail to generate hash for RayClusterSpec") return nil, err } } + switch clusterAction { + case RolloutNew: + pendingRayCluster, err = r.createRayClusterInstance(ctx, rayServiceInstance, rayServiceInstance.Status.PendingServiceStatus.RayClusterName) + case Update: + err = r.updateRayClusterInstance(ctx, pendingRayCluster) + } + + if err != nil { + return nil, err + } + return pendingRayCluster, nil } +// updateRayClusterInstance updates the RayCluster instance. +func (r *RayServiceReconciler) updateRayClusterInstance(ctx context.Context, rayClusterInstance *rayv1.RayCluster) error { + r.Log.V(1).Info("updateRayClusterInstance", "rayClusterInstance", rayClusterInstance) + if err := r.Update(ctx, rayClusterInstance); err != nil { + r.Log.Error(err, "Fail to update RayCluster "+rayClusterInstance.Name) + return err + } + r.Log.V(1).Info("updated RayCluster", "rayClusterInstance", rayClusterInstance) + return nil +} + // createRayClusterInstance deletes the old RayCluster instance if exists. Only when no existing RayCluster, create a new RayCluster instance. // One important part is that if this method deletes the old RayCluster, it will return instantly. It depends on the controller to call it again to generate the new RayCluster instance. func (r *RayServiceReconciler) createRayClusterInstance(ctx context.Context, rayServiceInstance *rayv1.RayService, rayClusterInstanceName string) (*rayv1.RayCluster, error) { @@ -585,11 +641,16 @@ func (r *RayServiceReconciler) constructRayClusterForRayService(rayService *rayv rayClusterAnnotations[k] = v } rayClusterAnnotations[common.EnableServeServiceKey] = common.EnableServeServiceTrue - rayClusterAnnotations[common.RayServiceClusterHashKey], err = generateRayClusterJsonHash(rayService.Spec.RayClusterSpec) + errContext := "Failed to serialize RayCluster config. 
" + + "Manual config updates will NOT be tracked accurately. " + + "Please tear down the cluster and apply a new config." + rayClusterAnnotations[common.HashWithoutReplicasAndWorkersToDeleteKey], err = generateHashWithoutReplicasAndWorkersToDelete(rayService.Spec.RayClusterSpec) // ARCHIT: new hash saved here + if err != nil { + r.Log.Error(err, errContext) + return nil, err + } + rayClusterAnnotations[common.HashWithoutWorkerGroupSpecKey], err = generateHashWithoutWorkerGroupSpec(rayService.Spec.RayClusterSpec) // ARCHIT: new hash saved here if err != nil { - errContext := "Failed to serialize RayCluster config. " + - "Manual config updates will NOT be tracked accurately. " + - "Please tear down the cluster and apply a new config." r.Log.Error(err, errContext) return nil, err } @@ -856,7 +917,7 @@ func updateDashboardStatus(rayServiceClusterStatus *rayv1.RayServiceStatus, isHe } } -func (r *RayServiceReconciler) markRestart(rayServiceInstance *rayv1.RayService) { +func (r *RayServiceReconciler) markRestartAndAddPendingClusterName(rayServiceInstance *rayv1.RayService) { // Generate RayCluster name for pending cluster. r.Log.V(1).Info("Current cluster is unhealthy, prepare to restart.", "Status", rayServiceInstance.Status) rayServiceInstance.Status.ServiceStatus = rayv1.Restarting @@ -1131,8 +1192,43 @@ func (r *RayServiceReconciler) labelHealthyServePods(ctx context.Context, rayClu return nil } -func generateRayClusterJsonHash(rayClusterSpec rayv1.RayClusterSpec) (string, error) { - // Mute all fields that will not trigger new RayCluster preparation. For example, +func getClusterAction(old_spec rayv1.RayClusterSpec, new_spec rayv1.RayClusterSpec) (ClusterAction, error) { + // Return the appropriate action based on the difference in the old and new RayCluster specs. + + // Case 1: If everything is identical except for the Replicas and WorkersToDelete of + // each WorkerGroup, then do nothing. + same_hash, err := compareRayClusterJsonHash(old_spec, new_spec, generateHashWithoutReplicasAndWorkersToDelete) + if err != nil { + return DoNothing, err + } + if same_hash { + return DoNothing, nil + } + + // Case 2: Otherwise, if everything is identical except some change in WorkerGroupSpecs, + // then Update. + same_hash, err = compareRayClusterJsonHash(old_spec, new_spec, generateHashWithoutWorkerGroupSpec) + if err != nil { + return DoNothing, err + } + if same_hash { + return Update, nil + } + + // Case 3: Otherwise, rollout a new cluster. + return RolloutNew, nil +} + +func generateHashWithoutWorkerGroupSpec(rayClusterSpec rayv1.RayClusterSpec) (string, error) { + updatedRayClusterSpec := rayClusterSpec.DeepCopy() + updatedRayClusterSpec.WorkerGroupSpecs = nil + + // Generate a hash for the RayClusterSpec. + return utils.GenerateJsonHash(updatedRayClusterSpec) +} + +func generateHashWithoutReplicasAndWorkersToDelete(rayClusterSpec rayv1.RayClusterSpec) (string, error) { + // Mute certain fields that will not trigger new RayCluster preparation. For example, // Autoscaler will update `Replicas` and `WorkersToDelete` when scaling up/down. 
updatedRayClusterSpec := rayClusterSpec.DeepCopy() for i := 0; i < len(updatedRayClusterSpec.WorkerGroupSpecs); i++ { @@ -1144,13 +1240,13 @@ func generateRayClusterJsonHash(rayClusterSpec rayv1.RayClusterSpec) (string, er return utils.GenerateJsonHash(updatedRayClusterSpec) } -func compareRayClusterJsonHash(spec1 rayv1.RayClusterSpec, spec2 rayv1.RayClusterSpec) (bool, error) { - hash1, err1 := generateRayClusterJsonHash(spec1) +func compareRayClusterJsonHash(spec1 rayv1.RayClusterSpec, spec2 rayv1.RayClusterSpec, hashFunc func(rayv1.RayClusterSpec) (string, error)) (bool, error) { + hash1, err1 := hashFunc(spec1) if err1 != nil { return false, err1 } - hash2, err2 := generateRayClusterJsonHash(spec2) + hash2, err2 := hashFunc(spec2) if err2 != nil { return false, err2 } From 3b3ca3f354ffb72a1b77063efa662d4a5ae80f55 Mon Sep 17 00:00:00 2001 From: Archit Kulkarni Date: Mon, 11 Dec 2023 14:50:56 -0800 Subject: [PATCH 02/23] Fix unit tests Signed-off-by: Archit Kulkarni --- .../ray/rayservice_controller_unit_test.go | 106 +++++++++++++++--- 1 file changed, 92 insertions(+), 14 deletions(-) diff --git a/ray-operator/controllers/ray/rayservice_controller_unit_test.go b/ray-operator/controllers/ray/rayservice_controller_unit_test.go index 39dd5c9bc93..2394da5160e 100644 --- a/ray-operator/controllers/ray/rayservice_controller_unit_test.go +++ b/ray-operator/controllers/ray/rayservice_controller_unit_test.go @@ -23,7 +23,7 @@ import ( clientFake "sigs.k8s.io/controller-runtime/pkg/client/fake" ) -func TestGenerateRayClusterJsonHash(t *testing.T) { +func TestGenerateHashWithoutReplicasAndWorkersToDelete(t *testing.T) { // `generateRayClusterJsonHash` will mute fields that will not trigger new RayCluster preparation. For example, // Autoscaler will update `Replicas` and `WorkersToDelete` when scaling up/down. Hence, `hash1` should be equal to // `hash2` in this case. @@ -43,36 +43,111 @@ func TestGenerateRayClusterJsonHash(t *testing.T) { }, } - hash1, err := generateRayClusterJsonHash(cluster.Spec) + hash1, err := generateHashWithoutReplicasAndWorkersToDelete(cluster.Spec) assert.Nil(t, err) *cluster.Spec.WorkerGroupSpecs[0].Replicas++ - hash2, err := generateRayClusterJsonHash(cluster.Spec) + hash2, err := generateHashWithoutReplicasAndWorkersToDelete(cluster.Spec) assert.Nil(t, err) assert.Equal(t, hash1, hash2) // RayVersion will not be muted, so `hash3` should not be equal to `hash1`. cluster.Spec.RayVersion = "2.100.0" - hash3, err := generateRayClusterJsonHash(cluster.Spec) + hash3, err := generateHashWithoutReplicasAndWorkersToDelete(cluster.Spec) assert.Nil(t, err) assert.NotEqual(t, hash1, hash3) + + // MinReplicas will not be muted, so `hash4` should not be equal to `hash1`. + *cluster.Spec.WorkerGroupSpecs[0].MinReplicas++ + hash4, err := generateHashWithoutReplicasAndWorkersToDelete(cluster.Spec) + assert.Nil(t, err) + assert.NotEqual(t, hash1, hash4) } -func TestCompareRayClusterJsonHash(t *testing.T) { - cluster1 := rayv1.RayCluster{ +func TestGenerateHashWithoutWorkerGroupSpec(t *testing.T) { + // `generateRayClusterJsonHash` will mute fields that will not trigger new RayCluster preparation. For example, + // Autoscaler will update `Replicas` and `WorkersToDelete` when scaling up/down. Hence, `hash1` should be equal to + // `hash2` in this case. 
+ cluster := rayv1.RayCluster{ Spec: rayv1.RayClusterSpec{ RayVersion: "2.8.0", + WorkerGroupSpecs: []rayv1.WorkerGroupSpec{ + { + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{}, + }, + Replicas: pointer.Int32(2), + MinReplicas: pointer.Int32(1), + MaxReplicas: pointer.Int32(4), + }, + }, }, } - cluster2 := cluster1.DeepCopy() - cluster2.Spec.RayVersion = "2.100.0" - equal, err := compareRayClusterJsonHash(cluster1.Spec, cluster2.Spec) + + hash1, err := generateHashWithoutWorkerGroupSpec(cluster.Spec) assert.Nil(t, err) - assert.False(t, equal) - equal, err = compareRayClusterJsonHash(cluster1.Spec, cluster1.Spec) + cluster.Spec.WorkerGroupSpecs = []rayv1.WorkerGroupSpec{} + hash2, err := generateHashWithoutWorkerGroupSpec(cluster.Spec) assert.Nil(t, err) - assert.True(t, equal) + assert.Equal(t, hash1, hash2) + + // RayVersion will not be muted, so `hash3` should not be equal to `hash1`. + cluster.Spec.RayVersion = "2.100.0" + hash3, err := generateHashWithoutWorkerGroupSpec(cluster.Spec) + assert.Nil(t, err) + assert.NotEqual(t, hash1, hash3) + + // MinReplicas will be muted, so `hash4` should equal to `hash1`. + *cluster.Spec.WorkerGroupSpecs[0].MinReplicas++ + hash4, err := generateHashWithoutWorkerGroupSpec(cluster.Spec) + assert.Nil(t, err) + assert.Equal(t, hash1, hash4) + + // Adding a new worker group spec should not affect the hash. + cluster.Spec.WorkerGroupSpecs = append(cluster.Spec.WorkerGroupSpecs, rayv1.WorkerGroupSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{}, + }, + Replicas: pointer.Int32(2), + MinReplicas: pointer.Int32(1), + MaxReplicas: pointer.Int32(4), + }) + hash5, err := generateHashWithoutWorkerGroupSpec(cluster.Spec) + assert.Nil(t, err) + assert.Equal(t, hash1, hash5) +} + +func TestGetClusterAction(t *testing.T) { + clusterSpec1 := rayv1.RayClusterSpec{ + RayVersion: "2.8.0", + WorkerGroupSpecs: []rayv1.WorkerGroupSpec{ + { + Replicas: pointer.Int32(2), + MinReplicas: pointer.Int32(1), + MaxReplicas: pointer.Int32(4), + }, + }, + } + clusterSpec2 := clusterSpec1.DeepCopy() + clusterSpec2.RayVersion = "2.100.0" + + // Test Case 1: Different RayVersions should lead to RolloutNew. + action, err := getClusterAction(clusterSpec1, *clusterSpec2) + assert.Nil(t, err) + assert.Equal(t, RolloutNew, action) + + // Test Case 2: Same spec should lead to DoNothing. + action, err = getClusterAction(clusterSpec1, clusterSpec1) + assert.Nil(t, err) + assert.Equal(t, DoNothing, action) + + // Test Case 3: Only WorkerGroupSpecs different should lead to Update. 
+ clusterSpec3 := clusterSpec1.DeepCopy() + clusterSpec3.WorkerGroupSpecs[0].Replicas = pointer.Int32(5) + action, err = getClusterAction(clusterSpec1, *clusterSpec3) + assert.Nil(t, err) + assert.Equal(t, Update, action) } func TestInconsistentRayServiceStatuses(t *testing.T) { @@ -669,14 +744,17 @@ func TestReconcileRayCluster(t *testing.T) { Status: rayv1.RayServiceStatuses{}, } - hash, err := generateRayClusterJsonHash(rayService.Spec.RayClusterSpec) + hash1, err := generateHashWithoutReplicasAndWorkersToDelete(rayService.Spec.RayClusterSpec) + assert.Nil(t, err) + hash2, err := generateHashWithoutWorkerGroupSpec(rayService.Spec.RayClusterSpec) assert.Nil(t, err) activeCluster := rayv1.RayCluster{ ObjectMeta: metav1.ObjectMeta{ Name: "active-cluster", Namespace: namespace, Annotations: map[string]string{ - utils.RayServiceClusterHashKey: hash, + utils.HashWithoutReplicasAndWorkersToDeleteKey: hash1, + utils.HashWithoutWorkerGroupSpecKey: hash2, }, }, } From 19db35ae3fc29b11c4681a258d55f54573b04304 Mon Sep 17 00:00:00 2001 From: Archit Kulkarni Date: Mon, 11 Dec 2023 15:05:26 -0800 Subject: [PATCH 03/23] Fix unit test Signed-off-by: Archit Kulkarni --- .../ray/rayservice_controller_unit_test.go | 23 ++++++++++--------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/ray-operator/controllers/ray/rayservice_controller_unit_test.go b/ray-operator/controllers/ray/rayservice_controller_unit_test.go index 2394da5160e..e76d7e442fa 100644 --- a/ray-operator/controllers/ray/rayservice_controller_unit_test.go +++ b/ray-operator/controllers/ray/rayservice_controller_unit_test.go @@ -87,22 +87,17 @@ func TestGenerateHashWithoutWorkerGroupSpec(t *testing.T) { hash1, err := generateHashWithoutWorkerGroupSpec(cluster.Spec) assert.Nil(t, err) - cluster.Spec.WorkerGroupSpecs = []rayv1.WorkerGroupSpec{} + // MinReplicas will be muted, so `hash4` should equal to `hash1`. + *cluster.Spec.WorkerGroupSpecs[0].MinReplicas++ hash2, err := generateHashWithoutWorkerGroupSpec(cluster.Spec) assert.Nil(t, err) assert.Equal(t, hash1, hash2) - // RayVersion will not be muted, so `hash3` should not be equal to `hash1`. - cluster.Spec.RayVersion = "2.100.0" + // Removing a worker group spec should not affect the hash. + cluster.Spec.WorkerGroupSpecs = []rayv1.WorkerGroupSpec{} hash3, err := generateHashWithoutWorkerGroupSpec(cluster.Spec) assert.Nil(t, err) - assert.NotEqual(t, hash1, hash3) - - // MinReplicas will be muted, so `hash4` should equal to `hash1`. - *cluster.Spec.WorkerGroupSpecs[0].MinReplicas++ - hash4, err := generateHashWithoutWorkerGroupSpec(cluster.Spec) - assert.Nil(t, err) - assert.Equal(t, hash1, hash4) + assert.Equal(t, hash1, hash3) // Adding a new worker group spec should not affect the hash. cluster.Spec.WorkerGroupSpecs = append(cluster.Spec.WorkerGroupSpecs, rayv1.WorkerGroupSpec{ @@ -113,9 +108,15 @@ func TestGenerateHashWithoutWorkerGroupSpec(t *testing.T) { MinReplicas: pointer.Int32(1), MaxReplicas: pointer.Int32(4), }) + hash4, err := generateHashWithoutWorkerGroupSpec(cluster.Spec) + assert.Nil(t, err) + assert.Equal(t, hash1, hash4) + + // RayVersion will not be muted, so `hash4` should not be equal to `hash1`. 
+ cluster.Spec.RayVersion = "2.100.0" hash5, err := generateHashWithoutWorkerGroupSpec(cluster.Spec) assert.Nil(t, err) - assert.Equal(t, hash1, hash5) + assert.NotEqual(t, hash1, hash5) } func TestGetClusterAction(t *testing.T) { From 1f14e458d21f786c7b50340d1aa6e5a90428bebe Mon Sep 17 00:00:00 2001 From: Archit Kulkarni Date: Mon, 11 Dec 2023 16:39:03 -0800 Subject: [PATCH 04/23] Update spec before calling k8s client update Signed-off-by: Archit Kulkarni --- ray-operator/controllers/ray/rayservice_controller.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ray-operator/controllers/ray/rayservice_controller.go b/ray-operator/controllers/ray/rayservice_controller.go index aa44a513588..42ccd1b7497 100644 --- a/ray-operator/controllers/ray/rayservice_controller.go +++ b/ray-operator/controllers/ray/rayservice_controller.go @@ -382,6 +382,7 @@ func (r *RayServiceReconciler) reconcileRayCluster(ctx context.Context, rayServi return activeRayCluster, nil, nil } else if clusterAction == Update { // Update the active cluster. + activeRayCluster.Spec = rayServiceInstance.Spec.RayClusterSpec if err := r.updateRayClusterInstance(ctx, activeRayCluster); err != nil { return nil, nil, err } @@ -564,6 +565,7 @@ func (r *RayServiceReconciler) createRayClusterInstanceIfNeeded(ctx context.Cont case RolloutNew: pendingRayCluster, err = r.createRayClusterInstance(ctx, rayServiceInstance, rayServiceInstance.Status.PendingServiceStatus.RayClusterName) case Update: + pendingRayCluster.Spec = rayServiceInstance.Spec.RayClusterSpec err = r.updateRayClusterInstance(ctx, pendingRayCluster) } From 79fb7c7c57c657a93f60289ddf68217fde4491cf Mon Sep 17 00:00:00 2001 From: Archit Kulkarni Date: Mon, 11 Dec 2023 16:48:39 -0800 Subject: [PATCH 05/23] Fix TestGetClusterAction Signed-off-by: Archit Kulkarni --- .../ray/rayservice_controller_unit_test.go | 29 +++++++++++++++++-- 1 file changed, 27 insertions(+), 2 deletions(-) diff --git a/ray-operator/controllers/ray/rayservice_controller_unit_test.go b/ray-operator/controllers/ray/rayservice_controller_unit_test.go index e76d7e442fa..7f64ed2af2d 100644 --- a/ray-operator/controllers/ray/rayservice_controller_unit_test.go +++ b/ray-operator/controllers/ray/rayservice_controller_unit_test.go @@ -143,12 +143,37 @@ func TestGetClusterAction(t *testing.T) { assert.Nil(t, err) assert.Equal(t, DoNothing, action) - // Test Case 3: Only WorkerGroupSpecs different should lead to Update. + // Test Case 3: Different WorkerGroupSpecs should lead to Update. clusterSpec3 := clusterSpec1.DeepCopy() - clusterSpec3.WorkerGroupSpecs[0].Replicas = pointer.Int32(5) + clusterSpec3.WorkerGroupSpecs[0].MinReplicas = pointer.Int32(5) action, err = getClusterAction(clusterSpec1, *clusterSpec3) assert.Nil(t, err) assert.Equal(t, Update, action) + + // Test Case 4: Addin a new WorkerGroupSpec should lead to Update. + clusterSpec4 := clusterSpec1.DeepCopy() + clusterSpec4.WorkerGroupSpecs = append(clusterSpec4.WorkerGroupSpecs, rayv1.WorkerGroupSpec{ + Replicas: pointer.Int32(2), + MinReplicas: pointer.Int32(1), + MaxReplicas: pointer.Int32(4), + }) + action, err = getClusterAction(clusterSpec1, *clusterSpec4) + assert.Nil(t, err) + assert.Equal(t, Update, action) + + // Test Case 5: Removing a WorkerGroupSpec should lead to Update. 
+ clusterSpec5 := clusterSpec1.DeepCopy() + clusterSpec5.WorkerGroupSpecs = []rayv1.WorkerGroupSpec{} + action, err = getClusterAction(clusterSpec1, *clusterSpec5) + assert.Nil(t, err) + assert.Equal(t, Update, action) + + // Test Case 6: Only changing the number of replicas should lead to DoNothing. + clusterSpec6 := clusterSpec1.DeepCopy() + clusterSpec6.WorkerGroupSpecs[0].Replicas = pointer.Int32(3) + action, err = getClusterAction(clusterSpec1, *clusterSpec6) + assert.Nil(t, err) + assert.Equal(t, DoNothing, action) } func TestInconsistentRayServiceStatuses(t *testing.T) { From c189a684ec07d1c27ab941fdcfd9d4bdb111fe18 Mon Sep 17 00:00:00 2001 From: Archit Kulkarni Date: Tue, 12 Dec 2023 08:29:55 -0800 Subject: [PATCH 06/23] Pull RayCluster before updating Signed-off-by: Archit Kulkarni --- .../controllers/ray/rayservice_controller.go | 41 ++++++++++++++++--- 1 file changed, 36 insertions(+), 5 deletions(-) diff --git a/ray-operator/controllers/ray/rayservice_controller.go b/ray-operator/controllers/ray/rayservice_controller.go index 42ccd1b7497..df8b234bb1c 100644 --- a/ray-operator/controllers/ray/rayservice_controller.go +++ b/ray-operator/controllers/ray/rayservice_controller.go @@ -8,6 +8,7 @@ import ( "strings" "time" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/json" "k8s.io/apimachinery/pkg/util/yaml" @@ -382,7 +383,10 @@ func (r *RayServiceReconciler) reconcileRayCluster(ctx context.Context, rayServi return activeRayCluster, nil, nil } else if clusterAction == Update { // Update the active cluster. - activeRayCluster.Spec = rayServiceInstance.Spec.RayClusterSpec + r.Log.Info("Updating the active RayCluster instance.") + if activeRayCluster, err = r.constructRayClusterForRayService(rayServiceInstance, activeRayCluster.Name); err != nil { + return nil, nil, err + } if err := r.updateRayClusterInstance(ctx, activeRayCluster); err != nil { return nil, nil, err } @@ -563,9 +567,13 @@ func (r *RayServiceReconciler) createRayClusterInstanceIfNeeded(ctx context.Cont switch clusterAction { case RolloutNew: + r.Log.Info("Creating a new pending RayCluster instance.") pendingRayCluster, err = r.createRayClusterInstance(ctx, rayServiceInstance, rayServiceInstance.Status.PendingServiceStatus.RayClusterName) case Update: - pendingRayCluster.Spec = rayServiceInstance.Spec.RayClusterSpec + r.Log.Info("Updating the pending RayCluster instance.") + if pendingRayCluster, err = r.constructRayClusterForRayService(rayServiceInstance, pendingRayCluster.Name); err != nil { + return nil, err + } err = r.updateRayClusterInstance(ctx, pendingRayCluster) } @@ -579,14 +587,37 @@ func (r *RayServiceReconciler) createRayClusterInstanceIfNeeded(ctx context.Cont // updateRayClusterInstance updates the RayCluster instance. 
func (r *RayServiceReconciler) updateRayClusterInstance(ctx context.Context, rayClusterInstance *rayv1.RayCluster) error { r.Log.V(1).Info("updateRayClusterInstance", "rayClusterInstance", rayClusterInstance) - if err := r.Update(ctx, rayClusterInstance); err != nil { - r.Log.Error(err, "Fail to update RayCluster "+rayClusterInstance.Name) + + // Fetch the current state of the RayCluster + currentRayCluster := &rayv1.RayCluster{} + err := r.Get(ctx, types.NamespacedName{ + Name: rayClusterInstance.Name, + Namespace: rayClusterInstance.Namespace, + }, currentRayCluster) + + if err != nil { + r.Log.Error(err, "Failed to get the current state of RayCluster") return err } - r.Log.V(1).Info("updated RayCluster", "rayClusterInstance", rayClusterInstance) + + // Update the fetched RayCluster with new changes + currentRayCluster.Spec = rayClusterInstance.Spec + + // Update the labels and annotations + currentRayCluster.Labels = rayClusterInstance.Labels + currentRayCluster.Annotations = rayClusterInstance.Annotations + + // Update the RayCluster + if err = r.Update(ctx, currentRayCluster); err != nil { + r.Log.Error(err, "Fail to update RayCluster "+currentRayCluster.Name) + return err + } + + r.Log.V(1).Info("updated RayCluster", "rayClusterInstance", currentRayCluster) return nil } + // createRayClusterInstance deletes the old RayCluster instance if exists. Only when no existing RayCluster, create a new RayCluster instance. // One important part is that if this method deletes the old RayCluster, it will return instantly. It depends on the controller to call it again to generate the new RayCluster instance. func (r *RayServiceReconciler) createRayClusterInstance(ctx context.Context, rayServiceInstance *rayv1.RayService, rayClusterInstanceName string) (*rayv1.RayCluster, error) { From 1ec5b4d43aacc696e9e2c81828fb2a9706364314 Mon Sep 17 00:00:00 2001 From: Archit Kulkarni Date: Tue, 12 Dec 2023 08:32:14 -0800 Subject: [PATCH 07/23] Update ray-operator/controllers/ray/rayservice_controller.go Signed-off-by: Archit Kulkarni --- ray-operator/controllers/ray/rayservice_controller.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ray-operator/controllers/ray/rayservice_controller.go b/ray-operator/controllers/ray/rayservice_controller.go index df8b234bb1c..36329a13909 100644 --- a/ray-operator/controllers/ray/rayservice_controller.go +++ b/ray-operator/controllers/ray/rayservice_controller.go @@ -374,7 +374,7 @@ func (r *RayServiceReconciler) reconcileRayCluster(ctx context.Context, rayServi enableZeroDowntime = false } if enableZeroDowntime || !enableZeroDowntime && activeRayCluster == nil { - // Add a pending cluster name. In the next reconcile loop, shouldPrepareNewRayCluster will be false and we will + // Add a pending cluster name. In the next reconcile loop, shouldPrepareNewRayCluster will return DoNothing and we will // actually create the pending RayCluster instance. 
r.markRestartAndAddPendingClusterName(rayServiceInstance) } else { From 33622cd9c16cb1dc46077ec146bd7f1a9d1afa22 Mon Sep 17 00:00:00 2001 From: Archit Kulkarni Date: Tue, 12 Dec 2023 08:45:40 -0800 Subject: [PATCH 08/23] Lint Signed-off-by: Archit Kulkarni --- ray-operator/controllers/ray/rayservice_controller.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/ray-operator/controllers/ray/rayservice_controller.go b/ray-operator/controllers/ray/rayservice_controller.go index df8b234bb1c..4f4f928c349 100644 --- a/ray-operator/controllers/ray/rayservice_controller.go +++ b/ray-operator/controllers/ray/rayservice_controller.go @@ -594,7 +594,6 @@ func (r *RayServiceReconciler) updateRayClusterInstance(ctx context.Context, ray Name: rayClusterInstance.Name, Namespace: rayClusterInstance.Namespace, }, currentRayCluster) - if err != nil { r.Log.Error(err, "Failed to get the current state of RayCluster") return err @@ -617,7 +616,6 @@ func (r *RayServiceReconciler) updateRayClusterInstance(ctx context.Context, ray return nil } - // createRayClusterInstance deletes the old RayCluster instance if exists. Only when no existing RayCluster, create a new RayCluster instance. // One important part is that if this method deletes the old RayCluster, it will return instantly. It depends on the controller to call it again to generate the new RayCluster instance. func (r *RayServiceReconciler) createRayClusterInstance(ctx context.Context, rayServiceInstance *rayv1.RayService, rayClusterInstanceName string) (*rayv1.RayCluster, error) { From 59bb53b0831e3f332c89be95f420763541bb7325 Mon Sep 17 00:00:00 2001 From: Archit Kulkarni Date: Tue, 12 Dec 2023 11:37:04 -0800 Subject: [PATCH 09/23] Add integration test for active RayCluster in `rayservice_controller_test.go` Signed-off-by: Archit Kulkarni --- .../ray/rayservice_controller_test.go | 46 +++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/ray-operator/controllers/ray/rayservice_controller_test.go b/ray-operator/controllers/ray/rayservice_controller_test.go index 373ec24d2b4..1bc8b81628e 100644 --- a/ray-operator/controllers/ray/rayservice_controller_test.go +++ b/ray-operator/controllers/ray/rayservice_controller_test.go @@ -483,6 +483,39 @@ applications: getRayClusterNameFunc(ctx, myRayService), time.Second*15, time.Millisecond*500).Should(Equal(initialPendingClusterName), "New active RayCluster name = %v", myRayService.Status.ActiveServiceStatus.RayClusterName) }) + It("should update the active RayCluster in place when WorkerGroupSpecs are modified by the user in RayServiceSpec", func() { + initialClusterName, _ := getRayClusterNameFunc(ctx, myRayService)() + + // Add a new worker group to the RayServiceSpec + newWorkerGroupSpec := myRayService.Spec.RayClusterSpec.WorkerGroupSpecs[0].DeepCopy() + newWorkerGroupSpec.GroupName = "worker-group-2" + + err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + Eventually( + getResourceFunc(ctx, client.ObjectKey{Name: myRayService.Name, Namespace: "default"}, myRayService), + time.Second*3, time.Millisecond*500).Should(BeNil(), "My myRayService = %v", myRayService.Name) + myRayService.Spec.RayClusterSpec.WorkerGroupSpecs = append(myRayService.Spec.RayClusterSpec.WorkerGroupSpecs, *newWorkerGroupSpec) + return k8sClient.Update(ctx, myRayService) + }) + Expect(err).NotTo(HaveOccurred(), "failed to update test RayService resource") + + // Confirm it didn't switch to a new RayCluster + Consistently( + getRayClusterNameFunc(ctx, myRayService), + time.Second*5, 
time.Millisecond*500).Should(Equal(initialClusterName), "My current RayCluster name = %v", myRayService.Status.ActiveServiceStatus.RayClusterName) + Eventually( + getResourceFunc(ctx, client.ObjectKey{Name: myRayService.Status.ActiveServiceStatus.RayClusterName, Namespace: "default"}, myRayCluster), + time.Second*3, time.Millisecond*500).Should(BeNil(), "My myRayCluster = %v", myRayCluster.Name) + + // Verify that the active RayCluster eventually reflects the changes in WorkerGroupSpecs + Eventually( + getActiveRayClusterWorkerGroupSpecsFunc(ctx, myRayService), + time.Second*15, + time.Millisecond*500, + ).Should( + HaveLen(2), + ) + }) It("Status should be updated if the differences are not only LastUpdateTime and HealthLastUpdateTime fields.", func() { // Make sure (1) Dashboard client is healthy (2) All the three Ray Serve deployments in the active RayCluster are HEALTHY. @@ -695,6 +728,19 @@ func getRayClusterNameFunc(ctx context.Context, rayService *rayv1.RayService) fu } } +func getActiveRayClusterWorkerGroupSpecsFunc(ctx context.Context, rayService *rayv1.RayService) func() ([]rayv1.WorkerGroupSpec, error) { + return func() ([]rayv1.WorkerGroupSpec, error) { + if err := k8sClient.Get(ctx, client.ObjectKey{Name: rayService.Name, Namespace: "default"}, rayService); err != nil { + return nil, err + } + rayCluster := &rayv1.RayCluster{} + if err := k8sClient.Get(ctx, client.ObjectKey{Name: rayService.Status.ActiveServiceStatus.RayClusterName, Namespace: "default"}, rayCluster); err != nil { + return nil, err + } + return rayCluster.Spec.WorkerGroupSpecs, nil + } +} + func getPreparingRayClusterNameFunc(ctx context.Context, rayService *rayv1.RayService) func() (string, error) { return func() (string, error) { if err := k8sClient.Get(ctx, client.ObjectKey{Name: rayService.Name, Namespace: "default"}, rayService); err != nil { From ff7c56b780efb128f067b379e8fc822803bcbe4a Mon Sep 17 00:00:00 2001 From: Archit Kulkarni Date: Tue, 12 Dec 2023 12:13:32 -0800 Subject: [PATCH 10/23] Add test for updating pending cluster in `rayservice_controller_test.go` Signed-off-by: Archit Kulkarni --- .../ray/rayservice_controller_test.go | 73 +++++++++++++++++++ 1 file changed, 73 insertions(+) diff --git a/ray-operator/controllers/ray/rayservice_controller_test.go b/ray-operator/controllers/ray/rayservice_controller_test.go index 1bc8b81628e..59219a0401e 100644 --- a/ray-operator/controllers/ray/rayservice_controller_test.go +++ b/ray-operator/controllers/ray/rayservice_controller_test.go @@ -516,6 +516,66 @@ applications: HaveLen(2), ) }) + It("should update the pending RayCluster in place when WorkerGroupSpecs are modified by the user in RayServiceSpec", func() { + // Trigger a new RayCluster preparation by updating the RayVersion. 
+ oldRayVersion := myRayService.Spec.RayClusterSpec.RayVersion + newRayVersion := "2.300.0" + Expect(oldRayVersion).ShouldNot(Equal(newRayVersion)) + err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + Eventually( + getResourceFunc(ctx, client.ObjectKey{Name: myRayService.Name, Namespace: "default"}, myRayService), + time.Second*3, time.Millisecond*500).Should(BeNil(), "My myRayService = %v", myRayService.Name) + myRayService.Spec.RayClusterSpec.RayVersion = newRayVersion + return k8sClient.Update(ctx, myRayService) + }) + Expect(err).NotTo(HaveOccurred(), "failed to update test RayService resource with new RayVersion") + Eventually( + getPreparingRayClusterNameFunc(ctx, myRayService), + time.Second*60, time.Millisecond*500).Should(Not(BeEmpty()), "New pending RayCluster name = %v", myRayService.Status.PendingServiceStatus.RayClusterName) + initialPendingClusterName, _ := getPreparingRayClusterNameFunc(ctx, myRayService)() + + // Add a new worker group to the RayServiceSpec + newWorkerGroupSpec := myRayService.Spec.RayClusterSpec.WorkerGroupSpecs[0].DeepCopy() + newWorkerGroupSpec.GroupName = "worker-group-3" + + err = retry.RetryOnConflict(retry.DefaultRetry, func() error { + Eventually( + getResourceFunc(ctx, client.ObjectKey{Name: myRayService.Name, Namespace: "default"}, myRayService), + time.Second*3, time.Millisecond*500).Should(BeNil(), "My myRayService = %v", myRayService.Name) + myRayService.Spec.RayClusterSpec.WorkerGroupSpecs = append(myRayService.Spec.RayClusterSpec.WorkerGroupSpecs, *newWorkerGroupSpec) + return k8sClient.Update(ctx, myRayService) + }) + Expect(err).NotTo(HaveOccurred(), "failed to update test RayService resource with new WorkerGroupSpecs") + + // Sanity check: length of myRayService.Spec.RayClusterSpec.WorkerGroupSpecs should be 3 + Expect(myRayService.Spec.RayClusterSpec.WorkerGroupSpecs).Should(HaveLen(3)) + + // Confirm it didn't switch to a new RayCluster + Consistently( + getPreparingRayClusterNameFunc(ctx, myRayService), + time.Second*5, time.Millisecond*500).Should(Equal(initialPendingClusterName), "Pending RayCluster name = %v", myRayService.Status.PendingServiceStatus.RayClusterName) + + // Verify that the pending RayCluster eventually reflects the changes in WorkerGroupSpecs + Eventually( + getPendingRayClusterWorkerGroupSpecsFunc(ctx, myRayService), + time.Second*15, + time.Millisecond*500, + ).Should( + HaveLen(3), + ) + + // The pending RayCluster will become the active RayCluster after: + // (1) The pending RayCluster's head Pod becomes Running and Ready + // (2) The pending RayCluster's Serve Deployments are HEALTHY. + updateHeadPodToRunningAndReady(ctx, initialPendingClusterName) + fakeRayDashboardClient.SetSingleApplicationStatus(generateServeStatus(rayv1.DeploymentStatusEnum.HEALTHY, rayv1.ApplicationStatusEnum.RUNNING)) + Eventually( + getPreparingRayClusterNameFunc(ctx, myRayService), + time.Second*15, time.Millisecond*500).Should(BeEmpty(), "Pending RayCluster name = %v", myRayService.Status.PendingServiceStatus.RayClusterName) + Eventually( + getRayClusterNameFunc(ctx, myRayService), + time.Second*15, time.Millisecond*500).Should(Equal(initialPendingClusterName), "New active RayCluster name = %v", myRayService.Status.ActiveServiceStatus.RayClusterName) + }) It("Status should be updated if the differences are not only LastUpdateTime and HealthLastUpdateTime fields.", func() { // Make sure (1) Dashboard client is healthy (2) All the three Ray Serve deployments in the active RayCluster are HEALTHY. 
@@ -750,6 +810,19 @@ func getPreparingRayClusterNameFunc(ctx context.Context, rayService *rayv1.RaySe } } +func getPendingRayClusterWorkerGroupSpecsFunc(ctx context.Context, rayService *rayv1.RayService) func() ([]rayv1.WorkerGroupSpec, error) { + return func() ([]rayv1.WorkerGroupSpec, error) { + if err := k8sClient.Get(ctx, client.ObjectKey{Name: rayService.Name, Namespace: "default"}, rayService); err != nil { + return nil, err + } + rayCluster := &rayv1.RayCluster{} + if err := k8sClient.Get(ctx, client.ObjectKey{Name: rayService.Status.PendingServiceStatus.RayClusterName, Namespace: "default"}, rayCluster); err != nil { + return nil, err + } + return rayCluster.Spec.WorkerGroupSpecs, nil + } +} + func checkServiceHealth(ctx context.Context, rayService *rayv1.RayService) func() (bool, error) { return func() (bool, error) { if err := k8sClient.Get(ctx, client.ObjectKey{Name: rayService.Name, Namespace: rayService.Namespace}, rayService); err != nil { From 0520aa42fd5b974f8e630c8584a7dbcbc1c7071d Mon Sep 17 00:00:00 2001 From: Archit Kulkarni Date: Thu, 14 Dec 2023 10:48:30 -0800 Subject: [PATCH 11/23] Don't print the whole RayCluster Signed-off-by: Archit Kulkarni --- ray-operator/controllers/ray/rayservice_controller.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/ray-operator/controllers/ray/rayservice_controller.go b/ray-operator/controllers/ray/rayservice_controller.go index e672f1e5210..92284eaee34 100644 --- a/ray-operator/controllers/ray/rayservice_controller.go +++ b/ray-operator/controllers/ray/rayservice_controller.go @@ -586,7 +586,9 @@ func (r *RayServiceReconciler) createRayClusterInstanceIfNeeded(ctx context.Cont // updateRayClusterInstance updates the RayCluster instance. func (r *RayServiceReconciler) updateRayClusterInstance(ctx context.Context, rayClusterInstance *rayv1.RayCluster) error { - r.Log.V(1).Info("updateRayClusterInstance", "rayClusterInstance", rayClusterInstance) + r.Log.V(1).Info("updateRayClusterInstance", "Name", rayClusterInstance.Name, "Namespace", rayClusterInstance.Namespace) + // Printing the whole RayCluster is too noisy. Only print the spec. 
+ r.Log.V(1).Info("updateRayClusterInstance", "rayClusterInstance.Spec", rayClusterInstance.Spec) // Fetch the current state of the RayCluster currentRayCluster := &rayv1.RayCluster{} From e77a72d22e0b3bb47a93ed7bbd9b9db0ce222135 Mon Sep 17 00:00:00 2001 From: Archit Kulkarni Date: Thu, 14 Dec 2023 10:52:39 -0800 Subject: [PATCH 12/23] Use `getRayClusterByNamespacedName` Signed-off-by: Archit Kulkarni --- .../controllers/ray/rayservice_controller.go | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/ray-operator/controllers/ray/rayservice_controller.go b/ray-operator/controllers/ray/rayservice_controller.go index 92284eaee34..00aaf9359bd 100644 --- a/ray-operator/controllers/ray/rayservice_controller.go +++ b/ray-operator/controllers/ray/rayservice_controller.go @@ -8,7 +8,6 @@ import ( "strings" "time" - "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/json" "k8s.io/apimachinery/pkg/util/yaml" @@ -591,16 +590,20 @@ func (r *RayServiceReconciler) updateRayClusterInstance(ctx context.Context, ray r.Log.V(1).Info("updateRayClusterInstance", "rayClusterInstance.Spec", rayClusterInstance.Spec) // Fetch the current state of the RayCluster - currentRayCluster := &rayv1.RayCluster{} - err := r.Get(ctx, types.NamespacedName{ - Name: rayClusterInstance.Name, + currentRayCluster, err := r.getRayClusterByNamespacedName(ctx, client.ObjectKey{ Namespace: rayClusterInstance.Namespace, - }, currentRayCluster) + Name: rayClusterInstance.Name, + }) if err != nil { - r.Log.Error(err, "Failed to get the current state of RayCluster") + r.Log.Error(err, "Failed to get the current state of RayCluster", "Namespace", rayClusterInstance.Namespace, "Name", rayClusterInstance.Name) return err } + if currentRayCluster == nil { + r.Log.Info("RayCluster not found, possibly deleted", "Namespace", rayClusterInstance.Namespace, "Name", rayClusterInstance.Name) + return nil + } + // Update the fetched RayCluster with new changes currentRayCluster.Spec = rayClusterInstance.Spec From b91e45174aac548bc17e515ad0feee1f4a5afd24 Mon Sep 17 00:00:00 2001 From: Archit Kulkarni Date: Thu, 14 Dec 2023 12:33:31 -0800 Subject: [PATCH 13/23] return DoNothing if failed to serialize Signed-off-by: Archit Kulkarni --- ray-operator/controllers/ray/rayservice_controller.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ray-operator/controllers/ray/rayservice_controller.go b/ray-operator/controllers/ray/rayservice_controller.go index 00aaf9359bd..f1c25c21e51 100644 --- a/ray-operator/controllers/ray/rayservice_controller.go +++ b/ray-operator/controllers/ray/rayservice_controller.go @@ -510,7 +510,7 @@ func (r *RayServiceReconciler) shouldPrepareNewRayCluster(rayServiceInstance *ra "Please manually tear down the cluster and apply a new config." 
if err != nil { r.Log.Error(err, errContextFailedToSerialize) - return RolloutNew + return DoNothing } if activeClusterHash == goalClusterHash { @@ -525,7 +525,7 @@ func (r *RayServiceReconciler) shouldPrepareNewRayCluster(rayServiceInstance *ra goalClusterHash, err = generateHashWithoutWorkerGroupSpec(rayServiceInstance.Spec.RayClusterSpec) if err != nil { r.Log.Error(err, errContextFailedToSerialize) - return RolloutNew + return DoNothing } if activeClusterHash == goalClusterHash { From 6576d73f09a05d2afe4ac39c8141c97e78a3adbb Mon Sep 17 00:00:00 2001 From: Archit Kulkarni Date: Tue, 19 Dec 2023 07:34:15 -0800 Subject: [PATCH 14/23] Only update if workergroups are the same and new ones appended Signed-off-by: Archit Kulkarni --- .../controllers/ray/rayservice_controller.go | 67 +++++++++++-------- .../ray/rayservice_controller_unit_test.go | 64 +----------------- .../controllers/ray/utils/constant.go | 2 +- 3 files changed, 42 insertions(+), 91 deletions(-) diff --git a/ray-operator/controllers/ray/rayservice_controller.go b/ray-operator/controllers/ray/rayservice_controller.go index f1c25c21e51..87335f7f607 100644 --- a/ray-operator/controllers/ray/rayservice_controller.go +++ b/ray-operator/controllers/ray/rayservice_controller.go @@ -5,6 +5,7 @@ import ( "fmt" "os" "reflect" + "strconv" "strings" "time" @@ -499,7 +500,6 @@ func (r *RayServiceReconciler) shouldPrepareNewRayCluster(rayServiceInstance *ra r.Log.Info("No active Ray cluster. RayService operator should prepare a new Ray cluster.") return RolloutNew } - // TODO (Archit): Maybe factor out the above stuff // Case 1: If everything is identical except for the Replicas and WorkersToDelete of // each WorkerGroup, then do nothing. @@ -518,19 +518,34 @@ func (r *RayServiceReconciler) shouldPrepareNewRayCluster(rayServiceInstance *ra return DoNothing } - // Case 2: Otherwise, if everything is identical except some change in WorkerGroupSpecs, - // then Update. + // Case 2: Otherwise, if everything is identical except for the Replicas and WorkersToDelete of + // the existing workergroups, and one or more new workergroups are added at the end, then update the cluster. - activeClusterHash = activeRayCluster.ObjectMeta.Annotations[utils.HashWithoutWorkerGroupSpecKey] - goalClusterHash, err = generateHashWithoutWorkerGroupSpec(rayServiceInstance.Spec.RayClusterSpec) + activeClusterHash = activeRayCluster.ObjectMeta.Annotations[utils.HashWithoutReplicasAndWorkersToDeleteKey] + activeClusterNumWorkerGroups, err := strconv.Atoi(activeRayCluster.ObjectMeta.Annotations[utils.NumWorkerGroupsKey]) if err != nil { r.Log.Error(err, errContextFailedToSerialize) return DoNothing } + goalNumWorkerGroups := len(rayServiceInstance.Spec.RayClusterSpec.WorkerGroupSpecs) - if activeClusterHash == goalClusterHash { - r.Log.Info("Active Ray cluster config matches goal config, except for WorkerGroupSpecs. Updating RayCluster.") - return Update + if goalNumWorkerGroups > activeClusterNumWorkerGroups { + + // Remove the new workergroup(s) from the end before calculating the hash. + goalClusterSpec := rayServiceInstance.Spec.RayClusterSpec.DeepCopy() + goalClusterSpec.WorkerGroupSpecs = goalClusterSpec.WorkerGroupSpecs[:activeClusterNumWorkerGroups] + + // Generate the hash of the old worker group specs. 
+ goalClusterHash, err = generateHashWithoutReplicasAndWorkersToDelete(*goalClusterSpec) + if err != nil { + r.Log.Error(err, errContextFailedToSerialize) + return DoNothing + } + + if activeClusterHash == goalClusterHash { + r.Log.Info("Active Ray cluster config matches goal config, except that one or more entries were appended to WorkerGroupSpecs. Updating RayCluster.") + return Update + } } // Case 3: Otherwise, rollout a new cluster. @@ -691,11 +706,7 @@ func (r *RayServiceReconciler) constructRayClusterForRayService(rayService *rayv r.Log.Error(err, errContext) return nil, err } - rayClusterAnnotations[utils.HashWithoutWorkerGroupSpecKey], err = generateHashWithoutWorkerGroupSpec(rayService.Spec.RayClusterSpec) - if err != nil { - r.Log.Error(err, errContext) - return nil, err - } + rayClusterAnnotations[utils.NumWorkerGroupsKey] = strconv.Itoa(len(rayService.Spec.RayClusterSpec.WorkerGroupSpecs)) rayCluster := &rayv1.RayCluster{ ObjectMeta: metav1.ObjectMeta{ @@ -1247,28 +1258,26 @@ func getClusterAction(old_spec rayv1.RayClusterSpec, new_spec rayv1.RayClusterSp return DoNothing, nil } - // Case 2: Otherwise, if everything is identical except some change in WorkerGroupSpecs, - // then Update. - same_hash, err = compareRayClusterJsonHash(old_spec, new_spec, generateHashWithoutWorkerGroupSpec) - if err != nil { - return DoNothing, err - } - if same_hash { - return Update, nil + // Case 2: Otherwise, if everything is identical except for the Replicas and WorkersToDelete of + // the existing workergroups, and one or more new workergroups are added at the end, then update the cluster. + new_spec_without_new_worker_groups := new_spec.DeepCopy() + if len(new_spec.WorkerGroupSpecs) > len(old_spec.WorkerGroupSpecs) { + // Remove the new worker groups from the new spec. + new_spec_without_new_worker_groups.WorkerGroupSpecs = new_spec_without_new_worker_groups.WorkerGroupSpecs[:len(old_spec.WorkerGroupSpecs)] + + same_hash, err = compareRayClusterJsonHash(old_spec, *new_spec_without_new_worker_groups, generateHashWithoutReplicasAndWorkersToDelete) + if err != nil { + return DoNothing, err + } + if same_hash { + return Update, nil + } } // Case 3: Otherwise, rollout a new cluster. return RolloutNew, nil } -func generateHashWithoutWorkerGroupSpec(rayClusterSpec rayv1.RayClusterSpec) (string, error) { - updatedRayClusterSpec := rayClusterSpec.DeepCopy() - updatedRayClusterSpec.WorkerGroupSpecs = nil - - // Generate a hash for the RayClusterSpec. - return utils.GenerateJsonHash(updatedRayClusterSpec) -} - func generateHashWithoutReplicasAndWorkersToDelete(rayClusterSpec rayv1.RayClusterSpec) (string, error) { // Mute certain fields that will not trigger new RayCluster preparation. For example, // Autoscaler will update `Replicas` and `WorkersToDelete` when scaling up/down. diff --git a/ray-operator/controllers/ray/rayservice_controller_unit_test.go b/ray-operator/controllers/ray/rayservice_controller_unit_test.go index 7f64ed2af2d..50d963ab724 100644 --- a/ray-operator/controllers/ray/rayservice_controller_unit_test.go +++ b/ray-operator/controllers/ray/rayservice_controller_unit_test.go @@ -64,61 +64,6 @@ func TestGenerateHashWithoutReplicasAndWorkersToDelete(t *testing.T) { assert.NotEqual(t, hash1, hash4) } -func TestGenerateHashWithoutWorkerGroupSpec(t *testing.T) { - // `generateRayClusterJsonHash` will mute fields that will not trigger new RayCluster preparation. For example, - // Autoscaler will update `Replicas` and `WorkersToDelete` when scaling up/down. 
Hence, `hash1` should be equal to - // `hash2` in this case. - cluster := rayv1.RayCluster{ - Spec: rayv1.RayClusterSpec{ - RayVersion: "2.8.0", - WorkerGroupSpecs: []rayv1.WorkerGroupSpec{ - { - Template: corev1.PodTemplateSpec{ - Spec: corev1.PodSpec{}, - }, - Replicas: pointer.Int32(2), - MinReplicas: pointer.Int32(1), - MaxReplicas: pointer.Int32(4), - }, - }, - }, - } - - hash1, err := generateHashWithoutWorkerGroupSpec(cluster.Spec) - assert.Nil(t, err) - - // MinReplicas will be muted, so `hash4` should equal to `hash1`. - *cluster.Spec.WorkerGroupSpecs[0].MinReplicas++ - hash2, err := generateHashWithoutWorkerGroupSpec(cluster.Spec) - assert.Nil(t, err) - assert.Equal(t, hash1, hash2) - - // Removing a worker group spec should not affect the hash. - cluster.Spec.WorkerGroupSpecs = []rayv1.WorkerGroupSpec{} - hash3, err := generateHashWithoutWorkerGroupSpec(cluster.Spec) - assert.Nil(t, err) - assert.Equal(t, hash1, hash3) - - // Adding a new worker group spec should not affect the hash. - cluster.Spec.WorkerGroupSpecs = append(cluster.Spec.WorkerGroupSpecs, rayv1.WorkerGroupSpec{ - Template: corev1.PodTemplateSpec{ - Spec: corev1.PodSpec{}, - }, - Replicas: pointer.Int32(2), - MinReplicas: pointer.Int32(1), - MaxReplicas: pointer.Int32(4), - }) - hash4, err := generateHashWithoutWorkerGroupSpec(cluster.Spec) - assert.Nil(t, err) - assert.Equal(t, hash1, hash4) - - // RayVersion will not be muted, so `hash4` should not be equal to `hash1`. - cluster.Spec.RayVersion = "2.100.0" - hash5, err := generateHashWithoutWorkerGroupSpec(cluster.Spec) - assert.Nil(t, err) - assert.NotEqual(t, hash1, hash5) -} - func TestGetClusterAction(t *testing.T) { clusterSpec1 := rayv1.RayClusterSpec{ RayVersion: "2.8.0", @@ -150,7 +95,7 @@ func TestGetClusterAction(t *testing.T) { assert.Nil(t, err) assert.Equal(t, Update, action) - // Test Case 4: Addin a new WorkerGroupSpec should lead to Update. + // Test Case 4: Adding a new WorkerGroupSpec should lead to Update. 
clusterSpec4 := clusterSpec1.DeepCopy() clusterSpec4.WorkerGroupSpecs = append(clusterSpec4.WorkerGroupSpecs, rayv1.WorkerGroupSpec{ Replicas: pointer.Int32(2), @@ -770,17 +715,14 @@ func TestReconcileRayCluster(t *testing.T) { Status: rayv1.RayServiceStatuses{}, } - hash1, err := generateHashWithoutReplicasAndWorkersToDelete(rayService.Spec.RayClusterSpec) - assert.Nil(t, err) - hash2, err := generateHashWithoutWorkerGroupSpec(rayService.Spec.RayClusterSpec) + hash, err := generateHashWithoutReplicasAndWorkersToDelete(rayService.Spec.RayClusterSpec) assert.Nil(t, err) activeCluster := rayv1.RayCluster{ ObjectMeta: metav1.ObjectMeta{ Name: "active-cluster", Namespace: namespace, Annotations: map[string]string{ - utils.HashWithoutReplicasAndWorkersToDeleteKey: hash1, - utils.HashWithoutWorkerGroupSpecKey: hash2, + utils.HashWithoutReplicasAndWorkersToDeleteKey: hash, }, }, } diff --git a/ray-operator/controllers/ray/utils/constant.go b/ray-operator/controllers/ray/utils/constant.go index 1166b132d59..95858bdd0d8 100644 --- a/ray-operator/controllers/ray/utils/constant.go +++ b/ray-operator/controllers/ray/utils/constant.go @@ -13,7 +13,7 @@ const ( RayIDLabelKey = "ray.io/identifier" RayClusterServingServiceLabelKey = "ray.io/serve" HashWithoutReplicasAndWorkersToDeleteKey = "ray.io/hash-without-replicas-and-workers-to-delete" - HashWithoutWorkerGroupSpecKey = "ray.io/hash-without-worker-group-spec" + NumWorkerGroupsKey = "ray.io/num-worker-groups" // In KubeRay, the Ray container must be the first application container in a head or worker Pod. RayContainerIndex = 0 From d76c9044e705c8620a6393f0074146452423ddd9 Mon Sep 17 00:00:00 2001 From: Archit Kulkarni Date: Tue, 19 Dec 2023 08:04:42 -0800 Subject: [PATCH 15/23] Fix unit tests Signed-off-by: Archit Kulkarni --- .../ray/rayservice_controller_unit_test.go | 50 +++++++++++++++++-- 1 file changed, 46 insertions(+), 4 deletions(-) diff --git a/ray-operator/controllers/ray/rayservice_controller_unit_test.go b/ray-operator/controllers/ray/rayservice_controller_unit_test.go index 50d963ab724..0a9927adf3a 100644 --- a/ray-operator/controllers/ray/rayservice_controller_unit_test.go +++ b/ray-operator/controllers/ray/rayservice_controller_unit_test.go @@ -5,6 +5,7 @@ import ( "fmt" "os" "reflect" + "strconv" "testing" "time" @@ -88,12 +89,12 @@ func TestGetClusterAction(t *testing.T) { assert.Nil(t, err) assert.Equal(t, DoNothing, action) - // Test Case 3: Different WorkerGroupSpecs should lead to Update. + // Test Case 3: Different WorkerGroupSpecs should lead to RolloutNew. clusterSpec3 := clusterSpec1.DeepCopy() clusterSpec3.WorkerGroupSpecs[0].MinReplicas = pointer.Int32(5) action, err = getClusterAction(clusterSpec1, *clusterSpec3) assert.Nil(t, err) - assert.Equal(t, Update, action) + assert.Equal(t, RolloutNew, action) // Test Case 4: Adding a new WorkerGroupSpec should lead to Update. clusterSpec4 := clusterSpec1.DeepCopy() @@ -106,12 +107,12 @@ func TestGetClusterAction(t *testing.T) { assert.Nil(t, err) assert.Equal(t, Update, action) - // Test Case 5: Removing a WorkerGroupSpec should lead to Update. + // Test Case 5: Removing a WorkerGroupSpec should lead to RolloutNew. clusterSpec5 := clusterSpec1.DeepCopy() clusterSpec5.WorkerGroupSpecs = []rayv1.WorkerGroupSpec{} action, err = getClusterAction(clusterSpec1, *clusterSpec5) assert.Nil(t, err) - assert.Equal(t, Update, action) + assert.Equal(t, RolloutNew, action) // Test Case 6: Only changing the number of replicas should lead to DoNothing. 
clusterSpec6 := clusterSpec1.DeepCopy() @@ -119,6 +120,46 @@ func TestGetClusterAction(t *testing.T) { action, err = getClusterAction(clusterSpec1, *clusterSpec6) assert.Nil(t, err) assert.Equal(t, DoNothing, action) + + // Test Case 7: Only changing the number of replicas in an existing WorkerGroupSpec *and* adding a new WorkerGroupSpec should lead to Update. + clusterSpec7 := clusterSpec1.DeepCopy() + clusterSpec7.WorkerGroupSpecs[0].Replicas = pointer.Int32(3) + clusterSpec7.WorkerGroupSpecs = append(clusterSpec7.WorkerGroupSpecs, rayv1.WorkerGroupSpec{ + Replicas: pointer.Int32(2), + MinReplicas: pointer.Int32(1), + MaxReplicas: pointer.Int32(4), + }) + action, err = getClusterAction(clusterSpec1, *clusterSpec7) + assert.Nil(t, err) + assert.Equal(t, Update, action) + + // Test Case 8: Adding two new WorkerGroupSpecs should lead to Update. + clusterSpec8 := clusterSpec1.DeepCopy() + clusterSpec8.WorkerGroupSpecs = append(clusterSpec8.WorkerGroupSpecs, rayv1.WorkerGroupSpec{ + Replicas: pointer.Int32(2), + MinReplicas: pointer.Int32(1), + MaxReplicas: pointer.Int32(4), + }) + clusterSpec8.WorkerGroupSpecs = append(clusterSpec8.WorkerGroupSpecs, rayv1.WorkerGroupSpec{ + Replicas: pointer.Int32(3), + MinReplicas: pointer.Int32(2), + MaxReplicas: pointer.Int32(5), + }) + action, err = getClusterAction(clusterSpec1, *clusterSpec8) + assert.Nil(t, err) + assert.Equal(t, Update, action) + + // Test Case 9: Changing an existing WorkerGroupSpec outside of Replicas/WorkersToDelete *and* adding a new WorkerGroupSpec should lead to RolloutNew. + clusterSpec9 := clusterSpec1.DeepCopy() + clusterSpec9.WorkerGroupSpecs[0].MaxReplicas = pointer.Int32(5) + clusterSpec9.WorkerGroupSpecs = append(clusterSpec9.WorkerGroupSpecs, rayv1.WorkerGroupSpec{ + Replicas: pointer.Int32(2), + MinReplicas: pointer.Int32(1), + MaxReplicas: pointer.Int32(4), + }) + action, err = getClusterAction(clusterSpec1, *clusterSpec9) + assert.Nil(t, err) + assert.Equal(t, RolloutNew, action) } func TestInconsistentRayServiceStatuses(t *testing.T) { @@ -723,6 +764,7 @@ func TestReconcileRayCluster(t *testing.T) { Namespace: namespace, Annotations: map[string]string{ utils.HashWithoutReplicasAndWorkersToDeleteKey: hash, + utils.NumWorkerGroupsKey: strconv.Itoa(len(rayService.Spec.RayClusterSpec.WorkerGroupSpecs)), }, }, } From ce83a1fb0e4cef6e160de3d9ecbccdd27ae7e1d4 Mon Sep 17 00:00:00 2001 From: Archit Kulkarni Date: Wed, 20 Dec 2023 15:53:16 -0800 Subject: [PATCH 16/23] Remove redundant hash calculation Signed-off-by: Archit Kulkarni --- ray-operator/controllers/ray/rayservice_controller.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/ray-operator/controllers/ray/rayservice_controller.go b/ray-operator/controllers/ray/rayservice_controller.go index 87335f7f607..d1b83ccfd4c 100644 --- a/ray-operator/controllers/ray/rayservice_controller.go +++ b/ray-operator/controllers/ray/rayservice_controller.go @@ -520,8 +520,6 @@ func (r *RayServiceReconciler) shouldPrepareNewRayCluster(rayServiceInstance *ra // Case 2: Otherwise, if everything is identical except for the Replicas and WorkersToDelete of // the existing workergroups, and one or more new workergroups are added at the end, then update the cluster. 
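The Case 2 comment above is the heart of the change: the following hunks compare the recorded worker-group count against the goal spec and, when groups were only appended, re-hash the goal spec with the extra groups trimmed off. A condensed sketch of that branch follows, reusing variables declared earlier in shouldPrepareNewRayCluster; it is inferred from the comments and from the parallel logic in getClusterAction, not copied from the patch.

// Inside the goalNumWorkerGroups > activeClusterNumWorkerGroups branch (sketch):
trimmedGoalSpec := rayServiceInstance.Spec.RayClusterSpec.DeepCopy()
trimmedGoalSpec.WorkerGroupSpecs = trimmedGoalSpec.WorkerGroupSpecs[:activeClusterNumWorkerGroups]
goalClusterHash, err = generateHashWithoutReplicasAndWorkersToDelete(*trimmedGoalSpec)
if err != nil {
	r.Log.Error(err, errContextFailedToSerialize)
	return RolloutNew
}
if activeClusterHash == goalClusterHash {
	// Only appended worker groups differ: update the active cluster in place.
	return Update
}
// Anything else changed: prepare a new cluster.
return RolloutNew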
- - activeClusterHash = activeRayCluster.ObjectMeta.Annotations[utils.HashWithoutReplicasAndWorkersToDeleteKey] activeClusterNumWorkerGroups, err := strconv.Atoi(activeRayCluster.ObjectMeta.Annotations[utils.NumWorkerGroupsKey]) if err != nil { r.Log.Error(err, errContextFailedToSerialize) From 16aa1ecb2c6760a59343d6b8fd7cbd33e9e155d9 Mon Sep 17 00:00:00 2001 From: Archit Kulkarni Date: Wed, 20 Dec 2023 15:55:01 -0800 Subject: [PATCH 17/23] Add log for number of worker groups Signed-off-by: Archit Kulkarni --- ray-operator/controllers/ray/rayservice_controller.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ray-operator/controllers/ray/rayservice_controller.go b/ray-operator/controllers/ray/rayservice_controller.go index d1b83ccfd4c..21cca9adb47 100644 --- a/ray-operator/controllers/ray/rayservice_controller.go +++ b/ray-operator/controllers/ray/rayservice_controller.go @@ -526,7 +526,7 @@ func (r *RayServiceReconciler) shouldPrepareNewRayCluster(rayServiceInstance *ra return DoNothing } goalNumWorkerGroups := len(rayServiceInstance.Spec.RayClusterSpec.WorkerGroupSpecs) - + r.Log.Info("number of worker groups", "activeClusterNumWorkerGroups", activeClusterNumWorkerGroups, "goalNumWorkerGroups", goalNumWorkerGroups) if goalNumWorkerGroups > activeClusterNumWorkerGroups { // Remove the new workergroup(s) from the end before calculating the hash. From 0517a10ff22538c2c36bee097705c44b52c11e9e Mon Sep 17 00:00:00 2001 From: Archit Kulkarni Date: Wed, 20 Dec 2023 15:55:32 -0800 Subject: [PATCH 18/23] Fix Ray Cluster -> RayCluster Signed-off-by: Archit Kulkarni --- ray-operator/controllers/ray/rayservice_controller.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ray-operator/controllers/ray/rayservice_controller.go b/ray-operator/controllers/ray/rayservice_controller.go index 21cca9adb47..1e3ba4c2a16 100644 --- a/ray-operator/controllers/ray/rayservice_controller.go +++ b/ray-operator/controllers/ray/rayservice_controller.go @@ -541,7 +541,7 @@ func (r *RayServiceReconciler) shouldPrepareNewRayCluster(rayServiceInstance *ra } if activeClusterHash == goalClusterHash { - r.Log.Info("Active Ray cluster config matches goal config, except that one or more entries were appended to WorkerGroupSpecs. Updating RayCluster.") + r.Log.Info("Active RayCluster config matches goal config, except that one or more entries were appended to WorkerGroupSpecs. Updating RayCluster.") return Update } } From e62ac3478d1761c0f15072e135496e955f227382 Mon Sep 17 00:00:00 2001 From: Archit Kulkarni Date: Wed, 20 Dec 2023 15:57:17 -0800 Subject: [PATCH 19/23] Change snake_case to CamelCase in hash function Signed-off-by: Archit Kulkarni --- .../controllers/ray/rayservice_controller.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/ray-operator/controllers/ray/rayservice_controller.go b/ray-operator/controllers/ray/rayservice_controller.go index 1e3ba4c2a16..c4b535fee38 100644 --- a/ray-operator/controllers/ray/rayservice_controller.go +++ b/ray-operator/controllers/ray/rayservice_controller.go @@ -1243,31 +1243,31 @@ func (r *RayServiceReconciler) labelHealthyServePods(ctx context.Context, rayClu return nil } -func getClusterAction(old_spec rayv1.RayClusterSpec, new_spec rayv1.RayClusterSpec) (ClusterAction, error) { +func getClusterAction(oldSpec rayv1.RayClusterSpec, newSpec rayv1.RayClusterSpec) (ClusterAction, error) { // Return the appropriate action based on the difference in the old and new RayCluster specs. 
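getClusterAction, whose rename begins in the hunk above, is the spec-level twin of the annotation-based check in shouldPrepareNewRayCluster. How its result is meant to be consumed is only implied by the series, so the wiring below is an assumption; handleSpecChange is an invented name used purely for illustration, and the sketch assumes the controller package's types.

// Assumed caller-side wiring, not taken from the patch.
func handleSpecChange(activeSpec rayv1.RayClusterSpec, goalSpec rayv1.RayClusterSpec) (ClusterAction, error) {
	action, err := getClusterAction(activeSpec, goalSpec)
	if err != nil {
		return DoNothing, err
	}
	switch action {
	case DoNothing:
		// Only Replicas/WorkersToDelete drifted; leave those fields to the autoscaler.
	case Update:
		// Worker groups were appended at the end; patch the existing RayCluster in place.
	case RolloutNew:
		// Any other change; prepare a pending RayCluster for a zero-downtime switch.
	}
	return action, nil
}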
// Case 1: If everything is identical except for the Replicas and WorkersToDelete of // each WorkerGroup, then do nothing. - same_hash, err := compareRayClusterJsonHash(old_spec, new_spec, generateHashWithoutReplicasAndWorkersToDelete) + sameHash, err := compareRayClusterJsonHash(oldSpec, newSpec, generateHashWithoutReplicasAndWorkersToDelete) if err != nil { return DoNothing, err } - if same_hash { + if sameHash { return DoNothing, nil } // Case 2: Otherwise, if everything is identical except for the Replicas and WorkersToDelete of // the existing workergroups, and one or more new workergroups are added at the end, then update the cluster. - new_spec_without_new_worker_groups := new_spec.DeepCopy() - if len(new_spec.WorkerGroupSpecs) > len(old_spec.WorkerGroupSpecs) { + newSpec_without_new_worker_groups := newSpec.DeepCopy() + if len(newSpec.WorkerGroupSpecs) > len(oldSpec.WorkerGroupSpecs) { // Remove the new worker groups from the new spec. - new_spec_without_new_worker_groups.WorkerGroupSpecs = new_spec_without_new_worker_groups.WorkerGroupSpecs[:len(old_spec.WorkerGroupSpecs)] + newSpec_without_new_worker_groups.WorkerGroupSpecs = newSpec_without_new_worker_groups.WorkerGroupSpecs[:len(oldSpec.WorkerGroupSpecs)] - same_hash, err = compareRayClusterJsonHash(old_spec, *new_spec_without_new_worker_groups, generateHashWithoutReplicasAndWorkersToDelete) + sameHash, err = compareRayClusterJsonHash(oldSpec, *newSpec_without_new_worker_groups, generateHashWithoutReplicasAndWorkersToDelete) if err != nil { return DoNothing, err } - if same_hash { + if sameHash { return Update, nil } } From 65774d72969763978b75674c9ec0e8bde937e5b4 Mon Sep 17 00:00:00 2001 From: Archit Kulkarni Date: Tue, 26 Dec 2023 09:24:58 -0800 Subject: [PATCH 20/23] Delete unnecessary and broken test for updating minreplicas Signed-off-by: Archit Kulkarni --- .../controllers/ray/rayservice_controller_unit_test.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/ray-operator/controllers/ray/rayservice_controller_unit_test.go b/ray-operator/controllers/ray/rayservice_controller_unit_test.go index 0a9927adf3a..4b8ddb69de1 100644 --- a/ray-operator/controllers/ray/rayservice_controller_unit_test.go +++ b/ray-operator/controllers/ray/rayservice_controller_unit_test.go @@ -57,12 +57,6 @@ func TestGenerateHashWithoutReplicasAndWorkersToDelete(t *testing.T) { hash3, err := generateHashWithoutReplicasAndWorkersToDelete(cluster.Spec) assert.Nil(t, err) assert.NotEqual(t, hash1, hash3) - - // MinReplicas will not be muted, so `hash4` should not be equal to `hash1`. 
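compareRayClusterJsonHash, called twice in the function above, is not defined anywhere in this series. Its probable shape, reconstructed from the call sites (two specs plus a hash function, returning a bool and an error), is sketched below; treat the body as an assumption rather than the repository's code.

// Probable shape of the comparator (assumption): hash both specs with the
// supplied function and report whether the digests match.
func compareRayClusterJsonHash(spec1 rayv1.RayClusterSpec, spec2 rayv1.RayClusterSpec, hashFunc func(rayv1.RayClusterSpec) (string, error)) (bool, error) {
	hash1, err := hashFunc(spec1)
	if err != nil {
		return false, err
	}
	hash2, err := hashFunc(spec2)
	if err != nil {
		return false, err
	}
	return hash1 == hash2, nil
}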
- *cluster.Spec.WorkerGroupSpecs[0].MinReplicas++ - hash4, err := generateHashWithoutReplicasAndWorkersToDelete(cluster.Spec) - assert.Nil(t, err) - assert.NotEqual(t, hash1, hash4) } func TestGetClusterAction(t *testing.T) { From cb394c44d8447fff9a045626da548844a2a19f1e Mon Sep 17 00:00:00 2001 From: Archit Kulkarni Date: Tue, 26 Dec 2023 09:27:39 -0800 Subject: [PATCH 21/23] Use oldNumWorkerGroupSpecs + 1 instead of hardcoding 2 in unit test Signed-off-by: Archit Kulkarni --- ray-operator/controllers/ray/rayservice_controller_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ray-operator/controllers/ray/rayservice_controller_test.go b/ray-operator/controllers/ray/rayservice_controller_test.go index 59219a0401e..d1040190ef7 100644 --- a/ray-operator/controllers/ray/rayservice_controller_test.go +++ b/ray-operator/controllers/ray/rayservice_controller_test.go @@ -485,7 +485,7 @@ applications: }) It("should update the active RayCluster in place when WorkerGroupSpecs are modified by the user in RayServiceSpec", func() { initialClusterName, _ := getRayClusterNameFunc(ctx, myRayService)() - + oldNumWorkerGroupSpecs := len(myRayService.Spec.RayClusterSpec.WorkerGroupSpecs) // Add a new worker group to the RayServiceSpec newWorkerGroupSpec := myRayService.Spec.RayClusterSpec.WorkerGroupSpecs[0].DeepCopy() newWorkerGroupSpec.GroupName = "worker-group-2" @@ -513,7 +513,7 @@ applications: time.Second*15, time.Millisecond*500, ).Should( - HaveLen(2), + HaveLen(oldNumWorkerGroupSpecs + 1), ) }) It("should update the pending RayCluster in place when WorkerGroupSpecs are modified by the user in RayServiceSpec", func() { From 81365805acea627addfbd81d1408668b5ef30cae Mon Sep 17 00:00:00 2001 From: Archit Kulkarni Date: Tue, 26 Dec 2023 09:29:46 -0800 Subject: [PATCH 22/23] Use oldNumWorkerGroupSpecs + 1 in remaining unit test Signed-off-by: Archit Kulkarni --- ray-operator/controllers/ray/rayservice_controller_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/ray-operator/controllers/ray/rayservice_controller_test.go b/ray-operator/controllers/ray/rayservice_controller_test.go index d1040190ef7..43c39a28158 100644 --- a/ray-operator/controllers/ray/rayservice_controller_test.go +++ b/ray-operator/controllers/ray/rayservice_controller_test.go @@ -535,6 +535,7 @@ applications: initialPendingClusterName, _ := getPreparingRayClusterNameFunc(ctx, myRayService)() // Add a new worker group to the RayServiceSpec + oldNumWorkerGroupSpecs := len(myRayService.Spec.RayClusterSpec.WorkerGroupSpecs) newWorkerGroupSpec := myRayService.Spec.RayClusterSpec.WorkerGroupSpecs[0].DeepCopy() newWorkerGroupSpec.GroupName = "worker-group-3" @@ -548,7 +549,7 @@ applications: Expect(err).NotTo(HaveOccurred(), "failed to update test RayService resource with new WorkerGroupSpecs") // Sanity check: length of myRayService.Spec.RayClusterSpec.WorkerGroupSpecs should be 3 - Expect(myRayService.Spec.RayClusterSpec.WorkerGroupSpecs).Should(HaveLen(3)) + Expect(myRayService.Spec.RayClusterSpec.WorkerGroupSpecs).Should(HaveLen(oldNumWorkerGroupSpecs + 1)) // Confirm it didn't switch to a new RayCluster Consistently( @@ -561,7 +562,7 @@ applications: time.Second*15, time.Millisecond*500, ).Should( - HaveLen(3), + HaveLen(oldNumWorkerGroupSpecs + 1), ) // The pending RayCluster will become the active RayCluster after: From 8b0056b29773bed8d28533d549ae966ef017f1db Mon Sep 17 00:00:00 2001 From: Archit Kulkarni Date: Tue, 26 Dec 2023 11:23:47 -0800 Subject: [PATCH 23/23] Fix snake case 
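Both envtest cases touched by the two patches above follow the same few steps before asserting on oldNumWorkerGroupSpecs + 1. The helper below merely captures that shared pattern for illustration; appendCopiedWorkerGroup is an invented name and is not part of the patch.

// Illustrative helper: copy the first worker group, rename it, append it,
// and return the length the test should later pass to HaveLen.
func appendCopiedWorkerGroup(svc *rayv1.RayService, groupName string) int {
	oldNumWorkerGroupSpecs := len(svc.Spec.RayClusterSpec.WorkerGroupSpecs)
	newWorkerGroupSpec := svc.Spec.RayClusterSpec.WorkerGroupSpecs[0].DeepCopy()
	newWorkerGroupSpec.GroupName = groupName
	svc.Spec.RayClusterSpec.WorkerGroupSpecs = append(svc.Spec.RayClusterSpec.WorkerGroupSpecs, *newWorkerGroupSpec)
	return oldNumWorkerGroupSpecs + 1
}

A test would call it before issuing the Kubernetes update and feed the returned value to the subsequent HaveLen assertion.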
Signed-off-by: Archit Kulkarni --- ray-operator/controllers/ray/rayservice_controller.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ray-operator/controllers/ray/rayservice_controller.go b/ray-operator/controllers/ray/rayservice_controller.go index c4b535fee38..23bb06412a7 100644 --- a/ray-operator/controllers/ray/rayservice_controller.go +++ b/ray-operator/controllers/ray/rayservice_controller.go @@ -1258,12 +1258,12 @@ func getClusterAction(oldSpec rayv1.RayClusterSpec, newSpec rayv1.RayClusterSpec // Case 2: Otherwise, if everything is identical except for the Replicas and WorkersToDelete of // the existing workergroups, and one or more new workergroups are added at the end, then update the cluster. - newSpec_without_new_worker_groups := newSpec.DeepCopy() + newSpecWithoutWorkerGroups := newSpec.DeepCopy() if len(newSpec.WorkerGroupSpecs) > len(oldSpec.WorkerGroupSpecs) { // Remove the new worker groups from the new spec. - newSpec_without_new_worker_groups.WorkerGroupSpecs = newSpec_without_new_worker_groups.WorkerGroupSpecs[:len(oldSpec.WorkerGroupSpecs)] + newSpecWithoutWorkerGroups.WorkerGroupSpecs = newSpecWithoutWorkerGroups.WorkerGroupSpecs[:len(oldSpec.WorkerGroupSpecs)] - sameHash, err = compareRayClusterJsonHash(oldSpec, *newSpec_without_new_worker_groups, generateHashWithoutReplicasAndWorkersToDelete) + sameHash, err = compareRayClusterJsonHash(oldSpec, *newSpecWithoutWorkerGroups, generateHashWithoutReplicasAndWorkersToDelete) if err != nil { return DoNothing, err }