Skip to content

Commit 19a099c

Browse files
authored
Merge pull request #298 from fruch/exponential_backoff
introducing `ExponentialBackoffRetryPolicy`
2 parents 6550f80 + f4eabdf commit 19a099c

File tree

5 files changed

+126
-12
lines changed

5 files changed

+126
-12
lines changed

cassandra/cluster.py

+8-4
Original file line numberDiff line numberDiff line change
@@ -5012,12 +5012,16 @@ def exception_from_response(response):
50125012
return response.to_exception()
50135013
else:
50145014
return response
5015+
if len(retry_decision) == 2:
5016+
retry_type, consistency = retry_decision
5017+
delay = 0
5018+
elif len(retry_decision) == 3:
5019+
retry_type, consistency, delay = retry_decision
50155020

5016-
retry_type, consistency = retry_decision
50175021
if retry_type in (RetryPolicy.RETRY, RetryPolicy.RETRY_NEXT_HOST):
50185022
self._query_retries += 1
50195023
reuse = retry_type == RetryPolicy.RETRY
5020-
self._retry(reuse, consistency, host)
5024+
self._retry(reuse, consistency, host, delay)
50215025
elif retry_type is RetryPolicy.RETHROW:
50225026
self._set_final_exception(exception_from_response(response))
50235027
else: # IGNORE
@@ -5027,7 +5031,7 @@ def exception_from_response(response):
50275031

50285032
self._errors[host] = exception_from_response(response)
50295033

5030-
def _retry(self, reuse_connection, consistency_level, host):
5034+
def _retry(self, reuse_connection, consistency_level, host, delay):
50315035
if self._final_exception:
50325036
# the connection probably broke while we were waiting
50335037
# to retry the operation
@@ -5039,7 +5043,7 @@ def _retry(self, reuse_connection, consistency_level, host):
50395043
self.message.consistency_level = consistency_level
50405044

50415045
# don't retry on the event loop thread
5042-
self.session.submit(self._retry_task, reuse_connection, host)
5046+
self.session.cluster.scheduler.schedule(delay, self._retry_task, reuse_connection, host)
50435047

50445048
def _retry_task(self, reuse_connection, host):
50455049
if self._final_exception:

cassandra/policies.py

