Skip to content

Commit fab60c9

Browse files
committed
Merge pull request apache#592 from datastax/java852
JAVA-852: Ignore peers with null entries during auto-discovery.
2 parents 6afbc43 + 468eb79 commit fab60c9

File tree

6 files changed

+243
-59
lines changed

6 files changed

+243
-59
lines changed

changelog/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
- [improvement] JAVA-444: Add Java process information to UUIDs.makeNode() hash.
3939
- [improvement] JAVA-977: Preserve original cause when BuiltStatement value can't be serialized.
4040
- [bug] JAVA-1094: Backport TypeCodec parse and format fixes from 3.0.
41+
- [improvement] JAVA-852: Ignore peers with null entries during discovery.
4142

4243
Merged from 2.0 branch:
4344

driver-core/pom.xml

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@
1515
limitations under the License.
1616
1717
-->
18-
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
18+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
19+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
1920
<modelVersion>4.0.0</modelVersion>
2021
<parent>
2122
<groupId>com.datastax.cassandra</groupId>
@@ -25,7 +26,9 @@
2526
<artifactId>cassandra-driver-core</artifactId>
2627
<packaging>bundle</packaging>
2728
<name>DataStax Java Driver for Apache Cassandra - Core</name>
28-
<description>A driver for Apache Cassandra 1.2+ that works exclusively with the Cassandra Query Language version 3 (CQL3) and Cassandra's binary protocol.</description>
29+
<description>A driver for Apache Cassandra 1.2+ that works exclusively with the Cassandra Query Language version 3
30+
(CQL3) and Cassandra's binary protocol.
31+
</description>
2932
<url>https://github.com/datastax/java-driver</url>
3033

3134
<properties>
@@ -67,17 +70,17 @@
6770
<!-- Each of them is only a mandatory runtime dependency if you want to use the compression it offers -->
6871

6972
<dependency>
70-
<groupId>org.xerial.snappy</groupId>
71-
<artifactId>snappy-java</artifactId>
72-
<version>${snappy.version}</version>
73-
<optional>true</optional>
73+
<groupId>org.xerial.snappy</groupId>
74+
<artifactId>snappy-java</artifactId>
75+
<version>${snappy.version}</version>
76+
<optional>true</optional>
7477
</dependency>
7578

7679
<dependency>
77-
<groupId>net.jpountz.lz4</groupId>
78-
<artifactId>lz4</artifactId>
79-
<version>${lz4.version}</version>
80-
<optional>true</optional>
80+
<groupId>net.jpountz.lz4</groupId>
81+
<artifactId>lz4</artifactId>
82+
<version>${lz4.version}</version>
83+
<optional>true</optional>
8184
</dependency>
8285

8386
<!-- End of compression libraries -->
@@ -221,7 +224,9 @@
221224
<executions>
222225
<execution>
223226
<phase>package</phase>
224-
<goals><goal>shade</goal></goals>
227+
<goals>
228+
<goal>shade</goal>
229+
</goals>
225230
<configuration>
226231
<shadedArtifactAttached>true</shadedArtifactAttached>
227232
<artifactSet>
@@ -287,7 +292,7 @@
287292
</goals>
288293
</pluginExecutionFilter>
289294
<action>
290-
<ignore />
295+
<ignore/>
291296
</action>
292297
</pluginExecution>
293298
</pluginExecutions>
@@ -375,6 +380,7 @@
375380
<includes>
376381
<include>**/SSL*Test.java</include>
377382
<include>**/UUIDsPID*.java</include>
383+
<include>**/ControlConnectionTest.java</include>
378384
</includes>
379385
</configuration>
380386
</plugin>

driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java

