diff --git a/pkg/runtime/controller.go b/pkg/runtime/controller.go index 6132f16..46005b8 100644 --- a/pkg/runtime/controller.go +++ b/pkg/runtime/controller.go @@ -14,7 +14,6 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/wait" clientgocache "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/cache" @@ -155,13 +154,8 @@ func (c *controller) run(ctx context.Context, workers int) { // Start the informer factories to begin populating the informer caches log.Infof("Starting %s controller", c.name) - for i := 0; i < workers; i++ { - go wait.Until(func() { - c.runWorker(ctx) - }, time.Second, ctx.Done()) - } + c.runWorkers(ctx, workers) - <-ctx.Done() c.startLock.Lock() defer c.startLock.Unlock() c.started = false @@ -217,26 +211,41 @@ func (c *controller) Start(ctx context.Context, workers int) error { return nil } -func (c *controller) runWorker(ctx context.Context) { - for c.processNextWorkItem(ctx) { - } -} +func (c *controller) runWorkers(ctx context.Context, workers int) { + wait := sync.WaitGroup{} + running := make(chan struct{}, workers) + + defer func() { + defer wait.Wait() + }() -func (c *controller) processNextWorkItem(ctx context.Context) bool { - obj, shutdown := c.workqueue.Get() + defer close(running) - if shutdown { - return false - } + go func() { + <-ctx.Done() + c.workqueue.ShutDown() + }() + + for { + obj, shutdown := c.workqueue.Get() - if err := c.processSingleItem(ctx, obj); err != nil { - if !strings.Contains(err.Error(), "please apply your changes to the latest version and try again") { - log.Errorf("%v", err) + if shutdown { + return } - return true - } - return true + running <- struct{}{} + wait.Add(1) + + go func() { + if err := c.processSingleItem(ctx, obj); err != nil { + if !strings.Contains(err.Error(), "please apply your changes to the latest version and try again") { + log.Errorf("%v", err) + } + } + <-running + wait.Done() + }() + } } func (c *controller) processSingleItem(ctx context.Context, obj interface{}) error {