Skip to content

Commit 7796e9c

Browse files
Use tablets in token and shard awareness
Add mechanism to parse system.tablets periodically. In TokenAwarePolicy check if keyspace uses tablets if so try to use them to find replicas. Make shard awareness work when using tablets. Everything is wrapped in experimental setting, because tablets are still experimental in ScyllaDB and changes in the tablets format are possible.
1 parent 7c9df85 commit 7796e9c

File tree

8 files changed

+210
-15
lines changed

8 files changed

+210
-15
lines changed

cassandra/cluster.py

+76-3
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
import weakref
4242
from weakref import WeakValueDictionary
4343

44-
from cassandra import (ConsistencyLevel, AuthenticationFailed,
44+
from cassandra import (ConsistencyLevel, AuthenticationFailed, InvalidRequest,
4545
OperationTimedOut, UnsupportedOperation,
4646
SchemaTargetType, DriverException, ProtocolVersion,
4747
UnresolvableContactPoints)
@@ -79,6 +79,7 @@
7979
named_tuple_factory, dict_factory, tuple_factory, FETCH_SIZE_UNSET,
8080
HostTargetingStatement)
8181
from cassandra.marshal import int64_pack
82+
from cassandra.tablets import Tablet, Tablets
8283
from cassandra.timestamps import MonotonicTimestampGenerator
8384
from cassandra.compat import Mapping
8485
from cassandra.util import _resolve_contact_points_to_string_map, Version
@@ -938,6 +939,17 @@ def default_retry_policy(self, policy):
938939
establish connection pools. This can cause a rush of connections and queries if not mitigated with this factor.
939940
"""
940941

942+
experimental_tablet_refresh_time = 60
943+
"""
944+
The time at which the system.tablets table is polled periodically.
945+
This field is experimental and may be changed / removed in the future.
946+
"""
947+
948+
experimental_tablet_feature_enabled = False
949+
"""
950+
If true tablet experimental feature is enabled, its interface and use may change.
951+
"""
952+
941953
prepare_on_all_hosts = True
942954
"""
943955
Specifies whether statements should be prepared on all hosts, or just one.
@@ -1124,6 +1136,8 @@ def __init__(self,
11241136
schema_metadata_page_size=1000,
11251137
address_translator=None,
11261138
status_event_refresh_window=2,
1139+
experimental_tablet_refresh_time=60, # This field is experimental and may be changed / removed in the future
1140+
experimental_tablet_feature_enabled=False, # This field is experimental and may be changed / removed in the future
11271141
prepare_on_all_hosts=True,
11281142
reprepare_on_up=True,
11291143
execution_profiles=None,
@@ -1364,6 +1378,8 @@ def __init__(self,
13641378
self.schema_event_refresh_window = schema_event_refresh_window
13651379
self.topology_event_refresh_window = topology_event_refresh_window
13661380
self.status_event_refresh_window = status_event_refresh_window
1381+
self.experimental_tablet_refresh_time = experimental_tablet_refresh_time
1382+
self.experimental_tablet_feature_enabled = experimental_tablet_feature_enabled
13671383
self.connect_timeout = connect_timeout
13681384
self.prepare_on_all_hosts = prepare_on_all_hosts
13691385
self.reprepare_on_up = reprepare_on_up
@@ -1416,7 +1432,7 @@ def __init__(self,
14161432
self.control_connection = ControlConnection(
14171433
self, self.control_connection_timeout,
14181434
self.schema_event_refresh_window, self.topology_event_refresh_window,
1419-
self.status_event_refresh_window,
1435+
self.status_event_refresh_window, self.experimental_tablet_refresh_time,
14201436
schema_metadata_enabled, token_metadata_enabled,
14211437
schema_meta_page_size=schema_metadata_page_size)
14221438

@@ -2381,6 +2397,9 @@ def add_prepared(self, query_id, prepared_statement):
23812397
with self._prepared_statement_lock:
23822398
self._prepared_statements[query_id] = prepared_statement
23832399

2400+
# Experimental, this interface and use may change
2401+
def check_tablets_enabled(self):
2402+
return self.experimental_tablet_feature_enabled
23842403

23852404
class Session(object):
23862405
"""
@@ -3513,6 +3532,8 @@ class ControlConnection(object):
35133532
_SELECT_PEERS_NO_TOKENS_V2 = "SELECT host_id, peer, peer_port, data_center, rack, native_address, native_port, release_version, schema_version FROM system.peers_v2"
35143533
_SELECT_SCHEMA_PEERS_V2 = "SELECT host_id, peer, peer_port, native_address, native_port, schema_version FROM system.peers_v2"
35153534

3535+
_SELECT_TABLETS = "SELECT * FROM system.tablets"
3536+
35163537
_MINIMUM_NATIVE_ADDRESS_DSE_VERSION = Version("6.0.0")
35173538

35183539
class PeersQueryType(object):
@@ -3527,6 +3548,7 @@ class PeersQueryType(object):
35273548
_schema_event_refresh_window = None
35283549
_topology_event_refresh_window = None
35293550
_status_event_refresh_window = None
3551+
_tablet_refresh_time = 60
35303552

35313553
_schema_meta_enabled = True
35323554
_token_meta_enabled = True
@@ -3541,6 +3563,7 @@ def __init__(self, cluster, timeout,
35413563
schema_event_refresh_window,
35423564
topology_event_refresh_window,
35433565
status_event_refresh_window,
3566+
tablet_refresh_time=60,
35443567
schema_meta_enabled=True,
35453568
token_meta_enabled=True,
35463569
schema_meta_page_size=1000):
@@ -3553,6 +3576,7 @@ def __init__(self, cluster, timeout,
35533576
self._schema_event_refresh_window = schema_event_refresh_window
35543577
self._topology_event_refresh_window = topology_event_refresh_window
35553578
self._status_event_refresh_window = status_event_refresh_window
3579+
self._tablet_refresh_time = tablet_refresh_time
35563580
self._schema_meta_enabled = schema_meta_enabled
35573581
self._token_meta_enabled = token_meta_enabled
35583582
self._schema_meta_page_size = schema_meta_page_size
@@ -3658,6 +3682,18 @@ def _try_connect(self, host):
36583682
"SCHEMA_CHANGE": partial(_watch_callback, self_weakref, '_handle_schema_change')
36593683
}, register_timeout=self._timeout)
36603684

3685+
if self._cluster.check_tablets_enabled():
3686+
try:
3687+
self._refresh_tablets(connection)
3688+
except InvalidRequest:
3689+
self._cluster.experimental_tablet_feature_enabled = False
3690+
# The information about tablets is passed to load_balancing_policy during `populate` that happens
3691+
# before this check, so if there is an error during `_refresh_tablets` we need to change the field
3692+
# in the cluster, but also propagate this information to the load_balancing_policy using `populate`
3693+
self._cluster.load_balancing_policy.populate(self._cluster, self._cluster.metadata.all_hosts())
3694+
else:
3695+
self._cluster.scheduler.schedule_unique(self._tablet_refresh_time, self._refresh_tablets_periodically, connection)
3696+
36613697
sel_peers = self._get_peers_query(self.PeersQueryType.PEERS, connection)
36623698
sel_local = self._SELECT_LOCAL if self._token_meta_enabled else self._SELECT_LOCAL_NO_TOKENS
36633699
peers_query = QueryMessage(query=sel_peers, consistency_level=ConsistencyLevel.ONE)
@@ -3964,6 +4000,40 @@ def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
39644000
log.debug("[control connection] Rebuilding token map due to topology changes")
39654001
self._cluster.metadata.rebuild_token_map(partitioner, token_map)
39664002

4003+
def _refresh_tablets_periodically(self, connection):
4004+
try:
4005+
self._refresh_tablets(connection)
4006+
except ReferenceError:
4007+
return
4008+
except Exception:
4009+
log.debug("[control connection] Error refreshing tablets", exc_info=True)
4010+
self._signal_error()
4011+
self._cluster.scheduler.schedule_unique(self._tablet_refresh_time, self._refresh_tablets, connection)
4012+
4013+
# Experimental, this interface and use may change
4014+
def _refresh_tablets(self, connection):
4015+
sel_tablets = self._SELECT_TABLETS
4016+
tablets_query = QueryMessage(query=sel_tablets, consistency_level=ConsistencyLevel.ONE)
4017+
tablets_result = connection.wait_for_response(
4018+
tablets_query, timeout=self._timeout)
4019+
tablets_result = dict_factory(tablets_result.column_names, tablets_result.parsed_rows)
4020+
4021+
tablets = {}
4022+
for row in tablets_result:
4023+
tablet = Tablet.from_row(row)
4024+
if tablet is None:
4025+
log.warning(
4026+
"Found an invalid row for tablet (%s). Ignoring tablet." %
4027+
_NodeInfo.get_broadcast_rpc_address(row))
4028+
continue
4029+
4030+
keyspace_name = row.get("keyspace_name")
4031+
table_name = row.get("table_name")
4032+
4033+
tablets.setdefault((keyspace_name, table_name), []).append(tablet)
4034+
4035+
self._cluster.metadata._tablets = Tablets(tablets)
4036+
39674037
@staticmethod
39684038
def _is_valid_peer(row):
39694039
return bool(_NodeInfo.get_broadcast_rpc_address(row) and row.get("host_id") and
@@ -4571,7 +4641,10 @@ def _query(self, host, message=None, cb=None):
45714641
connection = None
45724642
try:
45734643
# TODO get connectTimeout from cluster settings
4574-
connection, request_id = pool.borrow_connection(timeout=2.0, routing_key=self.query.routing_key if self.query else None)
4644+
if self.query:
4645+
connection, request_id = pool.borrow_connection(timeout=2.0, routing_key=self.query.routing_key, keyspace=self.query.keyspace, table=self.query.table)
4646+
else:
4647+
connection, request_id = pool.borrow_connection(timeout=2.0)
45754648
self._connection = connection
45764649
result_meta = self.prepared_statement.result_metadata if self.prepared_statement else []
45774650

cassandra/metadata.py

+1
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ def __init__(self):
126126
self._hosts = {}
127127
self._host_id_by_endpoint = {}
128128
self._hosts_lock = RLock()
129+
self._tablets = None
129130

130131
def export_schema_as_string(self):
131132
"""

cassandra/policies.py

+15-1
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,7 @@ class TokenAwarePolicy(LoadBalancingPolicy):
335335

336336
_child_policy = None
337337
_cluster_metadata = None
338+
_experimental_tablets_enabled = False
338339
shuffle_replicas = False
339340
"""
340341
Yield local replicas in a random order.
@@ -346,6 +347,7 @@ def __init__(self, child_policy, shuffle_replicas=False):
346347

347348
def populate(self, cluster, hosts):
348349
self._cluster_metadata = cluster.metadata
350+
self._experimental_tablets_enabled = cluster.check_tablets_enabled()
349351
self._child_policy.populate(cluster, hosts)
350352

351353
def check_supported(self):
@@ -376,7 +378,19 @@ def make_query_plan(self, working_keyspace=None, query=None):
376378
for host in child.make_query_plan(keyspace, query):
377379
yield host
378380
else:
379-
replicas = self._cluster_metadata.get_replicas(keyspace, routing_key)
381+
replicas = []
382+
if self._experimental_tablets_enabled:
383+
tablet = self._cluster_metadata._tablets.get_tablet_for_key(keyspace, query.table, self._cluster_metadata.token_map.token_class.from_key(routing_key))
384+
385+
if tablet is not None:
386+
replicas_mapped = set(map(lambda r: r[0], tablet.replicas))
387+
child_plan = child.make_query_plan(keyspace, query)
388+
389+
replicas = [host for host in child_plan if host.host_id in replicas_mapped]
390+
391+
if replicas == []:
392+
replicas = self._cluster_metadata.get_replicas(keyspace, routing_key)
393+
380394
if self.shuffle_replicas:
381395
shuffle(replicas)
382396
for replica in replicas:

cassandra/pool.py

+17-5
Original file line numberDiff line numberDiff line change
@@ -439,7 +439,7 @@ def __init__(self, host, host_distance, session):
439439

440440
log.debug("Finished initializing connection for host %s", self.host)
441441

442-
def _get_connection_for_routing_key(self, routing_key=None):
442+
def _get_connection_for_routing_key(self, routing_key=None, keyspace=None, table=None):
443443
if self.is_shutdown:
444444
raise ConnectionException(
445445
"Pool for %s is shutdown" % (self.host,), self.host)
@@ -450,7 +450,19 @@ def _get_connection_for_routing_key(self, routing_key=None):
450450
shard_id = None
451451
if not self._session.cluster.shard_aware_options.disable and self.host.sharding_info and routing_key:
452452
t = self._session.cluster.metadata.token_map.token_class.from_key(routing_key)
453-
shard_id = self.host.sharding_info.shard_id_from_token(t.value)
453+
454+
shard_id = None
455+
if self._session.cluster.check_tablets_enabled() and table is not None:
456+
if keyspace is None:
457+
keyspace = self._keyspace
458+
459+
tablet = self._session.cluster._load_balancing_policy._cluster_metadata._tablets.get_tablet_for_key(keyspace, table, t)
460+
461+
if tablet is not None:
462+
shard_id = tablet.replicas[0][1]
463+
464+
if shard_id is None:
465+
shard_id = self.host.sharding_info.shard_id_from_token(t.value)
454466

455467
conn = self._connections.get(shard_id)
456468

@@ -496,15 +508,15 @@ def _get_connection_for_routing_key(self, routing_key=None):
496508
return random.choice(active_connections)
497509
return random.choice(list(self._connections.values()))
498510

499-
def borrow_connection(self, timeout, routing_key=None):
500-
conn = self._get_connection_for_routing_key(routing_key)
511+
def borrow_connection(self, timeout, routing_key=None, keyspace=None, table=None):
512+
conn = self._get_connection_for_routing_key(routing_key, keyspace, table)
501513
start = time.time()
502514
remaining = timeout
503515
last_retry = False
504516
while True:
505517
if conn.is_closed:
506518
# The connection might have been closed in the meantime - if so, try again
507-
conn = self._get_connection_for_routing_key(routing_key)
519+
conn = self._get_connection_for_routing_key(routing_key, keyspace, table)
508520
with conn.lock:
509521
if (not conn.is_closed or last_retry) and conn.in_flight < conn.max_request_id:
510522
# On last retry we ignore connection status, since it is better to return closed connection than

cassandra/query.py

+11-1
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,13 @@ class Statement(object):
253253
.. versionadded:: 2.1.3
254254
"""
255255

256+
table = None
257+
"""
258+
The string name of the table this query acts on. This is used when the tablet
259+
experimental feature is enabled and in the same time :class`~.TokenAwarePolicy`
260+
is configured in the profile load balancing policy.
261+
"""
262+
256263
custom_payload = None
257264
"""
258265
:ref:`custom_payload` to be passed to the server.
@@ -272,7 +279,7 @@ class Statement(object):
272279

273280
def __init__(self, retry_policy=None, consistency_level=None, routing_key=None,
274281
serial_consistency_level=None, fetch_size=FETCH_SIZE_UNSET, keyspace=None, custom_payload=None,
275-
is_idempotent=False):
282+
is_idempotent=False, table=None):
276283
if retry_policy and not hasattr(retry_policy, 'on_read_timeout'): # just checking one method to detect positional parameter errors
277284
raise ValueError('retry_policy should implement cassandra.policies.RetryPolicy')
278285
if retry_policy is not None:
@@ -286,6 +293,8 @@ def __init__(self, retry_policy=None, consistency_level=None, routing_key=None,
286293
self.fetch_size = fetch_size
287294
if keyspace is not None:
288295
self.keyspace = keyspace
296+
if table is not None:
297+
self.table = table
289298
if custom_payload is not None:
290299
self.custom_payload = custom_payload
291300
self.is_idempotent = is_idempotent
@@ -548,6 +557,7 @@ def __init__(self, prepared_statement, retry_policy=None, consistency_level=None
548557
meta = prepared_statement.column_metadata
549558
if meta:
550559
self.keyspace = meta[0].keyspace_name
560+
self.table = meta[0].table_name
551561

552562
Statement.__init__(self, retry_policy, consistency_level, routing_key,
553563
serial_consistency_level, fetch_size, keyspace, custom_payload,

cassandra/tablets.py

+80
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
# Experimental, this interface and use may change
2+
class Tablet(object):
3+
"""
4+
Represents a single ScyllaDB tablet.
5+
"""
6+
last_token = 0
7+
new_replicas = None
8+
replicas = None
9+
stage = ""
10+
11+
def __init__(self, last_token = 0, new_replicas = None, replicas = None, stage = ""):
12+
self.last_token = last_token
13+
self.new_replicas = new_replicas
14+
self.replicas = replicas
15+
self.stage = stage
16+
17+
# Experimental, this interface and use may change
18+
@staticmethod
19+
def _is_valid_tablet(row):
20+
return row.get("replicas") is not None and len(row.get("replicas")) != 0 and row.get("table_name") is not None and row.get("keyspace_name") is not None
21+
22+
# Experimental, this interface and use may change
23+
@staticmethod
24+
def from_row(row):
25+
if Tablet._is_valid_tablet(row):
26+
tablet = Tablet(row.get("last_token"), row.get("new_replicas"), row.get("replicas"), row.get("stage"))
27+
return tablet
28+
return None
29+
30+
# Experimental, this interface and use may change
31+
class Tablets(object):
32+
_tablets = {}
33+
34+
def __init__(self, tablets):
35+
self._tablets = tablets
36+
37+
# Experimental, this interface and use may change
38+
def get_tablet_for_key(self, keyspace, table, t):
39+
tablet = self._tablets.get((keyspace, table), [])
40+
if tablet == []:
41+
return None
42+
43+
id = bisect_left(tablet, t.value, key = lambda tablet: tablet.last_token)
44+
return tablet[id]
45+
46+
# bisect.bisect_left implementation from Python 3.11, needed untill support for
47+
# Python < 3.10 is dropped, it is needed to use `key` to extract last_token from
48+
# Tablet list - better solution performance-wise than materialize list of last_tokens
49+
def bisect_left(a, x, lo=0, hi=None, *, key=None):
50+
"""Return the index where to insert item x in list a, assuming a is sorted.
51+
52+
The return value i is such that all e in a[:i] have e < x, and all e in
53+
a[i:] have e >= x. So if x already appears in the list, a.insert(i, x) will
54+
insert just before the leftmost x already there.
55+
56+
Optional args lo (default 0) and hi (default len(a)) bound the
57+
slice of a to be searched.
58+
"""
59+
60+
if lo < 0:
61+
raise ValueError('lo must be non-negative')
62+
if hi is None:
63+
hi = len(a)
64+
# Note, the comparison uses "<" to match the
65+
# __lt__() logic in list.sort() and in heapq.
66+
if key is None:
67+
while lo < hi:
68+
mid = (lo + hi) // 2
69+
if a[mid] < x:
70+
lo = mid + 1
71+
else:
72+
hi = mid
73+
else:
74+
while lo < hi:
75+
mid = (lo + hi) // 2
76+
if key(a[mid]) < x:
77+
lo = mid + 1
78+
else:
79+
hi = mid
80+
return lo

0 commit comments

Comments
 (0)