Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use singleton ratelimiter for dynamically created clients #6192

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions cmd/agent/app/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,10 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
return fmt.Errorf("failed to setup custom resource interpreter: %w", err)
}

objectWatcher := objectwatcher.NewObjectWatcher(mgr.GetClient(), mgr.GetRESTMapper(), util.NewClusterDynamicClientSetForAgent, resourceInterpreter)
rateLimiterGetter := util.GetRateLimiterGetter().SetLimits(opts.ClusterAPIQPS, opts.ClusterAPIBurst)
clusterClientOption := &util.ClientOption{RateLimiterGetter: rateLimiterGetter.GetRateLimiter}

objectWatcher := objectwatcher.NewObjectWatcher(mgr.GetClient(), mgr.GetRESTMapper(), util.NewClusterDynamicClientSetForAgent, clusterClientOption, resourceInterpreter)
controllerContext := controllerscontext.Context{
Mgr: mgr,
ObjectWatcher: objectWatcher,
Expand All @@ -278,8 +281,6 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
ClusterSuccessThreshold: opts.ClusterSuccessThreshold,
ClusterFailureThreshold: opts.ClusterFailureThreshold,
ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout,
ClusterAPIQPS: opts.ClusterAPIQPS,
ClusterAPIBurst: opts.ClusterAPIBurst,
ConcurrentWorkSyncs: opts.ConcurrentWorkSyncs,
RateLimiterOptions: opts.RateLimiterOpts,
EnableClusterResourceModeling: opts.EnableClusterResourceModeling,
Expand All @@ -289,6 +290,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
},
StopChan: stopChan,
ResourceInterpreter: resourceInterpreter,
ClusterClientOption: clusterClientOption,
}

