Skip to content

⚠️ [Warm Replicas] Implement warm replica support for controllers. #3192

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/config/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ type Controller struct {
// Defaults to true, which means the controller will use leader election.
NeedLeaderElection *bool

// EnableWarmup indicates whether the controller needs to use warm up.
// Defaults to false, which means the controller will not use warm up.
NeedWarmup *bool

// UsePriorityQueue configures the controllers queue to use the controller-runtime provided
// priority queue.
//
Expand Down
11 changes: 11 additions & 0 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ type TypedOptions[request comparable] struct {
//
// Note: This flag is disabled by default until a future version. It's currently in beta.
UsePriorityQueue *bool

// NeedWarmup specifies whether the controller should start its sources
// when the manager is not the leader.
// Defaults to false, which means that the controller will wait for leader election to start
// before starting sources.
NeedWarmup *bool
}

// DefaultFromConfig defaults the config from a config.Controller
Expand Down Expand Up @@ -124,6 +130,10 @@ func (options *TypedOptions[request]) DefaultFromConfig(config config.Controller
if options.NeedLeaderElection == nil {
options.NeedLeaderElection = config.NeedLeaderElection
}

if options.NeedWarmup == nil {
options.NeedWarmup = config.NeedWarmup
}
}

// Controller implements an API. A Controller manages a work queue fed reconcile.Requests
Expand Down Expand Up @@ -253,6 +263,7 @@ func NewTypedUnmanaged[request comparable](name string, options TypedOptions[req
LogConstructor: options.LogConstructor,
RecoverPanic: options.RecoverPanic,
LeaderElected: options.NeedLeaderElection,
NeedWarmup: options.NeedWarmup,
}, nil
}

Expand Down
70 changes: 70 additions & 0 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,5 +474,75 @@ var _ = Describe("controller.Controller", func() {
_, ok = q.(priorityqueue.PriorityQueue[reconcile.Request])
Expect(ok).To(BeFalse())
})

It("should set ShouldWarmupWithoutLeadership correctly", func() {
m, err := manager.New(cfg, manager.Options{})
Expect(err).NotTo(HaveOccurred())

// Test with ShouldWarmupWithoutLeadership set to true
ctrlWithWarmup, err := controller.New("warmup-enabled-ctrl", m, controller.Options{
Reconciler: reconcile.Func(nil),
NeedWarmup: ptr.To(true),
})
Expect(err).NotTo(HaveOccurred())

internalCtrlWithWarmup, ok := ctrlWithWarmup.(*internalcontroller.Controller[reconcile.Request])
Expect(ok).To(BeTrue())
Expect(internalCtrlWithWarmup.NeedWarmup).NotTo(BeNil())
Expect(*internalCtrlWithWarmup.NeedWarmup).To(BeTrue())

// Test with ShouldWarmupWithoutLeadership set to false
ctrlWithoutWarmup, err := controller.New("warmup-disabled-ctrl", m, controller.Options{
Reconciler: reconcile.Func(nil),
NeedWarmup: ptr.To(false),
})
Expect(err).NotTo(HaveOccurred())

internalCtrlWithoutWarmup, ok := ctrlWithoutWarmup.(*internalcontroller.Controller[reconcile.Request])
Expect(ok).To(BeTrue())
Expect(internalCtrlWithoutWarmup.NeedWarmup).NotTo(BeNil())
Expect(*internalCtrlWithoutWarmup.NeedWarmup).To(BeFalse())

// Test with ShouldWarmupWithoutLeadership not set (should default to nil)
ctrlWithDefaultWarmup, err := controller.New("warmup-default-ctrl", m, controller.Options{
Reconciler: reconcile.Func(nil),
})
Expect(err).NotTo(HaveOccurred())

internalCtrlWithDefaultWarmup, ok := ctrlWithDefaultWarmup.(*internalcontroller.Controller[reconcile.Request])
Expect(ok).To(BeTrue())
Expect(internalCtrlWithDefaultWarmup.NeedWarmup).To(BeNil())
})

It("should inherit ShouldWarmupWithoutLeadership from manager config", func() {
// Test with manager default setting ShouldWarmupWithoutLeadership to true
managerWithWarmup, err := manager.New(cfg, manager.Options{
Controller: config.Controller{
NeedWarmup: ptr.To(true),
},
})
Expect(err).NotTo(HaveOccurred())
ctrlInheritingWarmup, err := controller.New("inherit-warmup-enabled", managerWithWarmup, controller.Options{
Reconciler: reconcile.Func(nil),
})
Expect(err).NotTo(HaveOccurred())

internalCtrlInheritingWarmup, ok := ctrlInheritingWarmup.(*internalcontroller.Controller[reconcile.Request])
Expect(ok).To(BeTrue())
Expect(internalCtrlInheritingWarmup.NeedWarmup).NotTo(BeNil())
Expect(*internalCtrlInheritingWarmup.NeedWarmup).To(BeTrue())

// Test that explicit controller setting overrides manager setting
ctrlOverridingWarmup, err := controller.New("override-warmup-disabled", managerWithWarmup, controller.Options{
Reconciler: reconcile.Func(nil),
NeedWarmup: ptr.To(false),
})
Expect(err).NotTo(HaveOccurred())

internalCtrlOverridingWarmup, ok := ctrlOverridingWarmup.(*internalcontroller.Controller[reconcile.Request])
Expect(ok).To(BeTrue())
Expect(internalCtrlOverridingWarmup.NeedWarmup).NotTo(BeNil())
Expect(*internalCtrlOverridingWarmup.NeedWarmup).To(BeFalse())
})
})
})
25 changes: 25 additions & 0 deletions pkg/internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ type Controller[request comparable] struct {
// startWatches maintains a list of sources, handlers, and predicates to start when the controller is started.
startWatches []source.TypedSource[request]

// didStartEventSources is used to indicate whether the event sources have been started.
didStartEventSources atomic.Bool

// LogConstructor is used to construct a logger to then log messages to users during reconciliation,
// or for example when a watch is started.
// Note: LogConstructor has to be able to handle nil requests as we are also using it
Expand All @@ -95,6 +98,12 @@ type Controller[request comparable] struct {

// LeaderElected indicates whether the controller is leader elected or always running.
LeaderElected *bool

// NeedWarmup specifies whether the controller should start its sources
// when the manager is not the leader.
// Defaults to false, which means that the controller will wait for leader election to start
// before starting sources.
NeedWarmup *bool
}

// Reconcile implements reconcile.Reconciler.
Expand Down Expand Up @@ -144,6 +153,15 @@ func (c *Controller[request]) NeedLeaderElection() bool {
return *c.LeaderElected
}

// Warmup implements the manager.WarmupRunnable interface.
func (c *Controller[request]) Warmup(ctx context.Context) error {
if c.NeedWarmup == nil || !*c.NeedWarmup {
return nil
}

return c.startEventSources(ctx)
}

// Start implements controller.Controller.
func (c *Controller[request]) Start(ctx context.Context) error {
// use an IIFE to get proper lock handling
Expand Down Expand Up @@ -221,6 +239,13 @@ func (c *Controller[request]) Start(ctx context.Context) error {
// startEventSources launches all the sources registered with this controller and waits
// for them to sync. It returns an error if any of the sources fail to start or sync.
func (c *Controller[request]) startEventSources(ctx context.Context) error {
// CAS returns false if value is already true, so early exit since another goroutine must have
// called startEventSources previously
if !c.didStartEventSources.CompareAndSwap(false, true) {
c.LogConstructor(nil).Info("Skipping starting event sources since it was already started")
return nil
}

errGroup := &errgroup.Group{}
for _, watch := range c.startWatches {
log := c.LogConstructor(nil)
Expand Down
126 changes: 125 additions & 1 deletion pkg/internal/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ var _ = Describe("controller", func() {
return expectedErr
})

// // Set a sufficiently long timeout to avoid timeouts interfering with the error being returned
// Set a sufficiently long timeout to avoid timeouts interfering with the error being returned
ctrl.CacheSyncTimeout = 5 * time.Second
ctrl.startWatches = []source.TypedSource[reconcile.Request]{src}
err := ctrl.startEventSources(ctx)
Expand Down Expand Up @@ -1014,6 +1014,130 @@ var _ = Describe("controller", func() {
})
})
})

Describe("Warmup", func() {
It("should start event sources when NeedWarmup is true", func() {
// Setup
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Create a mock source that we can verify was started
sourceStarted := false
mockSource := source.Func(func(ctx context.Context, queue workqueue.TypedRateLimitingInterface[reconcile.Request]) error {
sourceStarted = true
return nil
})

ctrl.CacheSyncTimeout = time.Second
ctrl.startWatches = []source.TypedSource[reconcile.Request]{mockSource}
ctrl.NeedWarmup = ptr.To(true)

// Act
err := ctrl.Warmup(ctx)

// Assert
Expect(err).NotTo(HaveOccurred())
Expect(sourceStarted).To(BeTrue(), "Event source should have been started")
Expect(ctrl.didStartEventSources.Load()).To(BeTrue(), "didStartEventSources flag should be set")
})

It("should not start event sources when NeedWarmup is false", func() {
// Setup
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Create a mock source that should not be started
sourceStarted := false
mockSource := source.Func(func(ctx context.Context, queue workqueue.TypedRateLimitingInterface[reconcile.Request]) error {
sourceStarted = true
return nil
})

ctrl.startWatches = []source.TypedSource[reconcile.Request]{mockSource}
ctrl.NeedWarmup = ptr.To(false)

// Act
err := ctrl.Warmup(ctx)

// Assert
Expect(err).NotTo(HaveOccurred())
Expect(sourceStarted).To(BeFalse(), "Event source should not have been started")
Expect(ctrl.didStartEventSources.Load()).To(BeFalse(), "didStartEventSources flag should not be set")
})

It("should not start event sources when NeedWarmup is nil", func() {
// Setup
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Create a mock source that should not be started
sourceStarted := false
mockSource := source.Func(func(ctx context.Context, queue workqueue.TypedRateLimitingInterface[reconcile.Request]) error {
sourceStarted = true
return nil
})

ctrl.startWatches = []source.TypedSource[reconcile.Request]{mockSource}
ctrl.NeedWarmup = nil

// Act
err := ctrl.Warmup(ctx)

// Assert
Expect(err).NotTo(HaveOccurred())
Expect(sourceStarted).To(BeFalse(), "Event source should not have been started")
Expect(ctrl.didStartEventSources.Load()).To(BeFalse(), "didStartEventSources flag should not be set")
})

It("should not start event sources twice when called multiple times", func() {
// Setup
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Create a mock source that counts how many times it's started
startCount := 0
mockSource := source.Func(func(ctx context.Context, queue workqueue.TypedRateLimitingInterface[reconcile.Request]) error {
startCount++
return nil
})

ctrl.CacheSyncTimeout = time.Second
ctrl.startWatches = []source.TypedSource[reconcile.Request]{mockSource}
ctrl.NeedWarmup = ptr.To(true)

// Act
err1 := ctrl.Warmup(ctx)
err2 := ctrl.Warmup(ctx)

// Assert
Expect(err1).NotTo(HaveOccurred())
Expect(err2).NotTo(HaveOccurred())
Expect(startCount).To(Equal(1), "Event source should have been started only once")
Expect(ctrl.didStartEventSources.Load()).To(BeTrue(), "didStartEventSources flag should be set")
})

It("should propagate errors from event sources", func() {
// Setup
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Create a mock source that returns an error
expectedErr := errors.New("test error")
mockSource := source.Func(func(ctx context.Context, queue workqueue.TypedRateLimitingInterface[reconcile.Request]) error {
return expectedErr
})

ctrl.CacheSyncTimeout = time.Second
ctrl.startWatches = []source.TypedSource[reconcile.Request]{mockSource}
ctrl.NeedWarmup = ptr.To(true)

// Act
err := ctrl.Warmup(ctx)

// Assert
Expect(err).To(MatchError(expectedErr))
})
})
})

