Skip to content

Commit 5db61c7

Browse files
committed
[Warm Replicas] Implement warm replica support for controllers.
1 parent 6ad5c1d commit 5db61c7

File tree

10 files changed

+550
-27
lines changed

10 files changed

+550
-27
lines changed

Diff for: pkg/config/controller.go

+4
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,10 @@ type Controller struct {
6060
// Defaults to true, which means the controller will use leader election.
6161
NeedLeaderElection *bool
6262

63+
// NeedWarmUp indicates whether the controller needs to use warm up.
64+
// Defaults to false, which means the controller will not use warm up.
65+
NeedWarmUp *bool
66+
6367
// UsePriorityQueue configures the controllers queue to use the controller-runtime provided
6468
// priority queue.
6569
//

Diff for: pkg/controller/controller.go

+16-9
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,12 @@ type TypedOptions[request comparable] struct {
9393
//
9494
// Note: This flag is disabled by default until a future version. It's currently in beta.
9595
UsePriorityQueue *bool
96+
97+
// ShouldWarmupWithoutLeadership specifies whether the controller should start its sources
98+
// when the manager is not the leader.
99+
// Defaults to false, which means that the controller will wait for leader election to start
100+
// before starting sources.
101+
ShouldWarmupWithoutLeadership *bool
96102
}
97103

98104
// DefaultFromConfig defaults the config from a config.Controller
@@ -244,15 +250,16 @@ func NewTypedUnmanaged[request comparable](name string, options TypedOptions[req
244250

245251
// Create controller with dependencies set
246252
return &controller.Controller[request]{
247-
Do: options.Reconciler,
248-
RateLimiter: options.RateLimiter,
249-
NewQueue: options.NewQueue,
250-
MaxConcurrentReconciles: options.MaxConcurrentReconciles,
251-
CacheSyncTimeout: options.CacheSyncTimeout,
252-
Name: name,
253-
LogConstructor: options.LogConstructor,
254-
RecoverPanic: options.RecoverPanic,
255-
LeaderElected: options.NeedLeaderElection,
253+
Do: options.Reconciler,
254+
RateLimiter: options.RateLimiter,
255+
NewQueue: options.NewQueue,
256+
MaxConcurrentReconciles: options.MaxConcurrentReconciles,
257+
CacheSyncTimeout: options.CacheSyncTimeout,
258+
Name: name,
259+
LogConstructor: options.LogConstructor,
260+
RecoverPanic: options.RecoverPanic,
261+
LeaderElected: options.NeedLeaderElection,
262+
ShouldWarmupWithoutLeadership: options.ShouldWarmupWithoutLeadership,
256263
}, nil
257264
}
258265

Diff for: pkg/controller/controller_test.go

+73
Original file line numberDiff line numberDiff line change
@@ -474,5 +474,78 @@ var _ = Describe("controller.Controller", func() {
474474
_, ok = q.(priorityqueue.PriorityQueue[reconcile.Request])
475475
Expect(ok).To(BeFalse())
476476
})
477+
478+
It("should set ShouldWarmupWithoutLeadership correctly", func() {
479+
m, err := manager.New(cfg, manager.Options{})
480+
Expect(err).NotTo(HaveOccurred())
481+
482+
// Test with ShouldWarmupWithoutLeadership set to true
483+
ctrlWithWarmup, err := controller.New("warmup-enabled-ctrl", m, controller.Options{
484+
Reconciler: reconcile.Func(nil),
485+
ShouldWarmupWithoutLeadership: ptr.To(true),
486+
})
487+
Expect(err).NotTo(HaveOccurred())
488+
489+
internalCtrlWithWarmup, ok := ctrlWithWarmup.(*internalcontroller.Controller[reconcile.Request])
490+
Expect(ok).To(BeTrue())
491+
Expect(internalCtrlWithWarmup.ShouldWarmupWithoutLeadership).NotTo(BeNil())
492+
Expect(*internalCtrlWithWarmup.ShouldWarmupWithoutLeadership).To(BeTrue())
493+
494+
// Test with ShouldWarmupWithoutLeadership set to false
495+
ctrlWithoutWarmup, err := controller.New("warmup-disabled-ctrl", m, controller.Options{
496+
Reconciler: reconcile.Func(nil),
497+
ShouldWarmupWithoutLeadership: ptr.To(false),
498+
})
499+
Expect(err).NotTo(HaveOccurred())
500+
501+
internalCtrlWithoutWarmup, ok := ctrlWithoutWarmup.(*internalcontroller.Controller[reconcile.Request])
502+
Expect(ok).To(BeTrue())
503+
Expect(internalCtrlWithoutWarmup.ShouldWarmupWithoutLeadership).NotTo(BeNil())
504+
Expect(*internalCtrlWithoutWarmup.ShouldWarmupWithoutLeadership).To(BeFalse())
505+
506+
// Test with ShouldWarmupWithoutLeadership not set (should default to nil)
507+
ctrlWithDefaultWarmup, err := controller.New("warmup-default-ctrl", m, controller.Options{
508+
Reconciler: reconcile.Func(nil),
509+
})
510+
Expect(err).NotTo(HaveOccurred())
511+
512+
internalCtrlWithDefaultWarmup, ok := ctrlWithDefaultWarmup.(*internalcontroller.Controller[reconcile.Request])
513+
Expect(ok).To(BeTrue())
514+
Expect(internalCtrlWithDefaultWarmup.ShouldWarmupWithoutLeadership).To(BeNil())
515+
})
516+
517+
It("should inherit ShouldWarmupWithoutLeadership from manager config", func() {
518+
// Test with manager default setting ShouldWarmupWithoutLeadership to true
519+
managerWithWarmup, err := manager.New(cfg, manager.Options{
520+
Controller: config.Controller{
521+
NeedWarmUp: ptr.To(true),
522+
},
523+
})
524+
Expect(err).NotTo(HaveOccurred())
525+
ctrlInheritingWarmup, err := controller.New("inherit-warmup-enabled", managerWithWarmup, controller.Options{
526+
Reconciler: reconcile.Func(nil),
527+
})
528+
Expect(err).NotTo(HaveOccurred())
529+
530+
internalCtrlInheritingWarmup, ok := ctrlInheritingWarmup.(*internalcontroller.Controller[reconcile.Request])
531+
Expect(ok).To(BeTrue())
532+
// Note: This test will fail until the DefaultFromConfig method is updated to set
533+
// ShouldWarmupWithoutLeadership from config.Controller.NeedWarmUp
534+
// This test demonstrates that the feature needs to be completed
535+
Expect(internalCtrlInheritingWarmup.ShouldWarmupWithoutLeadership).NotTo(BeNil())
536+
Expect(*internalCtrlInheritingWarmup.ShouldWarmupWithoutLeadership).To(BeTrue())
537+
538+
// Test that explicit controller setting overrides manager setting
539+
ctrlOverridingWarmup, err := controller.New("override-warmup-disabled", managerWithWarmup, controller.Options{
540+
Reconciler: reconcile.Func(nil),
541+
ShouldWarmupWithoutLeadership: ptr.To(false),
542+
})
543+
Expect(err).NotTo(HaveOccurred())
544+
545+
internalCtrlOverridingWarmup, ok := ctrlOverridingWarmup.(*internalcontroller.Controller[reconcile.Request])
546+
Expect(ok).To(BeTrue())
547+
Expect(internalCtrlOverridingWarmup.ShouldWarmupWithoutLeadership).NotTo(BeNil())
548+
Expect(*internalCtrlOverridingWarmup.ShouldWarmupWithoutLeadership).To(BeFalse())
549+
})
477550
})
478551
})

