feat: add the ability to use multiple worker queues per GVK
A WorkerQueueSplitter interface is added that lets the caller split a
GVK's keys across multiple worker queues, enabling a simple
priority-based queuing mechanism.

Signed-off-by: Donnie Adams <[email protected]>
thedadams committed Feb 10, 2025
1 parent 1f6217c commit 764e330
Showing 8 changed files with 128 additions and 77 deletions.
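
For illustration only (this sketch is not part of the commit): a caller-defined splitter satisfying the new WorkerQueueSplitter interface added in pkg/runtime/controller.go below. The two-queue layout and the "urgent" namespace set are assumptions.

package main

import "strings"

// prioritySplitter is a hypothetical WorkerQueueSplitter: keys from
// "urgent" namespaces land on queue 0, everything else on queue 1.
type prioritySplitter struct {
	urgent map[string]bool // assumed set of high-priority namespaces
}

// Queues tells the controller how many worker queues to create.
func (s prioritySplitter) Queues() int { return 2 }

// Split picks the queue index for a key of the form "namespace/name"
// (cluster-scoped keys have no "/").
func (s prioritySplitter) Split(key string) int {
	ns, _, found := strings.Cut(key, "/")
	if found && s.urgent[ns] {
		return 0 // high-priority queue
	}
	return 1 // default queue
}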
2 changes: 1 addition & 1 deletion pkg/backend/backend.go
@@ -13,7 +13,7 @@ import (
 type Callback func(gvk schema.GroupVersionKind, key string, obj runtime.Object) (runtime.Object, error)
 
 type Trigger interface {
-	Trigger(gvk schema.GroupVersionKind, key string, delay time.Duration) error
+	Trigger(ctx context.Context, gvk schema.GroupVersionKind, key string, delay time.Duration) error
 }
 
 type Watcher interface {
4 changes: 2 additions & 2 deletions pkg/router/handler.go
@@ -233,7 +233,7 @@ func (m *HandlerSet) checkDelay(gvk schema.GroupVersionKind, key string) bool {
 		m.limiterLock.Lock()
 		defer m.limiterLock.Unlock()
 		delete(m.waiting, lKey)
-		_ = m.backend.Trigger(gvk, ReplayPrefix+key, 0)
+		_ = m.backend.Trigger(m.ctx, gvk, ReplayPrefix+key, 0)
 	}()
 	return false
 }
@@ -341,7 +341,7 @@ func (m *HandlerSet) handle(gvk schema.GroupVersionKind, key string, unmodifiedO
 	req.Object = newObj
 
 	if resp.delay > 0 {
-		if err := m.backend.Trigger(gvk, key, resp.delay); err != nil {
+		if err := m.backend.Trigger(m.ctx, gvk, key, resp.delay); err != nil {
 			return nil, err
 		}
 	}
4 changes: 2 additions & 2 deletions pkg/router/trigger.go
@@ -44,7 +44,7 @@ func (m *triggers) invokeTriggers(req Request) {
 	for _, matcher := range matchers {
 		if matcher.Match(req.Namespace, req.Name, req.Object) {
 			log.Debugf("Triggering [%s] [%v] from [%s] [%v]", et.key, et.gvk, req.Key, req.GVK)
-			_ = m.trigger.Trigger(et.gvk, et.key, 0)
+			_ = m.trigger.Trigger(req.Ctx, et.gvk, et.key, 0)
 			break
 		}
 	}
@@ -133,7 +133,7 @@ func (m *triggers) UnregisterAndTrigger(req Request) {
 		}
 		if targetGVK == req.GVK && mt.Match(req.Namespace, req.Name, req.Object) {
 			log.Debugf("Triggering [%s] [%v] from [%s] [%v] on delete", target.key, target.gvk, req.Key, req.GVK)
-			_ = m.trigger.Trigger(target.gvk, target.key, 0)
+			_ = m.trigger.Trigger(req.Ctx, target.gvk, target.key, 0)
 		}
 	}
 }
6 changes: 3 additions & 3 deletions pkg/runtime/backend.go
@@ -82,8 +82,8 @@ func (b *Backend) start(ctx context.Context, preloadOnly bool) (err error) {
 	return nil
 }
 
-func (b *Backend) Trigger(gvk schema.GroupVersionKind, key string, delay time.Duration) error {
-	controller, err := b.cacheFactory.ForKind(gvk)
+func (b *Backend) Trigger(ctx context.Context, gvk schema.GroupVersionKind, key string, delay time.Duration) error {
+	controller, err := b.cacheFactory.ForKind(ctx, gvk)
 	if err != nil {
 		return err
 	}
@@ -138,7 +138,7 @@ func (b *Backend) addIndexer(ctx context.Context, gvk schema.GroupVersionKind) e
 }
 
 func (b *Backend) Watcher(ctx context.Context, gvk schema.GroupVersionKind, name string, cb backend.Callback) error {
-	c, err := b.cacheFactory.ForKind(gvk)
+	c, err := b.cacheFactory.ForKind(ctx, gvk)
 	if err != nil {
 		return err
 	}
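
A usage sketch of the new signature (hypothetical call site; ctx, gvk, and the key are assumptions): requeue a key for reprocessing after a delay, with the caller's context now propagated through to the cache factory.

	// Re-trigger "default/web" in 30 seconds; ctx flows through to ForKind.
	if err := b.Trigger(ctx, gvk, "default/web", 30*time.Second); err != nil {
		log.Errorf("trigger failed: %v", err)
	}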
6 changes: 4 additions & 2 deletions pkg/runtime/clients.go
@@ -19,7 +19,8 @@ type Runtime struct {
 
 type Config struct {
 	GroupConfig
-	GVKThreadiness map[schema.GroupVersionKind]int
+	GVKThreadiness    map[schema.GroupVersionKind]int
+	GVKQueueSplitters map[schema.GroupVersionKind]WorkerQueueSplitter
 }
 
 type GroupConfig struct {
@@ -69,7 +70,8 @@ func NewRuntimeWithConfigs(defaultConfig Config, apiGroupConfigs map[string]Grou
 	aggCache := multi.NewCache(scheme, theCache, caches)
 
 	factory := NewSharedControllerFactory(aggUncachedClient, aggCache, &SharedControllerFactoryOptions{
-		KindWorkers: defaultConfig.GVKThreadiness,
+		KindWorkers:       defaultConfig.GVKThreadiness,
+		KindQueueSplitter: defaultConfig.GVKQueueSplitters,
 		// In nah this is only invoked when a key fails to process
 		DefaultRateLimiter: workqueue.NewTypedMaxOfRateLimiter(
 			// This will go .5, 1, 2, 4, 8 seconds, etc up until 15 minutes
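
A caller wires a splitter up per GVK through the new GVKQueueSplitters field, alongside GVKThreadiness. A sketch reusing the hypothetical prioritySplitter from above; the Deployment GVK, the worker count, and the surrounding GroupConfig are assumptions, and runtime refers to this repo's pkg/runtime package:

	deploymentGVK := schema.GroupVersionKind{Group: "apps", Version: "v1", Kind: "Deployment"}

	cfg := runtime.Config{
		GroupConfig: groupConfig, // rest-config plumbing elided
		GVKThreadiness: map[schema.GroupVersionKind]int{
			deploymentGVK: 4, // total workers for this GVK
		},
		GVKQueueSplitters: map[schema.GroupVersionKind]runtime.WorkerQueueSplitter{
			deploymentGVK: prioritySplitter{urgent: map[string]bool{"prod": true}},
		},
	}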
132 changes: 84 additions & 48 deletions pkg/runtime/controller.go
@@ -48,7 +48,7 @@ type controller struct {
 	startLock sync.Mutex
 
 	name        string
-	workqueue   workqueue.TypedRateLimitingInterface[any]
+	workqueues  []workqueue.TypedRateLimitingInterface[any]
 	rateLimiter workqueue.TypedRateLimiter[any]
 	informer    cache.Informer
 	handler     Handler
@@ -58,6 +58,7 @@
 	registration clientgocache.ResourceEventHandlerRegistration
 	obj          runtime.Object
 	cache        cache.Cache
+	splitter     WorkerQueueSplitter
 }
 
 type startKey struct {
@@ -66,18 +67,34 @@
 }

 type Options struct {
-	RateLimiter workqueue.TypedRateLimiter[any]
+	RateLimiter   workqueue.TypedRateLimiter[any]
+	QueueSplitter WorkerQueueSplitter
 }
 
-func New(gvk schema.GroupVersionKind, scheme *runtime.Scheme, cache cache.Cache, handler Handler, opts *Options) (Controller, error) {
+type WorkerQueueSplitter interface {
+	Queues() int
+	Split(key string) int
+}
+
+type singleWorkerQueueSplitter struct{}
+
+func (*singleWorkerQueueSplitter) Queues() int {
+	return 1
+}
+
+func (*singleWorkerQueueSplitter) Split(string) int {
+	return 0
+}
+
+func New(ctx context.Context, gvk schema.GroupVersionKind, scheme *runtime.Scheme, cache cache.Cache, handler Handler, opts *Options) (Controller, error) {
 	opts = applyDefaultOptions(opts)
 
 	obj, err := newObject(scheme, gvk)
 	if err != nil {
 		return nil, err
 	}
 
-	informer, err := cache.GetInformerForKind(context.TODO(), gvk)
+	informer, err := cache.GetInformerForKind(ctx, gvk)
 	if err != nil {
 		return nil, err
 	}
@@ -90,6 +107,7 @@
 		obj:         obj,
 		rateLimiter: opts.RateLimiter,
 		informer:    informer,
+		splitter:    opts.QueueSplitter,
 	}
 
 	return controller, nil
@@ -114,6 +132,9 @@ func applyDefaultOptions(opts *Options) *Options {
 			workqueue.NewTypedItemExponentialFailureRateLimiter[any](5*time.Millisecond, 30*time.Second),
 		)
 	}
+	if newOpts.QueueSplitter == nil {
+		newOpts.QueueSplitter = (*singleWorkerQueueSplitter)(nil)
+	}
 	return &newOpts
 }

@@ -135,12 +156,15 @@ func (c *controller) run(ctx context.Context, workers int) {
 	// will create a goroutine under the hood. It we instantiate a workqueue we must have
 	// a mechanism to Shutdown it down. Without the stopCh we don't know when to shutdown
 	// the queue and release the goroutine
-	c.workqueue = workqueue.NewTypedRateLimitingQueueWithConfig(c.rateLimiter, workqueue.TypedRateLimitingQueueConfig[any]{Name: c.name})
+	c.workqueues = make([]workqueue.TypedRateLimitingInterface[any], c.splitter.Queues())
+	for i := range c.workqueues {
+		c.workqueues[i] = workqueue.NewTypedRateLimitingQueueWithConfig(c.rateLimiter, workqueue.TypedRateLimitingQueueConfig[any]{Name: fmt.Sprintf("%s-%d", c.name, i)})
+	}
 	for _, start := range c.startKeys {
 		if start.after == 0 {
-			c.workqueue.Add(start.key)
+			c.workqueues[c.splitter.Split(start.key)].Add(start.key)
 		} else {
-			c.workqueue.AddAfter(start.key, start.after)
+			c.workqueues[c.splitter.Split(start.key)].AddAfter(start.key, start.after)
 		}
 	}
 	c.startKeys = nil
@@ -210,63 +234,71 @@ func (c *controller) Start(ctx context.Context, workers int) error {
 
 func (c *controller) runWorkers(ctx context.Context, workers int) {
 	wait := sync.WaitGroup{}
-	running := make(chan struct{}, workers)
+	workers = workers / len(c.workqueues)
+	if workers == 0 {
+		workers = 1
+	}
 
 	defer func() {
 		defer wait.Wait()
 	}()
 
-	defer close(running)
-
-	go func() {
-		<-ctx.Done()
-		c.workqueue.ShutDown()
-	}()
-
-	for {
-		obj, shutdown := c.workqueue.Get()
-
-		if shutdown {
-			return
-		}
-
-		running <- struct{}{}
-		wait.Add(1)
-
-		go func() {
-			defer func() {
-				<-running
-				wait.Done()
-			}()
-
-			if err := c.processSingleItem(ctx, obj); err != nil {
-				if !strings.Contains(err.Error(), "please apply your changes to the latest version and try again") {
-					log.Errorf("%v", err)
-				}
-			}
-		}()
-	}
+	for _, queue := range c.workqueues {
+		go func() {
+			running := make(chan struct{}, workers)
+			defer close(running)
+
+			for {
+				obj, shutdown := queue.Get()
+
+				if shutdown {
+					return
+				}
+
+				running <- struct{}{}
+				wait.Add(1)
+
+				go func() {
+					defer func() {
+						<-running
+						wait.Done()
+					}()
+
+					if err := c.processSingleItem(ctx, queue, obj); err != nil {
+						if !strings.Contains(err.Error(), "please apply your changes to the latest version and try again") {
+							log.Errorf("%v", err)
+						}
+					}
+				}()
+			}
+		}()
+	}
+
+	<-ctx.Done()
+	for i := range c.workqueues {
+		c.workqueues[i].ShutDown()
	}
 }
 
-func (c *controller) processSingleItem(ctx context.Context, obj interface{}) error {
+func (c *controller) processSingleItem(ctx context.Context, queue workqueue.TypedRateLimitingInterface[any], obj interface{}) error {
 	var (
 		key string
 		ok  bool
 	)
 
-	defer c.workqueue.Done(obj)
+	defer queue.Done(obj)
 
 	if key, ok = obj.(string); !ok {
-		c.workqueue.Forget(obj)
+		queue.Forget(obj)
 		log.Errorf("expected string in workqueue but got %#v", obj)
 		return nil
 	}
 	if err := c.syncHandler(ctx, key); err != nil {
-		c.workqueue.AddRateLimited(key)
+		queue.AddRateLimited(key)
 		return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
 	}
 
-	c.workqueue.Forget(obj)
+	queue.Forget(obj)
 	return nil
 }
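
Note the division semantics in the new runWorkers: the configured worker count is split evenly across the queues, with a floor of one goroutine per queue. With the two-queue splitter sketched earlier and workers = 5, for example, each queue runs 5/2 = 2 concurrent workers; a splitter that declares more queues than there are workers still gets one worker per queue, so total concurrency can exceed the configured count in that case.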

Expand All @@ -280,7 +312,7 @@ func (c *controller) syncHandler(ctx context.Context, key string) error {
return c.handler.OnChange(key, nil)
}

ns, name := keyParse(key)
ns, name := KeyParse(key)
obj := c.obj.DeepCopyObject().(kclient.Object)
err := c.cache.Get(ctx, kclient.ObjectKey{
Name: name,
Expand All @@ -299,10 +331,10 @@ func (c *controller) EnqueueKey(key string) {
c.startLock.Lock()
defer c.startLock.Unlock()

if c.workqueue == nil {
if c.workqueues == nil {
c.startKeys = append(c.startKeys, startKey{key: key})
} else {
c.workqueue.Add(key)
c.workqueues[c.splitter.Split(key)].Add(key)
}
}

Expand All @@ -312,10 +344,10 @@ func (c *controller) Enqueue(namespace, name string) {
c.startLock.Lock()
defer c.startLock.Unlock()

if c.workqueue == nil {
if c.workqueues == nil {
c.startKeys = append(c.startKeys, startKey{key: key})
} else {
c.workqueue.AddRateLimited(key)
c.workqueues[c.splitter.Split(key)].AddRateLimited(key)
}
}

Expand All @@ -325,15 +357,19 @@ func (c *controller) EnqueueAfter(namespace, name string, duration time.Duration
c.startLock.Lock()
defer c.startLock.Unlock()

if c.workqueue == nil {
if c.workqueues == nil {
c.startKeys = append(c.startKeys, startKey{key: key, after: duration})
} else {
c.workqueue.AddAfter(key, duration)
c.workqueues[c.splitter.Split(key)].AddAfter(key, duration)
}
}

func keyParse(key string) (namespace string, name string) {
var ok bool
func KeyParse(key string) (namespace string, name string) {
special, key, ok := strings.Cut(key, " ")
if !ok {
key = special
}

namespace, name, ok = strings.Cut(key, "/")
if !ok {
name = namespace
Expand All @@ -357,10 +393,10 @@ func (c *controller) enqueue(obj interface{}) {
return
}
c.startLock.Lock()
if c.workqueue == nil {
if c.workqueues == nil {
c.startKeys = append(c.startKeys, startKey{key: key})
} else {
c.workqueue.Add(key)
c.workqueues[c.splitter.Split(key)].Add(key)
}
c.startLock.Unlock()
}
Expand Down
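
The newly exported KeyParse also strips a space-delimited prefix before the usual namespace/name split, which keeps prefixed trigger keys parseable (possibly the replayed keys built with ReplayPrefix in pkg/router/handler.go; that constant's value is not shown in this diff). An illustrative sketch; the "marker" prefix is made up, and this assumes KeyParse is in scope from pkg/runtime:

	ns, name := KeyParse("default/web")        // plain key -> ns "default", name "web"
	ns, name = KeyParse("marker default/web")  // prefix before the space is dropped -> same result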
(Diff for the remaining 2 changed files not shown.)