if err := controllers.StartControllers(controllerContext, controllersDisabledByDefault); err != nil {
Expand All @@ -315,7 +317,7 @@ func startClusterStatusController(ctx controllerscontext.Context) (bool, error)
StopChan: ctx.StopChan,
ClusterClientSetFunc: util.NewClusterClientSetForAgent,
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent,
ClusterClientOption: &util.ClientOption{QPS: ctx.Opts.ClusterAPIQPS, Burst: ctx.Opts.ClusterAPIBurst},
ClusterClientOption: ctx.ClusterClientOption,
ClusterStatusUpdateFrequency: ctx.Opts.ClusterStatusUpdateFrequency,
ClusterLeaseDuration: ctx.Opts.ClusterLeaseDuration,
ClusterLeaseRenewIntervalFraction: ctx.Opts.ClusterLeaseRenewIntervalFraction,
Expand Down
14 changes: 10 additions & 4 deletions cmd/controller-manager/app/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ func startClusterStatusController(ctx controllerscontext.Context) (enabled bool,
StopChan: stopChan,
ClusterClientSetFunc: util.NewClusterClientSet,
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet,
ClusterClientOption: &util.ClientOption{QPS: opts.ClusterAPIQPS, Burst: opts.ClusterAPIBurst},
ClusterClientOption: ctx.ClusterClientOption,
ClusterStatusUpdateFrequency: opts.ClusterStatusUpdateFrequency,
ClusterLeaseDuration: opts.ClusterLeaseDuration,
ClusterLeaseRenewIntervalFraction: opts.ClusterLeaseRenewIntervalFraction,
Expand Down Expand Up @@ -428,6 +428,7 @@ func startWorkStatusController(ctx controllerscontext.Context) (enabled bool, er
ObjectWatcher: ctx.ObjectWatcher,
PredicateFunc: helper.NewExecutionPredicate(ctx.Mgr),
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet,
ClusterClientOption: ctx.ClusterClientOption,
ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout,
ConcurrentWorkStatusSyncs: opts.ConcurrentWorkSyncs,
RateLimiterOptions: ctx.Opts.RateLimiterOptions,
Expand Down Expand Up @@ -465,6 +466,7 @@ func startServiceExportController(ctx controllerscontext.Context) (enabled bool,
WorkerNumber: 3,
PredicateFunc: helper.NewPredicateForServiceExportController(ctx.Mgr),
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet,
ClusterClientOption: ctx.ClusterClientOption,
ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout,
}
serviceExportController.RunWorkQueue()
Expand All @@ -487,6 +489,7 @@ func startEndpointSliceCollectController(ctx controllerscontext.Context) (enable
WorkerNumber: 3,
PredicateFunc: helper.NewPredicateForEndpointSliceCollectController(ctx.Mgr),
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet,
ClusterClientOption: ctx.ClusterClientOption,
ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout,
}
endpointSliceCollectController.RunWorkQueue()
Expand Down Expand Up @@ -744,6 +747,10 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
}

controlPlaneInformerManager := genericmanager.NewSingleClusterInformerManager(dynamicClientSet, opts.ResyncPeriod.Duration, stopChan)

rateLimiterGetter := util.GetRateLimiterGetter().SetLimits(opts.ClusterAPIQPS, opts.ClusterAPIBurst)
clusterClientOption := &util.ClientOption{RateLimiterGetter: rateLimiterGetter.GetRateLimiter}

// We need a service lister to build a resource interpreter with `ClusterIPServiceResolver`
// witch allows connection to the customized interpreter webhook without a cluster DNS service.
sharedFactory := informers.NewSharedInformerFactory(kubeClientSet, opts.ResyncPeriod.Duration)
Expand All @@ -756,7 +763,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
klog.Fatalf("Failed to setup custom resource interpreter: %v", err)
}

objectWatcher := objectwatcher.NewObjectWatcher(mgr.GetClient(), mgr.GetRESTMapper(), util.NewClusterDynamicClientSet, resourceInterpreter)
objectWatcher := objectwatcher.NewObjectWatcher(mgr.GetClient(), mgr.GetRESTMapper(), util.NewClusterDynamicClientSet, clusterClientOption, resourceInterpreter)

resourceDetector := &detector.ResourceDetector{
DiscoveryClientSet: discoverClientSet,
Expand Down Expand Up @@ -808,8 +815,6 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
ClusterSuccessThreshold: opts.ClusterSuccessThreshold,
ClusterFailureThreshold: opts.ClusterFailureThreshold,
ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout,
ClusterAPIQPS: opts.ClusterAPIQPS,
ClusterAPIBurst: opts.ClusterAPIBurst,
SkippedPropagatingNamespaces: opts.SkippedNamespacesRegexps(),
ConcurrentWorkSyncs: opts.ConcurrentWorkSyncs,
EnableTaintManager: opts.EnableTaintManager,
Expand All @@ -824,6 +829,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
OverrideManager: overrideManager,
ControlPlaneInformerManager: controlPlaneInformerManager,
ResourceInterpreter: resourceInterpreter,
ClusterClientOption: clusterClientOption,
}

if err := controllers.StartControllers(controllerContext, controllersDisabledByDefault); err != nil {
Expand Down
3 changes: 2 additions & 1 deletion cmd/metrics-adapter/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ func (o *Options) Config(stopCh <-chan struct{}) (*metricsadapter.MetricsServer,
factory := informerfactory.NewSharedInformerFactory(karmadaClient, 0)
kubeClient := kubernetes.NewForConfigOrDie(restConfig)
kubeFactory := informers.NewSharedInformerFactory(kubeClient, 0)
metricsController := metricsadapter.NewMetricsController(stopCh, restConfig, factory, kubeFactory, &util.ClientOption{QPS: o.ClusterAPIQPS, Burst: o.ClusterAPIBurst})
limiterGetter := util.GetRateLimiterGetter().SetLimits(o.ClusterAPIQPS, o.ClusterAPIBurst)
metricsController := metricsadapter.NewMetricsController(stopCh, restConfig, factory, kubeFactory, &util.ClientOption{RateLimiterGetter: limiterGetter.GetRateLimiter})
metricsAdapter := metricsadapter.NewMetricsAdapter(metricsController, o.CustomMetricsAdapterServerOptions)
metricsAdapter.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(generatedopenapi.GetOpenAPIDefinitions, openapinamer.NewDefinitionNamer(api.Scheme))
metricsAdapter.OpenAPIV3Config = genericapiserver.DefaultOpenAPIV3Config(generatedopenapi.GetOpenAPIDefinitions, openapinamer.NewDefinitionNamer(api.Scheme))
Expand Down
6 changes: 2 additions & 4 deletions pkg/controllers/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/karmada-io/karmada/pkg/controllers/federatedhpa/config"
"github.com/karmada-io/karmada/pkg/resourceinterpreter"
"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
"github.com/karmada-io/karmada/pkg/util/objectwatcher"
"github.com/karmada-io/karmada/pkg/util/overridemanager"
Expand Down Expand Up @@ -67,10 +68,6 @@ type Options struct {
ClusterFailureThreshold metav1.Duration
// ClusterCacheSyncTimeout is the timeout period waiting for cluster cache to sync.
ClusterCacheSyncTimeout metav1.Duration
// ClusterAPIQPS is the QPS to use while talking with cluster kube-apiserver.
ClusterAPIQPS float32
// ClusterAPIBurst is the burst to allow while talking with cluster kube-apiserver.
ClusterAPIBurst int
// SkippedPropagatingNamespaces is a list of namespace regular expressions, matching namespaces will be skipped propagating.
SkippedPropagatingNamespaces []*regexp.Regexp
// ClusterName is the name of cluster.
Expand Down Expand Up @@ -112,6 +109,7 @@ type Context struct {
OverrideManager overridemanager.OverrideManager
ControlPlaneInformerManager genericmanager.SingleClusterInformerManager
ResourceInterpreter resourceinterpreter.ResourceInterpreter
ClusterClientOption *util.ClientOption
}

// IsControllerEnabled check if a specified controller enabled or not.
Expand Down
4 changes: 2 additions & 2 deletions pkg/controllers/execution/execution_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func newController(work *workv1alpha1.Work, recorder *record.FakeRecorder) Contr
informerManager.ForCluster(cluster.Name, dynamicClientSet, 0).Lister(corev1.SchemeGroupVersion.WithResource("pods"))
informerManager.Start(cluster.Name)
informerManager.WaitForCacheSync(cluster.Name)
clusterClientSetFunc := func(string, client.Client) (*util.DynamicClusterClient, error) {
clusterClientSetFunc := func(string, client.Client, *util.ClientOption) (*util.DynamicClusterClient, error) {
return &util.DynamicClusterClient{
ClusterName: clusterName,
DynamicClientSet: dynamicClientSet,
Expand All @@ -241,7 +241,7 @@ func newController(work *workv1alpha1.Work, recorder *record.FakeRecorder) Contr
InformerManager: informerManager,
EventRecorder: recorder,
RESTMapper: restMapper,
ObjectWatcher: objectwatcher.NewObjectWatcher(fakeClient, restMapper, clusterClientSetFunc, resourceInterpreter),
ObjectWatcher: objectwatcher.NewObjectWatcher(fakeClient, restMapper, clusterClientSetFunc, nil, resourceInterpreter),
}
}

Expand Down
5 changes: 3 additions & 2 deletions pkg/controllers/mcs/service_export_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ type ServiceExportController struct {
InformerManager genericmanager.MultiClusterInformerManager
WorkerNumber int // WorkerNumber is the number of worker goroutines
PredicateFunc predicate.Predicate // PredicateFunc is the function that filters events before enqueuing the keys.
ClusterDynamicClientSetFunc func(clusterName string, client client.Client) (*util.DynamicClusterClient, error)
ClusterDynamicClientSetFunc util.NewClusterDynamicClientSetFunc
ClusterClientOption *util.ClientOption
ClusterCacheSyncTimeout metav1.Duration

// eventHandlers holds the handlers which used to handle events reported from member clusters.
Expand Down Expand Up @@ -234,7 +235,7 @@ func (c *ServiceExportController) buildResourceInformers(cluster *clusterv1alpha
func (c *ServiceExportController) registerInformersAndStart(cluster *clusterv1alpha1.Cluster) error {
singleClusterInformerManager := c.InformerManager.GetSingleClusterManager(cluster.Name)
if singleClusterInformerManager == nil {
dynamicClusterClient, err := c.ClusterDynamicClientSetFunc(cluster.Name, c.Client)
dynamicClusterClient, err := c.ClusterDynamicClientSetFunc(cluster.Name, c.Client, c.ClusterClientOption)
if err != nil {
klog.Errorf("Failed to build dynamic cluster client for cluster %s.", cluster.Name)
return err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ type EndpointSliceCollectController struct {
InformerManager genericmanager.MultiClusterInformerManager
WorkerNumber int // WorkerNumber is the number of worker goroutines
PredicateFunc predicate.Predicate // PredicateFunc is the function that filters events before enqueuing the keys.
ClusterDynamicClientSetFunc func(clusterName string, client client.Client) (*util.DynamicClusterClient, error)
ClusterDynamicClientSetFunc util.NewClusterDynamicClientSetFunc
ClusterClientOption *util.ClientOption
// eventHandlers holds the handlers which used to handle events reported from member clusters.
// Each handler takes the cluster name as key and takes the handler function as the value, e.g.
// "member1": instance of ResourceEventHandler
Expand Down Expand Up @@ -177,7 +178,7 @@ func (c *EndpointSliceCollectController) buildResourceInformers(clusterName stri
func (c *EndpointSliceCollectController) registerInformersAndStart(cluster *clusterv1alpha1.Cluster) error {
singleClusterInformerManager := c.InformerManager.GetSingleClusterManager(cluster.Name)
if singleClusterInformerManager == nil {
dynamicClusterClient, err := c.ClusterDynamicClientSetFunc(cluster.Name, c.Client)
dynamicClusterClient, err := c.ClusterDynamicClientSetFunc(cluster.Name, c.Client, c.ClusterClientOption)
if err != nil {
klog.Errorf("Failed to build dynamic cluster client for cluster %s.", cluster.Name)
return err
Expand Down
6 changes: 3 additions & 3 deletions pkg/controllers/status/cluster_status_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ type ClusterStatusController struct {
TypedInformerManager typedmanager.MultiClusterInformerManager
GenericInformerManager genericmanager.MultiClusterInformerManager
StopChan <-chan struct{}
ClusterClientSetFunc func(string, client.Client, *util.ClientOption) (*util.ClusterClient, error)
ClusterDynamicClientSetFunc func(clusterName string, client client.Client) (*util.DynamicClusterClient, error)
ClusterClientSetFunc util.NewClusterClientSetFunc
ClusterDynamicClientSetFunc util.NewClusterDynamicClientSetFunc
// ClusterClientOption holds the attributes that should be injected to a Kubernetes client.
ClusterClientOption *util.ClientOption

Expand Down Expand Up @@ -343,7 +343,7 @@ func (c *ClusterStatusController) initializeGenericInformerManagerForCluster(clu
return
}

dynamicClient, err := c.ClusterDynamicClientSetFunc(clusterClient.ClusterName, c.Client)
dynamicClient, err := c.ClusterDynamicClientSetFunc(clusterClient.ClusterName, c.Client, c.ClusterClientOption)
if err != nil {
klog.Errorf("Failed to build dynamic cluster client for cluster %s.", clusterClient.ClusterName)
return
Expand Down
56 changes: 19 additions & 37 deletions pkg/controllers/status/cluster_status_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,8 @@ func TestClusterStatusController_Reconcile(t *testing.T) {
Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).Build(),
GenericInformerManager: genericmanager.GetInstance(),
TypedInformerManager: typedmanager.GetInstance(),
ClusterClientOption: &util.ClientOption{
QPS: 5,
Burst: 10,
},
ClusterClientSetFunc: util.NewClusterClientSet,
ClusterClientOption: &util.ClientOption{},
ClusterClientSetFunc: util.NewClusterClientSet,
}

if tt.cluster != nil {
Expand Down Expand Up @@ -900,11 +897,8 @@ func TestClusterStatusController_updateStatusIfNeeded(t *testing.T) {
).WithStatusSubresource(cluster).Build(),
GenericInformerManager: genericmanager.GetInstance(),
TypedInformerManager: typedmanager.GetInstance(),
ClusterClientOption: &util.ClientOption{
QPS: 5,
Burst: 10,
},
ClusterClientSetFunc: util.NewClusterClientSet,
ClusterClientOption: &util.ClientOption{},
ClusterClientSetFunc: util.NewClusterClientSet,
}

err := c.updateStatusIfNeeded(context.Background(), cluster, currentClusterStatus)
Expand Down Expand Up @@ -964,32 +958,26 @@ func TestClusterStatusController_updateStatusIfNeeded(t *testing.T) {
Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).Build(),
GenericInformerManager: genericmanager.GetInstance(),
TypedInformerManager: typedmanager.GetInstance(),
ClusterClientOption: &util.ClientOption{
QPS: 5,
Burst: 10,
},
ClusterClientSetFunc: util.NewClusterClientSet,
ClusterClientOption: &util.ClientOption{},
ClusterClientSetFunc: util.NewClusterClientSet,
}

err := c.updateStatusIfNeeded(context.Background(), cluster, currentClusterStatus)
assert.NotEmpty(t, err, "updateStatusIfNeeded doesn't return error")
})
}

func NewClusterDynamicClientSetForAgentWithError(_ string, _ client.Client) (*util.DynamicClusterClient, error) {
func NewClusterDynamicClientSetForAgentWithError(_ string, _ client.Client, _ *util.ClientOption) (*util.DynamicClusterClient, error) {
return nil, fmt.Errorf("err")
}

func TestClusterStatusController_initializeGenericInformerManagerForCluster(t *testing.T) {
t.Run("failed to create dynamicClient", func(*testing.T) {
c := &ClusterStatusController{
Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).Build(),
GenericInformerManager: genericmanager.GetInstance(),
TypedInformerManager: typedmanager.GetInstance(),
ClusterClientOption: &util.ClientOption{
QPS: 5,
Burst: 10,
},
Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).Build(),
GenericInformerManager: genericmanager.GetInstance(),
TypedInformerManager: typedmanager.GetInstance(),
ClusterClientOption: &util.ClientOption{},
ClusterClientSetFunc: util.NewClusterClientSet,
ClusterDynamicClientSetFunc: NewClusterDynamicClientSetForAgentWithError,
}
Expand All @@ -1002,13 +990,10 @@ func TestClusterStatusController_initializeGenericInformerManagerForCluster(t *t

t.Run("suc to create dynamicClient", func(*testing.T) {
c := &ClusterStatusController{
Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).Build(),
GenericInformerManager: genericmanager.GetInstance(),
TypedInformerManager: typedmanager.GetInstance(),
ClusterClientOption: &util.ClientOption{
QPS: 5,
Burst: 10,
},
Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).Build(),
GenericInformerManager: genericmanager.GetInstance(),
TypedInformerManager: typedmanager.GetInstance(),
ClusterClientOption: &util.ClientOption{},
ClusterClientSetFunc: util.NewClusterClientSet,
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent,
}
Expand All @@ -1022,13 +1007,10 @@ func TestClusterStatusController_initializeGenericInformerManagerForCluster(t *t

func TestClusterStatusController_initLeaseController(_ *testing.T) {
c := &ClusterStatusController{
Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).Build(),
GenericInformerManager: genericmanager.GetInstance(),
TypedInformerManager: typedmanager.GetInstance(),
ClusterClientOption: &util.ClientOption{
QPS: 5,
Burst: 10,
},
Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).Build(),
GenericInformerManager: genericmanager.GetInstance(),
TypedInformerManager: typedmanager.GetInstance(),
ClusterClientOption: &util.ClientOption{},
ClusterClientSetFunc: util.NewClusterClientSet,
ClusterDynamicClientSetFunc: NewClusterDynamicClientSetForAgentWithError,
}
Expand Down
Loading