From 8239300b69af3137e5994558257375c678560756 Mon Sep 17 00:00:00 2001 From: godwinpang Date: Tue, 8 Apr 2025 22:45:05 -0700 Subject: [PATCH 1/5] [Warm Replicas] Implement warm replica support for controllers. --- pkg/config/controller.go | 4 + pkg/controller/controller.go | 25 +-- pkg/controller/controller_test.go | 73 +++++++++ pkg/internal/controller/controller.go | 25 +++ pkg/internal/controller/controller_test.go | 121 ++++++++++++++ pkg/manager/internal.go | 9 ++ pkg/manager/manager.go | 6 + pkg/manager/manager_test.go | 69 ++++++++ pkg/manager/runnable_group.go | 72 ++++++--- pkg/manager/runnable_group_test.go | 173 +++++++++++++++++++++ 10 files changed, 550 insertions(+), 27 deletions(-) diff --git a/pkg/config/controller.go b/pkg/config/controller.go index a5655593ef..60b805e4e2 100644 --- a/pkg/config/controller.go +++ b/pkg/config/controller.go @@ -60,6 +60,10 @@ type Controller struct { // Defaults to true, which means the controller will use leader election. NeedLeaderElection *bool + // NeedWarmUp indicates whether the controller needs to use warm up. + // Defaults to false, which means the controller will not use warm up. + NeedWarmUp *bool + // UsePriorityQueue configures the controllers queue to use the controller-runtime provided // priority queue. // diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 9de959b48f..1ce9ddbf23 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -93,6 +93,12 @@ type TypedOptions[request comparable] struct { // // Note: This flag is disabled by default until a future version. It's currently in beta. UsePriorityQueue *bool + + // ShouldWarmupWithoutLeadership specifies whether the controller should start its sources + // when the manager is not the leader. + // Defaults to false, which means that the controller will wait for leader election to start + // before starting sources. + ShouldWarmupWithoutLeadership *bool } // DefaultFromConfig defaults the config from a config.Controller @@ -244,15 +250,16 @@ func NewTypedUnmanaged[request comparable](name string, options TypedOptions[req // Create controller with dependencies set return &controller.Controller[request]{ - Do: options.Reconciler, - RateLimiter: options.RateLimiter, - NewQueue: options.NewQueue, - MaxConcurrentReconciles: options.MaxConcurrentReconciles, - CacheSyncTimeout: options.CacheSyncTimeout, - Name: name, - LogConstructor: options.LogConstructor, - RecoverPanic: options.RecoverPanic, - LeaderElected: options.NeedLeaderElection, + Do: options.Reconciler, + RateLimiter: options.RateLimiter, + NewQueue: options.NewQueue, + MaxConcurrentReconciles: options.MaxConcurrentReconciles, + CacheSyncTimeout: options.CacheSyncTimeout, + Name: name, + LogConstructor: options.LogConstructor, + RecoverPanic: options.RecoverPanic, + LeaderElected: options.NeedLeaderElection, + ShouldWarmupWithoutLeadership: options.ShouldWarmupWithoutLeadership, }, nil } diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index 1c5b11d709..48736b07b6 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -474,5 +474,78 @@ var _ = Describe("controller.Controller", func() { _, ok = q.(priorityqueue.PriorityQueue[reconcile.Request]) Expect(ok).To(BeFalse()) }) + + It("should set ShouldWarmupWithoutLeadership correctly", func() { + m, err := manager.New(cfg, manager.Options{}) + Expect(err).NotTo(HaveOccurred()) + + // Test with ShouldWarmupWithoutLeadership set to true + ctrlWithWarmup, err := controller.New("warmup-enabled-ctrl", m, controller.Options{ + Reconciler: reconcile.Func(nil), + ShouldWarmupWithoutLeadership: ptr.To(true), + }) + Expect(err).NotTo(HaveOccurred()) + + internalCtrlWithWarmup, ok := ctrlWithWarmup.(*internalcontroller.Controller[reconcile.Request]) + Expect(ok).To(BeTrue()) + Expect(internalCtrlWithWarmup.ShouldWarmupWithoutLeadership).NotTo(BeNil()) + Expect(*internalCtrlWithWarmup.ShouldWarmupWithoutLeadership).To(BeTrue()) + + // Test with ShouldWarmupWithoutLeadership set to false + ctrlWithoutWarmup, err := controller.New("warmup-disabled-ctrl", m, controller.Options{ + Reconciler: reconcile.Func(nil), + ShouldWarmupWithoutLeadership: ptr.To(false), + }) + Expect(err).NotTo(HaveOccurred()) + + internalCtrlWithoutWarmup, ok := ctrlWithoutWarmup.(*internalcontroller.Controller[reconcile.Request]) + Expect(ok).To(BeTrue()) + Expect(internalCtrlWithoutWarmup.ShouldWarmupWithoutLeadership).NotTo(BeNil()) + Expect(*internalCtrlWithoutWarmup.ShouldWarmupWithoutLeadership).To(BeFalse()) + + // Test with ShouldWarmupWithoutLeadership not set (should default to nil) + ctrlWithDefaultWarmup, err := controller.New("warmup-default-ctrl", m, controller.Options{ + Reconciler: reconcile.Func(nil), + }) + Expect(err).NotTo(HaveOccurred()) + + internalCtrlWithDefaultWarmup, ok := ctrlWithDefaultWarmup.(*internalcontroller.Controller[reconcile.Request]) + Expect(ok).To(BeTrue()) + Expect(internalCtrlWithDefaultWarmup.ShouldWarmupWithoutLeadership).To(BeNil()) + }) + + It("should inherit ShouldWarmupWithoutLeadership from manager config", func() { + // Test with manager default setting ShouldWarmupWithoutLeadership to true + managerWithWarmup, err := manager.New(cfg, manager.Options{ + Controller: config.Controller{ + NeedWarmUp: ptr.To(true), + }, + }) + Expect(err).NotTo(HaveOccurred()) + ctrlInheritingWarmup, err := controller.New("inherit-warmup-enabled", managerWithWarmup, controller.Options{ + Reconciler: reconcile.Func(nil), + }) + Expect(err).NotTo(HaveOccurred()) + + internalCtrlInheritingWarmup, ok := ctrlInheritingWarmup.(*internalcontroller.Controller[reconcile.Request]) + Expect(ok).To(BeTrue()) + // Note: This test will fail until the DefaultFromConfig method is updated to set + // ShouldWarmupWithoutLeadership from config.Controller.NeedWarmUp + // This test demonstrates that the feature needs to be completed + Expect(internalCtrlInheritingWarmup.ShouldWarmupWithoutLeadership).NotTo(BeNil()) + Expect(*internalCtrlInheritingWarmup.ShouldWarmupWithoutLeadership).To(BeTrue()) + + // Test that explicit controller setting overrides manager setting + ctrlOverridingWarmup, err := controller.New("override-warmup-disabled", managerWithWarmup, controller.Options{ + Reconciler: reconcile.Func(nil), + ShouldWarmupWithoutLeadership: ptr.To(false), + }) + Expect(err).NotTo(HaveOccurred()) + + internalCtrlOverridingWarmup, ok := ctrlOverridingWarmup.(*internalcontroller.Controller[reconcile.Request]) + Expect(ok).To(BeTrue()) + Expect(internalCtrlOverridingWarmup.ShouldWarmupWithoutLeadership).NotTo(BeNil()) + Expect(*internalCtrlOverridingWarmup.ShouldWarmupWithoutLeadership).To(BeFalse()) + }) }) }) diff --git a/pkg/internal/controller/controller.go b/pkg/internal/controller/controller.go index 9fa7ec71e1..35949e0928 100644 --- a/pkg/internal/controller/controller.go +++ b/pkg/internal/controller/controller.go @@ -83,6 +83,9 @@ type Controller[request comparable] struct { // startWatches maintains a list of sources, handlers, and predicates to start when the controller is started. startWatches []source.TypedSource[request] + // didStartEventSources is used to indicate whether the event sources have been started. + didStartEventSources atomic.Bool + // LogConstructor is used to construct a logger to then log messages to users during reconciliation, // or for example when a watch is started. // Note: LogConstructor has to be able to handle nil requests as we are also using it @@ -95,6 +98,12 @@ type Controller[request comparable] struct { // LeaderElected indicates whether the controller is leader elected or always running. LeaderElected *bool + + // ShouldWarmupWithoutLeadership specifies whether the controller should start its sources + // when the manager is not the leader. + // Defaults to false, which means that the controller will wait for leader election to start + // before starting sources. + ShouldWarmupWithoutLeadership *bool } // Reconcile implements reconcile.Reconciler. @@ -144,6 +153,15 @@ func (c *Controller[request]) NeedLeaderElection() bool { return *c.LeaderElected } +// Warmup implements the manager.WarmupRunnable interface. +func (c *Controller[request]) Warmup(ctx context.Context) error { + if c.ShouldWarmupWithoutLeadership == nil || !*c.ShouldWarmupWithoutLeadership { + return nil + } + + return c.startEventSources(ctx) +} + // Start implements controller.Controller. func (c *Controller[request]) Start(ctx context.Context) error { // use an IIFE to get proper lock handling @@ -221,6 +239,13 @@ func (c *Controller[request]) Start(ctx context.Context) error { // startEventSources launches all the sources registered with this controller and waits // for them to sync. It returns an error if any of the sources fail to start or sync. func (c *Controller[request]) startEventSources(ctx context.Context) error { + // CAS returns false if value is already true, so early exit since another goroutine must have + // called startEventSources previously + if !c.didStartEventSources.CompareAndSwap(false, true) { + c.LogConstructor(nil).Info("Skipping starting event sources since it was already started") + return nil + } + errGroup := &errgroup.Group{} for _, watch := range c.startWatches { log := c.LogConstructor(nil) diff --git a/pkg/internal/controller/controller_test.go b/pkg/internal/controller/controller_test.go index 3fde5da9c8..b7c3989fd2 100644 --- a/pkg/internal/controller/controller_test.go +++ b/pkg/internal/controller/controller_test.go @@ -1014,6 +1014,127 @@ var _ = Describe("controller", func() { }) }) }) + + Describe("Warmup", func() { + It("should start event sources when ShouldWarmupWithoutLeadership is true", func() { + // Setup + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Create a mock source that we can verify was started + sourceStarted := false + mockSource := source.Func(func(ctx context.Context, queue workqueue.TypedRateLimitingInterface[reconcile.Request]) error { + sourceStarted = true + return nil + }) + + ctrl.startWatches = []source.TypedSource[reconcile.Request]{mockSource} + ctrl.ShouldWarmupWithoutLeadership = ptr.To(true) + + // Act + err := ctrl.Warmup(ctx) + + // Assert + Expect(err).NotTo(HaveOccurred()) + Expect(sourceStarted).To(BeTrue(), "Event source should have been started") + Expect(ctrl.didStartEventSources.Load()).To(BeTrue(), "didStartEventSources flag should be set") + }) + + It("should not start event sources when ShouldWarmupWithoutLeadership is false", func() { + // Setup + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Create a mock source that should not be started + sourceStarted := false + mockSource := source.Func(func(ctx context.Context, queue workqueue.TypedRateLimitingInterface[reconcile.Request]) error { + sourceStarted = true + return nil + }) + + ctrl.startWatches = []source.TypedSource[reconcile.Request]{mockSource} + ctrl.ShouldWarmupWithoutLeadership = ptr.To(false) + + // Act + err := ctrl.Warmup(ctx) + + // Assert + Expect(err).NotTo(HaveOccurred()) + Expect(sourceStarted).To(BeFalse(), "Event source should not have been started") + Expect(ctrl.didStartEventSources.Load()).To(BeFalse(), "didStartEventSources flag should not be set") + }) + + It("should not start event sources when ShouldWarmupWithoutLeadership is nil", func() { + // Setup + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Create a mock source that should not be started + sourceStarted := false + mockSource := source.Func(func(ctx context.Context, queue workqueue.TypedRateLimitingInterface[reconcile.Request]) error { + sourceStarted = true + return nil + }) + + ctrl.startWatches = []source.TypedSource[reconcile.Request]{mockSource} + ctrl.ShouldWarmupWithoutLeadership = nil + + // Act + err := ctrl.Warmup(ctx) + + // Assert + Expect(err).NotTo(HaveOccurred()) + Expect(sourceStarted).To(BeFalse(), "Event source should not have been started") + Expect(ctrl.didStartEventSources.Load()).To(BeFalse(), "didStartEventSources flag should not be set") + }) + + It("should not start event sources twice when called multiple times", func() { + // Setup + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Create a mock source that counts how many times it's started + startCount := 0 + mockSource := source.Func(func(ctx context.Context, queue workqueue.TypedRateLimitingInterface[reconcile.Request]) error { + startCount++ + return nil + }) + + ctrl.startWatches = []source.TypedSource[reconcile.Request]{mockSource} + ctrl.ShouldWarmupWithoutLeadership = ptr.To(true) + + // Act + err1 := ctrl.Warmup(ctx) + err2 := ctrl.Warmup(ctx) + + // Assert + Expect(err1).NotTo(HaveOccurred()) + Expect(err2).NotTo(HaveOccurred()) + Expect(startCount).To(Equal(1), "Event source should have been started only once") + Expect(ctrl.didStartEventSources.Load()).To(BeTrue(), "didStartEventSources flag should be set") + }) + + It("should propagate errors from event sources", func() { + // Setup + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Create a mock source that returns an error + expectedErr := errors.New("test error") + mockSource := source.Func(func(ctx context.Context, queue workqueue.TypedRateLimitingInterface[reconcile.Request]) error { + return expectedErr + }) + + ctrl.startWatches = []source.TypedSource[reconcile.Request]{mockSource} + ctrl.ShouldWarmupWithoutLeadership = ptr.To(true) + + // Act + err := ctrl.Warmup(ctx) + + // Assert + Expect(err).To(MatchError(expectedErr)) + }) + }) }) var _ = Describe("ReconcileIDFromContext function", func() { diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go index e5204a7506..261bccac9d 100644 --- a/pkg/manager/internal.go +++ b/pkg/manager/internal.go @@ -439,6 +439,11 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) { return fmt.Errorf("failed to start other runnables: %w", err) } + // Start and wait for sources to start. + if err := cm.runnables.Warmup.Start(cm.internalCtx); err != nil { + return fmt.Errorf("failed to start warmup runnables: %w", err) + } + // Start the leader election and all required runnables. { ctx, cancel := context.WithCancel(context.Background()) @@ -544,6 +549,10 @@ func (cm *controllerManager) engageStopProcedure(stopComplete <-chan struct{}) e cm.runnables.LeaderElection.startOnce.Do(func() {}) cm.runnables.LeaderElection.StopAndWait(cm.shutdownCtx) + // Stop the warmup runnables + cm.logger.Info("Stopping and waiting for warmup runnables") + cm.runnables.Warmup.StopAndWait(cm.shutdownCtx) + // Stop the caches before the leader election runnables, this is an important // step to make sure that we don't race with the reconcilers by receiving more events // from the API servers and enqueueing them. diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index c3ae317b04..013321366c 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -314,6 +314,12 @@ type LeaderElectionRunnable interface { NeedLeaderElection() bool } +// WarmupRunnable knows if a Runnable should be a warmup runnable. +type WarmupRunnable interface { + // Warmup returns true if the Runnable should be run as warmup. + Warmup(context.Context) error +} + // New returns a new Manager for creating Controllers. // Note that if ContentType in the given config is not set, "application/vnd.kubernetes.protobuf" // will be used for all built-in resources of Kubernetes, and "application/json" is for other types diff --git a/pkg/manager/manager_test.go b/pkg/manager/manager_test.go index 247a33f9dc..822a3b6200 100644 --- a/pkg/manager/manager_test.go +++ b/pkg/manager/manager_test.go @@ -1928,6 +1928,75 @@ var _ = Describe("manger.Manager", func() { Expect(err).NotTo(HaveOccurred()) Expect(m.GetAPIReader()).NotTo(BeNil()) }) + + It("should run warmup runnables before leader election is won", func() { + By("Creating channels to track execution order") + warmupCalled := make(chan struct{}) + leaderElectionRunnableCalled := make(chan struct{}) + + By("Creating a manager with leader election enabled") + m, err := New(cfg, Options{ + LeaderElection: true, + LeaderElectionNamespace: "default", + LeaderElectionID: "test-leader-election-warmup", + newResourceLock: fakeleaderelection.NewResourceLock, + HealthProbeBindAddress: "0", + Metrics: metricsserver.Options{BindAddress: "0"}, + PprofBindAddress: "0", + }) + Expect(err).NotTo(HaveOccurred()) + + // Override onStoppedLeading to prevent os.Exit + cm := m.(*controllerManager) + cm.onStoppedLeading = func() {} + + By("Creating a runnable that implements WarmupRunnable interface") + // Create a warmup runnable + warmupRunnable := WarmupRunnableFunc{ + RunFunc: func(ctx context.Context) error { + // This is the main runnable that will be executed after leader election + <-ctx.Done() + return nil + }, + WarmupFunc: func(ctx context.Context) error { + // This should be called during startup before leader election + close(warmupCalled) + return nil + }, + } + Expect(m.Add(warmupRunnable)).To(Succeed()) + + By("Creating a runnable that requires leader election") + leaderElectionRunnable := LeaderElectionRunnableFunc{ + RunFunc: func(ctx context.Context) error { + // This will only be called after leader election is won + close(leaderElectionRunnableCalled) + <-ctx.Done() + return nil + }, + NeedLeaderElectionFunc: func() bool { + return true + }, + } + Expect(m.Add(leaderElectionRunnable)).To(Succeed()) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go func() { + defer GinkgoRecover() + Expect(m.Start(ctx)).NotTo(HaveOccurred()) + }() + + By("Waiting for the warmup runnable to be called") + <-warmupCalled + + By("Waiting for leader election to be won") + <-m.Elected() + + By("Verifying the leader election runnable is called after election") + <-leaderElectionRunnableCalled + }) }) type runnableError struct { diff --git a/pkg/manager/runnable_group.go b/pkg/manager/runnable_group.go index db5cda7c88..fd700173d4 100644 --- a/pkg/manager/runnable_group.go +++ b/pkg/manager/runnable_group.go @@ -32,6 +32,7 @@ type runnables struct { Webhooks *runnableGroup Caches *runnableGroup LeaderElection *runnableGroup + Warmup *runnableGroup Others *runnableGroup } @@ -42,6 +43,7 @@ func newRunnables(baseContext BaseContextFunc, errChan chan error) *runnables { Webhooks: newRunnableGroup(baseContext, errChan), Caches: newRunnableGroup(baseContext, errChan), LeaderElection: newRunnableGroup(baseContext, errChan), + Warmup: newRunnableGroup(baseContext, errChan), Others: newRunnableGroup(baseContext, errChan), } } @@ -65,8 +67,20 @@ func (r *runnables) Add(fn Runnable) error { }) case webhook.Server: return r.Webhooks.Add(fn, nil) - case LeaderElectionRunnable: - if !runnable.NeedLeaderElection() { + case WarmupRunnable, LeaderElectionRunnable: + if warmupRunnable, ok := fn.(WarmupRunnable); ok { + if err := r.Warmup.Add(RunnableFunc(warmupRunnable.Warmup), nil); err != nil { + return err + } + } + + leaderElectionRunnable, ok := fn.(LeaderElectionRunnable) + if !ok { + // If the runnable is not a LeaderElectionRunnable, add it to the leader election group for backwards compatibility + return r.LeaderElection.Add(fn, nil) + } + + if !leaderElectionRunnable.NeedLeaderElection() { return r.Others.Add(fn, nil) } return r.LeaderElection.Add(fn, nil) @@ -208,23 +222,50 @@ func (r *runnableGroup) reconcile() { // Start the runnable. go func(rn *readyRunnable) { + // If we return, the runnable ended cleanly + // or returned an error to the channel. + // + // We should always decrement the WaitGroup here. + defer r.wg.Done() + + // Track the ready check in the same WaitGroup to prevent goroutine leaks + done := make(chan struct{}) + + // Launch the ready check but make sure it doesn't outlive this goroutine go func() { + defer close(done) if rn.Check(r.ctx) { if rn.signalReady { - r.startReadyCh <- rn + // Use non-blocking send to avoid leaking this goroutine if the channel is never read + select { + case r.startReadyCh <- rn: + // Successfully sent + case <-r.ctx.Done(): + // Context canceled, exit without blocking + } } } }() - // If we return, the runnable ended cleanly - // or returned an error to the channel. - // - // We should always decrement the WaitGroup here. - defer r.wg.Done() - // Start the runnable. - if err := rn.Start(r.ctx); err != nil { - r.errChan <- err + err := rn.Start(r.ctx) + + // Now that the runnable is done, clean up the ready check goroutine if still running + select { + case <-done: + // Ready check already completed, nothing to do + case <-r.ctx.Done(): + // Context was canceled, ready check should exit soon + } + + // Send any error from the runnable + if err != nil { + select { + case r.errChan <- err: + // Error sent successfully + default: + // Channel full or closed, can't send the error + } } }(runnable) } @@ -283,18 +324,13 @@ func (r *runnableGroup) Add(rn Runnable, ready runnableCheck) error { // StopAndWait waits for all the runnables to finish before returning. func (r *runnableGroup) StopAndWait(ctx context.Context) { r.stopOnce.Do(func() { - // Close the reconciler channel once we're done. - defer func() { - r.stop.Lock() - close(r.ch) - r.stop.Unlock() - }() - _ = r.Start(ctx) r.stop.Lock() // Store the stopped variable so we don't accept any new // runnables for the time being. r.stopped = true + // Close the channel to signal the reconcile goroutine to exit + close(r.ch) r.stop.Unlock() // Cancel the internal channel. diff --git a/pkg/manager/runnable_group_test.go b/pkg/manager/runnable_group_test.go index f2f4119ba6..35373aa8f8 100644 --- a/pkg/manager/runnable_group_test.go +++ b/pkg/manager/runnable_group_test.go @@ -53,6 +53,130 @@ var _ = Describe("runnables", func() { Expect(r.Add(runnable)).To(Succeed()) Expect(r.LeaderElection.startQueue).To(HaveLen(1)) }) + + It("should add WarmupRunnable to the Warmup and LeaderElection group", func() { + warmupRunnable := WarmupRunnableFunc{ + RunFunc: func(c context.Context) error { + <-c.Done() + return nil + }, + WarmupFunc: func(c context.Context) error { + return nil + }, + } + + r := newRunnables(defaultBaseContext, errCh) + Expect(r.Add(warmupRunnable)).To(Succeed()) + Expect(r.Warmup.startQueue).To(HaveLen(1)) + Expect(r.LeaderElection.startQueue).To(HaveLen(1)) + }) + + It("should add WarmupRunnable that doesn't needs leader election to warmup group only", func() { + warmupRunnable := CombinedRunnable{ + RunFunc: func(c context.Context) error { + <-c.Done() + return nil + }, + WarmupFunc: func(c context.Context) error { + return nil + }, + NeedLeaderElectionFunc: func() bool { + return false + }, + } + + r := newRunnables(defaultBaseContext, errCh) + Expect(r.Add(warmupRunnable)).To(Succeed()) + + Expect(r.Warmup.startQueue).To(HaveLen(1)) + Expect(r.LeaderElection.startQueue).To(BeEmpty()) + }) + + It("should add WarmupRunnable that needs leader election to Warmup and LeaderElection group, not Others", func() { + warmupRunnable := CombinedRunnable{ + RunFunc: func(c context.Context) error { + <-c.Done() + return nil + }, + WarmupFunc: func(c context.Context) error { + return nil + }, + NeedLeaderElectionFunc: func() bool { + return true + }, + } + + r := newRunnables(defaultBaseContext, errCh) + Expect(r.Add(warmupRunnable)).To(Succeed()) + + Expect(r.Warmup.startQueue).To(HaveLen(1)) + Expect(r.LeaderElection.startQueue).To(HaveLen(1)) + Expect(r.Others.startQueue).To(BeEmpty()) + }) + + It("should execute the Warmup function when Warmup group is started", func() { + warmupExecuted := false + + warmupRunnable := WarmupRunnableFunc{ + RunFunc: func(c context.Context) error { + <-c.Done() + return nil + }, + WarmupFunc: func(c context.Context) error { + warmupExecuted = true + return nil + }, + } + + r := newRunnables(defaultBaseContext, errCh) + Expect(r.Add(warmupRunnable)).To(Succeed()) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Start the Warmup group + Expect(r.Warmup.Start(ctx)).To(Succeed()) + + // Verify warmup function was called + Expect(warmupExecuted).To(BeTrue()) + }) + + It("should propagate errors from Warmup function to error channel", func() { + expectedErr := fmt.Errorf("expected warmup error") + + warmupRunnable := WarmupRunnableFunc{ + RunFunc: func(c context.Context) error { + <-c.Done() + return nil + }, + WarmupFunc: func(c context.Context) error { + return expectedErr + }, + } + + testErrChan := make(chan error, 1) + r := newRunnables(defaultBaseContext, testErrChan) + Expect(r.Add(warmupRunnable)).To(Succeed()) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Start the Warmup group in a goroutine + go func() { + Expect(r.Warmup.Start(ctx)).To(Succeed()) + }() + + // Error from Warmup should be sent to error channel + var receivedErr error + Eventually(func() error { + select { + case receivedErr = <-testErrChan: + return receivedErr + default: + return nil + } + }).Should(Equal(expectedErr)) + }) }) var _ = Describe("runnableGroup", func() { @@ -224,3 +348,52 @@ var _ = Describe("runnableGroup", func() { } }) }) + +// LeaderElectionRunnableFunc is a helper struct that implements LeaderElectionRunnable +// for testing purposes. +type LeaderElectionRunnableFunc struct { + RunFunc func(context.Context) error + NeedLeaderElectionFunc func() bool +} + +func (r LeaderElectionRunnableFunc) Start(ctx context.Context) error { + return r.RunFunc(ctx) +} + +func (r LeaderElectionRunnableFunc) NeedLeaderElection() bool { + return r.NeedLeaderElectionFunc() +} + +// WarmupRunnableFunc is a helper struct that implements WarmupRunnable +// for testing purposes. +type WarmupRunnableFunc struct { + RunFunc func(context.Context) error + WarmupFunc func(context.Context) error +} + +func (r WarmupRunnableFunc) Start(ctx context.Context) error { + return r.RunFunc(ctx) +} + +func (r WarmupRunnableFunc) Warmup(ctx context.Context) error { + return r.WarmupFunc(ctx) +} + +// CombinedRunnable implements both WarmupRunnable and LeaderElectionRunnable +type CombinedRunnable struct { + RunFunc func(context.Context) error + WarmupFunc func(context.Context) error + NeedLeaderElectionFunc func() bool +} + +func (r CombinedRunnable) Start(ctx context.Context) error { + return r.RunFunc(ctx) +} + +func (r CombinedRunnable) Warmup(ctx context.Context) error { + return r.WarmupFunc(ctx) +} + +func (r CombinedRunnable) NeedLeaderElection() bool { + return r.NeedLeaderElectionFunc() +} From 73fc8fa13fb92480c4c0e2a36127eb58b94c742b Mon Sep 17 00:00:00 2001 From: godwinpang Date: Sun, 13 Apr 2025 22:23:13 -0700 Subject: [PATCH 2/5] Remove irrelevant runnable_group.go code. --- pkg/manager/runnable_group.go | 56 +++++++++++------------------------ 1 file changed, 17 insertions(+), 39 deletions(-) diff --git a/pkg/manager/runnable_group.go b/pkg/manager/runnable_group.go index fd700173d4..21b349529e 100644 --- a/pkg/manager/runnable_group.go +++ b/pkg/manager/runnable_group.go @@ -222,51 +222,24 @@ func (r *runnableGroup) reconcile() { // Start the runnable. go func(rn *readyRunnable) { - // If we return, the runnable ended cleanly - // or returned an error to the channel. - // - // We should always decrement the WaitGroup here. - defer r.wg.Done() - - // Track the ready check in the same WaitGroup to prevent goroutine leaks - done := make(chan struct{}) - - // Launch the ready check but make sure it doesn't outlive this goroutine go func() { - defer close(done) if rn.Check(r.ctx) { if rn.signalReady { - // Use non-blocking send to avoid leaking this goroutine if the channel is never read - select { - case r.startReadyCh <- rn: - // Successfully sent - case <-r.ctx.Done(): - // Context canceled, exit without blocking - } + r.startReadyCh <- rn } } }() - // Start the runnable. - err := rn.Start(r.ctx) - - // Now that the runnable is done, clean up the ready check goroutine if still running - select { - case <-done: - // Ready check already completed, nothing to do - case <-r.ctx.Done(): - // Context was canceled, ready check should exit soon - } + // If we return, the runnable ended cleanly + // or returned an error to the channel. + // + // We should always decrement the WaitGroup here. + defer r.wg.Done() - // Send any error from the runnable - if err != nil { - select { - case r.errChan <- err: - // Error sent successfully - default: - // Channel full or closed, can't send the error - } - } + // Start the runnable. + if err := rn.Start(r.ctx); err != nil { + r.errChan <- err + } }(runnable) } } @@ -324,13 +297,18 @@ func (r *runnableGroup) Add(rn Runnable, ready runnableCheck) error { // StopAndWait waits for all the runnables to finish before returning. func (r *runnableGroup) StopAndWait(ctx context.Context) { r.stopOnce.Do(func() { + // Close the reconciler channel once we're done. + defer func() { + r.stop.Lock() + close(r.ch) + r.stop.Unlock() + }() + _ = r.Start(ctx) r.stop.Lock() // Store the stopped variable so we don't accept any new // runnables for the time being. r.stopped = true - // Close the channel to signal the reconcile goroutine to exit - close(r.ch) r.stop.Unlock() // Cancel the internal channel. From be1b1c2b3bed3ba8ceff20f57c020e6a2fe8f457 Mon Sep 17 00:00:00 2001 From: godwinpang Date: Sun, 13 Apr 2025 22:24:18 -0700 Subject: [PATCH 3/5] Rename ShouldWarmup. --- pkg/config/controller.go | 4 +-- pkg/controller/controller.go | 10 +++++--- pkg/controller/controller_test.go | 29 ++++++++++------------ pkg/internal/controller/controller.go | 6 ++--- pkg/internal/controller/controller_test.go | 21 +++++++++------- 5 files changed, 37 insertions(+), 33 deletions(-) diff --git a/pkg/config/controller.go b/pkg/config/controller.go index 60b805e4e2..60c010025a 100644 --- a/pkg/config/controller.go +++ b/pkg/config/controller.go @@ -60,9 +60,9 @@ type Controller struct { // Defaults to true, which means the controller will use leader election. NeedLeaderElection *bool - // NeedWarmUp indicates whether the controller needs to use warm up. + // EnableWarmup indicates whether the controller needs to use warm up. // Defaults to false, which means the controller will not use warm up. - NeedWarmUp *bool + NeedWarmup *bool // UsePriorityQueue configures the controllers queue to use the controller-runtime provided // priority queue. diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 1ce9ddbf23..2220d04c57 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -94,11 +94,11 @@ type TypedOptions[request comparable] struct { // Note: This flag is disabled by default until a future version. It's currently in beta. UsePriorityQueue *bool - // ShouldWarmupWithoutLeadership specifies whether the controller should start its sources + // NeedWarmup specifies whether the controller should start its sources // when the manager is not the leader. // Defaults to false, which means that the controller will wait for leader election to start // before starting sources. - ShouldWarmupWithoutLeadership *bool + NeedWarmup *bool } // DefaultFromConfig defaults the config from a config.Controller @@ -130,6 +130,10 @@ func (options *TypedOptions[request]) DefaultFromConfig(config config.Controller if options.NeedLeaderElection == nil { options.NeedLeaderElection = config.NeedLeaderElection } + + if options.NeedWarmup == nil { + options.NeedWarmup = config.NeedWarmup + } } // Controller implements an API. A Controller manages a work queue fed reconcile.Requests @@ -259,7 +263,7 @@ func NewTypedUnmanaged[request comparable](name string, options TypedOptions[req LogConstructor: options.LogConstructor, RecoverPanic: options.RecoverPanic, LeaderElected: options.NeedLeaderElection, - ShouldWarmupWithoutLeadership: options.ShouldWarmupWithoutLeadership, + NeedWarmup: options.NeedWarmup, }, nil } diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index 48736b07b6..13a2c4046e 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -482,26 +482,26 @@ var _ = Describe("controller.Controller", func() { // Test with ShouldWarmupWithoutLeadership set to true ctrlWithWarmup, err := controller.New("warmup-enabled-ctrl", m, controller.Options{ Reconciler: reconcile.Func(nil), - ShouldWarmupWithoutLeadership: ptr.To(true), + NeedWarmup: ptr.To(true), }) Expect(err).NotTo(HaveOccurred()) internalCtrlWithWarmup, ok := ctrlWithWarmup.(*internalcontroller.Controller[reconcile.Request]) Expect(ok).To(BeTrue()) - Expect(internalCtrlWithWarmup.ShouldWarmupWithoutLeadership).NotTo(BeNil()) - Expect(*internalCtrlWithWarmup.ShouldWarmupWithoutLeadership).To(BeTrue()) + Expect(internalCtrlWithWarmup.NeedWarmup).NotTo(BeNil()) + Expect(*internalCtrlWithWarmup.NeedWarmup).To(BeTrue()) // Test with ShouldWarmupWithoutLeadership set to false ctrlWithoutWarmup, err := controller.New("warmup-disabled-ctrl", m, controller.Options{ Reconciler: reconcile.Func(nil), - ShouldWarmupWithoutLeadership: ptr.To(false), + NeedWarmup: ptr.To(false), }) Expect(err).NotTo(HaveOccurred()) internalCtrlWithoutWarmup, ok := ctrlWithoutWarmup.(*internalcontroller.Controller[reconcile.Request]) Expect(ok).To(BeTrue()) - Expect(internalCtrlWithoutWarmup.ShouldWarmupWithoutLeadership).NotTo(BeNil()) - Expect(*internalCtrlWithoutWarmup.ShouldWarmupWithoutLeadership).To(BeFalse()) + Expect(internalCtrlWithoutWarmup.NeedWarmup).NotTo(BeNil()) + Expect(*internalCtrlWithoutWarmup.NeedWarmup).To(BeFalse()) // Test with ShouldWarmupWithoutLeadership not set (should default to nil) ctrlWithDefaultWarmup, err := controller.New("warmup-default-ctrl", m, controller.Options{ @@ -511,14 +511,14 @@ var _ = Describe("controller.Controller", func() { internalCtrlWithDefaultWarmup, ok := ctrlWithDefaultWarmup.(*internalcontroller.Controller[reconcile.Request]) Expect(ok).To(BeTrue()) - Expect(internalCtrlWithDefaultWarmup.ShouldWarmupWithoutLeadership).To(BeNil()) + Expect(internalCtrlWithDefaultWarmup.NeedWarmup).To(BeNil()) }) It("should inherit ShouldWarmupWithoutLeadership from manager config", func() { // Test with manager default setting ShouldWarmupWithoutLeadership to true managerWithWarmup, err := manager.New(cfg, manager.Options{ Controller: config.Controller{ - NeedWarmUp: ptr.To(true), + NeedWarmup: ptr.To(true), }, }) Expect(err).NotTo(HaveOccurred()) @@ -529,23 +529,20 @@ var _ = Describe("controller.Controller", func() { internalCtrlInheritingWarmup, ok := ctrlInheritingWarmup.(*internalcontroller.Controller[reconcile.Request]) Expect(ok).To(BeTrue()) - // Note: This test will fail until the DefaultFromConfig method is updated to set - // ShouldWarmupWithoutLeadership from config.Controller.NeedWarmUp - // This test demonstrates that the feature needs to be completed - Expect(internalCtrlInheritingWarmup.ShouldWarmupWithoutLeadership).NotTo(BeNil()) - Expect(*internalCtrlInheritingWarmup.ShouldWarmupWithoutLeadership).To(BeTrue()) + Expect(internalCtrlInheritingWarmup.NeedWarmup).NotTo(BeNil()) + Expect(*internalCtrlInheritingWarmup.NeedWarmup).To(BeTrue()) // Test that explicit controller setting overrides manager setting ctrlOverridingWarmup, err := controller.New("override-warmup-disabled", managerWithWarmup, controller.Options{ Reconciler: reconcile.Func(nil), - ShouldWarmupWithoutLeadership: ptr.To(false), + NeedWarmup: ptr.To(false), }) Expect(err).NotTo(HaveOccurred()) internalCtrlOverridingWarmup, ok := ctrlOverridingWarmup.(*internalcontroller.Controller[reconcile.Request]) Expect(ok).To(BeTrue()) - Expect(internalCtrlOverridingWarmup.ShouldWarmupWithoutLeadership).NotTo(BeNil()) - Expect(*internalCtrlOverridingWarmup.ShouldWarmupWithoutLeadership).To(BeFalse()) + Expect(internalCtrlOverridingWarmup.NeedWarmup).NotTo(BeNil()) + Expect(*internalCtrlOverridingWarmup.NeedWarmup).To(BeFalse()) }) }) }) diff --git a/pkg/internal/controller/controller.go b/pkg/internal/controller/controller.go index 35949e0928..6a632c3825 100644 --- a/pkg/internal/controller/controller.go +++ b/pkg/internal/controller/controller.go @@ -99,11 +99,11 @@ type Controller[request comparable] struct { // LeaderElected indicates whether the controller is leader elected or always running. LeaderElected *bool - // ShouldWarmupWithoutLeadership specifies whether the controller should start its sources + // NeedWarmup specifies whether the controller should start its sources // when the manager is not the leader. // Defaults to false, which means that the controller will wait for leader election to start // before starting sources. - ShouldWarmupWithoutLeadership *bool + NeedWarmup *bool } // Reconcile implements reconcile.Reconciler. @@ -155,7 +155,7 @@ func (c *Controller[request]) NeedLeaderElection() bool { // Warmup implements the manager.WarmupRunnable interface. func (c *Controller[request]) Warmup(ctx context.Context) error { - if c.ShouldWarmupWithoutLeadership == nil || !*c.ShouldWarmupWithoutLeadership { + if c.NeedWarmup == nil || !*c.NeedWarmup { return nil } diff --git a/pkg/internal/controller/controller_test.go b/pkg/internal/controller/controller_test.go index b7c3989fd2..cb15a60d7b 100644 --- a/pkg/internal/controller/controller_test.go +++ b/pkg/internal/controller/controller_test.go @@ -403,7 +403,7 @@ var _ = Describe("controller", func() { return expectedErr }) - // // Set a sufficiently long timeout to avoid timeouts interfering with the error being returned + // Set a sufficiently long timeout to avoid timeouts interfering with the error being returned ctrl.CacheSyncTimeout = 5 * time.Second ctrl.startWatches = []source.TypedSource[reconcile.Request]{src} err := ctrl.startEventSources(ctx) @@ -1016,7 +1016,7 @@ var _ = Describe("controller", func() { }) Describe("Warmup", func() { - It("should start event sources when ShouldWarmupWithoutLeadership is true", func() { + It("should start event sources when NeedWarmup is true", func() { // Setup ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -1028,8 +1028,9 @@ var _ = Describe("controller", func() { return nil }) + ctrl.CacheSyncTimeout = time.Second ctrl.startWatches = []source.TypedSource[reconcile.Request]{mockSource} - ctrl.ShouldWarmupWithoutLeadership = ptr.To(true) + ctrl.NeedWarmup = ptr.To(true) // Act err := ctrl.Warmup(ctx) @@ -1040,7 +1041,7 @@ var _ = Describe("controller", func() { Expect(ctrl.didStartEventSources.Load()).To(BeTrue(), "didStartEventSources flag should be set") }) - It("should not start event sources when ShouldWarmupWithoutLeadership is false", func() { + It("should not start event sources when NeedWarmup is false", func() { // Setup ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -1053,7 +1054,7 @@ var _ = Describe("controller", func() { }) ctrl.startWatches = []source.TypedSource[reconcile.Request]{mockSource} - ctrl.ShouldWarmupWithoutLeadership = ptr.To(false) + ctrl.NeedWarmup = ptr.To(false) // Act err := ctrl.Warmup(ctx) @@ -1064,7 +1065,7 @@ var _ = Describe("controller", func() { Expect(ctrl.didStartEventSources.Load()).To(BeFalse(), "didStartEventSources flag should not be set") }) - It("should not start event sources when ShouldWarmupWithoutLeadership is nil", func() { + It("should not start event sources when NeedWarmup is nil", func() { // Setup ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -1077,7 +1078,7 @@ var _ = Describe("controller", func() { }) ctrl.startWatches = []source.TypedSource[reconcile.Request]{mockSource} - ctrl.ShouldWarmupWithoutLeadership = nil + ctrl.NeedWarmup = nil // Act err := ctrl.Warmup(ctx) @@ -1100,8 +1101,9 @@ var _ = Describe("controller", func() { return nil }) + ctrl.CacheSyncTimeout = time.Second ctrl.startWatches = []source.TypedSource[reconcile.Request]{mockSource} - ctrl.ShouldWarmupWithoutLeadership = ptr.To(true) + ctrl.NeedWarmup = ptr.To(true) // Act err1 := ctrl.Warmup(ctx) @@ -1125,8 +1127,9 @@ var _ = Describe("controller", func() { return expectedErr }) + ctrl.CacheSyncTimeout = time.Second ctrl.startWatches = []source.TypedSource[reconcile.Request]{mockSource} - ctrl.ShouldWarmupWithoutLeadership = ptr.To(true) + ctrl.NeedWarmup = ptr.To(true) // Act err := ctrl.Warmup(ctx) From c9b99eb13d7f7d9b49a354460e1fe888280c1635 Mon Sep 17 00:00:00 2001 From: godwinpang Date: Sun, 13 Apr 2025 23:27:50 -0700 Subject: [PATCH 4/5] fmt --- pkg/controller/controller.go | 26 +++++++++++++------------- pkg/controller/controller_test.go | 6 +++--- pkg/manager/runnable_group.go | 10 +++++----- 3 files changed, 21 insertions(+), 21 deletions(-) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 2220d04c57..f1d8c0e428 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -131,9 +131,9 @@ func (options *TypedOptions[request]) DefaultFromConfig(config config.Controller options.NeedLeaderElection = config.NeedLeaderElection } - if options.NeedWarmup == nil { - options.NeedWarmup = config.NeedWarmup - } + if options.NeedWarmup == nil { + options.NeedWarmup = config.NeedWarmup + } } // Controller implements an API. A Controller manages a work queue fed reconcile.Requests @@ -254,16 +254,16 @@ func NewTypedUnmanaged[request comparable](name string, options TypedOptions[req // Create controller with dependencies set return &controller.Controller[request]{ - Do: options.Reconciler, - RateLimiter: options.RateLimiter, - NewQueue: options.NewQueue, - MaxConcurrentReconciles: options.MaxConcurrentReconciles, - CacheSyncTimeout: options.CacheSyncTimeout, - Name: name, - LogConstructor: options.LogConstructor, - RecoverPanic: options.RecoverPanic, - LeaderElected: options.NeedLeaderElection, - NeedWarmup: options.NeedWarmup, + Do: options.Reconciler, + RateLimiter: options.RateLimiter, + NewQueue: options.NewQueue, + MaxConcurrentReconciles: options.MaxConcurrentReconciles, + CacheSyncTimeout: options.CacheSyncTimeout, + Name: name, + LogConstructor: options.LogConstructor, + RecoverPanic: options.RecoverPanic, + LeaderElected: options.NeedLeaderElection, + NeedWarmup: options.NeedWarmup, }, nil } diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index 13a2c4046e..53d923fc77 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -481,7 +481,7 @@ var _ = Describe("controller.Controller", func() { // Test with ShouldWarmupWithoutLeadership set to true ctrlWithWarmup, err := controller.New("warmup-enabled-ctrl", m, controller.Options{ - Reconciler: reconcile.Func(nil), + Reconciler: reconcile.Func(nil), NeedWarmup: ptr.To(true), }) Expect(err).NotTo(HaveOccurred()) @@ -493,7 +493,7 @@ var _ = Describe("controller.Controller", func() { // Test with ShouldWarmupWithoutLeadership set to false ctrlWithoutWarmup, err := controller.New("warmup-disabled-ctrl", m, controller.Options{ - Reconciler: reconcile.Func(nil), + Reconciler: reconcile.Func(nil), NeedWarmup: ptr.To(false), }) Expect(err).NotTo(HaveOccurred()) @@ -534,7 +534,7 @@ var _ = Describe("controller.Controller", func() { // Test that explicit controller setting overrides manager setting ctrlOverridingWarmup, err := controller.New("override-warmup-disabled", managerWithWarmup, controller.Options{ - Reconciler: reconcile.Func(nil), + Reconciler: reconcile.Func(nil), NeedWarmup: ptr.To(false), }) Expect(err).NotTo(HaveOccurred()) diff --git a/pkg/manager/runnable_group.go b/pkg/manager/runnable_group.go index 21b349529e..3d0d825fbe 100644 --- a/pkg/manager/runnable_group.go +++ b/pkg/manager/runnable_group.go @@ -230,16 +230,16 @@ func (r *runnableGroup) reconcile() { } }() - // If we return, the runnable ended cleanly + // If we return, the runnable ended cleanly // or returned an error to the channel. // // We should always decrement the WaitGroup here. defer r.wg.Done() // Start the runnable. - if err := rn.Start(r.ctx); err != nil { - r.errChan <- err - } + if err := rn.Start(r.ctx); err != nil { + r.errChan <- err + } }(runnable) } } @@ -297,7 +297,7 @@ func (r *runnableGroup) Add(rn Runnable, ready runnableCheck) error { // StopAndWait waits for all the runnables to finish before returning. func (r *runnableGroup) StopAndWait(ctx context.Context) { r.stopOnce.Do(func() { - // Close the reconciler channel once we're done. + // Close the reconciler channel once we're done. defer func() { r.stop.Lock() close(r.ch) From e7a2bbfa72d31c033557118c52441261d1ae5b01 Mon Sep 17 00:00:00 2001 From: godwinpang Date: Sun, 13 Apr 2025 23:51:24 -0700 Subject: [PATCH 5/5] Change to atomic.Bool to avoid race in test. --- pkg/manager/runnable_group_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/manager/runnable_group_test.go b/pkg/manager/runnable_group_test.go index 35373aa8f8..a211e5119f 100644 --- a/pkg/manager/runnable_group_test.go +++ b/pkg/manager/runnable_group_test.go @@ -115,7 +115,7 @@ var _ = Describe("runnables", func() { }) It("should execute the Warmup function when Warmup group is started", func() { - warmupExecuted := false + var warmupExecuted atomic.Bool warmupRunnable := WarmupRunnableFunc{ RunFunc: func(c context.Context) error { @@ -123,7 +123,7 @@ var _ = Describe("runnables", func() { return nil }, WarmupFunc: func(c context.Context) error { - warmupExecuted = true + warmupExecuted.Store(true) return nil }, } @@ -138,7 +138,7 @@ var _ = Describe("runnables", func() { Expect(r.Warmup.Start(ctx)).To(Succeed()) // Verify warmup function was called - Expect(warmupExecuted).To(BeTrue()) + Expect(warmupExecuted.Load()).To(BeTrue()) }) It("should propagate errors from Warmup function to error channel", func() {