From 405f7f436085e6be45086e81c7d0d91673357798 Mon Sep 17 00:00:00 2001 From: Shaun Crampton Date: Thu, 12 Dec 2024 17:27:33 +0000 Subject: [PATCH 1/5] First pass at watch bookmarks. --- libcalico-go/lib/backend/api/api.go | 4 +- .../lib/backend/k8s/resources/profile.go | 7 ++ .../lib/backend/k8s/resources/resources.go | 5 +- .../lib/backend/k8s/resources/watcher.go | 12 +++- .../lib/backend/k8s/resources/watcher_test.go | 2 +- .../lib/backend/watchersyncer/watchercache.go | 72 ++++++++++++++----- 6 files changed, 78 insertions(+), 24 deletions(-) diff --git a/libcalico-go/lib/backend/api/api.go b/libcalico-go/lib/backend/api/api.go index d6b45441bb9..fa3fb4bbbfb 100644 --- a/libcalico-go/lib/backend/api/api.go +++ b/libcalico-go/lib/backend/api/api.go @@ -126,7 +126,8 @@ type Client interface { } type WatchOptions struct { - Revision string + Revision string + AllowWatchBookmarks bool } type Syncer interface { @@ -200,6 +201,7 @@ const ( WatchModified WatchEventType = "MODIFIED" WatchDeleted WatchEventType = "DELETED" WatchError WatchEventType = "ERROR" + WatchBookmark WatchEventType = "BOOKMARK" ) // Event represents a single event to a watched resource. diff --git a/libcalico-go/lib/backend/k8s/resources/profile.go b/libcalico-go/lib/backend/k8s/resources/profile.go index c984c859d9c..16c553503e4 100644 --- a/libcalico-go/lib/backend/k8s/resources/profile.go +++ b/libcalico-go/lib/backend/k8s/resources/profile.go @@ -457,6 +457,13 @@ func (pw *profileWatcher) processProfileEvents() { switch e.Type { case api.WatchModified, api.WatchAdded: value = e.New.Value + case api.WatchBookmark: + if isNsEvent { + pw.k8sNSRev = e.New.Revision + } else { + pw.k8sSARev = e.New.Revision + } + e.New.Revision = pw.JoinProfileRevisions(pw.k8sNSRev, pw.k8sSARev) case api.WatchDeleted: value = e.Old.Value } diff --git a/libcalico-go/lib/backend/k8s/resources/resources.go b/libcalico-go/lib/backend/k8s/resources/resources.go index 3231bad203f..5d569f1afac 100644 --- a/libcalico-go/lib/backend/k8s/resources/resources.go +++ b/libcalico-go/lib/backend/k8s/resources/resources.go @@ -325,7 +325,8 @@ func ConvertK8sResourceToCalicoResource(res Resource) error { func watchOptionsToK8sListOptions(wo api.WatchOptions) metav1.ListOptions { return metav1.ListOptions{ - ResourceVersion: wo.Revision, - Watch: true, + ResourceVersion: wo.Revision, + Watch: true, + AllowWatchBookmarks: wo.AllowWatchBookmarks, } } diff --git a/libcalico-go/lib/backend/k8s/resources/watcher.go b/libcalico-go/lib/backend/k8s/resources/watcher.go index 63fb38086be..33219627bab 100644 --- a/libcalico-go/lib/backend/k8s/resources/watcher.go +++ b/libcalico-go/lib/backend/k8s/resources/watcher.go @@ -166,7 +166,17 @@ func (crw *k8sWatcherConverter) convertEvent(kevent kwatch.Event) []*api.WatchEv } return crw.buildEventsFromKVPs(kvps, kevent.Type) - + case kwatch.Bookmark: + // For bookmarks we send an empty KVPair with the current resource + // version only. 
+ k8sRes := kevent.Object.(Resource) + revision := k8sRes.GetObjectMeta().GetResourceVersion() + return []*api.WatchEvent{{ + Type: api.WatchBookmark, + New: &model.KVPair{ + Revision: revision, + }, + }} default: return []*api.WatchEvent{{ Type: api.WatchError, diff --git a/libcalico-go/lib/backend/k8s/resources/watcher_test.go b/libcalico-go/lib/backend/k8s/resources/watcher_test.go index 2552d91f142..ed148e3564c 100644 --- a/libcalico-go/lib/backend/k8s/resources/watcher_test.go +++ b/libcalico-go/lib/backend/k8s/resources/watcher_test.go @@ -48,7 +48,7 @@ var _ = Describe("Resources watcher ", func() { It("should return error WatchEvent with unexpected kwatch event type", func() { events := kwc.convertEvent(kwatch.Event{ - Type: kwatch.Bookmark, + Type: "GARBAGE", }) Expect(events).To(HaveLen(1)) Expect(events[0].Type).To(Equal(api.WatchError)) diff --git a/libcalico-go/lib/backend/watchersyncer/watchercache.go b/libcalico-go/lib/backend/watchersyncer/watchercache.go index 41975d230d3..61df6cc4a15 100644 --- a/libcalico-go/lib/backend/watchersyncer/watchercache.go +++ b/libcalico-go/lib/backend/watchersyncer/watchercache.go @@ -36,18 +36,23 @@ import ( // - An api.Update // - A api.SyncStatus (only for the very first InSync notification) type watcherCache struct { - logger *logrus.Entry - client api.Client - watch api.WatchInterface - resources map[string]cacheEntry - oldResources map[string]cacheEntry - results chan<- interface{} - hasSynced bool - resourceType ResourceType - currentWatchRevision string - resyncBlockedUntil time.Time + logger *logrus.Entry + client api.Client + watch api.WatchInterface + resources map[string]cacheEntry + oldResources map[string]cacheEntry + results chan<- interface{} + hasSynced bool + resourceType ResourceType + currentWatchRevision string + errorCountAtCurrentRev int + resyncBlockedUntil time.Time } +const ( + maxErrorsPerRevision = 5 +) + var ( MinResyncInterval = 500 * time.Millisecond ListRetryInterval = 1000 * time.Millisecond @@ -117,13 +122,22 @@ mainLoop: } kvp.Value = nil wc.handleWatchListEvent(kvp) + case api.WatchBookmark: + wc.logger.WithField("newRevision", event.New.Revision).Debug("Watch bookmark received") + wc.currentWatchRevision = event.New.Revision + wc.errorCountAtCurrentRev = 0 case api.WatchError: // Handle a WatchError. This error triggered from upstream, all type // of WatchError are treated equally,log the Error and trigger a full resync. We only log at info // because errors may occur due to compaction causing revisions to no longer be valid - in this case // we simply need to do a full resync. - wc.logger.WithError(event.Error).Infof("Watch error received from Upstream") - wc.currentWatchRevision = "0" + wc.logger.WithError(event.Error).Info("Watch error event received, restarting the watch.") + wc.errorCountAtCurrentRev++ + if wc.errorCountAtCurrentRev > maxErrorsPerRevision { + // Too many errors at the current revision, trigger a full resync. + wc.logger.Warn("Too many errors at current revision, triggering full resync") + wc.resetWatchRevisionForFullResync() + } wc.resyncAndCreateWatcher(ctx) default: // Unknown event type - not much we can do other than log. @@ -159,7 +173,7 @@ func (wc *watcherCache) resyncAndCreateWatcher(ctx context.Context) { wc.cleanExistingWatcher() // If we don't have a currentWatchRevision then we need to perform a full resync. 
- performFullResync := wc.currentWatchRevision == "0" + var performFullResync bool for { select { case <-ctx.Done(): @@ -177,6 +191,7 @@ func (wc *watcherCache) resyncAndCreateWatcher(ctx context.Context) { // watch immediately ends. wc.resyncBlockedUntil = time.Now().Add(MinResyncInterval) + performFullResync = performFullResync || wc.currentWatchRevision == "0" if performFullResync { wc.logger.Info("Full resync is required") @@ -195,7 +210,7 @@ func (wc *watcherCache) resyncAndCreateWatcher(ctx context.Context) { if errors.IsResourceExpired(err) { // Our current watch revision is too old. Start again without a revision. wc.logger.Info("Clearing cached watch revision for next List call") - wc.currentWatchRevision = "0" + wc.resetWatchRevisionForFullResync() } wc.resyncBlockedUntil = time.Now().Add(ListRetryInterval) continue @@ -233,6 +248,7 @@ func (wc *watcherCache) resyncAndCreateWatcher(ctx context.Context) { wc.logger.Panic("BUG: List returned items with empty/zero revision. Watch would be inconsistent.") } wc.currentWatchRevision = l.Revision + wc.errorCountAtCurrentRev = 0 // Mark the resync as complete. performFullResync = false @@ -241,9 +257,17 @@ func (wc *watcherCache) resyncAndCreateWatcher(ctx context.Context) { // And now start watching from the revision returned by the List, or from a previous watch event // (depending on whether we were performing a full resync). w, err := wc.client.Watch(ctx, wc.resourceType.ListInterface, api.WatchOptions{ - Revision: wc.currentWatchRevision, + Revision: wc.currentWatchRevision, + AllowWatchBookmarks: true, }) if err != nil { + if errors.IsResourceExpired(err) { + // Our current watch revision is too old. Start again without a revision. + wc.logger.Info("Watch has expired, queueing full resync.") + wc.resetWatchRevisionForFullResync() + continue + } + // Failed to create the watcher - we'll need to retry. switch err.(type) { case cerrors.ErrorOperationNotSupported, cerrors.ErrorResourceDoesNotExist: @@ -251,7 +275,7 @@ func (wc *watcherCache) resyncAndCreateWatcher(ctx context.Context) { // doesn't support it, or because there are no resources to watch yet (and Kubernetes won't // let us watch if there are no resources yet). Pause for the watch poll interval. // This loop effectively becomes a poll loop for this resource type. - wc.logger.Debug("Watch operation not supported") + wc.logger.Debug("Watch operation not supported; reverting to poll.") wc.resyncBlockedUntil = time.Now().Add(WatchPollInterval) // Make sure we force a re-list of the resource even if the watch previously succeeded @@ -260,9 +284,13 @@ func (wc *watcherCache) resyncAndCreateWatcher(ctx context.Context) { continue } - // We hit an error creating the Watch. Trigger a full resync. - wc.logger.WithError(err).WithField("performFullResync", performFullResync).Info("Failed to create watcher") - performFullResync = true + wc.logger.WithError(err).WithField("performFullResync", performFullResync).Warn( + "Failed to create watcher; will retry.") + wc.errorCountAtCurrentRev++ + if wc.errorCountAtCurrentRev > maxErrorsPerRevision { + // Hitting repeated errors, try a full resync next time. 
+ performFullResync = true + } continue } @@ -273,6 +301,11 @@ func (wc *watcherCache) resyncAndCreateWatcher(ctx context.Context) { } } +func (wc *watcherCache) resetWatchRevisionForFullResync() { + wc.currentWatchRevision = "0" + wc.errorCountAtCurrentRev = 0 +} + var closedTimeC = make(chan time.Time) func init() { @@ -338,6 +371,7 @@ func (wc *watcherCache) finishResync() { func (wc *watcherCache) handleWatchListEvent(kvp *model.KVPair) { // Track the resource version from this watch/list event. wc.currentWatchRevision = kvp.Revision + wc.errorCountAtCurrentRev = 0 if wc.resourceType.UpdateProcessor == nil { // No update processor - handle immediately. From 508aedb9f343a7f3ae5f4dbb4cd9c66ab10de112 Mon Sep 17 00:00:00 2001 From: Shaun Crampton Date: Fri, 13 Dec 2024 17:45:21 +0000 Subject: [PATCH 2/5] Tune error handling in watchercache. - Respond to too old errors on the watch stream. - Handle connection refused without resetting the watch. --- .../lib/backend/k8s/resources/profile.go | 2 +- .../lib/backend/watchersyncer/watchercache.go | 38 +++++++++++++------ libcalico-go/lib/errors/errors.go | 20 ++++++++++ 3 files changed, 48 insertions(+), 12 deletions(-) diff --git a/libcalico-go/lib/backend/k8s/resources/profile.go b/libcalico-go/lib/backend/k8s/resources/profile.go index 16c553503e4..f5abfd22f42 100644 --- a/libcalico-go/lib/backend/k8s/resources/profile.go +++ b/libcalico-go/lib/backend/k8s/resources/profile.go @@ -484,7 +484,7 @@ func (pw *profileWatcher) processProfileEvents() { } oma.GetObjectMeta().SetResourceVersion(pw.JoinProfileRevisions(pw.k8sNSRev, pw.k8sSARev)) } - } else if e.Error == nil { + } else if e.Error == nil && e.Type != api.WatchBookmark { log.WithField("event", e).Warning("Event without error or value") } diff --git a/libcalico-go/lib/backend/watchersyncer/watchercache.go b/libcalico-go/lib/backend/watchersyncer/watchercache.go index 61df6cc4a15..f3432f3671e 100644 --- a/libcalico-go/lib/backend/watchersyncer/watchercache.go +++ b/libcalico-go/lib/backend/watchersyncer/watchercache.go @@ -16,10 +16,12 @@ package watchersyncer import ( "context" + "errors" + "syscall" "time" "github.com/sirupsen/logrus" - "k8s.io/apimachinery/pkg/api/errors" + kerrors "k8s.io/apimachinery/pkg/api/errors" "github.com/projectcalico/calico/libcalico-go/lib/backend/api" "github.com/projectcalico/calico/libcalico-go/lib/backend/model" @@ -131,12 +133,19 @@ mainLoop: // of WatchError are treated equally,log the Error and trigger a full resync. We only log at info // because errors may occur due to compaction causing revisions to no longer be valid - in this case // we simply need to do a full resync. - wc.logger.WithError(event.Error).Info("Watch error event received, restarting the watch.") - wc.errorCountAtCurrentRev++ - if wc.errorCountAtCurrentRev > maxErrorsPerRevision { - // Too many errors at the current revision, trigger a full resync. - wc.logger.Warn("Too many errors at current revision, triggering full resync") + if kerrors.IsResourceExpired(event.Error) { + // Our current watch revision is too old. We hit this path after the API server restarts + // (and presumably does an immediate compaction). 
+ wc.logger.WithError(event.Error).Info("Watch has expired, triggering full resync.") wc.resetWatchRevisionForFullResync() + } else { + wc.logger.WithError(event.Error).Warn("Unknown watch error event received, restarting the watch.") + wc.errorCountAtCurrentRev++ + if wc.errorCountAtCurrentRev > maxErrorsPerRevision { + // Too many errors at the current revision, trigger a full resync. + wc.logger.Warn("Too many errors at current revision, triggering full resync") + wc.resetWatchRevisionForFullResync() + } } wc.resyncAndCreateWatcher(ctx) default: @@ -207,7 +216,7 @@ func (wc *watcherCache) resyncAndCreateWatcher(ctx context.Context) { if err != nil { // Failed to perform the list. Pause briefly (so we don't tight loop) and retry. wc.logger.WithError(err).Info("Failed to perform list of current data during resync") - if errors.IsResourceExpired(err) { + if kerrors.IsResourceExpired(err) { // Our current watch revision is too old. Start again without a revision. wc.logger.Info("Clearing cached watch revision for next List call") wc.resetWatchRevisionForFullResync() @@ -261,16 +270,23 @@ func (wc *watcherCache) resyncAndCreateWatcher(ctx context.Context) { AllowWatchBookmarks: true, }) if err != nil { - if errors.IsResourceExpired(err) { + if kerrors.IsResourceExpired(err) { // Our current watch revision is too old. Start again without a revision. wc.logger.Info("Watch has expired, queueing full resync.") wc.resetWatchRevisionForFullResync() continue } - // Failed to create the watcher - we'll need to retry. - switch err.(type) { - case cerrors.ErrorOperationNotSupported, cerrors.ErrorResourceDoesNotExist: + if errors.Is(err, syscall.ECONNREFUSED) { + // Server is down, retry after a short delay. We don't want to + // reset the watch revision since the server is down; it's not our + // fault. + wc.logger.WithError(err).Warn("API server refused connection, will retry.") + continue + } + + if errors.Is(err, cerrors.ErrorOperationNotSupported{}) || + errors.Is(err, cerrors.ErrorResourceDoesNotExist{}) { // Watch is not supported on this resource type, either because the type fundamentally // doesn't support it, or because there are no resources to watch yet (and Kubernetes won't // let us watch if there are no resources yet). Pause for the watch poll interval. diff --git a/libcalico-go/lib/errors/errors.go b/libcalico-go/lib/errors/errors.go index 39072438485..e51669ec327 100644 --- a/libcalico-go/lib/errors/errors.go +++ b/libcalico-go/lib/errors/errors.go @@ -29,6 +29,10 @@ type ErrorDatastoreError struct { Identifier interface{} } +func (e ErrorDatastoreError) Unwrap() error { + return e.Err +} + func (e ErrorDatastoreError) Error() string { return e.Err.Error() } @@ -61,6 +65,10 @@ func (e ErrorResourceDoesNotExist) Error() string { return fmt.Sprintf("resource does not exist: %v with error: %v", e.Identifier, e.Err) } +func (e ErrorResourceDoesNotExist) Unwrap() error { + return e.Err +} + // Error indicating an operation is not supported. 
type ErrorOperationNotSupported struct { Operation string @@ -83,6 +91,10 @@ type ErrorResourceAlreadyExists struct { Identifier interface{} } +func (e ErrorResourceAlreadyExists) Unwrap() error { + return e.Err +} + func (e ErrorResourceAlreadyExists) Error() string { return fmt.Sprintf("resource already exists: %v", e.Identifier) } @@ -92,6 +104,10 @@ type ErrorConnectionUnauthorized struct { Err error } +func (e ErrorConnectionUnauthorized) Unwrap() error { + return e.Err +} + func (e ErrorConnectionUnauthorized) Error() string { return fmt.Sprintf("connection is unauthorized: %v", e.Err) } @@ -151,6 +167,10 @@ type ErrorResourceUpdateConflict struct { Identifier interface{} } +func (e ErrorResourceUpdateConflict) Unwrap() error { + return e.Err +} + func (e ErrorResourceUpdateConflict) Error() string { return fmt.Sprintf("update conflict: %v", e.Identifier) } From 07163f8a4b8ab851706d95bd7ad3d6bc7ebfd031 Mon Sep 17 00:00:00 2001 From: Shaun Crampton Date: Fri, 20 Dec 2024 13:30:01 +0000 Subject: [PATCH 3/5] Improve error handling in watchercache. --- .../backend/watchersyncer/reflector_ports.go | 54 +++++++++++++++++++ .../lib/backend/watchersyncer/watchercache.go | 20 +++---- 2 files changed, 65 insertions(+), 9 deletions(-) create mode 100644 libcalico-go/lib/backend/watchersyncer/reflector_ports.go diff --git a/libcalico-go/lib/backend/watchersyncer/reflector_ports.go b/libcalico-go/lib/backend/watchersyncer/reflector_ports.go new file mode 100644 index 00000000000..85f690920e7 --- /dev/null +++ b/libcalico-go/lib/backend/watchersyncer/reflector_ports.go @@ -0,0 +1,54 @@ +// Extracted from on Kubernetes reflector.go, Copyright 2014 The Kubernetes Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package watchersyncer + +import ( + "strings" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// Copied from client-go's reflector.go +func isTooLargeResourceVersionError(err error) bool { + if apierrors.HasStatusCause(err, metav1.CauseTypeResourceVersionTooLarge) { + return true + } + // In Kubernetes 1.17.0-1.18.5, the api server doesn't set the error status cause to + // metav1.CauseTypeResourceVersionTooLarge to indicate that the requested minimum resource + // version is larger than the largest currently available resource version. To ensure backward + // compatibility with these server versions we also need to detect the error based on the content + // of the error message field. 
+ if !apierrors.IsTimeout(err) { + return false + } + apierr, ok := err.(apierrors.APIStatus) + if !ok || apierr == nil || apierr.Status().Details == nil { + return false + } + for _, cause := range apierr.Status().Details.Causes { + // Matches the message returned by api server 1.17.0-1.18.5 for this error condition + if cause.Message == "Too large resource version" { + return true + } + } + + // Matches the message returned by api server before 1.17.0 + if strings.Contains(apierr.Status().Message, "Too large resource version") { + return true + } + + return false +} diff --git a/libcalico-go/lib/backend/watchersyncer/watchercache.go b/libcalico-go/lib/backend/watchersyncer/watchercache.go index f3432f3671e..476e8bcad60 100644 --- a/libcalico-go/lib/backend/watchersyncer/watchercache.go +++ b/libcalico-go/lib/backend/watchersyncer/watchercache.go @@ -17,11 +17,11 @@ package watchersyncer import ( "context" "errors" - "syscall" "time" "github.com/sirupsen/logrus" kerrors "k8s.io/apimachinery/pkg/api/errors" + utilnet "k8s.io/apimachinery/pkg/util/net" "github.com/projectcalico/calico/libcalico-go/lib/backend/api" "github.com/projectcalico/calico/libcalico-go/lib/backend/model" @@ -270,23 +270,25 @@ func (wc *watcherCache) resyncAndCreateWatcher(ctx context.Context) { AllowWatchBookmarks: true, }) if err != nil { - if kerrors.IsResourceExpired(err) { - // Our current watch revision is too old. Start again without a revision. + if kerrors.IsResourceExpired(err) || kerrors.IsGone(err) || isTooLargeResourceVersionError(err) { + // Our current watch revision is too old (or too new!). Start again + // without a revision. Condition cribbed from client-go's reflector. wc.logger.Info("Watch has expired, queueing full resync.") wc.resetWatchRevisionForFullResync() continue } - if errors.Is(err, syscall.ECONNREFUSED) { - // Server is down, retry after a short delay. We don't want to - // reset the watch revision since the server is down; it's not our - // fault. + if utilnet.IsConnectionRefused(err) || kerrors.IsTooManyRequests(err) { + // Connection-related error, we can just retry without resetting + // the watch. Condition cribbed from client-go's reflector. wc.logger.WithError(err).Warn("API server refused connection, will retry.") continue } - if errors.Is(err, cerrors.ErrorOperationNotSupported{}) || - errors.Is(err, cerrors.ErrorResourceDoesNotExist{}) { + var errNotSupp cerrors.ErrorOperationNotSupported + var errNotExist cerrors.ErrorResourceDoesNotExist + if errors.As(err, &errNotSupp) || + errors.As(err, &errNotExist) { // Watch is not supported on this resource type, either because the type fundamentally // doesn't support it, or because there are no resources to watch yet (and Kubernetes won't // let us watch if there are no resources yet). Pause for the watch poll interval. From 2f8942443fb7d0385ae18adea74f04b6a34a6aec Mon Sep 17 00:00:00 2001 From: Shaun Crampton Date: Wed, 12 Feb 2025 18:12:06 +0000 Subject: [PATCH 4/5] Fix up watcher syncer tests. 
--- .../lib/backend/watchersyncer/watchercache.go | 44 ++-- .../backend/watchersyncer/watchersyncer.go | 2 +- .../watchersyncer/watchersyncer_test.go | 241 +++++++++++++++--- libcalico-go/lib/testutils/syncertester.go | 3 +- 4 files changed, 231 insertions(+), 59 deletions(-) diff --git a/libcalico-go/lib/backend/watchersyncer/watchercache.go b/libcalico-go/lib/backend/watchersyncer/watchercache.go index 476e8bcad60..592d32fec4e 100644 --- a/libcalico-go/lib/backend/watchersyncer/watchercache.go +++ b/libcalico-go/lib/backend/watchersyncer/watchercache.go @@ -52,7 +52,7 @@ type watcherCache struct { } const ( - maxErrorsPerRevision = 5 + MaxErrorsPerRevision = 5 ) var ( @@ -125,26 +125,24 @@ mainLoop: kvp.Value = nil wc.handleWatchListEvent(kvp) case api.WatchBookmark: - wc.logger.WithField("newRevision", event.New.Revision).Debug("Watch bookmark received") - wc.currentWatchRevision = event.New.Revision - wc.errorCountAtCurrentRev = 0 + wc.handleWatchBookmark(event) case api.WatchError: - // Handle a WatchError. This error triggered from upstream, all type - // of WatchError are treated equally,log the Error and trigger a full resync. We only log at info - // because errors may occur due to compaction causing revisions to no longer be valid - in this case - // we simply need to do a full resync. if kerrors.IsResourceExpired(event.Error) { - // Our current watch revision is too old. We hit this path after the API server restarts - // (and presumably does an immediate compaction). + // Our current watch revision is too old. Even with watch bookmarks, we hit this path after the + // API server restarts (and presumably does an immediate compaction). wc.logger.WithError(event.Error).Info("Watch has expired, triggering full resync.") wc.resetWatchRevisionForFullResync() } else { - wc.logger.WithError(event.Error).Warn("Unknown watch error event received, restarting the watch.") + // Unknown error, default is to just try restarting the watch on assumption that it's + // a connectivity issue. Note that, if the error recurs when recreating the watch, we will + // check for various expected connectivity failure conditions and handle them there. wc.errorCountAtCurrentRev++ - if wc.errorCountAtCurrentRev > maxErrorsPerRevision { + if wc.errorCountAtCurrentRev >= MaxErrorsPerRevision { // Too many errors at the current revision, trigger a full resync. - wc.logger.Warn("Too many errors at current revision, triggering full resync") + wc.logger.WithError(event.Error).Warn("Watch repeatedly failed without making progress, triggering full resync") wc.resetWatchRevisionForFullResync() + } else { + wc.logger.WithError(event.Error).Info("Watch of resource finished. Attempting to restart it...") } } wc.resyncAndCreateWatcher(ctx) @@ -184,6 +182,7 @@ func (wc *watcherCache) resyncAndCreateWatcher(ctx context.Context) { // If we don't have a currentWatchRevision then we need to perform a full resync. var performFullResync bool for { + start := time.Now() select { case <-ctx.Done(): wc.logger.Debug("Context is done. Returning") @@ -193,7 +192,7 @@ func (wc *watcherCache) resyncAndCreateWatcher(ctx context.Context) { // Start the resync. This processing loops until we create the watcher. If the // watcher continuously fails then this loop effectively becomes a polling based // syncer. - wc.logger.Debug("Starting main resync loop") + wc.logger.Debugf("Starting main resync loop after delay %v", time.Since(start)) } // Avoid tight loop in unexpected failure scenarios. 
For example, if creating the watch succeeds but the @@ -265,6 +264,7 @@ func (wc *watcherCache) resyncAndCreateWatcher(ctx context.Context) { // And now start watching from the revision returned by the List, or from a previous watch event // (depending on whether we were performing a full resync). + wc.logger.WithField("revision", wc.currentWatchRevision).Debug("Starting watch from revision") w, err := wc.client.Watch(ctx, wc.resourceType.ListInterface, api.WatchOptions{ Revision: wc.currentWatchRevision, AllowWatchBookmarks: true, @@ -302,10 +302,11 @@ func (wc *watcherCache) resyncAndCreateWatcher(ctx context.Context) { continue } - wc.logger.WithError(err).WithField("performFullResync", performFullResync).Warn( - "Failed to create watcher; will retry.") + // None of our expected errors, retry a few times before we give up and try a full resync. wc.errorCountAtCurrentRev++ - if wc.errorCountAtCurrentRev > maxErrorsPerRevision { + wc.logger.WithError(err).WithField("performFullResync", performFullResync).WithField("errorsWithoutProgress", wc.errorCountAtCurrentRev).Warn( + "Failed to create watcher; will retry.") + if wc.errorCountAtCurrentRev >= MaxErrorsPerRevision { // Hitting repeated errors, try a full resync next time. performFullResync = true } @@ -409,6 +410,15 @@ func (wc *watcherCache) handleWatchListEvent(kvp *model.KVPair) { } } +// handleWatchBookmark handles a bookmark event from the API server, these +// update the revision that we should be watching from without sending +// a KVP. This prevents datastore compactions from invalidating our watches. +func (wc *watcherCache) handleWatchBookmark(event api.WatchEvent) { + wc.logger.WithField("newRevision", event.New.Revision).Debug("Watch bookmark received") + wc.currentWatchRevision = event.New.Revision + wc.errorCountAtCurrentRev = 0 +} + // handleConvertedWatchEvent handles a converted watch event fanning out // to the add/mod or delete processing as necessary. func (wc *watcherCache) handleConvertedWatchEvent(kvp *model.KVPair) { diff --git a/libcalico-go/lib/backend/watchersyncer/watchersyncer.go b/libcalico-go/lib/backend/watchersyncer/watchersyncer.go index 439befeafa8..83720515098 100644 --- a/libcalico-go/lib/backend/watchersyncer/watchersyncer.go +++ b/libcalico-go/lib/backend/watchersyncer/watchersyncer.go @@ -32,7 +32,7 @@ const ( // ResourceType groups together the watch and conversion information for a // specific resource type. type ResourceType struct { - // ListInterface specifies the resource type to watch\. + // ListInterface specifies the resource type to watch. ListInterface model.ListInterface // UpdateProcessor converts the raw KVPairs returned from the datastore into the appropriate diff --git a/libcalico-go/lib/backend/watchersyncer/watchersyncer_test.go b/libcalico-go/lib/backend/watchersyncer/watchersyncer_test.go index 475411e3ea5..0d293955829 100644 --- a/libcalico-go/lib/backend/watchersyncer/watchersyncer_test.go +++ b/libcalico-go/lib/backend/watchersyncer/watchersyncer_test.go @@ -25,6 +25,8 @@ import ( . 
"github.com/onsi/gomega" apiv3 "github.com/projectcalico/api/pkg/apis/projectcalico/v3" log "github.com/sirupsen/logrus" + "golang.org/x/sys/unix" + apierrors "k8s.io/apimachinery/pkg/api/errors" kerrors "k8s.io/apimachinery/pkg/api/errors" "github.com/projectcalico/calico/libcalico-go/lib/backend/api" @@ -76,7 +78,8 @@ var ( } notSupported = cerrors.ErrorOperationNotSupported{} notExists = cerrors.ErrorResourceDoesNotExist{} - tooOldRV = kerrors.NewResourceExpired("test error") + k8sTooOldRV = kerrors.NewResourceExpired("test error") + caliTooOldRV = apierrors.FromObject(&k8sTooOldRV.ErrStatus) // Our wrapper around above error. genError = errors.New("Generic error") ) @@ -143,18 +146,20 @@ var _ = Describe("Test the backend datastore multi-watch syncer", func() { // Temporarily reduce the watch and list poll interval to make the tests faster. // Since we are timing the processing, we still need the interval to be sufficiently // large to make the measurements more accurate. - defer setWatchIntervals(watchersyncer.ListRetryInterval, watchersyncer.WatchPollInterval) - setWatchIntervals(500*time.Millisecond, 2000*time.Millisecond) + defer setWatchIntervals(watchersyncer.MinResyncInterval, watchersyncer.ListRetryInterval, watchersyncer.WatchPollInterval) + setWatchIntervals(100*time.Millisecond, 500*time.Millisecond, 2000*time.Millisecond) + By("Getting to the initial in-sync") rs := newWatcherSyncerTester([]watchersyncer.ResourceType{r1}) rs.ExpectStatusUpdate(api.WaitForDatastore) - rs.clientListResponse(r1, tooOldRV) + rs.clientListResponse(r1, k8sTooOldRV) rs.clientListResponse(r1, emptyList) rs.ExpectStatusUpdate(api.ResyncInProgress) rs.ExpectStatusUpdate(api.InSync) // Send a watch error. This will trigger a re-list from the revision // the watcher cache has stored. + By("Sending watch error but no too-old error") rs.clientWatchResponse(r1, notSupported) rs.clientListResponse(r1, emptyList) @@ -165,8 +170,9 @@ var _ = Describe("Test the backend datastore multi-watch syncer", func() { // Send a watch error, followed by a resource version too old error // on the list. This should trigger the watcher cache to retry the list // without a revision. - rs.clientWatchResponse(r1, genError) - rs.clientListResponse(r1, tooOldRV) + By("Sending watch error and too-old error") + rs.clientWatchResponse(r1, k8sTooOldRV) + rs.clientListResponse(r1, k8sTooOldRV) Eventually(rs.fc.getLatestListRevision, 5*time.Second, 100*time.Millisecond).Should(Equal("0")) // Simulate a successful list using the 0 revision - we should see the watch started from the correct @@ -176,14 +182,14 @@ var _ = Describe("Test the backend datastore multi-watch syncer", func() { }) It("should handle reconnection if watchers fail to be created", func() { - rs := newWatcherSyncerTester([]watchersyncer.ResourceType{r1, r2, r3}) - rs.ExpectStatusUpdate(api.WaitForDatastore) - // Temporarily reduce the watch and list poll interval to make the tests faster. // Since we are timing the processing, we still need the interval to be sufficiently // large to make the measurements more accurate. 
- defer setWatchIntervals(watchersyncer.ListRetryInterval, watchersyncer.WatchPollInterval) - setWatchIntervals(500*time.Millisecond, 2000*time.Millisecond) + defer setWatchIntervals(watchersyncer.MinResyncInterval, watchersyncer.ListRetryInterval, watchersyncer.WatchPollInterval) + setWatchIntervals(100*time.Millisecond, 500*time.Millisecond, 2000*time.Millisecond) + + rs := newWatcherSyncerTester([]watchersyncer.ResourceType{r1, r2, r3}) + rs.ExpectStatusUpdate(api.WaitForDatastore) // All of the events should have been consumed within a time frame dictated by the // list retry and poll timers. @@ -212,12 +218,12 @@ var _ = Describe("Test the backend datastore multi-watch syncer", func() { before := time.Now() rs.clientListResponse(r1, emptyList) rs.ExpectStatusUpdate(api.ResyncInProgress) - rs.clientWatchResponse(r1, genError) + rs.clientWatchResponse(r1, caliTooOldRV) // Has to be a too-old error to trigger immediate re-list. rs.ExpectStatusUnchanged() rs.clientListResponse(r1, emptyList) rs.ExpectStatusUnchanged() - // rs.ExpectStatusUpdate(api.InSync) rs.clientWatchResponse(r1, nil) + rs.clientListResponse(r2, emptyList) rs.clientWatchResponse(r2, notSupported) rs.clientListResponse(r2, emptyList) @@ -271,9 +277,67 @@ var _ = Describe("Test the backend datastore multi-watch syncer", func() { Expect(duration).To(BeNumerically("<", maxDuration)) }) + It("should require several unknown watch failures to trigger re-List()", func() { + // Temporarily reduce the watch and list poll interval to make the tests faster. + // Since we are timing the processing, we still need the interval to be sufficiently + // large to make the measurements more accurate. + defer setWatchIntervals(watchersyncer.MinResyncInterval, watchersyncer.ListRetryInterval, watchersyncer.WatchPollInterval) + setWatchIntervals(100*time.Millisecond, 500*time.Millisecond, 2000*time.Millisecond) + + rs := newWatcherSyncerTester([]watchersyncer.ResourceType{r1}) + rs.ExpectStatusUpdate(api.WaitForDatastore) + + // All of the events should have been consumed within a time frame dictated by the + // list retry and poll timers. + // + // For resource 1, the client responses should be: + // - list succeeds + // - watch fails gen error x 5 (short retry) + // - list succeeds + // - watch succeeds ... + By("Failing watch with an unknown error repeatedly.") + expectedDuration := watchersyncer.MinResyncInterval * 5 + minDuration := 70 * expectedDuration / 100 + maxDuration := 150 * expectedDuration / 100 + before := time.Now() + rs.clientListResponse(r1, emptyList) + rs.ExpectStatusUpdate(api.ResyncInProgress) + rs.ExpectStatusUpdate(api.InSync) + // Expect MaxErrorsPerRevision unknown errors before triggering a re-list. 
+ for range watchersyncer.MaxErrorsPerRevision { + rs.clientWatchResponse(r1, genError) + } + rs.ExpectStatusUnchanged() + rs.clientListResponse(r1, emptyList) + rs.ExpectStatusUnchanged() + rs.clientWatchResponse(r1, nil) + + By("Expecting the time for all events to be handled is within a sensible window") + for i := time.Duration(0); i < maxDuration/(10*time.Millisecond); i++ { + if rs.allEventsHandled() { + break + } + time.Sleep(minDuration / 50) + } + duration := time.Now().Sub(before) + rs.expectAllEventsHandled() + Expect(duration).To(BeNumerically(">", minDuration)) + Expect(duration).To(BeNumerically("<", maxDuration)) + rs.ExpectStatusUnchanged() + }) + It("Should handle reconnection and syncing when the watcher sends a watch terminated error", func() { + // Temporarily reduce the watch and list poll interval to make the tests faster. + // Since we are timing the processing, we still need the interval to be sufficiently + // large to make the measurements more accurate. + defer setWatchIntervals(watchersyncer.MinResyncInterval, watchersyncer.ListRetryInterval, watchersyncer.WatchPollInterval) + setWatchIntervals(100*time.Millisecond, 500*time.Millisecond, 2000*time.Millisecond) + rs := newWatcherSyncerTester([]watchersyncer.ResourceType{r1, r2, r3}) rs.ExpectStatusUpdate(api.WaitForDatastore) + + before := time.Now() + rs.clientListResponse(r1, emptyList) rs.ExpectStatusUpdate(api.ResyncInProgress) rs.clientWatchResponse(r1, nil) @@ -282,33 +346,126 @@ var _ = Describe("Test the backend datastore multi-watch syncer", func() { rs.ExpectStatusUnchanged() rs.clientListResponse(r3, emptyList) rs.ExpectStatusUpdate(api.InSync) - rs.clientWatchResponse(r3, nil) - rs.sendEvent(r3, api.WatchEvent{ - Type: api.WatchError, - Error: dsError, - }) + for range watchersyncer.MaxErrorsPerRevision { + By("Starting a watch and then sending a terminating error.") + rs.clientWatchResponse(r3, nil) + rs.sendEvent(r3, api.WatchEvent{ + Type: api.WatchError, + Error: dsError, + }) + } rs.ExpectStatusUnchanged() rs.clientListResponse(r3, emptyList) rs.ExpectStatusUnchanged() rs.clientWatchResponse(r3, nil) rs.ExpectStatusUnchanged() + By("Expecting the time for all events to be handled is within a sensible window") + expectedDuration := watchersyncer.MinResyncInterval * 5 + minDuration := 70 * expectedDuration / 100 + maxDuration := 150 * expectedDuration / 100 + for time.Since(before) < maxDuration { + if rs.allEventsHandled() { + break + } + time.Sleep(10 * time.Millisecond) + } + duration := time.Now().Sub(before) + // Watch fails, but gets created again immediately. This should happen without // additional pauses. rs.expectAllEventsHandled() + Expect(duration).To(BeNumerically(">", minDuration)) + Expect(duration).To(BeNumerically("<", maxDuration)) + }) + + It("Should restart from the most recent watch bookmark after a watch failure", func() { + // Temporarily reduce the watch and list poll interval to make the tests faster. + // Since we are timing the processing, we still need the interval to be sufficiently + // large to make the measurements more accurate. 
+ defer setWatchIntervals(watchersyncer.MinResyncInterval, watchersyncer.ListRetryInterval, watchersyncer.WatchPollInterval) + setWatchIntervals(100*time.Millisecond, 500*time.Millisecond, 2000*time.Millisecond) + + rs := newWatcherSyncerTester([]watchersyncer.ResourceType{r1}) + rs.ExpectStatusUpdate(api.WaitForDatastore) + + rs.clientListResponse(r1, emptyList) + rs.ExpectStatusUpdate(api.ResyncInProgress) + rs.ExpectStatusUpdate(api.InSync) + rs.clientWatchResponse(r1, nil) + + By("Sending a bookmark.") + rs.sendEvent(r1, api.WatchEvent{ + Type: api.WatchBookmark, + New: &model.KVPair{ + Revision: "bookmarkRevision", + }, + }) + By("Sending a watch terminated error.") + rs.sendEvent(r1, api.WatchEvent{ + Type: api.WatchError, + Error: dsError, + }) + + rs.clientWatchResponse(r1, nil) + rs.ExpectStatusUnchanged() + + Eventually(rs.fc.getLatestWatchRevision, watchersyncer.MinResyncInterval*2, 10*time.Millisecond).Should( + Equal("bookmarkRevision")) + }) + + It("Should retry the same revision on connection refused", func() { + // Temporarily reduce the watch and list poll interval to make the tests faster. + // Since we are timing the processing, we still need the interval to be sufficiently + // large to make the measurements more accurate. + defer setWatchIntervals(watchersyncer.MinResyncInterval, watchersyncer.ListRetryInterval, watchersyncer.WatchPollInterval) + setWatchIntervals(100*time.Millisecond, 500*time.Millisecond, 2000*time.Millisecond) + + rs := newWatcherSyncerTester([]watchersyncer.ResourceType{r1}) + rs.ExpectStatusUpdate(api.WaitForDatastore) + + rs.clientListResponse(r1, emptyList) + rs.ExpectStatusUpdate(api.ResyncInProgress) + rs.ExpectStatusUpdate(api.InSync) + rs.clientWatchResponse(r1, nil) + + By("Sending a bookmark.") + rs.sendEvent(r1, api.WatchEvent{ + Type: api.WatchBookmark, + New: &model.KVPair{ + Revision: "bookmarkRevision", + }, + }) + By("Sending a watch terminated error.") + rs.sendEvent(r1, api.WatchEvent{ + Type: api.WatchError, + Error: dsError, + }) + + for range watchersyncer.MaxErrorsPerRevision * 2 { + rs.clientWatchResponse(r1, cerrors.ErrorDatastoreError{ + Err: unix.ECONNREFUSED, + }) + Eventually(rs.allEventsHandled, watchersyncer.MinResyncInterval*2, time.Millisecond).Should(BeTrue()) + } + rs.clientWatchResponse(r1, nil) + rs.ExpectStatusUnchanged() + Eventually(rs.allEventsHandled, watchersyncer.MinResyncInterval*2, time.Millisecond).Should(BeTrue()) + + Expect(rs.fc.getLatestWatchRevision()).To(Equal("bookmarkRevision")) }) It("Should handle receiving events while one watcher fails and fails to recreate", func() { + // Temporarily reduce the watch and list poll interval to make the tests faster. + defer setWatchIntervals(watchersyncer.MinResyncInterval, watchersyncer.ListRetryInterval, watchersyncer.WatchPollInterval) + setWatchIntervals(100*time.Millisecond, 100*time.Millisecond, 500*time.Millisecond) + rs := newWatcherSyncerTester([]watchersyncer.ResourceType{r1, r2, r3}) eventL1Added1 := addEvent(l1Key1) eventL2Added1 := addEvent(l2Key1) eventL2Added2 := addEvent(l2Key2) eventL3Added1 := addEvent(l3Key1) - // Temporarily reduce the watch and list poll interval to make the tests faster. 
- defer setWatchIntervals(watchersyncer.ListRetryInterval, watchersyncer.WatchPollInterval) - setWatchIntervals(100*time.Millisecond, 500*time.Millisecond) - By("Syncing a single result for resource 1 and creating the watch") rs.ExpectStatusUpdate(api.WaitForDatastore) rs.clientListResponse(r1, &model.KVPairList{ @@ -329,8 +486,6 @@ var _ = Describe("Test the backend datastore multi-watch syncer", func() { rs.clientListResponse(r2, emptyList) rs.clientWatchResponse(r2, genError) rs.ExpectStatusUnchanged() - rs.clientListResponse(r2, emptyList) - rs.ExpectStatusUnchanged() rs.clientWatchResponse(r2, nil) time.Sleep(130 * watchersyncer.WatchPollInterval / 100) rs.expectAllEventsHandled() @@ -348,8 +503,6 @@ var _ = Describe("Test the backend datastore multi-watch syncer", func() { Error: dsError, }) rs.ExpectStatusUnchanged() - rs.clientListResponse(r3, emptyList) - rs.ExpectStatusUnchanged() rs.clientWatchResponse(r3, nil) rs.ExpectStatusUnchanged() rs.clientWatchResponse(r3, nil) @@ -386,6 +539,10 @@ var _ = Describe("Test the backend datastore multi-watch syncer", func() { }) It("Should not resend add events during a resync and should delete stale entries", func() { + // Temporarily reduce the watch and list poll interval to make the tests faster. + defer setWatchIntervals(watchersyncer.MinResyncInterval, watchersyncer.ListRetryInterval, watchersyncer.WatchPollInterval) + setWatchIntervals(100*time.Millisecond, 100*time.Millisecond, 500*time.Millisecond) + rs := newWatcherSyncerTester([]watchersyncer.ResourceType{r1}) eventL1Added1 := addEvent(l1Key1) eventL1Deleted1 := deleteEvent(l1Key1) @@ -395,10 +552,6 @@ var _ = Describe("Test the backend datastore multi-watch syncer", func() { eventL1Modified4 := modifiedEvent(l1Key4) eventL1Modified4_2 := modifiedEvent(l1Key4) - // Temporarily reduce the watch and list poll interval to make the tests faster. - defer setWatchIntervals(watchersyncer.ListRetryInterval, watchersyncer.WatchPollInterval) - setWatchIntervals(100*time.Millisecond, 500*time.Millisecond) - By("returning a sync list with three entries and then failing the watch") rs.ExpectStatusUpdate(api.WaitForDatastore) rs.clientListResponse(r1, &model.KVPairList{ @@ -413,7 +566,8 @@ var _ = Describe("Test the backend datastore multi-watch syncer", func() { rs.ExpectStatusUpdate(api.InSync) // The retry thread will be blocked for the watch poll interval. - rs.clientWatchResponse(r1, genError) + // Need to use a too-old error to trigger re-List. + rs.clientWatchResponse(r1, caliTooOldRV) time.Sleep(watchersyncer.WatchPollInterval) rs.ExpectStatusUnchanged() @@ -469,7 +623,7 @@ var _ = Describe("Test the backend datastore multi-watch syncer", func() { By("Failing the watch, and resyncing with another modified entry") rs.sendEvent(r1, api.WatchEvent{ Type: api.WatchError, - Error: dsError, + Error: caliTooOldRV, }) rs.ExpectStatusUnchanged() rs.clientListResponse(r1, &model.KVPairList{ @@ -741,7 +895,8 @@ var ( // Set the list interval and watch interval in the WatcherSyncer. We do this to reduce // the test time. 
-func setWatchIntervals(listRetryInterval, watchPollInterval time.Duration) { +func setWatchIntervals(minRetryInterval, listRetryInterval, watchPollInterval time.Duration) { + watchersyncer.MinResyncInterval = minRetryInterval watchersyncer.ListRetryInterval = listRetryInterval watchersyncer.WatchPollInterval = watchPollInterval } @@ -896,9 +1051,9 @@ type watcherSyncerTester struct { func (rst *watcherSyncerTester) expectAllEventsHandled() { log.Infof("Expecting all events to have been handled") for _, l := range rst.lws { - Expect(l.listCallResults).To(HaveLen(0), "pending list results to be processed") - Expect(l.stopEvents).To(HaveLen(0), "pending stop events to be processed") - Expect(l.results).To(HaveLen(0), "pending watch results to be processed") + ExpectWithOffset(1, l.listCallResults).To(HaveLen(0), "pending list results to be processed") + ExpectWithOffset(1, l.stopEvents).To(HaveLen(0), "pending stop events to be processed") + ExpectWithOffset(1, l.results).To(HaveLen(0), "pending watch results to be processed") } } @@ -909,6 +1064,7 @@ func (rst *watcherSyncerTester) allEventsHandled() bool { eventsHandled := true for _, l := range rst.lws { eventsHandled = eventsHandled && (len(l.listCallResults) == 0) + eventsHandled = eventsHandled && (len(l.watchCallError) == 0) eventsHandled = eventsHandled && (len(l.stopEvents) == 0) eventsHandled = eventsHandled && (len(l.results) == 0) } @@ -934,10 +1090,12 @@ func (rst *watcherSyncerTester) sendEvent(r watchersyncer.ResourceType, event ap // in the watcher which will be decremented once the old one has fully terminated. log.WithField("Name", name).Info("Watcher error will trigger restart - increment termination count") rst.lws[name].termWg.Add(1) - } - log.WithField("Name", name).Info("Sending event") + log.WithFields(log.Fields{ + "name": name, + "event": event, + }).Info("Sending event") rst.lws[name].results <- event if event.Type == api.WatchError { @@ -953,7 +1111,7 @@ func (rst *watcherSyncerTester) sendEvent(r watchersyncer.ResourceType, event ap func (rst *watcherSyncerTester) expectStop(r watchersyncer.ResourceType) { name := model.ListOptionsToDefaultPathRoot(r.ListInterface) log.WithField("Name", name).Infof("Expecting Stop") - Eventually(func() bool { + EventuallyWithOffset(1, func() bool { return len(rst.lws[name].stopEvents) > 0 }).Should(BeTrue()) @@ -1063,7 +1221,10 @@ func (c *fakeClient) List(ctx context.Context, list model.ListInterface, revisio func (c *fakeClient) Watch(ctx context.Context, list model.ListInterface, options api.WatchOptions) (api.WatchInterface, error) { // Create a fake watcher keyed off the ListOptions (root path). 
name := model.ListOptionsToDefaultPathRoot(list) - log.WithField("Name", name).Info("Watch request") + log.WithFields(log.Fields{ + "name": name, + "revision": options.Revision, + }).Info("Watch request") if l, ok := c.lws[name]; !ok || l == nil { panic("Watch for unhandled resource type") } else { diff --git a/libcalico-go/lib/testutils/syncertester.go b/libcalico-go/lib/testutils/syncertester.go index 925422f12ef..1ee0bd72739 100644 --- a/libcalico-go/lib/testutils/syncertester.go +++ b/libcalico-go/lib/testutils/syncertester.go @@ -77,6 +77,7 @@ type SyncerTester struct { func (st *SyncerTester) OnStatusUpdated(status api.SyncStatus) { defer GinkgoRecover() st.lock.Lock() + log.WithField("status", status).Info("OnStatusUpdated") current := st.status st.status = status st.statusChanged = true @@ -177,7 +178,7 @@ func (st *SyncerTester) ExpectStatusUpdate(status api.SyncStatus, timeout ...tim } else { EventuallyWithOffset(1, cs, timeout[0], "1ms").Should(Equal(status)) } - ConsistentlyWithOffset(1, cs).Should(Equal(status)) + ConsistentlyWithOffset(1, cs, "10ms", "1ms").Should(Equal(status)) log.Infof("Status is at expected status: %s", status) From d6dbc592052fdc6c8e30af038b4ca188bd016d08 Mon Sep 17 00:00:00 2001 From: Shaun Crampton Date: Tue, 18 Feb 2025 18:01:53 +0000 Subject: [PATCH 5/5] Refactor watchercache main loop: - Use extra methods and defer cleanups. - Do the resync in one place at the top of the loop. - Tune logging of events. --- libcalico-go/lib/backend/api/api.go | 28 ++++++ .../lib/backend/watchersyncer/watchercache.go | 89 +++++++++++-------- 2 files changed, 82 insertions(+), 35 deletions(-) diff --git a/libcalico-go/lib/backend/api/api.go b/libcalico-go/lib/backend/api/api.go index fa3fb4bbbfb..e8e3f17bd7d 100644 --- a/libcalico-go/lib/backend/api/api.go +++ b/libcalico-go/lib/backend/api/api.go @@ -221,6 +221,34 @@ type WatchEvent struct { Error error } +func (w WatchEvent) String() string { + switch w.Type { + case WatchAdded: + if w.New == nil { + break // Malformed event, but still want to be able to print it! + } + return fmt.Sprintf("Added:%v@%v", w.New.Key, w.New.Revision) + case WatchModified: + if w.New == nil { + break + } + return fmt.Sprintf("Modified:%v@%v", w.New.Key, w.New.Revision) + case WatchDeleted: + if w.Old == nil { + break + } + return fmt.Sprintf("Deleted:%v@%v", w.Old.Key, w.Old.Revision) + case WatchBookmark: + if w.New == nil { + break + } + return fmt.Sprintf("Bookmark:%v", w.New.Revision) + case WatchError: + return fmt.Sprintf("Error:%v", w.Error) + } + return fmt.Sprintf("WatchEvent{Type: %s, Old: %v, New: %v, Error: %v}", w.Type, w.Old, w.New, w.Error) +} + // FakeWatcher is inspired by apimachinery (watch) FakeWatcher type FakeWatcher struct { result chan WatchEvent diff --git a/libcalico-go/lib/backend/watchersyncer/watchercache.go b/libcalico-go/lib/backend/watchersyncer/watchercache.go index 592d32fec4e..2030d5c5a73 100644 --- a/libcalico-go/lib/backend/watchersyncer/watchercache.go +++ b/libcalico-go/lib/backend/watchersyncer/watchercache.go @@ -17,6 +17,7 @@ package watchersyncer import ( "context" "errors" + "strings" "time" "github.com/sirupsen/logrus" @@ -73,7 +74,7 @@ type cacheEntry struct { // Create a new watcherCache. 
func newWatcherCache(client api.Client, resourceType ResourceType, results chan<- interface{}) *watcherCache { return &watcherCache{ - logger: logrus.WithField("ListRoot", model.ListOptionsToDefaultPathRoot(resourceType.ListInterface)), + logger: logrus.WithField("ListRoot", listRootForLog(resourceType.ListInterface)), client: client, resourceType: resourceType, results: results, @@ -83,32 +84,51 @@ func newWatcherCache(client api.Client, resourceType ResourceType, results chan< } } +func listRootForLog(listInterface model.ListInterface) string { + root := model.ListOptionsToDefaultPathRoot(listInterface) + root = strings.Replace(root, "/calico/resources/v3/projectcalico.org/", ".../v3/pc.org/", 1) + root = strings.Replace(root, "/calico/", ".../", 1) + return root +} + // run creates the watcher and loops indefinitely reading from the watcher. func (wc *watcherCache) run(ctx context.Context) { - wc.logger.Debug("Watcher cache starting, start initial sync processing") - wc.resyncAndCreateWatcher(ctx) + wc.logger.Debug("Watcher cache starting...") + + // On shutdown, send deletions for all the objects we're tracking. + defer wc.sendDeletionsForAllResources() + + // Main loop, repeatedly resync with the store and then watch for changes + // until our watch fails. + for ctx.Err() == nil { + wc.resyncAndLoopReadingFromWatcher(ctx) + } +} + +func (wc *watcherCache) resyncAndLoopReadingFromWatcher(ctx context.Context) { + defer wc.cleanExistingWatcher() + wc.maybeResyncAndCreateWatcher(ctx) + wc.loopReadingFromWatcher(ctx) +} + +func (wc *watcherCache) loopReadingFromWatcher(ctx context.Context) { + eventLogger := wc.logger.WithField("event", nil) - wc.logger.Debug("Starting main event processing loop") -mainLoop: for { - if wc.watch == nil { - // The watcher will be nil if the context cancelled during a resync. - wc.logger.Debug("Watch is nil. Returning") - break mainLoop - } select { case <-ctx.Done(): wc.logger.Debug("Context is done. Returning") - wc.cleanExistingWatcher() - break mainLoop + return case event, ok := <-wc.watch.ResultChan(): if !ok { // If the channel is closed then resync/recreate the watch. wc.logger.Debug("Watch channel closed by remote - recreate watcher") - wc.resyncAndCreateWatcher(ctx) - continue + return } - wc.logger.WithField("RC", wc.watch.ResultChan()).Debug("Reading event from results channel") + + // Re-use this log event so that we don't allocate every time. + eventLogger.Data["event"] = event + eventLogger.Debug("Got event from results channel") // Handle the specific event type. switch event.Type { @@ -120,7 +140,7 @@ mainLoop: kvp := event.Old if kvp == nil { // Bug, we're about to panic when we hit the nil pointer, log something useful. - wc.logger.WithField("watcher", wc).WithField("event", event).Panic("Deletion event without old value") + eventLogger.Panic("Deletion event without old value") } kvp.Value = nil wc.handleWatchListEvent(kvp) @@ -130,7 +150,7 @@ mainLoop: if kerrors.IsResourceExpired(event.Error) { // Our current watch revision is too old. Even with watch bookmarks, we hit this path after the // API server restarts (and presumably does an immediate compaction). 
- wc.logger.WithError(event.Error).Info("Watch has expired, triggering full resync.") + eventLogger.Info("Watch has expired, triggering full resync.") wc.resetWatchRevisionForFullResync() } else { // Unknown error, default is to just try restarting the watch on assumption that it's @@ -139,35 +159,24 @@ mainLoop: wc.errorCountAtCurrentRev++ if wc.errorCountAtCurrentRev >= MaxErrorsPerRevision { // Too many errors at the current revision, trigger a full resync. - wc.logger.WithError(event.Error).Warn("Watch repeatedly failed without making progress, triggering full resync") + eventLogger.Warn("Watch repeatedly failed without making progress, triggering full resync") wc.resetWatchRevisionForFullResync() } else { - wc.logger.WithError(event.Error).Info("Watch of resource finished. Attempting to restart it...") + eventLogger.Info("Watch of resource finished. Attempting to restart it...") } } - wc.resyncAndCreateWatcher(ctx) + return default: // Unknown event type - not much we can do other than log. - wc.logger.WithField("EventType", event.Type).Errorf("Unknown event type received from the datastore") + eventLogger.Errorf("Unknown event type received from the datastore") } } } - - // The watcher cache has exited. This can only mean that it has been shutdown, so emit all updates in the cache as - // delete events. - for _, value := range wc.resources { - wc.results <- []api.Update{{ - UpdateType: api.UpdateTypeKVDeleted, - KVPair: model.KVPair{ - Key: value.key, - }, - }} - } } -// resyncAndCreateWatcher loops performing resync processing until it successfully +// maybeResyncAndCreateWatcher loops performing resync processing until it successfully // completes a resync and starts a watcher. -func (wc *watcherCache) resyncAndCreateWatcher(ctx context.Context) { +func (wc *watcherCache) maybeResyncAndCreateWatcher(ctx context.Context) { // The passed in context allows a resync to be stopped mid-resync. The resync should be stopped as quickly as // possible, but there should be usable data available in wc.resources so that delete events can be sent. // The strategy is to @@ -186,7 +195,6 @@ func (wc *watcherCache) resyncAndCreateWatcher(ctx context.Context) { select { case <-ctx.Done(): wc.logger.Debug("Context is done. Returning") - wc.cleanExistingWatcher() return case <-wc.resyncThrottleC(): // Start the resync. This processing loops until we create the watcher. If the @@ -502,3 +510,14 @@ func (wc *watcherCache) markAsValid(resourceKey string) { } } } + +func (wc *watcherCache) sendDeletionsForAllResources() { + for _, value := range wc.resources { + wc.results <- []api.Update{{ + UpdateType: api.UpdateTypeKVDeleted, + KVPair: model.KVPair{ + Key: value.key, + }, + }} + } +}
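Editor's note (not part of the patch series): patch 1 adds WatchOptions.AllowWatchBookmarks and the api.WatchBookmark event type, and the patch-4 tests show the watcher cache resuming from the last bookmarked revision after a watch failure. The sketch below assumes only the types visible in this series (api.Client, api.WatchOptions, api.WatchEvent, model.ListInterface); the helper name watchWithBookmarks and the package name are illustrative, not code from the patches.

package example

import (
	"context"

	"github.com/projectcalico/calico/libcalico-go/lib/backend/api"
	"github.com/projectcalico/calico/libcalico-go/lib/backend/model"
)

// watchWithBookmarks runs a watch with bookmarks enabled and returns the most
// recent revision seen when the watch terminates, so the caller can resume
// from that revision instead of re-listing.
func watchWithBookmarks(ctx context.Context, c api.Client, list model.ListInterface, fromRev string) (string, error) {
	w, err := c.Watch(ctx, list, api.WatchOptions{
		Revision:            fromRev,
		AllowWatchBookmarks: true, // Ask the API server to send periodic bookmark events.
	})
	if err != nil {
		return fromRev, err
	}
	defer w.Stop()

	rev := fromRev
	for {
		select {
		case <-ctx.Done():
			return rev, ctx.Err()
		case e, ok := <-w.ResultChan():
			if !ok {
				return rev, nil // Watch closed; resume later from rev.
			}
			switch e.Type {
			case api.WatchAdded, api.WatchModified:
				rev = e.New.Revision // ...handle the update...
			case api.WatchDeleted:
				rev = e.Old.Revision // ...handle the deletion...
			case api.WatchBookmark:
				// No data change; just a fresher revision to resume from.
				rev = e.New.Revision
			case api.WatchError:
				return rev, e.Error
			}
		}
	}
}

The value of the bookmark case is that it refreshes the resume revision without delivering any KVPair, so a watch that restarts after a quiet period is much less likely to hit "resource version too old" against a compacted datastore.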
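Editor's note: patch 2 adds Unwrap() methods to ErrorDatastoreError, ErrorResourceDoesNotExist, ErrorResourceAlreadyExists, ErrorConnectionUnauthorized and ErrorResourceUpdateConflict. A small sketch of why, assuming only those exported types: the standard errors.Is/errors.As helpers can now follow the wrapper down to the underlying cause, which is what the connection-refused handling in watchercache (and the test that wraps ECONNREFUSED in ErrorDatastoreError) relies on. The classify/demo helpers below are illustrative only.

package example

import (
	"errors"
	"fmt"
	"syscall"

	cerrors "github.com/projectcalico/calico/libcalico-go/lib/errors"
)

// classify shows the kind of checks the Unwrap() additions enable.
func classify(err error) string {
	if errors.Is(err, syscall.ECONNREFUSED) {
		// Reachable only because ErrorDatastoreError now unwraps to its cause.
		return "connection refused: retry at the same revision"
	}
	var notSupp cerrors.ErrorOperationNotSupported
	if errors.As(err, &notSupp) {
		return "watch not supported: fall back to polling"
	}
	return "unknown: counts towards MaxErrorsPerRevision"
}

func demo() {
	wrapped := cerrors.ErrorDatastoreError{Err: syscall.ECONNREFUSED}
	fmt.Println(classify(wrapped)) // connection refused: retry at the same revision
}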
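Editor's note: across patches 2-4 the watch failure handling converges on a single policy. The function below is a hypothetical distillation of it (needsFullResync does not exist in the patches); it summarises the branches in maybeResyncAndCreateWatcher and the WatchError handling. The real code also re-lists on the ported isTooLargeResourceVersionError() check and resets the error count whenever an event or bookmark makes progress.

package example

import (
	kerrors "k8s.io/apimachinery/pkg/api/errors"
	utilnet "k8s.io/apimachinery/pkg/util/net"
)

const maxErrorsPerRevision = 5 // Mirrors watchersyncer.MaxErrorsPerRevision.

// needsFullResync reports whether the watcher cache should drop its cached
// revision (back to "0") and re-list, given the latest watch failure and the
// number of failures seen without making progress at the current revision.
func needsFullResync(err error, errorsAtRev int) bool {
	switch {
	case kerrors.IsResourceExpired(err) || kerrors.IsGone(err):
		// Revision compacted away (e.g. after an API server restart): re-list now.
		return true
	case utilnet.IsConnectionRefused(err) || kerrors.IsTooManyRequests(err):
		// Connectivity/load problem: the cached revision is still good, keep retrying it.
		return false
	default:
		// Anything else: retry a few times, then give up and re-list.
		return errorsAtRev >= maxErrorsPerRevision
	}
}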