Diff for: pkg/internal/controller/controller.go

+25
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,9 @@ type Controller[request comparable] struct {
8383
// startWatches maintains a list of sources, handlers, and predicates to start when the controller is started.
8484
startWatches []source.TypedSource[request]
8585

86+
// didStartEventSources is used to indicate whether the event sources have been started.
87+
didStartEventSources atomic.Bool
88+
8689
// LogConstructor is used to construct a logger to then log messages to users during reconciliation,
8790
// or for example when a watch is started.
8891
// Note: LogConstructor has to be able to handle nil requests as we are also using it
@@ -95,6 +98,12 @@ type Controller[request comparable] struct {
9598

9699
// LeaderElected indicates whether the controller is leader elected or always running.
97100
LeaderElected *bool
101+
102+
// ShouldWarmupWithoutLeadership specifies whether the controller should start its sources
103+
// when the manager is not the leader.
104+
// Defaults to false, which means that the controller will wait for leader election to start
105+
// before starting sources.
106+
ShouldWarmupWithoutLeadership *bool
98107
}
99108

100109
// Reconcile implements reconcile.Reconciler.
@@ -144,6 +153,15 @@ func (c *Controller[request]) NeedLeaderElection() bool {
144153
return *c.LeaderElected
145154
}
146155

156+
// Warmup implements the manager.WarmupRunnable interface.
157+
func (c *Controller[request]) Warmup(ctx context.Context) error {
158+
if c.ShouldWarmupWithoutLeadership == nil || !*c.ShouldWarmupWithoutLeadership {
159+
return nil
160+
}
161+
162+
return c.startEventSources(ctx)
163+
}
164+
147165
// Start implements controller.Controller.
148166
func (c *Controller[request]) Start(ctx context.Context) error {
149167
// use an IIFE to get proper lock handling
@@ -221,6 +239,13 @@ func (c *Controller[request]) Start(ctx context.Context) error {
221239
// startEventSources launches all the sources registered with this controller and waits
222240
// for them to sync. It returns an error if any of the sources fail to start or sync.
223241
func (c *Controller[request]) startEventSources(ctx context.Context) error {
242+
// CAS returns false if value is already true, so early exit since another goroutine must have
243+
// called startEventSources previously
244+
if !c.didStartEventSources.CompareAndSwap(false, true) {
245+
c.LogConstructor(nil).Info("Skipping starting event sources since it was already started")
246+
return nil
247+
}
248+
224249
errGroup := &errgroup.Group{}
225250
for _, watch := range c.startWatches {
226251
log := c.LogConstructor(nil)

Diff for: pkg/internal/controller/controller_test.go

+121
Original file line numberDiff line numberDiff line change
@@ -1014,6 +1014,127 @@ var _ = Describe("controller", func() {
10141014
})
10151015
})
10161016
})
1017+
1018+
Describe("Warmup", func() {
1019+
It("should start event sources when ShouldWarmupWithoutLeadership is true", func() {
1020+
// Setup
1021+
ctx, cancel := context.WithCancel(context.Background())
1022+
defer cancel()
1023+
1024+
// Create a mock source that we can verify was started
1025+
sourceStarted := false
1026+
mockSource := source.Func(func(ctx context.Context, queue workqueue.TypedRateLimitingInterface[reconcile.Request]) error {
1027+
sourceStarted = true
1028+
return nil
1029+
})
1030+
1031+
ctrl.startWatches = []source.TypedSource[reconcile.Request]{mockSource}
1032+
ctrl.ShouldWarmupWithoutLeadership = ptr.To(true)
1033+
1034+
// Act
1035+
err := ctrl.Warmup(ctx)
1036+
1037+
// Assert
1038+
Expect(err).NotTo(HaveOccurred())
1039+
Expect(sourceStarted).To(BeTrue(), "Event source should have been started")
1040+
Expect(ctrl.didStartEventSources.Load()).To(BeTrue(), "didStartEventSources flag should be set")
1041+
})
1042+
1043+
It("should not start event sources when ShouldWarmupWithoutLeadership is false", func() {
1044+
// Setup
1045+
ctx, cancel := context.WithCancel(context.Background())
1046+
defer cancel()
1047+
1048+
// Create a mock source that should not be started
1049+
sourceStarted := false
1050+
mockSource := source.Func(func(ctx context.Context, queue workqueue.TypedRateLimitingInterface[reconcile.Request]) error {
1051+
sourceStarted = true
1052+
return nil
1053+
})
1054+
1055+
ctrl.startWatches = []source.TypedSource[reconcile.Request]{mockSource}
1056+
ctrl.ShouldWarmupWithoutLeadership = ptr.To(false)
1057+
1058+
// Act
1059+
err := ctrl.Warmup(ctx)
1060+
1061+
// Assert
1062+
Expect(err).NotTo(HaveOccurred())
1063+
Expect(sourceStarted).To(BeFalse(), "Event source should not have been started")
1064+
Expect(ctrl.didStartEventSources.Load()).To(BeFalse(), "didStartEventSources flag should not be set")
1065+
})
1066+
1067+
It("should not start event sources when ShouldWarmupWithoutLeadership is nil", func() {
1068+
// Setup
1069+
ctx, cancel := context.WithCancel(context.Background())
1070+
defer cancel()
1071+
1072+
// Create a mock source that should not be started
1073+
sourceStarted := false
1074+
mockSource := source.Func(func(ctx context.Context, queue workqueue.TypedRateLimitingInterface[reconcile.Request]) error {
1075+
sourceStarted = true
1076+
return nil
1077+
})
1078+
1079+
ctrl.startWatches = []source.TypedSource[reconcile.Request]{mockSource}
1080+
ctrl.ShouldWarmupWithoutLeadership = nil
1081+
1082+
// Act
1083+
err := ctrl.Warmup(ctx)
1084+
1085+
// Assert
1086+
Expect(err).NotTo(HaveOccurred())
1087+
Expect(sourceStarted).To(BeFalse(), "Event source should not have been started")
1088+
Expect(ctrl.didStartEventSources.Load()).To(BeFalse(), "didStartEventSources flag should not be set")
1089+
})
1090+
1091+
It("should not start event sources twice when called multiple times", func() {
1092+
// Setup
1093+
ctx, cancel := context.WithCancel(context.Background())
1094+
defer cancel()
1095+
1096+
// Create a mock source that counts how many times it's started
1097+
startCount := 0
1098+
mockSource := source.Func(func(ctx context.Context, queue workqueue.TypedRateLimitingInterface[reconcile.Request]) error {
1099+
startCount++
1100+
return nil
1101+
})
1102+
1103+
ctrl.startWatches = []source.TypedSource[reconcile.Request]{mockSource}
1104+
ctrl.ShouldWarmupWithoutLeadership = ptr.To(true)
1105+
1106+
// Act
1107+
err1 := ctrl.Warmup(ctx)
1108+
err2 := ctrl.Warmup(ctx)
1109+
1110+
// Assert
1111+
Expect(err1).NotTo(HaveOccurred())
1112+
Expect(err2).NotTo(HaveOccurred())
1113+
Expect(startCount).To(Equal(1), "Event source should have been started only once")
1114+
Expect(ctrl.didStartEventSources.Load()).To(BeTrue(), "didStartEventSources flag should be set")
1115+
})
1116+
1117+
It("should propagate errors from event sources", func() {
1118+
// Setup
1119+
ctx, cancel := context.WithCancel(context.Background())
1120+
defer cancel()
1121+
1122+
// Create a mock source that returns an error
1123+
expectedErr := errors.New("test error")
1124+
mockSource := source.Func(func(ctx context.Context, queue workqueue.TypedRateLimitingInterface[reconcile.Request]) error {
1125+
return expectedErr
1126+
})
1127+
1128+
ctrl.startWatches = []source.TypedSource[reconcile.Request]{mockSource}
1129+
ctrl.ShouldWarmupWithoutLeadership = ptr.To(true)
1130+
1131+
// Act
1132+
err := ctrl.Warmup(ctx)
1133+
1134+
// Assert
1135+
Expect(err).To(MatchError(expectedErr))
1136+
})
1137+
})
10171138
})
10181139

