Skip to content

Commit

Permalink
enhance: start worker goroutines on demand
Browse files Browse the repository at this point in the history
Instead of having a bunch of waiting goroutines, this change will
introduce one goroutine per type that waits for new tasks. That
goroutine will spin up new goroutines, up to the desired limit, to
handle tasks as they come in.

Signed-off-by: Donnie Adams <[email protected]>
  • Loading branch information
thedadams committed Feb 7, 2025
1 parent 3b7c581 commit 8477886
Showing 1 changed file with 31 additions and 22 deletions.
53 changes: 31 additions & 22 deletions pkg/runtime/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
clientgocache "k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/cache"
Expand Down Expand Up @@ -155,13 +154,8 @@ func (c *controller) run(ctx context.Context, workers int) {
// Start the informer factories to begin populating the informer caches
log.Infof("Starting %s controller", c.name)

for i := 0; i < workers; i++ {
go wait.Until(func() {
c.runWorker(ctx)
}, time.Second, ctx.Done())
}
c.runWorkers(ctx, workers)

<-ctx.Done()
c.startLock.Lock()
defer c.startLock.Unlock()
c.started = false
Expand Down Expand Up @@ -217,26 +211,41 @@ func (c *controller) Start(ctx context.Context, workers int) error {
return nil
}

func (c *controller) runWorker(ctx context.Context) {
for c.processNextWorkItem(ctx) {
}
}
func (c *controller) runWorkers(ctx context.Context, workers int) {
wait := sync.WaitGroup{}
running := make(chan struct{}, workers)

defer func() {
defer wait.Wait()
}()

func (c *controller) processNextWorkItem(ctx context.Context) bool {
obj, shutdown := c.workqueue.Get()
defer close(running)

if shutdown {
return false
}
go func() {
<-ctx.Done()
c.workqueue.ShutDown()
}()

for {
obj, shutdown := c.workqueue.Get()

if err := c.processSingleItem(ctx, obj); err != nil {
if !strings.Contains(err.Error(), "please apply your changes to the latest version and try again") {
log.Errorf("%v", err)
if shutdown {
return
}
return true
}

return true
running <- struct{}{}
wait.Add(1)

go func() {
if err := c.processSingleItem(ctx, obj); err != nil {
if !strings.Contains(err.Error(), "please apply your changes to the latest version and try again") {
log.Errorf("%v", err)
}
}
<-running
wait.Done()
}()
}
}

func (c *controller) processSingleItem(ctx context.Context, obj interface{}) error {
Expand Down

0 comments on commit 8477886

Please sign in to comment.