Skip to content

Commit

Permalink
use singleton ratelimiter for dynamically created clients
Browse files Browse the repository at this point in the history
Signed-off-by: zach593 <[email protected]>
  • Loading branch information
zach593 committed Mar 9, 2025
1 parent 8d43fe2 commit 410c5d4
Show file tree
Hide file tree
Showing 20 changed files with 296 additions and 101 deletions.
10 changes: 6 additions & 4 deletions cmd/agent/app/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,10 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
return fmt.Errorf("failed to setup custom resource interpreter: %w", err)
}

objectWatcher := objectwatcher.NewObjectWatcher(mgr.GetClient(), mgr.GetRESTMapper(), util.NewClusterDynamicClientSetForAgent, resourceInterpreter)
rateLimiterGetter := util.GetRateLimiterGetter().SetLimits(opts.ClusterAPIQPS, opts.ClusterAPIBurst)
clusterClientOption := &util.ClientOption{RateLimiterGetter: rateLimiterGetter.GetRateLimiter}

objectWatcher := objectwatcher.NewObjectWatcher(mgr.GetClient(), mgr.GetRESTMapper(), util.NewClusterDynamicClientSetForAgent, clusterClientOption, resourceInterpreter)
controllerContext := controllerscontext.Context{
Mgr: mgr,
ObjectWatcher: objectWatcher,
Expand All @@ -278,8 +281,6 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
ClusterSuccessThreshold: opts.ClusterSuccessThreshold,
ClusterFailureThreshold: opts.ClusterFailureThreshold,
ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout,
ClusterAPIQPS: opts.ClusterAPIQPS,
ClusterAPIBurst: opts.ClusterAPIBurst,
ConcurrentWorkSyncs: opts.ConcurrentWorkSyncs,
RateLimiterOptions: opts.RateLimiterOpts,
EnableClusterResourceModeling: opts.EnableClusterResourceModeling,
Expand All @@ -289,6 +290,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
},
StopChan: stopChan,
ResourceInterpreter: resourceInterpreter,
ClusterClientOption: clusterClientOption,
}

