diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java index 0f973964694..ed6b4dd5421 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java @@ -51,6 +51,7 @@ import com.datastax.oss.driver.internal.core.metadata.DefaultNode; import com.datastax.oss.driver.internal.core.metrics.NodeMetricUpdater; import com.datastax.oss.driver.internal.core.metrics.SessionMetricUpdater; +import com.datastax.oss.driver.internal.core.pool.ChannelPool; import com.datastax.oss.driver.internal.core.session.DefaultSession; import com.datastax.oss.driver.internal.core.session.RepreparePayload; import com.datastax.oss.driver.internal.core.tracker.NoopRequestTracker; @@ -108,6 +109,7 @@ public class CqlRequestHandler implements Throttled { protected final CompletableFuture result; private final Message message; private final Timer timer; + private Boolean retryFailedSend = true; /** * How many speculative executions are currently running (including the initial execution). We * track this in order to know when to fail the request if all executions have reached the end of @@ -505,17 +507,28 @@ public void operationComplete(Future future) throws Exception { setFinalError(error.getCause(), node, execution); } else { LOG.trace( - "[{}] Failed to send request on {}, trying next node (cause: {})", + "[{}] Failed to send request on {}, trying next node (cause: {}), queryPlan: {}, retryFailedSend: {}", logPrefix, channel, - error); + error, + queryPlan, + retryFailedSend); + Node retriedNode = null; + if (queryPlan.size() == 0 && retryFailedSend) { + retryFailedSend = false; + // No nodes left, but checking if any connections still available + ChannelPool p = session.getPools().get(node); + if (p != null && p.size() > 0) { + LOG.trace("Trying node {} one more time since it still has few connections", node); + retriedNode = node; + } + } recordError(node, error); trackNodeError(node, error, NANOTIME_NOT_MEASURED_YET); ((DefaultNode) node) .getMetricUpdater() .incrementCounter(DefaultNodeMetric.UNSENT_REQUESTS, executionProfile.getName()); - sendRequest( - null, queryPlan, execution, retryCount, scheduleNextExecution); // try next node + sendRequest(retriedNode, queryPlan, execution, retryCount, scheduleNextExecution); } } else { LOG.trace("[{}] Request sent on {}", logPrefix, channel); diff --git a/core/src/main/java/com/yugabyte/oss/driver/internal/core/loadbalancing/PartitionAwarePolicy.java b/core/src/main/java/com/yugabyte/oss/driver/internal/core/loadbalancing/PartitionAwarePolicy.java index 4b17b1c04af..6e24c4915a8 100644 --- a/core/src/main/java/com/yugabyte/oss/driver/internal/core/loadbalancing/PartitionAwarePolicy.java +++ b/core/src/main/java/com/yugabyte/oss/driver/internal/core/loadbalancing/PartitionAwarePolicy.java @@ -383,20 +383,21 @@ private static void AppendValueToChannel( channel.write(value); break; } - case ProtocolConstants.DataType.LIST: { - ListType listType = (ListType) type; - DataType dataTypeOfListValue = listType.getElementType(); - int length = value.getInt(); - for (int j = 0; j < length; j++) { - // Appending each element. - int size = value.getInt(); - ByteBuffer buf = value.slice(); - buf.limit(size); - AppendValueToChannel(dataTypeOfListValue, buf, channel); - value.position(value.position() + size); + case ProtocolConstants.DataType.LIST: + { + ListType listType = (ListType) type; + DataType dataTypeOfListValue = listType.getElementType(); + int length = value.getInt(); + for (int j = 0; j < length; j++) { + // Appending each element. + int size = value.getInt(); + ByteBuffer buf = value.slice(); + buf.limit(size); + AppendValueToChannel(dataTypeOfListValue, buf, channel); + value.position(value.position() + size); + } + break; } - break; - } case ProtocolConstants.DataType.SET: { SetType setType = (SetType) type;