Skip to content

Commit 8e09bf6

Browse files
authored
Merge pull request #6271 from ctripcloud/context
replace stopCh with context
2 parents 707360f + 273e959 commit 8e09bf6

File tree

65 files changed

+404
-607
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below for content that may be hidden.

65 files changed

+404
-607
lines changed

cmd/agent/app/agent.go

+10-11
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ func run(ctx context.Context, opts *options.Options) error {
233233
ctrlmetrics.Registry.MustRegister(metrics.PoolCollectors()...)
234234
ctrlmetrics.Registry.MustRegister(metrics.NewBuildInfoCollector())
235235

236-
if err = setupControllers(controllerManager, opts, ctx.Done()); err != nil {
236+
if err = setupControllers(ctx, controllerManager, opts); err != nil {
237237
return err
238238
}
239239

@@ -245,18 +245,18 @@ func run(ctx context.Context, opts *options.Options) error {
245245
return nil
246246
}
247247

248-
func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stopChan <-chan struct{}) error {
248+
func setupControllers(ctx context.Context, mgr controllerruntime.Manager, opts *options.Options) error {
249249
restConfig := mgr.GetConfig()
250250
dynamicClientSet := dynamic.NewForConfigOrDie(restConfig)
251-
controlPlaneInformerManager := genericmanager.NewSingleClusterInformerManager(dynamicClientSet, 0, stopChan)
251+
controlPlaneInformerManager := genericmanager.NewSingleClusterInformerManager(ctx, dynamicClientSet, 0)
252252
controlPlaneKubeClientSet := kubeclientset.NewForConfigOrDie(restConfig)
253253

254254
// We need a service lister to build a resource interpreter with `ClusterIPServiceResolver`
255255
// which allows connection to the customized interpreter webhook without a cluster DNS service.
256256
sharedFactory := informers.NewSharedInformerFactory(controlPlaneKubeClientSet, 0)
257257
serviceLister := sharedFactory.Core().V1().Services().Lister()
258-
sharedFactory.Start(stopChan)
259-
sharedFactory.WaitForCacheSync(stopChan)
258+
sharedFactory.Start(ctx.Done())
259+
sharedFactory.WaitForCacheSync(ctx.Done())
260260

261261
resourceInterpreter := resourceinterpreter.NewResourceInterpreter(controlPlaneInformerManager, serviceLister)
262262
if err := mgr.Add(resourceInterpreter); err != nil {
@@ -285,7 +285,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
285285
CertRotationRemainingTimeThreshold: opts.CertRotationRemainingTimeThreshold,
286286
KarmadaKubeconfigNamespace: opts.KarmadaKubeconfigNamespace,
287287
},
288-
StopChan: stopChan,
288+
Context: ctx,
289289
ResourceInterpreter: resourceInterpreter,
290290
}
291291

@@ -295,7 +295,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
295295

296296
// Ensure the InformerManager stops when the stop channel closes
297297
go func() {
298-
<-stopChan
298+
<-ctx.Done()
299299
genericmanager.StopInstance()
300300
}()
301301

@@ -310,7 +310,6 @@ func startClusterStatusController(ctx controllerscontext.Context) (bool, error)
310310
PredicateFunc: helper.NewClusterPredicateOnAgent(ctx.Opts.ClusterName),
311311
TypedInformerManager: typedmanager.GetInstance(),
312312
GenericInformerManager: genericmanager.GetInstance(),
313-
StopChan: ctx.StopChan,
314313
ClusterClientSetFunc: util.NewClusterClientSetForAgent,
315314
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent,
316315
ClusterClientOption: &util.ClientOption{QPS: ctx.Opts.ClusterAPIQPS, Burst: ctx.Opts.ClusterAPIBurst},
@@ -351,7 +350,7 @@ func startWorkStatusController(ctx controllerscontext.Context) (bool, error) {
351350
EventRecorder: ctx.Mgr.GetEventRecorderFor(status.WorkStatusControllerName),
352351
RESTMapper: ctx.Mgr.GetRESTMapper(),
353352
InformerManager: genericmanager.GetInstance(),
354-
StopChan: ctx.StopChan,
353+
Context: ctx.Context,
355354
ObjectWatcher: ctx.ObjectWatcher,
356355
PredicateFunc: helper.NewExecutionPredicateOnAgent(),
357356
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent,
@@ -373,7 +372,7 @@ func startServiceExportController(ctx controllerscontext.Context) (bool, error)
373372
EventRecorder: ctx.Mgr.GetEventRecorderFor(mcs.ServiceExportControllerName),
374373
RESTMapper: ctx.Mgr.GetRESTMapper(),
375374
InformerManager: genericmanager.GetInstance(),
376-
StopChan: ctx.StopChan,
375+
Context: ctx.Context,
377376
WorkerNumber: 3,
378377
PredicateFunc: helper.NewPredicateForServiceExportControllerOnAgent(ctx.Opts.ClusterName),
379378
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent,
@@ -399,7 +398,7 @@ func startEndpointSliceCollectController(ctx controllerscontext.Context) (enable
399398
Client: ctx.Mgr.GetClient(),
400399
RESTMapper: ctx.Mgr.GetRESTMapper(),
401400
InformerManager: genericmanager.GetInstance(),
402-
StopChan: ctx.StopChan,
401+
Context: ctx.Context,
403402
WorkerNumber: 3,
404403
PredicateFunc: helper.NewPredicateForEndpointSliceCollectControllerOnAgent(opts.ClusterName),
405404
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet,

cmd/controller-manager/app/controllermanager.go

+14-16
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ func Run(ctx context.Context, opts *options.Options) error {
202202
klog.Fatalf("Failed to register index for Work based on ClusterResourceBinding ID: %v", err)
203203
}
204204

205-
setupControllers(controllerManager, opts, ctx.Done())
205+
setupControllers(ctx, controllerManager, opts)
206206

207207
// blocks until the context is done.
208208
if err := controllerManager.Start(ctx); err != nil {
@@ -289,7 +289,6 @@ func startClusterController(ctx controllerscontext.Context) (enabled bool, err e
289289
func startClusterStatusController(ctx controllerscontext.Context) (enabled bool, err error) {
290290
mgr := ctx.Mgr
291291
opts := ctx.Opts
292-
stopChan := ctx.StopChan
293292
clusterPredicateFunc := predicate.Funcs{
294293
CreateFunc: func(createEvent event.CreateEvent) bool {
295294
obj := createEvent.Object.(*clusterv1alpha1.Cluster)
@@ -329,7 +328,6 @@ func startClusterStatusController(ctx controllerscontext.Context) (enabled bool,
329328
PredicateFunc: clusterPredicateFunc,
330329
TypedInformerManager: typedmanager.GetInstance(),
331330
GenericInformerManager: genericmanager.GetInstance(),
332-
StopChan: stopChan,
333331
ClusterClientSetFunc: util.NewClusterClientSet,
334332
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet,
335333
ClusterClientOption: &util.ClientOption{QPS: opts.ClusterAPIQPS, Burst: opts.ClusterAPIBurst},
@@ -432,7 +430,7 @@ func startWorkStatusController(ctx controllerscontext.Context) (enabled bool, er
432430
EventRecorder: ctx.Mgr.GetEventRecorderFor(status.WorkStatusControllerName),
433431
RESTMapper: ctx.Mgr.GetRESTMapper(),
434432
InformerManager: genericmanager.GetInstance(),
435-
StopChan: ctx.StopChan,
433+
Context: ctx.Context,
436434
ObjectWatcher: ctx.ObjectWatcher,
437435
PredicateFunc: helper.NewExecutionPredicate(ctx.Mgr),
438436
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet,
@@ -470,7 +468,7 @@ func startServiceExportController(ctx controllerscontext.Context) (enabled bool,
470468
EventRecorder: ctx.Mgr.GetEventRecorderFor(mcs.ServiceExportControllerName),
471469
RESTMapper: ctx.Mgr.GetRESTMapper(),
472470
InformerManager: genericmanager.GetInstance(),
473-
StopChan: ctx.StopChan,
471+
Context: ctx.Context,
474472
WorkerNumber: 3,
475473
PredicateFunc: helper.NewPredicateForServiceExportController(ctx.Mgr),
476474
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet,
@@ -496,7 +494,7 @@ func startEndpointSliceCollectController(ctx controllerscontext.Context) (enable
496494
Client: ctx.Mgr.GetClient(),
497495
RESTMapper: ctx.Mgr.GetRESTMapper(),
498496
InformerManager: genericmanager.GetInstance(),
499-
StopChan: ctx.StopChan,
497+
Context: ctx.Context,
500498
WorkerNumber: 3,
501499
PredicateFunc: helper.NewPredicateForEndpointSliceCollectController(ctx.Mgr),
502500
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet,
@@ -639,7 +637,7 @@ func startFederatedHorizontalPodAutoscalerController(ctx controllerscontext.Cont
639637
go custom_metrics.PeriodicallyInvalidate(
640638
apiVersionsGetter,
641639
ctx.Opts.HPAControllerConfiguration.HorizontalPodAutoscalerSyncPeriod.Duration,
642-
ctx.StopChan)
640+
ctx.Context.Done())
643641
metricsClient := metricsclient.NewRESTMetricsClient(
644642
resourceclient.NewForConfigOrDie(ctx.Mgr.GetConfig()),
645643
custom_metrics.NewForConfig(ctx.Mgr.GetConfig(), ctx.Mgr.GetRESTMapper(), apiVersionsGetter),
@@ -758,7 +756,7 @@ func startAgentCSRApprovingController(ctx controllerscontext.Context) (enabled b
758756
}
759757

760758
// setupControllers initialize controllers and setup one by one.
761-
func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stopChan <-chan struct{}) {
759+
func setupControllers(ctx context.Context, mgr controllerruntime.Manager, opts *options.Options) {
762760
restConfig := mgr.GetConfig()
763761
dynamicClientSet := dynamic.NewForConfigOrDie(restConfig)
764762
discoverClientSet := discovery.NewDiscoveryClientForConfigOrDie(restConfig)
@@ -771,13 +769,13 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
771769
return
772770
}
773771

774-
controlPlaneInformerManager := genericmanager.NewSingleClusterInformerManager(dynamicClientSet, opts.ResyncPeriod.Duration, stopChan)
772+
controlPlaneInformerManager := genericmanager.NewSingleClusterInformerManager(ctx, dynamicClientSet, opts.ResyncPeriod.Duration)
775773
// We need a service lister to build a resource interpreter with `ClusterIPServiceResolver`
776774
// which allows connection to the customized interpreter webhook without a cluster DNS service.
777775
sharedFactory := informers.NewSharedInformerFactory(kubeClientSet, opts.ResyncPeriod.Duration)
778776
serviceLister := sharedFactory.Core().V1().Services().Lister()
779-
sharedFactory.Start(stopChan)
780-
sharedFactory.WaitForCacheSync(stopChan)
777+
sharedFactory.Start(ctx.Done())
778+
sharedFactory.WaitForCacheSync(ctx.Done())
781779

782780
resourceInterpreter := resourceinterpreter.NewResourceInterpreter(controlPlaneInformerManager, serviceLister)
783781
if err := mgr.Add(resourceInterpreter); err != nil {
@@ -821,7 +819,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
821819
klog.Fatalf("Failed to setup dependencies distributor: %v", err)
822820
}
823821
}
824-
setupClusterAPIClusterDetector(mgr, opts, stopChan)
822+
setupClusterAPIClusterDetector(ctx, mgr, opts)
825823
controllerContext := controllerscontext.Context{
826824
Mgr: mgr,
827825
ObjectWatcher: objectWatcher,
@@ -847,7 +845,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
847845
EnableClusterResourceModeling: opts.EnableClusterResourceModeling,
848846
HPAControllerConfiguration: opts.HPAControllerConfiguration,
849847
},
850-
StopChan: stopChan,
848+
Context: ctx,
851849
DynamicClientSet: dynamicClientSet,
852850
KubeClientSet: kubeClientSet,
853851
OverrideManager: overrideManager,
@@ -861,13 +859,13 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
861859

862860
// Ensure the InformerManager stops when the stop channel closes
863861
go func() {
864-
<-stopChan
862+
<-ctx.Done()
865863
genericmanager.StopInstance()
866864
}()
867865
}
868866

869867
// setupClusterAPIClusterDetector initialize Cluster detector with the cluster-api management cluster.
870-
func setupClusterAPIClusterDetector(mgr controllerruntime.Manager, opts *options.Options, stopChan <-chan struct{}) {
868+
func setupClusterAPIClusterDetector(ctx context.Context, mgr controllerruntime.Manager, opts *options.Options) {
871869
if len(opts.ClusterAPIKubeconfig) == 0 {
872870
return
873871
}
@@ -888,7 +886,7 @@ func setupClusterAPIClusterDetector(mgr controllerruntime.Manager, opts *options
888886
ControllerPlaneConfig: mgr.GetConfig(),
889887
ClusterAPIConfig: clusterAPIRestConfig,
890888
ClusterAPIClient: clusterAPIClient,
891-
InformerManager: genericmanager.NewSingleClusterInformerManager(dynamic.NewForConfigOrDie(clusterAPIRestConfig), 0, stopChan),
889+
InformerManager: genericmanager.NewSingleClusterInformerManager(ctx, dynamic.NewForConfigOrDie(clusterAPIRestConfig), 0),
892890
ConcurrentReconciles: 3,
893891
}
894892
if err := mgr.Add(clusterAPIClusterDetector); err != nil {

cmd/descheduler/app/descheduler.go

+3-9
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ const (
7979
)
8080

8181
// NewDeschedulerCommand creates a *cobra.Command object with default parameters
82-
func NewDeschedulerCommand(stopChan <-chan struct{}) *cobra.Command {
82+
func NewDeschedulerCommand(ctx context.Context) *cobra.Command {
8383
opts := options.NewOptions()
8484

8585
cmd := &cobra.Command{
@@ -92,7 +92,7 @@ karmada-scheduler-estimator to get replica status.`,
9292
if errs := opts.Validate(); len(errs) != 0 {
9393
return errs.ToAggregate()
9494
}
95-
if err := run(opts, stopChan); err != nil {
95+
if err := run(ctx, opts); err != nil {
9696
return err
9797
}
9898
return nil
@@ -125,7 +125,7 @@ karmada-scheduler-estimator to get replica status.`,
125125
return cmd
126126
}
127127

128-
func run(opts *options.Options, stopChan <-chan struct{}) error {
128+
func run(ctx context.Context, opts *options.Options) error {
129129
klog.Infof("karmada-descheduler version: %s", version.Get())
130130
klog.Infof("Please make sure the karmada-scheduler-estimator of all member clusters has been deployed")
131131

@@ -144,12 +144,6 @@ func run(opts *options.Options, stopChan <-chan struct{}) error {
144144
karmadaClient := karmadaclientset.NewForConfigOrDie(restConfig)
145145
kubeClient := kubernetes.NewForConfigOrDie(restConfig)
146146

147-
ctx, cancel := context.WithCancel(context.Background())
148-
go func() {
149-
<-stopChan
150-
cancel()
151-
}()
152-
153147
desched := descheduler.NewDescheduler(karmadaClient, kubeClient, opts)
154148
if !opts.LeaderElection.LeaderElect {
155149
desched.Run(ctx)

cmd/descheduler/app/descheduler_test.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package app
1818

1919
import (
20+
"context"
2021
"net/http"
2122
"testing"
2223
"time"
@@ -29,8 +30,8 @@ import (
2930
)
3031

3132
func TestNewDeschedulerCommand(t *testing.T) {
32-
stopCh := make(chan struct{})
33-
cmd := NewDeschedulerCommand(stopCh)
33+
ctx := context.Background()
34+
cmd := NewDeschedulerCommand(ctx)
3435

3536
assert.NotNil(t, cmd)
3637
assert.Equal(t, names.KarmadaDeschedulerComponentName, cmd.Use)
@@ -51,8 +52,8 @@ func TestDeschedulerCommandFlagParsing(t *testing.T) {
5152
}
5253
for _, tc := range testCases {
5354
t.Run(tc.name, func(t *testing.T) {
54-
stopCh := make(chan struct{})
55-
cmd := NewDeschedulerCommand(stopCh)
55+
ctx := context.Background()
56+
cmd := NewDeschedulerCommand(ctx)
5657
cmd.SetArgs(tc.args)
5758
err := cmd.ParseFlags(tc.args)
5859
if tc.expectError {

cmd/descheduler/main.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ import (
2727
)
2828

2929
func main() {
30-
stopChan := controllerruntime.SetupSignalHandler().Done()
31-
command := app.NewDeschedulerCommand(stopChan)
30+
ctx := controllerruntime.SetupSignalHandler()
31+
command := app.NewDeschedulerCommand(ctx)
3232
code := cli.Run(command)
3333
os.Exit(code)
3434
}

cmd/karmada-search/app/karmada-search.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -140,14 +140,14 @@ func run(ctx context.Context, o *options.Options, registryOptions ...Option) err
140140
if config.ExtraConfig.Controller != nil {
141141
server.GenericAPIServer.AddPostStartHookOrDie("start-karmada-search-controller", func(context genericapiserver.PostStartHookContext) error {
142142
// start ResourceRegistry controller
143-
config.ExtraConfig.Controller.Start(context.Done())
143+
config.ExtraConfig.Controller.Start(context)
144144
return nil
145145
})
146146
}
147147

148148
if config.ExtraConfig.ProxyController != nil {
149149
server.GenericAPIServer.AddPostStartHookOrDie("start-karmada-proxy-controller", func(context genericapiserver.PostStartHookContext) error {
150-
config.ExtraConfig.ProxyController.Start(context.Done())
150+
config.ExtraConfig.ProxyController.Start(context)
151151
return nil
152152
})
153153

cmd/metrics-adapter/app/options/options.go

+3-4
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) {
124124
}
125125

126126
// Config returns config for the metrics-adapter server given Options
127-
func (o *Options) Config(stopCh <-chan struct{}) (*metricsadapter.MetricsServer, error) {
127+
func (o *Options) Config(ctx context.Context) (*metricsadapter.MetricsServer, error) {
128128
restConfig, err := clientcmd.BuildConfigFromFlags("", o.KubeConfig)
129129
if err != nil {
130130
klog.Errorf("Unable to build restConfig: %v", err)
@@ -136,7 +136,7 @@ func (o *Options) Config(stopCh <-chan struct{}) (*metricsadapter.MetricsServer,
136136
factory := informerfactory.NewSharedInformerFactory(karmadaClient, 0)
137137
kubeClient := kubernetes.NewForConfigOrDie(restConfig)
138138
kubeFactory := informers.NewSharedInformerFactory(kubeClient, 0)
139-
metricsController := metricsadapter.NewMetricsController(stopCh, restConfig, factory, kubeFactory, &util.ClientOption{QPS: o.ClusterAPIQPS, Burst: o.ClusterAPIBurst})
139+
metricsController := metricsadapter.NewMetricsController(ctx, restConfig, factory, kubeFactory, &util.ClientOption{QPS: o.ClusterAPIQPS, Burst: o.ClusterAPIBurst})
140140
metricsAdapter := metricsadapter.NewMetricsAdapter(metricsController, o.CustomMetricsAdapterServerOptions)
141141
metricsAdapter.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(generatedopenapi.GetOpenAPIDefinitions, openapinamer.NewDefinitionNamer(api.Scheme))
142142
metricsAdapter.OpenAPIV3Config = genericapiserver.DefaultOpenAPIV3Config(generatedopenapi.GetOpenAPIDefinitions, openapinamer.NewDefinitionNamer(api.Scheme))
@@ -186,8 +186,7 @@ func (o *Options) Run(ctx context.Context) error {
186186

187187
profileflag.ListenAndServe(o.ProfileOpts)
188188

189-
stopCh := ctx.Done()
190-
metricsServer, err := o.Config(stopCh)
189+
metricsServer, err := o.Config(ctx)
191190
if err != nil {
192191
return err
193192
}

cmd/scheduler-estimator/app/scheduler-estimator.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ func run(ctx context.Context, opts *options.Options) error {
132132
dynamicClient := dynamic.NewForConfigOrDie(restConfig)
133133
discoveryClient := discovery.NewDiscoveryClientForConfigOrDie(restConfig)
134134

135-
e, err := server.NewEstimatorServer(kubeClient, dynamicClient, discoveryClient, opts, ctx.Done())
135+
e, err := server.NewEstimatorServer(ctx, kubeClient, dynamicClient, discoveryClient, opts)
136136
if err != nil {
137137
klog.Errorf("Fail to create estimator server: %v", err)
138138
return err

cmd/scheduler/app/scheduler.go

+3-8
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ func WithPlugin(name string, factory runtime.PluginFactory) Option {
9191
}
9292

9393
// NewSchedulerCommand creates a *cobra.Command object with default parameters
94-
func NewSchedulerCommand(stopChan <-chan struct{}, registryOptions ...Option) *cobra.Command {
94+
func NewSchedulerCommand(ctx context.Context, registryOptions ...Option) *cobra.Command {
9595
opts := options.NewOptions()
9696

9797
cmd := &cobra.Command{
@@ -105,7 +105,7 @@ the most suitable cluster.`,
105105
if errs := opts.Validate(); len(errs) != 0 {
106106
return errs.ToAggregate()
107107
}
108-
if err := run(opts, stopChan, registryOptions...); err != nil {
108+
if err := run(ctx, opts, registryOptions...); err != nil {
109109
return err
110110
}
111111
return nil
@@ -138,7 +138,7 @@ the most suitable cluster.`,
138138
return cmd
139139
}
140140

141-
func run(opts *options.Options, stopChan <-chan struct{}, registryOptions ...Option) error {
141+
func run(ctx context.Context, opts *options.Options, registryOptions ...Option) error {
142142
klog.Infof("karmada-scheduler version: %s", version.Get())
143143

144144
ctrlmetrics.Registry.MustRegister(versionmetrics.NewBuildInfoCollector())
@@ -156,11 +156,6 @@ func run(opts *options.Options, stopChan <-chan struct{}, registryOptions ...Opt
156156
karmadaClient := karmadaclientset.NewForConfigOrDie(restConfig)
157157
kubeClientSet := kubernetes.NewForConfigOrDie(restConfig)
158158

159-
ctx, cancel := context.WithCancel(context.Background())
160-
go func() {
161-
<-stopChan
162-
cancel()
163-
}()
164159
outOfTreeRegistry := make(runtime.Registry)
165160
for _, option := range registryOptions {
166161
if err := option(outOfTreeRegistry); err != nil {

0 commit comments

Comments
 (0)