if err := controllers.StartControllers(controllerContext, controllersDisabledByDefault); err != nil {
Expand All @@ -315,7 +317,7 @@ func startClusterStatusController(ctx controllerscontext.Context) (bool, error)
StopChan: ctx.StopChan,
ClusterClientSetFunc: util.NewClusterClientSetForAgent,
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent,
ClusterClientOption: &util.ClientOption{QPS: ctx.Opts.ClusterAPIQPS, Burst: ctx.Opts.ClusterAPIBurst},
ClusterClientOption: ctx.ClusterClientOption,
ClusterStatusUpdateFrequency: ctx.Opts.ClusterStatusUpdateFrequency,
ClusterLeaseDuration: ctx.Opts.ClusterLeaseDuration,
ClusterLeaseRenewIntervalFraction: ctx.Opts.ClusterLeaseRenewIntervalFraction,
Expand Down
14 changes: 10 additions & 4 deletions cmd/controller-manager/app/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ func startClusterStatusController(ctx controllerscontext.Context) (enabled bool,
StopChan: stopChan,
ClusterClientSetFunc: util.NewClusterClientSet,
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet,
ClusterClientOption: &util.ClientOption{QPS: opts.ClusterAPIQPS, Burst: opts.ClusterAPIBurst},
ClusterClientOption: ctx.ClusterClientOption,
ClusterStatusUpdateFrequency: opts.ClusterStatusUpdateFrequency,
ClusterLeaseDuration: opts.ClusterLeaseDuration,
ClusterLeaseRenewIntervalFraction: opts.ClusterLeaseRenewIntervalFraction,
Expand Down Expand Up @@ -428,6 +428,7 @@ func startWorkStatusController(ctx controllerscontext.Context) (enabled bool, er
ObjectWatcher: ctx.ObjectWatcher,
PredicateFunc: helper.NewExecutionPredicate(ctx.Mgr),
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet,
ClusterClientOption: ctx.ClusterClientOption,
ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout,
ConcurrentWorkStatusSyncs: opts.ConcurrentWorkSyncs,
RateLimiterOptions: ctx.Opts.RateLimiterOptions,
Expand Down Expand Up @@ -465,6 +466,7 @@ func startServiceExportController(ctx controllerscontext.Context) (enabled bool,
WorkerNumber: 3,
PredicateFunc: helper.NewPredicateForServiceExportController(ctx.Mgr),
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet,
ClusterClientOption: ctx.ClusterClientOption,
ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout,
}
serviceExportController.RunWorkQueue()
Expand All @@ -487,6 +489,7 @@ func startEndpointSliceCollectController(ctx controllerscontext.Context) (enable
WorkerNumber: 3,
PredicateFunc: helper.NewPredicateForEndpointSliceCollectController(ctx.Mgr),
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet,
ClusterClientOption: ctx.ClusterClientOption,
ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout,
}
endpointSliceCollectController.RunWorkQueue()
Expand Down Expand Up @@ -744,6 +747,10 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
}

controlPlaneInformerManager := genericmanager.NewSingleClusterInformerManager(dynamicClientSet, opts.ResyncPeriod.Duration, stopChan)

rateLimiterGetter := util.GetRateLimiterGetter().SetLimits(opts.ClusterAPIQPS, opts.ClusterAPIBurst)
clusterClientOption := &util.ClientOption{RateLimiterGetter: rateLimiterGetter.GetRateLimiter}

// We need a service lister to build a resource interpreter with `ClusterIPServiceResolver`
// witch allows connection to the customized interpreter webhook without a cluster DNS service.
sharedFactory := informers.NewSharedInformerFactory(kubeClientSet, opts.ResyncPeriod.Duration)
Expand All @@ -756,7 +763,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
klog.Fatalf("Failed to setup custom resource interpreter: %v", err)
}

objectWatcher := objectwatcher.NewObjectWatcher(mgr.GetClient(), mgr.GetRESTMapper(), util.NewClusterDynamicClientSet, resourceInterpreter)
objectWatcher := objectwatcher.NewObjectWatcher(mgr.GetClient(), mgr.GetRESTMapper(), util.NewClusterDynamicClientSet, clusterClientOption, resourceInterpreter)

resourceDetector := &detector.ResourceDetector{
DiscoveryClientSet: discoverClientSet,
Expand Down Expand Up @@ -808,8 +815,6 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
ClusterSuccessThreshold: opts.ClusterSuccessThreshold,
ClusterFailureThreshold: opts.ClusterFailureThreshold,
ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout,
ClusterAPIQPS: opts.ClusterAPIQPS,
ClusterAPIBurst: opts.ClusterAPIBurst,
SkippedPropagatingNamespaces: opts.SkippedNamespacesRegexps(),
ConcurrentWorkSyncs: opts.ConcurrentWorkSyncs,
EnableTaintManager: opts.EnableTaintManager,
Expand All @@ -824,6 +829,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
OverrideManager: overrideManager,
ControlPlaneInformerManager: controlPlaneInformerManager,
ResourceInterpreter: resourceInterpreter,
ClusterClientOption: clusterClientOption,
}

if err := controllers.StartControllers(controllerContext, controllersDisabledByDefault); err != nil {
Expand Down
3 changes: 2 additions & 1 deletion cmd/metrics-adapter/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ func (o *Options) Config(stopCh <-chan struct{}) (*metricsadapter.MetricsServer,
factory := informerfactory.NewSharedInformerFactory(karmadaClient, 0)
kubeClient := kubernetes.NewForConfigOrDie(restConfig)
kubeFactory := informers.NewSharedInformerFactory(kubeClient, 0)
metricsController := metricsadapter.NewMetricsController(stopCh, restConfig, factory, kubeFactory, &util.ClientOption{QPS: o.ClusterAPIQPS, Burst: o.ClusterAPIBurst})
limiterGetter := util.GetRateLimiterGetter().SetLimits(o.ClusterAPIQPS, o.ClusterAPIBurst)
metricsController := metricsadapter.NewMetricsController(stopCh, restConfig, factory, kubeFactory, &util.ClientOption{RateLimiterGetter: limiterGetter.GetRateLimiter})
metricsAdapter := metricsadapter.NewMetricsAdapter(metricsController, o.CustomMetricsAdapterServerOptions)
metricsAdapter.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(generatedopenapi.GetOpenAPIDefinitions, openapinamer.NewDefinitionNamer(api.Scheme))
metricsAdapter.OpenAPIV3Config = genericapiserver.DefaultOpenAPIV3Config(generatedopenapi.GetOpenAPIDefinitions, openapinamer.NewDefinitionNamer(api.Scheme))
Expand Down
6 changes: 2 additions & 4 deletions pkg/controllers/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/karmada-io/karmada/pkg/controllers/federatedhpa/config"
"github.com/karmada-io/karmada/pkg/resourceinterpreter"
"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
"github.com/karmada-io/karmada/pkg/util/objectwatcher"
"github.com/karmada-io/karmada/pkg/util/overridemanager"
Expand Down Expand Up @@ -67,10 +68,6 @@ type Options struct {
ClusterFailureThreshold metav1.Duration
// ClusterCacheSyncTimeout is the timeout period waiting for cluster cache to sync.
ClusterCacheSyncTimeout metav1.Duration
// ClusterAPIQPS is the QPS to use while talking with cluster kube-apiserver.
ClusterAPIQPS float32
// ClusterAPIBurst is the burst to allow while talking with cluster kube-apiserver.
ClusterAPIBurst int
// SkippedPropagatingNamespaces is a list of namespace regular expressions, matching namespaces will be skipped propagating.
SkippedPropagatingNamespaces []*regexp.Regexp
// ClusterName is the name of cluster.
Expand Down Expand Up @@ -112,6 +109,7 @@ type Context struct {
OverrideManager overridemanager.OverrideManager
ControlPlaneInformerManager genericmanager.SingleClusterInformerManager
ResourceInterpreter resourceinterpreter.ResourceInterpreter
ClusterClientOption *util.ClientOption
}

// IsControllerEnabled check if a specified controller enabled or not.
Expand Down
4 changes: 2 additions & 2 deletions pkg/controllers/execution/execution_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func newController(work *workv1alpha1.Work, recorder *record.FakeRecorder) Contr
informerManager.ForCluster(cluster.Name, dynamicClientSet, 0).Lister(corev1.SchemeGroupVersion.WithResource("pods"))
informerManager.Start(cluster.Name)
informerManager.WaitForCacheSync(cluster.Name)
clusterClientSetFunc := func(string, client.Client) (*util.DynamicClusterClient, error) {
clusterClientSetFunc := func(string, client.Client, *util.ClientOption) (*util.DynamicClusterClient, error) {
return &util.DynamicClusterClient{
ClusterName: clusterName,
DynamicClientSet: dynamicClientSet,
Expand All @@ -241,7 +241,7 @@ func newController(work *workv1alpha1.Work, recorder *record.FakeRecorder) Contr
InformerManager: informerManager,
EventRecorder: recorder,
RESTMapper: restMapper,
ObjectWatcher: objectwatcher.NewObjectWatcher(fakeClient, restMapper, clusterClientSetFunc, resourceInterpreter),
ObjectWatcher: objectwatcher.NewObjectWatcher(fakeClient, restMapper, clusterClientSetFunc, nil, resourceInterpreter),
}
}

Expand Down
5 changes: 3 additions & 2 deletions pkg/controllers/mcs/service_export_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ type ServiceExportController struct {
InformerManager genericmanager.MultiClusterInformerManager
WorkerNumber int // WorkerNumber is the number of worker goroutines
PredicateFunc predicate.Predicate // PredicateFunc is the function that filters events before enqueuing the keys.
ClusterDynamicClientSetFunc func(clusterName string, client client.Client) (*util.DynamicClusterClient, error)
ClusterDynamicClientSetFunc util.NewClusterDynamicClientSetFunc
ClusterClientOption *util.ClientOption
ClusterCacheSyncTimeout metav1.Duration

// eventHandlers holds the handlers which used to handle events reported from member clusters.
Expand Down Expand Up @@ -234,7 +235,7 @@ func (c *ServiceExportController) buildResourceInformers(cluster *clusterv1alpha
func (c *ServiceExportController) registerInformersAndStart(cluster *clusterv1alpha1.Cluster) error {
singleClusterInformerManager := c.InformerManager.GetSingleClusterManager(cluster.Name)
if singleClusterInformerManager == nil {
dynamicClusterClient, err := c.ClusterDynamicClientSetFunc(cluster.Name, c.Client)
dynamicClusterClient, err := c.ClusterDynamicClientSetFunc(cluster.Name, c.Client, c.ClusterClientOption)
if err != nil {
klog.Errorf("Failed to build dynamic cluster client for cluster %s.", cluster.Name)
return err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ type EndpointSliceCollectController struct {
InformerManager genericmanager.MultiClusterInformerManager
WorkerNumber int // WorkerNumber is the number of worker goroutines
PredicateFunc predicate.Predicate // PredicateFunc is the function that filters events before enqueuing the keys.
ClusterDynamicClientSetFunc func(clusterName string, client client.Client) (*util.DynamicClusterClient, error)
ClusterDynamicClientSetFunc util.NewClusterDynamicClientSetFunc
ClusterClientOption *util.ClientOption
// eventHandlers holds the handlers which used to handle events reported from member clusters.
// Each handler takes the cluster name as key and takes the handler function as the value, e.g.
// "member1": instance of ResourceEventHandler
Expand Down Expand Up @@ -177,7 +178,7 @@ func (c *EndpointSliceCollectController) buildResourceInformers(clusterName stri
func (c *EndpointSliceCollectController) registerInformersAndStart(cluster *clusterv1alpha1.Cluster) error {
singleClusterInformerManager := c.InformerManager.GetSingleClusterManager(cluster.Name)
if singleClusterInformerManager == nil {
dynamicClusterClient, err := c.ClusterDynamicClientSetFunc(cluster.Name, c.Client)
dynamicClusterClient, err := c.ClusterDynamicClientSetFunc(cluster.Name, c.Client, c.ClusterClientOption)
if err != nil {
klog.Errorf("Failed to build dynamic cluster client for cluster %s.", cluster.Name)
return err
Expand Down
6 changes: 3 additions & 3 deletions pkg/controllers/status/cluster_status_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ type ClusterStatusController struct {
TypedInformerManager typedmanager.MultiClusterInformerManager
GenericInformerManager genericmanager.MultiClusterInformerManager
StopChan <-chan struct{}
ClusterClientSetFunc func(string, client.Client, *util.ClientOption) (*util.ClusterClient, error)
ClusterDynamicClientSetFunc func(clusterName string, client client.Client) (*util.DynamicClusterClient, error)
ClusterClientSetFunc util.NewClusterClientSetFunc
ClusterDynamicClientSetFunc util.NewClusterDynamicClientSetFunc
// ClusterClientOption holds the attributes that should be injected to a Kubernetes client.
ClusterClientOption *util.ClientOption

Expand Down Expand Up @@ -343,7 +343,7 @@ func (c *ClusterStatusController) initializeGenericInformerManagerForCluster(clu
return
}

dynamicClient, err := c.ClusterDynamicClientSetFunc(clusterClient.ClusterName, c.Client)
dynamicClient, err := c.ClusterDynamicClientSetFunc(clusterClient.ClusterName, c.Client, c.ClusterClientOption)
if err != nil {
klog.Errorf("Failed to build dynamic cluster client for cluster %s.", clusterClient.ClusterName)
return
Expand Down
56 changes: 19 additions & 37 deletions pkg/controllers/status/cluster_status_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,8 @@ func TestClusterStatusController_Reconcile(t *testing.T) {
Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).Build(),
GenericInformerManager: genericmanager.GetInstance(),
TypedInformerManager: typedmanager.GetInstance(),
ClusterClientOption: &util.ClientOption{
QPS: 5,
Burst: 10,
},
ClusterClientSetFunc: util.NewClusterClientSet,
ClusterClientOption: &util.ClientOption{},
ClusterClientSetFunc: util.NewClusterClientSet,
}

if tt.cluster != nil {
Expand Down Expand Up @@ -900,11 +897,8 @@ func TestClusterStatusController_updateStatusIfNeeded(t *testing.T) {
).WithStatusSubresource(cluster).Build(),
GenericInformerManager: genericmanager.GetInstance(),
TypedInformerManager: typedmanager.GetInstance(),
ClusterClientOption: &util.ClientOption{
QPS: 5,
Burst: 10,
},
ClusterClientSetFunc: util.NewClusterClientSet,
ClusterClientOption: &util.ClientOption{},
ClusterClientSetFunc: util.NewClusterClientSet,
}

err := c.updateStatusIfNeeded(context.Background(), cluster, currentClusterStatus)
Expand Down Expand Up @@ -964,32 +958,26 @@ func TestClusterStatusController_updateStatusIfNeeded(t *testing.T) {
Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).Build(),
GenericInformerManager: genericmanager.GetInstance(),
TypedInformerManager: typedmanager.GetInstance(),
ClusterClientOption: &util.ClientOption{
QPS: 5,
Burst: 10,
},
ClusterClientSetFunc: util.NewClusterClientSet,
ClusterClientOption: &util.ClientOption{},
ClusterClientSetFunc: util.NewClusterClientSet,
}

err := c.updateStatusIfNeeded(context.Background(), cluster, currentClusterStatus)
assert.NotEmpty(t, err, "updateStatusIfNeeded doesn't return error")
})
}

func NewClusterDynamicClientSetForAgentWithError(_ string, _ client.Client) (*util.DynamicClusterClient, error) {
func NewClusterDynamicClientSetForAgentWithError(_ string, _ client.Client, _ *util.ClientOption) (*util.DynamicClusterClient, error) {
return nil, fmt.Errorf("err")
}

func TestClusterStatusController_initializeGenericInformerManagerForCluster(t *testing.T) {
t.Run("failed to create dynamicClient", func(*testing.T) {
c := &ClusterStatusController{
Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).Build(),
GenericInformerManager: genericmanager.GetInstance(),
TypedInformerManager: typedmanager.GetInstance(),
ClusterClientOption: &util.ClientOption{
QPS: 5,
Burst: 10,
},
Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).Build(),
GenericInformerManager: genericmanager.GetInstance(),
TypedInformerManager: typedmanager.GetInstance(),
ClusterClientOption: &util.ClientOption{},
ClusterClientSetFunc: util.NewClusterClientSet,
ClusterDynamicClientSetFunc: NewClusterDynamicClientSetForAgentWithError,
}
Expand All @@ -1002,13 +990,10 @@ func TestClusterStatusController_initializeGenericInformerManagerForCluster(t *t

t.Run("suc to create dynamicClient", func(*testing.T) {
c := &ClusterStatusController{
Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).Build(),
GenericInformerManager: genericmanager.GetInstance(),
TypedInformerManager: typedmanager.GetInstance(),
ClusterClientOption: &util.ClientOption{
QPS: 5,
Burst: 10,
},
Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).Build(),
GenericInformerManager: genericmanager.GetInstance(),
TypedInformerManager: typedmanager.GetInstance(),
ClusterClientOption: &util.ClientOption{},
ClusterClientSetFunc: util.NewClusterClientSet,
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent,
}
Expand All @@ -1022,13 +1007,10 @@ func TestClusterStatusController_initializeGenericInformerManagerForCluster(t *t

func TestClusterStatusController_initLeaseController(_ *testing.T) {
c := &ClusterStatusController{
Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).Build(),
GenericInformerManager: genericmanager.GetInstance(),
TypedInformerManager: typedmanager.GetInstance(),
ClusterClientOption: &util.ClientOption{
QPS: 5,
Burst: 10,
},
Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).Build(),
GenericInformerManager: genericmanager.GetInstance(),
TypedInformerManager: typedmanager.GetInstance(),
ClusterClientOption: &util.ClientOption{},
ClusterClientSetFunc: util.NewClusterClientSet,
ClusterDynamicClientSetFunc: NewClusterDynamicClientSetForAgentWithError,
}
Expand Down
Loading

0 comments on commit 410c5d4

Please sign in to comment.