10191140
var _ = Describe("ReconcileIDFromContext function", func() {

Diff for: pkg/manager/internal.go

+9
Original file line numberDiff line numberDiff line change
@@ -439,6 +439,11 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) {
439439
return fmt.Errorf("failed to start other runnables: %w", err)
440440
}
441441

442+
// Start and wait for sources to start.
443+
if err := cm.runnables.Warmup.Start(cm.internalCtx); err != nil {
444+
return fmt.Errorf("failed to start warmup runnables: %w", err)
445+
}
446+
442447
// Start the leader election and all required runnables.
443448
{
444449
ctx, cancel := context.WithCancel(context.Background())
@@ -544,6 +549,10 @@ func (cm *controllerManager) engageStopProcedure(stopComplete <-chan struct{}) e
544549
cm.runnables.LeaderElection.startOnce.Do(func() {})
545550
cm.runnables.LeaderElection.StopAndWait(cm.shutdownCtx)
546551

552+
// Stop the warmup runnables
553+
cm.logger.Info("Stopping and waiting for warmup runnables")
554+
cm.runnables.Warmup.StopAndWait(cm.shutdownCtx)
555+
547556
// Stop the caches before the leader election runnables, this is an important
548557
// step to make sure that we don't race with the reconcilers by receiving more events
549558
// from the API servers and enqueueing them.

Diff for: pkg/manager/manager.go

+6
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,12 @@ type LeaderElectionRunnable interface {
314314
NeedLeaderElection() bool
315315
}
316316

317+
// WarmupRunnable knows if a Runnable should be a warmup runnable.
318+
type WarmupRunnable interface {
319+
// Warmup returns true if the Runnable should be run as warmup.
320+
Warmup(context.Context) error
321+
}
322+
317323
// New returns a new Manager for creating Controllers.
318324
// Note that if ContentType in the given config is not set, "application/vnd.kubernetes.protobuf"
319325
// will be used for all built-in resources of Kubernetes, and "application/json" is for other types

0 commit comments

Comments
 (0)