Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SubnetAddressTranslator #2013

Open
wants to merge 9 commits into
base: 4.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
</dependency>
<dependency>
<groupId>com.github.seancfoley</groupId>
<artifactId>ipaddress</artifactId>
<optional>true</optional>
</dependency>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really don't love the inclusion of another dependency here, even if it's an optional one. It's only used in one class (near as I can tell)... is there really no way to get the functionality we need without adding this in?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First thing that comes to mind is implementing it ourselves (or in other words copying it over from the library). Lemme evaluate how much of the util code is needed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need at least the following functionality to work with subnets here:

  • Validate subnet string is in a prefix block format
  • Check if subnet contains IP address
  • All for IPv4 and IPv6

The library is quite big, so copying over its parts is an overkill.
Then the alternative is to implement these functions ourselves.
Looking into it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A bit of vibe-coding and we can have it with around 100 lines of code. Will work on integrating a change.

<dependency>
<groupId>com.github.stephenc.jcip</groupId>
<artifactId>jcip-annotations</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -994,7 +994,48 @@ public enum DefaultDriverOption implements DriverOption {
*
* <p>Value-type: boolean
*/
SSL_ALLOW_DNS_REVERSE_LOOKUP_SAN("advanced.ssl-engine-factory.allow-dns-reverse-lookup-san");
SSL_ALLOW_DNS_REVERSE_LOOKUP_SAN("advanced.ssl-engine-factory.allow-dns-reverse-lookup-san"),
/**
* An address to always translate all node addresses to that same proxy hostname no matter what IP
* address a node has, but still using its native transport port.
*
* <p>Value-Type: {@link String}
*/
ADDRESS_TRANSLATOR_ADVERTISED_HOSTNAME("advanced.address-translator.advertised-hostname"),
/**
* A map of Cassandra node subnets (CIDR notations) to target addresses, for example (note quoted
* keys):
*
* <pre>
* advanced.address-translator.subnet-addresses {
* "100.64.0.0/15" = "cassandra.datacenter1.com:9042"
* "100.66.0.0/15" = "cassandra.datacenter2.com:9042"
* # IPv6 example:
* # "::ffff:6440:0/111" = "cassandra.datacenter1.com:9042"
* # "::ffff:6442:0/111" = "cassandra.datacenter2.com:9042"
* }
* </pre>
*
* Note: subnets must be represented as prefix blocks, see {@link
* inet.ipaddr.Address#isPrefixBlock()}.
*
* <p>Value type: {@link java.util.Map Map}&#60;{@link String},{@link String}&#62;
*/
ADDRESS_TRANSLATOR_SUBNET_ADDRESSES("advanced.address-translator.subnet-addresses"),
/**
* A default address to fallback to if Cassandra node IP isn't contained in any of the configured
* subnets.
*
* <p>Value-Type: {@link String}
*/
ADDRESS_TRANSLATOR_DEFAULT_ADDRESS("advanced.address-translator.default-address"),
/**
* Whether to resolve the addresses on initialization (if true) or on each node (re-)connection
* (if false). Defaults to false.
*
* <p>Value-Type: boolean
*/
ADDRESS_TRANSLATOR_RESOLVE_ADDRESSES("advanced.address-translator.resolve-addresses");

private final String path;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,11 @@

import com.datastax.oss.driver.api.core.metadata.EndPoint;
import com.datastax.oss.driver.internal.core.metadata.DefaultEndPoint;
import com.datastax.oss.driver.internal.core.util.AddressUtils;
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableSet;
import com.datastax.oss.driver.shaded.guava.common.collect.Sets;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.slf4j.Logger;
Expand All @@ -41,7 +38,22 @@ public static Set<EndPoint> merge(

Set<EndPoint> result = Sets.newHashSet(programmaticContactPoints);
for (String spec : configContactPoints) {
for (InetSocketAddress address : extract(spec, resolve)) {

Set<InetSocketAddress> addresses = Collections.emptySet();
try {
addresses = AddressUtils.extract(spec, resolve);
} catch (RuntimeException e) {
LOG.warn("Ignoring invalid contact point {} ({})", spec, e.getMessage(), e);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could just continue here to next iteration of the outer for loop. You know addresses is the empty set at this point so there's no point iterating over it below.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I look at this now it feels like we had to make this a bit more complicated because AddressUtils.extract() now throws exceptions in most cases rather than just logging errors and returning an empty set. Was there a particular reason for this change? It's not immediately clear the exceptions buy you much here.

Copy link
Contributor Author

@jahstreet jahstreet Mar 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously, #extract code was used only in this class and we logged errors together with reasons of these errors. Now the info about reasons is moved to util method, which is called from multiple places. In this class, I aimed to keep logging (as well as other functionality) as close to the origin as seemed possible to avoid opinionated refactoring, so I needed a way to get reasons of errors from the utility #extract to log them together with the context logs.
Happy to agree on the way it should look like and change accordingly.


if (addresses.size() > 1) {
LOG.info(
"Contact point {} resolves to multiple addresses, will use them all ({})",
spec,
addresses);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this log message offer us much useful information?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above, it was there so I kept it as is.
As for me, this log is a good additional info when debugging failed to connect issues. Like, one could be surprised to see the client failed to connect logs where contact points do not match the configured ones.
What is your opinion on the need of it?


for (InetSocketAddress address : addresses) {
DefaultEndPoint endPoint = new DefaultEndPoint(address);
boolean wasNew = result.add(endPoint);
if (!wasNew) {
Expand All @@ -51,43 +63,4 @@ public static Set<EndPoint> merge(
}
return ImmutableSet.copyOf(result);
}

private static Set<InetSocketAddress> extract(String spec, boolean resolve) {
int separator = spec.lastIndexOf(':');
if (separator < 0) {
LOG.warn("Ignoring invalid contact point {} (expecting host:port)", spec);
return Collections.emptySet();
}

String host = spec.substring(0, separator);
String portSpec = spec.substring(separator + 1);
int port;
try {
port = Integer.parseInt(portSpec);
} catch (NumberFormatException e) {
LOG.warn("Ignoring invalid contact point {} (expecting a number, got {})", spec, portSpec);
return Collections.emptySet();
}
if (!resolve) {
return ImmutableSet.of(InetSocketAddress.createUnresolved(host, port));
} else {
try {
InetAddress[] inetAddresses = InetAddress.getAllByName(host);
if (inetAddresses.length > 1) {
LOG.info(
"Contact point {} resolves to multiple addresses, will use them all ({})",
spec,
Arrays.deepToString(inetAddresses));
}
Set<InetSocketAddress> result = new HashSet<>();
for (InetAddress inetAddress : inetAddresses) {
result.add(new InetSocketAddress(inetAddress, port));
}
return result;
} catch (UnknownHostException e) {
LOG.warn("Ignoring invalid contact point {} (unknown host {})", spec, host);
return Collections.emptySet();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@
*/
package com.datastax.oss.driver.internal.core.addresstranslation;

import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.ADDRESS_TRANSLATOR_ADVERTISED_HOSTNAME;

import com.datastax.oss.driver.api.core.addresstranslation.AddressTranslator;
import com.datastax.oss.driver.api.core.config.DriverOption;
import com.datastax.oss.driver.api.core.context.DriverContext;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.net.InetSocketAddress;
Expand All @@ -37,28 +38,13 @@ public class FixedHostNameAddressTranslator implements AddressTranslator {

private static final Logger LOG = LoggerFactory.getLogger(FixedHostNameAddressTranslator.class);

public static final String ADDRESS_TRANSLATOR_ADVERTISED_HOSTNAME =
"advanced.address-translator.advertised-hostname";

public static DriverOption ADDRESS_TRANSLATOR_ADVERTISED_HOSTNAME_OPTION =
new DriverOption() {
@NonNull
@Override
public String getPath() {
return ADDRESS_TRANSLATOR_ADVERTISED_HOSTNAME;
}
};

private final String advertisedHostname;
private final String logPrefix;

public FixedHostNameAddressTranslator(@NonNull DriverContext context) {
logPrefix = context.getSessionName();
advertisedHostname =
context
.getConfig()
.getDefaultProfile()
.getString(ADDRESS_TRANSLATOR_ADVERTISED_HOSTNAME_OPTION);
context.getConfig().getDefaultProfile().getString(ADDRESS_TRANSLATOR_ADVERTISED_HOSTNAME);
}

@NonNull
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.oss.driver.internal.core.addresstranslation;

import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.ADDRESS_TRANSLATOR_DEFAULT_ADDRESS;
import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.ADDRESS_TRANSLATOR_RESOLVE_ADDRESSES;
import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.ADDRESS_TRANSLATOR_SUBNET_ADDRESSES;

import com.datastax.oss.driver.api.core.addresstranslation.AddressTranslator;
import com.datastax.oss.driver.api.core.context.DriverContext;
import com.datastax.oss.driver.internal.core.util.AddressUtils;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import inet.ipaddr.IPAddress;
import inet.ipaddr.IPAddressString;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This translator returns the proxy address of the private subnet containing the Cassandra node IP,
* or default address if no matching subnets, or passes through the original node address if no
* default configured.
*
* <p>The translator can be used for scenarios when all nodes are behind some kind of proxy, and
* that proxy is different for nodes located in different subnets (eg. when Cassandra is deployed in
* multiple datacenters/regions). One can use this, for example, for Cassandra on Kubernetes with
* different Cassandra datacenters deployed to different Kubernetes clusters.
*/
public class SubnetAddressTranslator implements AddressTranslator {
private static final Logger LOG = LoggerFactory.getLogger(SubnetAddressTranslator.class);

private final List<SubnetAddress> subnetAddresses;

@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
private final Optional<InetSocketAddress> defaultAddress;

private final String logPrefix;

public SubnetAddressTranslator(@NonNull DriverContext context) {
logPrefix = context.getSessionName();
boolean resolveAddresses =
context
.getConfig()
.getDefaultProfile()
.getBoolean(ADDRESS_TRANSLATOR_RESOLVE_ADDRESSES, false);
this.subnetAddresses =
context.getConfig().getDefaultProfile().getStringMap(ADDRESS_TRANSLATOR_SUBNET_ADDRESSES)
.entrySet().stream()
.map(
e -> {
// Quoted and/or containing forward slashes map keys in reference.conf are read to
// strings with additional quotes, eg. 100.64.0.0/15 -> '100.64.0."0/15"' or
// "100.64.0.0/15" -> '"100.64.0.0/15"'
String subnetCIDR = e.getKey().replaceAll("\"", "");
String address = e.getValue();
return new SubnetAddress(subnetCIDR, parseAddress(address, resolveAddresses));
})
.collect(Collectors.toList());
this.defaultAddress =
Optional.ofNullable(
context
.getConfig()
.getDefaultProfile()
.getString(ADDRESS_TRANSLATOR_DEFAULT_ADDRESS, null))
.map(address -> parseAddress(address, resolveAddresses));

validateSubnetsAreNotOverlapping(this.subnetAddresses);
}

@NonNull
@Override
public InetSocketAddress translate(@NonNull InetSocketAddress address) {
InetSocketAddress translatedAddress = null;
for (SubnetAddress subnetAddress : subnetAddresses) {
if (subnetAddress.contains(address)) {
translatedAddress = subnetAddress.address;
}
}
if (translatedAddress == null && defaultAddress.isPresent()) {
translatedAddress = defaultAddress.get();
}
if (translatedAddress == null) {
translatedAddress = address;
}
LOG.debug("[{}] Translated {} to {}", logPrefix, address, translatedAddress);
return translatedAddress;
}

@Override
public void close() {}

@Nullable
private InetSocketAddress parseAddress(String address, boolean resolve) {
try {
InetSocketAddress parsedAddress = AddressUtils.extract(address, resolve).iterator().next();
LOG.debug("[{}] Parsed {} to {}", logPrefix, address, parsedAddress);
return parsedAddress;
} catch (RuntimeException e) {
throw new IllegalArgumentException(
String.format("Invalid address %s (%s)", address, e.getMessage()), e);
}
}

private static void validateSubnetsAreNotOverlapping(List<SubnetAddress> subnetAddresses) {
for (int i = 0; i < subnetAddresses.size() - 1; i++) {
for (int j = i + 1; j < subnetAddresses.size(); j++) {
SubnetAddress subnetAddress1 = subnetAddresses.get(i);
SubnetAddress subnetAddress2 = subnetAddresses.get(j);
if (subnetAddress1.isOverlapping(subnetAddress2)) {
throw new IllegalArgumentException(
String.format(
"Configured subnets are overlapping: %s, %s",
subnetAddress1.subnet, subnetAddress2.subnet));
}
}
}
}

private static class SubnetAddress {
private final IPAddress subnet;
private final InetSocketAddress address;

private SubnetAddress(String subnetCIDR, InetSocketAddress address) {
this.subnet = parseSubnet(subnetCIDR);
this.address = address;
}

private static IPAddress parseSubnet(String subnetCIDR) {
IPAddress subnet = new IPAddressString(subnetCIDR).getAddress();
if (subnet == null) {
throw new IllegalArgumentException("Invalid subnet: " + subnetCIDR);
}
if (!subnet.isPrefixBlock()) {
throw new IllegalArgumentException(
String.format(
"Subnet %s must be represented as a network prefix block %s",
subnet, subnet.toPrefixBlock()));
}
return subnet;
}

private boolean isOverlapping(SubnetAddress other) {
IPAddress thisSubnet = this.subnet;
IPAddress otherSubnet = other.subnet;
return thisSubnet.contains(otherSubnet.getLower())
|| thisSubnet.contains(otherSubnet.getUpper())
|| otherSubnet.contains(thisSubnet.getLower())
|| otherSubnet.contains(thisSubnet.getUpper());
}

private boolean contains(InetSocketAddress address) {
IPAddress ipAddress = new IPAddressString(address.getAddress().getHostAddress()).getAddress();
if (subnet.isIPv4() && ipAddress.isIPv4Convertible()) {
ipAddress = ipAddress.toIPv4();
} else if (subnet.isIPv6() && ipAddress.isIPv6Convertible()) {
ipAddress = ipAddress.toIPv6();
}
return subnet.contains(ipAddress);
}
}
}
Loading