diff --git a/ray-operator/controllers/ray/rayservice_controller.go b/ray-operator/controllers/ray/rayservice_controller.go index f6466645a8f..d15d64ac8b5 100644 --- a/ray-operator/controllers/ray/rayservice_controller.go +++ b/ray-operator/controllers/ray/rayservice_controller.go @@ -16,7 +16,7 @@ import ( "github.com/ray-project/kuberay/ray-operator/controllers/ray/common" - cmap "github.com/orcaman/concurrent-map" + cmap "github.com/orcaman/concurrent-map/v2" "github.com/go-logr/logr" fmtErrors "github.com/pkg/errors" @@ -51,8 +51,8 @@ type RayServiceReconciler struct { Recorder record.EventRecorder // Currently, the Ray dashboard doesn't cache the Serve deployment config. // To avoid reapplying the same config repeatedly, cache the config in this map. - ServeConfigs cmap.ConcurrentMap - RayClusterDeletionTimestamps cmap.ConcurrentMap + ServeConfigs cmap.ConcurrentMap[string, string] + RayClusterDeletionTimestamps cmap.ConcurrentMap[string, time.Time] dashboardClientFunc func() utils.RayDashboardClientInterface httpProxyClientFunc func() utils.RayHttpProxyClientInterface @@ -65,8 +65,8 @@ func NewRayServiceReconciler(mgr manager.Manager, dashboardClientFunc func() uti Scheme: mgr.GetScheme(), Log: ctrl.Log.WithName("controllers").WithName("RayService"), Recorder: mgr.GetEventRecorderFor("rayservice-controller"), - ServeConfigs: cmap.New(), - RayClusterDeletionTimestamps: cmap.New(), + ServeConfigs: cmap.New[string](), + RayClusterDeletionTimestamps: cmap.New[time.Time](), dashboardClientFunc: dashboardClientFunc, httpProxyClientFunc: httpProxyClientFunc, @@ -411,20 +411,15 @@ func (r *RayServiceReconciler) cleanUpRayClusterInstance(ctx context.Context, ra // after becoming inactive to give the ingress time to update. for _, rayClusterInstance := range rayClusterList.Items { if rayClusterInstance.Name != rayServiceInstance.Status.ActiveServiceStatus.RayClusterName && rayClusterInstance.Name != rayServiceInstance.Status.PendingServiceStatus.RayClusterName { - cachedTimestampObj, exists := r.RayClusterDeletionTimestamps.Get(rayClusterInstance.Name) + cachedTimestamp, exists := r.RayClusterDeletionTimestamps.Get(rayClusterInstance.Name) if !exists { deletionTimestamp := metav1.Now().Add(RayClusterDeletionDelayDuration) r.RayClusterDeletionTimestamps.Set(rayClusterInstance.Name, deletionTimestamp) r.Log.V(1).Info(fmt.Sprintf("Scheduled dangling RayCluster "+ "%s for deletion at %s", rayClusterInstance.Name, deletionTimestamp)) } else { - cachedTimestamp, isTimestamp := cachedTimestampObj.(time.Time) reasonForDeletion := "" - if !isTimestamp { - reasonForDeletion = fmt.Sprintf("Deletion cache contains "+ - "unexpected, non-timestamp object for RayCluster %s. "+ - "Deleting cluster immediately.", rayClusterInstance.Name) - } else if time.Since(cachedTimestamp) > 0*time.Second { + if time.Since(cachedTimestamp) > 0*time.Second { reasonForDeletion = fmt.Sprintf("Deletion timestamp %s "+ "for RayCluster %s has passed. Deleting cluster "+ "immediately.", cachedTimestamp, rayClusterInstance.Name) @@ -724,7 +719,7 @@ func (r *RayServiceReconciler) constructRayClusterForRayService(rayService *rayv func (r *RayServiceReconciler) checkIfNeedSubmitServeDeployment(rayServiceInstance *rayv1.RayService, rayClusterInstance *rayv1.RayCluster, serveStatus *rayv1.RayServiceStatus) bool { // If the Serve config has not been cached, update the Serve config. cacheKey := r.generateConfigKey(rayServiceInstance, rayClusterInstance.Name) - cachedConfigObj, exist := r.ServeConfigs.Get(cacheKey) + cachedServeConfigV2, exist := r.ServeConfigs.Get(cacheKey) if !exist { r.Log.V(1).Info("shouldUpdate", @@ -757,11 +752,7 @@ func (r *RayServiceReconciler) checkIfNeedSubmitServeDeployment(rayServiceInstan reason := fmt.Sprintf("Current Serve config matches cached Serve config, "+ "and some deployments have been deployed for cluster %s", rayClusterInstance.Name) - cachedServeConfigV2, isServeConfigV2 := cachedConfigObj.(string) - if !isServeConfigV2 { - shouldUpdate = true - reason = fmt.Sprintf("No V2 Serve Config of type string has been cached for cluster %s with key %s", rayClusterInstance.Name, cacheKey) - } else if cachedServeConfigV2 != rayServiceInstance.Spec.ServeConfigV2 { + if cachedServeConfigV2 != rayServiceInstance.Spec.ServeConfigV2 { shouldUpdate = true reason = fmt.Sprintf("Current V2 Serve config doesn't match cached Serve config for cluster %s with key %s", rayClusterInstance.Name, cacheKey) } diff --git a/ray-operator/controllers/ray/rayservice_controller_unit_test.go b/ray-operator/controllers/ray/rayservice_controller_unit_test.go index 73e5585e920..fab16e2188a 100644 --- a/ray-operator/controllers/ray/rayservice_controller_unit_test.go +++ b/ray-operator/controllers/ray/rayservice_controller_unit_test.go @@ -9,7 +9,7 @@ import ( "testing" "time" - cmap "github.com/orcaman/concurrent-map" + cmap "github.com/orcaman/concurrent-map/v2" rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils" "github.com/ray-project/kuberay/ray-operator/pkg/client/clientset/versioned/scheme" @@ -663,7 +663,7 @@ func TestCheckIfNeedSubmitServeDeployment(t *testing.T) { Recorder: &record.FakeRecorder{}, Scheme: scheme.Scheme, Log: ctrl.Log.WithName("controllers").WithName("RayService"), - ServeConfigs: cmap.New(), + ServeConfigs: cmap.New[string](), } namespace := "ray" diff --git a/ray-operator/go.mod b/ray-operator/go.mod index 53526415b98..b8f28f69659 100644 --- a/ray-operator/go.mod +++ b/ray-operator/go.mod @@ -11,7 +11,7 @@ require ( github.com/onsi/ginkgo/v2 v2.11.0 github.com/onsi/gomega v1.27.10 github.com/openshift/api v0.0.0-20211209135129-c58d9f695577 - github.com/orcaman/concurrent-map v1.0.0 + github.com/orcaman/concurrent-map/v2 v2.0.1 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.16.0 github.com/sirupsen/logrus v1.9.3 diff --git a/ray-operator/go.sum b/ray-operator/go.sum index e2067cf7b87..0b8984d0c3a 100644 --- a/ray-operator/go.sum +++ b/ray-operator/go.sum @@ -172,8 +172,8 @@ github.com/onsi/gomega v1.27.10/go.mod h1:RsS8tutOdbdgzbPtzzATp12yT7kM5I5aElG3ev github.com/openshift/api v0.0.0-20211209135129-c58d9f695577 h1:NUe82M8wMYXbd5s+WBAJ2QAZZivs+nhZ3zYgZFwKfqw= github.com/openshift/api v0.0.0-20211209135129-c58d9f695577/go.mod h1:DoslCwtqUpr3d/gsbq4ZlkaMEdYqKxuypsDjorcHhME= github.com/openshift/build-machinery-go v0.0.0-20210712174854-1bb7fd1518d3/go.mod h1:b1BuldmJlbA/xYtdZvKi+7j5YGB44qJUJDZ9zwiNCfE= -github.com/orcaman/concurrent-map v1.0.0 h1:I/2A2XPCb4IuQWcQhBhSwGfiuybl/J0ev9HDbW65HOY= -github.com/orcaman/concurrent-map v1.0.0/go.mod h1:Lu3tH6HLW3feq74c2GC+jIMS/K2CFcDWnWD9XkenwhI= +github.com/orcaman/concurrent-map/v2 v2.0.1 h1:jOJ5Pg2w1oeB6PeDurIYf6k9PQ+aTITr/6lP/L/zp6c= +github.com/orcaman/concurrent-map/v2 v2.0.1/go.mod h1:9Eq3TG2oBe5FirmYWQfYO5iH1q0Jv47PLaNK++uCdOM= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=