From 3b7c581712f6ae616964b75852e70016c9fee218 Mon Sep 17 00:00:00 2001 From: Donnie Adams Date: Thu, 6 Feb 2025 15:10:22 -0500 Subject: [PATCH] enhance: expose GVK-based threadiness to top-level router This allows callers to set the threadiness based on GVKs. Signed-off-by: Donnie Adams --- pkg/leader/leader.go | 2 +- pkg/log/log.go | 6 +++--- pkg/runtime/backend.go | 4 ++-- pkg/runtime/clients.go | 23 +++++++++++++++++------ pkg/runtime/sharedcontrollerfactory.go | 24 ++++++++++-------------- router.go | 15 +++++++++------ 6 files changed, 42 insertions(+), 32 deletions(-) diff --git a/pkg/leader/leader.go b/pkg/leader/leader.go index fd205c1..baafeb0 100644 --- a/pkg/leader/leader.go +++ b/pkg/leader/leader.go @@ -30,7 +30,7 @@ type ElectionConfig struct { func NewDefaultElectionConfig(namespace, name string, cfg *rest.Config) *ElectionConfig { ttl := defaultLeaderTTL - if os.Getenv("BAAAH_DEV_MODE") != "" { + if os.Getenv("NAH_DEV_MODE") != "" { ttl = devLeaderTTL } return &ElectionConfig{ diff --git a/pkg/log/log.go b/pkg/log/log.go index acfda04..862a6c4 100644 --- a/pkg/log/log.go +++ b/pkg/log/log.go @@ -17,13 +17,13 @@ var ( //log.Printf("INFO: "+message+"\n", obj...) } Warnf = func(message string, obj ...interface{}) { - log.Printf("WARN [BAAAH]: "+message+"\n", obj...) + log.Printf("WARN [NAH]: "+message+"\n", obj...) } Errorf = func(message string, obj ...interface{}) { - log.Printf("ERROR[BAAAH]: "+message+"\n", obj...) + log.Printf("ERROR[NAH]: "+message+"\n", obj...) } Fatalf = func(message string, obj ...interface{}) { - log.Fatalf("FATAL[BAAAH]: "+message+"\n", obj...) + log.Fatalf("FATAL[NAH]: "+message+"\n", obj...) } Debugf = func(message string, obj ...interface{}) { //log.Printf("DEBUG: "+message+"\n", obj...) diff --git a/pkg/runtime/backend.go b/pkg/runtime/backend.go index b7c9119..86b9895 100644 --- a/pkg/runtime/backend.go +++ b/pkg/runtime/backend.go @@ -24,7 +24,7 @@ import ( var DefaultThreadiness = 5 func init() { - i, _ := strconv.Atoi(os.Getenv("BAAAH_THREADINESS")) + i, _ := strconv.Atoi(os.Getenv("NAH_THREADINESS")) if i > 0 { DefaultThreadiness = i } @@ -67,7 +67,7 @@ func (b *Backend) start(ctx context.Context, preloadOnly bool) (err error) { if preloadOnly { err = b.cacheFactory.Preload(ctx) } else { - err = b.cacheFactory.Start(ctx, DefaultThreadiness) + err = b.cacheFactory.Start(ctx) } if err != nil { return err diff --git a/pkg/runtime/clients.go b/pkg/runtime/clients.go index 36430dd..34165d4 100644 --- a/pkg/runtime/clients.go +++ b/pkg/runtime/clients.go @@ -6,6 +6,7 @@ import ( "github.com/obot-platform/nah/pkg/mapper" "github.com/obot-platform/nah/pkg/runtime/multi" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/rest" "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/cache" @@ -17,23 +18,32 @@ type Runtime struct { } type Config struct { + GroupConfig + GVKThreadiness map[schema.GroupVersionKind]int +} + +type GroupConfig struct { Rest *rest.Config Namespace string } func NewRuntime(cfg *rest.Config, scheme *runtime.Scheme) (*Runtime, error) { - return NewRuntimeWithConfig(Config{Rest: cfg}, scheme) + return NewRuntimeWithConfig(Config{ + GroupConfig: GroupConfig{ + Rest: cfg, + }, + }, scheme) } func NewRuntimeForNamespace(cfg *rest.Config, namespace string, scheme *runtime.Scheme) (*Runtime, error) { - return NewRuntimeWithConfigs(Config{Rest: cfg, Namespace: namespace}, nil, scheme) + return NewRuntimeWithConfigs(Config{GroupConfig: GroupConfig{Rest: cfg, Namespace: namespace}}, nil, scheme) } func NewRuntimeWithConfig(cfg Config, scheme *runtime.Scheme) (*Runtime, error) { return NewRuntimeWithConfigs(cfg, nil, scheme) } -func NewRuntimeWithConfigs(defaultConfig Config, apiGroupConfigs map[string]Config, scheme *runtime.Scheme) (*Runtime, error) { +func NewRuntimeWithConfigs(defaultConfig Config, apiGroupConfigs map[string]GroupConfig, scheme *runtime.Scheme) (*Runtime, error) { clients := make(map[string]client.WithWatch, len(apiGroupConfigs)) cachedClients := make(map[string]client.Client, len(apiGroupConfigs)) caches := make(map[string]cache.Cache, len(apiGroupConfigs)) @@ -49,7 +59,7 @@ func NewRuntimeWithConfigs(defaultConfig Config, apiGroupConfigs map[string]Conf cachedClients[key] = cachedClient } - uncachedClient, cachedClient, theCache, err := getClients(defaultConfig, scheme) + uncachedClient, cachedClient, theCache, err := getClients(defaultConfig.GroupConfig, scheme) if err != nil { return nil, err } @@ -59,7 +69,8 @@ func NewRuntimeWithConfigs(defaultConfig Config, apiGroupConfigs map[string]Conf aggCache := multi.NewCache(scheme, theCache, caches) factory := NewSharedControllerFactory(aggUncachedClient, aggCache, &SharedControllerFactoryOptions{ - // In baaah this is only invoked when a key fails to process + KindWorkers: defaultConfig.GVKThreadiness, + // In nah this is only invoked when a key fails to process DefaultRateLimiter: workqueue.NewTypedMaxOfRateLimiter( // This will go .5, 1, 2, 4, 8 seconds, etc up until 15 minutes workqueue.NewTypedItemExponentialFailureRateLimiter[any](500*time.Millisecond, 15*time.Minute), @@ -71,7 +82,7 @@ func NewRuntimeWithConfigs(defaultConfig Config, apiGroupConfigs map[string]Conf }, nil } -func getClients(cfg Config, scheme *runtime.Scheme) (uncachedClient client.WithWatch, cachedClient client.Client, theCache cache.Cache, err error) { +func getClients(cfg GroupConfig, scheme *runtime.Scheme) (uncachedClient client.WithWatch, cachedClient client.Client, theCache cache.Cache, err error) { mapper, err := mapper.New(cfg.Rest) if err != nil { return nil, nil, nil, err diff --git a/pkg/runtime/sharedcontrollerfactory.go b/pkg/runtime/sharedcontrollerfactory.go index 781928e..32b0f9b 100644 --- a/pkg/runtime/sharedcontrollerfactory.go +++ b/pkg/runtime/sharedcontrollerfactory.go @@ -13,7 +13,7 @@ import ( type SharedControllerFactory interface { ForKind(gvk schema.GroupVersionKind) (SharedController, error) Preload(ctx context.Context) error - Start(ctx context.Context, workers int) error + Start(ctx context.Context) error } type SharedControllerFactoryOptions struct { @@ -58,19 +58,19 @@ func applyDefaultSharedOptions(opts *SharedControllerFactoryOptions) *SharedCont newOpts = *opts } if newOpts.DefaultWorkers == 0 { - newOpts.DefaultWorkers = 5 + newOpts.DefaultWorkers = DefaultThreadiness } return &newOpts } func (s *sharedControllerFactory) Preload(ctx context.Context) error { - return s.start(ctx, 0) + return s.loadAndStart(ctx, false) } -func (s *sharedControllerFactory) Start(ctx context.Context, defaultWorkers int) error { - return s.start(ctx, defaultWorkers) +func (s *sharedControllerFactory) Start(ctx context.Context) error { + return s.loadAndStart(ctx, true) } -func (s *sharedControllerFactory) start(ctx context.Context, defaultWorkers int) error { +func (s *sharedControllerFactory) loadAndStart(ctx context.Context, start bool) error { s.controllerLock.Lock() defer s.controllerLock.Unlock() @@ -99,9 +99,9 @@ func (s *sharedControllerFactory) start(ctx context.Context, defaultWorkers int) s.cache.WaitForCacheSync(ctx) s.controllerLock.Lock() - if defaultWorkers != 0 { + if start { for gvk, controller := range controllersCopy { - w, err := s.getWorkers(gvk, defaultWorkers) + w, err := s.getWorkers(gvk) if err != nil { return err } @@ -150,14 +150,10 @@ func (s *sharedControllerFactory) ForKind(gvk schema.GroupVersionKind) (SharedCo return controllerResult, nil } -func (s *sharedControllerFactory) getWorkers(gvk schema.GroupVersionKind, workers int) (int, error) { - w, ok := s.kindWorkers[gvk] - if ok { +func (s *sharedControllerFactory) getWorkers(gvk schema.GroupVersionKind) (int, error) { + if w, ok := s.kindWorkers[gvk]; ok { return w, nil } - if workers > 0 { - return workers, nil - } return s.workers, nil } diff --git a/router.go b/router.go index b2880c7..7b5fdba 100644 --- a/router.go +++ b/router.go @@ -1,4 +1,4 @@ -package baaah +package nah import ( "fmt" @@ -7,8 +7,9 @@ import ( "github.com/obot-platform/nah/pkg/leader" "github.com/obot-platform/nah/pkg/restconfig" "github.com/obot-platform/nah/pkg/router" - bruntime "github.com/obot-platform/nah/pkg/runtime" + nruntime "github.com/obot-platform/nah/pkg/runtime" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/rest" ) @@ -25,11 +26,13 @@ type Options struct { Scheme *runtime.Scheme // APIGroupConfigs are keyed by an API group. This indicates to the router that all actions on this group should use the // given Config. This is useful for routers that watch different objects on different API servers. - APIGroupConfigs map[string]bruntime.Config + APIGroupConfigs map[string]nruntime.GroupConfig // ElectionConfig being nil represents no leader election for the router. ElectionConfig *leader.ElectionConfig // Defaults to 8888 HealthzPort int + // Change the threadedness per GVK + GVKThreadiness map[schema.GroupVersionKind]int } func (o *Options) complete() (*Options, error) { @@ -58,8 +61,8 @@ func (o *Options) complete() (*Options, error) { } } - defaultConfig := bruntime.Config{Rest: result.DefaultRESTConfig, Namespace: result.DefaultNamespace} - backend, err := bruntime.NewRuntimeWithConfigs(defaultConfig, result.APIGroupConfigs, result.Scheme) + defaultConfig := nruntime.Config{GroupConfig: nruntime.GroupConfig{Rest: result.DefaultRESTConfig, Namespace: result.DefaultNamespace}, GVKThreadiness: result.GVKThreadiness} + backend, err := nruntime.NewRuntimeWithConfigs(defaultConfig, result.APIGroupConfigs, result.Scheme) if err != nil { return nil, err } @@ -75,7 +78,7 @@ func DefaultOptions(routerName string, scheme *runtime.Scheme) (*Options, error) if err != nil { return nil, err } - rt, err := bruntime.NewRuntimeForNamespace(cfg, "", scheme) + rt, err := nruntime.NewRuntimeForNamespace(cfg, "", scheme) if err != nil { return nil, err }