Lines changed: 63 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ class ControlConnection implements Host.StateListener, Connection.Owner {
3838

3939
private static final Logger logger = LoggerFactory.getLogger(ControlConnection.class);
4040

41+
private static final boolean EXTENDED_PEER_CHECK = SystemProperties.getBoolean("com.datastax.driver.EXTENDED_PEER_CHECK", true);
42+
4143
private static final InetAddress bindAllAddress;
4244

4345
static {
@@ -280,7 +282,7 @@ private Connection tryConnect(Host host, boolean isInitialConnection) throws Con
280282
// We need to refresh the node list again;
281283
// We want that because the token map was not properly initialized by the first call above,
282284
// since it requires the list of keyspaces to be loaded.
283-
refreshNodeListAndTokenMap(connection, cluster, false, true);
285+
refreshNodeListAndTokenMap(connection, cluster, false, false);
284286

285287
return connection;
286288
} catch (BusyConnectionException e) {
@@ -421,19 +423,18 @@ void refreshNodeListAndTokenMap() {
421423
}
422424
}
423425

424-
private static InetSocketAddress addressToUseForPeerHost(Row peersRow, InetSocketAddress connectedHost, Cluster.Manager cluster, boolean logMissingRpcAddresses) {
426+
private static InetSocketAddress addressToUseForPeerHost(Row peersRow, InetSocketAddress connectedHost, Cluster.Manager cluster) {
425427
InetAddress peer = peersRow.getInet("peer");
426428
InetAddress addr = peersRow.getInet("rpc_address");
427429

428-
if (peer.equals(connectedHost.getAddress()) || (addr != null && addr.equals(connectedHost.getAddress()))) {
430+
// We've already called isValidPeer on the row, which checks this
431+
assert addr != null;
432+
433+
if (peer.equals(connectedHost.getAddress()) || addr.equals(connectedHost.getAddress())) {
429434
// Some DSE versions were inserting a line for the local node in peers (with mostly null values). This has been fixed, but if we
430435
// detect that's the case, ignore it as it's not really a big deal.
431436
logger.debug("System.peers on node {} has a line for itself. This is not normal but is a known problem of some DSE version. Ignoring the entry.", connectedHost);
432437
return null;
433-
} else if (addr == null) {
434-
if (logMissingRpcAddresses)
435-
logger.warn("No rpc_address found for host {} in {}'s peers system table. {} will be ignored.", peer, connectedHost, peer);
436-
return null;
437438
} else if (addr.equals(bindAllAddress)) {
438439
logger.warn("Found host with 0.0.0.0 as rpc_address, using listen_address ({}) to contact it instead. If this is incorrect you should avoid the use of 0.0.0.0 server side.", peer);
439440
addr = peer;
@@ -462,7 +463,7 @@ private Row fetchNodeInfo(Host host, Connection c) throws ConnectionException, B
462463
DefaultResultSetFuture future = new DefaultResultSetFuture(null, cluster.protocolVersion(), new Requests.Query(SELECT_PEERS));
463464
c.write(future);
464465
for (Row row : future.get()) {
465-
InetSocketAddress addr = addressToUseForPeerHost(row, c.address, cluster, true);
466+
InetSocketAddress addr = addressToUseForPeerHost(row, c.address, cluster);
466467
if (addr != null && addr.equals(host.getSocketAddress()))
467468
return row;
468469
}
@@ -492,11 +493,13 @@ boolean refreshNodeInfo(Host host) {
492493
logger.warn("No row found for host {} in {}'s peers system table. {} will be ignored.", host.getAddress(), c.address, host.getAddress());
493494
return false;
494495
}
495-
// Ignore hosts with a null rpc_address, as this is most likely a phantom row in system.peers (JAVA-428).
496-
// Don't test this for the control host since we're already connected to it anyway, and we read the info from system.local
497-
// which doesn't have an rpc_address column (JAVA-546).
498-
} else if (!c.address.equals(host.getSocketAddress()) && row.getInet("rpc_address") == null) {
499-
logger.warn("No rpc_address found for host {} in {}'s peers system table. {} will be ignored.", host.getAddress(), c.address, host.getAddress());
496+
}
497+
498+
// Ignore rows with invalid values, as such a row is most likely a phantom row in system.peers (JAVA-428,
499+
// JAVA-852).
500+
// Skip the control host since we're already connected to it anyway, and we read the info from system.local,
501+
// which doesn't have an rpc_address column (JAVA-546).
502+
if (!c.address.equals(host.getSocketAddress()) && !isValidPeer(row, true)) {
500503
return false;
501504
}
502505

@@ -554,7 +557,7 @@ private static void updateLocationInfo(Host host, String datacenter, String rack
554557
cluster.loadBalancingPolicy().onAdd(host);
555558
}
556559

557-
private static void refreshNodeListAndTokenMap(Connection connection, Cluster.Manager cluster, boolean isInitialConnection, boolean logMissingRpcAddresses) throws ConnectionException, BusyConnectionException, ExecutionException, InterruptedException {
560+
private static void refreshNodeListAndTokenMap(Connection connection, Cluster.Manager cluster, boolean isInitialConnection, boolean logInvalidPeers) throws ConnectionException, BusyConnectionException, ExecutionException, InterruptedException {
558561
logger.debug("[Control connection] Refreshing node list and token map");
559562

560563
boolean metadataEnabled = cluster.configuration.getQueryOptions().isMetadataEnabled();
@@ -603,10 +606,10 @@ private static void refreshNodeListAndTokenMap(Connection connection, Cluster.Ma
603606
List<Set<String>> allTokens = new ArrayList<Set<String>>();
604607

605608
for (Row row : peersFuture.get()) {
606-
InetSocketAddress addr = addressToUseForPeerHost(row, connection.address, cluster, logMissingRpcAddresses);
607-
if (addr == null)
609+
if (!isValidPeer(row, logInvalidPeers))
608610
continue;
609611

612+
InetSocketAddress addr = addressToUseForPeerHost(row, connection.address, cluster);
610613
foundHosts.add(addr);
611614
dcs.add(row.getString("data_center"));
612615
racks.add(row.getString("rack"));
@@ -654,6 +657,46 @@ private static void refreshNodeListAndTokenMap(Connection connection, Cluster.Ma
654657
cluster.metadata.rebuildTokenMap(partitioner, tokenMap);
655658
}
656659

660+
private static boolean isValidPeer(Row peerRow, boolean logIfInvalid) {
661+
boolean isValid = peerRow.getColumnDefinitions().contains("rpc_address")
662+
&& !peerRow.isNull("rpc_address");
663+
if (EXTENDED_PEER_CHECK) {
664+
isValid &= peerRow.getColumnDefinitions().contains("host_id")
665+
&& !peerRow.isNull("host_id")
666+
&& peerRow.getColumnDefinitions().contains("data_center")
667+
&& !peerRow.isNull("data_center")
668+
&& peerRow.getColumnDefinitions().contains("rack")
669+
&& !peerRow.isNull("rack")
670+
&& peerRow.getColumnDefinitions().contains("tokens")
671+
&& !peerRow.isNull("tokens");
672+
}
673+
if (!isValid && logIfInvalid)
674+
logger.warn("Found invalid row in system.peers: {}. " +
675+
"This is likely a gossip or snitch issue, this host will be ignored.", formatInvalidPeer(peerRow));
676+
return isValid;
677+
}
678+
679+
// Custom formatting to avoid spamming the logs if 'tokens' is present and contains a gazillion tokens
680+
private static String formatInvalidPeer(Row peerRow) {
681+
StringBuilder sb = new StringBuilder("[peer=" + peerRow.getInet("peer"));
682+
formatMissingOrNullColumn(peerRow, "rpc_address", sb);
683+
if (EXTENDED_PEER_CHECK) {
684+
formatMissingOrNullColumn(peerRow, "host_id", sb);
685+
formatMissingOrNullColumn(peerRow, "data_center", sb);
686+
formatMissingOrNullColumn(peerRow, "rack", sb);
687+
formatMissingOrNullColumn(peerRow, "tokens", sb);
688+
}
689+
sb.append("]");
690+
return sb.toString();
691+
}
692+
693+
private static void formatMissingOrNullColumn(Row peerRow, String columnName, StringBuilder sb) {
694+
if (!peerRow.getColumnDefinitions().contains(columnName))
695+
sb.append(", missing ").append(columnName);
696+
else if (peerRow.isNull(columnName))
697+
sb.append(", ").append(columnName).append("=null");
698+
}
699+
657700
boolean waitForSchemaAgreement() throws ConnectionException, BusyConnectionException, ExecutionException, InterruptedException {
658701
long start = System.nanoTime();
659702
long elapsed = 0;
@@ -690,7 +733,10 @@ boolean checkSchemaAgreement() throws ConnectionException, BusyConnectionExcepti
690733

691734
for (Row row : peersFuture.get()) {
692735

693-
InetSocketAddress addr = addressToUseForPeerHost(row, connection.address, cluster, true);
736+
if (!isValidPeer(row, false))
737+
continue;
738+
739+
InetSocketAddress addr = addressToUseForPeerHost(row, connection.address, cluster);
694740
if (addr == null || row.isNull("schema_version"))
695741
continue;
696742

driver-core/src/test/java/com/datastax/driver/core/ClusterInitTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import java.util.Collections;
3636
import java.util.List;
3737
import java.util.Map;
38+
import java.util.UUID;
3839
import java.util.concurrent.TimeUnit;
3940
import java.util.concurrent.TimeoutException;
4041

@@ -140,8 +141,7 @@ public void should_handle_failing_or_missing_contact_points() throws UnknownHost
140141
}
141142
assertThat(cluster).host(failingHost.address).isReconnectingFromDown();
142143
}
143-
assertThat(cluster).host(missingHostAddress).isNull();
144-
144+
assertThat(TestUtils.findHost(cluster, missingHostAddress)).isNull();
145145
} finally {
146146
if (cluster != null)
147147
cluster.close();
@@ -275,6 +275,7 @@ private void primePeerRows(Scassandra scassandra, List<FakeHost> otherHosts) thr
275275
.put("rack", "rack1")
276276
.put("release_version", "2.0.1")
277277
.put("tokens", ImmutableSet.of(Long.toString(Long.MIN_VALUE + i++)))
278+
.put("host_id", UUID.randomUUID())
278279
.build());
279280
}
280281

0 commit comments

Comments
 (0)