Skip to content

Commit

Permalink
Respect startPublishing call by always re-notifying watcher in XdsC…
Browse files Browse the repository at this point in the history
…lientImpl

When switching from the backup to the primary store (implemented in the
XdsClientImpl), the data in the primary store is never replayed. This is
different from the backup store behavior which respects the invocation of
startPublishing and replays the contents of the store. This means that if the
contents of the backup store are different from the contents of the primary
store, and the client switches from the backup to the primary, the client will
only see the backup values and not the primary values.
  • Loading branch information
PapaCharlie committed Aug 29, 2024
1 parent c67ff74 commit 2ba6cb7
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 52 deletions.
5 changes: 0 additions & 5 deletions d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -554,11 +554,6 @@ public void setData(@Nullable ResourceUpdate data)

void addWatcher(ResourceWatcher watcher)
{
if (_watchers.contains(watcher))
{
_log.warn("Watcher {} already registered", watcher);
return;
}
_watchers.add(watcher);
if (_data != null)
{
Expand Down
38 changes: 13 additions & 25 deletions d2/src/main/java/com/linkedin/d2/xds/XdsToD2PropertiesAdaptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -150,12 +150,9 @@ public void listenToCluster(String clusterName)
}
else
{
_watchedClusterResources.computeIfAbsent(clusterName, k ->
{
XdsClient.NodeResourceWatcher watcher = getClusterResourceWatcher(clusterName);
_xdsClient.watchXdsResource(resourceName, watcher);
return watcher;
});
XdsClient.ResourceWatcher watcher =
_watchedClusterResources.computeIfAbsent(clusterName, this::getClusterResourceWatcher);
_xdsClient.watchXdsResource(resourceName, watcher);
}
}

Expand All @@ -169,36 +166,27 @@ public void listenToUris(String clusterName)
}
else
{
_watchedUriResources.computeIfAbsent(clusterName, k ->
{
XdsClient.D2URIMapResourceWatcher watcher = getUriResourceWatcher(clusterName);
_xdsClient.watchXdsResource(resourceName, watcher);
return watcher;
});
XdsClient.ResourceWatcher watcher =
_watchedUriResources.computeIfAbsent(clusterName, this::getUriResourceWatcher);
_xdsClient.watchXdsResource(resourceName, watcher);
}
}

public void listenToService(String serviceName)
{
_watchedServiceResources.computeIfAbsent(serviceName, k ->
{
XdsClient.NodeResourceWatcher watcher = getServiceResourceWatcher(serviceName);
_xdsClient.watchXdsResource(D2_SERVICE_NODE_PREFIX + serviceName, watcher);
return watcher;
});
XdsClient.ResourceWatcher watcher =
_watchedServiceResources.computeIfAbsent(serviceName, this::getServiceResourceWatcher);
_xdsClient.watchXdsResource(D2_SERVICE_NODE_PREFIX + serviceName, watcher);
}

private void listenToSymlink(String name, String fullResourceName)
{
// use full resource name ("/d2/clusters/$FooClusterMater", "/d2/uris/$FooClusterMaster") as the key
// instead of just the symlink name ("$FooClusterMaster") to differentiate clusters and uris symlink resources.
_watchedSymlinkResources.computeIfAbsent(fullResourceName, k ->
{
// use symlink name "$FooClusterMaster" to create the watcher
XdsClient.NodeResourceWatcher watcher = getSymlinkResourceWatcher(fullResourceName, name);
_xdsClient.watchXdsResource(k, watcher);
return watcher;
});
XdsClient.ResourceWatcher watcher =
_watchedSymlinkResources.computeIfAbsent(fullResourceName, k -> getSymlinkResourceWatcher(k, name));
// use symlink name "$FooClusterMaster" to create the watcher
_xdsClient.watchXdsResource(fullResourceName, watcher);
}

XdsClient.NodeResourceWatcher getServiceResourceWatcher(String serviceName)
Expand Down
18 changes: 18 additions & 0 deletions d2/src/test/java/com/linkedin/d2/xds/TestXdsClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.Map;
import java.util.Objects;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
Expand Down Expand Up @@ -442,6 +443,23 @@ public void testHandleD2URICollectionResponseWithRemoval()
Assert.assertEquals(actualData.getURIMap(), D2_URI_MAP_UPDATE_WITH_DATA1.getURIMap());
}

@Test
public void testResourceSubscriberAddWatcher()
{
ResourceSubscriber subscriber = new ResourceSubscriber(NODE, "foo", null);
XdsClient.ResourceWatcher watcher = Mockito.mock(XdsClient.ResourceWatcher.class);
subscriber.addWatcher(watcher);
verify(watcher, times(0)).onChanged(any());

D2URIMapUpdate update = new D2URIMapUpdate(Collections.emptyMap());
subscriber.setData(update);
for (int i = 0; i < 10; i++)
{
subscriber.addWatcher(watcher);
}
verify(watcher, times(10)).onChanged(eq(update));
}