var _ = Describe("ReconcileIDFromContext function", func() {
Expand Down
9 changes: 9 additions & 0 deletions pkg/manager/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,11 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) {
return fmt.Errorf("failed to start other runnables: %w", err)
}

// Start and wait for sources to start.
if err := cm.runnables.Warmup.Start(cm.internalCtx); err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should come right after the caches and the comment shouldn't make assumptions about what the Warmup internally does.

The other issue: This needs to block until the Warmup has terminated, otherwise we may end up starting the controller before the sources are started, as the check we added only checks if we started to start the sources, not if we finished doing so

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will change to coming right after the caches, but don't completely understand the significance of having it after vs. before the non-leader election runnables; mind explaining a bit?

Copy link
Contributor Author

@godwinpang godwinpang Apr 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to block until the Warmup has terminated, otherwise we may end up starting the controller before the sources are started, as the check we added only checks if we started to start the sources, not if we finished doing so

Is this a big problem? Replicas will only start the controller after they win leader election so I don't see an issue in the leader election failover case; are you saying that in non-leader election cases the behavior of warmup should be that it completely blocks controller startup?

return fmt.Errorf("failed to start warmup runnables: %w", err)
}

// Start the leader election and all required runnables.
{
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -544,6 +549,10 @@ func (cm *controllerManager) engageStopProcedure(stopComplete <-chan struct{}) e
cm.runnables.LeaderElection.startOnce.Do(func() {})
cm.runnables.LeaderElection.StopAndWait(cm.shutdownCtx)

// Stop the warmup runnables
cm.logger.Info("Stopping and waiting for warmup runnables")
cm.runnables.Warmup.StopAndWait(cm.shutdownCtx)

// Stop the caches before the leader election runnables, this is an important
// step to make sure that we don't race with the reconcilers by receiving more events
// from the API servers and enqueueing them.
Expand Down
6 changes: 6 additions & 0 deletions pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,12 @@ type LeaderElectionRunnable interface {
NeedLeaderElection() bool
}

// WarmupRunnable knows if a Runnable should be a warmup runnable.
type WarmupRunnable interface {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please explain the purpose of it in the godoc

// Warmup returns true if the Runnable should be run as warmup.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has no bool return

Warmup(context.Context) error
}

// New returns a new Manager for creating Controllers.
// Note that if ContentType in the given config is not set, "application/vnd.kubernetes.protobuf"
// will be used for all built-in resources of Kubernetes, and "application/json" is for other types
Expand Down
Loading
Loading