From 2ba6cb727373e443170301e02a2be0b7d5bc93a5 Mon Sep 17 00:00:00 2001 From: PapaCharlie Date: Thu, 29 Aug 2024 15:02:52 -0700 Subject: [PATCH] Respect `startPublishing` call by always re-notifying watcher in XdsClientImpl When switching from the backup to the primary store (implemented in the XdsClientImpl), the data in the primary store is never replayed. This is different from the backup store behavior which respects the invocation of startPublishing and replays the contents of the store. This means that if the contents of the backup store are different from the contents of the primary store, and the client switches from the backup to the primary, the client will only see the backup values and not the primary values. --- .../com/linkedin/d2/xds/XdsClientImpl.java | 5 -- .../d2/xds/XdsToD2PropertiesAdaptor.java | 38 ++++------ .../linkedin/d2/xds/TestXdsClientImpl.java | 18 +++++ .../d2/xds/TestXdsToD2PropertiesAdaptor.java | 72 +++++++++++++------ 4 files changed, 81 insertions(+), 52 deletions(-) diff --git a/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java b/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java index 6f8a64bb39..3eb91ecd81 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java +++ b/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java @@ -554,11 +554,6 @@ public void setData(@Nullable ResourceUpdate data) void addWatcher(ResourceWatcher watcher) { - if (_watchers.contains(watcher)) - { - _log.warn("Watcher {} already registered", watcher); - return; - } _watchers.add(watcher); if (_data != null) { diff --git a/d2/src/main/java/com/linkedin/d2/xds/XdsToD2PropertiesAdaptor.java b/d2/src/main/java/com/linkedin/d2/xds/XdsToD2PropertiesAdaptor.java index a8b9293135..4e8deabfb7 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/XdsToD2PropertiesAdaptor.java +++ b/d2/src/main/java/com/linkedin/d2/xds/XdsToD2PropertiesAdaptor.java @@ -150,12 +150,9 @@ public void listenToCluster(String clusterName) } else { - _watchedClusterResources.computeIfAbsent(clusterName, k -> - { - XdsClient.NodeResourceWatcher watcher = getClusterResourceWatcher(clusterName); - _xdsClient.watchXdsResource(resourceName, watcher); - return watcher; - }); + XdsClient.ResourceWatcher watcher = + _watchedClusterResources.computeIfAbsent(clusterName, this::getClusterResourceWatcher); + _xdsClient.watchXdsResource(resourceName, watcher); } } @@ -169,36 +166,27 @@ public void listenToUris(String clusterName) } else { - _watchedUriResources.computeIfAbsent(clusterName, k -> - { - XdsClient.D2URIMapResourceWatcher watcher = getUriResourceWatcher(clusterName); - _xdsClient.watchXdsResource(resourceName, watcher); - return watcher; - }); + XdsClient.ResourceWatcher watcher = + _watchedUriResources.computeIfAbsent(clusterName, this::getUriResourceWatcher); + _xdsClient.watchXdsResource(resourceName, watcher); } } public void listenToService(String serviceName) { - _watchedServiceResources.computeIfAbsent(serviceName, k -> - { - XdsClient.NodeResourceWatcher watcher = getServiceResourceWatcher(serviceName); - _xdsClient.watchXdsResource(D2_SERVICE_NODE_PREFIX + serviceName, watcher); - return watcher; - }); + XdsClient.ResourceWatcher watcher = + _watchedServiceResources.computeIfAbsent(serviceName, this::getServiceResourceWatcher); + _xdsClient.watchXdsResource(D2_SERVICE_NODE_PREFIX + serviceName, watcher); } private void listenToSymlink(String name, String fullResourceName) { // use full resource name ("/d2/clusters/$FooClusterMater", "/d2/uris/$FooClusterMaster") as the key // instead of just the symlink name ("$FooClusterMaster") to differentiate clusters and uris symlink resources. - _watchedSymlinkResources.computeIfAbsent(fullResourceName, k -> - { - // use symlink name "$FooClusterMaster" to create the watcher - XdsClient.NodeResourceWatcher watcher = getSymlinkResourceWatcher(fullResourceName, name); - _xdsClient.watchXdsResource(k, watcher); - return watcher; - }); + XdsClient.ResourceWatcher watcher = + _watchedSymlinkResources.computeIfAbsent(fullResourceName, k -> getSymlinkResourceWatcher(k, name)); + // use symlink name "$FooClusterMaster" to create the watcher + _xdsClient.watchXdsResource(fullResourceName, watcher); } XdsClient.NodeResourceWatcher getServiceResourceWatcher(String serviceName) diff --git a/d2/src/test/java/com/linkedin/d2/xds/TestXdsClientImpl.java b/d2/src/test/java/com/linkedin/d2/xds/TestXdsClientImpl.java index b356a64a1e..565a95c11e 100644 --- a/d2/src/test/java/com/linkedin/d2/xds/TestXdsClientImpl.java +++ b/d2/src/test/java/com/linkedin/d2/xds/TestXdsClientImpl.java @@ -17,6 +17,7 @@ import java.util.Map; import java.util.Objects; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.MockitoAnnotations; import org.testng.Assert; import org.testng.annotations.DataProvider; @@ -442,6 +443,23 @@ public void testHandleD2URICollectionResponseWithRemoval() Assert.assertEquals(actualData.getURIMap(), D2_URI_MAP_UPDATE_WITH_DATA1.getURIMap()); } + @Test + public void testResourceSubscriberAddWatcher() + { + ResourceSubscriber subscriber = new ResourceSubscriber(NODE, "foo", null); + XdsClient.ResourceWatcher watcher = Mockito.mock(XdsClient.ResourceWatcher.class); + subscriber.addWatcher(watcher); + verify(watcher, times(0)).onChanged(any()); + + D2URIMapUpdate update = new D2URIMapUpdate(Collections.emptyMap()); + subscriber.setData(update); + for (int i = 0; i < 10; i++) + { + subscriber.addWatcher(watcher); + } + verify(watcher, times(10)).onChanged(eq(update)); + } + private static class XdsClientImplFixture { XdsClientImpl _xdsClientImpl; diff --git a/d2/src/test/java/com/linkedin/d2/xds/TestXdsToD2PropertiesAdaptor.java b/d2/src/test/java/com/linkedin/d2/xds/TestXdsToD2PropertiesAdaptor.java index 95e4b09c85..b630ecc86e 100644 --- a/d2/src/test/java/com/linkedin/d2/xds/TestXdsToD2PropertiesAdaptor.java +++ b/d2/src/test/java/com/linkedin/d2/xds/TestXdsToD2PropertiesAdaptor.java @@ -46,7 +46,8 @@ import org.testng.annotations.DataProvider; import org.testng.annotations.Test; -import static com.linkedin.d2.balancer.properties.PropertyKeys.*; +import static com.linkedin.d2.balancer.properties.PropertyKeys.ALLOWED_CLIENT_OVERRIDE_KEYS; +import static com.linkedin.d2.balancer.properties.PropertyKeys.HTTP_REQUEST_TIMEOUT; import static org.mockito.Mockito.*; @@ -112,10 +113,13 @@ public void testListenToService(Map clientOverride, Map clientOverride, Map uriMap = new HashMap<>(Collections.singletonMap(URI_NAME, protoUri)); fixture._uriMapWatcher.onChanged(new XdsClient.D2URIMapUpdate(uriMap)); @@ -247,17 +266,23 @@ public void testListenToNormalUri() throws PropertySerializationException public void testListenToUriSymlink() throws PropertySerializationException { XdsToD2PropertiesAdaptorFixture fixture = new XdsToD2PropertiesAdaptorFixture(); - fixture.getSpiedAdaptor().listenToUris(SYMLINK_NAME); + for (int i = 0; i < 10; i++) + { + fixture.getSpiedAdaptor().listenToUris(SYMLINK_NAME); + } // verify symlink is watched - verify(fixture._xdsClient).watchXdsResource(eq(URI_SYMLINK_RESOURCE_NAME), anyNodeWatcher()); + verify(fixture._xdsClient, times(10)).watchXdsResource(eq(URI_SYMLINK_RESOURCE_NAME), anyNodeWatcher()); // update symlink data NodeResourceWatcher symlinkNodeWatcher = fixture._nodeWatcher; - symlinkNodeWatcher.onChanged(getSymlinkNodeUpdate(PRIMARY_URI_RESOURCE_NAME)); + for (int i = 0; i < 10; i++) + { + symlinkNodeWatcher.onChanged(getSymlinkNodeUpdate(PRIMARY_URI_RESOURCE_NAME)); + } // verify actual cluster of the uris is watched - verify(fixture._xdsClient).watchXdsResource(eq(PRIMARY_URI_RESOURCE_NAME), anyMapWatcher()); + verify(fixture._xdsClient, times(10)).watchXdsResource(eq(PRIMARY_URI_RESOURCE_NAME), anyMapWatcher()); // update uri data D2URIMapResourceWatcher watcher = fixture._uriMapWatcher; @@ -269,9 +294,12 @@ public void testListenToUriSymlink() throws PropertySerializationException // test update symlink to a new primary cluster String primaryUriResourceName2 = URI_NODE_PREFIX + PRIMARY_CLUSTER_NAME_2; - symlinkNodeWatcher.onChanged(getSymlinkNodeUpdate(primaryUriResourceName2)); + for (int i = 0; i < 10; i++) + { + symlinkNodeWatcher.onChanged(getSymlinkNodeUpdate(primaryUriResourceName2)); + } - verify(fixture._xdsClient).watchXdsResource(eq(primaryUriResourceName2), anyMapWatcher()); + verify(fixture._xdsClient, times(10)).watchXdsResource(eq(primaryUriResourceName2), anyMapWatcher()); verifyUriUpdate(fixture, PRIMARY_CLUSTER_NAME_2, SYMLINK_NAME); // if the old primary cluster gets an update, it will be published under its original cluster name