From 8477886f9cd63da7b05ac6fbb37475f57c7a1b30 Mon Sep 17 00:00:00 2001 From: Donnie Adams Date: Fri, 7 Feb 2025 14:05:04 -0500 Subject: [PATCH] enhance: start worker goroutines on demand Instead of having a bunch of waiting goroutines, this change will introduce one goroutine per type that waits for new tasks. That goroutine will spin up new goroutines, up to the desired limit, to handle tasks as they come in. Signed-off-by: Donnie Adams --- pkg/runtime/controller.go | 53 +++++++++++++++++++++++---------------- 1 file changed, 31 insertions(+), 22 deletions(-) diff --git a/pkg/runtime/controller.go b/pkg/runtime/controller.go index 6132f16..46005b8 100644 --- a/pkg/runtime/controller.go +++ b/pkg/runtime/controller.go @@ -14,7 +14,6 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/wait" clientgocache "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/cache" @@ -155,13 +154,8 @@ func (c *controller) run(ctx context.Context, workers int) { // Start the informer factories to begin populating the informer caches log.Infof("Starting %s controller", c.name) - for i := 0; i < workers; i++ { - go wait.Until(func() { - c.runWorker(ctx) - }, time.Second, ctx.Done()) - } + c.runWorkers(ctx, workers) - <-ctx.Done() c.startLock.Lock() defer c.startLock.Unlock() c.started = false @@ -217,26 +211,41 @@ func (c *controller) Start(ctx context.Context, workers int) error { return nil } -func (c *controller) runWorker(ctx context.Context) { - for c.processNextWorkItem(ctx) { - } -} +func (c *controller) runWorkers(ctx context.Context, workers int) { + wait := sync.WaitGroup{} + running := make(chan struct{}, workers) + + defer func() { + defer wait.Wait() + }() -func (c *controller) processNextWorkItem(ctx context.Context) bool { - obj, shutdown := c.workqueue.Get() + defer close(running) - if shutdown { - return false - } + go func() { + <-ctx.Done() + c.workqueue.ShutDown() + }() + + for { + obj, shutdown := c.workqueue.Get() - if err := c.processSingleItem(ctx, obj); err != nil { - if !strings.Contains(err.Error(), "please apply your changes to the latest version and try again") { - log.Errorf("%v", err) + if shutdown { + return } - return true - } - return true + running <- struct{}{} + wait.Add(1) + + go func() { + if err := c.processSingleItem(ctx, obj); err != nil { + if !strings.Contains(err.Error(), "please apply your changes to the latest version and try again") { + log.Errorf("%v", err) + } + } + <-running + wait.Done() + }() + } } func (c *controller) processSingleItem(ctx context.Context, obj interface{}) error {