private static class XdsClientImplFixture
{
XdsClientImpl _xdsClientImpl;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import static com.linkedin.d2.balancer.properties.PropertyKeys.*;
import static com.linkedin.d2.balancer.properties.PropertyKeys.ALLOWED_CLIENT_OVERRIDE_KEYS;
import static com.linkedin.d2.balancer.properties.PropertyKeys.HTTP_REQUEST_TIMEOUT;
import static org.mockito.Mockito.*;


Expand Down Expand Up @@ -112,10 +113,13 @@ public void testListenToService(Map<String, Object> clientOverride, Map<String,
{
XdsToD2PropertiesAdaptorFixture fixture = new XdsToD2PropertiesAdaptorFixture();
String serviceName = "FooService";
fixture.getSpiedAdaptor(Collections.singletonMap(serviceName, clientOverride))
.listenToService(serviceName);
for (int i = 0; i < 10; i++)
{
fixture.getSpiedAdaptor(Collections.singletonMap(serviceName, clientOverride))
.listenToService(serviceName);
}

verify(fixture._xdsClient).watchXdsResource(eq("/d2/services/" + serviceName), anyNodeWatcher());
verify(fixture._xdsClient, times(10)).watchXdsResource(eq("/d2/services/" + serviceName), anyNodeWatcher());

NodeResourceWatcher symlinkNodeWatcher = fixture._nodeWatcher;
symlinkNodeWatcher.onChanged(new XdsClient.NodeUpdate(XdsD2.Node.newBuilder()
Expand Down Expand Up @@ -150,28 +154,37 @@ public void testListenToService(Map<String, Object> clientOverride, Map<String,
public void testListenToNormalCluster()
{
XdsToD2PropertiesAdaptorFixture fixture = new XdsToD2PropertiesAdaptorFixture();
fixture.getSpiedAdaptor().listenToCluster(PRIMARY_CLUSTER_NAME);
for (int i = 0; i < 10; i++)
{
fixture.getSpiedAdaptor().listenToCluster(PRIMARY_CLUSTER_NAME);
}

verify(fixture._xdsClient).watchXdsResource(eq(PRIMARY_CLUSTER_RESOURCE_NAME), anyNodeWatcher());
verify(fixture._xdsClient, times(10)).watchXdsResource(eq(PRIMARY_CLUSTER_RESOURCE_NAME), anyNodeWatcher());
verifyClusterNodeUpdate(fixture, PRIMARY_CLUSTER_NAME, null, PRIMARY_CLUSTER_PROPERTIES);
}

@Test
public void testListenToClusterSymlink()
{
XdsToD2PropertiesAdaptorFixture fixture = new XdsToD2PropertiesAdaptorFixture();
fixture.getSpiedAdaptor().listenToCluster(SYMLINK_NAME);
for (int i = 0; i < 10; i++)
{
fixture.getSpiedAdaptor().listenToCluster(SYMLINK_NAME);
}

// verify symlink is watched
verify(fixture._xdsClient).watchXdsResource(eq(CLUSTER_SYMLINK_RESOURCE_NAME), anyNodeWatcher());
verify(fixture._xdsClient, times(10)).watchXdsResource(eq(CLUSTER_SYMLINK_RESOURCE_NAME), anyNodeWatcher());

// update symlink data
NodeResourceWatcher symlinkNodeWatcher = fixture._nodeWatcher;
fixture._nodeWatcher.onChanged(getSymlinkNodeUpdate(PRIMARY_CLUSTER_RESOURCE_NAME));
for (int i = 0; i < 10; i++)
{
symlinkNodeWatcher.onChanged(getSymlinkNodeUpdate(PRIMARY_CLUSTER_RESOURCE_NAME));
}

// verify both cluster and uri data of the actual cluster is watched
verify(fixture._xdsClient).watchXdsResource(eq(PRIMARY_CLUSTER_RESOURCE_NAME), anyNodeWatcher());
verify(fixture._xdsClient).watchXdsResource(eq(PRIMARY_URI_RESOURCE_NAME), anyNodeWatcher());
verify(fixture._xdsClient, times(10)).watchXdsResource(eq(PRIMARY_CLUSTER_RESOURCE_NAME), anyNodeWatcher());
verify(fixture._xdsClient, times(10)).watchXdsResource(eq(PRIMARY_URI_RESOURCE_NAME), anyNodeWatcher());

// update cluster data
NodeResourceWatcher clusterNodeWatcher = fixture._nodeWatcher;
Expand All @@ -185,10 +198,13 @@ public void testListenToClusterSymlink()
String primaryClusterResourceName2 = CLUSTER_NODE_PREFIX + PRIMARY_CLUSTER_NAME_2;
ClusterStoreProperties primaryClusterProperties2 = new ClusterStoreProperties(PRIMARY_CLUSTER_NAME_2);

symlinkNodeWatcher.onChanged(getSymlinkNodeUpdate(primaryClusterResourceName2));
for (int i = 0; i < 10; i++)
{
symlinkNodeWatcher.onChanged(getSymlinkNodeUpdate(primaryClusterResourceName2));
}

verify(fixture._xdsClient).watchXdsResource(eq(primaryClusterResourceName2), anyNodeWatcher());
verify(fixture._xdsClient).watchXdsResource(eq(URI_NODE_PREFIX + PRIMARY_CLUSTER_NAME_2), anyMapWatcher());
verify(fixture._xdsClient, times(10)).watchXdsResource(eq(primaryClusterResourceName2), anyNodeWatcher());
verify(fixture._xdsClient, times(10)).watchXdsResource(eq(URI_NODE_PREFIX + PRIMARY_CLUSTER_NAME_2), anyMapWatcher());
verifyClusterNodeUpdate(fixture, PRIMARY_CLUSTER_NAME_2, SYMLINK_NAME, primaryClusterProperties2);

// if the old primary cluster gets an update, it will be published under its original cluster name
Expand All @@ -204,9 +220,12 @@ public void testListenToClusterSymlink()
public void testListenToNormalUri() throws PropertySerializationException
{
XdsToD2PropertiesAdaptorFixture fixture = new XdsToD2PropertiesAdaptorFixture();
fixture.getSpiedAdaptor().listenToUris(PRIMARY_CLUSTER_NAME);
for (int i = 0; i < 10; i++)
{
fixture.getSpiedAdaptor().listenToUris(PRIMARY_CLUSTER_NAME);
}

verify(fixture._xdsClient).watchXdsResource(eq(PRIMARY_URI_RESOURCE_NAME), anyMapWatcher());
verify(fixture._xdsClient, times(10)).watchXdsResource(eq(PRIMARY_URI_RESOURCE_NAME), anyMapWatcher());
XdsD2.D2URI protoUri = getD2URI(PRIMARY_CLUSTER_NAME, URI_NAME, VERSION);
Map<String, XdsD2.D2URI> uriMap = new HashMap<>(Collections.singletonMap(URI_NAME, protoUri));
fixture._uriMapWatcher.onChanged(new XdsClient.D2URIMapUpdate(uriMap));
Expand Down Expand Up @@ -247,17 +266,23 @@ public void testListenToNormalUri() throws PropertySerializationException
public void testListenToUriSymlink() throws PropertySerializationException
{
XdsToD2PropertiesAdaptorFixture fixture = new XdsToD2PropertiesAdaptorFixture();
fixture.getSpiedAdaptor().listenToUris(SYMLINK_NAME);
for (int i = 0; i < 10; i++)
{
fixture.getSpiedAdaptor().listenToUris(SYMLINK_NAME);
}

// verify symlink is watched
verify(fixture._xdsClient).watchXdsResource(eq(URI_SYMLINK_RESOURCE_NAME), anyNodeWatcher());
verify(fixture._xdsClient, times(10)).watchXdsResource(eq(URI_SYMLINK_RESOURCE_NAME), anyNodeWatcher());

// update symlink data
NodeResourceWatcher symlinkNodeWatcher = fixture._nodeWatcher;
symlinkNodeWatcher.onChanged(getSymlinkNodeUpdate(PRIMARY_URI_RESOURCE_NAME));
for (int i = 0; i < 10; i++)
{
symlinkNodeWatcher.onChanged(getSymlinkNodeUpdate(PRIMARY_URI_RESOURCE_NAME));
}

// verify actual cluster of the uris is watched
verify(fixture._xdsClient).watchXdsResource(eq(PRIMARY_URI_RESOURCE_NAME), anyMapWatcher());
verify(fixture._xdsClient, times(10)).watchXdsResource(eq(PRIMARY_URI_RESOURCE_NAME), anyMapWatcher());

// update uri data
D2URIMapResourceWatcher watcher = fixture._uriMapWatcher;
Expand All @@ -269,9 +294,12 @@ public void testListenToUriSymlink() throws PropertySerializationException

// test update symlink to a new primary cluster
String primaryUriResourceName2 = URI_NODE_PREFIX + PRIMARY_CLUSTER_NAME_2;
symlinkNodeWatcher.onChanged(getSymlinkNodeUpdate(primaryUriResourceName2));
for (int i = 0; i < 10; i++)
{
symlinkNodeWatcher.onChanged(getSymlinkNodeUpdate(primaryUriResourceName2));
}

verify(fixture._xdsClient).watchXdsResource(eq(primaryUriResourceName2), anyMapWatcher());
verify(fixture._xdsClient, times(10)).watchXdsResource(eq(primaryUriResourceName2), anyMapWatcher());
verifyUriUpdate(fixture, PRIMARY_CLUSTER_NAME_2, SYMLINK_NAME);

// if the old primary cluster gets an update, it will be published under its original cluster name
Expand Down

0 comments on commit 2ba6cb7

Please sign in to comment.