Skip to content

Commit d53fce9

Browse files
committed
Do proper loop detection for agg clusters
1 parent d4f1a26 commit d53fce9

File tree

2 files changed

+59
-6
lines changed

2 files changed

+59
-6
lines changed

xds/src/main/java/io/grpc/xds/XdsDependencyManager.java

+57-6
Original file line numberDiff line numberDiff line change
@@ -251,8 +251,10 @@ private void releaseSubscription(ClusterSubscription subscription) {
251251
checkNotNull(subscription, "subscription");
252252
String clusterName = subscription.getClusterName();
253253
syncContext.execute(() -> {
254-
XdsWatcherBase<?> cdsWatcher =
255-
resourceWatchers.get(CLUSTER_RESOURCE).watchers.get(clusterName);
254+
XdsWatcherBase<?> cdsWatcher = null;
255+
if (resourceWatchers.containsKey(CLUSTER_RESOURCE)) {
256+
cdsWatcher = resourceWatchers.get(CLUSTER_RESOURCE).watchers.get(clusterName);
257+
}
256258
if (cdsWatcher == null) {
257259
return; // already released while waiting for the syncContext
258260
}
@@ -324,6 +326,13 @@ private void maybePublishConfig() {
324326
return;
325327
}
326328

329+
List<String> namesInLoop = detectLoops(rawClusterWatchers);
330+
if (namesInLoop != null) {
331+
String error = "Detected loop in cluster dependencies: " + namesInLoop;
332+
xdsConfigWatcher.onError("xDS node ID: " + dataPlaneAuthority,
333+
Status.UNAVAILABLE.withDescription(error));
334+
return;
335+
}
327336
XdsConfig newConfig = buildConfig();
328337
if (Objects.equals(newConfig, lastXdsConfig)) {
329338
return;
@@ -332,6 +341,51 @@ private void maybePublishConfig() {
332341
xdsConfigWatcher.onUpdate(lastXdsConfig);
333342
}
334343

344+
private List<String> detectLoops(TypeWatchers<?> rawClusterWatchers) {
345+
for (XdsWatcherBase<?> watcher : rawClusterWatchers.watchers.values()) {
346+
if (!watcher.hasDataValue()) {
347+
continue;
348+
}
349+
CdsWatcher cdsWatcher = (CdsWatcher) watcher;
350+
351+
XdsClusterResource.CdsUpdate cdsUpdate = cdsWatcher.getData().getValue();
352+
if (cdsUpdate.clusterType() != ClusterType.AGGREGATE) {
353+
continue;
354+
}
355+
List<String> namesInLoop =
356+
detectLoops(Arrays.asList(watcher.resourceName), cdsUpdate.prioritizedClusterNames());
357+
if (namesInLoop != null) {
358+
return namesInLoop;
359+
}
360+
}
361+
362+
return null;
363+
}
364+
365+
private List<String> detectLoops(List<String> parents, ImmutableList<String> children) {
366+
if (!Collections.disjoint(parents, children)) {
367+
String problemChild = children.stream().filter(c -> parents.contains(c)).findFirst().get();
368+
return new ImmutableList.Builder<String>().addAll(parents).add(problemChild).build();
369+
}
370+
371+
for (String child : children) {
372+
CdsWatcher childWatcher = getCluster(child);
373+
if (childWatcher == null || !childWatcher.getData().hasValue()
374+
|| childWatcher.getData().getValue().clusterType() != ClusterType.AGGREGATE) {
375+
continue;
376+
}
377+
ImmutableList newParents =
378+
new ImmutableList.Builder().addAll(parents).add(childWatcher.resourceName()).build();
379+
List<String> childLoop =
380+
detectLoops(newParents, childWatcher.getData().getValue().prioritizedClusterNames());
381+
if (childLoop != null) {
382+
return childLoop;
383+
}
384+
}
385+
386+
return null;
387+
}
388+
335389
@VisibleForTesting
336390
XdsConfig buildConfig() {
337391
XdsConfig.XdsConfigBuilder builder = new XdsConfig.XdsConfigBuilder();
@@ -988,10 +1042,7 @@ public void onChanged(XdsClusterResource.CdsUpdate update) {
9881042
setData(update);
9891043
Set<String> addedClusters = Sets.difference(newNames, oldNames);
9901044
addedClusters.forEach((cluster) -> addClusterWatcher(cluster, parentContext, depth));
991-
992-
if (addedClusters.isEmpty()) {
993-
maybePublishConfig();
994-
}
1045+
maybePublishConfig();
9951046
} else { // data was set to error status above
9961047
maybePublishConfig();
9971048
}

xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java

+2
Original file line numberDiff line numberDiff line change
@@ -734,11 +734,13 @@ public void aggregateCluster_withLoops_afterEds() {
734734
xdsClient.deliverCdsUpdate(cluster3, update3);
735735

736736
// cluster2 (aggr.) -> [cluster3 (EDS)]
737+
reset(helper);
737738
CdsUpdate update2a =
738739
CdsUpdate.forAggregate(cluster2, Arrays.asList(cluster3, cluster1, cluster2, cluster3))
739740
.roundRobinLbPolicy().build();
740741
xdsClient.deliverCdsUpdate(cluster2, update2a);
741742
assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1, cluster2, cluster3);
743+
742744
verify(helper).updateBalancingState(
743745
eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture());
744746
Status unavailable = Status.UNAVAILABLE.withDescription(

0 commit comments

Comments
 (0)