Skip to content

Commit a8b3df6

Browse files
committed
ensure we unsubscribe removed resources on reconnects
1 parent 4577722 commit a8b3df6

File tree

2 files changed

+50
-3
lines changed

2 files changed

+50
-3
lines changed

cache/src/main/java/io/envoyproxy/controlplane/cache/SimpleCache.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -526,10 +526,12 @@ private boolean respond(Watch watch, U snapshot, T group) {
526526
}
527527

528528
private List<String> findRemovedResources(DeltaWatch watch, Map<String, VersionedResource<?>> snapshotResources) {
529-
// remove resources for which client has a tracked version but do not exist in snapshot
530-
return watch.trackedResources().keySet()
531-
.stream()
529+
// remove resources for which client has a tracked version or is pending a response but do not exist in snapshot.
530+
// when reconnections occur, envoy sends a request subscribing to resources that could no longer exist in the snapshot
531+
// and we don't count those as trackedResources because we don't know the version.
532+
return Stream.concat(watch.trackedResources().keySet().stream(), watch.pendingResources().stream())
532533
.filter(s -> !snapshotResources.containsKey(s))
534+
.distinct()
533535
.collect(Collectors.toList());
534536
}
535537

cache/src/test/java/io/envoyproxy/controlplane/cache/v3/SimpleCacheTest.java

+45
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,12 @@
55
import static org.assertj.core.api.Assertions.assertThat;
66

77
import com.google.common.collect.ImmutableList;
8+
import com.google.common.collect.ImmutableSet;
89
import com.google.common.collect.Sets;
910
import com.google.protobuf.Message;
11+
import io.envoyproxy.controlplane.cache.DeltaResponse;
12+
import io.envoyproxy.controlplane.cache.DeltaWatch;
13+
import io.envoyproxy.controlplane.cache.DeltaXdsRequest;
1014
import io.envoyproxy.controlplane.cache.NodeGroup;
1115
import io.envoyproxy.controlplane.cache.Resources;
1216
import io.envoyproxy.controlplane.cache.Response;
@@ -20,6 +24,7 @@
2024
import io.envoyproxy.envoy.config.listener.v3.Listener;
2125
import io.envoyproxy.envoy.config.route.v3.RouteConfiguration;
2226
import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.Secret;
27+
import io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryRequest;
2328
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest;
2429
import java.util.Collections;
2530
import java.util.HashMap;
@@ -543,6 +548,35 @@ public void groups() {
543548
assertThat(cache.groups()).containsExactly(SingleNodeGroup.GROUP);
544549
}
545550

551+
552+
@Test
553+
public void respondRemovedResourcesWhenPendingResourceButNotTracked() {
554+
SimpleCache<String> cache = new SimpleCache<>(new SingleNodeGroup());
555+
556+
cache.setSnapshot(SingleNodeGroup.GROUP, SNAPSHOT1);
557+
558+
DeltaResponseTracker responseTracker = new DeltaResponseTracker();
559+
560+
DeltaWatch watch = cache.createDeltaWatch(
561+
DeltaXdsRequest.create(DeltaDiscoveryRequest.newBuilder()
562+
.setNode(Node.getDefaultInstance())
563+
.setTypeUrl(CLUSTER_TYPE_URL)
564+
.addResourceNamesSubscribe("non_existant")
565+
.addResourceNamesSubscribe("cluster0")
566+
.build()),
567+
"",
568+
Collections.emptyMap(),
569+
ImmutableSet.of("non_existant"),
570+
true,
571+
responseTracker, false);
572+
573+
assertThat(watch.isCancelled()).isFalse();
574+
Assertions.assertThat(responseTracker.responses).isNotEmpty();
575+
576+
Assertions.assertThat(responseTracker.responses.get(0).removedResources()).isNotEmpty();
577+
Assertions.assertThat(responseTracker.responses.get(0).removedResources()).containsOnly("non_existant");
578+
}
579+
546580
private static class ResponseTracker implements Consumer<Response> {
547581

548582
private final LinkedList<Response> responses = new LinkedList<>();
@@ -564,6 +598,17 @@ public void accept(Response response) {
564598
}
565599
}
566600

601+
private static class DeltaResponseTracker implements Consumer<DeltaResponse> {
602+
603+
private final LinkedList<DeltaResponse> responses = new LinkedList<>();
604+
605+
@Override
606+
public void accept(DeltaResponse response) {
607+
responses.add(response);
608+
}
609+
610+
}
611+
567612
private static class SingleNodeGroup implements NodeGroup<String> {
568613

569614
private static final String GROUP = "node";

0 commit comments

Comments
 (0)