Skip to content

Commit a48b5bb

Browse files
committed
xdsclient: update watcher API as per gRFC A88
1 parent 724f450 commit a48b5bb

23 files changed

+385
-366
lines changed

xds/csds/csds_e2e_test.go

+10-25
Original file line numberDiff line numberDiff line change
@@ -70,49 +70,37 @@ func Test(t *testing.T) {
7070

7171
type nopListenerWatcher struct{}
7272

73-
func (nopListenerWatcher) OnUpdate(_ *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) {
73+
func (nopListenerWatcher) OnResourceChanged(_ *xdsresource.ListenerResourceData, _ error, onDone xdsresource.OnDoneFunc) {
7474
onDone()
7575
}
76-
func (nopListenerWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) {
77-
onDone()
78-
}
79-
func (nopListenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
76+
func (nopListenerWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) {
8077
onDone()
8178
}
8279

8380
type nopRouteConfigWatcher struct{}
8481

85-
func (nopRouteConfigWatcher) OnUpdate(_ *xdsresource.RouteConfigResourceData, onDone xdsresource.OnDoneFunc) {
82+
func (nopRouteConfigWatcher) OnResourceChanged(_ *xdsresource.RouteConfigResourceData, _ error, onDone xdsresource.OnDoneFunc) {
8683
onDone()
8784
}
88-
func (nopRouteConfigWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) {
89-
onDone()
90-
}
91-
func (nopRouteConfigWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
85+
func (nopRouteConfigWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) {
9286
onDone()
9387
}
9488

9589
type nopClusterWatcher struct{}
9690

97-
func (nopClusterWatcher) OnUpdate(_ *xdsresource.ClusterResourceData, onDone xdsresource.OnDoneFunc) {
98-
onDone()
99-
}
100-
func (nopClusterWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) {
91+
func (nopClusterWatcher) OnResourceChanged(_ *xdsresource.ClusterResourceData, _ error, onDone xdsresource.OnDoneFunc) {
10192
onDone()
10293
}
103-
func (nopClusterWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
94+
func (nopClusterWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) {
10495
onDone()
10596
}
10697

10798
type nopEndpointsWatcher struct{}
10899

109-
func (nopEndpointsWatcher) OnUpdate(_ *xdsresource.EndpointsResourceData, onDone xdsresource.OnDoneFunc) {
110-
onDone()
111-
}
112-
func (nopEndpointsWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) {
100+
func (nopEndpointsWatcher) OnResourceChanged(_ *xdsresource.EndpointsResourceData, _ error, onDone xdsresource.OnDoneFunc) {
113101
onDone()
114102
}
115-
func (nopEndpointsWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
103+
func (nopEndpointsWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) {
116104
onDone()
117105
}
118106

@@ -137,13 +125,10 @@ func newBlockingListenerWatcher(testCtxDone <-chan struct{}) *blockingListenerWa
137125
}
138126
}
139127

140-
func (w *blockingListenerWatcher) OnUpdate(_ *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) {
141-
writeOnDone(w.testCtxDone, w.onDoneCh, onDone)
142-
}
143-
func (w *blockingListenerWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) {
128+
func (w *blockingListenerWatcher) OnResourceChanged(_ *xdsresource.ListenerResourceData, _ error, onDone xdsresource.OnDoneFunc) {
144129
writeOnDone(w.testCtxDone, w.onDoneCh, onDone)
145130
}
146-
func (w *blockingListenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
131+
func (w *blockingListenerWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) {
147132
writeOnDone(w.testCtxDone, w.onDoneCh, onDone)
148133
}
149134

xds/internal/balancer/cdsbalancer/cdsbalancer.go

+7-8
Original file line numberDiff line numberDiff line change
@@ -342,7 +342,7 @@ func (b *cdsBalancer) ResolverError(err error) {
342342
if b.lbCfg != nil {
343343
root = b.lbCfg.ClusterName
344344
}
345-
b.onClusterError(root, err)
345+
b.onClusterAmbientError(root, err)
346346
})
347347
}
348348

@@ -428,20 +428,20 @@ func (b *cdsBalancer) onClusterUpdate(name string, update xdsresource.ClusterUpd
428428
// If the security config is invalid, for example, if the provider
429429
// instance is not found in the bootstrap config, we need to put the
430430
// channel in transient failure.
431-
b.onClusterError(name, fmt.Errorf("received Cluster resource contains invalid security config: %v", err))
431+
b.onClusterAmbientError(name, fmt.Errorf("received Cluster resource contains invalid security config: %v", err))
432432
return
433433
}
434434
}
435435

436436
clustersSeen := make(map[string]bool)
437437
dms, ok, err := b.generateDMsForCluster(b.lbCfg.ClusterName, 0, nil, clustersSeen)
438438
if err != nil {
439-
b.onClusterError(b.lbCfg.ClusterName, fmt.Errorf("failed to generate discovery mechanisms: %v", err))
439+
b.onClusterAmbientError(b.lbCfg.ClusterName, fmt.Errorf("failed to generate discovery mechanisms: %v", err))
440440
return
441441
}
442442
if ok {
443443
if len(dms) == 0 {
444-
b.onClusterError(b.lbCfg.ClusterName, fmt.Errorf("aggregate cluster graph has no leaf clusters"))
444+
b.onClusterAmbientError(b.lbCfg.ClusterName, fmt.Errorf("aggregate cluster graph has no leaf clusters"))
445445
return
446446
}
447447
// Child policy is built the first time we resolve the cluster graph.
@@ -501,7 +501,7 @@ func (b *cdsBalancer) onClusterUpdate(name string, update xdsresource.ClusterUpd
501501
// TRANSIENT_FAILURE.
502502
//
503503
// Only executed in the context of a serializer callback.
504-
func (b *cdsBalancer) onClusterError(name string, err error) {
504+
func (b *cdsBalancer) onClusterAmbientError(name string, err error) {
505505
b.logger.Warningf("Cluster resource %q received error update: %v", name, err)
506506

507507
if b.childLB != nil {
@@ -525,15 +525,14 @@ func (b *cdsBalancer) onClusterError(name string, err error) {
525525
// TRANSIENT_FAILURE.
526526
//
527527
// Only executed in the context of a serializer callback.
528-
func (b *cdsBalancer) onClusterResourceNotFound(name string) {
529-
err := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "resource name %q of type Cluster not found in received response", name)
528+
func (b *cdsBalancer) onClusterResourceChangedError(name string, err error) {
530529
if b.childLB != nil {
531530
b.childLB.ResolverError(err)
532531
} else {
533532
// If child balancer was never created, fail the RPCs with errors.
534533
b.ccw.UpdateState(balancer.State{
535534
ConnectivityState: connectivity.TransientFailure,
536-
Picker: base.NewErrPicker(err),
535+
Picker: base.NewErrPicker(fmt.Errorf("%q: %v", name, err)),
537536
})
538537
}
539538
}

xds/internal/balancer/cdsbalancer/cluster_watcher.go

+8-8
Original file line numberDiff line numberDiff line change
@@ -32,21 +32,21 @@ type clusterWatcher struct {
3232
parent *cdsBalancer
3333
}
3434

35-
func (cw *clusterWatcher) OnUpdate(u *xdsresource.ClusterResourceData, onDone xdsresource.OnDoneFunc) {
35+
func (cw *clusterWatcher) OnResourceChanged(u *xdsresource.ClusterResourceData, err error, onDone xdsresource.OnDoneFunc) {
36+
if err != nil {
37+
handleError := func(context.Context) { cw.parent.onClusterResourceChangedError(cw.name, err); onDone() }
38+
cw.parent.serializer.ScheduleOr(handleError, onDone)
39+
return
40+
}
3641
handleUpdate := func(context.Context) { cw.parent.onClusterUpdate(cw.name, u.Resource); onDone() }
3742
cw.parent.serializer.ScheduleOr(handleUpdate, onDone)
3843
}
3944

40-
func (cw *clusterWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) {
41-
handleError := func(context.Context) { cw.parent.onClusterError(cw.name, err); onDone() }
45+
func (cw *clusterWatcher) OnAmbientError(err error, onDone xdsresource.OnDoneFunc) {
46+
handleError := func(context.Context) { cw.parent.onClusterAmbientError(cw.name, err); onDone() }
4247
cw.parent.serializer.ScheduleOr(handleError, onDone)
4348
}
4449

45-
func (cw *clusterWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
46-
handleNotFound := func(context.Context) { cw.parent.onClusterResourceNotFound(cw.name); onDone() }
47-
cw.parent.serializer.ScheduleOr(handleNotFound, onDone)
48-
}
49-
5050
// watcherState groups the state associated with a clusterWatcher.
5151
type watcherState struct {
5252
watcher *clusterWatcher // The underlying watcher.

xds/internal/balancer/clusterresolver/resource_resolver_eds.go

+24-25
Original file line numberDiff line numberDiff line change
@@ -76,20 +76,42 @@ func newEDSResolver(nameToWatch string, producer xdsresource.Producer, topLevelR
7676
}
7777

7878
// OnUpdate is invoked to report an update for the resource being watched.
79-
func (er *edsDiscoveryMechanism) OnUpdate(update *xdsresource.EndpointsResourceData, onDone xdsresource.OnDoneFunc) {
79+
func (er *edsDiscoveryMechanism) OnResourceChanged(update *xdsresource.EndpointsResourceData, err error, onDone xdsresource.OnDoneFunc) {
8080
if er.stopped.HasFired() {
8181
onDone()
8282
return
8383
}
8484

85+
if err != nil {
86+
if er.logger.V(2) {
87+
if xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceNotFound {
88+
er.logger.Infof("EDS discovery mechanism for resource %q reported resource-does-not-exist error", er.nameToWatch)
89+
} else {
90+
er.logger.Infof("EDS discovery mechanism for resource %q reported on resource changed error: %v", er.nameToWatch, err)
91+
}
92+
}
93+
// Report an empty update that would result in no priority child being
94+
// created for this discovery mechanism. This would result in the priority
95+
// LB policy reporting TRANSIENT_FAILURE (as there would be no priorities or
96+
// localities) if this was the only discovery mechanism, or would result in
97+
// the priority LB policy using a lower priority discovery mechanism when
98+
// that becomes available.
99+
er.mu.Lock()
100+
er.update = &xdsresource.EndpointsUpdate{}
101+
er.mu.Unlock()
102+
103+
er.topLevelResolver.onUpdate(onDone)
104+
return
105+
}
106+
85107
er.mu.Lock()
86108
er.update = &update.Resource
87109
er.mu.Unlock()
88110

89111
er.topLevelResolver.onUpdate(onDone)
90112
}
91113

92-
func (er *edsDiscoveryMechanism) OnError(err error, onDone xdsresource.OnDoneFunc) {
114+
func (er *edsDiscoveryMechanism) OnAmbientError(err error, onDone xdsresource.OnDoneFunc) {
93115
if er.stopped.HasFired() {
94116
onDone()
95117
return
@@ -119,26 +141,3 @@ func (er *edsDiscoveryMechanism) OnError(err error, onDone xdsresource.OnDoneFun
119141

120142
er.topLevelResolver.onUpdate(onDone)
121143
}
122-
123-
func (er *edsDiscoveryMechanism) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
124-
if er.stopped.HasFired() {
125-
onDone()
126-
return
127-
}
128-
129-
if er.logger.V(2) {
130-
er.logger.Infof("EDS discovery mechanism for resource %q reported resource-does-not-exist error", er.nameToWatch)
131-
}
132-
133-
// Report an empty update that would result in no priority child being
134-
// created for this discovery mechanism. This would result in the priority
135-
// LB policy reporting TRANSIENT_FAILURE (as there would be no priorities or
136-
// localities) if this was the only discovery mechanism, or would result in
137-
// the priority LB policy using a lower priority discovery mechanism when
138-
// that becomes available.
139-
er.mu.Lock()
140-
er.update = &xdsresource.EndpointsUpdate{}
141-
er.mu.Unlock()
142-
143-
er.topLevelResolver.onUpdate(onDone)
144-
}

xds/internal/resolver/watch_service.go

+16-16
Original file line numberDiff line numberDiff line change
@@ -36,21 +36,21 @@ func newListenerWatcher(resourceName string, parent *xdsResolver) *listenerWatch
3636
return lw
3737
}
3838

39-
func (l *listenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) {
39+
func (l *listenerWatcher) OnResourceChanged(update *xdsresource.ListenerResourceData, err error, onDone xdsresource.OnDoneFunc) {
40+
if err != nil {
41+
handleError := func(context.Context) { l.parent.onListenerResourceChangedError(err); onDone() }
42+
l.parent.serializer.ScheduleOr(handleError, onDone)
43+
return
44+
}
4045
handleUpdate := func(context.Context) { l.parent.onListenerResourceUpdate(update.Resource); onDone() }
4146
l.parent.serializer.ScheduleOr(handleUpdate, onDone)
4247
}
4348

44-
func (l *listenerWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) {
45-
handleError := func(context.Context) { l.parent.onListenerResourceError(err); onDone() }
49+
func (l *listenerWatcher) OnAmbientError(err error, onDone xdsresource.OnDoneFunc) {
50+
handleError := func(context.Context) { l.parent.onListenerResourceAmbientError(err); onDone() }
4651
l.parent.serializer.ScheduleOr(handleError, onDone)
4752
}
4853

49-
func (l *listenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
50-
handleNotFound := func(context.Context) { l.parent.onListenerResourceNotFound(); onDone() }
51-
l.parent.serializer.ScheduleOr(handleNotFound, onDone)
52-
}
53-
5454
func (l *listenerWatcher) stop() {
5555
l.cancel()
5656
l.parent.logger.Infof("Canceling watch on Listener resource %q", l.resourceName)
@@ -68,24 +68,24 @@ func newRouteConfigWatcher(resourceName string, parent *xdsResolver) *routeConfi
6868
return rw
6969
}
7070

71-
func (r *routeConfigWatcher) OnUpdate(u *xdsresource.RouteConfigResourceData, onDone xdsresource.OnDoneFunc) {
71+
func (r *routeConfigWatcher) OnResourceChanged(u *xdsresource.RouteConfigResourceData, err error, onDone xdsresource.OnDoneFunc) {
72+
if err != nil {
73+
handleError := func(context.Context) { r.parent.onRouteConfigResourceChangedError(r.resourceName, err); onDone() }
74+
r.parent.serializer.ScheduleOr(handleError, onDone)
75+
return
76+
}
7277
handleUpdate := func(context.Context) {
7378
r.parent.onRouteConfigResourceUpdate(r.resourceName, u.Resource)
7479
onDone()
7580
}
7681
r.parent.serializer.ScheduleOr(handleUpdate, onDone)
7782
}
7883

79-
func (r *routeConfigWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) {
80-
handleError := func(context.Context) { r.parent.onRouteConfigResourceError(r.resourceName, err); onDone() }
84+
func (r *routeConfigWatcher) OnAmbientError(err error, onDone xdsresource.OnDoneFunc) {
85+
handleError := func(context.Context) { r.parent.onRouteConfigResourceAmbientError(r.resourceName, err); onDone() }
8186
r.parent.serializer.ScheduleOr(handleError, onDone)
8287
}
8388

84-
func (r *routeConfigWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
85-
handleNotFound := func(context.Context) { r.parent.onRouteConfigResourceNotFound(r.resourceName); onDone() }
86-
r.parent.serializer.ScheduleOr(handleNotFound, onDone)
87-
}
88-
8989
func (r *routeConfigWatcher) stop() {
9090
r.cancel()
9191
r.parent.logger.Infof("Canceling watch on RouteConfiguration resource %q", r.resourceName)

xds/internal/resolver/xds_resolver.go

+14-6
Original file line numberDiff line numberDiff line change
@@ -518,17 +518,21 @@ func (r *xdsResolver) onListenerResourceUpdate(update xdsresource.ListenerUpdate
518518
r.routeConfigWatcher = newRouteConfigWatcher(r.rdsResourceName, r)
519519
}
520520

521-
func (r *xdsResolver) onListenerResourceError(err error) {
521+
func (r *xdsResolver) onListenerResourceAmbientError(err error) {
522522
if r.logger.V(2) {
523523
r.logger.Infof("Received error for Listener resource %q: %v", r.ldsResourceName, err)
524524
}
525525
r.onError(err)
526526
}
527527

528528
// Only executed in the context of a serializer callback.
529-
func (r *xdsResolver) onListenerResourceNotFound() {
529+
func (r *xdsResolver) onListenerResourceChangedError(err error) {
530530
if r.logger.V(2) {
531-
r.logger.Infof("Received resource-not-found-error for Listener resource %q", r.ldsResourceName)
531+
if xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceNotFound {
532+
r.logger.Infof("Received resource-not-found-error for Listener resource %q", r.ldsResourceName)
533+
} else {
534+
r.logger.Infof("Received on-resource-changed error for Listener resource %q: %v", r.ldsResourceName, err)
535+
}
532536
}
533537

534538
r.listenerUpdateRecvd = false
@@ -559,17 +563,21 @@ func (r *xdsResolver) onRouteConfigResourceUpdate(name string, update xdsresourc
559563
}
560564

561565
// Only executed in the context of a serializer callback.
562-
func (r *xdsResolver) onRouteConfigResourceError(name string, err error) {
566+
func (r *xdsResolver) onRouteConfigResourceAmbientError(name string, err error) {
563567
if r.logger.V(2) {
564568
r.logger.Infof("Received error for RouteConfiguration resource %q: %v", name, err)
565569
}
566570
r.onError(err)
567571
}
568572

569573
// Only executed in the context of a serializer callback.
570-
func (r *xdsResolver) onRouteConfigResourceNotFound(name string) {
574+
func (r *xdsResolver) onRouteConfigResourceChangedError(name string, err error) {
571575
if r.logger.V(2) {
572-
r.logger.Infof("Received resource-not-found-error for RouteConfiguration resource %q", name)
576+
if xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceNotFound {
577+
r.logger.Infof("Received resource-not-found-error for RouteConfiguration resource %q", name)
578+
} else {
579+
r.logger.Infof("Received on-resource-changed error for RouteConfiguration resource %q: %v", name, err)
580+
}
573581
}
574582

575583
if r.rdsResourceName != name {

0 commit comments

Comments
 (0)