Skip to content

Commit 05f38bf

Browse files
author
alexsa
committed
Make ports required and move ContactPoints#extract to AddressUtils
1 parent b6bc879 commit 05f38bf

File tree

7 files changed

+166
-143
lines changed

7 files changed

+166
-143
lines changed

core/src/main/java/com/datastax/oss/driver/internal/core/ContactPoints.java

Lines changed: 17 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,11 @@
1919

2020
import com.datastax.oss.driver.api.core.metadata.EndPoint;
2121
import com.datastax.oss.driver.internal.core.metadata.DefaultEndPoint;
22+
import com.datastax.oss.driver.internal.core.util.AddressUtils;
2223
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableSet;
2324
import com.datastax.oss.driver.shaded.guava.common.collect.Sets;
24-
import java.net.InetAddress;
2525
import java.net.InetSocketAddress;
26-
import java.net.UnknownHostException;
27-
import java.util.Arrays;
2826
import java.util.Collections;
29-
import java.util.HashSet;
3027
import java.util.List;
3128
import java.util.Set;
3229
import org.slf4j.Logger;
@@ -41,7 +38,22 @@ public static Set<EndPoint> merge(
4138

4239
Set<EndPoint> result = Sets.newHashSet(programmaticContactPoints);
4340
for (String spec : configContactPoints) {
44-
for (InetSocketAddress address : extract(spec, resolve)) {
41+
42+
Set<InetSocketAddress> addresses = Collections.emptySet();
43+
try {
44+
addresses = AddressUtils.extract(spec, resolve);
45+
} catch (RuntimeException e) {
46+
LOG.warn("Ignoring invalid contact point {} ({})", spec, e.getMessage());
47+
}
48+
49+
if (addresses.size() > 1) {
50+
LOG.info(
51+
"Contact point {} resolves to multiple addresses, will use them all ({})",
52+
spec,
53+
addresses);
54+
}
55+
56+
for (InetSocketAddress address : addresses) {
4557
DefaultEndPoint endPoint = new DefaultEndPoint(address);
4658
boolean wasNew = result.add(endPoint);
4759
if (!wasNew) {
@@ -51,43 +63,4 @@ public static Set<EndPoint> merge(
5163
}
5264
return ImmutableSet.copyOf(result);
5365
}
54-
55-
private static Set<InetSocketAddress> extract(String spec, boolean resolve) {
56-
int separator = spec.lastIndexOf(':');
57-
if (separator < 0) {
58-
LOG.warn("Ignoring invalid contact point {} (expecting host:port)", spec);
59-
return Collections.emptySet();
60-
}
61-
62-
String host = spec.substring(0, separator);
63-
String portSpec = spec.substring(separator + 1);
64-
int port;
65-
try {
66-
port = Integer.parseInt(portSpec);
67-
} catch (NumberFormatException e) {
68-
LOG.warn("Ignoring invalid contact point {} (expecting a number, got {})", spec, portSpec);
69-
return Collections.emptySet();
70-
}
71-
if (!resolve) {
72-
return ImmutableSet.of(InetSocketAddress.createUnresolved(host, port));
73-
} else {
74-
try {
75-
InetAddress[] inetAddresses = InetAddress.getAllByName(host);
76-
if (inetAddresses.length > 1) {
77-
LOG.info(
78-
"Contact point {} resolves to multiple addresses, will use them all ({})",
79-
spec,
80-
Arrays.deepToString(inetAddresses));
81-
}
82-
Set<InetSocketAddress> result = new HashSet<>();
83-
for (InetAddress inetAddress : inetAddresses) {
84-
result.add(new InetSocketAddress(inetAddress, port));
85-
}
86-
return result;
87-
} catch (UnknownHostException e) {
88-
LOG.warn("Ignoring invalid contact point {} (unknown host {})", spec, host);
89-
return Collections.emptySet();
90-
}
91-
}
92-
}
9366
}

core/src/main/java/com/datastax/oss/driver/internal/core/addresstranslation/SubnetAddressTranslator.java

Lines changed: 50 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,18 @@
1717
*/
1818
package com.datastax.oss.driver.internal.core.addresstranslation;
1919

20+
import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.RESOLVE_CONTACT_POINTS;
21+
2022
import com.datastax.oss.driver.api.core.addresstranslation.AddressTranslator;
2123
import com.datastax.oss.driver.api.core.config.DriverOption;
2224
import com.datastax.oss.driver.api.core.context.DriverContext;
23-
import com.google.common.base.Splitter;
25+
import com.datastax.oss.driver.internal.core.util.AddressUtils;
2426
import edu.umd.cs.findbugs.annotations.NonNull;
27+
import edu.umd.cs.findbugs.annotations.Nullable;
2528
import inet.ipaddr.IPAddress;
2629
import inet.ipaddr.IPAddressString;
2730
import java.net.InetSocketAddress;
2831
import java.util.List;
29-
import java.util.Objects;
3032
import java.util.Optional;
3133
import java.util.stream.Collectors;
3234
import org.slf4j.Logger;
@@ -43,7 +45,6 @@
4345
* different Cassandra datacenters deployed to different Kubernetes clusters.
4446
*/
4547
public class SubnetAddressTranslator implements AddressTranslator {
46-
4748
private static final Logger LOG = LoggerFactory.getLogger(SubnetAddressTranslator.class);
4849

4950
/**
@@ -53,23 +54,22 @@ public class SubnetAddressTranslator implements AddressTranslator {
5354
* <pre>
5455
* advanced.address-translator.subnet-addresses {
5556
* "100.64.0.0/15" = "cassandra.datacenter1.com:9042"
56-
* "100.66.0.0/15" = "cassandra.datacenter2.com"
57+
* "100.66.0.0/15" = "cassandra.datacenter2.com:9042"
5758
* # IPv6 example:
5859
* # "::ffff:6440:0/111" = "cassandra.datacenter1.com:9042"
59-
* # "::ffff:6442:0/111" = "cassandra.datacenter2.com"
60+
* # "::ffff:6442:0/111" = "cassandra.datacenter2.com:9042"
6061
* }
6162
* </pre>
6263
*
63-
* If configured without port, the default 9042 will be used. Also supports IPv6 subnets. Note:
64-
* subnets must be represented as prefix blocks, see {@link inet.ipaddr.Address#isPrefixBlock()}.
64+
* Note: subnets must be represented as prefix blocks, see {@link
65+
* inet.ipaddr.Address#isPrefixBlock()}.
6566
*/
6667
public static final String ADDRESS_TRANSLATOR_SUBNET_ADDRESSES =
6768
"advanced.address-translator.subnet-addresses";
6869

6970
/**
7071
* A default address to fallback to if Cassandra node IP isn't contained in any of the configured
71-
* subnets. If configured without port, the default 9042 will be used. Also supports IPv6
72-
* addresses.
72+
* subnets.
7373
*/
7474
public static final String ADDRESS_TRANSLATOR_DEFAULT_ADDRESS =
7575
"advanced.address-translator.default-address";
@@ -92,11 +92,9 @@ public String getPath() {
9292
}
9393
};
9494

95-
private static final String DELIMITER = ":";
96-
private static final int DEFAULT_PORT = 9042;
97-
9895
private final List<SubnetAddress> subnetAddresses;
99-
private final Optional<InetSocketAddress> defaultAddress;
96+
private final Optional<String> defaultAddress;
97+
private final boolean resolveAddresses;
10098
private final String logPrefix;
10199

102100
public SubnetAddressTranslator(@NonNull DriverContext context) {
@@ -109,19 +107,22 @@ public SubnetAddressTranslator(@NonNull DriverContext context) {
109107
// Quoted and/or containing forward slashes map keys in reference.conf are read to
110108
// strings with additional quotes, eg. 100.64.0.0/15 -> '100.64.0."0/15"' or
111109
// "100.64.0.0/15" -> '"100.64.0.0/15"'
112-
String subnet = e.getKey().replaceAll("\"", "");
110+
String subnetCIDR = e.getKey().replaceAll("\"", "");
113111
String address = e.getValue();
114-
return new SubnetAddress(subnet, address);
112+
return new SubnetAddress(subnetCIDR, address);
115113
})
116114
.collect(Collectors.toList());
117115
this.defaultAddress =
118116
Optional.ofNullable(
119-
context
120-
.getConfig()
121-
.getDefaultProfile()
122-
.getString(ADDRESS_TRANSLATOR_DEFAULT_ADDRESS_OPTION, null))
123-
.map(SubnetAddressTranslator::parseAddress);
124-
SubnetAddressTranslator.validateSubnetsAreNotOverlapping(this.subnetAddresses);
117+
context
118+
.getConfig()
119+
.getDefaultProfile()
120+
.getString(ADDRESS_TRANSLATOR_DEFAULT_ADDRESS_OPTION, null));
121+
this.resolveAddresses =
122+
context.getConfig().getDefaultProfile().getBoolean(RESOLVE_CONTACT_POINTS, true);
123+
124+
validateSubnetsAreNotOverlapping(this.subnetAddresses);
125+
this.defaultAddress.ifPresent(SubnetAddressTranslator::validateAddress);
125126
}
126127

127128
@NonNull
@@ -130,11 +131,11 @@ public InetSocketAddress translate(@NonNull InetSocketAddress address) {
130131
InetSocketAddress translatedAddress = null;
131132
for (SubnetAddress subnetAddress : subnetAddresses) {
132133
if (subnetAddress.contains(address)) {
133-
translatedAddress = subnetAddress.address;
134+
translatedAddress = parseAddress(subnetAddress.address, resolveAddresses);
134135
}
135136
}
136137
if (translatedAddress == null && defaultAddress.isPresent()) {
137-
translatedAddress = defaultAddress.get();
138+
translatedAddress = parseAddress(defaultAddress.get(), resolveAddresses);
138139
}
139140
if (translatedAddress == null) {
140141
translatedAddress = address;
@@ -146,15 +147,17 @@ public InetSocketAddress translate(@NonNull InetSocketAddress address) {
146147
@Override
147148
public void close() {}
148149

149-
private static InetSocketAddress parseAddress(String address) {
150-
List<String> addressTuple = Splitter.onPattern(DELIMITER).splitToList(address);
151-
if (addressTuple.size() == 2) {
152-
return new InetSocketAddress(addressTuple.get(0), Integer.parseInt(addressTuple.get(1)));
153-
}
154-
if (addressTuple.size() == 1) {
155-
return new InetSocketAddress(addressTuple.get(0), DEFAULT_PORT);
150+
@Nullable
151+
private static InetSocketAddress parseAddress(String address, boolean resolve) {
152+
return AddressUtils.extract(address, resolve).iterator().next();
153+
}
154+
155+
private static void validateAddress(String address) {
156+
try {
157+
parseAddress(address, false);
158+
} catch (RuntimeException e) {
159+
throw new IllegalArgumentException("Invalid address: " + address, e);
156160
}
157-
throw new IllegalArgumentException("Invalid default address: " + address);
158161
}
159162

160163
private static void validateSubnetsAreNotOverlapping(List<SubnetAddress> subnetAddresses) {
@@ -174,21 +177,28 @@ private static void validateSubnetsAreNotOverlapping(List<SubnetAddress> subnetA
174177

175178
private static class SubnetAddress {
176179
private final IPAddress subnet;
177-
private final InetSocketAddress address;
180+
private final String address;
181+
182+
private SubnetAddress(String subnetCIDR, String address) {
183+
this.subnet = parseSubnet(subnetCIDR);
184+
this.address = address;
178185

179-
private SubnetAddress(String subnet, String address) {
180-
IPAddress subnetIpAddress = new IPAddressString(subnet).getAddress();
181-
if (subnetIpAddress == null) {
182-
throw new IllegalArgumentException("Invalid subnet: " + subnet);
186+
validateAddress(this.address);
187+
}
188+
189+
private static IPAddress parseSubnet(String subnetCIDR) {
190+
IPAddress subnet = new IPAddressString(subnetCIDR).getAddress();
191+
if (subnet == null) {
192+
throw new IllegalArgumentException("Invalid subnet: " + subnetCIDR);
183193
}
184-
if (!subnetIpAddress.isPrefixBlock()) {
194+
if (!subnet.isPrefixBlock()) {
195+
185196
throw new IllegalArgumentException(
186197
String.format(
187198
"Subnet %s must be represented as a network prefix block %s",
188-
subnetIpAddress, subnetIpAddress.toPrefixBlock()));
199+
subnet, subnet.toPrefixBlock()));
189200
}
190-
this.subnet = subnetIpAddress;
191-
this.address = parseAddress(address);
201+
return subnet;
192202
}
193203

194204
private boolean isOverlapping(SubnetAddress other) {
@@ -209,18 +219,5 @@ private boolean contains(InetSocketAddress address) {
209219
}
210220
return subnet.contains(ipAddress);
211221
}
212-
213-
@Override
214-
public boolean equals(Object other) {
215-
if (this == other) return true;
216-
if (!(other instanceof SubnetAddress)) return false;
217-
SubnetAddress that = (SubnetAddress) other;
218-
return Objects.equals(subnet, that.subnet) && Objects.equals(address, that.address);
219-
}
220-
221-
@Override
222-
public int hashCode() {
223-
return Objects.hash(subnet, address);
224-
}
225222
}
226223
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package com.datastax.oss.driver.internal.core.util;
19+
20+
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableSet;
21+
import java.net.InetAddress;
22+
import java.net.InetSocketAddress;
23+
import java.net.UnknownHostException;
24+
import java.util.HashSet;
25+
import java.util.Set;
26+
27+
public class AddressUtils {
28+
29+
public static Set<InetSocketAddress> extract(String address, boolean resolve) {
30+
int separator = address.lastIndexOf(':');
31+
if (separator < 0) {
32+
throw new IllegalArgumentException("expecting format host:port");
33+
}
34+
35+
String host = address.substring(0, separator);
36+
String portString = address.substring(separator + 1);
37+
int port;
38+
try {
39+
port = Integer.parseInt(portString);
40+
} catch (NumberFormatException e) {
41+
throw new IllegalArgumentException("expecting port to be a number, got " + portString, e);
42+
}
43+
if (!resolve) {
44+
return ImmutableSet.of(InetSocketAddress.createUnresolved(host, port));
45+
} else {
46+
InetAddress[] inetAddresses;
47+
try {
48+
inetAddresses = InetAddress.getAllByName(host);
49+
} catch (UnknownHostException e) {
50+
throw new RuntimeException("unknown host " + host, e);
51+
}
52+
Set<InetSocketAddress> result = new HashSet<>();
53+
for (InetAddress inetAddress : inetAddresses) {
54+
result.add(new InetSocketAddress(inetAddress, port));
55+
}
56+
return result;
57+
}
58+
}
59+
}

core/src/main/resources/reference.conf

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1040,16 +1040,18 @@ datastax-java-driver {
10401040
# This property has to be set only in case you use FixedHostNameAddressTranslator.
10411041
# advertised-hostname = mycustomhostname
10421042
#
1043-
# Theses properties have to be set only in case you use SubnetAddressTranslator.
1043+
# These properties are only applicable in case you use SubnetAddressTranslator.
10441044
# subnet-addresses {
10451045
# "100.64.0.0/15" = "cassandra.datacenter1.com:9042"
1046-
# "100.66.0.0/15" = "cassandra.datacenter2.com" # port defaults to 9042 if not specified
1046+
# "100.66.0.0/15" = "cassandra.datacenter2.com:9042"
10471047
# # IPv6 example:
10481048
# # "::ffff:6440:0/111" = "cassandra.datacenter1.com:9042"
1049-
# # "::ffff:6442:0/111" = "cassandra.datacenter2.com" # port defaults to 9042 if not specified
1049+
# # "::ffff:6442:0/111" = "cassandra.datacenter2.com:9042"
10501050
# }
1051-
# Optional. When configured, addresses not matching the configured subnets are translated to it. Port defaults to 9042 if not specified.
1051+
# Optional. When configured, addresses not matching the configured subnets are translated this address.
10521052
# default-address = "cassandra.datacenter1.com:9042"
1053+
# Note: `advanced.resolve-contact-points` (see below) is used to determine whether to resolve subnet
1054+
# and default address on translation.
10531055
}
10541056

10551057
# Whether to resolve the addresses passed to `basic.contact-points`.

core/src/test/java/com/datastax/oss/driver/internal/core/ContactPointsTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ public void should_ignore_malformed_host_and_port_and_warn() {
121121
ContactPoints.merge(Collections.emptySet(), ImmutableList.of("foobar"), true);
122122

123123
assertThat(endPoints).isEmpty();
124-
assertLog(Level.WARN, "Ignoring invalid contact point foobar (expecting host:port)");
124+
assertLog(Level.WARN, "Ignoring invalid contact point foobar (expecting format host:port)");
125125
}
126126

127127
@Test
@@ -132,7 +132,7 @@ public void should_ignore_malformed_port_and_warn() {
132132
assertThat(endPoints).isEmpty();
133133
assertLog(
134134
Level.WARN,
135-
"Ignoring invalid contact point 127.0.0.1:foobar (expecting a number, got foobar)");
135+
"Ignoring invalid contact point 127.0.0.1:foobar (expecting port to be a number, got foobar)");
136136
}
137137

138138
@Test

0 commit comments

Comments
 (0)