Skip to content

Commit

Permalink
unify http ratelimiter
Browse files Browse the repository at this point in the history
Signed-off-by: zach593 <[email protected]>
  • Loading branch information
zach593 committed Feb 5, 2025
1 parent ae2d1c1 commit 7fa65cf
Show file tree
Hide file tree
Showing 26 changed files with 314 additions and 112 deletions.
14 changes: 8 additions & 6 deletions cmd/agent/app/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"k8s.io/client-go/informers"
kubeclientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/util/flowcontrol"
cliflag "k8s.io/component-base/cli/flag"
"k8s.io/component-base/term"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -140,8 +141,7 @@ func run(ctx context.Context, opts *options.Options) error {
if err != nil {
return fmt.Errorf("error building kubeconfig of karmada control plane: %w", err)
}
controlPlaneRestConfig.QPS, controlPlaneRestConfig.Burst = opts.KubeAPIQPS, opts.KubeAPIBurst

controlPlaneRestConfig.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(opts.KubeAPIQPS, opts.KubeAPIBurst)
clusterConfig, err := controllerruntime.GetConfig()
if err != nil {
return fmt.Errorf("error building kubeconfig of member cluster: %w", err)
Expand Down Expand Up @@ -265,7 +265,10 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
return fmt.Errorf("failed to setup custom resource interpreter: %w", err)
}

objectWatcher := objectwatcher.NewObjectWatcher(mgr.GetClient(), mgr.GetRESTMapper(), util.NewClusterDynamicClientSetForAgent, resourceInterpreter)
rateLimiterGetter := util.GetRateLimiterGetter().SetLimits(opts.ClusterAPIQPS, opts.ClusterAPIBurst)
clusterClientOption := &util.ClientOption{RateLimiterGetter: rateLimiterGetter.GetRateLimiter}

objectWatcher := objectwatcher.NewObjectWatcher(mgr.GetClient(), mgr.GetRESTMapper(), util.NewClusterDynamicClientSetForAgent, clusterClientOption, resourceInterpreter)
controllerContext := controllerscontext.Context{
Mgr: mgr,
ObjectWatcher: objectWatcher,
Expand All @@ -278,8 +281,6 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
ClusterSuccessThreshold: opts.ClusterSuccessThreshold,
ClusterFailureThreshold: opts.ClusterFailureThreshold,
ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout,
ClusterAPIQPS: opts.ClusterAPIQPS,
ClusterAPIBurst: opts.ClusterAPIBurst,
ConcurrentWorkSyncs: opts.ConcurrentWorkSyncs,
RateLimiterOptions: opts.RateLimiterOpts,
EnableClusterResourceModeling: opts.EnableClusterResourceModeling,
Expand All @@ -289,6 +290,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
},
StopChan: stopChan,
ResourceInterpreter: resourceInterpreter,
ClusterClientOption: clusterClientOption,
}

