|
| 1 | +import logging |
| 2 | +import unittest |
| 3 | +from cassandra import OperationType, RateLimitReached |
| 4 | +from cassandra.cluster import Cluster |
| 5 | +from cassandra.policies import ConstantReconnectionPolicy, RoundRobinPolicy, TokenAwarePolicy |
| 6 | + |
| 7 | +from tests.integration import PROTOCOL_VERSION, use_cluster |
| 8 | + |
| 9 | +LOGGER = logging.getLogger(__name__) |
| 10 | + |
| 11 | +def setup_module(): |
| 12 | + use_cluster('rate_limit', [3], start=True) |
| 13 | + |
| 14 | +class TestRateLimitExceededException(unittest.TestCase): |
| 15 | + @classmethod |
| 16 | + def setup_class(cls): |
| 17 | + cls.cluster = Cluster(contact_points=["127.0.0.1"], protocol_version=PROTOCOL_VERSION, |
| 18 | + load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy()), |
| 19 | + reconnection_policy=ConstantReconnectionPolicy(1)) |
| 20 | + cls.session = cls.cluster.connect() |
| 21 | + |
| 22 | + @classmethod |
| 23 | + def teardown_class(cls): |
| 24 | + cls.cluster.shutdown() |
| 25 | + |
| 26 | + def test_rate_limit_exceeded(self): |
| 27 | + self.session.execute( |
| 28 | + """ |
| 29 | + DROP KEYSPACE IF EXISTS ratetests |
| 30 | + """ |
| 31 | + ) |
| 32 | + self.session.execute( |
| 33 | + """ |
| 34 | + CREATE KEYSPACE IF NOT EXISTS ratetests |
| 35 | + WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 1} |
| 36 | + """) |
| 37 | + |
| 38 | + self.session.execute("USE ratetests") |
| 39 | + self.session.execute( |
| 40 | + """ |
| 41 | + CREATE TABLE tbl (pk int PRIMARY KEY, v int) |
| 42 | + WITH per_partition_rate_limit = {'max_writes_per_second': 1} |
| 43 | + """) |
| 44 | + |
| 45 | + prepared = self.session.prepare( |
| 46 | + """ |
| 47 | + INSERT INTO tbl (pk, v) VALUES (?, ?) |
| 48 | + """) |
| 49 | + |
| 50 | + # The rate limit is 1 write/s, so repeat the same query |
| 51 | + # until an error occurs, it should happen quickly |
| 52 | + def execute_write(): |
| 53 | + for _ in range(1000): |
| 54 | + self.session.execute(prepared.bind((123, 456))) |
| 55 | + |
| 56 | + with self.assertRaises(RateLimitReached) as context: |
| 57 | + execute_write() |
| 58 | + |
| 59 | + self.assertEqual(context.exception.op_type, OperationType.Write) |
0 commit comments