Skip to content

Commit

Permalink
update
Browse files: browse the repository at this point in the history
  • Loading branch information
kevin85421 committed Dec 29, 2023
1 parent 7fd79f8 commit eef5ef5
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 23 deletions.
27 changes: 9 additions & 18 deletions ray-operator/controllers/ray/rayservice_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (

"github.com/ray-project/kuberay/ray-operator/controllers/ray/common"

cmap "github.com/orcaman/concurrent-map"
cmap "github.com/orcaman/concurrent-map/v2"

"github.com/go-logr/logr"
fmtErrors "github.com/pkg/errors"
Expand Down Expand Up @@ -51,8 +51,8 @@ type RayServiceReconciler struct {
Recorder record.EventRecorder
// Currently, the Ray dashboard doesn't cache the Serve deployment config.
// To avoid reapplying the same config repeatedly, cache the config in this map.
ServeConfigs cmap.ConcurrentMap
RayClusterDeletionTimestamps cmap.ConcurrentMap
ServeConfigs cmap.ConcurrentMap[string, string]
RayClusterDeletionTimestamps cmap.ConcurrentMap[string, time.Time]

dashboardClientFunc func() utils.RayDashboardClientInterface
httpProxyClientFunc func() utils.RayHttpProxyClientInterface
Expand All @@ -65,8 +65,8 @@ func NewRayServiceReconciler(mgr manager.Manager, dashboardClientFunc func() uti
Scheme: mgr.GetScheme(),
Log: ctrl.Log.WithName("controllers").WithName("RayService"),
Recorder: mgr.GetEventRecorderFor("rayservice-controller"),
ServeConfigs: cmap.New(),
RayClusterDeletionTimestamps: cmap.New(),
ServeConfigs: cmap.New[string](),
RayClusterDeletionTimestamps: cmap.New[time.Time](),

dashboardClientFunc: dashboardClientFunc,
httpProxyClientFunc: httpProxyClientFunc,
Expand Down Expand Up @@ -411,20 +411,15 @@ func (r *RayServiceReconciler) cleanUpRayClusterInstance(ctx context.Context, ra
// after becoming inactive to give the ingress time to update.
for _, rayClusterInstance := range rayClusterList.Items {
if rayClusterInstance.Name != rayServiceInstance.Status.ActiveServiceStatus.RayClusterName && rayClusterInstance.Name != rayServiceInstance.Status.PendingServiceStatus.RayClusterName {
cachedTimestampObj, exists := r.RayClusterDeletionTimestamps.Get(rayClusterInstance.Name)
cachedTimestamp, exists := r.RayClusterDeletionTimestamps.Get(rayClusterInstance.Name)
if !exists {
deletionTimestamp := metav1.Now().Add(RayClusterDeletionDelayDuration)
r.RayClusterDeletionTimestamps.Set(rayClusterInstance.Name, deletionTimestamp)
r.Log.V(1).Info(fmt.Sprintf("Scheduled dangling RayCluster "+
"%s for deletion at %s", rayClusterInstance.Name, deletionTimestamp))
} else {
cachedTimestamp, isTimestamp := cachedTimestampObj.(time.Time)
reasonForDeletion := ""
if !isTimestamp {
reasonForDeletion = fmt.Sprintf("Deletion cache contains "+
"unexpected, non-timestamp object for RayCluster %s. "+
"Deleting cluster immediately.", rayClusterInstance.Name)
} else if time.Since(cachedTimestamp) > 0*time.Second {
if time.Since(cachedTimestamp) > 0*time.Second {
reasonForDeletion = fmt.Sprintf("Deletion timestamp %s "+
"for RayCluster %s has passed. Deleting cluster "+
"immediately.", cachedTimestamp, rayClusterInstance.Name)
Expand Down Expand Up @@ -724,7 +719,7 @@ func (r *RayServiceReconciler) constructRayClusterForRayService(rayService *rayv
func (r *RayServiceReconciler) checkIfNeedSubmitServeDeployment(rayServiceInstance *rayv1.RayService, rayClusterInstance *rayv1.RayCluster, serveStatus *rayv1.RayServiceStatus) bool {
// If the Serve config has not been cached, update the Serve config.
cacheKey := r.generateConfigKey(rayServiceInstance, rayClusterInstance.Name)
cachedConfigObj, exist := r.ServeConfigs.Get(cacheKey)
cachedServeConfigV2, exist := r.ServeConfigs.Get(cacheKey)

if !exist {
r.Log.V(1).Info("shouldUpdate",
Expand Down Expand Up @@ -757,11 +752,7 @@ func (r *RayServiceReconciler) checkIfNeedSubmitServeDeployment(rayServiceInstan
reason := fmt.Sprintf("Current Serve config matches cached Serve config, "+
"and some deployments have been deployed for cluster %s", rayClusterInstance.Name)

cachedServeConfigV2, isServeConfigV2 := cachedConfigObj.(string)
if !isServeConfigV2 {
shouldUpdate = true
reason = fmt.Sprintf("No V2 Serve Config of type string has been cached for cluster %s with key %s", rayClusterInstance.Name, cacheKey)
} else if cachedServeConfigV2 != rayServiceInstance.Spec.ServeConfigV2 {
if cachedServeConfigV2 != rayServiceInstance.Spec.ServeConfigV2 {
shouldUpdate = true
reason = fmt.Sprintf("Current V2 Serve config doesn't match cached Serve config for cluster %s with key %s", rayClusterInstance.Name, cacheKey)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"testing"
"time"

cmap "github.com/orcaman/concurrent-map"
cmap "github.com/orcaman/concurrent-map/v2"
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
"github.com/ray-project/kuberay/ray-operator/pkg/client/clientset/versioned/scheme"
Expand Down Expand Up @@ -663,7 +663,7 @@ func TestCheckIfNeedSubmitServeDeployment(t *testing.T) {
Recorder: &record.FakeRecorder{},
Scheme: scheme.Scheme,
Log: ctrl.Log.WithName("controllers").WithName("RayService"),
ServeConfigs: cmap.New(),
ServeConfigs: cmap.New[string](),
}

namespace := "ray"
Expand Down
2 changes: 1 addition & 1 deletion ray-operator/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/onsi/ginkgo/v2 v2.11.0
github.com/onsi/gomega v1.27.10
github.com/openshift/api v0.0.0-20211209135129-c58d9f695577
github.com/orcaman/concurrent-map v1.0.0
github.com/orcaman/concurrent-map/v2 v2.0.1
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.16.0
github.com/sirupsen/logrus v1.9.3
Expand Down
4 changes: 2 additions & 2 deletions ray-operator/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit eef5ef5

Please sign in to comment.