if err := controllers.StartControllers(controllerContext, controllersDisabledByDefault); err != nil {
Expand All @@ -315,7 +317,7 @@ func startClusterStatusController(ctx controllerscontext.Context) (bool, error)
StopChan: ctx.StopChan,
ClusterClientSetFunc: util.NewClusterClientSetForAgent,
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent,
ClusterClientOption: &util.ClientOption{QPS: ctx.Opts.ClusterAPIQPS, Burst: ctx.Opts.ClusterAPIBurst},
ClusterClientOption: ctx.ClusterClientOption,
ClusterStatusUpdateFrequency: ctx.Opts.ClusterStatusUpdateFrequency,
ClusterLeaseDuration: ctx.Opts.ClusterLeaseDuration,
ClusterLeaseRenewIntervalFraction: ctx.Opts.ClusterLeaseRenewIntervalFraction,
Expand Down
3 changes: 2 additions & 1 deletion cmd/aggregated-apiserver/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilversion "k8s.io/apiserver/pkg/util/version"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/klog/v2"
netutils "k8s.io/utils/net"

Expand Down Expand Up @@ -120,7 +121,7 @@ func (o *Options) Run(ctx context.Context) error {
}

restConfig := config.GenericConfig.ClientConfig
restConfig.QPS, restConfig.Burst = o.KubeAPIQPS, o.KubeAPIBurst
restConfig.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(o.KubeAPIQPS, o.KubeAPIBurst)
secretLister := config.GenericConfig.SharedInformerFactory.Core().V1().Secrets().Lister()
config.GenericConfig.EffectiveVersion = utilversion.NewEffectiveVersion("1.0")

Expand Down
17 changes: 12 additions & 5 deletions cmd/controller-manager/app/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"k8s.io/client-go/informers"
kubeclientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/util/flowcontrol"
cliflag "k8s.io/component-base/cli/flag"
"k8s.io/component-base/term"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -144,7 +145,7 @@ func Run(ctx context.Context, opts *options.Options) error {
if err != nil {
panic(err)
}
controlPlaneRestConfig.QPS, controlPlaneRestConfig.Burst = opts.KubeAPIQPS, opts.KubeAPIBurst
controlPlaneRestConfig.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(opts.KubeAPIQPS, opts.KubeAPIBurst)
controllerManager, err := controllerruntime.NewManager(controlPlaneRestConfig, controllerruntime.Options{
Logger: klog.Background(),
Scheme: gclient.NewSchema(),
Expand Down Expand Up @@ -324,7 +325,7 @@ func startClusterStatusController(ctx controllerscontext.Context) (enabled bool,
StopChan: stopChan,
ClusterClientSetFunc: util.NewClusterClientSet,
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet,
ClusterClientOption: &util.ClientOption{QPS: opts.ClusterAPIQPS, Burst: opts.ClusterAPIBurst},
ClusterClientOption: ctx.ClusterClientOption,
ClusterStatusUpdateFrequency: opts.ClusterStatusUpdateFrequency,
ClusterLeaseDuration: opts.ClusterLeaseDuration,
ClusterLeaseRenewIntervalFraction: opts.ClusterLeaseRenewIntervalFraction,
Expand Down Expand Up @@ -428,6 +429,7 @@ func startWorkStatusController(ctx controllerscontext.Context) (enabled bool, er
ObjectWatcher: ctx.ObjectWatcher,
PredicateFunc: helper.NewExecutionPredicate(ctx.Mgr),
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet,
ClusterClientOption: ctx.ClusterClientOption,
ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout,
ConcurrentWorkStatusSyncs: opts.ConcurrentWorkSyncs,
RateLimiterOptions: ctx.Opts.RateLimiterOptions,
Expand Down Expand Up @@ -465,6 +467,7 @@ func startServiceExportController(ctx controllerscontext.Context) (enabled bool,
WorkerNumber: 3,
PredicateFunc: helper.NewPredicateForServiceExportController(ctx.Mgr),
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet,
ClusterClientOption: ctx.ClusterClientOption,
ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout,
}
serviceExportController.RunWorkQueue()
Expand All @@ -487,6 +490,7 @@ func startEndpointSliceCollectController(ctx controllerscontext.Context) (enable
WorkerNumber: 3,
PredicateFunc: helper.NewPredicateForEndpointSliceCollectController(ctx.Mgr),
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet,
ClusterClientOption: ctx.ClusterClientOption,
ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout,
}
endpointSliceCollectController.RunWorkQueue()
Expand Down Expand Up @@ -744,6 +748,10 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
}

controlPlaneInformerManager := genericmanager.NewSingleClusterInformerManager(dynamicClientSet, opts.ResyncPeriod.Duration, stopChan)

rateLimiterGetter := util.GetRateLimiterGetter().SetLimits(opts.ClusterAPIQPS, opts.ClusterAPIBurst)
clusterClientOption := &util.ClientOption{RateLimiterGetter: rateLimiterGetter.GetRateLimiter}

// We need a service lister to build a resource interpreter with `ClusterIPServiceResolver`
// witch allows connection to the customized interpreter webhook without a cluster DNS service.
sharedFactory := informers.NewSharedInformerFactory(kubeClientSet, opts.ResyncPeriod.Duration)
Expand All @@ -756,7 +764,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
klog.Fatalf("Failed to setup custom resource interpreter: %v", err)
}

objectWatcher := objectwatcher.NewObjectWatcher(mgr.GetClient(), mgr.GetRESTMapper(), util.NewClusterDynamicClientSet, resourceInterpreter)
objectWatcher := objectwatcher.NewObjectWatcher(mgr.GetClient(), mgr.GetRESTMapper(), util.NewClusterDynamicClientSet, clusterClientOption, resourceInterpreter)

resourceDetector := &detector.ResourceDetector{
DiscoveryClientSet: discoverClientSet,
Expand Down Expand Up @@ -808,8 +816,6 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
ClusterSuccessThreshold: opts.ClusterSuccessThreshold,
ClusterFailureThreshold: opts.ClusterFailureThreshold,
ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout,
ClusterAPIQPS: opts.ClusterAPIQPS,
ClusterAPIBurst: opts.ClusterAPIBurst,
SkippedPropagatingNamespaces: opts.SkippedNamespacesRegexps(),
ConcurrentWorkSyncs: opts.ConcurrentWorkSyncs,
EnableTaintManager: opts.EnableTaintManager,
Expand All @@ -824,6 +830,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
OverrideManager: overrideManager,
ControlPlaneInformerManager: controlPlaneInformerManager,
ResourceInterpreter: resourceInterpreter,
ClusterClientOption: clusterClientOption,
}

if err := controllers.StartControllers(controllerContext, controllersDisabledByDefault); err != nil {
Expand Down
3 changes: 2 additions & 1 deletion cmd/descheduler/app/descheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/util/flowcontrol"
cliflag "k8s.io/component-base/cli/flag"
"k8s.io/component-base/term"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -134,7 +135,7 @@ func run(opts *options.Options, stopChan <-chan struct{}) error {
if err != nil {
return fmt.Errorf("error building kubeconfig: %s", err.Error())
}
restConfig.QPS, restConfig.Burst = opts.KubeAPIQPS, opts.KubeAPIBurst
restConfig.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(opts.KubeAPIQPS, opts.KubeAPIBurst)

karmadaClient := karmadaclientset.NewForConfigOrDie(restConfig)
kubeClient := kubernetes.NewForConfigOrDie(restConfig)
Expand Down
4 changes: 2 additions & 2 deletions cmd/karmada-search/app/karmada-search.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
genericoptions "k8s.io/apiserver/pkg/server/options"
utilversion "k8s.io/apiserver/pkg/util/version"
"k8s.io/client-go/rest"
"k8s.io/client-go/util/flowcontrol"
cliflag "k8s.io/component-base/cli/flag"
"k8s.io/component-base/term"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -179,8 +180,7 @@ func config(o *options.Options, outOfTreeRegistryOptions ...Option) (*search.Con
return nil, err
}

serverConfig.ClientConfig.QPS = o.KubeAPIQPS
serverConfig.ClientConfig.Burst = o.KubeAPIBurst
serverConfig.ClientConfig.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(o.KubeAPIQPS, o.KubeAPIBurst)
serverConfig.Config.EffectiveVersion = utilversion.NewEffectiveVersion("1.0")

httpClient, err := rest.HTTPClientFor(serverConfig.ClientConfig)
Expand Down
6 changes: 4 additions & 2 deletions cmd/metrics-adapter/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -128,13 +129,14 @@ func (o *Options) Config(stopCh <-chan struct{}) (*metricsadapter.MetricsServer,
klog.Errorf("Unable to build restConfig: %v", err)
return nil, err
}
restConfig.QPS, restConfig.Burst = o.KubeAPIQPS, o.KubeAPIBurst
restConfig.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(o.KubeAPIQPS, o.KubeAPIBurst)

karmadaClient := karmadaclientset.NewForConfigOrDie(restConfig)
factory := informerfactory.NewSharedInformerFactory(karmadaClient, 0)
kubeClient := kubernetes.NewForConfigOrDie(restConfig)
kubeFactory := informers.NewSharedInformerFactory(kubeClient, 0)
metricsController := metricsadapter.NewMetricsController(stopCh, restConfig, factory, kubeFactory, &util.ClientOption{QPS: o.ClusterAPIQPS, Burst: o.ClusterAPIBurst})
limiterGetter := util.GetRateLimiterGetter().SetLimits(o.ClusterAPIQPS, o.ClusterAPIBurst)
metricsController := metricsadapter.NewMetricsController(stopCh, restConfig, factory, kubeFactory, &util.ClientOption{RateLimiterGetter: limiterGetter.GetRateLimiter})
metricsAdapter := metricsadapter.NewMetricsAdapter(metricsController, o.CustomMetricsAdapterServerOptions)
metricsAdapter.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(generatedopenapi.GetOpenAPIDefinitions, openapinamer.NewDefinitionNamer(api.Scheme))
metricsAdapter.OpenAPIV3Config = genericapiserver.DefaultOpenAPIV3Config(generatedopenapi.GetOpenAPIDefinitions, openapinamer.NewDefinitionNamer(api.Scheme))
Expand Down
3 changes: 2 additions & 1 deletion cmd/scheduler-estimator/app/scheduler-estimator.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/flowcontrol"
cliflag "k8s.io/component-base/cli/flag"
"k8s.io/component-base/term"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -121,7 +122,7 @@ func run(ctx context.Context, opts *options.Options) error {
if err != nil {
return fmt.Errorf("error building kubeconfig: %s", err.Error())
}
restConfig.QPS, restConfig.Burst = opts.ClusterAPIQPS, opts.ClusterAPIBurst
restConfig.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(opts.ClusterAPIQPS, opts.ClusterAPIBurst)

kubeClient := kubernetes.NewForConfigOrDie(restConfig)
dynamicClient := dynamic.NewForConfigOrDie(restConfig)
Expand Down
3 changes: 2 additions & 1 deletion cmd/scheduler/app/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/util/flowcontrol"
cliflag "k8s.io/component-base/cli/flag"
"k8s.io/component-base/term"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -146,7 +147,7 @@ func run(opts *options.Options, stopChan <-chan struct{}, registryOptions ...Opt
if err != nil {
return fmt.Errorf("error building kubeconfig: %s", err.Error())
}
restConfig.QPS, restConfig.Burst = opts.KubeAPIQPS, opts.KubeAPIBurst
restConfig.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(opts.KubeAPIQPS, opts.KubeAPIBurst)

dynamicClientSet := dynamic.NewForConfigOrDie(restConfig)
karmadaClient := karmadaclientset.NewForConfigOrDie(restConfig)
Expand Down
3 changes: 2 additions & 1 deletion cmd/webhook/app/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"net/http"

"github.com/spf13/cobra"
"k8s.io/client-go/util/flowcontrol"
cliflag "k8s.io/component-base/cli/flag"
"k8s.io/component-base/term"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -116,7 +117,7 @@ func Run(ctx context.Context, opts *options.Options) error {
if err != nil {
panic(err)
}
config.QPS, config.Burst = opts.KubeAPIQPS, opts.KubeAPIBurst
config.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(opts.KubeAPIQPS, opts.KubeAPIBurst)

hookManager, err := controllerruntime.NewManager(config, controllerruntime.Options{
Logger: klog.Background(),
Expand Down
6 changes: 2 additions & 4 deletions pkg/controllers/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/karmada-io/karmada/pkg/controllers/federatedhpa/config"
"github.com/karmada-io/karmada/pkg/resourceinterpreter"
"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
"github.com/karmada-io/karmada/pkg/util/objectwatcher"
"github.com/karmada-io/karmada/pkg/util/overridemanager"
Expand Down Expand Up @@ -67,10 +68,6 @@ type Options struct {
ClusterFailureThreshold metav1.Duration
// ClusterCacheSyncTimeout is the timeout period waiting for cluster cache to sync.
ClusterCacheSyncTimeout metav1.Duration
// ClusterAPIQPS is the QPS to use while talking with cluster kube-apiserver.
ClusterAPIQPS float32
// ClusterAPIBurst is the burst to allow while talking with cluster kube-apiserver.
ClusterAPIBurst int
// SkippedPropagatingNamespaces is a list of namespace regular expressions, matching namespaces will be skipped propagating.
SkippedPropagatingNamespaces []*regexp.Regexp
// ClusterName is the name of cluster.
Expand Down Expand Up @@ -112,6 +109,7 @@ type Context struct {
OverrideManager overridemanager.OverrideManager
ControlPlaneInformerManager genericmanager.SingleClusterInformerManager
ResourceInterpreter resourceinterpreter.ResourceInterpreter
ClusterClientOption *util.ClientOption
}

// IsControllerEnabled check if a specified controller enabled or not.
Expand Down
4 changes: 2 additions & 2 deletions pkg/controllers/execution/execution_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func newController(work *workv1alpha1.Work, recorder *record.FakeRecorder) Contr
informerManager.ForCluster(cluster.Name, dynamicClientSet, 0).Lister(corev1.SchemeGroupVersion.WithResource("pods"))
informerManager.Start(cluster.Name)
informerManager.WaitForCacheSync(cluster.Name)
clusterClientSetFunc := func(string, client.Client) (*util.DynamicClusterClient, error) {
clusterClientSetFunc := func(string, client.Client, *util.ClientOption) (*util.DynamicClusterClient, error) {
return &util.DynamicClusterClient{
ClusterName: clusterName,
DynamicClientSet: dynamicClientSet,
Expand All @@ -241,7 +241,7 @@ func newController(work *workv1alpha1.Work, recorder *record.FakeRecorder) Contr
InformerManager: informerManager,
EventRecorder: recorder,
RESTMapper: restMapper,
ObjectWatcher: objectwatcher.NewObjectWatcher(fakeClient, restMapper, clusterClientSetFunc, resourceInterpreter),
ObjectWatcher: objectwatcher.NewObjectWatcher(fakeClient, restMapper, clusterClientSetFunc, nil, resourceInterpreter),
}
}

Expand Down
5 changes: 3 additions & 2 deletions pkg/controllers/mcs/service_export_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ type ServiceExportController struct {
InformerManager genericmanager.MultiClusterInformerManager
WorkerNumber int // WorkerNumber is the number of worker goroutines
PredicateFunc predicate.Predicate // PredicateFunc is the function that filters events before enqueuing the keys.
ClusterDynamicClientSetFunc func(clusterName string, client client.Client) (*util.DynamicClusterClient, error)
ClusterDynamicClientSetFunc util.NewClusterDynamicClientSetFunc
ClusterClientOption *util.ClientOption
ClusterCacheSyncTimeout metav1.Duration

// eventHandlers holds the handlers which used to handle events reported from member clusters.
Expand Down Expand Up @@ -234,7 +235,7 @@ func (c *ServiceExportController) buildResourceInformers(cluster *clusterv1alpha
func (c *ServiceExportController) registerInformersAndStart(cluster *clusterv1alpha1.Cluster) error {
singleClusterInformerManager := c.InformerManager.GetSingleClusterManager(cluster.Name)
if singleClusterInformerManager == nil {
dynamicClusterClient, err := c.ClusterDynamicClientSetFunc(cluster.Name, c.Client)
dynamicClusterClient, err := c.ClusterDynamicClientSetFunc(cluster.Name, c.Client, c.ClusterClientOption)
if err != nil {
klog.Errorf("Failed to build dynamic cluster client for cluster %s.", cluster.Name)
return err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ type EndpointSliceCollectController struct {
InformerManager genericmanager.MultiClusterInformerManager
WorkerNumber int // WorkerNumber is the number of worker goroutines
PredicateFunc predicate.Predicate // PredicateFunc is the function that filters events before enqueuing the keys.
ClusterDynamicClientSetFunc func(clusterName string, client client.Client) (*util.DynamicClusterClient, error)
ClusterDynamicClientSetFunc util.NewClusterDynamicClientSetFunc
ClusterClientOption *util.ClientOption
// eventHandlers holds the handlers which used to handle events reported from member clusters.
// Each handler takes the cluster name as key and takes the handler function as the value, e.g.
// "member1": instance of ResourceEventHandler
Expand Down Expand Up @@ -177,7 +178,7 @@ func (c *EndpointSliceCollectController) buildResourceInformers(clusterName stri
func (c *EndpointSliceCollectController) registerInformersAndStart(cluster *clusterv1alpha1.Cluster) error {
singleClusterInformerManager := c.InformerManager.GetSingleClusterManager(cluster.Name)
if singleClusterInformerManager == nil {
dynamicClusterClient, err := c.ClusterDynamicClientSetFunc(cluster.Name, c.Client)
dynamicClusterClient, err := c.ClusterDynamicClientSetFunc(cluster.Name, c.Client, c.ClusterClientOption)
if err != nil {
klog.Errorf("Failed to build dynamic cluster client for cluster %s.", cluster.Name)
return err
Expand Down
Loading

0 comments on commit 7fa65cf

Please sign in to comment.