Skip to content

✨ [Warm Replicas] Extract startWatches into helper method. #3190

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 60 additions & 54 deletions pkg/internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,60 +179,7 @@ func (c *Controller[request]) Start(ctx context.Context) error {
// NB(directxman12): launch the sources *before* trying to wait for the
// caches to sync so that they have a chance to register their intended
// caches.
errGroup := &errgroup.Group{}
for _, watch := range c.startWatches {
log := c.LogConstructor(nil)
_, ok := watch.(interface {
String() string
})

if !ok {
log = log.WithValues("source", fmt.Sprintf("%T", watch))
} else {
log = log.WithValues("source", fmt.Sprintf("%s", watch))
}
didStartSyncingSource := &atomic.Bool{}
errGroup.Go(func() error {
// Use a timeout for starting and syncing the source to avoid silently
// blocking startup indefinitely if it doesn't come up.
sourceStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout)
defer cancel()

sourceStartErrChan := make(chan error, 1) // Buffer chan to not leak goroutine if we time out
go func() {
defer close(sourceStartErrChan)
log.Info("Starting EventSource")
if err := watch.Start(ctx, c.Queue); err != nil {
sourceStartErrChan <- err
return
}
syncingSource, ok := watch.(source.TypedSyncingSource[request])
if !ok {
return
}
didStartSyncingSource.Store(true)
if err := syncingSource.WaitForSync(sourceStartCtx); err != nil {
err := fmt.Errorf("failed to wait for %s caches to sync %v: %w", c.Name, syncingSource, err)
log.Error(err, "Could not wait for Cache to sync")
sourceStartErrChan <- err
}
}()

select {
case err := <-sourceStartErrChan:
return err
case <-sourceStartCtx.Done():
if didStartSyncingSource.Load() { // We are racing with WaitForSync, wait for it to let it tell us what happened
return <-sourceStartErrChan
}
if ctx.Err() != nil { // Don't return an error if the root context got cancelled
return nil
}
return fmt.Errorf("timed out waiting for source %s to Start. Please ensure that its Start() method is non-blocking", watch)
}
})
}
if err := errGroup.Wait(); err != nil {
if err := c.startEventSources(ctx); err != nil {
return err
}

Expand Down Expand Up @@ -271,6 +218,65 @@ func (c *Controller[request]) Start(ctx context.Context) error {
return nil
}

// startEventSources launches every source registered in c.startWatches
// concurrently and blocks until each of them has either finished starting
// (and, for sources implementing source.TypedSyncingSource, finished
// WaitForSync) or failed.
//
// Every source gets c.CacheSyncTimeout to start and sync. The error returned
// is whatever errGroup.Wait reports for the per-source goroutines. A nil
// result means every source came up, or that a source was still inside
// Start() when ctx itself was cancelled (which is not treated as a failure).
func (c *Controller[request]) startEventSources(ctx context.Context) error {
	errGroup := &errgroup.Group{}
	for _, watch := range c.startWatches {
		// Resolve a printable label for the source once: its String() output
		// when it provides one, otherwise its Go type. The same label is used
		// for the logger and for the timeout error below, so a source without
		// a String() method no longer shows up as "%!s(...)" in that error.
		sourceName := fmt.Sprintf("%T", watch)
		if _, ok := watch.(fmt.Stringer); ok {
			sourceName = fmt.Sprint(watch)
		}
		log := c.LogConstructor(nil).WithValues("source", sourceName)

		// Set once Start() has returned and WaitForSync is about to run, so
		// the select below can tell "Start is blocking" apart from "sync is
		// in progress".
		didStartSyncingSource := &atomic.Bool{}
		errGroup.Go(func() error {
			// Use a timeout for starting and syncing the source to avoid silently
			// blocking startup indefinitely if it doesn't come up.
			sourceStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout)
			defer cancel()

			sourceStartErrChan := make(chan error, 1) // Buffer chan to not leak goroutine if we time out
			go func() {
				// Closing the channel signals success: the receiver reads a nil error.
				defer close(sourceStartErrChan)
				log.Info("Starting EventSource")
				// Start receives the root ctx rather than sourceStartCtx; only
				// WaitForSync below is bounded by the start timeout.
				if err := watch.Start(ctx, c.Queue); err != nil {
					sourceStartErrChan <- err
					return
				}
				syncingSource, ok := watch.(source.TypedSyncingSource[request])
				if !ok {
					// Nothing to wait for; the source counts as started.
					return
				}
				didStartSyncingSource.Store(true)
				if err := syncingSource.WaitForSync(sourceStartCtx); err != nil {
					err := fmt.Errorf("failed to wait for %s caches to sync %v: %w", c.Name, syncingSource, err)
					log.Error(err, "Could not wait for Cache to sync")
					sourceStartErrChan <- err
				}
			}()

			select {
			case err := <-sourceStartErrChan:
				// Either an explicit error or nil from the closed channel.
				return err
			case <-sourceStartCtx.Done():
				if didStartSyncingSource.Load() { // We are racing with WaitForSync, wait for it to let it tell us what happened
					return <-sourceStartErrChan
				}
				if ctx.Err() != nil { // Don't return an error if the root context got cancelled
					return nil
				}
				return fmt.Errorf("timed out waiting for source %s to Start. Please ensure that its Start() method is non-blocking", sourceName)
			}
		})
	}
	return errGroup.Wait()
}

