diff --git a/xds/csds/csds_e2e_test.go b/xds/csds/csds_e2e_test.go index 3c838afb67fc..bd11580bb640 100644 --- a/xds/csds/csds_e2e_test.go +++ b/xds/csds/csds_e2e_test.go @@ -70,49 +70,37 @@ func Test(t *testing.T) { type nopListenerWatcher struct{} -func (nopListenerWatcher) OnUpdate(_ *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) { +func (nopListenerWatcher) OnResourceChanged(_ *xdsresource.ListenerResourceData, _ error, onDone xdsresource.OnDoneFunc) { onDone() } -func (nopListenerWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) { - onDone() -} -func (nopListenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { +func (nopListenerWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) { onDone() } type nopRouteConfigWatcher struct{} -func (nopRouteConfigWatcher) OnUpdate(_ *xdsresource.RouteConfigResourceData, onDone xdsresource.OnDoneFunc) { +func (nopRouteConfigWatcher) OnResourceChanged(_ *xdsresource.RouteConfigResourceData, _ error, onDone xdsresource.OnDoneFunc) { onDone() } -func (nopRouteConfigWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) { - onDone() -} -func (nopRouteConfigWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { +func (nopRouteConfigWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) { onDone() } type nopClusterWatcher struct{} -func (nopClusterWatcher) OnUpdate(_ *xdsresource.ClusterResourceData, onDone xdsresource.OnDoneFunc) { - onDone() -} -func (nopClusterWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) { +func (nopClusterWatcher) OnResourceChanged(_ *xdsresource.ClusterResourceData, _ error, onDone xdsresource.OnDoneFunc) { onDone() } -func (nopClusterWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { +func (nopClusterWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) { onDone() } type nopEndpointsWatcher struct{} -func (nopEndpointsWatcher) OnUpdate(_ *xdsresource.EndpointsResourceData, onDone xdsresource.OnDoneFunc) { - onDone() -} -func (nopEndpointsWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) { +func (nopEndpointsWatcher) OnResourceChanged(_ *xdsresource.EndpointsResourceData, _ error, onDone xdsresource.OnDoneFunc) { onDone() } -func (nopEndpointsWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { +func (nopEndpointsWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) { onDone() } @@ -137,13 +125,10 @@ func newBlockingListenerWatcher(testCtxDone <-chan struct{}) *blockingListenerWa } } -func (w *blockingListenerWatcher) OnUpdate(_ *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) { - writeOnDone(w.testCtxDone, w.onDoneCh, onDone) -} -func (w *blockingListenerWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) { +func (w *blockingListenerWatcher) OnResourceChanged(_ *xdsresource.ListenerResourceData, _ error, onDone xdsresource.OnDoneFunc) { writeOnDone(w.testCtxDone, w.onDoneCh, onDone) } -func (w *blockingListenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { +func (w *blockingListenerWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) { writeOnDone(w.testCtxDone, w.onDoneCh, onDone) } diff --git a/xds/internal/balancer/cdsbalancer/cluster_watcher.go b/xds/internal/balancer/cdsbalancer/cluster_watcher.go index 835461d0997b..3ff11ed19412 100644 --- a/xds/internal/balancer/cdsbalancer/cluster_watcher.go +++ b/xds/internal/balancer/cdsbalancer/cluster_watcher.go @@ -32,21 +32,26 @@ type clusterWatcher struct { parent *cdsBalancer } -func (cw *clusterWatcher) OnUpdate(u *xdsresource.ClusterResourceData, onDone xdsresource.OnDoneFunc) { +func (cw *clusterWatcher) OnResourceChanged(u *xdsresource.ClusterResourceData, err error, onDone xdsresource.OnDoneFunc) { + if err != nil { + if xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceNotFound { + handleNotFound := func(context.Context) { cw.parent.onClusterResourceNotFound(cw.name); onDone() } + cw.parent.serializer.ScheduleOr(handleNotFound, onDone) + } else { + handleError := func(context.Context) { cw.parent.onClusterError(cw.name, err); onDone() } + cw.parent.serializer.ScheduleOr(handleError, onDone) + } + return + } handleUpdate := func(context.Context) { cw.parent.onClusterUpdate(cw.name, u.Resource); onDone() } cw.parent.serializer.ScheduleOr(handleUpdate, onDone) } -func (cw *clusterWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) { +func (cw *clusterWatcher) OnAmbientError(err error, onDone xdsresource.OnDoneFunc) { handleError := func(context.Context) { cw.parent.onClusterError(cw.name, err); onDone() } cw.parent.serializer.ScheduleOr(handleError, onDone) } -func (cw *clusterWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { - handleNotFound := func(context.Context) { cw.parent.onClusterResourceNotFound(cw.name); onDone() } - cw.parent.serializer.ScheduleOr(handleNotFound, onDone) -} - // watcherState groups the state associated with a clusterWatcher. type watcherState struct { watcher *clusterWatcher // The underlying watcher. diff --git a/xds/internal/balancer/clusterresolver/resource_resolver_eds.go b/xds/internal/balancer/clusterresolver/resource_resolver_eds.go index ddb949019ee5..c0712328ad27 100644 --- a/xds/internal/balancer/clusterresolver/resource_resolver_eds.go +++ b/xds/internal/balancer/clusterresolver/resource_resolver_eds.go @@ -76,12 +76,34 @@ func newEDSResolver(nameToWatch string, producer xdsresource.Producer, topLevelR } // OnUpdate is invoked to report an update for the resource being watched. -func (er *edsDiscoveryMechanism) OnUpdate(update *xdsresource.EndpointsResourceData, onDone xdsresource.OnDoneFunc) { +func (er *edsDiscoveryMechanism) OnResourceChanged(update *xdsresource.EndpointsResourceData, err error, onDone xdsresource.OnDoneFunc) { if er.stopped.HasFired() { onDone() return } + if err != nil { + if er.logger.V(2) { + if xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceNotFound { + er.logger.Infof("EDS discovery mechanism for resource %q reported resource-does-not-exist error", er.nameToWatch) + } else { + er.logger.Infof("EDS discovery mechanism for resource %q reported error: %v", er.nameToWatch, err) + } + } + // Report an empty update that would result in no priority child being + // created for this discovery mechanism. This would result in the priority + // LB policy reporting TRANSIENT_FAILURE (as there would be no priorities or + // localities) if this was the only discovery mechanism, or would result in + // the priority LB policy using a lower priority discovery mechanism when + // that becomes available. + er.mu.Lock() + er.update = &xdsresource.EndpointsUpdate{} + er.mu.Unlock() + + er.topLevelResolver.onUpdate(onDone) + return + } + er.mu.Lock() er.update = &update.Resource er.mu.Unlock() @@ -89,7 +111,7 @@ func (er *edsDiscoveryMechanism) OnUpdate(update *xdsresource.EndpointsResourceD er.topLevelResolver.onUpdate(onDone) } -func (er *edsDiscoveryMechanism) OnError(err error, onDone xdsresource.OnDoneFunc) { +func (er *edsDiscoveryMechanism) OnAmbientError(err error, onDone xdsresource.OnDoneFunc) { if er.stopped.HasFired() { onDone() return @@ -119,26 +141,3 @@ func (er *edsDiscoveryMechanism) OnError(err error, onDone xdsresource.OnDoneFun er.topLevelResolver.onUpdate(onDone) } - -func (er *edsDiscoveryMechanism) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { - if er.stopped.HasFired() { - onDone() - return - } - - if er.logger.V(2) { - er.logger.Infof("EDS discovery mechanism for resource %q reported resource-does-not-exist error", er.nameToWatch) - } - - // Report an empty update that would result in no priority child being - // created for this discovery mechanism. This would result in the priority - // LB policy reporting TRANSIENT_FAILURE (as there would be no priorities or - // localities) if this was the only discovery mechanism, or would result in - // the priority LB policy using a lower priority discovery mechanism when - // that becomes available. - er.mu.Lock() - er.update = &xdsresource.EndpointsUpdate{} - er.mu.Unlock() - - er.topLevelResolver.onUpdate(onDone) -} diff --git a/xds/internal/resolver/watch_service.go b/xds/internal/resolver/watch_service.go index 0de6604484b1..5b55256afadd 100644 --- a/xds/internal/resolver/watch_service.go +++ b/xds/internal/resolver/watch_service.go @@ -36,21 +36,21 @@ func newListenerWatcher(resourceName string, parent *xdsResolver) *listenerWatch return lw } -func (l *listenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) { +func (l *listenerWatcher) OnResourceChanged(update *xdsresource.ListenerResourceData, err error, onDone xdsresource.OnDoneFunc) { + if err != nil { + handleError := func(context.Context) { l.parent.onListenerResourceNotFound(); onDone() } + l.parent.serializer.ScheduleOr(handleError, onDone) + return + } handleUpdate := func(context.Context) { l.parent.onListenerResourceUpdate(update.Resource); onDone() } l.parent.serializer.ScheduleOr(handleUpdate, onDone) } -func (l *listenerWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) { +func (l *listenerWatcher) OnAmbientError(err error, onDone xdsresource.OnDoneFunc) { handleError := func(context.Context) { l.parent.onListenerResourceError(err); onDone() } l.parent.serializer.ScheduleOr(handleError, onDone) } -func (l *listenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { - handleNotFound := func(context.Context) { l.parent.onListenerResourceNotFound(); onDone() } - l.parent.serializer.ScheduleOr(handleNotFound, onDone) -} - func (l *listenerWatcher) stop() { l.cancel() l.parent.logger.Infof("Canceling watch on Listener resource %q", l.resourceName) @@ -68,7 +68,17 @@ func newRouteConfigWatcher(resourceName string, parent *xdsResolver) *routeConfi return rw } -func (r *routeConfigWatcher) OnUpdate(u *xdsresource.RouteConfigResourceData, onDone xdsresource.OnDoneFunc) { +func (r *routeConfigWatcher) OnResourceChanged(u *xdsresource.RouteConfigResourceData, err error, onDone xdsresource.OnDoneFunc) { + if err != nil { + var handleError func(context.Context) + if xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceNotFound { + handleError = func(context.Context) { r.parent.onRouteConfigResourceNotFound(r.resourceName); onDone() } + } else { + handleError = func(context.Context) { r.parent.onRouteConfigResourceError(r.resourceName, err); onDone() } + } + r.parent.serializer.ScheduleOr(handleError, onDone) + return + } handleUpdate := func(context.Context) { r.parent.onRouteConfigResourceUpdate(r.resourceName, u.Resource) onDone() @@ -76,16 +86,11 @@ func (r *routeConfigWatcher) OnUpdate(u *xdsresource.RouteConfigResourceData, on r.parent.serializer.ScheduleOr(handleUpdate, onDone) } -func (r *routeConfigWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) { +func (r *routeConfigWatcher) OnAmbientError(err error, onDone xdsresource.OnDoneFunc) { handleError := func(context.Context) { r.parent.onRouteConfigResourceError(r.resourceName, err); onDone() } r.parent.serializer.ScheduleOr(handleError, onDone) } -func (r *routeConfigWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { - handleNotFound := func(context.Context) { r.parent.onRouteConfigResourceNotFound(r.resourceName); onDone() } - r.parent.serializer.ScheduleOr(handleNotFound, onDone) -} - func (r *routeConfigWatcher) stop() { r.cancel() r.parent.logger.Infof("Canceling watch on RouteConfiguration resource %q", r.resourceName) diff --git a/xds/internal/server/listener_wrapper.go b/xds/internal/server/listener_wrapper.go index 09d320018aee..0d466aa8e71c 100644 --- a/xds/internal/server/listener_wrapper.go +++ b/xds/internal/server/listener_wrapper.go @@ -414,19 +414,33 @@ type ldsWatcher struct { name string } -func (lw *ldsWatcher) OnUpdate(update *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) { +func (lw *ldsWatcher) OnResourceChanged(update *xdsresource.ListenerResourceData, err error, onDone xdsresource.OnDoneFunc) { defer onDone() if lw.parent.closed.HasFired() { - lw.logger.Warningf("Resource %q received update: %#v after listener was closed", lw.name, update) + if err != nil { + lw.logger.Warningf("Resource %q received err: %#v after listener was closed", lw.name, err) + } else { + lw.logger.Warningf("Resource %q received update: %#v after listener was closed", lw.name, update) + } return } if lw.logger.V(2) { - lw.logger.Infof("LDS watch for resource %q received update: %#v", lw.name, update.Resource) + if err != nil { + lw.logger.Infof("LDS watch for resource %q received error: %#v", lw.name, err) + } else { + lw.logger.Infof("LDS watch for resource %q received update: %#v", lw.name, update.Resource) + } + } + if err != nil { + if xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceNotFound { + lw.parent.onLDSResourceDoesNotExist(err) + } + return } lw.parent.handleLDSUpdate(update.Resource) } -func (lw *ldsWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) { +func (lw *ldsWatcher) OnAmbientError(err error, onDone xdsresource.OnDoneFunc) { defer onDone() if lw.parent.closed.HasFired() { lw.logger.Warningf("Resource %q received error: %v after listener was closed", lw.name, err) @@ -438,17 +452,3 @@ func (lw *ldsWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) { // For errors which are anything other than "resource-not-found", we // continue to use the old configuration. } - -func (lw *ldsWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { - defer onDone() - if lw.parent.closed.HasFired() { - lw.logger.Warningf("Resource %q received resource-does-not-exist error after listener was closed", lw.name) - return - } - if lw.logger.V(2) { - lw.logger.Infof("LDS watch for resource %q reported resource-does-not-exist error", lw.name) - } - - err := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "resource name %q of type Listener not found in received response", lw.name) - lw.parent.onLDSResourceDoesNotExist(err) -} diff --git a/xds/internal/server/rds_handler.go b/xds/internal/server/rds_handler.go index bcd3938e6f1a..998145b32767 100644 --- a/xds/internal/server/rds_handler.go +++ b/xds/internal/server/rds_handler.go @@ -147,7 +147,7 @@ type rdsWatcher struct { canceled bool // eats callbacks if true } -func (rw *rdsWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData, onDone xdsresource.OnDoneFunc) { +func (rw *rdsWatcher) OnResourceChanged(update *xdsresource.RouteConfigResourceData, err error, onDone xdsresource.OnDoneFunc) { defer onDone() rw.mu.Lock() if rw.canceled { @@ -156,26 +156,20 @@ func (rw *rdsWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData, onDo } rw.mu.Unlock() if rw.logger.V(2) { - rw.logger.Infof("RDS watch for resource %q received update: %#v", rw.routeName, update.Resource) + if err != nil { + rw.logger.Infof("RDS watch for resource %q received error: %#v", rw.routeName, err) + } else { + rw.logger.Infof("RDS watch for resource %q received update: %#v", rw.routeName, update.Resource) + } } - rw.parent.handleRouteUpdate(rw.routeName, rdsWatcherUpdate{data: &update.Resource}) -} - -func (rw *rdsWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) { - defer onDone() - rw.mu.Lock() - if rw.canceled { - rw.mu.Unlock() + if err != nil { + rw.parent.handleRouteUpdate(rw.routeName, rdsWatcherUpdate{err: err}) return } - rw.mu.Unlock() - if rw.logger.V(2) { - rw.logger.Infof("RDS watch for resource %q reported error: %v", rw.routeName, err) - } - rw.parent.handleRouteUpdate(rw.routeName, rdsWatcherUpdate{err: err}) + rw.parent.handleRouteUpdate(rw.routeName, rdsWatcherUpdate{data: &update.Resource}) } -func (rw *rdsWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { +func (rw *rdsWatcher) OnAmbientError(err error, onDone xdsresource.OnDoneFunc) { defer onDone() rw.mu.Lock() if rw.canceled { @@ -184,8 +178,7 @@ func (rw *rdsWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { } rw.mu.Unlock() if rw.logger.V(2) { - rw.logger.Infof("RDS watch for resource %q reported resource-does-not-exist error: %v", rw.routeName) + rw.logger.Infof("RDS watch for resource %q reported error: %v", rw.routeName, err) } - err := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "resource name %q of type RouteConfiguration not found in received response", rw.routeName) rw.parent.handleRouteUpdate(rw.routeName, rdsWatcherUpdate{err: err}) } diff --git a/xds/internal/testutils/resource_watcher.go b/xds/internal/testutils/resource_watcher.go index dae72e2a7733..522b5d9f37a9 100644 --- a/xds/internal/testutils/resource_watcher.go +++ b/xds/internal/testutils/resource_watcher.go @@ -35,10 +35,27 @@ type TestResourceWatcher struct { ResourceDoesNotExistCh chan struct{} } -// OnUpdate is invoked by the xDS client to report the latest update on the resource -// being watched. -func (w *TestResourceWatcher) OnUpdate(data xdsresource.ResourceData, onDone xdsresource.OnDoneFunc) { +// OnResourceChanged is invoked by the xDS client to report the latest update +// or an error on the resource being watched. +func (w *TestResourceWatcher) OnResourceChanged(data xdsresource.ResourceData, err error, onDone xdsresource.OnDoneFunc) { defer onDone() + if err != nil { + if xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceNotFound { + select { + case <-w.ResourceDoesNotExistCh: + default: + } + w.ResourceDoesNotExistCh <- struct{}{} + return + } + select { + case <-w.ErrorCh: + default: + } + w.ErrorCh <- err + return + + } select { case <-w.UpdateCh: default: @@ -46,8 +63,8 @@ func (w *TestResourceWatcher) OnUpdate(data xdsresource.ResourceData, onDone xds w.UpdateCh <- &data } -// OnError is invoked by the xDS client to report the latest error. -func (w *TestResourceWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) { +// OnAmbientError is invoked by the xDS client to report the latest error. +func (w *TestResourceWatcher) OnAmbientError(err error, onDone xdsresource.OnDoneFunc) { defer onDone() select { case <-w.ErrorCh: @@ -56,17 +73,6 @@ func (w *TestResourceWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) w.ErrorCh <- err } -// OnResourceDoesNotExist is used by the xDS client to report that the resource -// being watched no longer exists. -func (w *TestResourceWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { - defer onDone() - select { - case <-w.ResourceDoesNotExistCh: - default: - } - w.ResourceDoesNotExistCh <- struct{}{} -} - // NewTestResourceWatcher returns a TestResourceWatcher to watch for resources // via the xDS client. func NewTestResourceWatcher() *TestResourceWatcher { diff --git a/xds/internal/xdsclient/authority.go b/xds/internal/xdsclient/authority.go index f81685a45e69..4e324208e0f8 100644 --- a/xds/internal/xdsclient/authority.go +++ b/xds/internal/xdsclient/authority.go @@ -177,24 +177,13 @@ func (a *authority) handleADSStreamFailure(serverConfig *bootstrap.ServerConfig, a.logger.Infof("Connection to server %s failed with error: %v", serverConfig, err) } - // We do not consider it an error if the ADS stream was closed after having - // received a response on the stream. This is because there are legitimate - // reasons why the server may need to close the stream during normal - // operations, such as needing to rebalance load or the underlying - // connection hitting its max connection age limit. See gRFC A57 for more - // details. - if xdsresource.ErrType(err) == xdsresource.ErrTypeStreamFailedAfterRecv { - a.logger.Warningf("Watchers not notified since ADS stream failed after having received at least one response: %v", err) - return - } - // Propagate the connection error from the transport layer to all watchers. for _, rType := range a.resources { for _, state := range rType { for watcher := range state.watchers { watcher := watcher a.watcherCallbackSerializer.TrySchedule(func(context.Context) { - watcher.OnError(xdsresource.NewErrorf(xdsresource.ErrorTypeConnection, "xds: error received from xDS stream: %v", err), func() {}) + watcher.OnAmbientError(xdsresource.NewErrorf(xdsresource.ErrorTypeConnection, "xds: error received from xDS stream: %v", err), func() {}) }) } } @@ -363,7 +352,7 @@ func (a *authority) handleADSResourceUpdate(serverConfig *bootstrap.ServerConfig watcher := watcher err := uErr.Err watcherCnt.Add(1) - funcsToSchedule = append(funcsToSchedule, func(context.Context) { watcher.OnError(err, done) }) + funcsToSchedule = append(funcsToSchedule, func(context.Context) { watcher.OnAmbientError(err, done) }) } continue } @@ -388,7 +377,7 @@ func (a *authority) handleADSResourceUpdate(serverConfig *bootstrap.ServerConfig watcher := watcher resource := uErr.Resource watcherCnt.Add(1) - funcsToSchedule = append(funcsToSchedule, func(context.Context) { watcher.OnUpdate(resource, done) }) + funcsToSchedule = append(funcsToSchedule, func(context.Context) { watcher.OnResourceChanged(resource, nil, done) }) } } @@ -436,9 +425,15 @@ func (a *authority) handleADSResourceUpdate(serverConfig *bootstrap.ServerConfig } if state.md.Status == xdsresource.ServiceStatusNotExist { // The metadata status is set to "ServiceStatusNotExist" if a - // previous update deleted this resource, in which case we do not - // want to repeatedly call the watch callbacks with a - // "resource-not-found" error. + // previous update deleted this resource, in which case we + // want to send an ambient error. + for watcher := range state.watchers { + watcher := watcher + watcherCnt.Add(1) + funcsToSchedule = append(funcsToSchedule, func(context.Context) { + watcher.OnAmbientError(xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "xds: previous update deleted this resource"), done) + }) + } continue } if serverConfig.ServerFeaturesIgnoreResourceDeletion() { @@ -455,17 +450,17 @@ func (a *authority) handleADSResourceUpdate(serverConfig *bootstrap.ServerConfig continue } - // If we get here, it means that the resource exists in cache, but not - // in the new update. Delete the resource from cache, and send a - // resource not found error to indicate that the resource has been - // removed. Metadata for the resource is still maintained, as this is - // required by CSDS. + // If we get here, it means that the resource exists in cache, but + // not in the new update. Delete the resource from cache. Metadata + // for the resource is still maintained, as this is required by CSDS. state.cache = nil state.md = xdsresource.UpdateMetadata{Status: xdsresource.ServiceStatusNotExist} for watcher := range state.watchers { watcher := watcher watcherCnt.Add(1) - funcsToSchedule = append(funcsToSchedule, func(context.Context) { watcher.OnResourceDoesNotExist(done) }) + funcsToSchedule = append(funcsToSchedule, func(context.Context) { + watcher.OnResourceChanged(nil, xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "xds: resource has been removed"), done) + }) } } } @@ -507,7 +502,9 @@ func (a *authority) handleADSResourceDoesNotExist(rType xdsresource.Type, resour state.md = xdsresource.UpdateMetadata{Status: xdsresource.ServiceStatusNotExist} for watcher := range state.watchers { watcher := watcher - a.watcherCallbackSerializer.TrySchedule(func(context.Context) { watcher.OnResourceDoesNotExist(func() {}) }) + a.watcherCallbackSerializer.TrySchedule(func(context.Context) { + watcher.OnResourceChanged(nil, xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "xds: resource %s does not exist", rType.TypeName()), func() {}) + }) } } @@ -643,7 +640,7 @@ func (a *authority) watchResource(rType xdsresource.Type, resourceName string, w // xdsClientSerializer callback. Hence making a copy of the cached // resource here for watchCallbackSerializer. resource := state.cache - a.watcherCallbackSerializer.TrySchedule(func(context.Context) { watcher.OnUpdate(resource, func() {}) }) + a.watcherCallbackSerializer.TrySchedule(func(context.Context) { watcher.OnResourceChanged(resource, nil, func() {}) }) } // If last update was NACK'd, notify the new watcher of error // immediately as well. @@ -655,12 +652,14 @@ func (a *authority) watchResource(rType xdsresource.Type, resourceName string, w // xdsClientSerializer callback. Hence making a copy of the error // here for watchCallbackSerializer. err := state.md.ErrState.Err - a.watcherCallbackSerializer.TrySchedule(func(context.Context) { watcher.OnError(err, func() {}) }) + a.watcherCallbackSerializer.TrySchedule(func(context.Context) { watcher.OnAmbientError(err, func() {}) }) } // If the metadata field is updated to indicate that the management // server does not have this resource, notify the new watcher. if state.md.Status == xdsresource.ServiceStatusNotExist { - a.watcherCallbackSerializer.TrySchedule(func(context.Context) { watcher.OnResourceDoesNotExist(func() {}) }) + a.watcherCallbackSerializer.TrySchedule(func(context.Context) { + watcher.OnResourceChanged(nil, xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "xds: resource %s does not exist", rType.TypeName()), func() {}) + }) } cleanup = a.unwatchResource(rType, resourceName, watcher) }, func() { diff --git a/xds/internal/xdsclient/clientimpl_watchers.go b/xds/internal/xdsclient/clientimpl_watchers.go index ed4ee360fb7d..b21f89131296 100644 --- a/xds/internal/xdsclient/clientimpl_watchers.go +++ b/xds/internal/xdsclient/clientimpl_watchers.go @@ -45,7 +45,7 @@ func (c *clientImpl) WatchResource(rType xdsresource.Type, resourceName string, if err := c.resourceTypes.maybeRegister(rType); err != nil { logger.Warningf("Watch registered for name %q of type %q which is already registered", rType.TypeName(), resourceName) - c.serializer.TrySchedule(func(context.Context) { watcher.OnError(err, func() {}) }) + c.serializer.TrySchedule(func(context.Context) { watcher.OnResourceChanged(nil, err, func() {}) }) return func() {} } @@ -54,7 +54,7 @@ func (c *clientImpl) WatchResource(rType xdsresource.Type, resourceName string, if a == nil { logger.Warningf("Watch registered for name %q of type %q, authority %q is not found", rType.TypeName(), resourceName, n.Authority) c.serializer.TrySchedule(func(context.Context) { - watcher.OnError(fmt.Errorf("authority %q not found in bootstrap config for resource %q", n.Authority, resourceName), func() {}) + watcher.OnResourceChanged(nil, fmt.Errorf("authority %q not found in bootstrap config for resource %q", n.Authority, resourceName), func() {}) }) return func() {} } diff --git a/xds/internal/xdsclient/tests/ads_stream_flow_control_test.go b/xds/internal/xdsclient/tests/ads_stream_flow_control_test.go index ff0243f3d462..ffa6cdf09d03 100644 --- a/xds/internal/xdsclient/tests/ads_stream_flow_control_test.go +++ b/xds/internal/xdsclient/tests/ads_stream_flow_control_test.go @@ -60,7 +60,28 @@ func newBLockingListenerWatcher() *blockingListenerWatcher { } } -func (lw *blockingListenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData, done xdsresource.OnDoneFunc) { +func (lw *blockingListenerWatcher) OnResourceChanged(update *xdsresource.ListenerResourceData, err error, done xdsresource.OnDoneFunc) { + if err != nil { + if xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceNotFound { + // Notify receipt of resource not found. + select { + case lw.notFoundCh <- struct{}{}: + default: + } + } else { + select { + case lw.errorCh <- struct{}{}: + default: + } + } + + select { + case lw.doneNotifierCh <- done: + default: + } + + return + } // Notify receipt of the update. select { case lw.updateCh <- struct{}{}: @@ -73,7 +94,7 @@ func (lw *blockingListenerWatcher) OnUpdate(update *xdsresource.ListenerResource } } -func (lw *blockingListenerWatcher) OnError(err error, done xdsresource.OnDoneFunc) { +func (lw *blockingListenerWatcher) OnAmbientError(err error, done xdsresource.OnDoneFunc) { // Notify receipt of an error. select { case lw.errorCh <- struct{}{}: @@ -86,19 +107,6 @@ func (lw *blockingListenerWatcher) OnError(err error, done xdsresource.OnDoneFun } } -func (lw *blockingListenerWatcher) OnResourceDoesNotExist(done xdsresource.OnDoneFunc) { - // Notify receipt of resource not found. - select { - case lw.notFoundCh <- struct{}{}: - default: - } - - select { - case lw.doneNotifierCh <- done: - default: - } -} - type wrappedADSStream struct { v3adsgrpc.AggregatedDiscoveryService_StreamAggregatedResourcesClient recvCh chan struct{} diff --git a/xds/internal/xdsclient/tests/cds_watchers_test.go b/xds/internal/xdsclient/tests/cds_watchers_test.go index f8cd6dac7691..165ca0057b6b 100644 --- a/xds/internal/xdsclient/tests/cds_watchers_test.go +++ b/xds/internal/xdsclient/tests/cds_watchers_test.go @@ -44,13 +44,10 @@ import ( type noopClusterWatcher struct{} -func (noopClusterWatcher) OnUpdate(update *xdsresource.ClusterResourceData, onDone xdsresource.OnDoneFunc) { +func (noopClusterWatcher) OnResourceChanged(_ *xdsresource.ClusterResourceData, _ error, onDone xdsresource.OnDoneFunc) { onDone() } -func (noopClusterWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) { - onDone() -} -func (noopClusterWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { +func (noopClusterWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) { onDone() } @@ -67,12 +64,17 @@ func newClusterWatcher() *clusterWatcher { return &clusterWatcher{updateCh: testutils.NewChannel()} } -func (cw *clusterWatcher) OnUpdate(update *xdsresource.ClusterResourceData, onDone xdsresource.OnDoneFunc) { +func (cw *clusterWatcher) OnResourceChanged(update *xdsresource.ClusterResourceData, err error, onDone xdsresource.OnDoneFunc) { + if err != nil { + cw.updateCh.Replace(clusterUpdateErrTuple{err: err}) + onDone() + return + } cw.updateCh.Send(clusterUpdateErrTuple{update: update.Resource}) onDone() } -func (cw *clusterWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) { +func (cw *clusterWatcher) OnAmbientError(err error, onDone xdsresource.OnDoneFunc) { // When used with a go-control-plane management server that continuously // resends resources which are NACKed by the xDS client, using a `Replace()` // here and in OnResourceDoesNotExist() simplifies tests which will have @@ -81,11 +83,6 @@ func (cw *clusterWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) { onDone() } -func (cw *clusterWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { - cw.updateCh.Replace(clusterUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Cluster not found in received response")}) - onDone() -} - // badClusterResource returns a cluster resource for the given name which // contains a config_source_specifier for the `lrs_server` field which is not // set to `self`, and hence is expected to be NACKed by the client. diff --git a/xds/internal/xdsclient/tests/eds_watchers_test.go b/xds/internal/xdsclient/tests/eds_watchers_test.go index 21021b8992bb..c6506ddf408a 100644 --- a/xds/internal/xdsclient/tests/eds_watchers_test.go +++ b/xds/internal/xdsclient/tests/eds_watchers_test.go @@ -53,10 +53,10 @@ const ( type noopEndpointsWatcher struct{} -func (noopEndpointsWatcher) OnUpdate(update *xdsresource.EndpointsResourceData, onDone xdsresource.OnDoneFunc) { +func (noopEndpointsWatcher) OnResourceChanged(_ *xdsresource.EndpointsResourceData, _ error, onDone xdsresource.OnDoneFunc) { onDone() } -func (noopEndpointsWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) { +func (noopEndpointsWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) { onDone() } func (noopEndpointsWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { @@ -76,12 +76,17 @@ func newEndpointsWatcher() *endpointsWatcher { return &endpointsWatcher{updateCh: testutils.NewChannel()} } -func (ew *endpointsWatcher) OnUpdate(update *xdsresource.EndpointsResourceData, onDone xdsresource.OnDoneFunc) { +func (ew *endpointsWatcher) OnResourceChanged(update *xdsresource.EndpointsResourceData, err error, onDone xdsresource.OnDoneFunc) { + if err != nil { + ew.updateCh.Replace(endpointsUpdateErrTuple{err: err}) + onDone() + return + } ew.updateCh.Send(endpointsUpdateErrTuple{update: update.Resource}) onDone() } -func (ew *endpointsWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) { +func (ew *endpointsWatcher) OnAmbientError(err error, onDone xdsresource.OnDoneFunc) { // When used with a go-control-plane management server that continuously // resends resources which are NACKed by the xDS client, using a `Replace()` // here and in OnResourceDoesNotExist() simplifies tests which will have @@ -90,11 +95,6 @@ func (ew *endpointsWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) { onDone() } -func (ew *endpointsWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { - ew.updateCh.Replace(endpointsUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Endpoints not found in received response")}) - onDone() -} - // badEndpointsResource returns a endpoints resource for the given // edsServiceName which contains an endpoint with a load_balancing weight of // `0`. This is expected to be NACK'ed by the xDS client. diff --git a/xds/internal/xdsclient/tests/lds_watchers_test.go b/xds/internal/xdsclient/tests/lds_watchers_test.go index b05b9caf4adc..b03e296e207e 100644 --- a/xds/internal/xdsclient/tests/lds_watchers_test.go +++ b/xds/internal/xdsclient/tests/lds_watchers_test.go @@ -48,13 +48,10 @@ import ( type noopListenerWatcher struct{} -func (noopListenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) { +func (noopListenerWatcher) OnResourceChanged(_ *xdsresource.ListenerResourceData, _ error, onDone xdsresource.OnDoneFunc) { onDone() } -func (noopListenerWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) { - onDone() -} -func (noopListenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { +func (noopListenerWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) { onDone() } @@ -71,12 +68,17 @@ func newListenerWatcher() *listenerWatcher { return &listenerWatcher{updateCh: testutils.NewChannel()} } -func (lw *listenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) { +func (lw *listenerWatcher) OnResourceChanged(update *xdsresource.ListenerResourceData, err error, onDone xdsresource.OnDoneFunc) { + if err != nil { + lw.updateCh.Replace(listenerUpdateErrTuple{err: err}) + onDone() + return + } lw.updateCh.Send(listenerUpdateErrTuple{update: update.Resource}) onDone() } -func (lw *listenerWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) { +func (lw *listenerWatcher) OnAmbientError(err error, onDone xdsresource.OnDoneFunc) { // When used with a go-control-plane management server that continuously // resends resources which are NACKed by the xDS client, using a `Replace()` // here and in OnResourceDoesNotExist() simplifies tests which will have @@ -85,11 +87,6 @@ func (lw *listenerWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) { onDone() } -func (lw *listenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { - lw.updateCh.Replace(listenerUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Listener not found in received response")}) - onDone() -} - type listenerWatcherMultiple struct { updateCh *testutils.Channel } @@ -100,21 +97,21 @@ func newListenerWatcherMultiple(size int) *listenerWatcherMultiple { return &listenerWatcherMultiple{updateCh: testutils.NewChannelWithSize(size)} } -func (lw *listenerWatcherMultiple) OnUpdate(update *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) { +func (lw *listenerWatcherMultiple) OnResourceChanged(update *xdsresource.ListenerResourceData, err error, onDone xdsresource.OnDoneFunc) { + if err != nil { + lw.updateCh.Send(listenerUpdateErrTuple{err: err}) + onDone() + return + } lw.updateCh.Send(listenerUpdateErrTuple{update: update.Resource}) onDone() } -func (lw *listenerWatcherMultiple) OnError(err error, onDone xdsresource.OnDoneFunc) { +func (lw *listenerWatcherMultiple) OnAmbientError(err error, onDone xdsresource.OnDoneFunc) { lw.updateCh.Send(listenerUpdateErrTuple{err: err}) onDone() } -func (lw *listenerWatcherMultiple) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { - lw.updateCh.Send(listenerUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Listener not found in received response")}) - onDone() -} - // badListenerResource returns a listener resource for the given name which does // not contain the `RouteSpecifier` field in the HTTPConnectionManager, and // hence is expected to be NACKed by the client. diff --git a/xds/internal/xdsclient/tests/misc_watchers_test.go b/xds/internal/xdsclient/tests/misc_watchers_test.go index 6b8152620231..76e764421730 100644 --- a/xds/internal/xdsclient/tests/misc_watchers_test.go +++ b/xds/internal/xdsclient/tests/misc_watchers_test.go @@ -69,7 +69,12 @@ func newTestRouteConfigWatcher(client xdsclient.XDSClient, name1, name2 string) } } -func (rw *testRouteConfigWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData, onDone xdsresource.OnDoneFunc) { +func (rw *testRouteConfigWatcher) OnResourceChanged(update *xdsresource.RouteConfigResourceData, err error, onDone xdsresource.OnDoneFunc) { + if err != nil { + rw.updateCh.Replace(routeConfigUpdateErrTuple{err: err}) + onDone() + return + } rw.updateCh.Send(routeConfigUpdateErrTuple{update: update.Resource}) rw.cancel1 = xdsresource.WatchRouteConfig(rw.client, rw.name1, rw.rcw1) @@ -77,7 +82,7 @@ func (rw *testRouteConfigWatcher) OnUpdate(update *xdsresource.RouteConfigResour onDone() } -func (rw *testRouteConfigWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) { +func (rw *testRouteConfigWatcher) OnAmbientError(err error, onDone xdsresource.OnDoneFunc) { // When used with a go-control-plane management server that continuously // resends resources which are NACKed by the xDS client, using a `Replace()` // here and in OnResourceDoesNotExist() simplifies tests which will have @@ -86,11 +91,6 @@ func (rw *testRouteConfigWatcher) OnError(err error, onDone xdsresource.OnDoneFu onDone() } -func (rw *testRouteConfigWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { - rw.updateCh.Replace(routeConfigUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "RouteConfiguration not found in received response")}) - onDone() -} - func (rw *testRouteConfigWatcher) cancel() { rw.cancel1() rw.cancel2() diff --git a/xds/internal/xdsclient/tests/rds_watchers_test.go b/xds/internal/xdsclient/tests/rds_watchers_test.go index b8dd1c72f465..dfb161bb69a5 100644 --- a/xds/internal/xdsclient/tests/rds_watchers_test.go +++ b/xds/internal/xdsclient/tests/rds_watchers_test.go @@ -43,13 +43,10 @@ import ( type noopRouteConfigWatcher struct{} -func (noopRouteConfigWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData, onDone xdsresource.OnDoneFunc) { +func (noopRouteConfigWatcher) OnResourceChanged(_ *xdsresource.RouteConfigResourceData, _ error, onDone xdsresource.OnDoneFunc) { onDone() } -func (noopRouteConfigWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) { - onDone() -} -func (noopRouteConfigWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { +func (noopRouteConfigWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) { onDone() } @@ -66,12 +63,17 @@ func newRouteConfigWatcher() *routeConfigWatcher { return &routeConfigWatcher{updateCh: testutils.NewChannel()} } -func (rw *routeConfigWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData, onDone xdsresource.OnDoneFunc) { +func (rw *routeConfigWatcher) OnResourceChanged(update *xdsresource.RouteConfigResourceData, err error, onDone xdsresource.OnDoneFunc) { + if err != nil { + rw.updateCh.Replace(routeConfigUpdateErrTuple{err: err}) + onDone() + return + } rw.updateCh.Send(routeConfigUpdateErrTuple{update: update.Resource}) onDone() } -func (rw *routeConfigWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) { +func (rw *routeConfigWatcher) OnAmbientError(err error, onDone xdsresource.OnDoneFunc) { // When used with a go-control-plane management server that continuously // resends resources which are NACKed by the xDS client, using a `Replace()` // here and in OnResourceDoesNotExist() simplifies tests which will have @@ -80,11 +82,6 @@ func (rw *routeConfigWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) onDone() } -func (rw *routeConfigWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { - rw.updateCh.Replace(routeConfigUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "RouteConfiguration not found in received response")}) - onDone() -} - // badRouteConfigResource returns a RouteConfiguration resource for the given // routeName which contains a retry config with num_retries set to `0`. This is // expected to be NACK'ed by the xDS client. diff --git a/xds/internal/xdsclient/tests/resource_update_test.go b/xds/internal/xdsclient/tests/resource_update_test.go index 0460385d0fb7..67681a1ce641 100644 --- a/xds/internal/xdsclient/tests/resource_update_test.go +++ b/xds/internal/xdsclient/tests/resource_update_test.go @@ -161,7 +161,7 @@ func (s) TestHandleListenerResponseFromManagementServer(t *testing.T) { Value: []byte{1, 2, 3, 4}, }}, }, - wantErr: "Listener not found in received response", + wantErr: "xds: resource ListenerResource does not exist", wantGenericXDSConfig: []*v3statuspb.ClientConfig_GenericXdsConfig{ { TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener", @@ -177,7 +177,7 @@ func (s) TestHandleListenerResponseFromManagementServer(t *testing.T) { TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener", VersionInfo: "1", }, - wantErr: "Listener not found in received response", + wantErr: "xds: resource ListenerResource does not exist", wantGenericXDSConfig: []*v3statuspb.ClientConfig_GenericXdsConfig{ { TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener", @@ -194,7 +194,7 @@ func (s) TestHandleListenerResponseFromManagementServer(t *testing.T) { VersionInfo: "1", Resources: []*anypb.Any{testutils.MarshalAny(t, &v3routepb.RouteConfiguration{})}, }, - wantErr: "Listener not found in received response", + wantErr: "xds: resource ListenerResource does not exist", wantGenericXDSConfig: []*v3statuspb.ClientConfig_GenericXdsConfig{ { TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener", @@ -418,7 +418,7 @@ func (s) TestHandleRouteConfigResponseFromManagementServer(t *testing.T) { Value: []byte{1, 2, 3, 4}, }}, }, - wantErr: "RouteConfiguration not found in received response", + wantErr: "xds: resource RouteConfigResource does not exist", wantGenericXDSConfig: []*v3statuspb.ClientConfig_GenericXdsConfig{ { TypeUrl: "type.googleapis.com/envoy.config.route.v3.RouteConfiguration", @@ -434,7 +434,7 @@ func (s) TestHandleRouteConfigResponseFromManagementServer(t *testing.T) { TypeUrl: "type.googleapis.com/envoy.config.route.v3.RouteConfiguration", VersionInfo: "1", }, - wantErr: "RouteConfiguration not found in received response", + wantErr: "xds: resource RouteConfigResource does not exist", wantGenericXDSConfig: []*v3statuspb.ClientConfig_GenericXdsConfig{ { TypeUrl: "type.googleapis.com/envoy.config.route.v3.RouteConfiguration", @@ -451,7 +451,7 @@ func (s) TestHandleRouteConfigResponseFromManagementServer(t *testing.T) { VersionInfo: "1", Resources: []*anypb.Any{testutils.MarshalAny(t, &v3clusterpb.Cluster{})}, }, - wantErr: "RouteConfiguration not found in received response", + wantErr: "xds: resource RouteConfigResource does not exist", wantGenericXDSConfig: []*v3statuspb.ClientConfig_GenericXdsConfig{ { TypeUrl: "type.googleapis.com/envoy.config.route.v3.RouteConfiguration", @@ -667,7 +667,7 @@ func (s) TestHandleClusterResponseFromManagementServer(t *testing.T) { Value: []byte{1, 2, 3, 4}, }}, }, - wantErr: "Cluster not found in received response", + wantErr: "xds: resource ClusterResource does not exist", wantGenericXDSConfig: []*v3statuspb.ClientConfig_GenericXdsConfig{ { TypeUrl: "type.googleapis.com/envoy.config.cluster.v3.Cluster", @@ -683,7 +683,7 @@ func (s) TestHandleClusterResponseFromManagementServer(t *testing.T) { TypeUrl: "type.googleapis.com/envoy.config.cluster.v3.Cluster", VersionInfo: "1", }, - wantErr: "Cluster not found in received response", + wantErr: "xds: resource ClusterResource does not exist", wantGenericXDSConfig: []*v3statuspb.ClientConfig_GenericXdsConfig{ { TypeUrl: "type.googleapis.com/envoy.config.cluster.v3.Cluster", @@ -700,7 +700,7 @@ func (s) TestHandleClusterResponseFromManagementServer(t *testing.T) { VersionInfo: "1", Resources: []*anypb.Any{testutils.MarshalAny(t, &v3endpointpb.ClusterLoadAssignment{})}, }, - wantErr: "Cluster not found in received response", + wantErr: "xds: resource ClusterResource does not exist", wantGenericXDSConfig: []*v3statuspb.ClientConfig_GenericXdsConfig{ { TypeUrl: "type.googleapis.com/envoy.config.cluster.v3.Cluster", @@ -974,7 +974,7 @@ func (s) TestHandleEndpointsResponseFromManagementServer(t *testing.T) { Value: []byte{1, 2, 3, 4}, }}, }, - wantErr: "Endpoints not found in received response", + wantErr: "xds: resource EndpointsResource does not exist", wantGenericXDSConfig: []*v3statuspb.ClientConfig_GenericXdsConfig{ { TypeUrl: "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment", @@ -990,7 +990,7 @@ func (s) TestHandleEndpointsResponseFromManagementServer(t *testing.T) { TypeUrl: "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment", VersionInfo: "1", }, - wantErr: "Endpoints not found in received response", + wantErr: "xds: resource EndpointsResource does not exist", wantGenericXDSConfig: []*v3statuspb.ClientConfig_GenericXdsConfig{ { TypeUrl: "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment", @@ -1007,7 +1007,7 @@ func (s) TestHandleEndpointsResponseFromManagementServer(t *testing.T) { VersionInfo: "1", Resources: []*anypb.Any{testutils.MarshalAny(t, &v3listenerpb.Listener{})}, }, - wantErr: "Endpoints not found in received response", + wantErr: "xds: resource EndpointsResource does not exist", wantGenericXDSConfig: []*v3statuspb.ClientConfig_GenericXdsConfig{ { TypeUrl: "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment", diff --git a/xds/internal/xdsclient/xdsresource/cluster_resource_type.go b/xds/internal/xdsclient/xdsresource/cluster_resource_type.go index 8e9375fcbbec..0e43f0261cd4 100644 --- a/xds/internal/xdsclient/xdsresource/cluster_resource_type.go +++ b/xds/internal/xdsclient/xdsresource/cluster_resource_type.go @@ -110,39 +110,42 @@ func (c *ClusterResourceData) Raw() *anypb.Any { // ClusterWatcher wraps the callbacks to be invoked for different events // corresponding to the cluster resource being watched. type ClusterWatcher interface { - // OnUpdate is invoked to report an update for the resource being watched. - OnUpdate(*ClusterResourceData, OnDoneFunc) - - // OnError is invoked under different error conditions including but not + // OnResourceChanged is invoked to notify the watcher of a new version of + // the resource received from the xDS server or an error indicating the + // reason why the resource cannot be obtained. + // + // It is invoked under different error conditions including but not // limited to the following: - // - authority mentioned in the resource is not found - // - resource name parsing error - // - resource deserialization error - // - resource validation error - // - ADS stream failure - // - connection failure - OnError(error, OnDoneFunc) - - // OnResourceDoesNotExist is invoked for a specific error condition where - // the requested resource is not found on the xDS management server. - OnResourceDoesNotExist(OnDoneFunc) + // - authority mentioned in the resource is not found + // - resource name parsing error + // - resource validation error (if resource is not cached) + // - ADS stream failure (if resource is not cached) + // - connection failure (if resource is not cached) + OnResourceChanged(*ClusterResourceData, error, OnDoneFunc) + + // If resource is already cached, it is invoked under different error + // conditions including but not limited to the following: + // - resource validation error + // - ADS stream failure + // - connection failure + OnAmbientError(error, OnDoneFunc) } type delegatingClusterWatcher struct { watcher ClusterWatcher } -func (d *delegatingClusterWatcher) OnUpdate(data ResourceData, onDone OnDoneFunc) { +func (d *delegatingClusterWatcher) OnResourceChanged(data ResourceData, err error, onDone OnDoneFunc) { + if err != nil { + d.watcher.OnResourceChanged(nil, err, onDone) + return + } c := data.(*ClusterResourceData) - d.watcher.OnUpdate(c, onDone) -} - -func (d *delegatingClusterWatcher) OnError(err error, onDone OnDoneFunc) { - d.watcher.OnError(err, onDone) + d.watcher.OnResourceChanged(c, nil, onDone) } -func (d *delegatingClusterWatcher) OnResourceDoesNotExist(onDone OnDoneFunc) { - d.watcher.OnResourceDoesNotExist(onDone) +func (d *delegatingClusterWatcher) OnAmbientError(err error, onDone OnDoneFunc) { + d.watcher.OnAmbientError(err, onDone) } // WatchCluster uses xDS to discover the configuration associated with the diff --git a/xds/internal/xdsclient/xdsresource/endpoints_resource_type.go b/xds/internal/xdsclient/xdsresource/endpoints_resource_type.go index 94c03d0c5228..2f0faf5b70aa 100644 --- a/xds/internal/xdsclient/xdsresource/endpoints_resource_type.go +++ b/xds/internal/xdsclient/xdsresource/endpoints_resource_type.go @@ -106,39 +106,42 @@ func (e *EndpointsResourceData) Raw() *anypb.Any { // EndpointsWatcher wraps the callbacks to be invoked for different // events corresponding to the endpoints resource being watched. type EndpointsWatcher interface { - // OnUpdate is invoked to report an update for the resource being watched. - OnUpdate(*EndpointsResourceData, OnDoneFunc) - - // OnError is invoked under different error conditions including but not + // OnResourceChanged is invoked to notify the watcher of a new version of + // the resource received from the xDS server or an error indicating the + // reason why the resource cannot be obtained. + // + // It is invoked under different error conditions including but not // limited to the following: - // - authority mentioned in the resource is not found - // - resource name parsing error - // - resource deserialization error - // - resource validation error - // - ADS stream failure - // - connection failure - OnError(error, OnDoneFunc) - - // OnResourceDoesNotExist is invoked for a specific error condition where - // the requested resource is not found on the xDS management server. - OnResourceDoesNotExist(OnDoneFunc) + // - authority mentioned in the resource is not found + // - resource name parsing error + // - resource validation error (if resource is not cached) + // - ADS stream failure (if resource is not cached) + // - connection failure (if resource is not cached) + OnResourceChanged(*EndpointsResourceData, error, OnDoneFunc) + + // If resource is already cached, it is invoked under different error + // conditions including but not limited to the following: + // - resource validation error + // - ADS stream failure + // - connection failure + OnAmbientError(error, OnDoneFunc) } type delegatingEndpointsWatcher struct { watcher EndpointsWatcher } -func (d *delegatingEndpointsWatcher) OnUpdate(data ResourceData, onDone OnDoneFunc) { +func (d *delegatingEndpointsWatcher) OnResourceChanged(data ResourceData, err error, onDone OnDoneFunc) { + if err != nil { + d.watcher.OnResourceChanged(nil, err, onDone) + return + } e := data.(*EndpointsResourceData) - d.watcher.OnUpdate(e, onDone) -} - -func (d *delegatingEndpointsWatcher) OnError(err error, onDone OnDoneFunc) { - d.watcher.OnError(err, onDone) + d.watcher.OnResourceChanged(e, nil, onDone) } -func (d *delegatingEndpointsWatcher) OnResourceDoesNotExist(onDone OnDoneFunc) { - d.watcher.OnResourceDoesNotExist(onDone) +func (d *delegatingEndpointsWatcher) OnAmbientError(err error, onDone OnDoneFunc) { + d.watcher.OnAmbientError(err, onDone) } // WatchEndpoints uses xDS to discover the configuration associated with the diff --git a/xds/internal/xdsclient/xdsresource/listener_resource_type.go b/xds/internal/xdsclient/xdsresource/listener_resource_type.go index e3ca1134a07b..07ddd5ae1bfc 100644 --- a/xds/internal/xdsclient/xdsresource/listener_resource_type.go +++ b/xds/internal/xdsclient/xdsresource/listener_resource_type.go @@ -143,39 +143,42 @@ func (l *ListenerResourceData) Raw() *anypb.Any { // ListenerWatcher wraps the callbacks to be invoked for different // events corresponding to the listener resource being watched. type ListenerWatcher interface { - // OnUpdate is invoked to report an update for the resource being watched. - OnUpdate(*ListenerResourceData, OnDoneFunc) - - // OnError is invoked under different error conditions including but not + // OnResourceChanged is invoked to notify the watcher of a new version of + // the resource received from the xDS server or an error indicating the + // reason why the resource cannot be obtained. + // + // It is invoked under different error conditions including but not // limited to the following: - // - authority mentioned in the resource is not found - // - resource name parsing error - // - resource deserialization error - // - resource validation error - // - ADS stream failure - // - connection failure - OnError(error, OnDoneFunc) - - // OnResourceDoesNotExist is invoked for a specific error condition where - // the requested resource is not found on the xDS management server. - OnResourceDoesNotExist(OnDoneFunc) + // - authority mentioned in the resource is not found + // - resource name parsing error + // - resource validation error (if resource is not cached) + // - ADS stream failure (if resource is not cached) + // - connection failure (if resource is not cached) + OnResourceChanged(*ListenerResourceData, error, OnDoneFunc) + + // If resource is already cached, it is invoked under different error + // conditions including but not limited to the following: + // - resource validation error + // - ADS stream failure + // - connection failure + OnAmbientError(error, OnDoneFunc) } type delegatingListenerWatcher struct { watcher ListenerWatcher } -func (d *delegatingListenerWatcher) OnUpdate(data ResourceData, onDone OnDoneFunc) { +func (d *delegatingListenerWatcher) OnResourceChanged(data ResourceData, err error, onDone OnDoneFunc) { + if err != nil { + d.watcher.OnResourceChanged(nil, err, onDone) + return + } l := data.(*ListenerResourceData) - d.watcher.OnUpdate(l, onDone) -} - -func (d *delegatingListenerWatcher) OnError(err error, onDone OnDoneFunc) { - d.watcher.OnError(err, onDone) + d.watcher.OnResourceChanged(l, nil, onDone) } -func (d *delegatingListenerWatcher) OnResourceDoesNotExist(onDone OnDoneFunc) { - d.watcher.OnResourceDoesNotExist(onDone) +func (d *delegatingListenerWatcher) OnAmbientError(err error, onDone OnDoneFunc) { + d.watcher.OnAmbientError(err, onDone) } // WatchListener uses xDS to discover the configuration associated with the diff --git a/xds/internal/xdsclient/xdsresource/resource_type.go b/xds/internal/xdsclient/xdsresource/resource_type.go index e14f56f781d1..55b5f4a88430 100644 --- a/xds/internal/xdsclient/xdsresource/resource_type.go +++ b/xds/internal/xdsclient/xdsresource/resource_type.go @@ -58,27 +58,48 @@ type Producer interface { // from the xDS server. type OnDoneFunc func() -// ResourceWatcher wraps the callbacks to be invoked for different events -// corresponding to the resource being watched. +// ResourceWatcher is an interface that can to be implemented to wrap the +// callbacks to be invoked for different events corresponding to the resource +// being watched. type ResourceWatcher interface { - // OnUpdate is invoked to report an update for the resource being watched. + // OnResourceChanged is invoked to notify the watcher of a new version of + // the resource received from the xDS server or an error indicating the + // reason why the resource cannot be obtained. + // // The ResourceData parameter needs to be type asserted to the appropriate - // type for the resource being watched. - OnUpdate(ResourceData, OnDoneFunc) - - // OnError is invoked under different error conditions including but not + // type for the resource being watched. In case of error, the ResourceData + // is nil otherwise its not nil and error is nil but both will never be nil + // together. + // + // Watcher is expected to use the most recent value passed to + // OnResourceChanged(), regardless of whether that's a resource or an error + // i.e., if the watcher is given an error via OnResourceChanged(), that + // means it should stop using any previously delivered resource. + // + // It is invoked under different error conditions including but not // limited to the following: - // - authority mentioned in the resource is not found - // - resource name parsing error - // - resource deserialization error - // - resource validation error - // - ADS stream failure - // - connection failure - OnError(error, OnDoneFunc) - - // OnResourceDoesNotExist is invoked for a specific error condition where - // the requested resource is not found on the xDS management server. - OnResourceDoesNotExist(OnDoneFunc) + // - authority mentioned in the resource is not found + // - resource name parsing error + // - resource validation error (if resource is not cached) + // - ADS stream failure (if resource is not cached) + // - connection failure (if resource is not cached) + OnResourceChanged(ResourceData, error, OnDoneFunc) + + // OnAmbientError is invoked to notify the watcher of an error that occurs + // after a resource has been received (i.e. we already have a cached + // resource) that should not modify the watcher’s use of that resource but + // that may be useful information about the ambient state of the XdsClient. + // In particular, the watcher should NOT stop using the previously seen + // resource, and the XdsClient will NOT remove the resource from its cache. + // However, the error message may be useful as additional context to + // include in errors that are being generated for other reasons. + // + // If resource is already cached, it is invoked under different error + // conditions including but not limited to the following: + // - resource validation error + // - ADS stream failure + // - connection failure + OnAmbientError(error, OnDoneFunc) } // TODO: Once the implementation is complete, rename this interface as diff --git a/xds/internal/xdsclient/xdsresource/route_config_resource_type.go b/xds/internal/xdsclient/xdsresource/route_config_resource_type.go index 98ac313288a2..25576903d96d 100644 --- a/xds/internal/xdsclient/xdsresource/route_config_resource_type.go +++ b/xds/internal/xdsclient/xdsresource/route_config_resource_type.go @@ -107,39 +107,42 @@ func (r *RouteConfigResourceData) Raw() *anypb.Any { // RouteConfigWatcher wraps the callbacks to be invoked for different // events corresponding to the route configuration resource being watched. type RouteConfigWatcher interface { - // OnUpdate is invoked to report an update for the resource being watched. - OnUpdate(*RouteConfigResourceData, OnDoneFunc) - - // OnError is invoked under different error conditions including but not + // OnResourceChanged is invoked to notify the watcher of a new version of + // the resource received from the xDS server or an error indicating the + // reason why the resource cannot be obtained. + // + // It is invoked under different error conditions including but not // limited to the following: - // - authority mentioned in the resource is not found - // - resource name parsing error - // - resource deserialization error - // - resource validation error - // - ADS stream failure - // - connection failure - OnError(error, OnDoneFunc) - - // OnResourceDoesNotExist is invoked for a specific error condition where - // the requested resource is not found on the xDS management server. - OnResourceDoesNotExist(OnDoneFunc) + // - authority mentioned in the resource is not found + // - resource name parsing error + // - resource validation error (if resource is not cached) + // - ADS stream failure (if resource is not cached) + // - connection failure (if resource is not cached) + OnResourceChanged(*RouteConfigResourceData, error, OnDoneFunc) + + // If resource is already cached, it is invoked under different error + // conditions including but not limited to the following: + // - resource validation error + // - ADS stream failure + // - connection failure + OnAmbientError(error, OnDoneFunc) } type delegatingRouteConfigWatcher struct { watcher RouteConfigWatcher } -func (d *delegatingRouteConfigWatcher) OnUpdate(data ResourceData, onDone OnDoneFunc) { +func (d *delegatingRouteConfigWatcher) OnResourceChanged(data ResourceData, err error, onDone OnDoneFunc) { + if err != nil { + d.watcher.OnResourceChanged(nil, err, onDone) + return + } rc := data.(*RouteConfigResourceData) - d.watcher.OnUpdate(rc, onDone) -} - -func (d *delegatingRouteConfigWatcher) OnError(err error, onDone OnDoneFunc) { - d.watcher.OnError(err, onDone) + d.watcher.OnResourceChanged(rc, nil, onDone) } -func (d *delegatingRouteConfigWatcher) OnResourceDoesNotExist(onDone OnDoneFunc) { - d.watcher.OnResourceDoesNotExist(onDone) +func (d *delegatingRouteConfigWatcher) OnAmbientError(err error, onDone OnDoneFunc) { + d.watcher.OnAmbientError(err, onDone) } // WatchRouteConfig uses xDS to discover the configuration associated with the