Skip to content

4.x: Add advanced shard awareness #517

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,17 @@ public enum DefaultDriverOption implements DriverOption {
* <p>Value-type: boolean
*/
CONNECTION_WARN_INIT_ERROR("advanced.connection.warn-on-init-error"),
/**
* Whether to use advanced shard awareness.
*
* <p>Value-type: boolean
*/
CONNECTION_ADVANCED_SHARD_AWARENESS_ENABLED(
"advanced.connection.advanced-shard-awareness.enabled"),
/** Inclusive lower bound of port range to use in advanced shard awareness */
ADVANCED_SHARD_AWARENESS_PORT_LOW("advanced.connection.advanced-shard-awareness.port-low"),
/** Inclusive upper bound of port range to use in advanced shard awareness */
ADVANCED_SHARD_AWARENESS_PORT_HIGH("advanced.connection.advanced-shard-awareness.port-high"),
/**
* The number of connections in the LOCAL pool.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,9 @@ protected static void fillWithDriverDefaults(OptionsMap map) {
map.put(TypedDriverOption.CONNECTION_MAX_REQUESTS, 1024);
map.put(TypedDriverOption.CONNECTION_MAX_ORPHAN_REQUESTS, 256);
map.put(TypedDriverOption.CONNECTION_WARN_INIT_ERROR, true);
map.put(TypedDriverOption.CONNECTION_ADVANCED_SHARD_AWARENESS_ENABLED, true);
map.put(TypedDriverOption.ADVANCED_SHARD_AWARENESS_PORT_LOW, 10000);
map.put(TypedDriverOption.ADVANCED_SHARD_AWARENESS_PORT_HIGH, 65535);
map.put(TypedDriverOption.RECONNECT_ON_INIT, false);
map.put(TypedDriverOption.RECONNECTION_POLICY_CLASS, "ExponentialReconnectionPolicy");
map.put(TypedDriverOption.RECONNECTION_BASE_DELAY, Duration.ofSeconds(1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,18 @@ public String toString() {
/** Whether to log non-fatal errors when the driver tries to open a new connection. */
public static final TypedDriverOption<Boolean> CONNECTION_WARN_INIT_ERROR =
new TypedDriverOption<>(DefaultDriverOption.CONNECTION_WARN_INIT_ERROR, GenericType.BOOLEAN);
/** Whether to use advanced shard awareness */
public static final TypedDriverOption<Boolean> CONNECTION_ADVANCED_SHARD_AWARENESS_ENABLED =
new TypedDriverOption<>(
DefaultDriverOption.CONNECTION_ADVANCED_SHARD_AWARENESS_ENABLED, GenericType.BOOLEAN);
/** Inclusive lower bound of port range to use in advanced shard awareness */
public static final TypedDriverOption<Integer> ADVANCED_SHARD_AWARENESS_PORT_LOW =
new TypedDriverOption<>(
DefaultDriverOption.ADVANCED_SHARD_AWARENESS_PORT_LOW, GenericType.INTEGER);
/** Inclusive upper bound of port range to use in advanced shard awareness */
public static final TypedDriverOption<Integer> ADVANCED_SHARD_AWARENESS_PORT_HIGH =
new TypedDriverOption<>(
DefaultDriverOption.ADVANCED_SHARD_AWARENESS_PORT_HIGH, GenericType.INTEGER);
/** The number of connections in the LOCAL pool. */
public static final TypedDriverOption<Integer> CONNECTION_POOL_LOCAL_SIZE =
new TypedDriverOption<>(DefaultDriverOption.CONNECTION_POOL_LOCAL_SIZE, GenericType.INTEGER);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.config.DriverConfig;
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.context.DriverContext;
import com.datastax.oss.driver.api.core.metadata.EndPoint;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.core.metadata.NodeShardingInfo;
import com.datastax.oss.driver.api.core.metrics.DefaultNodeMetric;
import com.datastax.oss.driver.api.core.metrics.DefaultSessionMetric;
import com.datastax.oss.driver.internal.core.config.typesafe.TypesafeDriverConfig;
Expand All @@ -51,13 +53,17 @@
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import net.jcip.annotations.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -153,12 +159,27 @@ public CompletionStage<DriverChannel> connect(Node node, DriverChannelOptions op
} else {
nodeMetricUpdater = NoopNodeMetricUpdater.INSTANCE;
}
return connect(node.getEndPoint(), options, nodeMetricUpdater);
return connect(node.getEndPoint(), null, null, options, nodeMetricUpdater);
}

public CompletionStage<DriverChannel> connect(
Node node, Integer shardId, DriverChannelOptions options) {
NodeMetricUpdater nodeMetricUpdater;
if (node instanceof DefaultNode) {
nodeMetricUpdater = ((DefaultNode) node).getMetricUpdater();
} else {
nodeMetricUpdater = NoopNodeMetricUpdater.INSTANCE;
}
return connect(node.getEndPoint(), node.getShardingInfo(), shardId, options, nodeMetricUpdater);
}

