Skip to content

Commit

Permalink
enhance: expose GVK-based threadiness to top-level router
Browse files Browse the repository at this point in the history
This allows callers to set the threadiness based on GVKs.

Signed-off-by: Donnie Adams <[email protected]>
  • Loading branch information
thedadams committed Feb 7, 2025
1 parent 1d4b332 commit 3b7c581
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 32 deletions.
2 changes: 1 addition & 1 deletion pkg/leader/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type ElectionConfig struct {

func NewDefaultElectionConfig(namespace, name string, cfg *rest.Config) *ElectionConfig {
ttl := defaultLeaderTTL
if os.Getenv("BAAAH_DEV_MODE") != "" {
if os.Getenv("NAH_DEV_MODE") != "" {
ttl = devLeaderTTL
}
return &ElectionConfig{
Expand Down
6 changes: 3 additions & 3 deletions pkg/log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ var (
//log.Printf("INFO: "+message+"\n", obj...)
}
Warnf = func(message string, obj ...interface{}) {
log.Printf("WARN [BAAAH]: "+message+"\n", obj...)
log.Printf("WARN [NAH]: "+message+"\n", obj...)
}
Errorf = func(message string, obj ...interface{}) {
log.Printf("ERROR[BAAAH]: "+message+"\n", obj...)
log.Printf("ERROR[NAH]: "+message+"\n", obj...)
}
Fatalf = func(message string, obj ...interface{}) {
log.Fatalf("FATAL[BAAAH]: "+message+"\n", obj...)
log.Fatalf("FATAL[NAH]: "+message+"\n", obj...)
}
Debugf = func(message string, obj ...interface{}) {
//log.Printf("DEBUG: "+message+"\n", obj...)
Expand Down
4 changes: 2 additions & 2 deletions pkg/runtime/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
var DefaultThreadiness = 5

func init() {
i, _ := strconv.Atoi(os.Getenv("BAAAH_THREADINESS"))
i, _ := strconv.Atoi(os.Getenv("NAH_THREADINESS"))
if i > 0 {
DefaultThreadiness = i
}
Expand Down Expand Up @@ -67,7 +67,7 @@ func (b *Backend) start(ctx context.Context, preloadOnly bool) (err error) {
if preloadOnly {
err = b.cacheFactory.Preload(ctx)
} else {
err = b.cacheFactory.Start(ctx, DefaultThreadiness)
err = b.cacheFactory.Start(ctx)
}
if err != nil {
return err
Expand Down
23 changes: 17 additions & 6 deletions pkg/runtime/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/obot-platform/nah/pkg/mapper"
"github.com/obot-platform/nah/pkg/runtime/multi"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/rest"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/cache"
Expand All @@ -17,23 +18,32 @@ type Runtime struct {
}

type Config struct {
GroupConfig
GVKThreadiness map[schema.GroupVersionKind]int
}

type GroupConfig struct {
Rest *rest.Config
Namespace string
}

func NewRuntime(cfg *rest.Config, scheme *runtime.Scheme) (*Runtime, error) {
return NewRuntimeWithConfig(Config{Rest: cfg}, scheme)
return NewRuntimeWithConfig(Config{
GroupConfig: GroupConfig{
Rest: cfg,
},
}, scheme)
}

func NewRuntimeForNamespace(cfg *rest.Config, namespace string, scheme *runtime.Scheme) (*Runtime, error) {
return NewRuntimeWithConfigs(Config{Rest: cfg, Namespace: namespace}, nil, scheme)
return NewRuntimeWithConfigs(Config{GroupConfig: GroupConfig{Rest: cfg, Namespace: namespace}}, nil, scheme)
}

func NewRuntimeWithConfig(cfg Config, scheme *runtime.Scheme) (*Runtime, error) {
return NewRuntimeWithConfigs(cfg, nil, scheme)
}

func NewRuntimeWithConfigs(defaultConfig Config, apiGroupConfigs map[string]Config, scheme *runtime.Scheme) (*Runtime, error) {
func NewRuntimeWithConfigs(defaultConfig Config, apiGroupConfigs map[string]GroupConfig, scheme *runtime.Scheme) (*Runtime, error) {
clients := make(map[string]client.WithWatch, len(apiGroupConfigs))
cachedClients := make(map[string]client.Client, len(apiGroupConfigs))
caches := make(map[string]cache.Cache, len(apiGroupConfigs))
Expand All @@ -49,7 +59,7 @@ func NewRuntimeWithConfigs(defaultConfig Config, apiGroupConfigs map[string]Conf
cachedClients[key] = cachedClient
}

uncachedClient, cachedClient, theCache, err := getClients(defaultConfig, scheme)
uncachedClient, cachedClient, theCache, err := getClients(defaultConfig.GroupConfig, scheme)
if err != nil {
return nil, err
}
Expand All @@ -59,7 +69,8 @@ func NewRuntimeWithConfigs(defaultConfig Config, apiGroupConfigs map[string]Conf
aggCache := multi.NewCache(scheme, theCache, caches)

factory := NewSharedControllerFactory(aggUncachedClient, aggCache, &SharedControllerFactoryOptions{
// In baaah this is only invoked when a key fails to process
KindWorkers: defaultConfig.GVKThreadiness,
// In nah this is only invoked when a key fails to process
DefaultRateLimiter: workqueue.NewTypedMaxOfRateLimiter(
// This will go .5, 1, 2, 4, 8 seconds, etc up until 15 minutes
workqueue.NewTypedItemExponentialFailureRateLimiter[any](500*time.Millisecond, 15*time.Minute),
Expand All @@ -71,7 +82,7 @@ func NewRuntimeWithConfigs(defaultConfig Config, apiGroupConfigs map[string]Conf
}, nil
}

func getClients(cfg Config, scheme *runtime.Scheme) (uncachedClient client.WithWatch, cachedClient client.Client, theCache cache.Cache, err error) {
func getClients(cfg GroupConfig, scheme *runtime.Scheme) (uncachedClient client.WithWatch, cachedClient client.Client, theCache cache.Cache, err error) {
mapper, err := mapper.New(cfg.Rest)
if err != nil {
return nil, nil, nil, err
Expand Down
24 changes: 10 additions & 14 deletions pkg/runtime/sharedcontrollerfactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
type SharedControllerFactory interface {
ForKind(gvk schema.GroupVersionKind) (SharedController, error)
Preload(ctx context.Context) error
Start(ctx context.Context, workers int) error
Start(ctx context.Context) error
}

type SharedControllerFactoryOptions struct {
Expand Down Expand Up @@ -58,19 +58,19 @@ func applyDefaultSharedOptions(opts *SharedControllerFactoryOptions) *SharedCont
newOpts = *opts
}
if newOpts.DefaultWorkers == 0 {
newOpts.DefaultWorkers = 5
newOpts.DefaultWorkers = DefaultThreadiness
}
return &newOpts
}
func (s *sharedControllerFactory) Preload(ctx context.Context) error {
return s.start(ctx, 0)
return s.loadAndStart(ctx, false)
}

func (s *sharedControllerFactory) Start(ctx context.Context, defaultWorkers int) error {
return s.start(ctx, defaultWorkers)
func (s *sharedControllerFactory) Start(ctx context.Context) error {
return s.loadAndStart(ctx, true)
}

func (s *sharedControllerFactory) start(ctx context.Context, defaultWorkers int) error {
func (s *sharedControllerFactory) loadAndStart(ctx context.Context, start bool) error {
s.controllerLock.Lock()
defer s.controllerLock.Unlock()

Expand Down Expand Up @@ -99,9 +99,9 @@ func (s *sharedControllerFactory) start(ctx context.Context, defaultWorkers int)
s.cache.WaitForCacheSync(ctx)
s.controllerLock.Lock()

if defaultWorkers != 0 {
if start {
for gvk, controller := range controllersCopy {
w, err := s.getWorkers(gvk, defaultWorkers)
w, err := s.getWorkers(gvk)
if err != nil {
return err
}
Expand Down Expand Up @@ -150,14 +150,10 @@ func (s *sharedControllerFactory) ForKind(gvk schema.GroupVersionKind) (SharedCo
return controllerResult, nil
}

func (s *sharedControllerFactory) getWorkers(gvk schema.GroupVersionKind, workers int) (int, error) {
w, ok := s.kindWorkers[gvk]
if ok {
func (s *sharedControllerFactory) getWorkers(gvk schema.GroupVersionKind) (int, error) {
if w, ok := s.kindWorkers[gvk]; ok {
return w, nil
}
if workers > 0 {
return workers, nil
}
return s.workers, nil
}

Expand Down
15 changes: 9 additions & 6 deletions router.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package baaah
package nah

import (
"fmt"
Expand All @@ -7,8 +7,9 @@ import (
"github.com/obot-platform/nah/pkg/leader"
"github.com/obot-platform/nah/pkg/restconfig"
"github.com/obot-platform/nah/pkg/router"
bruntime "github.com/obot-platform/nah/pkg/runtime"
nruntime "github.com/obot-platform/nah/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/rest"
)

Expand All @@ -25,11 +26,13 @@ type Options struct {
Scheme *runtime.Scheme
// APIGroupConfigs are keyed by an API group. This indicates to the router that all actions on this group should use the
// given Config. This is useful for routers that watch different objects on different API servers.
APIGroupConfigs map[string]bruntime.Config
APIGroupConfigs map[string]nruntime.GroupConfig
// ElectionConfig being nil represents no leader election for the router.
ElectionConfig *leader.ElectionConfig
// Defaults to 8888
HealthzPort int
// Change the threadedness per GVK
GVKThreadiness map[schema.GroupVersionKind]int
}

func (o *Options) complete() (*Options, error) {
Expand Down Expand Up @@ -58,8 +61,8 @@ func (o *Options) complete() (*Options, error) {
}
}

defaultConfig := bruntime.Config{Rest: result.DefaultRESTConfig, Namespace: result.DefaultNamespace}
backend, err := bruntime.NewRuntimeWithConfigs(defaultConfig, result.APIGroupConfigs, result.Scheme)
defaultConfig := nruntime.Config{GroupConfig: nruntime.GroupConfig{Rest: result.DefaultRESTConfig, Namespace: result.DefaultNamespace}, GVKThreadiness: result.GVKThreadiness}
backend, err := nruntime.NewRuntimeWithConfigs(defaultConfig, result.APIGroupConfigs, result.Scheme)
if err != nil {
return nil, err
}
Expand All @@ -75,7 +78,7 @@ func DefaultOptions(routerName string, scheme *runtime.Scheme) (*Options, error)
if err != nil {
return nil, err
}
rt, err := bruntime.NewRuntimeForNamespace(cfg, "", scheme)
rt, err := nruntime.NewRuntimeForNamespace(cfg, "", scheme)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 3b7c581

Please sign in to comment.