// processNextWorkItem will read a single work item off the workqueue and
// attempt to process it, by calling the reconcileHandler.
func (c *Controller[request]) processNextWorkItem(ctx context.Context) bool {
Expand Down
117 changes: 117 additions & 0 deletions pkg/internal/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,123 @@ var _ = Describe("controller", func() {
})
})

// Specs for Controller.startEventSources: each spec replaces ctrl.startWatches
// with a purpose-built set of sources and checks the error the helper returns.
Describe("startEventSources", func() {
	It("should return nil when no sources are provided", func() {
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()

		ctrl.startWatches = []source.TypedSource[reconcile.Request]{}
		err := ctrl.startEventSources(ctx)
		Expect(err).NotTo(HaveOccurred())
	})

	It("should return an error if a source fails to start", func() {
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()

		expectedErr := fmt.Errorf("failed to start source")
		src := source.Func(func(ctx context.Context, _ workqueue.TypedRateLimitingInterface[reconcile.Request]) error {
			// Return the error immediately so we don't get a timeout
			return expectedErr
		})

		// Set a sufficiently long timeout to avoid timeouts interfering with the error being returned
		ctrl.CacheSyncTimeout = 5 * time.Second
		ctrl.startWatches = []source.TypedSource[reconcile.Request]{src}
		err := ctrl.startEventSources(ctx)
		Expect(err).To(Equal(expectedErr))
	})

	It("should return an error if a source fails to sync", func() {
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()

		// The fake informers are configured with Synced=false, so the sync
		// step of the Kind source is expected to fail.
		ctrl.startWatches = []source.TypedSource[reconcile.Request]{
			source.Kind(&informertest.FakeInformers{Synced: ptr.To(false)}, &corev1.Pod{}, &handler.TypedEnqueueRequestForObject[*corev1.Pod]{}),
		}
		ctrl.Name = "test-controller"
		ctrl.CacheSyncTimeout = 5 * time.Second

		err := ctrl.startEventSources(ctx)
		Expect(err).To(HaveOccurred())
		Expect(err.Error()).To(ContainSubstring("failed to wait for test-controller caches to sync"))
	})

	It("should not return an error when sources start and sync successfully", func() {
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()

		// Create a source that starts and syncs successfully
		ctrl.startWatches = []source.TypedSource[reconcile.Request]{
			source.Kind(&informertest.FakeInformers{Synced: ptr.To(true)}, &corev1.Pod{}, &handler.TypedEnqueueRequestForObject[*corev1.Pod]{}),
		}
		ctrl.Name = "test-controller"
		ctrl.CacheSyncTimeout = 5 * time.Second

		err := ctrl.startEventSources(ctx)
		Expect(err).NotTo(HaveOccurred())
	})

	// The title states what the spec actually asserts: the cancellation error
	// reported by WaitForSync is propagated to the caller.
	It("should return the WaitForSync error when context is cancelled during source sync", func() {
		sourceCtx, sourceCancel := context.WithCancel(context.Background())
		defer sourceCancel()

		ctrl.CacheSyncTimeout = 5 * time.Second

		// Create a bisignallingSource to control the test flow
		src := &bisignallingSource[reconcile.Request]{
			startCall: make(chan workqueue.TypedRateLimitingInterface[reconcile.Request]),
			startDone: make(chan error, 1),
			waitCall:  make(chan struct{}),
			waitDone:  make(chan error, 1),
		}

		ctrl.startWatches = []source.TypedSource[reconcile.Request]{src}

		// Start the sources in a goroutine. The result channel is buffered so
		// the goroutine can always deliver its result and exit, even if one of
		// the assertions below fails before the result is read.
		startErrCh := make(chan error, 1)
		go func() {
			startErrCh <- ctrl.startEventSources(sourceCtx)
		}()

		// Allow source to start successfully
		Eventually(src.startCall).Should(Receive())
		src.startDone <- nil

		// Wait for WaitForSync to be called
		Eventually(src.waitCall).Should(BeClosed())

		// Return context.Canceled from WaitForSync
		src.waitDone <- context.Canceled

		// Also cancel the context
		sourceCancel()

		// We expect to receive the context.Canceled error
		err := <-startErrCh
		Expect(err).To(MatchError(context.Canceled))
	})

	It("should timeout if source Start blocks for too long", func() {
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()

		ctrl.CacheSyncTimeout = 1 * time.Millisecond

		// Create a source whose Start blocks until its context is done; the
		// deferred cancel above releases it once the spec has finished.
		blockingSrc := source.Func(func(ctx context.Context, _ workqueue.TypedRateLimitingInterface[reconcile.Request]) error {
			<-ctx.Done()
			return ctx.Err()
		})

		ctrl.startWatches = []source.TypedSource[reconcile.Request]{blockingSrc}

		err := ctrl.startEventSources(ctx)
		Expect(err).To(HaveOccurred())
		Expect(err.Error()).To(ContainSubstring("timed out waiting for source"))
	})
})

Describe("Processing queue items from a Controller", func() {
It("should call Reconciler if an item is enqueued", func() {
ctx, cancel := context.WithCancel(context.Background())
Expand Down
Loading