Skip to content

Commit 8084c62

Browse files
committed
unify http ratelimiter
Signed-off-by: zach593 <[email protected]>
1 parent ae2d1c1 commit 8084c62

File tree

25 files changed

+202
-112
lines changed

25 files changed

+202
-112
lines changed

cmd/agent/app/agent.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"k8s.io/client-go/informers"
2929
kubeclientset "k8s.io/client-go/kubernetes"
3030
"k8s.io/client-go/rest"
31+
"k8s.io/client-go/util/flowcontrol"
3132
cliflag "k8s.io/component-base/cli/flag"
3233
"k8s.io/component-base/term"
3334
"k8s.io/klog/v2"
@@ -140,8 +141,7 @@ func run(ctx context.Context, opts *options.Options) error {
140141
if err != nil {
141142
return fmt.Errorf("error building kubeconfig of karmada control plane: %w", err)
142143
}
143-
controlPlaneRestConfig.QPS, controlPlaneRestConfig.Burst = opts.KubeAPIQPS, opts.KubeAPIBurst
144-
144+
controlPlaneRestConfig.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(opts.KubeAPIQPS, opts.KubeAPIBurst)
145145
clusterConfig, err := controllerruntime.GetConfig()
146146
if err != nil {
147147
return fmt.Errorf("error building kubeconfig of member cluster: %w", err)
@@ -265,7 +265,11 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
265265
return fmt.Errorf("failed to setup custom resource interpreter: %w", err)
266266
}
267267

268-
objectWatcher := objectwatcher.NewObjectWatcher(mgr.GetClient(), mgr.GetRESTMapper(), util.NewClusterDynamicClientSetForAgent, resourceInterpreter)
268+
rateLimiterGetter := util.GetRateLimiterGetter()
269+
rateLimiterGetter.SetLimits(opts.ClusterAPIQPS, opts.ClusterAPIBurst)
270+
clusterClientOption := &util.ClientOption{RateLimiterGetter: rateLimiterGetter.GetRateLimiter}
271+
272+
objectWatcher := objectwatcher.NewObjectWatcher(mgr.GetClient(), mgr.GetRESTMapper(), util.NewClusterDynamicClientSetForAgent, clusterClientOption, resourceInterpreter)
269273
controllerContext := controllerscontext.Context{
270274
Mgr: mgr,
271275
ObjectWatcher: objectWatcher,
@@ -278,8 +282,6 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
278282
ClusterSuccessThreshold: opts.ClusterSuccessThreshold,
279283
ClusterFailureThreshold: opts.ClusterFailureThreshold,
280284
ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout,
281-
ClusterAPIQPS: opts.ClusterAPIQPS,
282-
ClusterAPIBurst: opts.ClusterAPIBurst,
283285
ConcurrentWorkSyncs: opts.ConcurrentWorkSyncs,
284286
RateLimiterOptions: opts.RateLimiterOpts,
285287
EnableClusterResourceModeling: opts.EnableClusterResourceModeling,
@@ -289,6 +291,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
289291
},
290292
StopChan: stopChan,
291293
ResourceInterpreter: resourceInterpreter,
294+
ClusterClientOption: clusterClientOption,
292295
}
293296

