Skip to content

Commit 4c75b91

Browse files
Kevin Gallardoolim7t
authored andcommitted
JAVA-852: Ignore peers with null entries during auto-discovery.
1 parent 6afbc43 commit 4c75b91

File tree

4 files changed

+71
-20
lines changed

4 files changed

+71
-20
lines changed

changelog/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
- [improvement] JAVA-444: Add Java process information to UUIDs.makeNode() hash.
3939
- [improvement] JAVA-977: Preserve original cause when BuiltStatement value can't be serialized.
4040
- [bug] JAVA-1094: Backport TypeCodec parse and format fixes from 3.0.
41+
- [improvement] JAVA-852: Ignore peers with null entries during discovery.
4142

4243
Merged from 2.0 branch:
4344

driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java

Lines changed: 63 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ class ControlConnection implements Host.StateListener, Connection.Owner {
3838

3939
private static final Logger logger = LoggerFactory.getLogger(ControlConnection.class);
4040

41+
private static final boolean EXTENDED_PEER_CHECK = SystemProperties.getBoolean("com.datastax.driver.EXTENDED_PEER_CHECK", true);
42+
4143
private static final InetAddress bindAllAddress;
4244

4345
static {
@@ -280,7 +282,7 @@ private Connection tryConnect(Host host, boolean isInitialConnection) throws Con
280282
// We need to refresh the node list again;
281283
// We want that because the token map was not properly initialized by the first call above,
282284
// since it requires the list of keyspaces to be loaded.
283-
refreshNodeListAndTokenMap(connection, cluster, false, true);
285+
refreshNodeListAndTokenMap(connection, cluster, false, false);
284286

285287
return connection;
286288
} catch (BusyConnectionException e) {
@@ -421,19 +423,18 @@ void refreshNodeListAndTokenMap() {
421423
}
422424
}
423425

424-
private static InetSocketAddress addressToUseForPeerHost(Row peersRow, InetSocketAddress connectedHost, Cluster.Manager cluster, boolean logMissingRpcAddresses) {
426+
private static InetSocketAddress addressToUseForPeerHost(Row peersRow, InetSocketAddress connectedHost, Cluster.Manager cluster) {
425427
InetAddress peer = peersRow.getInet("peer");
426428
InetAddress addr = peersRow.getInet("rpc_address");
427429

428-
if (peer.equals(connectedHost.getAddress()) || (addr != null && addr.equals(connectedHost.getAddress()))) {
430+
// We've already called isValid on the row, which checks this
431+
assert addr != null;
432+
433+
if (peer.equals(connectedHost.getAddress()) || addr.equals(connectedHost.getAddress())) {
429434
// Some DSE versions were inserting a line for the local node in peers (with mostly null values). This has been fixed, but if we
430435
// detect that's the case, ignore it as it's not really a big deal.
431436
logger.debug("System.peers on node {} has a line for itself. This is not normal but is a known problem of some DSE version. Ignoring the entry.", connectedHost);
432437
return null;
433-
} else if (addr == null) {
434-
if (logMissingRpcAddresses)
435-
logger.warn("No rpc_address found for host {} in {}'s peers system table. {} will be ignored.", peer, connectedHost, peer);
436-
return null;
437438
} else if (addr.equals(bindAllAddress)) {
438439
logger.warn("Found host with 0.0.0.0 as rpc_address, using listen_address ({}) to contact it instead. If this is incorrect you should avoid the use of 0.0.0.0 server side.", peer);
439440
addr = peer;
@@ -462,7 +463,7 @@ private Row fetchNodeInfo(Host host, Connection c) throws ConnectionException, B
462463
DefaultResultSetFuture future = new DefaultResultSetFuture(null, cluster.protocolVersion(), new Requests.Query(SELECT_PEERS));
463464
c.write(future);
464465
for (Row row : future.get()) {
465-
InetSocketAddress addr = addressToUseForPeerHost(row, c.address, cluster, true);
466+
InetSocketAddress addr = addressToUseForPeerHost(row, c.address, cluster);
466467
if (addr != null && addr.equals(host.getSocketAddress()))
467468
return row;
468469
}
@@ -492,11 +493,13 @@ boolean refreshNodeInfo(Host host) {
492493
logger.warn("No row found for host {} in {}'s peers system table. {} will be ignored.", host.getAddress(), c.address, host.getAddress());
493494
return false;
494495
}
495-
// Ignore hosts with a null rpc_address, as this is most likely a phantom row in system.peers (JAVA-428).
496-
// Don't test this for the control host since we're already connected to it anyway, and we read the info from system.local
497-
// which doesn't have an rpc_address column (JAVA-546).
498-
} else if (!c.address.equals(host.getSocketAddress()) && row.getInet("rpc_address") == null) {
499-
logger.warn("No rpc_address found for host {} in {}'s peers system table. {} will be ignored.", host.getAddress(), c.address, host.getAddress());
496+
}
497+
498+
// Ignore rows with invalid values, as this is most likely a phantom row in system.peers (JAVA-428,
499+
// JAVA-852).
500+
// Skip the control host since we're already connected to it anyway, and we read the info from system.local,
501+
// which doesn't have an rpc_address column (JAVA-546).
502+
if (!c.address.equals(host.getSocketAddress()) && !isValidPeer(row, true)) {
500503
return false;
501504
}
502505

@@ -554,7 +557,7 @@ private static void updateLocationInfo(Host host, String datacenter, String rack
554557
cluster.loadBalancingPolicy().onAdd(host);
555558
}
556559

557-
private static void refreshNodeListAndTokenMap(Connection connection, Cluster.Manager cluster, boolean isInitialConnection, boolean logMissingRpcAddresses) throws ConnectionException, BusyConnectionException, ExecutionException, InterruptedException {
560+
private static void refreshNodeListAndTokenMap(Connection connection, Cluster.Manager cluster, boolean isInitialConnection, boolean logInvalidPeers) throws ConnectionException, BusyConnectionException, ExecutionException, InterruptedException {
558561
logger.debug("[Control connection] Refreshing node list and token map");
559562

560563
boolean metadataEnabled = cluster.configuration.getQueryOptions().isMetadataEnabled();
@@ -603,10 +606,10 @@ private static void refreshNodeListAndTokenMap(Connection connection, Cluster.Ma
603606
List<Set<String>> allTokens = new ArrayList<Set<String>>();
604607

605608
for (Row row : peersFuture.get()) {
606-
InetSocketAddress addr = addressToUseForPeerHost(row, connection.address, cluster, logMissingRpcAddresses);
607-
if (addr == null)
609+
if (!isValidPeer(row, logInvalidPeers))
608610
continue;
609611

612+
InetSocketAddress addr = addressToUseForPeerHost(row, connection.address, cluster);
610613
foundHosts.add(addr);
611614
dcs.add(row.getString("data_center"));
612615
racks.add(row.getString("rack"));
@@ -654,6 +657,46 @@ private static void refreshNodeListAndTokenMap(Connection connection, Cluster.Ma
654657
cluster.metadata.rebuildTokenMap(partitioner, tokenMap);
655658
}
656659

660+
private static boolean isValidPeer(Row peerRow, boolean logIfInvalid) {
661+
boolean isValid = peerRow.getColumnDefinitions().contains("rpc_address")
662+
&& !peerRow.isNull("rpc_address");
663+
if (EXTENDED_PEER_CHECK) {
664+
isValid &= peerRow.getColumnDefinitions().contains("host_id")
665+
&& !peerRow.isNull("host_id")
666+
&& peerRow.getColumnDefinitions().contains("data_center")
667+
&& !peerRow.isNull("data_center")
668+
&& peerRow.getColumnDefinitions().contains("rack")
669+
&& !peerRow.isNull("rack")
670+
&& peerRow.getColumnDefinitions().contains("tokens")
671+
&& !peerRow.isNull("tokens");
672+
}
673+
if (!isValid && logIfInvalid)
674+
logger.warn("Found invalid row in system.peers: {}. " +
675+
"This is likely a gossip or snitch issue, this host will be ignored.", formatInvalidPeer(peerRow));
676+
return isValid;
677+
}
678+
679+
// Custom formatting to avoid spamming the logs if 'tokens' is present and contains a gazillion tokens
680+
private static String formatInvalidPeer(Row peerRow) {
681+
StringBuilder sb = new StringBuilder("[peer=" + peerRow.getInet("peer"));
682+
formatMissingOrNullColumn(peerRow, "rpc_address", sb);
683+
if (EXTENDED_PEER_CHECK) {
684+
formatMissingOrNullColumn(peerRow, "host_id", sb);
685+
formatMissingOrNullColumn(peerRow, "data_center", sb);
686+
formatMissingOrNullColumn(peerRow, "rack", sb);
687+
formatMissingOrNullColumn(peerRow, "tokens", sb);
688+
}
689+
sb.append("]");
690+
return sb.toString();
691+
}
692+
693+
private static void formatMissingOrNullColumn(Row peerRow, String columnName, StringBuilder sb) {
694+
if (!peerRow.getColumnDefinitions().contains(columnName))
695+
sb.append(", missing ").append(columnName);
696+
else if (peerRow.isNull(columnName))
697+
sb.append(", ").append(columnName).append("=null");
698+
}
699+
657700
boolean waitForSchemaAgreement() throws ConnectionException, BusyConnectionException, ExecutionException, InterruptedException {
658701
long start = System.nanoTime();
659702
long elapsed = 0;
@@ -690,7 +733,10 @@ boolean checkSchemaAgreement() throws ConnectionException, BusyConnectionExcepti
690733

691734
for (Row row : peersFuture.get()) {
692735

693-
InetSocketAddress addr = addressToUseForPeerHost(row, connection.address, cluster, true);
736+
if (!isValidPeer(row, false))
737+
continue;
738+
739+
InetSocketAddress addr = addressToUseForPeerHost(row, connection.address, cluster);
694740
if (addr == null || row.isNull("schema_version"))
695741
continue;
696742

driver-core/src/test/java/com/datastax/driver/core/ClusterInitTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import java.util.Collections;
3636
import java.util.List;
3737
import java.util.Map;
38+
import java.util.UUID;
3839
import java.util.concurrent.TimeUnit;
3940
import java.util.concurrent.TimeoutException;
4041

@@ -140,8 +141,7 @@ public void should_handle_failing_or_missing_contact_points() throws UnknownHost
140141
}
141142
assertThat(cluster).host(failingHost.address).isReconnectingFromDown();
142143
}
143-
assertThat(cluster).host(missingHostAddress).isNull();
144-
144+
assertThat(TestUtils.findHost(cluster, missingHostAddress)).isNull();
145145
} finally {
146146
if (cluster != null)
147147
cluster.close();
@@ -275,6 +275,7 @@ private void primePeerRows(Scassandra scassandra, List<FakeHost> otherHosts) thr
275275
.put("rack", "rack1")
276276
.put("release_version", "2.0.1")
277277
.put("tokens", ImmutableSet.of(Long.toString(Long.MIN_VALUE + i++)))
278+
.put("host_id", UUID.randomUUID())
278279
.build());
279280
}
280281

driver-core/src/test/java/com/datastax/driver/core/ScassandraCluster.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import com.beust.jcommander.internal.Lists;
1919
import com.beust.jcommander.internal.Maps;
20+
import com.datastax.driver.core.utils.UUIDs;
2021
import com.google.common.collect.ImmutableList;
2122
import com.google.common.collect.ImmutableMap;
2223
import com.google.common.collect.ImmutableSet;
@@ -310,6 +311,7 @@ private void primeMetadata(Scassandra node) {
310311
.put("rack", "rack1")
311312
.put("release_version", getPeerInfo(dc, n + 1, "release_version", "2.1.8"))
312313
.put("tokens", ImmutableSet.of(Long.toString(tokens.get(n))))
314+
.put("host_id", UUIDs.random())
313315
.build();
314316
rows.add(row);
315317
}
@@ -373,7 +375,8 @@ private Object getPeerInfo(int dc, int node, String property, Object defaultValu
373375
column("data_center", TEXT),
374376
column("rack", TEXT),
375377
column("release_version", TEXT),
376-
column("tokens", set(TEXT))
378+
column("tokens", set(TEXT)),
379+
column("host_id", UUID)
377380
};
378381

379382
public static final org.scassandra.http.client.types.ColumnMetadata[] SELECT_LOCAL = {

0 commit comments

Comments
 (0)