Skip to content

Commit a6cf6e0

Browse files
[RayService] Allow updating WorkerGroupSpecs without rolling out new cluster (#1734)
Previously, when the RayCluster spec of a RayService was updated, one of two things would happen: A new cluster would be rolled out via "zero-downtime-upgrade", or In the case where only the Replicas and WorkersToDelete fields changed, nothing would happen. This behavior was added by [Bug] RayService restarts repeatedly with Autoscaler #1037 to prevent the Autoscaler from inadvertently triggering rollouts when modifying these fields.) This PR adds a third case: If WorkerGroupSpecs is modified in the following specific way and it doesn't fall into the case above, then the RayService controller will update the RayCluster instance in place without rolling out a new one. Here is the specific way that triggers the third case: The existing worker groups are not modified except for Replicas and WorkersToDelete, and one or more entries to WorkerGroupSpecs is added. In general, the updating happens in two places: For the active RayCluster For the pending RayCluster Either of these clusters two may see an update to the spec, so we must handle both of these places. In a followup, we may add the following optimization: If an existing worker group is modified and one or more entries to WorkerGroupSpecs is added, we should reject the spec. This will require using an admission webhook or storing the previous spec somehow. (If we just store the hash as we currently do, we cannot reconstruct the previous spec because all we have is the hash.) Other followup issues for this PR: [RayService] Refactor to unify cluster decision for active and pending RayClusters #1761 [RayService] [CI] Some tests for pending/active clusters may spuriously pass because head pod is not manually set to ready #1768 [RayService] [Enhancement] Avoid unnecessary pod deletion when updating RayCluster #1769 --------- Signed-off-by: Archit Kulkarni <[email protected]> Signed-off-by: Archit Kulkarni <[email protected]>
1 parent d49a7af commit a6cf6e0

File tree

4 files changed

+406
-64
lines changed

4 files changed

+406
-64
lines changed

ray-operator/controllers/ray/rayservice_controller.go

Lines changed: 179 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"os"
77
"reflect"
8+
"strconv"
89
"strings"
910
"time"
1011

@@ -364,19 +365,32 @@ func (r *RayServiceReconciler) reconcileRayCluster(ctx context.Context, rayServi
364365
return nil, nil, err
365366
}
366367

367-
if r.shouldPrepareNewRayCluster(rayServiceInstance, activeRayCluster) {
368+
clusterAction := r.shouldPrepareNewRayCluster(rayServiceInstance, activeRayCluster)
369+
if clusterAction == RolloutNew {
368370
// For LLM serving, some users might not have sufficient GPU resources to run two RayClusters simultaneously.
369371
// Therefore, KubeRay offers ENABLE_ZERO_DOWNTIME as a feature flag for zero-downtime upgrades.
370372
enableZeroDowntime := true
371373
if s := os.Getenv(ENABLE_ZERO_DOWNTIME); strings.ToLower(s) == "false" {
372374
enableZeroDowntime = false
373375
}
374376
if enableZeroDowntime || !enableZeroDowntime && activeRayCluster == nil {
375-
r.markRestart(rayServiceInstance)
377+
// Add a pending cluster name. In the next reconcile loop, shouldPrepareNewRayCluster will return DoNothing and we will
378+
// actually create the pending RayCluster instance.
379+
r.markRestartAndAddPendingClusterName(rayServiceInstance)
376380
} else {
377381
r.Log.Info("Zero-downtime upgrade is disabled (ENABLE_ZERO_DOWNTIME: false). Skip preparing a new RayCluster.")
378382
}
379383
return activeRayCluster, nil, nil
384+
} else if clusterAction == Update {
385+
// Update the active cluster.
386+
r.Log.Info("Updating the active RayCluster instance.")
387+
if activeRayCluster, err = r.constructRayClusterForRayService(rayServiceInstance, activeRayCluster.Name); err != nil {
388+
return nil, nil, err
389+
}
390+
if err := r.updateRayClusterInstance(ctx, activeRayCluster); err != nil {
391+
return nil, nil, err
392+
}
393+
return activeRayCluster, nil, nil
380394
}
381395

382396
if pendingRayCluster, err = r.createRayClusterInstanceIfNeeded(ctx, rayServiceInstance, pendingRayCluster); err != nil {
@@ -468,67 +482,158 @@ func (r *RayServiceReconciler) cleanUpServeConfigCache(rayServiceInstance *rayv1
468482
}
469483
}
470484

485+
type ClusterAction int
486+
487+
const (
488+
DoNothing ClusterAction = iota // value 0
489+
Update // value 1
490+
RolloutNew // value 2
491+
)
492+
471493
// shouldPrepareNewRayCluster checks if we need to generate a new pending cluster.
472-
func (r *RayServiceReconciler) shouldPrepareNewRayCluster(rayServiceInstance *rayv1.RayService, activeRayCluster *rayv1.RayCluster) bool {
494+
func (r *RayServiceReconciler) shouldPrepareNewRayCluster(rayServiceInstance *rayv1.RayService, activeRayCluster *rayv1.RayCluster) ClusterAction {
473495
// Prepare new RayCluster if:
474496
// 1. No active cluster and no pending cluster
475497
// 2. No pending cluster, and the active RayCluster has changed.
476498
if rayServiceInstance.Status.PendingServiceStatus.RayClusterName == "" {
477499
if activeRayCluster == nil {
478500
r.Log.Info("No active Ray cluster. RayService operator should prepare a new Ray cluster.")
479-
return true
501+
return RolloutNew
480502
}
481-
activeClusterHash := activeRayCluster.ObjectMeta.Annotations[utils.RayServiceClusterHashKey]
482-
goalClusterHash, err := generateRayClusterJsonHash(rayServiceInstance.Spec.RayClusterSpec)
503+
504+
// Case 1: If everything is identical except for the Replicas and WorkersToDelete of
505+
// each WorkerGroup, then do nothing.
506+
activeClusterHash := activeRayCluster.ObjectMeta.Annotations[utils.HashWithoutReplicasAndWorkersToDeleteKey]
507+
goalClusterHash, err := generateHashWithoutReplicasAndWorkersToDelete(rayServiceInstance.Spec.RayClusterSpec)
508+
errContextFailedToSerialize := "Failed to serialize new RayCluster config. " +
509+
"Manual config updates will NOT be tracked accurately. " +
510+
"Please manually tear down the cluster and apply a new config."
483511
if err != nil {
484-
errContext := "Failed to serialize new RayCluster config. " +
485-
"Manual config updates will NOT be tracked accurately. " +
486-
"Please manually tear down the cluster and apply a new config."
487-
r.Log.Error(err, errContext)
488-
return true
512+
r.Log.Error(err, errContextFailedToSerialize)
513+
return DoNothing
489514
}
490515

491-
if activeClusterHash != goalClusterHash {
492-
r.Log.Info("Active RayCluster config doesn't match goal config. " +
493-
"RayService operator should prepare a new Ray cluster.\n" +
494-
"* Active RayCluster config hash: " + activeClusterHash + "\n" +
495-
"* Goal RayCluster config hash: " + goalClusterHash)
496-
} else {
497-
r.Log.Info("Active Ray cluster config matches goal config.")
516+
if activeClusterHash == goalClusterHash {
517+
r.Log.Info("Active Ray cluster config matches goal config. No need to update RayCluster.")
518+
return DoNothing
498519
}
499520

500-
return activeClusterHash != goalClusterHash
521+
// Case 2: Otherwise, if everything is identical except for the Replicas and WorkersToDelete of
522+
// the existing workergroups, and one or more new workergroups are added at the end, then update the cluster.
523+
activeClusterNumWorkerGroups, err := strconv.Atoi(activeRayCluster.ObjectMeta.Annotations[utils.NumWorkerGroupsKey])
524+
if err != nil {
525+
r.Log.Error(err, errContextFailedToSerialize)
526+
return DoNothing
527+
}
528+
goalNumWorkerGroups := len(rayServiceInstance.Spec.RayClusterSpec.WorkerGroupSpecs)
529+
r.Log.Info("number of worker groups", "activeClusterNumWorkerGroups", activeClusterNumWorkerGroups, "goalNumWorkerGroups", goalNumWorkerGroups)
530+
if goalNumWorkerGroups > activeClusterNumWorkerGroups {
531+
532+
// Remove the new workergroup(s) from the end before calculating the hash.
533+
goalClusterSpec := rayServiceInstance.Spec.RayClusterSpec.DeepCopy()
534+
goalClusterSpec.WorkerGroupSpecs = goalClusterSpec.WorkerGroupSpecs[:activeClusterNumWorkerGroups]
535+
536+
// Generate the hash of the old worker group specs.
537+
goalClusterHash, err = generateHashWithoutReplicasAndWorkersToDelete(*goalClusterSpec)
538+
if err != nil {
539+
r.Log.Error(err, errContextFailedToSerialize)
540+
return DoNothing
541+
}
542+
543+
if activeClusterHash == goalClusterHash {
544+
r.Log.Info("Active RayCluster config matches goal config, except that one or more entries were appended to WorkerGroupSpecs. Updating RayCluster.")
545+
return Update
546+
}
547+
}
548+
549+
// Case 3: Otherwise, rollout a new cluster.
550+
r.Log.Info("Active RayCluster config doesn't match goal config. " +
551+
"RayService operator should prepare a new Ray cluster.\n" +
552+
"* Active RayCluster config hash: " + activeClusterHash + "\n" +
553+
"* Goal RayCluster config hash: " + goalClusterHash)
554+
return RolloutNew
501555
}
502556

503-
return false
557+
return DoNothing
504558
}
505559

506560
// createRayClusterInstanceIfNeeded checks if we need to create a new RayCluster instance. If so, create one.
507561
func (r *RayServiceReconciler) createRayClusterInstanceIfNeeded(ctx context.Context, rayServiceInstance *rayv1.RayService, pendingRayCluster *rayv1.RayCluster) (*rayv1.RayCluster, error) {
562+
// Early return if no pending RayCluster needs to be created.
508563
if rayServiceInstance.Status.PendingServiceStatus.RayClusterName == "" {
509-
// No exist pending RayCluster and no need to create one.
510564
return nil, nil
511565
}
512566

513-
// Create a new RayCluster if:
514-
// 1. No RayCluster pending.
515-
// 2. Config update for the pending cluster.
516-
equal, err := compareRayClusterJsonHash(pendingRayCluster.Spec, rayServiceInstance.Spec.RayClusterSpec)
517-
if err != nil {
518-
r.Log.Error(err, "Fail to generate hash for RayClusterSpec")
519-
return nil, err
567+
var clusterAction ClusterAction
568+
var err error
569+
570+
if pendingRayCluster == nil {
571+
clusterAction = RolloutNew
572+
} else {
573+
clusterAction, err = getClusterAction(pendingRayCluster.Spec, rayServiceInstance.Spec.RayClusterSpec)
574+
if err != nil {
575+
r.Log.Error(err, "Fail to generate hash for RayClusterSpec")
576+
return nil, err
577+
}
520578
}
521579

522-
if pendingRayCluster == nil || !equal {
580+
switch clusterAction {
581+
case RolloutNew:
582+
r.Log.Info("Creating a new pending RayCluster instance.")
523583
pendingRayCluster, err = r.createRayClusterInstance(ctx, rayServiceInstance, rayServiceInstance.Status.PendingServiceStatus.RayClusterName)
524-
if err != nil {
584+
case Update:
585+
r.Log.Info("Updating the pending RayCluster instance.")
586+
if pendingRayCluster, err = r.constructRayClusterForRayService(rayServiceInstance, pendingRayCluster.Name); err != nil {
525587
return nil, err
526588
}
589+
err = r.updateRayClusterInstance(ctx, pendingRayCluster)
590+
}
591+
592+
if err != nil {
593+
return nil, err
527594
}
528595

529596
return pendingRayCluster, nil
530597
}
531598

599+
// updateRayClusterInstance updates the RayCluster instance.
600+
func (r *RayServiceReconciler) updateRayClusterInstance(ctx context.Context, rayClusterInstance *rayv1.RayCluster) error {
601+
r.Log.V(1).Info("updateRayClusterInstance", "Name", rayClusterInstance.Name, "Namespace", rayClusterInstance.Namespace)
602+
// Printing the whole RayCluster is too noisy. Only print the spec.
603+
r.Log.V(1).Info("updateRayClusterInstance", "rayClusterInstance.Spec", rayClusterInstance.Spec)
604+
605+
// Fetch the current state of the RayCluster
606+
currentRayCluster, err := r.getRayClusterByNamespacedName(ctx, client.ObjectKey{
607+
Namespace: rayClusterInstance.Namespace,
608+
Name: rayClusterInstance.Name,
609+
})
610+
if err != nil {
611+
r.Log.Error(err, "Failed to get the current state of RayCluster", "Namespace", rayClusterInstance.Namespace, "Name", rayClusterInstance.Name)
612+
return err
613+
}
614+
615+
if currentRayCluster == nil {
616+
r.Log.Info("RayCluster not found, possibly deleted", "Namespace", rayClusterInstance.Namespace, "Name", rayClusterInstance.Name)
617+
return nil
618+
}
619+
620+
// Update the fetched RayCluster with new changes
621+
currentRayCluster.Spec = rayClusterInstance.Spec
622+
623+
// Update the labels and annotations
624+
currentRayCluster.Labels = rayClusterInstance.Labels
625+
currentRayCluster.Annotations = rayClusterInstance.Annotations
626+
627+
// Update the RayCluster
628+
if err = r.Update(ctx, currentRayCluster); err != nil {
629+
r.Log.Error(err, "Fail to update RayCluster "+currentRayCluster.Name)
630+
return err
631+
}
632+
633+
r.Log.V(1).Info("updated RayCluster", "rayClusterInstance", currentRayCluster)
634+
return nil
635+
}
636+
532637
// createRayClusterInstance deletes the old RayCluster instance if exists. Only when no existing RayCluster, create a new RayCluster instance.
533638
// One important part is that if this method deletes the old RayCluster, it will return instantly. It depends on the controller to call it again to generate the new RayCluster instance.
534639
func (r *RayServiceReconciler) createRayClusterInstance(ctx context.Context, rayServiceInstance *rayv1.RayService, rayClusterInstanceName string) (*rayv1.RayCluster, error) {
@@ -591,14 +696,15 @@ func (r *RayServiceReconciler) constructRayClusterForRayService(rayService *rayv
591696
rayClusterAnnotations[k] = v
592697
}
593698
rayClusterAnnotations[utils.EnableServeServiceKey] = utils.EnableServeServiceTrue
594-
rayClusterAnnotations[utils.RayServiceClusterHashKey], err = generateRayClusterJsonHash(rayService.Spec.RayClusterSpec)
699+
errContext := "Failed to serialize RayCluster config. " +
700+
"Manual config updates will NOT be tracked accurately. " +
701+
"Please tear down the cluster and apply a new config."
702+
rayClusterAnnotations[utils.HashWithoutReplicasAndWorkersToDeleteKey], err = generateHashWithoutReplicasAndWorkersToDelete(rayService.Spec.RayClusterSpec)
595703
if err != nil {
596-
errContext := "Failed to serialize RayCluster config. " +
597-
"Manual config updates will NOT be tracked accurately. " +
598-
"Please tear down the cluster and apply a new config."
599704
r.Log.Error(err, errContext)
600705
return nil, err
601706
}
707+
rayClusterAnnotations[utils.NumWorkerGroupsKey] = strconv.Itoa(len(rayService.Spec.RayClusterSpec.WorkerGroupSpecs))
602708

603709
rayCluster := &rayv1.RayCluster{
604710
ObjectMeta: metav1.ObjectMeta{
@@ -862,7 +968,7 @@ func updateDashboardStatus(rayServiceClusterStatus *rayv1.RayServiceStatus, isHe
862968
}
863969
}
864970

865-
func (r *RayServiceReconciler) markRestart(rayServiceInstance *rayv1.RayService) {
971+
func (r *RayServiceReconciler) markRestartAndAddPendingClusterName(rayServiceInstance *rayv1.RayService) {
866972
// Generate RayCluster name for pending cluster.
867973
r.Log.V(1).Info("Current cluster is unhealthy, prepare to restart.", "Status", rayServiceInstance.Status)
868974
rayServiceInstance.Status.ServiceStatus = rayv1.Restarting
@@ -1139,8 +1245,41 @@ func (r *RayServiceReconciler) labelHealthyServePods(ctx context.Context, rayClu
11391245
return nil
11401246
}
11411247

1142-
func generateRayClusterJsonHash(rayClusterSpec rayv1.RayClusterSpec) (string, error) {
1143-
// Mute all fields that will not trigger new RayCluster preparation. For example,
1248+
func getClusterAction(oldSpec rayv1.RayClusterSpec, newSpec rayv1.RayClusterSpec) (ClusterAction, error) {
1249+
// Return the appropriate action based on the difference in the old and new RayCluster specs.
1250+
1251+
// Case 1: If everything is identical except for the Replicas and WorkersToDelete of
1252+
// each WorkerGroup, then do nothing.
1253+
sameHash, err := compareRayClusterJsonHash(oldSpec, newSpec, generateHashWithoutReplicasAndWorkersToDelete)
1254+
if err != nil {
1255+
return DoNothing, err
1256+
}
1257+
if sameHash {
1258+
return DoNothing, nil
1259+
}
1260+
1261+
// Case 2: Otherwise, if everything is identical except for the Replicas and WorkersToDelete of
1262+
// the existing workergroups, and one or more new workergroups are added at the end, then update the cluster.
1263+
newSpecWithoutWorkerGroups := newSpec.DeepCopy()
1264+
if len(newSpec.WorkerGroupSpecs) > len(oldSpec.WorkerGroupSpecs) {
1265+
// Remove the new worker groups from the new spec.
1266+
newSpecWithoutWorkerGroups.WorkerGroupSpecs = newSpecWithoutWorkerGroups.WorkerGroupSpecs[:len(oldSpec.WorkerGroupSpecs)]
1267+
1268+
sameHash, err = compareRayClusterJsonHash(oldSpec, *newSpecWithoutWorkerGroups, generateHashWithoutReplicasAndWorkersToDelete)
1269+
if err != nil {
1270+
return DoNothing, err
1271+
}
1272+
if sameHash {
1273+
return Update, nil
1274+
}
1275+
}
1276+
1277+
// Case 3: Otherwise, rollout a new cluster.
1278+
return RolloutNew, nil
1279+
}
1280+
1281+
func generateHashWithoutReplicasAndWorkersToDelete(rayClusterSpec rayv1.RayClusterSpec) (string, error) {
1282+
// Mute certain fields that will not trigger new RayCluster preparation. For example,
11441283
// Autoscaler will update `Replicas` and `WorkersToDelete` when scaling up/down.
11451284
updatedRayClusterSpec := rayClusterSpec.DeepCopy()
11461285
for i := 0; i < len(updatedRayClusterSpec.WorkerGroupSpecs); i++ {
@@ -1152,13 +1291,13 @@ func generateRayClusterJsonHash(rayClusterSpec rayv1.RayClusterSpec) (string, er
11521291
return utils.GenerateJsonHash(updatedRayClusterSpec)
11531292
}
11541293

1155-
func compareRayClusterJsonHash(spec1 rayv1.RayClusterSpec, spec2 rayv1.RayClusterSpec) (bool, error) {
1156-
hash1, err1 := generateRayClusterJsonHash(spec1)
1294+
func compareRayClusterJsonHash(spec1 rayv1.RayClusterSpec, spec2 rayv1.RayClusterSpec, hashFunc func(rayv1.RayClusterSpec) (string, error)) (bool, error) {
1295+
hash1, err1 := hashFunc(spec1)
11571296
if err1 != nil {
11581297
return false, err1
11591298
}
11601299

1161-
hash2, err2 := generateRayClusterJsonHash(spec2)
1300+
hash2, err2 := hashFunc(spec2)
11621301
if err2 != nil {
11631302
return false, err2
11641303
}

0 commit comments

Comments
 (0)