294297
if err := controllers.StartControllers(controllerContext, controllersDisabledByDefault); err != nil {
@@ -315,7 +318,7 @@ func startClusterStatusController(ctx controllerscontext.Context) (bool, error)
315318
StopChan: ctx.StopChan,
316319
ClusterClientSetFunc: util.NewClusterClientSetForAgent,
317320
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent,
318-
ClusterClientOption: &util.ClientOption{QPS: ctx.Opts.ClusterAPIQPS, Burst: ctx.Opts.ClusterAPIBurst},
321+
ClusterClientOption: ctx.ClusterClientOption,
319322
ClusterStatusUpdateFrequency: ctx.Opts.ClusterStatusUpdateFrequency,
320323
ClusterLeaseDuration: ctx.Opts.ClusterLeaseDuration,
321324
ClusterLeaseRenewIntervalFraction: ctx.Opts.ClusterLeaseRenewIntervalFraction,

cmd/aggregated-apiserver/app/options/options.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import (
3636
utilfeature "k8s.io/apiserver/pkg/util/feature"
3737
utilversion "k8s.io/apiserver/pkg/util/version"
3838
"k8s.io/client-go/kubernetes"
39+
"k8s.io/client-go/util/flowcontrol"
3940
"k8s.io/klog/v2"
4041
netutils "k8s.io/utils/net"
4142

@@ -120,7 +121,7 @@ func (o *Options) Run(ctx context.Context) error {
120121
}
121122

122123
restConfig := config.GenericConfig.ClientConfig
123-
restConfig.QPS, restConfig.Burst = o.KubeAPIQPS, o.KubeAPIBurst
124+
restConfig.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(o.KubeAPIQPS, o.KubeAPIBurst)
124125
secretLister := config.GenericConfig.SharedInformerFactory.Core().V1().Secrets().Lister()
125126
config.GenericConfig.EffectiveVersion = utilversion.NewEffectiveVersion("1.0")
126127

cmd/controller-manager/app/controllermanager.go

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"k8s.io/client-go/informers"
3030
kubeclientset "k8s.io/client-go/kubernetes"
3131
"k8s.io/client-go/rest"
32+
"k8s.io/client-go/util/flowcontrol"
3233
cliflag "k8s.io/component-base/cli/flag"
3334
"k8s.io/component-base/term"
3435
"k8s.io/klog/v2"
@@ -144,7 +145,7 @@ func Run(ctx context.Context, opts *options.Options) error {
144145
if err != nil {
145146
panic(err)
146147
}
147-
controlPlaneRestConfig.QPS, controlPlaneRestConfig.Burst = opts.KubeAPIQPS, opts.KubeAPIBurst
148+
controlPlaneRestConfig.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(opts.KubeAPIQPS, opts.KubeAPIBurst)
148149
controllerManager, err := controllerruntime.NewManager(controlPlaneRestConfig, controllerruntime.Options{
149150
Logger: klog.Background(),
150151
Scheme: gclient.NewSchema(),
@@ -324,7 +325,7 @@ func startClusterStatusController(ctx controllerscontext.Context) (enabled bool,
324325
StopChan: stopChan,
325326
ClusterClientSetFunc: util.NewClusterClientSet,
326327
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet,
327-
ClusterClientOption: &util.ClientOption{QPS: opts.ClusterAPIQPS, Burst: opts.ClusterAPIBurst},
328+
ClusterClientOption: ctx.ClusterClientOption,
328329
ClusterStatusUpdateFrequency: opts.ClusterStatusUpdateFrequency,
329330
ClusterLeaseDuration: opts.ClusterLeaseDuration,
330331
ClusterLeaseRenewIntervalFraction: opts.ClusterLeaseRenewIntervalFraction,
@@ -428,6 +429,7 @@ func startWorkStatusController(ctx controllerscontext.Context) (enabled bool, er
428429
ObjectWatcher: ctx.ObjectWatcher,
429430
PredicateFunc: helper.NewExecutionPredicate(ctx.Mgr),
430431
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet,
432+
ClusterClientOption: ctx.ClusterClientOption,
431433
ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout,
432434
ConcurrentWorkStatusSyncs: opts.ConcurrentWorkSyncs,
433435
RateLimiterOptions: ctx.Opts.RateLimiterOptions,
@@ -465,6 +467,7 @@ func startServiceExportController(ctx controllerscontext.Context) (enabled bool,
465467
WorkerNumber: 3,
466468
PredicateFunc: helper.NewPredicateForServiceExportController(ctx.Mgr),
467469
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet,
470+
ClusterClientOption: ctx.ClusterClientOption,
468471
ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout,
469472
}
470473
serviceExportController.RunWorkQueue()
@@ -487,6 +490,7 @@ func startEndpointSliceCollectController(ctx controllerscontext.Context) (enable
487490
WorkerNumber: 3,
488491
PredicateFunc: helper.NewPredicateForEndpointSliceCollectController(ctx.Mgr),
489492
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet,
493+
ClusterClientOption: ctx.ClusterClientOption,
490494
ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout,
491495
}
492496
endpointSliceCollectController.RunWorkQueue()
@@ -744,6 +748,11 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
744748
}
745749

746750
controlPlaneInformerManager := genericmanager.NewSingleClusterInformerManager(dynamicClientSet, opts.ResyncPeriod.Duration, stopChan)
751+
752+
rateLimiterGetter := util.GetRateLimiterGetter()
753+
rateLimiterGetter.SetLimits(opts.ClusterAPIQPS, opts.ClusterAPIBurst)
754+
clusterClientOption := &util.ClientOption{RateLimiterGetter: rateLimiterGetter.GetRateLimiter}
755+
747756
// We need a service lister to build a resource interpreter with `ClusterIPServiceResolver`
748757
// witch allows connection to the customized interpreter webhook without a cluster DNS service.
749758
sharedFactory := informers.NewSharedInformerFactory(kubeClientSet, opts.ResyncPeriod.Duration)
@@ -756,7 +765,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
756765
klog.Fatalf("Failed to setup custom resource interpreter: %v", err)
757766
}
758767

759-
objectWatcher := objectwatcher.NewObjectWatcher(mgr.GetClient(), mgr.GetRESTMapper(), util.NewClusterDynamicClientSet, resourceInterpreter)
768+
objectWatcher := objectwatcher.NewObjectWatcher(mgr.GetClient(), mgr.GetRESTMapper(), util.NewClusterDynamicClientSet, clusterClientOption, resourceInterpreter)
760769

761770
resourceDetector := &detector.ResourceDetector{
762771
DiscoveryClientSet: discoverClientSet,
@@ -808,8 +817,6 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
808817
ClusterSuccessThreshold: opts.ClusterSuccessThreshold,
809818
ClusterFailureThreshold: opts.ClusterFailureThreshold,
810819
ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout,
811-
ClusterAPIQPS: opts.ClusterAPIQPS,
812-
ClusterAPIBurst: opts.ClusterAPIBurst,
813820
SkippedPropagatingNamespaces: opts.SkippedNamespacesRegexps(),
814821
ConcurrentWorkSyncs: opts.ConcurrentWorkSyncs,
815822
EnableTaintManager: opts.EnableTaintManager,
@@ -824,6 +831,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
824831
OverrideManager: overrideManager,
825832
ControlPlaneInformerManager: controlPlaneInformerManager,
826833
ResourceInterpreter: resourceInterpreter,
834+
ClusterClientOption: clusterClientOption,
827835
}
828836

829837
if err := controllers.StartControllers(controllerContext, controllersDisabledByDefault); err != nil {

cmd/descheduler/app/descheduler.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"k8s.io/client-go/tools/clientcmd"
3232
"k8s.io/client-go/tools/leaderelection"
3333
"k8s.io/client-go/tools/leaderelection/resourcelock"
34+
"k8s.io/client-go/util/flowcontrol"
3435
cliflag "k8s.io/component-base/cli/flag"
3536
"k8s.io/component-base/term"
3637
"k8s.io/klog/v2"
@@ -134,7 +135,7 @@ func run(opts *options.Options, stopChan <-chan struct{}) error {
134135
if err != nil {
135136
return fmt.Errorf("error building kubeconfig: %s", err.Error())
136137
}
137-
restConfig.QPS, restConfig.Burst = opts.KubeAPIQPS, opts.KubeAPIBurst
138+
restConfig.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(opts.KubeAPIQPS, opts.KubeAPIBurst)
138139

139140
karmadaClient := karmadaclientset.NewForConfigOrDie(restConfig)
140141
kubeClient := kubernetes.NewForConfigOrDie(restConfig)

cmd/karmada-search/app/karmada-search.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
genericoptions "k8s.io/apiserver/pkg/server/options"
3434
utilversion "k8s.io/apiserver/pkg/util/version"
3535
"k8s.io/client-go/rest"
36+
"k8s.io/client-go/util/flowcontrol"
3637
cliflag "k8s.io/component-base/cli/flag"
3738
"k8s.io/component-base/term"
3839
"k8s.io/klog/v2"
@@ -179,8 +180,7 @@ func config(o *options.Options, outOfTreeRegistryOptions ...Option) (*search.Con
179180
return nil, err
180181
}
181182

182-
serverConfig.ClientConfig.QPS = o.KubeAPIQPS
183-
serverConfig.ClientConfig.Burst = o.KubeAPIBurst
183+
serverConfig.ClientConfig.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(o.KubeAPIQPS, o.KubeAPIBurst)
184184
serverConfig.Config.EffectiveVersion = utilversion.NewEffectiveVersion("1.0")
185185

186186
httpClient, err := rest.HTTPClientFor(serverConfig.ClientConfig)

cmd/metrics-adapter/app/options/options.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"k8s.io/client-go/informers"
2929
"k8s.io/client-go/kubernetes"
3030
"k8s.io/client-go/tools/clientcmd"
31+
"k8s.io/client-go/util/flowcontrol"
3132
"k8s.io/component-base/metrics"
3233
"k8s.io/component-base/metrics/legacyregistry"
3334
"k8s.io/klog/v2"
@@ -128,13 +129,15 @@ func (o *Options) Config(stopCh <-chan struct{}) (*metricsadapter.MetricsServer,
128129
klog.Errorf("Unable to build restConfig: %v", err)
129130
return nil, err
130131
}
131-
restConfig.QPS, restConfig.Burst = o.KubeAPIQPS, o.KubeAPIBurst
132+
restConfig.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(o.KubeAPIQPS, o.KubeAPIBurst)
132133

133134
karmadaClient := karmadaclientset.NewForConfigOrDie(restConfig)
134135
factory := informerfactory.NewSharedInformerFactory(karmadaClient, 0)
135136
kubeClient := kubernetes.NewForConfigOrDie(restConfig)
136137
kubeFactory := informers.NewSharedInformerFactory(kubeClient, 0)
137-
metricsController := metricsadapter.NewMetricsController(stopCh, restConfig, factory, kubeFactory, &util.ClientOption{QPS: o.ClusterAPIQPS, Burst: o.ClusterAPIBurst})
138+
limiterGetter := util.GetRateLimiterGetter()
139+
limiterGetter.SetLimits(o.ClusterAPIQPS, o.ClusterAPIBurst)
140+
metricsController := metricsadapter.NewMetricsController(stopCh, restConfig, factory, kubeFactory, &util.ClientOption{RateLimiterGetter: limiterGetter.GetRateLimiter})
138141
metricsAdapter := metricsadapter.NewMetricsAdapter(metricsController, o.CustomMetricsAdapterServerOptions)
139142
metricsAdapter.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(generatedopenapi.GetOpenAPIDefinitions, openapinamer.NewDefinitionNamer(api.Scheme))
140143
metricsAdapter.OpenAPIV3Config = genericapiserver.DefaultOpenAPIV3Config(generatedopenapi.GetOpenAPIDefinitions, openapinamer.NewDefinitionNamer(api.Scheme))

cmd/scheduler-estimator/app/scheduler-estimator.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"k8s.io/client-go/dynamic"
3030
"k8s.io/client-go/kubernetes"
3131
"k8s.io/client-go/tools/clientcmd"
32+
"k8s.io/client-go/util/flowcontrol"
3233
cliflag "k8s.io/component-base/cli/flag"
3334
"k8s.io/component-base/term"
3435
"k8s.io/klog/v2"
@@ -121,7 +122,7 @@ func run(ctx context.Context, opts *options.Options) error {
121122
if err != nil {
122123
return fmt.Errorf("error building kubeconfig: %s", err.Error())
123124
}
124-
restConfig.QPS, restConfig.Burst = opts.ClusterAPIQPS, opts.ClusterAPIBurst
125+
restConfig.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(opts.ClusterAPIQPS, opts.ClusterAPIBurst)
125126

126127
kubeClient := kubernetes.NewForConfigOrDie(restConfig)
127128
dynamicClient := dynamic.NewForConfigOrDie(restConfig)

cmd/scheduler/app/scheduler.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"k8s.io/client-go/tools/clientcmd"
3333
"k8s.io/client-go/tools/leaderelection"
3434
"k8s.io/client-go/tools/leaderelection/resourcelock"
35+
"k8s.io/client-go/util/flowcontrol"
3536
cliflag "k8s.io/component-base/cli/flag"
3637
"k8s.io/component-base/term"
3738
"k8s.io/klog/v2"
@@ -146,7 +147,7 @@ func run(opts *options.Options, stopChan <-chan struct{}, registryOptions ...Opt
146147
if err != nil {
147148
return fmt.Errorf("error building kubeconfig: %s", err.Error())
148149
}
149-
restConfig.QPS, restConfig.Burst = opts.KubeAPIQPS, opts.KubeAPIBurst
150+
restConfig.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(opts.KubeAPIQPS, opts.KubeAPIBurst)
150151

151152
dynamicClientSet := dynamic.NewForConfigOrDie(restConfig)
152153
karmadaClient := karmadaclientset.NewForConfigOrDie(restConfig)

cmd/webhook/app/webhook.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"net/http"
2525

2626
"github.com/spf13/cobra"
27+
"k8s.io/client-go/util/flowcontrol"
2728
cliflag "k8s.io/component-base/cli/flag"
2829
"k8s.io/component-base/term"
2930
"k8s.io/klog/v2"
@@ -116,7 +117,7 @@ func Run(ctx context.Context, opts *options.Options) error {
116117
if err != nil {
117118
panic(err)
118119
}
119-
config.QPS, config.Burst = opts.KubeAPIQPS, opts.KubeAPIBurst
120+
config.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(opts.KubeAPIQPS, opts.KubeAPIBurst)
120121

121122
hookManager, err := controllerruntime.NewManager(config, controllerruntime.Options{
122123
Logger: klog.Background(),

pkg/controllers/context/context.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"github.com/karmada-io/karmada/pkg/controllers/federatedhpa/config"
3131
"github.com/karmada-io/karmada/pkg/resourceinterpreter"
3232
"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
33+
"github.com/karmada-io/karmada/pkg/util"
3334
"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
3435
"github.com/karmada-io/karmada/pkg/util/objectwatcher"
3536
"github.com/karmada-io/karmada/pkg/util/overridemanager"
@@ -67,10 +68,6 @@ type Options struct {
6768
ClusterFailureThreshold metav1.Duration
6869
// ClusterCacheSyncTimeout is the timeout period waiting for cluster cache to sync.
6970
ClusterCacheSyncTimeout metav1.Duration
70-
// ClusterAPIQPS is the QPS to use while talking with cluster kube-apiserver.
71-
ClusterAPIQPS float32
72-
// ClusterAPIBurst is the burst to allow while talking with cluster kube-apiserver.
73-
ClusterAPIBurst int
7471
// SkippedPropagatingNamespaces is a list of namespace regular expressions, matching namespaces will be skipped propagating.
7572
SkippedPropagatingNamespaces []*regexp.Regexp
7673
// ClusterName is the name of cluster.
@@ -112,6 +109,7 @@ type Context struct {
112109
OverrideManager overridemanager.OverrideManager
113110
ControlPlaneInformerManager genericmanager.SingleClusterInformerManager
114111
ResourceInterpreter resourceinterpreter.ResourceInterpreter
112+
ClusterClientOption *util.ClientOption
115113
}
116114

117115
// IsControllerEnabled check if a specified controller enabled or not.

0 commit comments

Comments
 (0)