diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java b/core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java
index 7ff3b2719ba..dca7a59d6a2 100644
--- a/core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java
+++ b/core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java
@@ -190,6 +190,20 @@ public enum DefaultDriverOption implements DriverOption {
*/
RETRY_POLICY_CLASS("advanced.retry-policy.class"),
+ // BACKOFF_RETRY_POLICY is a collection of sub-properties
+ BACKOFF_RETRY_POLICY("advanced.backoff-retry-policy"),
+
+ /**
+ * The class of the backoff retry policy.
+ *
+ *
Value-type: {@link String}
+ */
+ BACKOFF_RETRY_POLICY_CLASS("advanced.backoff-retry-policy.class"),
+
+ BACKOFF_RETRY_MAX_BACKOFF_MS("advanced.backoff-retry-policy.max-backoff-ms"),
+ BACKOFF_RETRY_BASE_BACKOFF_MS("advanced.backoff-retry-policy.base-backoff-ms"),
+ BACKOFF_RETRY_JITTER_RATIO("advanced.backoff-retry-policy.jitter-ratio"),
+
// SPECULATIVE_EXECUTION_POLICY is a collection of sub-properties
SPECULATIVE_EXECUTION_POLICY("advanced.speculative-execution-policy"),
/**
@@ -537,7 +551,6 @@ public enum DefaultDriverOption implements DriverOption {
*
Value-type: {@link java.time.Duration Duration}
*/
METRICS_NODE_CQL_MESSAGES_INTERVAL("advanced.metrics.node.cql-messages.refresh-interval"),
-
/**
* Whether or not to disable the Nagle algorithm.
*
diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/config/OptionsMap.java b/core/src/main/java/com/datastax/oss/driver/api/core/config/OptionsMap.java
index 64665709028..7953154509d 100644
--- a/core/src/main/java/com/datastax/oss/driver/api/core/config/OptionsMap.java
+++ b/core/src/main/java/com/datastax/oss/driver/api/core/config/OptionsMap.java
@@ -281,6 +281,10 @@ protected static void fillWithDriverDefaults(OptionsMap map) {
map.put(TypedDriverOption.RECONNECTION_BASE_DELAY, Duration.ofSeconds(1));
map.put(TypedDriverOption.RECONNECTION_MAX_DELAY, Duration.ofSeconds(60));
map.put(TypedDriverOption.RETRY_POLICY_CLASS, "DefaultRetryPolicy");
+ map.put(TypedDriverOption.BACKOFF_RETRY_POLICY_CLASS, "NoBackoffPolicy");
+ map.put(TypedDriverOption.BACKOFF_RETRY_BASE_BACKOFF_MS, 100);
+ map.put(TypedDriverOption.BACKOFF_RETRY_MAX_BACKOFF_MS, 10000);
+ map.put(TypedDriverOption.BACKOFF_RETRY_JITTER_RATIO, 0.1);
map.put(TypedDriverOption.SPECULATIVE_EXECUTION_POLICY_CLASS, "NoSpeculativeExecutionPolicy");
map.put(TypedDriverOption.TIMESTAMP_GENERATOR_CLASS, "AtomicTimestampGenerator");
map.put(TypedDriverOption.TIMESTAMP_GENERATOR_DRIFT_WARNING_THRESHOLD, Duration.ofSeconds(1));
diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java b/core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java
index b6051749c71..c2e9c12cb6f 100644
--- a/core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java
+++ b/core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java
@@ -199,6 +199,20 @@ public String toString() {
/** The class of the retry policy. */
public static final TypedDriverOption RETRY_POLICY_CLASS =
new TypedDriverOption<>(DefaultDriverOption.RETRY_POLICY_CLASS, GenericType.STRING);
+ /** The class of the retry policy. */
+ public static final TypedDriverOption BACKOFF_RETRY_POLICY_CLASS =
+ new TypedDriverOption<>(DefaultDriverOption.BACKOFF_RETRY_POLICY_CLASS, GenericType.STRING);
+ /** The class of the retry policy. */
+ public static final TypedDriverOption BACKOFF_RETRY_BASE_BACKOFF_MS =
+ new TypedDriverOption<>(
+ DefaultDriverOption.BACKOFF_RETRY_BASE_BACKOFF_MS, GenericType.INTEGER);
+ /** The class of the retry policy. */
+ public static final TypedDriverOption BACKOFF_RETRY_MAX_BACKOFF_MS =
+ new TypedDriverOption<>(
+ DefaultDriverOption.BACKOFF_RETRY_MAX_BACKOFF_MS, GenericType.INTEGER);
+ /** The class of the retry policy. */
+ public static final TypedDriverOption BACKOFF_RETRY_JITTER_RATIO =
+ new TypedDriverOption<>(DefaultDriverOption.BACKOFF_RETRY_JITTER_RATIO, GenericType.DOUBLE);
/** The class of the speculative execution policy. */
public static final TypedDriverOption SPECULATIVE_EXECUTION_POLICY_CLASS =
new TypedDriverOption<>(
diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/context/DriverContext.java b/core/src/main/java/com/datastax/oss/driver/api/core/context/DriverContext.java
index 5b32389e362..d6e246361f6 100644
--- a/core/src/main/java/com/datastax/oss/driver/api/core/context/DriverContext.java
+++ b/core/src/main/java/com/datastax/oss/driver/api/core/context/DriverContext.java
@@ -27,6 +27,7 @@
import com.datastax.oss.driver.api.core.loadbalancing.LoadBalancingPolicy;
import com.datastax.oss.driver.api.core.metadata.NodeStateListener;
import com.datastax.oss.driver.api.core.metadata.schema.SchemaChangeListener;
+import com.datastax.oss.driver.api.core.retry.BackoffRetryPolicy;
import com.datastax.oss.driver.api.core.retry.RetryPolicy;
import com.datastax.oss.driver.api.core.session.Session;
import com.datastax.oss.driver.api.core.session.throttling.RequestThrottler;
@@ -95,6 +96,26 @@ default RetryPolicy getRetryPolicy(@NonNull String profileName) {
return (policy != null) ? policy : getRetryPolicies().get(DriverExecutionProfile.DEFAULT_NAME);
}
+ /**
+ * @return The driver's retry policies, keyed by profile name; the returned map is guaranteed to
+ * never be {@code null} and to always contain an entry for the {@value
+ * DriverExecutionProfile#DEFAULT_NAME} profile.
+ */
+ @NonNull
+ Map getBackoffRetryPolicies();
+
+ /**
+ * @param profileName the profile name; never {@code null}.
+ * @return The driver's retry policy for the given profile; never {@code null}.
+ */
+ @NonNull
+ default BackoffRetryPolicy getBackoffRetryPolicy(@NonNull String profileName) {
+ BackoffRetryPolicy policy = getBackoffRetryPolicies().get(profileName);
+ return (policy != null)
+ ? policy
+ : getBackoffRetryPolicies().get(DriverExecutionProfile.DEFAULT_NAME);
+ }
+
/**
* @return The driver's speculative execution policies, keyed by profile name; the returned map is
* guaranteed to never be {@code null} and to always contain an entry for the {@value
diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/retry/BackoffRetryPolicy.java b/core/src/main/java/com/datastax/oss/driver/api/core/retry/BackoffRetryPolicy.java
new file mode 100644
index 00000000000..c874f36d199
--- /dev/null
+++ b/core/src/main/java/com/datastax/oss/driver/api/core/retry/BackoffRetryPolicy.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datastax.oss.driver.api.core.retry;
+
+import com.datastax.oss.driver.api.core.ConsistencyLevel;
+import com.datastax.oss.driver.api.core.connection.ClosedConnectionException;
+import com.datastax.oss.driver.api.core.connection.HeartbeatException;
+import com.datastax.oss.driver.api.core.loadbalancing.LoadBalancingPolicy;
+import com.datastax.oss.driver.api.core.servererrors.BootstrappingException;
+import com.datastax.oss.driver.api.core.servererrors.CoordinatorException;
+import com.datastax.oss.driver.api.core.servererrors.FunctionFailureException;
+import com.datastax.oss.driver.api.core.servererrors.OverloadedException;
+import com.datastax.oss.driver.api.core.servererrors.ProtocolError;
+import com.datastax.oss.driver.api.core.servererrors.QueryValidationException;
+import com.datastax.oss.driver.api.core.servererrors.ReadFailureException;
+import com.datastax.oss.driver.api.core.servererrors.ReadTimeoutException;
+import com.datastax.oss.driver.api.core.servererrors.ServerError;
+import com.datastax.oss.driver.api.core.servererrors.TruncateException;
+import com.datastax.oss.driver.api.core.servererrors.WriteFailureException;
+import com.datastax.oss.driver.api.core.servererrors.WriteType;
+import com.datastax.oss.driver.api.core.session.Request;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Defines the behavior to adopt when a request fails.
+ *
+ *
For each request, the driver gets a "query plan" (a list of coordinators to try) from the
+ * {@link LoadBalancingPolicy}, and tries each node in sequence. This policy is invoked if the
+ * request to that node fails.
+ *
+ *
The methods of this interface are invoked on I/O threads, therefore implementations should
+ * never block. In particular, don't call {@link Thread#sleep(long)} to retry after a delay:
+ * this would prevent asynchronous processing of other requests, and very negatively impact
+ * throughput. If the application needs to back off and retry later, this should be implemented in
+ * client code, not in this policy.
+ */
+public interface BackoffRetryPolicy extends AutoCloseable {
+ /**
+ * Whether to retry when the server replied with a {@code READ_TIMEOUT} error; this indicates a
+ * server-side timeout during a read query, i.e. some replicas did not reply to the
+ * coordinator in time.
+ *
+ * @param request the request that timed out.
+ * @param cl the requested consistency level.
+ * @param blockFor the minimum number of replica acknowledgements/responses that were required to
+ * fulfill the operation.
+ * @param received the number of replica that had acknowledged/responded to the operation before
+ * it failed.
+ * @param dataPresent whether the actual data was amongst the received replica responses. See
+ * {@link ReadTimeoutException#wasDataPresent()}.
+ * @param retryCount how many times the retry policy has been invoked already for this request
+ * (not counting the current invocation).
+ */
+ int onReadTimeoutBackoffMs(
+ @NonNull Request request,
+ @NonNull ConsistencyLevel cl,
+ int blockFor,
+ int received,
+ boolean dataPresent,
+ int retryCount,
+ RetryVerdict verdict);
+
+ /**
+ * Whether to retry when the server replied with a {@code WRITE_TIMEOUT} error; this indicates a
+ * server-side timeout during a write query, i.e. some replicas did not reply to the
+ * coordinator in time.
+ *
+ *
Note that this method will only be invoked for {@link Request#isIdempotent()} idempotent}
+ * requests: when a write times out, it is impossible to determine with 100% certainty whether the
+ * mutation was applied or not, so the write is never safe to retry; the driver will rethrow the
+ * error directly, without invoking the retry policy.
+ *
+ * @param request the request that timed out.
+ * @param cl the requested consistency level.
+ * @param writeType the type of the write for which the timeout was raised.
+ * @param blockFor the minimum number of replica acknowledgements/responses that were required to
+ * fulfill the operation.
+ * @param received the number of replica that had acknowledged/responded to the operation before
+ * it failed.
+ * @param retryCount how many times the retry policy has been invoked already for this request
+ * (not counting the current invocation).
+ */
+ int onWriteTimeoutBackoffMs(
+ @NonNull Request request,
+ @NonNull ConsistencyLevel cl,
+ @NonNull WriteType writeType,
+ int blockFor,
+ int received,
+ int retryCount,
+ RetryVerdict verdict);
+
+ /**
+ * Whether to retry when the server replied with an {@code UNAVAILABLE} error; this indicates that
+ * the coordinator determined that there were not enough replicas alive to perform a query with
+ * the requested consistency level.
+ *
+ * @param request the request that timed out.
+ * @param cl the requested consistency level.
+ * @param required the number of replica acknowledgements/responses required to perform the
+ * operation (with its required consistency level).
+ * @param alive the number of replicas that were known to be alive by the coordinator node when it
+ * tried to execute the operation.
+ * @param retryCount how many times the retry policy has been invoked already for this request
+ * (not counting the current invocation).
+ */
+ int onUnavailableBackoffMs(
+ @NonNull Request request,
+ @NonNull ConsistencyLevel cl,
+ int required,
+ int alive,
+ int retryCount,
+ RetryVerdict verdict);
+
+ /**
+ * Whether to retry when a request was aborted before we could get a response from the server.
+ *
+ *
This can happen in two cases: if the connection was closed due to an external event (this
+ * will manifest as a {@link ClosedConnectionException}, or {@link HeartbeatException} for a
+ * heartbeat failure); or if there was an unexpected error while decoding the response (this can
+ * only be a driver bug).
+ *
+ *
Note that this method will only be invoked for {@linkplain Request#isIdempotent()
+ * idempotent} requests: when execution was aborted before getting a response, it is impossible to
+ * determine with 100% certainty whether a mutation was applied or not, so a write is never safe
+ * to retry; the driver will rethrow the error directly, without invoking the retry policy.
+ *
+ * @param request the request that was aborted.
+ * @param error the error.
+ * @param retryCount how many times the retry policy has been invoked already for this request
+ * (not counting the current invocation).
+ */
+ int onRequestAbortedBackoffMs(
+ @NonNull Request request, @NonNull Throwable error, int retryCount, RetryVerdict verdict);
+
+ /**
+ * Whether to retry when the server replied with a recoverable error (other than {@code
+ * READ_TIMEOUT}, {@code WRITE_TIMEOUT} or {@code UNAVAILABLE}).
+ *
+ *
This can happen for the following errors: {@link OverloadedException}, {@link ServerError},
+ * {@link TruncateException}, {@link ReadFailureException}, {@link WriteFailureException}.
+ *
+ *
The following errors are handled internally by the driver, and therefore will never
+ * be encountered in this method:
+ *
+ *
+ *
{@link BootstrappingException}: always retried on the next node;
+ *
{@link QueryValidationException} (and its subclasses), {@link FunctionFailureException}
+ * and {@link ProtocolError}: always rethrown.
+ *
+ *
+ *
Note that this method will only be invoked for {@link Request#isIdempotent()} idempotent}
+ * requests: when execution was aborted before getting a response, it is impossible to determine
+ * with 100% certainty whether a mutation was applied or not, so a write is never safe to retry;
+ * the driver will rethrow the error directly, without invoking the retry policy.
+ *
+ * @param request the request that failed.
+ * @param error the error.
+ * @param retryCount how many times the retry policy has been invoked already for this request
+ * (not counting the current invocation).
+ */
+ int onErrorResponseBackoff(
+ @NonNull Request request,
+ @NonNull CoordinatorException error,
+ int retryCount,
+ RetryVerdict verdict);
+
+ /** Called when the cluster that this policy is associated with closes. */
+ @Override
+ void close();
+}
diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/retry/BackoffRetryVerdict.java b/core/src/main/java/com/datastax/oss/driver/api/core/retry/BackoffRetryVerdict.java
new file mode 100644
index 00000000000..f95688d0319
--- /dev/null
+++ b/core/src/main/java/com/datastax/oss/driver/api/core/retry/BackoffRetryVerdict.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datastax.oss.driver.api.core.retry;
+
+import java.time.Duration;
+
+/**
+ * The verdict returned by a {@link RetryPolicy} determining what to do when a request failed. A
+ * verdict contains a {@link RetryDecision} indicating if a retry should be attempted at all and
+ * where, with what delay, and a method that allows the original request to be modified before the
+ * retry.
+ */
+public interface BackoffRetryVerdict extends RetryVerdict {
+
+ /** @return a delay that request needs to take before retrying. */
+ Duration getRetryBackoff();
+}
diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/context/DefaultDriverContext.java b/core/src/main/java/com/datastax/oss/driver/internal/core/context/DefaultDriverContext.java
index dd59a76f7d8..53d49b228e9 100644
--- a/core/src/main/java/com/datastax/oss/driver/internal/core/context/DefaultDriverContext.java
+++ b/core/src/main/java/com/datastax/oss/driver/internal/core/context/DefaultDriverContext.java
@@ -38,6 +38,7 @@
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.core.metadata.NodeStateListener;
import com.datastax.oss.driver.api.core.metadata.schema.SchemaChangeListener;
+import com.datastax.oss.driver.api.core.retry.BackoffRetryPolicy;
import com.datastax.oss.driver.api.core.retry.RetryPolicy;
import com.datastax.oss.driver.api.core.session.ProgrammaticArguments;
import com.datastax.oss.driver.api.core.session.throttling.RequestThrottler;
@@ -150,6 +151,8 @@ public class DefaultDriverContext implements InternalDriverContext {
new LazyReference<>("reconnectionPolicy", this::buildReconnectionPolicy, cycleDetector);
private final LazyReference