+52-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14-
14+
import random
1515
from itertools import islice, cycle, groupby, repeat
1616
import logging
1717
from random import randint, shuffle
@@ -1019,6 +1019,57 @@ def on_unavailable(self, query, consistency, required_replicas, alive_replicas,
10191019
return self._pick_consistency(alive_replicas)
10201020

10211021

1022+
class ExponentialBackoffRetryPolicy(RetryPolicy):
    """
    A policy that retries with an exponentially growing, jittered delay.

    Every decision returned by this policy is a three-element tuple
    ``(decision, consistency, delay)``; ``delay`` is the time in seconds
    to wait before the retry and is ``None`` when the decision is
    ``RETHROW``.
    """

    def __init__(self, max_num_retries: float, min_interval: float = 0.1, max_interval: float = 10.0,
                 *args, **kwargs):
        """
        `max_num_retries` counts how many times the operation would be retried,
        `min_interval` is the initial time in seconds to wait before first retry
        `max_interval` is the cap, in seconds, applied to the exponential delay
        before jitter is added

        Any extra positional/keyword arguments are forwarded to the parent
        class initializer.
        """
        self.min_interval = min_interval
        self.max_num_retries = max_num_retries
        self.max_interval = max_interval
        # The one-argument form ``super(Cls)`` yields an *unbound* super
        # object, so calling ``__init__`` on it never reaches the parent
        # class; the zero-argument form binds to ``self`` as intended.
        super().__init__(*args, **kwargs)

    def _calculate_backoff(self, attempt: int):
        """
        Return the delay in seconds to wait before retry number `attempt`
        (zero-based).

        The base delay is ``min_interval * 2 ** attempt`` capped at
        ``max_interval``.  A uniform jitter in
        ``[-min_interval / 2, +min_interval / 2)`` is then added, so the
        returned value may exceed ``max_interval`` by up to half of
        ``min_interval``.
        """
        try:
            uncapped = self.min_interval * 2 ** attempt
        except OverflowError:
            # 2 ** attempt no longer fits in a float; the cap applies anyway.
            uncapped = self.max_interval
        delay = min(self.max_interval, uncapped)
        # add some jitter
        delay += random.random() * self.min_interval - (self.min_interval / 2)
        return delay

    def on_read_timeout(self, query, consistency, required_responses,
                        received_responses, data_retrieved, retry_num):
        """
        Return ``RETRY`` at the same consistency, with a backoff delay, while
        retries remain and enough responses were received but no data was
        retrieved; otherwise return ``RETHROW``.
        """
        retries_left = retry_num < self.max_num_retries
        if retries_left and received_responses >= required_responses and not data_retrieved:
            return self.RETRY, consistency, self._calculate_backoff(retry_num)
        return self.RETHROW, None, None

    def on_write_timeout(self, query, consistency, write_type,
                         required_responses, received_responses, retry_num):
        """
        Return ``RETRY`` at the same consistency, with a backoff delay, while
        retries remain and `write_type` is ``WriteType.BATCH_LOG``; otherwise
        return ``RETHROW``.
        """
        if retry_num < self.max_num_retries and write_type == WriteType.BATCH_LOG:
            return self.RETRY, consistency, self._calculate_backoff(retry_num)
        return self.RETHROW, None, None

    def on_unavailable(self, query, consistency, required_replicas,
                       alive_replicas, retry_num):
        """
        Return ``RETRY_NEXT_HOST`` with a backoff delay while retries remain;
        otherwise return ``RETHROW``.
        """
        if retry_num < self.max_num_retries:
            return self.RETRY_NEXT_HOST, None, self._calculate_backoff(retry_num)
        return self.RETHROW, None, None

    def on_request_error(self, query, consistency, error, retry_num):
        """
        Return ``RETRY_NEXT_HOST`` with a backoff delay while retries remain;
        otherwise return ``RETHROW``.
        """
        if retry_num < self.max_num_retries:
            return self.RETRY_NEXT_HOST, None, self._calculate_backoff(retry_num)
        return self.RETHROW, None, None
1071+
1072+
10221073
class AddressTranslator(object):
10231074
"""
10241075
Interface for translating cluster-defined endpoints.

tests/integration/standard/test_policies.py

+18-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
from cassandra.cluster import ExecutionProfile, EXEC_PROFILE_DEFAULT
1818
from cassandra.policies import HostFilterPolicy, RoundRobinPolicy, SimpleConvictionPolicy, \
19-
WhiteListRoundRobinPolicy
19+
WhiteListRoundRobinPolicy, ExponentialBackoffRetryPolicy
2020
from cassandra.pool import Host
2121
from cassandra.connection import DefaultEndPoint
2222

@@ -90,3 +90,20 @@ def test_only_connects_to_subset(self):
9090
queried_hosts.update(response.response_future.attempted_hosts)
9191
queried_hosts = set(host.address for host in queried_hosts)
9292
self.assertEqual(queried_hosts, only_connect_hosts)
93+
94+
95+
class ExponentialRetryPolicyTests(unittest.TestCase):
    """
    Integration test that runs a statement through a cluster whose default
    retry policy is ``ExponentialBackoffRetryPolicy``.
    """

    def setUp(self):
        self.cluster = TestCluster(default_retry_policy=ExponentialBackoffRetryPolicy(max_num_retries=3))
        self.session = self.cluster.connect()

    def tearDown(self):
        try:
            # Remove the keyspace created by the test so that a rerun against
            # the same cluster does not fail on the plain CREATE KEYSPACE.
            self.session.execute("DROP KEYSPACE IF EXISTS preparedtests")
        finally:
            # Always release the cluster, even if the cleanup statement fails.
            self.cluster.shutdown()

    def test_exponential_retries(self):
        self.session.execute(
            """
            CREATE KEYSPACE preparedtests
            WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}
            """)

tests/unit/test_policies.py

+19-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
RetryPolicy, WriteType,
3333
DowngradingConsistencyRetryPolicy, ConstantReconnectionPolicy,
3434
LoadBalancingPolicy, ConvictionPolicy, ReconnectionPolicy, FallthroughRetryPolicy,
35-
IdentityTranslator, EC2MultiRegionTranslator, HostFilterPolicy)
35+
IdentityTranslator, EC2MultiRegionTranslator, HostFilterPolicy, ExponentialBackoffRetryPolicy)
3636
from cassandra.pool import Host
3737
from cassandra.connection import DefaultEndPoint, UnixSocketEndPoint
3838
from cassandra.query import Statement
@@ -1247,6 +1247,24 @@ def test_unavailable(self):
12471247
self.assertEqual(consistency, ConsistencyLevel.ONE)
12481248

12491249

1250+
class ExponentialRetryPolicyTest(unittest.TestCase):
    """Unit tests for the delay computed by ``ExponentialBackoffRetryPolicy``."""

    def test_calculate_backoff(self):
        """
        The delay for each attempt must stay within half of the default
        ``min_interval`` (0.1) of ``0.1 * 2 ** attempt``.
        """
        policy = ExponentialBackoffRetryPolicy(max_num_retries=2)

        # (attempt number, expected delay before jitter)
        cases = [
            (0, 0.1),
            (1, 2 * 0.1),
            (2, (2 * 2) * 0.1),
            (3, (2 * 2 * 2) * 0.1),
        ]
        half_jitter = 0.1 / 2

        for attempts, delay in cases:
            # The jitter is random, so sample each attempt repeatedly.
            for _ in range(100):
                d = policy._calculate_backoff(attempts)
                msg = f"d={d} attempts={attempts}, delay={delay}"
                # unittest assertions are not stripped under ``python -O``.
                # Bounds are inclusive: random.random() may return exactly
                # 0.0 and float rounding can land on either edge.
                self.assertGreaterEqual(d, delay - half_jitter, msg)
                self.assertLessEqual(d, delay + half_jitter, msg)
1266+
1267+
12501268
class WhiteListRoundRobinPolicyTest(unittest.TestCase):
12511269

12521270
def test_hosts_with_hostname(self):

tests/unit/test_response_future.py

+29-5
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
RESULT_KIND_ROWS, RESULT_KIND_SET_KEYSPACE,
3131
RESULT_KIND_SCHEMA_CHANGE, RESULT_KIND_PREPARED,
3232
ProtocolHandler)
33-
from cassandra.policies import RetryPolicy
33+
from cassandra.policies import RetryPolicy, ExponentialBackoffRetryPolicy
3434
from cassandra.pool import NoConnectionsAvailable
3535
from cassandra.query import SimpleStatement
3636

@@ -265,7 +265,7 @@ def test_retry_policy_says_retry(self):
265265
host = Mock()
266266
rf._set_result(host, None, None, result)
267267

268-
session.submit.assert_called_once_with(rf._retry_task, True, host)
268+
rf.session.cluster.scheduler.schedule.assert_called_once_with(ANY, rf._retry_task, True, host)
269269
self.assertEqual(1, rf._query_retries)
270270

271271
connection = Mock(spec=Connection)
@@ -300,7 +300,7 @@ def test_retry_with_different_host(self):
300300
host = Mock()
301301
rf._set_result(host, None, None, result)
302302

303-
session.submit.assert_called_once_with(rf._retry_task, False, host)
303+
rf.session.cluster.scheduler.schedule.assert_called_once_with(ANY, rf._retry_task, False, host)
304304
# query_retries does get incremented for Overloaded/Bootstrapping errors (since 3.18)
305305
self.assertEqual(1, rf._query_retries)
306306

@@ -332,7 +332,8 @@ def test_all_retries_fail(self):
332332
rf._set_result(host, None, None, result)
333333

334334
# simulate the executor running this
335-
session.submit.assert_called_once_with(rf._retry_task, False, host)
335+
rf.session.cluster.scheduler.schedule.assert_called_once_with(ANY, rf._retry_task, False, host)
336+
336337
rf._retry_task(False, host)
337338

338339
# it should try with a different host
@@ -342,11 +343,34 @@ def test_all_retries_fail(self):
342343
rf._set_result(host, None, None, result)
343344

344345
# simulate the executor running this
345-
session.submit.assert_called_with(rf._retry_task, False, host)
346+
rf.session.cluster.scheduler.schedule.assert_called_with(ANY, rf._retry_task, False, host)
346347
rf._retry_task(False, host)
347348

348349
self.assertRaises(NoHostAvailable, rf.result)
349350

351+
def test_exponential_retry_policy_fail(self):
    """
    A bootstrapping error handled by ``ExponentialBackoffRetryPolicy`` must
    schedule ``_retry_task`` on the cluster scheduler with a backoff delay.
    """
    session = self.make_session()
    pool = session._pools.get.return_value
    connection = Mock(spec=Connection)
    pool.borrow_connection.return_value = (connection, 1)

    query = SimpleStatement("SELECT * FROM foo")
    message = QueryMessage(query=query, consistency_level=ConsistencyLevel.ONE)
    rf = ResponseFuture(session, message, query, 1, retry_policy=ExponentialBackoffRetryPolicy(2))
    rf.send_request()
    rf.session._pools.get.assert_called_once_with('ip1')

    result = Mock(spec=IsBootstrappingErrorMessage, info={})
    host = Mock()
    rf._set_result(host, None, None, result)

    schedule = rf.session.cluster.scheduler.schedule
    schedule.assert_called_once_with(ANY, rf._retry_task, False, host)

    # The first positional argument of the scheduled call is the delay.
    _name, scheduled_args, _scheduled_kwargs = schedule.mock_calls[-1]
    delay = scheduled_args[0]
    # For the first retry the policy yields 0.1 plus a jitter in
    # [-0.05, 0.05), so 0.05 itself is a legitimate value.
    self.assertGreaterEqual(delay, 0.05)

    # simulate the executor running this
    rf._retry_task(False, host)
373+
350374
def test_all_pools_shutdown(self):
351375
session = self.make_basic_session()
352376
session.cluster._default_load_balancing_policy.make_query_plan.return_value = ['ip1', 'ip2']

0 commit comments

Comments
 (0)