diff --git a/pkg/config/controller.go b/pkg/config/controller.go index a5655593ef..60c010025a 100644 --- a/pkg/config/controller.go +++ b/pkg/config/controller.go @@ -60,6 +60,10 @@ type Controller struct { // Defaults to true, which means the controller will use leader election. NeedLeaderElection *bool + // NeedWarmup indicates whether the controller needs to use warm up, i.e. start its sources while the manager is not the leader. + // Defaults to false, which means the controller will not use warm up. + NeedWarmup *bool + // UsePriorityQueue configures the controllers queue to use the controller-runtime provided // priority queue. // diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 9de959b48f..f1d8c0e428 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -93,6 +93,12 @@ type TypedOptions[request comparable] struct { // // Note: This flag is disabled by default until a future version. It's currently in beta. UsePriorityQueue *bool + + // NeedWarmup specifies whether the controller should start its sources + // when the manager is not the leader. + // Defaults to false, which means that the controller will wait for leader election to start + // before starting sources. + NeedWarmup *bool } // DefaultFromConfig defaults the config from a config.Controller @@ -124,6 +130,10 @@ func (options *TypedOptions[request]) DefaultFromConfig(config config.Controller if options.NeedLeaderElection == nil { options.NeedLeaderElection = config.NeedLeaderElection } + + if options.NeedWarmup == nil { + options.NeedWarmup = config.NeedWarmup + } } // Controller implements an API. 
A Controller manages a work queue fed reconcile.Requests @@ -253,6 +263,7 @@ func NewTypedUnmanaged[request comparable](name string, options TypedOptions[req LogConstructor: options.LogConstructor, RecoverPanic: options.RecoverPanic, LeaderElected: options.NeedLeaderElection, + NeedWarmup: options.NeedWarmup, }, nil } diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index 1c5b11d709..53d923fc77 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -474,5 +474,75 @@ var _ = Describe("controller.Controller", func() { _, ok = q.(priorityqueue.PriorityQueue[reconcile.Request]) Expect(ok).To(BeFalse()) }) + + It("should set ShouldWarmupWithoutLeadership correctly", func() { + m, err := manager.New(cfg, manager.Options{}) + Expect(err).NotTo(HaveOccurred()) + + // Test with ShouldWarmupWithoutLeadership set to true + ctrlWithWarmup, err := controller.New("warmup-enabled-ctrl", m, controller.Options{ + Reconciler: reconcile.Func(nil), + NeedWarmup: ptr.To(true), + }) + Expect(err).NotTo(HaveOccurred()) + + internalCtrlWithWarmup, ok := ctrlWithWarmup.(*internalcontroller.Controller[reconcile.Request]) + Expect(ok).To(BeTrue()) + Expect(internalCtrlWithWarmup.NeedWarmup).NotTo(BeNil()) + Expect(*internalCtrlWithWarmup.NeedWarmup).To(BeTrue()) + + // Test with ShouldWarmupWithoutLeadership set to false + ctrlWithoutWarmup, err := controller.New("warmup-disabled-ctrl", m, controller.Options{ + Reconciler: reconcile.Func(nil), + NeedWarmup: ptr.To(false), + }) + Expect(err).NotTo(HaveOccurred()) + + internalCtrlWithoutWarmup, ok := ctrlWithoutWarmup.(*internalcontroller.Controller[reconcile.Request]) + Expect(ok).To(BeTrue()) + Expect(internalCtrlWithoutWarmup.NeedWarmup).NotTo(BeNil()) + Expect(*internalCtrlWithoutWarmup.NeedWarmup).To(BeFalse()) + + // Test with ShouldWarmupWithoutLeadership not set (should default to nil) + ctrlWithDefaultWarmup, err := controller.New("warmup-default-ctrl", m, 
controller.Options{ + Reconciler: reconcile.Func(nil), + }) + Expect(err).NotTo(HaveOccurred()) + + internalCtrlWithDefaultWarmup, ok := ctrlWithDefaultWarmup.(*internalcontroller.Controller[reconcile.Request]) + Expect(ok).To(BeTrue()) + Expect(internalCtrlWithDefaultWarmup.NeedWarmup).To(BeNil()) + }) + + It("should inherit ShouldWarmupWithoutLeadership from manager config", func() { + // Test with manager default setting ShouldWarmupWithoutLeadership to true + managerWithWarmup, err := manager.New(cfg, manager.Options{ + Controller: config.Controller{ + NeedWarmup: ptr.To(true), + }, + }) + Expect(err).NotTo(HaveOccurred()) + ctrlInheritingWarmup, err := controller.New("inherit-warmup-enabled", managerWithWarmup, controller.Options{ + Reconciler: reconcile.Func(nil), + }) + Expect(err).NotTo(HaveOccurred()) + + internalCtrlInheritingWarmup, ok := ctrlInheritingWarmup.(*internalcontroller.Controller[reconcile.Request]) + Expect(ok).To(BeTrue()) + Expect(internalCtrlInheritingWarmup.NeedWarmup).NotTo(BeNil()) + Expect(*internalCtrlInheritingWarmup.NeedWarmup).To(BeTrue()) + + // Test that explicit controller setting overrides manager setting + ctrlOverridingWarmup, err := controller.New("override-warmup-disabled", managerWithWarmup, controller.Options{ + Reconciler: reconcile.Func(nil), + NeedWarmup: ptr.To(false), + }) + Expect(err).NotTo(HaveOccurred()) + + internalCtrlOverridingWarmup, ok := ctrlOverridingWarmup.(*internalcontroller.Controller[reconcile.Request]) + Expect(ok).To(BeTrue()) + Expect(internalCtrlOverridingWarmup.NeedWarmup).NotTo(BeNil()) + Expect(*internalCtrlOverridingWarmup.NeedWarmup).To(BeFalse()) + }) }) }) diff --git a/pkg/internal/controller/controller.go b/pkg/internal/controller/controller.go index 9fa7ec71e1..6a632c3825 100644 --- a/pkg/internal/controller/controller.go +++ b/pkg/internal/controller/controller.go @@ -83,6 +83,9 @@ type Controller[request comparable] struct { // startWatches maintains a list of sources, handlers, and 
predicates to start when the controller is started. startWatches []source.TypedSource[request] + // didStartEventSources is used to indicate whether the event sources have been started. + didStartEventSources atomic.Bool + // LogConstructor is used to construct a logger to then log messages to users during reconciliation, // or for example when a watch is started. // Note: LogConstructor has to be able to handle nil requests as we are also using it @@ -95,6 +98,12 @@ type Controller[request comparable] struct { // LeaderElected indicates whether the controller is leader elected or always running. LeaderElected *bool + + // NeedWarmup specifies whether the controller should start its sources + // when the manager is not the leader. + // Defaults to false, which means that the controller will wait for leader election to start + // before starting sources. + NeedWarmup *bool } // Reconcile implements reconcile.Reconciler. @@ -144,6 +153,15 @@ func (c *Controller[request]) NeedLeaderElection() bool { return *c.LeaderElected } +// Warmup implements the manager.WarmupRunnable interface. +func (c *Controller[request]) Warmup(ctx context.Context) error { + if c.NeedWarmup == nil || !*c.NeedWarmup { + return nil + } + + return c.startEventSources(ctx) +} + // Start implements controller.Controller. func (c *Controller[request]) Start(ctx context.Context) error { // use an IIFE to get proper lock handling @@ -221,6 +239,13 @@ func (c *Controller[request]) Start(ctx context.Context) error { // startEventSources launches all the sources registered with this controller and waits // for them to sync. It returns an error if any of the sources fail to start or sync. 
func (c *Controller[request]) startEventSources(ctx context.Context) error { + // CAS returns false if value is already true, so early exit since another goroutine must have + // called startEventSources previously + if !c.didStartEventSources.CompareAndSwap(false, true) { + c.LogConstructor(nil).Info("Skipping starting event sources since it was already started") + return nil + } + errGroup := &errgroup.Group{} for _, watch := range c.startWatches { log := c.LogConstructor(nil) diff --git a/pkg/internal/controller/controller_test.go b/pkg/internal/controller/controller_test.go index 3fde5da9c8..cb15a60d7b 100644 --- a/pkg/internal/controller/controller_test.go +++ b/pkg/internal/controller/controller_test.go @@ -403,7 +403,7 @@ var _ = Describe("controller", func() { return expectedErr }) - // // Set a sufficiently long timeout to avoid timeouts interfering with the error being returned + // Set a sufficiently long timeout to avoid timeouts interfering with the error being returned ctrl.CacheSyncTimeout = 5 * time.Second ctrl.startWatches = []source.TypedSource[reconcile.Request]{src} err := ctrl.startEventSources(ctx) @@ -1014,6 +1014,130 @@ var _ = Describe("controller", func() { }) }) }) + + Describe("Warmup", func() { + It("should start event sources when NeedWarmup is true", func() { + // Setup + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Create a mock source that we can verify was started + sourceStarted := false + mockSource := source.Func(func(ctx context.Context, queue workqueue.TypedRateLimitingInterface[reconcile.Request]) error { + sourceStarted = true + return nil + }) + + ctrl.CacheSyncTimeout = time.Second + ctrl.startWatches = []source.TypedSource[reconcile.Request]{mockSource} + ctrl.NeedWarmup = ptr.To(true) + + // Act + err := ctrl.Warmup(ctx) + + // Assert + Expect(err).NotTo(HaveOccurred()) + Expect(sourceStarted).To(BeTrue(), "Event source should have been started") + 
Expect(ctrl.didStartEventSources.Load()).To(BeTrue(), "didStartEventSources flag should be set") + }) + + It("should not start event sources when NeedWarmup is false", func() { + // Setup + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Create a mock source that should not be started + sourceStarted := false + mockSource := source.Func(func(ctx context.Context, queue workqueue.TypedRateLimitingInterface[reconcile.Request]) error { + sourceStarted = true + return nil + }) + + ctrl.startWatches = []source.TypedSource[reconcile.Request]{mockSource} + ctrl.NeedWarmup = ptr.To(false) + + // Act + err := ctrl.Warmup(ctx) + + // Assert + Expect(err).NotTo(HaveOccurred()) + Expect(sourceStarted).To(BeFalse(), "Event source should not have been started") + Expect(ctrl.didStartEventSources.Load()).To(BeFalse(), "didStartEventSources flag should not be set") + }) + + It("should not start event sources when NeedWarmup is nil", func() { + // Setup + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Create a mock source that should not be started + sourceStarted := false + mockSource := source.Func(func(ctx context.Context, queue workqueue.TypedRateLimitingInterface[reconcile.Request]) error { + sourceStarted = true + return nil + }) + + ctrl.startWatches = []source.TypedSource[reconcile.Request]{mockSource} + ctrl.NeedWarmup = nil + + // Act + err := ctrl.Warmup(ctx) + + // Assert + Expect(err).NotTo(HaveOccurred()) + Expect(sourceStarted).To(BeFalse(), "Event source should not have been started") + Expect(ctrl.didStartEventSources.Load()).To(BeFalse(), "didStartEventSources flag should not be set") + }) + + It("should not start event sources twice when called multiple times", func() { + // Setup + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Create a mock source that counts how many times it's started + startCount := 0 + mockSource := source.Func(func(ctx context.Context, queue 
workqueue.TypedRateLimitingInterface[reconcile.Request]) error { + startCount++ + return nil + }) + + ctrl.CacheSyncTimeout = time.Second + ctrl.startWatches = []source.TypedSource[reconcile.Request]{mockSource} + ctrl.NeedWarmup = ptr.To(true) + + // Act + err1 := ctrl.Warmup(ctx) + err2 := ctrl.Warmup(ctx) + + // Assert + Expect(err1).NotTo(HaveOccurred()) + Expect(err2).NotTo(HaveOccurred()) + Expect(startCount).To(Equal(1), "Event source should have been started only once") + Expect(ctrl.didStartEventSources.Load()).To(BeTrue(), "didStartEventSources flag should be set") + }) + + It("should propagate errors from event sources", func() { + // Setup + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Create a mock source that returns an error + expectedErr := errors.New("test error") + mockSource := source.Func(func(ctx context.Context, queue workqueue.TypedRateLimitingInterface[reconcile.Request]) error { + return expectedErr + }) + + ctrl.CacheSyncTimeout = time.Second + ctrl.startWatches = []source.TypedSource[reconcile.Request]{mockSource} + ctrl.NeedWarmup = ptr.To(true) + + // Act + err := ctrl.Warmup(ctx) + + // Assert + Expect(err).To(MatchError(expectedErr)) + }) + }) }) var _ = Describe("ReconcileIDFromContext function", func() { diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go index e5204a7506..261bccac9d 100644 --- a/pkg/manager/internal.go +++ b/pkg/manager/internal.go @@ -439,6 +439,11 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) { return fmt.Errorf("failed to start other runnables: %w", err) } + // Start and wait for sources to start. + if err := cm.runnables.Warmup.Start(cm.internalCtx); err != nil { + return fmt.Errorf("failed to start warmup runnables: %w", err) + } + // Start the leader election and all required runnables. 
{ ctx, cancel := context.WithCancel(context.Background()) @@ -544,6 +549,10 @@ func (cm *controllerManager) engageStopProcedure(stopComplete <-chan struct{}) e cm.runnables.LeaderElection.startOnce.Do(func() {}) cm.runnables.LeaderElection.StopAndWait(cm.shutdownCtx) + // Stop the warmup runnables + cm.logger.Info("Stopping and waiting for warmup runnables") + cm.runnables.Warmup.StopAndWait(cm.shutdownCtx) + // Stop the caches before the leader election runnables, this is an important // step to make sure that we don't race with the reconcilers by receiving more events // from the API servers and enqueueing them. diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index c3ae317b04..013321366c 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -314,6 +314,12 @@ type LeaderElectionRunnable interface { NeedLeaderElection() bool } +// WarmupRunnable knows how to run warmup logic that the manager starts before leader election. +type WarmupRunnable interface { + // Warmup runs the warmup logic and returns an error if it fails. + Warmup(context.Context) error +} + // New returns a new Manager for creating Controllers. 
// Note that if ContentType in the given config is not set, "application/vnd.kubernetes.protobuf" // will be used for all built-in resources of Kubernetes, and "application/json" is for other types diff --git a/pkg/manager/manager_test.go b/pkg/manager/manager_test.go index 247a33f9dc..822a3b6200 100644 --- a/pkg/manager/manager_test.go +++ b/pkg/manager/manager_test.go @@ -1928,6 +1928,75 @@ var _ = Describe("manger.Manager", func() { Expect(err).NotTo(HaveOccurred()) Expect(m.GetAPIReader()).NotTo(BeNil()) }) + + It("should run warmup runnables before leader election is won", func() { + By("Creating channels to track execution order") + warmupCalled := make(chan struct{}) + leaderElectionRunnableCalled := make(chan struct{}) + + By("Creating a manager with leader election enabled") + m, err := New(cfg, Options{ + LeaderElection: true, + LeaderElectionNamespace: "default", + LeaderElectionID: "test-leader-election-warmup", + newResourceLock: fakeleaderelection.NewResourceLock, + HealthProbeBindAddress: "0", + Metrics: metricsserver.Options{BindAddress: "0"}, + PprofBindAddress: "0", + }) + Expect(err).NotTo(HaveOccurred()) + + // Override onStoppedLeading to prevent os.Exit + cm := m.(*controllerManager) + cm.onStoppedLeading = func() {} + + By("Creating a runnable that implements WarmupRunnable interface") + // Create a warmup runnable + warmupRunnable := WarmupRunnableFunc{ + RunFunc: func(ctx context.Context) error { + // This is the main runnable that will be executed after leader election + <-ctx.Done() + return nil + }, + WarmupFunc: func(ctx context.Context) error { + // This should be called during startup before leader election + close(warmupCalled) + return nil + }, + } + Expect(m.Add(warmupRunnable)).To(Succeed()) + + By("Creating a runnable that requires leader election") + leaderElectionRunnable := LeaderElectionRunnableFunc{ + RunFunc: func(ctx context.Context) error { + // This will only be called after leader election is won + 
close(leaderElectionRunnableCalled) + <-ctx.Done() + return nil + }, + NeedLeaderElectionFunc: func() bool { + return true + }, + } + Expect(m.Add(leaderElectionRunnable)).To(Succeed()) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go func() { + defer GinkgoRecover() + Expect(m.Start(ctx)).NotTo(HaveOccurred()) + }() + + By("Waiting for the warmup runnable to be called") + <-warmupCalled + + By("Waiting for leader election to be won") + <-m.Elected() + + By("Verifying the leader election runnable is called after election") + <-leaderElectionRunnableCalled + }) }) type runnableError struct { diff --git a/pkg/manager/runnable_group.go b/pkg/manager/runnable_group.go index db5cda7c88..3d0d825fbe 100644 --- a/pkg/manager/runnable_group.go +++ b/pkg/manager/runnable_group.go @@ -32,6 +32,7 @@ type runnables struct { Webhooks *runnableGroup Caches *runnableGroup LeaderElection *runnableGroup + Warmup *runnableGroup Others *runnableGroup } @@ -42,6 +43,7 @@ func newRunnables(baseContext BaseContextFunc, errChan chan error) *runnables { Webhooks: newRunnableGroup(baseContext, errChan), Caches: newRunnableGroup(baseContext, errChan), LeaderElection: newRunnableGroup(baseContext, errChan), + Warmup: newRunnableGroup(baseContext, errChan), Others: newRunnableGroup(baseContext, errChan), } } @@ -65,8 +67,20 @@ func (r *runnables) Add(fn Runnable) error { }) case webhook.Server: return r.Webhooks.Add(fn, nil) - case LeaderElectionRunnable: - if !runnable.NeedLeaderElection() { + case WarmupRunnable, LeaderElectionRunnable: + if warmupRunnable, ok := fn.(WarmupRunnable); ok { + if err := r.Warmup.Add(RunnableFunc(warmupRunnable.Warmup), nil); err != nil { + return err + } + } + + leaderElectionRunnable, ok := fn.(LeaderElectionRunnable) + if !ok { + // If the runnable is not a LeaderElectionRunnable, add it to the leader election group for backwards compatibility + return r.LeaderElection.Add(fn, nil) + } + + if 
!leaderElectionRunnable.NeedLeaderElection() { return r.Others.Add(fn, nil) } return r.LeaderElection.Add(fn, nil) diff --git a/pkg/manager/runnable_group_test.go b/pkg/manager/runnable_group_test.go index f2f4119ba6..a211e5119f 100644 --- a/pkg/manager/runnable_group_test.go +++ b/pkg/manager/runnable_group_test.go @@ -53,6 +53,130 @@ var _ = Describe("runnables", func() { Expect(r.Add(runnable)).To(Succeed()) Expect(r.LeaderElection.startQueue).To(HaveLen(1)) }) + + It("should add WarmupRunnable to the Warmup and LeaderElection group", func() { + warmupRunnable := WarmupRunnableFunc{ + RunFunc: func(c context.Context) error { + <-c.Done() + return nil + }, + WarmupFunc: func(c context.Context) error { + return nil + }, + } + + r := newRunnables(defaultBaseContext, errCh) + Expect(r.Add(warmupRunnable)).To(Succeed()) + Expect(r.Warmup.startQueue).To(HaveLen(1)) + Expect(r.LeaderElection.startQueue).To(HaveLen(1)) + }) + + It("should add WarmupRunnable that doesn't needs leader election to warmup group only", func() { + warmupRunnable := CombinedRunnable{ + RunFunc: func(c context.Context) error { + <-c.Done() + return nil + }, + WarmupFunc: func(c context.Context) error { + return nil + }, + NeedLeaderElectionFunc: func() bool { + return false + }, + } + + r := newRunnables(defaultBaseContext, errCh) + Expect(r.Add(warmupRunnable)).To(Succeed()) + + Expect(r.Warmup.startQueue).To(HaveLen(1)) + Expect(r.LeaderElection.startQueue).To(BeEmpty()) + }) + + It("should add WarmupRunnable that needs leader election to Warmup and LeaderElection group, not Others", func() { + warmupRunnable := CombinedRunnable{ + RunFunc: func(c context.Context) error { + <-c.Done() + return nil + }, + WarmupFunc: func(c context.Context) error { + return nil + }, + NeedLeaderElectionFunc: func() bool { + return true + }, + } + + r := newRunnables(defaultBaseContext, errCh) + Expect(r.Add(warmupRunnable)).To(Succeed()) + + Expect(r.Warmup.startQueue).To(HaveLen(1)) + 
Expect(r.LeaderElection.startQueue).To(HaveLen(1)) + Expect(r.Others.startQueue).To(BeEmpty()) + }) + + It("should execute the Warmup function when Warmup group is started", func() { + var warmupExecuted atomic.Bool + + warmupRunnable := WarmupRunnableFunc{ + RunFunc: func(c context.Context) error { + <-c.Done() + return nil + }, + WarmupFunc: func(c context.Context) error { + warmupExecuted.Store(true) + return nil + }, + } + + r := newRunnables(defaultBaseContext, errCh) + Expect(r.Add(warmupRunnable)).To(Succeed()) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Start the Warmup group + Expect(r.Warmup.Start(ctx)).To(Succeed()) + + // Verify warmup function was called + Expect(warmupExecuted.Load()).To(BeTrue()) + }) + + It("should propagate errors from Warmup function to error channel", func() { + expectedErr := fmt.Errorf("expected warmup error") + + warmupRunnable := WarmupRunnableFunc{ + RunFunc: func(c context.Context) error { + <-c.Done() + return nil + }, + WarmupFunc: func(c context.Context) error { + return expectedErr + }, + } + + testErrChan := make(chan error, 1) + r := newRunnables(defaultBaseContext, testErrChan) + Expect(r.Add(warmupRunnable)).To(Succeed()) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Start the Warmup group in a goroutine + go func() { + Expect(r.Warmup.Start(ctx)).To(Succeed()) + }() + + // Error from Warmup should be sent to error channel + var receivedErr error + Eventually(func() error { + select { + case receivedErr = <-testErrChan: + return receivedErr + default: + return nil + } + }).Should(Equal(expectedErr)) + }) }) var _ = Describe("runnableGroup", func() { @@ -224,3 +348,52 @@ var _ = Describe("runnableGroup", func() { } }) }) + +// LeaderElectionRunnableFunc is a helper struct that implements LeaderElectionRunnable +// for testing purposes. 
+type LeaderElectionRunnableFunc struct { + RunFunc func(context.Context) error + NeedLeaderElectionFunc func() bool +} + +func (r LeaderElectionRunnableFunc) Start(ctx context.Context) error { + return r.RunFunc(ctx) +} + +func (r LeaderElectionRunnableFunc) NeedLeaderElection() bool { + return r.NeedLeaderElectionFunc() +} + +// WarmupRunnableFunc is a helper struct that implements WarmupRunnable +// for testing purposes. +type WarmupRunnableFunc struct { + RunFunc func(context.Context) error + WarmupFunc func(context.Context) error +} + +func (r WarmupRunnableFunc) Start(ctx context.Context) error { + return r.RunFunc(ctx) +} + +func (r WarmupRunnableFunc) Warmup(ctx context.Context) error { + return r.WarmupFunc(ctx) +} + +// CombinedRunnable implements both WarmupRunnable and LeaderElectionRunnable +type CombinedRunnable struct { + RunFunc func(context.Context) error + WarmupFunc func(context.Context) error + NeedLeaderElectionFunc func() bool +} + +func (r CombinedRunnable) Start(ctx context.Context) error { + return r.RunFunc(ctx) +} + +func (r CombinedRunnable) Warmup(ctx context.Context) error { + return r.WarmupFunc(ctx) +} + +func (r CombinedRunnable) NeedLeaderElection() bool { + return r.NeedLeaderElectionFunc() +}