@VisibleForTesting
CompletionStage<DriverChannel> connect(
EndPoint endPoint, DriverChannelOptions options, NodeMetricUpdater nodeMetricUpdater) {
EndPoint endPoint,
NodeShardingInfo shardingInfo,
Integer shardId,
DriverChannelOptions options,
NodeMetricUpdater nodeMetricUpdater) {
CompletableFuture<DriverChannel> resultFuture = new CompletableFuture<>();

ProtocolVersion currentVersion;
Expand All @@ -174,6 +195,8 @@ CompletionStage<DriverChannel> connect(

connect(
endPoint,
shardingInfo,
shardId,
options,
nodeMetricUpdater,
currentVersion,
Expand All @@ -185,6 +208,8 @@ CompletionStage<DriverChannel> connect(

private void connect(
EndPoint endPoint,
NodeShardingInfo shardingInfo,
Integer shardId,
DriverChannelOptions options,
NodeMetricUpdater nodeMetricUpdater,
ProtocolVersion currentVersion,
Expand All @@ -204,7 +229,28 @@ private void connect(

nettyOptions.afterBootstrapInitialized(bootstrap);

ChannelFuture connectFuture = bootstrap.connect(endPoint.resolve());
ChannelFuture connectFuture;
if (shardId == null || shardingInfo == null) {
if (shardId != null) {
LOG.debug(
"Requested connection to shard {} but shardingInfo is currently missing for Node at endpoint {}. Falling back to arbitrary local port.",
shardId,
endPoint);
}
connectFuture = bootstrap.connect(endPoint.resolve());
} else {
int localPort =
PortAllocator.getNextAvailablePort(shardingInfo.getShardsCount(), shardId, context);
if (localPort == -1) {
LOG.warn(
"Could not find free port for shard {} at {}. Falling back to arbitrary local port.",
shardId,
endPoint);
connectFuture = bootstrap.connect(endPoint.resolve());
} else {
connectFuture = bootstrap.connect(endPoint.resolve(), new InetSocketAddress(localPort));
}
}

connectFuture.addListener(
cf -> {
Expand Down Expand Up @@ -253,6 +299,8 @@ private void connect(
downgraded.get());
connect(
endPoint,
shardingInfo,
shardId,
options,
nodeMetricUpdater,
downgraded.get(),
Expand Down Expand Up @@ -391,4 +439,92 @@ protected void initChannel(Channel channel) {
}
}
}

static class PortAllocator {
private static final AtomicInteger lastPort = new AtomicInteger(-1);
private static final Logger LOG = LoggerFactory.getLogger(PortAllocator.class);

public static int getNextAvailablePort(int shardCount, int shardId, DriverContext context) {
int lowPort =
context
.getConfig()
.getDefaultProfile()
.getInt(DefaultDriverOption.ADVANCED_SHARD_AWARENESS_PORT_LOW);
int highPort =
context
.getConfig()
.getDefaultProfile()
.getInt(DefaultDriverOption.ADVANCED_SHARD_AWARENESS_PORT_HIGH);
if (highPort - lowPort < shardCount) {
LOG.error(
"There is not enough ports in range [{},{}] for {} shards. Update your configuration.",
lowPort,
highPort,
shardCount);
}
int lastPortValue, foundPort = -1;
do {
lastPortValue = lastPort.get();

// We will scan from lastPortValue
// (or lowPort is there was no lastPort or lastPort is too low)
int scanStart = lastPortValue == -1 ? lowPort : lastPortValue;
if (scanStart < lowPort) {
scanStart = lowPort;
}

// Round it up to "% shardCount == shardId"
scanStart += (shardCount - scanStart % shardCount) + shardId;

// Scan from scanStart upwards to highPort.
for (int port = scanStart; port <= highPort; port += shardCount) {
if (isTcpPortAvailable(port, context)) {
foundPort = port;
break;
}
}

// If we started scanning from a high scanStart port
// there might have been not enough ports left that are
// smaller than highPort. Scan from the beginning
// from the lowPort.
if (foundPort == -1) {
scanStart = lowPort + (shardCount - lowPort % shardCount) + shardId;

for (int port = scanStart; port <= highPort; port += shardCount) {
if (isTcpPortAvailable(port, context)) {
foundPort = port;
break;
}
}
}

// No luck! All ports taken!
if (foundPort == -1) {
return -1;
}
} while (!lastPort.compareAndSet(lastPortValue, foundPort));

return foundPort;
}

public static boolean isTcpPortAvailable(int port, DriverContext context) {
try {
ServerSocket serverSocket = new ServerSocket();
try {
serverSocket.setReuseAddress(
context
.getConfig()
.getDefaultProfile()
.getBoolean(DefaultDriverOption.SOCKET_REUSE_ADDRESS, false));
serverSocket.bind(new InetSocketAddress(port), 1);
return true;
} finally {
serverSocket.close();
}
} catch (IOException ex) {
return false;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
Expand Down Expand Up @@ -489,9 +490,23 @@ private CompletionStage<Boolean> addMissingChannels() {
channels.length * wantedCount - Arrays.stream(channels).mapToInt(ChannelSet::size).sum();
LOG.debug("[{}] Trying to create {} missing channels", logPrefix, missing);
DriverChannelOptions options = buildDriverOptions();
for (int i = 0; i < missing; i++) {
CompletionStage<DriverChannel> channelFuture = channelFactory.connect(node, options);
pendingChannels.add(channelFuture);
for (int shard = 0; shard < channels.length; shard++) {
LOG.trace(
"[{}] Missing {} channels for shard {}",
logPrefix,
wantedCount - channels[shard].size(),
shard);
for (int p = channels[shard].size(); p < wantedCount; p++) {
CompletionStage<DriverChannel> channelFuture;
if (config
.getDefaultProfile()
.getBoolean(DefaultDriverOption.CONNECTION_ADVANCED_SHARD_AWARENESS_ENABLED)) {
channelFuture = channelFactory.connect(node, shard, options);
} else {
channelFuture = channelFactory.connect(node, options);
}
pendingChannels.add(channelFuture);
}
}
return CompletableFutures.allDone(pendingChannels)
.thenApplyAsync(this::onAllConnected, adminExecutor);
Expand Down Expand Up @@ -551,6 +566,23 @@ private boolean onAllConnected(@SuppressWarnings("unused") Void v) {
channel);
channel.forceClose();
} else {
if (config
.getDefaultProfile()
.getBoolean(DefaultDriverOption.CONNECTION_ADVANCED_SHARD_AWARENESS_ENABLED)
&& channel.localAddress() instanceof InetSocketAddress
&& channel.getShardingInfo() != null) {
int port = ((InetSocketAddress) channel.localAddress()).getPort();
int actualShard = channel.getShardId();
int targetShard = port % channel.getShardingInfo().getShardsCount();
if (actualShard != targetShard) {
LOG.warn(
"[{}] New channel {} connected to shard {}, but shard {} was requested. If this is not transient check your driver AND cluster configuration of shard aware port.",
logPrefix,
channel,
actualShard,
targetShard);
}
}
LOG.debug("[{}] New channel added {}", logPrefix, channel);
if (channels[channel.getShardId()].size() < wantedCount) {
addChannel(channel);
Expand Down
46 changes: 46 additions & 0 deletions core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,52 @@ datastax-java-driver {
# change.
# Overridable in a profile: no
warn-on-init-error = true


advanced-shard-awareness {
# Whether to use advanced shard awareness when trying to open new connections.
#
# Requires passing shard-aware port as contact point (usually 19042 or 19142(ssl)).
# Having this enabled makes sense only for ScyllaDB clusters.
#
# Reduces number of reconnections driver needs to fully initialize connection pool.
# In short it's a feature that allows targeting particular shard when connecting to a node by using specific
# local port number.
# For context see https://www.scylladb.com/2021/04/27/connect-faster-to-scylla-with-a-shard-aware-port/
#
# If set to false the driver will not attempt to use this feature. This means connection's local port
# will be random according to system rules and driver will keep opening connections until it gets right shards.
# In such case non-shard aware port is recommended (by default 9042 or 9142).
# If set to true the driver will attempt to use it and will log warnings each time something
# makes it not possible.
#
# If the node for some reason does not report it's sharding info the driver
# will log a warning and create connection the same way as if this feature was disabled.
# If the cluster ignores the request for specific shard warning will also be logged,
# although the local port will already be chosen according to advanced shard awareness rules.
#
# Required: yes
# Modifiable at runtime: yes, the new value will be used for connections created after the
# change.
# Overridable in a profile: no
enabled = true

# Inclusive lower bound of port range to use in advanced shard awareness
# The driver will attempt to reserve ports for connection only within the range.
# Required: yes
# Modifiable at runtime: yes, the new value will be used for calls after the
# change.
# Overridable in a profile: no
port-low = 10000

# Inclusive upper bound of port range to use in advanced shard awareness.
# The driver will attempt to reserve ports for connection only within the range.
# Required: yes
# Modifiable at runtime: yes, the new value will be used for calls after the
# change.
# Overridable in a profile: no
port-high = 65535
}
}

# Advanced options for the built-in load-balancing policies.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,11 @@ public void should_report_available_ids() {
// When
CompletionStage<DriverChannel> channelFuture =
factory.connect(
SERVER_ADDRESS, DriverChannelOptions.builder().build(), NoopNodeMetricUpdater.INSTANCE);
SERVER_ADDRESS,
null,
null,
DriverChannelOptions.builder().build(),
NoopNodeMetricUpdater.INSTANCE);
completeSimpleChannelInit();

// Then
Expand Down
Loading
Loading