From 410c5d4be2b2098bd999d5c34c486e51c2a01b10 Mon Sep 17 00:00:00 2001 From: zach593 Date: Sun, 9 Mar 2025 17:26:59 +0800 Subject: [PATCH] use singleton ratelimiter for dynamically created clients Signed-off-by: zach593 --- cmd/agent/app/agent.go | 10 +- .../app/controllermanager.go | 14 ++- cmd/metrics-adapter/app/options/options.go | 3 +- pkg/controllers/context/context.go | 6 +- .../execution/execution_controller_test.go | 4 +- .../mcs/service_export_controller.go | 5 +- .../endpointslice_collect_controller.go | 5 +- .../status/cluster_status_controller.go | 6 +- .../status/cluster_status_controller_test.go | 56 +++------ .../status/work_status_controller.go | 5 +- .../status/work_status_controller_test.go | 2 +- pkg/metricsadapter/controller.go | 6 +- pkg/metricsadapter/multiclient/client.go | 5 +- pkg/search/controller.go | 2 +- pkg/util/membercluster_client.go | 41 +++++-- pkg/util/membercluster_client_test.go | 26 ++-- pkg/util/objectwatcher/objectwatcher.go | 15 ++- pkg/util/ratelimiter.go | 70 +++++++++++ pkg/util/ratelimiter_test.go | 114 ++++++++++++++++++ test/e2e/framework/cluster.go | 2 +- 20 files changed, 296 insertions(+), 101 deletions(-) create mode 100644 pkg/util/ratelimiter.go create mode 100644 pkg/util/ratelimiter_test.go diff --git a/cmd/agent/app/agent.go b/cmd/agent/app/agent.go index bf8335592daf..67547f2d8b01 100644 --- a/cmd/agent/app/agent.go +++ b/cmd/agent/app/agent.go @@ -265,7 +265,10 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop return fmt.Errorf("failed to setup custom resource interpreter: %w", err) } - objectWatcher := objectwatcher.NewObjectWatcher(mgr.GetClient(), mgr.GetRESTMapper(), util.NewClusterDynamicClientSetForAgent, resourceInterpreter) + rateLimiterGetter := util.GetRateLimiterGetter().SetLimits(opts.ClusterAPIQPS, opts.ClusterAPIBurst) + clusterClientOption := &util.ClientOption{RateLimiterGetter: rateLimiterGetter.GetRateLimiter} + + objectWatcher := 
objectwatcher.NewObjectWatcher(mgr.GetClient(), mgr.GetRESTMapper(), util.NewClusterDynamicClientSetForAgent, clusterClientOption, resourceInterpreter) controllerContext := controllerscontext.Context{ Mgr: mgr, ObjectWatcher: objectWatcher, @@ -278,8 +281,6 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop ClusterSuccessThreshold: opts.ClusterSuccessThreshold, ClusterFailureThreshold: opts.ClusterFailureThreshold, ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout, - ClusterAPIQPS: opts.ClusterAPIQPS, - ClusterAPIBurst: opts.ClusterAPIBurst, ConcurrentWorkSyncs: opts.ConcurrentWorkSyncs, RateLimiterOptions: opts.RateLimiterOpts, EnableClusterResourceModeling: opts.EnableClusterResourceModeling, @@ -289,6 +290,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop }, StopChan: stopChan, ResourceInterpreter: resourceInterpreter, + ClusterClientOption: clusterClientOption, } if err := controllers.StartControllers(controllerContext, controllersDisabledByDefault); err != nil { @@ -315,7 +317,7 @@ func startClusterStatusController(ctx controllerscontext.Context) (bool, error) StopChan: ctx.StopChan, ClusterClientSetFunc: util.NewClusterClientSetForAgent, ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent, - ClusterClientOption: &util.ClientOption{QPS: ctx.Opts.ClusterAPIQPS, Burst: ctx.Opts.ClusterAPIBurst}, + ClusterClientOption: ctx.ClusterClientOption, ClusterStatusUpdateFrequency: ctx.Opts.ClusterStatusUpdateFrequency, ClusterLeaseDuration: ctx.Opts.ClusterLeaseDuration, ClusterLeaseRenewIntervalFraction: ctx.Opts.ClusterLeaseRenewIntervalFraction, diff --git a/cmd/controller-manager/app/controllermanager.go b/cmd/controller-manager/app/controllermanager.go index e7068b00567f..790624705e01 100644 --- a/cmd/controller-manager/app/controllermanager.go +++ b/cmd/controller-manager/app/controllermanager.go @@ -324,7 +324,7 @@ func startClusterStatusController(ctx 
controllerscontext.Context) (enabled bool, StopChan: stopChan, ClusterClientSetFunc: util.NewClusterClientSet, ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet, - ClusterClientOption: &util.ClientOption{QPS: opts.ClusterAPIQPS, Burst: opts.ClusterAPIBurst}, + ClusterClientOption: ctx.ClusterClientOption, ClusterStatusUpdateFrequency: opts.ClusterStatusUpdateFrequency, ClusterLeaseDuration: opts.ClusterLeaseDuration, ClusterLeaseRenewIntervalFraction: opts.ClusterLeaseRenewIntervalFraction, @@ -428,6 +428,7 @@ func startWorkStatusController(ctx controllerscontext.Context) (enabled bool, er ObjectWatcher: ctx.ObjectWatcher, PredicateFunc: helper.NewExecutionPredicate(ctx.Mgr), ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet, + ClusterClientOption: ctx.ClusterClientOption, ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout, ConcurrentWorkStatusSyncs: opts.ConcurrentWorkSyncs, RateLimiterOptions: ctx.Opts.RateLimiterOptions, @@ -465,6 +466,7 @@ func startServiceExportController(ctx controllerscontext.Context) (enabled bool, WorkerNumber: 3, PredicateFunc: helper.NewPredicateForServiceExportController(ctx.Mgr), ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet, + ClusterClientOption: ctx.ClusterClientOption, ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout, } serviceExportController.RunWorkQueue() @@ -487,6 +489,7 @@ func startEndpointSliceCollectController(ctx controllerscontext.Context) (enable WorkerNumber: 3, PredicateFunc: helper.NewPredicateForEndpointSliceCollectController(ctx.Mgr), ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet, + ClusterClientOption: ctx.ClusterClientOption, ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout, } endpointSliceCollectController.RunWorkQueue() @@ -744,6 +747,10 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop } controlPlaneInformerManager := genericmanager.NewSingleClusterInformerManager(dynamicClientSet, opts.ResyncPeriod.Duration, 
stopChan) + + rateLimiterGetter := util.GetRateLimiterGetter().SetLimits(opts.ClusterAPIQPS, opts.ClusterAPIBurst) + clusterClientOption := &util.ClientOption{RateLimiterGetter: rateLimiterGetter.GetRateLimiter} + // We need a service lister to build a resource interpreter with `ClusterIPServiceResolver` // witch allows connection to the customized interpreter webhook without a cluster DNS service. sharedFactory := informers.NewSharedInformerFactory(kubeClientSet, opts.ResyncPeriod.Duration) @@ -756,7 +763,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop klog.Fatalf("Failed to setup custom resource interpreter: %v", err) } - objectWatcher := objectwatcher.NewObjectWatcher(mgr.GetClient(), mgr.GetRESTMapper(), util.NewClusterDynamicClientSet, resourceInterpreter) + objectWatcher := objectwatcher.NewObjectWatcher(mgr.GetClient(), mgr.GetRESTMapper(), util.NewClusterDynamicClientSet, clusterClientOption, resourceInterpreter) resourceDetector := &detector.ResourceDetector{ DiscoveryClientSet: discoverClientSet, @@ -808,8 +815,6 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop ClusterSuccessThreshold: opts.ClusterSuccessThreshold, ClusterFailureThreshold: opts.ClusterFailureThreshold, ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout, - ClusterAPIQPS: opts.ClusterAPIQPS, - ClusterAPIBurst: opts.ClusterAPIBurst, SkippedPropagatingNamespaces: opts.SkippedNamespacesRegexps(), ConcurrentWorkSyncs: opts.ConcurrentWorkSyncs, EnableTaintManager: opts.EnableTaintManager, @@ -824,6 +829,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop OverrideManager: overrideManager, ControlPlaneInformerManager: controlPlaneInformerManager, ResourceInterpreter: resourceInterpreter, + ClusterClientOption: clusterClientOption, } if err := controllers.StartControllers(controllerContext, controllersDisabledByDefault); err != nil { diff --git a/cmd/metrics-adapter/app/options/options.go 
b/cmd/metrics-adapter/app/options/options.go index c75e4071dd2c..a18fba6fbfcf 100755 --- a/cmd/metrics-adapter/app/options/options.go +++ b/cmd/metrics-adapter/app/options/options.go @@ -134,7 +134,8 @@ func (o *Options) Config(stopCh <-chan struct{}) (*metricsadapter.MetricsServer, factory := informerfactory.NewSharedInformerFactory(karmadaClient, 0) kubeClient := kubernetes.NewForConfigOrDie(restConfig) kubeFactory := informers.NewSharedInformerFactory(kubeClient, 0) - metricsController := metricsadapter.NewMetricsController(stopCh, restConfig, factory, kubeFactory, &util.ClientOption{QPS: o.ClusterAPIQPS, Burst: o.ClusterAPIBurst}) + limiterGetter := util.GetRateLimiterGetter().SetLimits(o.ClusterAPIQPS, o.ClusterAPIBurst) + metricsController := metricsadapter.NewMetricsController(stopCh, restConfig, factory, kubeFactory, &util.ClientOption{RateLimiterGetter: limiterGetter.GetRateLimiter}) metricsAdapter := metricsadapter.NewMetricsAdapter(metricsController, o.CustomMetricsAdapterServerOptions) metricsAdapter.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(generatedopenapi.GetOpenAPIDefinitions, openapinamer.NewDefinitionNamer(api.Scheme)) metricsAdapter.OpenAPIV3Config = genericapiserver.DefaultOpenAPIV3Config(generatedopenapi.GetOpenAPIDefinitions, openapinamer.NewDefinitionNamer(api.Scheme)) diff --git a/pkg/controllers/context/context.go b/pkg/controllers/context/context.go index d07274a85aa4..caefd6916763 100644 --- a/pkg/controllers/context/context.go +++ b/pkg/controllers/context/context.go @@ -30,6 +30,7 @@ import ( "github.com/karmada-io/karmada/pkg/controllers/federatedhpa/config" "github.com/karmada-io/karmada/pkg/resourceinterpreter" "github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag" + "github.com/karmada-io/karmada/pkg/util" "github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager" "github.com/karmada-io/karmada/pkg/util/objectwatcher" "github.com/karmada-io/karmada/pkg/util/overridemanager" @@ -67,10 +68,6 @@ type Options 
struct { ClusterFailureThreshold metav1.Duration // ClusterCacheSyncTimeout is the timeout period waiting for cluster cache to sync. ClusterCacheSyncTimeout metav1.Duration - // ClusterAPIQPS is the QPS to use while talking with cluster kube-apiserver. - ClusterAPIQPS float32 - // ClusterAPIBurst is the burst to allow while talking with cluster kube-apiserver. - ClusterAPIBurst int // SkippedPropagatingNamespaces is a list of namespace regular expressions, matching namespaces will be skipped propagating. SkippedPropagatingNamespaces []*regexp.Regexp // ClusterName is the name of cluster. @@ -112,6 +109,7 @@ type Context struct { OverrideManager overridemanager.OverrideManager ControlPlaneInformerManager genericmanager.SingleClusterInformerManager ResourceInterpreter resourceinterpreter.ResourceInterpreter + ClusterClientOption *util.ClientOption } // IsControllerEnabled check if a specified controller enabled or not. diff --git a/pkg/controllers/execution/execution_controller_test.go b/pkg/controllers/execution/execution_controller_test.go index ce53d860f54e..196264412a2f 100644 --- a/pkg/controllers/execution/execution_controller_test.go +++ b/pkg/controllers/execution/execution_controller_test.go @@ -229,7 +229,7 @@ func newController(work *workv1alpha1.Work, recorder *record.FakeRecorder) Contr informerManager.ForCluster(cluster.Name, dynamicClientSet, 0).Lister(corev1.SchemeGroupVersion.WithResource("pods")) informerManager.Start(cluster.Name) informerManager.WaitForCacheSync(cluster.Name) - clusterClientSetFunc := func(string, client.Client) (*util.DynamicClusterClient, error) { + clusterClientSetFunc := func(string, client.Client, *util.ClientOption) (*util.DynamicClusterClient, error) { return &util.DynamicClusterClient{ ClusterName: clusterName, DynamicClientSet: dynamicClientSet, @@ -241,7 +241,7 @@ func newController(work *workv1alpha1.Work, recorder *record.FakeRecorder) Contr InformerManager: informerManager, EventRecorder: recorder, RESTMapper: 
restMapper, - ObjectWatcher: objectwatcher.NewObjectWatcher(fakeClient, restMapper, clusterClientSetFunc, resourceInterpreter), + ObjectWatcher: objectwatcher.NewObjectWatcher(fakeClient, restMapper, clusterClientSetFunc, nil, resourceInterpreter), } } diff --git a/pkg/controllers/mcs/service_export_controller.go b/pkg/controllers/mcs/service_export_controller.go index 16a682edb797..5dc867c451d3 100644 --- a/pkg/controllers/mcs/service_export_controller.go +++ b/pkg/controllers/mcs/service_export_controller.go @@ -68,7 +68,8 @@ type ServiceExportController struct { InformerManager genericmanager.MultiClusterInformerManager WorkerNumber int // WorkerNumber is the number of worker goroutines PredicateFunc predicate.Predicate // PredicateFunc is the function that filters events before enqueuing the keys. - ClusterDynamicClientSetFunc func(clusterName string, client client.Client) (*util.DynamicClusterClient, error) + ClusterDynamicClientSetFunc util.NewClusterDynamicClientSetFunc + ClusterClientOption *util.ClientOption ClusterCacheSyncTimeout metav1.Duration // eventHandlers holds the handlers which used to handle events reported from member clusters. 
@@ -234,7 +235,7 @@ func (c *ServiceExportController) buildResourceInformers(cluster *clusterv1alpha func (c *ServiceExportController) registerInformersAndStart(cluster *clusterv1alpha1.Cluster) error { singleClusterInformerManager := c.InformerManager.GetSingleClusterManager(cluster.Name) if singleClusterInformerManager == nil { - dynamicClusterClient, err := c.ClusterDynamicClientSetFunc(cluster.Name, c.Client) + dynamicClusterClient, err := c.ClusterDynamicClientSetFunc(cluster.Name, c.Client, c.ClusterClientOption) if err != nil { klog.Errorf("Failed to build dynamic cluster client for cluster %s.", cluster.Name) return err diff --git a/pkg/controllers/multiclusterservice/endpointslice_collect_controller.go b/pkg/controllers/multiclusterservice/endpointslice_collect_controller.go index 8f398166b9df..ff771dd9c1a4 100644 --- a/pkg/controllers/multiclusterservice/endpointslice_collect_controller.go +++ b/pkg/controllers/multiclusterservice/endpointslice_collect_controller.go @@ -60,7 +60,8 @@ type EndpointSliceCollectController struct { InformerManager genericmanager.MultiClusterInformerManager WorkerNumber int // WorkerNumber is the number of worker goroutines PredicateFunc predicate.Predicate // PredicateFunc is the function that filters events before enqueuing the keys. - ClusterDynamicClientSetFunc func(clusterName string, client client.Client) (*util.DynamicClusterClient, error) + ClusterDynamicClientSetFunc util.NewClusterDynamicClientSetFunc + ClusterClientOption *util.ClientOption // eventHandlers holds the handlers which used to handle events reported from member clusters. // Each handler takes the cluster name as key and takes the handler function as the value, e.g. 
// "member1": instance of ResourceEventHandler @@ -177,7 +178,7 @@ func (c *EndpointSliceCollectController) buildResourceInformers(clusterName stri func (c *EndpointSliceCollectController) registerInformersAndStart(cluster *clusterv1alpha1.Cluster) error { singleClusterInformerManager := c.InformerManager.GetSingleClusterManager(cluster.Name) if singleClusterInformerManager == nil { - dynamicClusterClient, err := c.ClusterDynamicClientSetFunc(cluster.Name, c.Client) + dynamicClusterClient, err := c.ClusterDynamicClientSetFunc(cluster.Name, c.Client, c.ClusterClientOption) if err != nil { klog.Errorf("Failed to build dynamic cluster client for cluster %s.", cluster.Name) return err diff --git a/pkg/controllers/status/cluster_status_controller.go b/pkg/controllers/status/cluster_status_controller.go index e3e3617235ba..608e428bebd3 100644 --- a/pkg/controllers/status/cluster_status_controller.go +++ b/pkg/controllers/status/cluster_status_controller.go @@ -87,8 +87,8 @@ type ClusterStatusController struct { TypedInformerManager typedmanager.MultiClusterInformerManager GenericInformerManager genericmanager.MultiClusterInformerManager StopChan <-chan struct{} - ClusterClientSetFunc func(string, client.Client, *util.ClientOption) (*util.ClusterClient, error) - ClusterDynamicClientSetFunc func(clusterName string, client client.Client) (*util.DynamicClusterClient, error) + ClusterClientSetFunc util.NewClusterClientSetFunc + ClusterDynamicClientSetFunc util.NewClusterDynamicClientSetFunc // ClusterClientOption holds the attributes that should be injected to a Kubernetes client. 
ClusterClientOption *util.ClientOption @@ -343,7 +343,7 @@ func (c *ClusterStatusController) initializeGenericInformerManagerForCluster(clu return } - dynamicClient, err := c.ClusterDynamicClientSetFunc(clusterClient.ClusterName, c.Client) + dynamicClient, err := c.ClusterDynamicClientSetFunc(clusterClient.ClusterName, c.Client, c.ClusterClientOption) if err != nil { klog.Errorf("Failed to build dynamic cluster client for cluster %s.", clusterClient.ClusterName) return diff --git a/pkg/controllers/status/cluster_status_controller_test.go b/pkg/controllers/status/cluster_status_controller_test.go index c6221228aac6..0b4d6473b2e0 100644 --- a/pkg/controllers/status/cluster_status_controller_test.go +++ b/pkg/controllers/status/cluster_status_controller_test.go @@ -113,11 +113,8 @@ func TestClusterStatusController_Reconcile(t *testing.T) { Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).Build(), GenericInformerManager: genericmanager.GetInstance(), TypedInformerManager: typedmanager.GetInstance(), - ClusterClientOption: &util.ClientOption{ - QPS: 5, - Burst: 10, - }, - ClusterClientSetFunc: util.NewClusterClientSet, + ClusterClientOption: &util.ClientOption{}, + ClusterClientSetFunc: util.NewClusterClientSet, } if tt.cluster != nil { @@ -900,11 +897,8 @@ func TestClusterStatusController_updateStatusIfNeeded(t *testing.T) { ).WithStatusSubresource(cluster).Build(), GenericInformerManager: genericmanager.GetInstance(), TypedInformerManager: typedmanager.GetInstance(), - ClusterClientOption: &util.ClientOption{ - QPS: 5, - Burst: 10, - }, - ClusterClientSetFunc: util.NewClusterClientSet, + ClusterClientOption: &util.ClientOption{}, + ClusterClientSetFunc: util.NewClusterClientSet, } err := c.updateStatusIfNeeded(context.Background(), cluster, currentClusterStatus) @@ -964,11 +958,8 @@ func TestClusterStatusController_updateStatusIfNeeded(t *testing.T) { Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).Build(), GenericInformerManager: 
genericmanager.GetInstance(), TypedInformerManager: typedmanager.GetInstance(), - ClusterClientOption: &util.ClientOption{ - QPS: 5, - Burst: 10, - }, - ClusterClientSetFunc: util.NewClusterClientSet, + ClusterClientOption: &util.ClientOption{}, + ClusterClientSetFunc: util.NewClusterClientSet, } err := c.updateStatusIfNeeded(context.Background(), cluster, currentClusterStatus) @@ -976,20 +967,17 @@ func TestClusterStatusController_updateStatusIfNeeded(t *testing.T) { }) } -func NewClusterDynamicClientSetForAgentWithError(_ string, _ client.Client) (*util.DynamicClusterClient, error) { +func NewClusterDynamicClientSetForAgentWithError(_ string, _ client.Client, _ *util.ClientOption) (*util.DynamicClusterClient, error) { return nil, fmt.Errorf("err") } func TestClusterStatusController_initializeGenericInformerManagerForCluster(t *testing.T) { t.Run("failed to create dynamicClient", func(*testing.T) { c := &ClusterStatusController{ - Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).Build(), - GenericInformerManager: genericmanager.GetInstance(), - TypedInformerManager: typedmanager.GetInstance(), - ClusterClientOption: &util.ClientOption{ - QPS: 5, - Burst: 10, - }, + Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).Build(), + GenericInformerManager: genericmanager.GetInstance(), + TypedInformerManager: typedmanager.GetInstance(), + ClusterClientOption: &util.ClientOption{}, ClusterClientSetFunc: util.NewClusterClientSet, ClusterDynamicClientSetFunc: NewClusterDynamicClientSetForAgentWithError, } @@ -1002,13 +990,10 @@ func TestClusterStatusController_initializeGenericInformerManagerForCluster(t *t t.Run("suc to create dynamicClient", func(*testing.T) { c := &ClusterStatusController{ - Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).Build(), - GenericInformerManager: genericmanager.GetInstance(), - TypedInformerManager: typedmanager.GetInstance(), - ClusterClientOption: &util.ClientOption{ - QPS: 5, - Burst: 10, - }, + 
Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).Build(), + GenericInformerManager: genericmanager.GetInstance(), + TypedInformerManager: typedmanager.GetInstance(), + ClusterClientOption: &util.ClientOption{}, ClusterClientSetFunc: util.NewClusterClientSet, ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent, } @@ -1022,13 +1007,10 @@ func TestClusterStatusController_initializeGenericInformerManagerForCluster(t *t func TestClusterStatusController_initLeaseController(_ *testing.T) { c := &ClusterStatusController{ - Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).Build(), - GenericInformerManager: genericmanager.GetInstance(), - TypedInformerManager: typedmanager.GetInstance(), - ClusterClientOption: &util.ClientOption{ - QPS: 5, - Burst: 10, - }, + Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).Build(), + GenericInformerManager: genericmanager.GetInstance(), + TypedInformerManager: typedmanager.GetInstance(), + ClusterClientOption: &util.ClientOption{}, ClusterClientSetFunc: util.NewClusterClientSet, ClusterDynamicClientSetFunc: NewClusterDynamicClientSetForAgentWithError, } diff --git a/pkg/controllers/status/work_status_controller.go b/pkg/controllers/status/work_status_controller.go index af02fab2704e..0a0264c0d447 100644 --- a/pkg/controllers/status/work_status_controller.go +++ b/pkg/controllers/status/work_status_controller.go @@ -71,7 +71,8 @@ type WorkStatusController struct { ConcurrentWorkStatusSyncs int ObjectWatcher objectwatcher.ObjectWatcher PredicateFunc predicate.Predicate - ClusterDynamicClientSetFunc func(clusterName string, client client.Client) (*util.DynamicClusterClient, error) + ClusterDynamicClientSetFunc util.NewClusterDynamicClientSetFunc + ClusterClientOption *util.ClientOption ClusterCacheSyncTimeout metav1.Duration RateLimiterOptions ratelimiterflag.Options ResourceInterpreter resourceinterpreter.ResourceInterpreter @@ -533,7 +534,7 @@ func (c *WorkStatusController) 
getSingleClusterManager(cluster *clusterv1alpha1. // the cache in informer manager should be updated. singleClusterInformerManager := c.InformerManager.GetSingleClusterManager(cluster.Name) if singleClusterInformerManager == nil { - dynamicClusterClient, err := c.ClusterDynamicClientSetFunc(cluster.Name, c.Client) + dynamicClusterClient, err := c.ClusterDynamicClientSetFunc(cluster.Name, c.Client, c.ClusterClientOption) if err != nil { klog.Errorf("Failed to build dynamic cluster client for cluster %s.", cluster.Name) return nil, err diff --git a/pkg/controllers/status/work_status_controller_test.go b/pkg/controllers/status/work_status_controller_test.go index 24d2f5f29d6b..0b9abbde9b8c 100644 --- a/pkg/controllers/status/work_status_controller_test.go +++ b/pkg/controllers/status/work_status_controller_test.go @@ -734,7 +734,7 @@ func newWorkStatusController(cluster *clusterv1alpha1.Cluster, dynamicClientSets if len(dynamicClientSets) > 0 { c.ResourceInterpreter = FakeResourceInterpreter{DefaultInterpreter: native.NewDefaultInterpreter()} - c.ObjectWatcher = objectwatcher.NewObjectWatcher(c.Client, c.RESTMapper, util.NewClusterDynamicClientSetForAgent, c.ResourceInterpreter) + c.ObjectWatcher = objectwatcher.NewObjectWatcher(c.Client, c.RESTMapper, util.NewClusterDynamicClientSetForAgent, nil, c.ResourceInterpreter) // Generate InformerManager clusterName := cluster.Name diff --git a/pkg/metricsadapter/controller.go b/pkg/metricsadapter/controller.go index b9bf778939ec..a8749b8b0e5d 100755 --- a/pkg/metricsadapter/controller.go +++ b/pkg/metricsadapter/controller.go @@ -58,6 +58,7 @@ type MetricsController struct { MultiClusterDiscovery multiclient.MultiClusterDiscoveryInterface queue workqueue.TypedRateLimitingInterface[any] restConfig *rest.Config + clusterClientOption *util.ClientOption } // NewMetricsController creates a new metrics controller @@ -73,6 +74,7 @@ func NewMetricsController(stopCh <-chan struct{}, restConfig *rest.Config, facto queue: 
workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{ Name: "metrics-adapter", }), + clusterClientOption: clusterClientOption, } controller.addEventHandler() @@ -235,11 +237,11 @@ func (m *MetricsController) handleClusters() bool { if !m.TypedInformerManager.IsManagerExist(clusterName) { klog.Info("Try to build informer manager for cluster ", clusterName) controlPlaneClient := gclient.NewForConfigOrDie(m.restConfig) - clusterClient, err := util.NewClusterClientSet(clusterName, controlPlaneClient, nil) + clusterClient, err := util.NewClusterClientSet(clusterName, controlPlaneClient, m.clusterClientOption) if err != nil { return false } - clusterDynamicClient, err := util.NewClusterDynamicClientSet(clusterName, controlPlaneClient) + clusterDynamicClient, err := util.NewClusterDynamicClientSet(clusterName, controlPlaneClient, m.clusterClientOption) if err != nil { return false } diff --git a/pkg/metricsadapter/multiclient/client.go b/pkg/metricsadapter/multiclient/client.go index abf30fb39e18..0729a31fe08c 100644 --- a/pkg/metricsadapter/multiclient/client.go +++ b/pkg/metricsadapter/multiclient/client.go @@ -74,8 +74,9 @@ func (m *MultiClusterDiscovery) Set(clusterName string) error { if err != nil { return err } - clusterConfig.QPS = m.clusterClientOption.QPS - clusterConfig.Burst = m.clusterClientOption.Burst + if m.clusterClientOption != nil && m.clusterClientOption.RateLimiterGetter != nil { + clusterConfig.RateLimiter = m.clusterClientOption.RateLimiterGetter(clusterName) + } m.Lock() defer m.Unlock() m.clients[clusterName] = discovery.NewDiscoveryClientForConfigOrDie(clusterConfig) diff --git a/pkg/search/controller.go b/pkg/search/controller.go index 16bedf94fbff..934491563940 100644 --- a/pkg/search/controller.go +++ b/pkg/search/controller.go @@ -351,7 +351,7 @@ func (c *Controller) getRegistryBackendHandler(cluster string, matchedRegistries } var 
clusterDynamicClientBuilder = func(cluster string, controlPlaneClient client.Client) (*util.DynamicClusterClient, error) { - return util.NewClusterDynamicClientSet(cluster, controlPlaneClient) + return util.NewClusterDynamicClientSet(cluster, controlPlaneClient, nil) } // doCacheCluster processes the resourceRegistry object diff --git a/pkg/util/membercluster_client.go b/pkg/util/membercluster_client.go index d5e99bfe81ff..a2ef6bf8e61b 100644 --- a/pkg/util/membercluster_client.go +++ b/pkg/util/membercluster_client.go @@ -28,6 +28,7 @@ import ( kubeclientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/scale" + "k8s.io/client-go/util/flowcontrol" "k8s.io/klog/v2" controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -60,13 +61,9 @@ type ClusterScaleClient struct { // ClientOption holds the attributes that should be injected to a Kubernetes client. type ClientOption struct { - // QPS indicates the maximum QPS to the master from this client. - // If it's zero, the created RESTClient will use DefaultQPS: 5 - QPS float32 - - // Burst indicates the maximum burst for throttle. - // If it's zero, the created RESTClient will use DefaultBurst: 10. - Burst int + // RateLimiterGetter returns the rate limiter used to limit the QPS to the master from this client. + // We use this instead of QPS/Burst to avoid multiple client initializations causing QPS/Burst to lose their effect. + RateLimiterGetter func(key string) flowcontrol.RateLimiter } // NewClusterScaleClientSet returns a ClusterScaleClient for the given member cluster. @@ -102,6 +99,9 @@ func NewClusterScaleClientSet(clusterName string, client client.Client) (*Cluste return &clusterScaleClientSet, nil } +// NewClusterClientSetFunc is a function that returns a ClusterClient for the given member cluster. 
+type NewClusterClientSetFunc = func(clusterName string, client client.Client, clientOption *ClientOption) (*ClusterClient, error) + // NewClusterClientSet returns a ClusterClient for the given member cluster. func NewClusterClientSet(clusterName string, client client.Client, clientOption *ClientOption) (*ClusterClient, error) { clusterConfig, err := BuildClusterConfig(clusterName, clusterGetter(client), secretGetter(client)) @@ -113,8 +113,9 @@ func NewClusterClientSet(clusterName string, client client.Client, clientOption if clusterConfig != nil { if clientOption != nil { - clusterConfig.QPS = clientOption.QPS - clusterConfig.Burst = clientOption.Burst + if clientOption.RateLimiterGetter != nil { + clusterConfig.RateLimiter = clientOption.RateLimiterGetter(clusterName) + } } clusterClientSet.KubeClient = kubeclientset.NewForConfigOrDie(clusterConfig) } @@ -132,16 +133,20 @@ func NewClusterClientSetForAgent(clusterName string, _ client.Client, clientOpti if clusterConfig != nil { if clientOption != nil { - clusterConfig.QPS = clientOption.QPS - clusterConfig.Burst = clientOption.Burst + if clientOption.RateLimiterGetter != nil { + clusterConfig.RateLimiter = clientOption.RateLimiterGetter(clusterName) + } } clusterClientSet.KubeClient = kubeclientset.NewForConfigOrDie(clusterConfig) } return &clusterClientSet, nil } +// NewClusterDynamicClientSetFunc is a function that returns a dynamic client for the given member cluster. +type NewClusterDynamicClientSetFunc = func(clusterName string, client client.Client, clientOption *ClientOption) (*DynamicClusterClient, error) + // NewClusterDynamicClientSet returns a dynamic client for the given member cluster. 
-func NewClusterDynamicClientSet(clusterName string, client client.Client) (*DynamicClusterClient, error) { +func NewClusterDynamicClientSet(clusterName string, client client.Client, clientOption *ClientOption) (*DynamicClusterClient, error) { clusterConfig, err := BuildClusterConfig(clusterName, clusterGetter(client), secretGetter(client)) if err != nil { return nil, err @@ -149,13 +154,18 @@ func NewClusterDynamicClientSet(clusterName string, client client.Client) (*Dyna var clusterClientSet = DynamicClusterClient{ClusterName: clusterName} if clusterConfig != nil { + if clientOption != nil { + if clientOption.RateLimiterGetter != nil { + clusterConfig.RateLimiter = clientOption.RateLimiterGetter(clusterName) + } + } clusterClientSet.DynamicClientSet = dynamic.NewForConfigOrDie(clusterConfig) } return &clusterClientSet, nil } // NewClusterDynamicClientSetForAgent returns a dynamic client for the given member cluster which will be used in karmada agent. -func NewClusterDynamicClientSetForAgent(clusterName string, _ client.Client) (*DynamicClusterClient, error) { +func NewClusterDynamicClientSetForAgent(clusterName string, _ client.Client, clientOption *ClientOption) (*DynamicClusterClient, error) { clusterConfig, err := controllerruntime.GetConfig() if err != nil { return nil, fmt.Errorf("error building kubeconfig of member cluster: %s", err.Error()) @@ -163,6 +173,11 @@ func NewClusterDynamicClientSetForAgent(clusterName string, _ client.Client) (*D var clusterClientSet = DynamicClusterClient{ClusterName: clusterName} if clusterConfig != nil { + if clientOption != nil { + if clientOption.RateLimiterGetter != nil { + clusterConfig.RateLimiter = clientOption.RateLimiterGetter(clusterName) + } + } clusterClientSet.DynamicClientSet = dynamic.NewForConfigOrDie(clusterConfig) } return &clusterClientSet, nil diff --git a/pkg/util/membercluster_client_test.go b/pkg/util/membercluster_client_test.go index 4aa026d16542..d1a5f6583825 100644 --- 
a/pkg/util/membercluster_client_test.go +++ b/pkg/util/membercluster_client_test.go @@ -195,7 +195,7 @@ func TestNewClusterClientSetForAgent(t *testing.T) { args: args{ clusterName: "test-agent", client: fakeclient.NewClientBuilder().WithScheme(gclient.NewSchema()).Build(), - clientOption: &ClientOption{QPS: 100, Burst: 200}, + clientOption: &ClientOption{}, }, wantErr: false, }, @@ -231,8 +231,9 @@ func TestNewClusterClientSetForAgent(t *testing.T) { func TestNewClusterDynamicClientSetForAgent(t *testing.T) { type args struct { - clusterName string - client client.Client + clusterName string + client client.Client + clusterClientOption *ClientOption } tests := []struct { name string @@ -263,7 +264,7 @@ func TestNewClusterDynamicClientSetForAgent(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := NewClusterDynamicClientSetForAgent(tt.args.clusterName, tt.args.client) + got, err := NewClusterDynamicClientSetForAgent(tt.args.clusterName, tt.args.client, tt.args.clusterClientOption) if tt.wantErr { assert.Error(t, err) assert.Nil(t, got) @@ -368,7 +369,7 @@ func TestNewClusterClientSet(t *testing.T) { ObjectMeta: metav1.ObjectMeta{Namespace: "ns1", Name: "secret1"}, Data: map[string][]byte{clusterv1alpha1.SecretTokenKey: []byte("token"), clusterv1alpha1.SecretCADataKey: testCA}, }).Build(), - clientOption: &ClientOption{QPS: 100, Burst: 200}, + clientOption: &ClientOption{}, }, wantErr: false, }, @@ -389,7 +390,7 @@ func TestNewClusterClientSet(t *testing.T) { ObjectMeta: metav1.ObjectMeta{Namespace: "ns1", Name: "secret1"}, Data: map[string][]byte{clusterv1alpha1.SecretTokenKey: []byte("token")}, }).Build(), - clientOption: &ClientOption{QPS: 100, Burst: 200}, + clientOption: &ClientOption{}, }, wantErr: false, }, @@ -409,7 +410,7 @@ func TestNewClusterClientSet(t *testing.T) { &corev1.Secret{ ObjectMeta: metav1.ObjectMeta{Namespace: "ns1", Name: "secret1"}, Data: map[string][]byte{clusterv1alpha1.SecretTokenKey: 
[]byte("token"), clusterv1alpha1.SecretCADataKey: testCA}}).Build(), - clientOption: &ClientOption{QPS: 100, Burst: 200}, + clientOption: &ClientOption{}, }, wantErr: true, }, @@ -430,7 +431,7 @@ func TestNewClusterClientSet(t *testing.T) { ObjectMeta: metav1.ObjectMeta{Namespace: "ns1", Name: "secret1"}, Data: map[string][]byte{clusterv1alpha1.SecretTokenKey: []byte("token"), clusterv1alpha1.SecretCADataKey: testCA}, }).Build(), - clientOption: &ClientOption{QPS: 100, Burst: 200}, + clientOption: &ClientOption{}, }, wantErr: false, }, @@ -499,8 +500,9 @@ func TestNewClusterClientSet_ClientWorks(t *testing.T) { func TestNewClusterDynamicClientSet(t *testing.T) { type args struct { - clusterName string - client client.Client + clusterName string + client client.Client + clusterClientOption *ClientOption } tests := []struct { name string @@ -647,7 +649,7 @@ func TestNewClusterDynamicClientSet(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := NewClusterDynamicClientSet(tt.args.clusterName, tt.args.client) + got, err := NewClusterDynamicClientSet(tt.args.clusterName, tt.args.client, tt.args.clusterClientOption) if tt.wantErr { assert.Error(t, err) assert.Nil(t, got) @@ -691,7 +693,7 @@ func TestNewClusterDynamicClientSet_ClientWorks(t *testing.T) { Data: map[string][]byte{clusterv1alpha1.SecretTokenKey: []byte("token"), clusterv1alpha1.SecretCADataKey: testCA}, }).Build() - clusterClient, err := NewClusterDynamicClientSet(clusterName, hostClient) + clusterClient, err := NewClusterDynamicClientSet(clusterName, hostClient, nil) assert.NoError(t, err) assert.NotNil(t, clusterClient) diff --git a/pkg/util/objectwatcher/objectwatcher.go b/pkg/util/objectwatcher/objectwatcher.go index 47a98560e7f8..32faf71a5f51 100644 --- a/pkg/util/objectwatcher/objectwatcher.go +++ b/pkg/util/objectwatcher/objectwatcher.go @@ -47,33 +47,32 @@ type ObjectWatcher interface { NeedsUpdate(clusterName string, desiredObj, clusterObj 
*unstructured.Unstructured) (bool, error) } -// ClientSetFunc is used to generate client set of member cluster -type ClientSetFunc func(c string, client client.Client) (*util.DynamicClusterClient, error) - type objectWatcherImpl struct { Lock sync.RWMutex RESTMapper meta.RESTMapper KubeClientSet client.Client VersionRecord map[string]map[string]string - ClusterClientSetFunc ClientSetFunc + ClusterClientSetFunc util.NewClusterDynamicClientSetFunc + ClusterClientOption *util.ClientOption resourceInterpreter resourceinterpreter.ResourceInterpreter InformerManager genericmanager.MultiClusterInformerManager } // NewObjectWatcher returns an instance of ObjectWatcher -func NewObjectWatcher(kubeClientSet client.Client, restMapper meta.RESTMapper, clusterClientSetFunc ClientSetFunc, interpreter resourceinterpreter.ResourceInterpreter) ObjectWatcher { +func NewObjectWatcher(kubeClientSet client.Client, restMapper meta.RESTMapper, clusterClientSetFunc util.NewClusterDynamicClientSetFunc, clusterClientOption *util.ClientOption, interpreter resourceinterpreter.ResourceInterpreter) ObjectWatcher { return &objectWatcherImpl{ KubeClientSet: kubeClientSet, VersionRecord: make(map[string]map[string]string), RESTMapper: restMapper, ClusterClientSetFunc: clusterClientSetFunc, + ClusterClientOption: clusterClientOption, resourceInterpreter: interpreter, InformerManager: genericmanager.GetInstance(), } } func (o *objectWatcherImpl) Create(ctx context.Context, clusterName string, desireObj *unstructured.Unstructured) error { - dynamicClusterClient, err := o.ClusterClientSetFunc(clusterName, o.KubeClientSet) + dynamicClusterClient, err := o.ClusterClientSetFunc(clusterName, o.KubeClientSet, o.ClusterClientOption) if err != nil { klog.Errorf("Failed to build dynamic cluster client for cluster %s, err: %v.", clusterName, err) return err @@ -146,7 +145,7 @@ func (o *objectWatcherImpl) Update(ctx context.Context, clusterName string, desi desireObj.GetKind(), desireObj.GetNamespace(), 
desireObj.GetName(), clusterName, workv1alpha2.ResourceConflictResolutionAnnotation) } - dynamicClusterClient, err := o.ClusterClientSetFunc(clusterName, o.KubeClientSet) + dynamicClusterClient, err := o.ClusterClientSetFunc(clusterName, o.KubeClientSet, o.ClusterClientOption) if err != nil { klog.Errorf("Failed to build dynamic cluster client for cluster %s, err: %v.", clusterName, err) return err @@ -199,7 +198,7 @@ func (o *objectWatcherImpl) Delete(ctx context.Context, clusterName string, desi return nil } - dynamicClusterClient, err := o.ClusterClientSetFunc(clusterName, o.KubeClientSet) + dynamicClusterClient, err := o.ClusterClientSetFunc(clusterName, o.KubeClientSet, o.ClusterClientOption) if err != nil { klog.Errorf("Failed to build dynamic cluster client for cluster %s, err: %v.", clusterName, err) return err diff --git a/pkg/util/ratelimiter.go b/pkg/util/ratelimiter.go new file mode 100644 index 000000000000..6760a488e687 --- /dev/null +++ b/pkg/util/ratelimiter.go @@ -0,0 +1,70 @@ +/* +Copyright 2025 The Karmada Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "sync" + + "k8s.io/client-go/util/flowcontrol" +) + +var defaultRateLimiterGetter = &RateLimiterGetter{} + +// GetRateLimiterGetter returns a RateLimiterGetter. +func GetRateLimiterGetter() *RateLimiterGetter { + return defaultRateLimiterGetter +} + +// RateLimiterGetter is a struct to get rate limiter. 
+type RateLimiterGetter struct { + limiters map[string]flowcontrol.RateLimiter + mu sync.Mutex + qps float32 + burst int +} + +// SetLimits sets the qps and burst. +func (r *RateLimiterGetter) SetLimits(qps float32, burst int) *RateLimiterGetter { + r.mu.Lock() + defer r.mu.Unlock() + r.qps = qps + r.burst = burst + return r +} + +// GetRateLimiter gets rate limiter by key. +func (r *RateLimiterGetter) GetRateLimiter(key string) flowcontrol.RateLimiter { + r.mu.Lock() + defer r.mu.Unlock() + if r.limiters == nil { + r.limiters = make(map[string]flowcontrol.RateLimiter) + } + limiter, ok := r.limiters[key] + if !ok { + qps := r.qps + if qps <= 0 { + qps = 5 + } + burst := r.burst + if burst <= 0 { + burst = 10 + } + limiter = flowcontrol.NewTokenBucketRateLimiter(qps, burst) + r.limiters[key] = limiter + } + return limiter +} diff --git a/pkg/util/ratelimiter_test.go b/pkg/util/ratelimiter_test.go new file mode 100644 index 000000000000..6a31fc2a0fac --- /dev/null +++ b/pkg/util/ratelimiter_test.go @@ -0,0 +1,114 @@ +/* +Copyright 2025 The Karmada Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package util + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "k8s.io/client-go/util/flowcontrol" +) + +func TestGetRateLimiterGetter(t *testing.T) { + tests := []struct { + name string + want *RateLimiterGetter + }{ + { + name: "get the default singleton", + want: defaultRateLimiterGetter, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equalf(t, tt.want, GetRateLimiterGetter(), "GetRateLimiterGetter()") + }) + } +} + +func TestRateLimiterGetter_GetRateLimiter(t *testing.T) { + tests := []struct { + name string + getter *RateLimiterGetter + want flowcontrol.RateLimiter + }{ + { + name: "if qps/burst not set, use default value", + getter: &RateLimiterGetter{}, + want: flowcontrol.NewTokenBucketRateLimiter(5, 10), + }, + { + name: "SetLimits() should able to work", + getter: func() *RateLimiterGetter { + return (&RateLimiterGetter{}).SetLimits(100, 200) + }(), + want: flowcontrol.NewTokenBucketRateLimiter(100, 200), + }, + { + name: "if qps/burst invalid, use default value", + getter: func() *RateLimiterGetter { + return (&RateLimiterGetter{}).SetLimits(-1, -1) + }(), + want: flowcontrol.NewTokenBucketRateLimiter(5, 10), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equalf(t, tt.want, tt.getter.GetRateLimiter("a"), "GetRateLimiter()") + }) + } +} + +func TestRateLimiterGetter_GetSameRateLimiter(t *testing.T) { + type args struct { + key1 string + key2 string + } + tests := []struct { + name string + args args + wantSameLimiter bool + }{ + { + name: "for a single cluster, the same limiter instance is obtained each time", + args: args{ + key1: "a", + key2: "a", + }, + wantSameLimiter: true, + }, + { + name: "the rate limiter is independent for each cluster", + args: args{ + key1: "a", + key2: "b", + }, + wantSameLimiter: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + getter := &RateLimiterGetter{} + rl1 := 
getter.GetRateLimiter(tt.args.key1) + rl2 := getter.GetRateLimiter(tt.args.key2) + gotSameLimiter := rl1 == rl2 + assert.Equalf(t, tt.wantSameLimiter, gotSameLimiter, + "key1: %v, key2: %v, wantSameLimiter: %v, rl1: %p, rl2: %p", + tt.args.key1, tt.args.key2, tt.wantSameLimiter, rl1, rl2) + }) + } +} diff --git a/test/e2e/framework/cluster.go b/test/e2e/framework/cluster.go index 13f84e32cb59..e08abae14349 100644 --- a/test/e2e/framework/cluster.go +++ b/test/e2e/framework/cluster.go @@ -199,7 +199,7 @@ func newClusterClientSet(controlPlaneClient client.Client, c *clusterv1alpha1.Cl if err != nil { return nil, nil, err } - clusterDynamicClient, err := util.NewClusterDynamicClientSet(c.Name, controlPlaneClient) + clusterDynamicClient, err := util.NewClusterDynamicClientSet(c.Name, controlPlaneClient, nil) if err != nil { return nil, nil, err }