diff --git a/cache/src/main/java/io/envoyproxy/controlplane/cache/SimpleCache.java b/cache/src/main/java/io/envoyproxy/controlplane/cache/SimpleCache.java index 7f0a445ac..6ab7ac580 100644 --- a/cache/src/main/java/io/envoyproxy/controlplane/cache/SimpleCache.java +++ b/cache/src/main/java/io/envoyproxy/controlplane/cache/SimpleCache.java @@ -526,10 +526,12 @@ private boolean respond(Watch watch, U snapshot, T group) { } private List findRemovedResources(DeltaWatch watch, Map> snapshotResources) { - // remove resources for which client has a tracked version but do not exist in snapshot - return watch.trackedResources().keySet() - .stream() + // remove resources for which client has a tracked version or is pending a response but do not exist in snapshot. + // when reconnections occur, envoy sends a request subscribing to resources that could no longer exist, + // and we don't count those as trackedResources because we don't know the version. + return Stream.concat(watch.trackedResources().keySet().stream(), watch.pendingResources().stream()) .filter(s -> !snapshotResources.containsKey(s)) + .distinct() .collect(Collectors.toList()); } diff --git a/cache/src/test/java/io/envoyproxy/controlplane/cache/v3/SimpleCacheTest.java b/cache/src/test/java/io/envoyproxy/controlplane/cache/v3/SimpleCacheTest.java index b8654f6ba..1ae3364f5 100644 --- a/cache/src/test/java/io/envoyproxy/controlplane/cache/v3/SimpleCacheTest.java +++ b/cache/src/test/java/io/envoyproxy/controlplane/cache/v3/SimpleCacheTest.java @@ -5,8 +5,12 @@ import static org.assertj.core.api.Assertions.assertThat; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; import com.google.protobuf.Message; +import io.envoyproxy.controlplane.cache.DeltaResponse; +import io.envoyproxy.controlplane.cache.DeltaWatch; +import io.envoyproxy.controlplane.cache.DeltaXdsRequest; import io.envoyproxy.controlplane.cache.NodeGroup; import io.envoyproxy.controlplane.cache.Resources; import io.envoyproxy.controlplane.cache.Response; @@ -20,6 +24,7 @@ import io.envoyproxy.envoy.config.listener.v3.Listener; import io.envoyproxy.envoy.config.route.v3.RouteConfiguration; import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.Secret; +import io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryRequest; import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest; import java.util.Collections; import java.util.HashMap; @@ -543,6 +548,35 @@ public void groups() { assertThat(cache.groups()).containsExactly(SingleNodeGroup.GROUP); } + + @Test + public void respondRemovedResourcesWhenPendingResourceButNotTracked() { + SimpleCache cache = new SimpleCache<>(new SingleNodeGroup()); + + cache.setSnapshot(SingleNodeGroup.GROUP, SNAPSHOT1); + + DeltaResponseTracker responseTracker = new DeltaResponseTracker(); + + DeltaWatch watch = cache.createDeltaWatch( + DeltaXdsRequest.create(DeltaDiscoveryRequest.newBuilder() + .setNode(Node.getDefaultInstance()) + .setTypeUrl(CLUSTER_TYPE_URL) + .addResourceNamesSubscribe("non_existant") + .addResourceNamesSubscribe("cluster0") + .build()), + "", + Collections.emptyMap(), + ImmutableSet.of("non_existant"), + true, + responseTracker, false); + + assertThat(watch.isCancelled()).isFalse(); + Assertions.assertThat(responseTracker.responses).isNotEmpty(); + + Assertions.assertThat(responseTracker.responses.get(0).removedResources()).isNotEmpty(); + Assertions.assertThat(responseTracker.responses.get(0).removedResources()).containsOnly("non_existant"); + } + private static class ResponseTracker implements Consumer { private final LinkedList responses = new LinkedList<>(); @@ -564,6 +598,17 @@ public void accept(Response response) { } } + private static class DeltaResponseTracker implements Consumer { + + private final LinkedList responses = new LinkedList<>(); + + @Override + public void accept(DeltaResponse response) { + responses.add(response); + } + + } + private static class SingleNodeGroup implements NodeGroup { private static final String GROUP = "node";