Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Refactor] Define the value type of the concurrent map explicitly to avoid type conversion #1789

Merged
merged 1 commit into from
Dec 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 9 additions & 18 deletions ray-operator/controllers/ray/rayservice_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (

"github.com/ray-project/kuberay/ray-operator/controllers/ray/common"

cmap "github.com/orcaman/concurrent-map"
cmap "github.com/orcaman/concurrent-map/v2"

"github.com/go-logr/logr"
fmtErrors "github.com/pkg/errors"
Expand Down Expand Up @@ -51,8 +51,8 @@ type RayServiceReconciler struct {
Recorder record.EventRecorder
// Currently, the Ray dashboard doesn't cache the Serve deployment config.
// To avoid reapplying the same config repeatedly, cache the config in this map.
ServeConfigs cmap.ConcurrentMap
RayClusterDeletionTimestamps cmap.ConcurrentMap
ServeConfigs cmap.ConcurrentMap[string, string]
RayClusterDeletionTimestamps cmap.ConcurrentMap[string, time.Time]

dashboardClientFunc func() utils.RayDashboardClientInterface
httpProxyClientFunc func() utils.RayHttpProxyClientInterface
Expand All @@ -65,8 +65,8 @@ func NewRayServiceReconciler(mgr manager.Manager, dashboardClientFunc func() uti
Scheme: mgr.GetScheme(),
Log: ctrl.Log.WithName("controllers").WithName("RayService"),
Recorder: mgr.GetEventRecorderFor("rayservice-controller"),
ServeConfigs: cmap.New(),
RayClusterDeletionTimestamps: cmap.New(),
ServeConfigs: cmap.New[string](),
RayClusterDeletionTimestamps: cmap.New[time.Time](),

dashboardClientFunc: dashboardClientFunc,
httpProxyClientFunc: httpProxyClientFunc,
Expand Down Expand Up @@ -411,20 +411,15 @@ func (r *RayServiceReconciler) cleanUpRayClusterInstance(ctx context.Context, ra
// after becoming inactive to give the ingress time to update.
for _, rayClusterInstance := range rayClusterList.Items {
if rayClusterInstance.Name != rayServiceInstance.Status.ActiveServiceStatus.RayClusterName && rayClusterInstance.Name != rayServiceInstance.Status.PendingServiceStatus.RayClusterName {
cachedTimestampObj, exists := r.RayClusterDeletionTimestamps.Get(rayClusterInstance.Name)
cachedTimestamp, exists := r.RayClusterDeletionTimestamps.Get(rayClusterInstance.Name)
if !exists {
deletionTimestamp := metav1.Now().Add(RayClusterDeletionDelayDuration)
r.RayClusterDeletionTimestamps.Set(rayClusterInstance.Name, deletionTimestamp)
r.Log.V(1).Info(fmt.Sprintf("Scheduled dangling RayCluster "+
"%s for deletion at %s", rayClusterInstance.Name, deletionTimestamp))
} else {
cachedTimestamp, isTimestamp := cachedTimestampObj.(time.Time)
reasonForDeletion := ""
if !isTimestamp {
reasonForDeletion = fmt.Sprintf("Deletion cache contains "+
"unexpected, non-timestamp object for RayCluster %s. "+
"Deleting cluster immediately.", rayClusterInstance.Name)
} else if time.Since(cachedTimestamp) > 0*time.Second {
if time.Since(cachedTimestamp) > 0*time.Second {
reasonForDeletion = fmt.Sprintf("Deletion timestamp %s "+
"for RayCluster %s has passed. Deleting cluster "+
"immediately.", cachedTimestamp, rayClusterInstance.Name)
Expand Down Expand Up @@ -724,7 +719,7 @@ func (r *RayServiceReconciler) constructRayClusterForRayService(rayService *rayv
func (r *RayServiceReconciler) checkIfNeedSubmitServeDeployment(rayServiceInstance *rayv1.RayService, rayClusterInstance *rayv1.RayCluster, serveStatus *rayv1.RayServiceStatus) bool {
// If the Serve config has not been cached, update the Serve config.
cacheKey := r.generateConfigKey(rayServiceInstance, rayClusterInstance.Name)
cachedConfigObj, exist := r.ServeConfigs.Get(cacheKey)
cachedServeConfigV2, exist := r.ServeConfigs.Get(cacheKey)

if !exist {
r.Log.V(1).Info("shouldUpdate",
Expand Down Expand Up @@ -757,11 +752,7 @@ func (r *RayServiceReconciler) checkIfNeedSubmitServeDeployment(rayServiceInstan
reason := fmt.Sprintf("Current Serve config matches cached Serve config, "+
"and some deployments have been deployed for cluster %s", rayClusterInstance.Name)

cachedServeConfigV2, isServeConfigV2 := cachedConfigObj.(string)
if !isServeConfigV2 {
shouldUpdate = true
reason = fmt.Sprintf("No V2 Serve Config of type string has been cached for cluster %s with key %s", rayClusterInstance.Name, cacheKey)
} else if cachedServeConfigV2 != rayServiceInstance.Spec.ServeConfigV2 {
if cachedServeConfigV2 != rayServiceInstance.Spec.ServeConfigV2 {
shouldUpdate = true
reason = fmt.Sprintf("Current V2 Serve config doesn't match cached Serve config for cluster %s with key %s", rayClusterInstance.Name, cacheKey)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"testing"
"time"

cmap "github.com/orcaman/concurrent-map"
cmap "github.com/orcaman/concurrent-map/v2"
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
"github.com/ray-project/kuberay/ray-operator/pkg/client/clientset/versioned/scheme"
Expand Down Expand Up @@ -663,7 +663,7 @@ func TestCheckIfNeedSubmitServeDeployment(t *testing.T) {
Recorder: &record.FakeRecorder{},
Scheme: scheme.Scheme,
Log: ctrl.Log.WithName("controllers").WithName("RayService"),
ServeConfigs: cmap.New(),
ServeConfigs: cmap.New[string](),
}

namespace := "ray"
Expand Down
2 changes: 1 addition & 1 deletion ray-operator/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/onsi/ginkgo/v2 v2.11.0
github.com/onsi/gomega v1.27.10
github.com/openshift/api v0.0.0-20211209135129-c58d9f695577
github.com/orcaman/concurrent-map v1.0.0
github.com/orcaman/concurrent-map/v2 v2.0.1
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.16.0
github.com/sirupsen/logrus v1.9.3
Expand Down
4 changes: 2 additions & 2 deletions ray-operator/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading