Skip to content

Commit 948c443

Browse files
Use tablets in token and shard awareness
Add a mechanism to parse system.tablets periodically. In TokenAwarePolicy, check whether the keyspace uses tablets and, if so, try to use them to find replicas. Make shard awareness work when using tablets. Everything is wrapped in an experimental setting, because tablets are still experimental in ScyllaDB and changes to the tablet format are possible.
1 parent bc5cf17 commit 948c443

File tree

8 files changed

+196
-17
lines changed

8 files changed

+196
-17
lines changed

cassandra/cluster.py

Lines changed: 61 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
import weakref
4242
from weakref import WeakValueDictionary
4343

44-
from cassandra import (ConsistencyLevel, AuthenticationFailed,
44+
from cassandra import (ConsistencyLevel, AuthenticationFailed, InvalidRequest,
4545
OperationTimedOut, UnsupportedOperation,
4646
SchemaTargetType, DriverException, ProtocolVersion,
4747
UnresolvableContactPoints)
@@ -51,6 +51,7 @@
5151
EndPoint, DefaultEndPoint, DefaultEndPointFactory,
5252
ContinuousPagingState, SniEndPointFactory, ConnectionBusy)
5353
from cassandra.cqltypes import UserType
54+
import cassandra.cqltypes as types
5455
from cassandra.encoder import Encoder
5556
from cassandra.protocol import (QueryMessage, ResultMessage,
5657
ErrorMessage, ReadTimeoutErrorMessage,
@@ -79,6 +80,7 @@
7980
named_tuple_factory, dict_factory, tuple_factory, FETCH_SIZE_UNSET,
8081
HostTargetingStatement)
8182
from cassandra.marshal import int64_pack
83+
from cassandra.tablets import Tablet, Tablets
8284
from cassandra.timestamps import MonotonicTimestampGenerator
8385
from cassandra.compat import Mapping
8486
from cassandra.util import _resolve_contact_points_to_string_map, Version
@@ -938,6 +940,11 @@ def default_retry_policy(self, policy):
938940
establish connection pools. This can cause a rush of connections and queries if not mitigated with this factor.
939941
"""
940942

943+
experimental_tablet_feature_enabled = False
944+
"""
945+
If true, the experimental tablet feature is enabled; its interface and use may change.
946+
"""
947+
941948
prepare_on_all_hosts = True
942949
"""
943950
Specifies whether statements should be prepared on all hosts, or just one.
@@ -1124,6 +1131,7 @@ def __init__(self,
11241131
schema_metadata_page_size=1000,
11251132
address_translator=None,
11261133
status_event_refresh_window=2,
1134+
experimental_tablet_feature_enabled=False, # This field is experimental and may be changed / removed in the future
11271135
prepare_on_all_hosts=True,
11281136
reprepare_on_up=True,
11291137
execution_profiles=None,
@@ -1341,6 +1349,7 @@ def __init__(self,
13411349
self.schema_event_refresh_window = schema_event_refresh_window
13421350
self.topology_event_refresh_window = topology_event_refresh_window
13431351
self.status_event_refresh_window = status_event_refresh_window
1352+
self.experimental_tablet_feature_enabled = experimental_tablet_feature_enabled
13441353
self.connect_timeout = connect_timeout
13451354
self.prepare_on_all_hosts = prepare_on_all_hosts
13461355
self.reprepare_on_up = reprepare_on_up
@@ -1393,9 +1402,8 @@ def __init__(self,
13931402
self.control_connection = ControlConnection(
13941403
self, self.control_connection_timeout,
13951404
self.schema_event_refresh_window, self.topology_event_refresh_window,
1396-
self.status_event_refresh_window,
1397-
schema_metadata_enabled, token_metadata_enabled,
1398-
schema_meta_page_size=schema_metadata_page_size)
1405+
self.status_event_refresh_window, schema_metadata_enabled,
1406+
token_metadata_enabled, schema_meta_page_size=schema_metadata_page_size)
13991407

14001408
if client_id is None:
14011409
self.client_id = uuid.uuid4()
@@ -2389,6 +2397,9 @@ def add_prepared(self, query_id, prepared_statement):
23892397
with self._prepared_statement_lock:
23902398
self._prepared_statements[query_id] = prepared_statement
23912399

2400+
# Experimental, this interface and use may change
2401+
def check_tablets_enabled(self):
    """
    Report whether the experimental tablet feature is switched on.

    Returns the current value of ``experimental_tablet_feature_enabled``
    unchanged. Experimental: this interface and its use may change.
    """
    enabled = self.experimental_tablet_feature_enabled
    return enabled
23922403

23932404
class Session(object):
23942405
"""
@@ -3687,6 +3698,9 @@ def _try_connect(self, host):
36873698
"SCHEMA_CHANGE": partial(_watch_callback, self_weakref, '_handle_schema_change')
36883699
}, register_timeout=self._timeout)
36893700

3701+
if self._cluster.check_tablets_enabled():
3702+
self._init_tablets()
3703+
36903704
sel_peers = self._get_peers_query(self.PeersQueryType.PEERS, connection)
36913705
sel_local = self._SELECT_LOCAL if self._token_meta_enabled else self._SELECT_LOCAL_NO_TOKENS
36923706
peers_query = QueryMessage(query=sel_peers, consistency_level=ConsistencyLevel.ONE)
@@ -3993,6 +4007,34 @@ def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
39934007
log.debug("[control connection] Rebuilding token map due to topology changes")
39944008
self._cluster.metadata.rebuild_token_map(partitioner, token_map)
39954009

4010+
# Experimental, this interface and use may change
4011+
def _init_tablets(self):
    """
    Reset the cluster metadata's tablet cache to an empty ``Tablets`` map.

    Experimental: this interface and its use may change.
    """
    metadata = self._cluster.metadata
    metadata._tablets = Tablets({})
4013+
4014+
# Experimental, this interface and use may change
4015+
def add_tablet(self, keyspace, table, tablet):
    """
    Record ``tablet`` in the cached tablet list for ``(keyspace, table)``.

    Every cached tablet whose token range overlaps the new tablet's range
    is dropped, then the new tablet is inserted so that the list stays
    ordered by ``first_token``. Finally the metadata's ``_tablets`` is
    rebound to a fresh ``Tablets`` wrapper around the same dictionary.

    :param keyspace: keyspace name the tablet belongs to.
    :param table: table name the tablet belongs to.
    :param tablet: object exposing ``first_token`` and ``last_token``, or
        ``None``. ``Tablet.from_row`` returns ``None`` when the replica
        list is empty; such a value is ignored instead of being stored,
        because a ``None`` entry would break later lookups.

    Experimental: this interface and its use may change.
    """
    if tablet is None:
        # Nothing usable to cache; leave the existing map untouched.
        return

    tablets = self._cluster.metadata._tablets._tablets
    tablets_for_table = tablets.setdefault((keyspace, table), [])

    # Remove stale entries: anything overlapping the incoming token range.
    # Slice-assign so the list object held by the dictionary is reused.
    tablets_for_table[:] = [
        cached for cached in tablets_for_table
        if not (cached.last_token > tablet.first_token
                and cached.first_token < tablet.last_token)
    ]

    # Insert before the first tablet that starts after the new one, or at
    # the end when there is no such tablet.
    insert_idx = next(
        (idx for idx, cached in enumerate(tablets_for_table)
         if cached.first_token > tablet.first_token),
        len(tablets_for_table))
    tablets_for_table.insert(insert_idx, tablet)

    self._cluster.metadata._tablets = Tablets(tablets)
4037+
39964038
@staticmethod
39974039
def _is_valid_peer(row):
39984040
return bool(_NodeInfo.get_broadcast_rpc_address(row) and row.get("host_id") and
@@ -4600,7 +4642,10 @@ def _query(self, host, message=None, cb=None):
46004642
connection = None
46014643
try:
46024644
# TODO get connectTimeout from cluster settings
4603-
connection, request_id = pool.borrow_connection(timeout=2.0, routing_key=self.query.routing_key if self.query else None)
4645+
if self.query:
4646+
connection, request_id = pool.borrow_connection(timeout=2.0, routing_key=self.query.routing_key, keyspace=self.query.keyspace, table=self.query.table)
4647+
else:
4648+
connection, request_id = pool.borrow_connection(timeout=2.0)
46044649
self._connection = connection
46054650
result_meta = self.prepared_statement.result_metadata if self.prepared_statement else []
46064651

@@ -4719,6 +4764,17 @@ def _set_result(self, host, connection, pool, response):
47194764
self._warnings = getattr(response, 'warnings', None)
47204765
self._custom_payload = getattr(response, 'custom_payload', None)
47214766

4767+
if self._custom_payload and self.session.cluster.check_tablets_enabled():
4768+
ctype = types.lookup_casstype('ListType(TupleType(UUIDType, Int32Type))')
4769+
tablet_replicas = ctype.from_binary(self._custom_payload.get('tablet_replicas'), 3)
4770+
ctype_token = types.lookup_casstype('LongType')
4771+
first_token = ctype_token.from_binary(self._custom_payload.get('token_range')[:8], 3)
4772+
last_token = ctype_token.from_binary(self._custom_payload.get('token_range')[8:], 3)
4773+
tablet = Tablet.from_row(first_token, last_token, tablet_replicas)
4774+
keyspace = self.query.keyspace
4775+
table = self.query.table
4776+
self.session.cluster.control_connection.add_tablet(keyspace, table, tablet)
4777+
47224778
if isinstance(response, ResultMessage):
47234779
if response.kind == RESULT_KIND_SET_KEYSPACE:
47244780
session = getattr(self, 'session', None)

cassandra/metadata.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ def __init__(self):
126126
self._hosts = {}
127127
self._host_id_by_endpoint = {}
128128
self._hosts_lock = RLock()
129+
self._tablets = None
129130

130131
def export_schema_as_string(self):
131132
"""

cassandra/policies.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,7 @@ class TokenAwarePolicy(LoadBalancingPolicy):
335335

336336
_child_policy = None
337337
_cluster_metadata = None
338+
_experimental_tablets_enabled = False
338339
shuffle_replicas = False
339340
"""
340341
Yield local replicas in a random order.
@@ -346,6 +347,7 @@ def __init__(self, child_policy, shuffle_replicas=False):
346347

347348
def populate(self, cluster, hosts):
    """
    Capture cluster state needed for token-aware routing, then delegate.

    Stores ``cluster.metadata`` and the result of
    ``cluster.check_tablets_enabled()`` on this policy before forwarding
    the same arguments to the wrapped child policy's ``populate``.
    """
    self._cluster_metadata = cluster.metadata
    tablets_enabled = cluster.check_tablets_enabled()
    self._experimental_tablets_enabled = tablets_enabled
    child = self._child_policy
    child.populate(cluster, hosts)
350352

351353
def check_supported(self):
@@ -376,7 +378,19 @@ def make_query_plan(self, working_keyspace=None, query=None):
376378
for host in child.make_query_plan(keyspace, query):
377379
yield host
378380
else:
379-
replicas = self._cluster_metadata.get_replicas(keyspace, routing_key)
381+
replicas = []
382+
if self._experimental_tablets_enabled:
383+
tablet = self._cluster_metadata._tablets.get_tablet_for_key(keyspace, query.table, self._cluster_metadata.token_map.token_class.from_key(routing_key))
384+
385+
if tablet is not None:
386+
replicas_mapped = set(map(lambda r: r[0], tablet.replicas))
387+
child_plan = child.make_query_plan(keyspace, query)
388+
389+
replicas = [host for host in child_plan if host.host_id in replicas_mapped]
390+
391+
if replicas == []:
392+
replicas = self._cluster_metadata.get_replicas(keyspace, routing_key)
393+
380394
if self.shuffle_replicas:
381395
shuffle(replicas)
382396
for replica in replicas:

cassandra/pool.py

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -439,7 +439,7 @@ def __init__(self, host, host_distance, session):
439439

440440
log.debug("Finished initializing connection for host %s", self.host)
441441

442-
def _get_connection_for_routing_key(self, routing_key=None):
442+
def _get_connection_for_routing_key(self, routing_key=None, keyspace=None, table=None):
443443
if self.is_shutdown:
444444
raise ConnectionException(
445445
"Pool for %s is shutdown" % (self.host,), self.host)
@@ -450,7 +450,23 @@ def _get_connection_for_routing_key(self, routing_key=None):
450450
shard_id = None
451451
if not self._session.cluster.shard_aware_options.disable and self.host.sharding_info and routing_key:
452452
t = self._session.cluster.metadata.token_map.token_class.from_key(routing_key)
453-
shard_id = self.host.sharding_info.shard_id_from_token(t.value)
453+
454+
shard_id = None
455+
if self._session.cluster.check_tablets_enabled() and table is not None:
456+
if keyspace is None:
457+
keyspace = self._keyspace
458+
459+
tablet = self._session.cluster._load_balancing_policy._cluster_metadata._tablets.get_tablet_for_key(keyspace, table, t)
460+
461+
if tablet is not None:
462+
for replica in tablet.replicas:
463+
log.info(replica)
464+
if replica[0] == self.host.host_id:
465+
shard_id = replica[1]
466+
break
467+
468+
if shard_id is None:
469+
shard_id = self.host.sharding_info.shard_id_from_token(t.value)
454470

455471
conn = self._connections.get(shard_id)
456472

@@ -496,15 +512,15 @@ def _get_connection_for_routing_key(self, routing_key=None):
496512
return random.choice(active_connections)
497513
return random.choice(list(self._connections.values()))
498514

499-
def borrow_connection(self, timeout, routing_key=None):
500-
conn = self._get_connection_for_routing_key(routing_key)
515+
def borrow_connection(self, timeout, routing_key=None, keyspace=None, table=None):
516+
conn = self._get_connection_for_routing_key(routing_key, keyspace, table)
501517
start = time.time()
502518
remaining = timeout
503519
last_retry = False
504520
while True:
505521
if conn.is_closed:
506522
# The connection might have been closed in the meantime - if so, try again
507-
conn = self._get_connection_for_routing_key(routing_key)
523+
conn = self._get_connection_for_routing_key(routing_key, keyspace, table)
508524
with conn.lock:
509525
if (not conn.is_closed or last_retry) and conn.in_flight < conn.max_request_id:
510526
# On last retry we ignore connection status, since it is better to return closed connection than

cassandra/query.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,13 @@ class Statement(object):
253253
.. versionadded:: 2.1.3
254254
"""
255255

256+
table = None
257+
"""
258+
The string name of the table this query acts on. This is used when the tablet
259+
experimental feature is enabled and at the same time :class:`~.TokenAwarePolicy`
260+
is configured in the profile load balancing policy.
261+
"""
262+
256263
custom_payload = None
257264
"""
258265
:ref:`custom_payload` to be passed to the server.
@@ -272,7 +279,7 @@ class Statement(object):
272279

273280
def __init__(self, retry_policy=None, consistency_level=None, routing_key=None,
274281
serial_consistency_level=None, fetch_size=FETCH_SIZE_UNSET, keyspace=None, custom_payload=None,
275-
is_idempotent=False):
282+
is_idempotent=False, table=None):
276283
if retry_policy and not hasattr(retry_policy, 'on_read_timeout'): # just checking one method to detect positional parameter errors
277284
raise ValueError('retry_policy should implement cassandra.policies.RetryPolicy')
278285
if retry_policy is not None:
@@ -286,6 +293,8 @@ def __init__(self, retry_policy=None, consistency_level=None, routing_key=None,
286293
self.fetch_size = fetch_size
287294
if keyspace is not None:
288295
self.keyspace = keyspace
296+
if table is not None:
297+
self.table = table
289298
if custom_payload is not None:
290299
self.custom_payload = custom_payload
291300
self.is_idempotent = is_idempotent
@@ -548,6 +557,7 @@ def __init__(self, prepared_statement, retry_policy=None, consistency_level=None
548557
meta = prepared_statement.column_metadata
549558
if meta:
550559
self.keyspace = meta[0].keyspace_name
560+
self.table = meta[0].table_name
551561

552562
Statement.__init__(self, retry_policy, consistency_level, routing_key,
553563
serial_consistency_level, fetch_size, keyspace, custom_payload,

cassandra/tablets.py

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
# Experimental, this interface and use may change
2+
class Tablet(object):
    """
    Represents a single ScyllaDB tablet.

    Holds the two token bounds of the tablet and its replica list.
    """
    # Class-level fallbacks; __init__ always sets instance values.
    first_token = 0
    last_token = 0
    replicas = None

    def __init__(self, first_token=0, last_token=0, replicas=None):
        self.first_token = first_token
        self.last_token = last_token
        self.replicas = replicas

    @staticmethod
    def _is_valid_tablet(replicas):
        """Return True only for a non-None, non-empty replica collection."""
        if replicas is None:
            return False
        return len(replicas) > 0

    @staticmethod
    def from_row(first_token, last_token, replicas):
        """
        Build a Tablet from its parts, or return None when ``replicas``
        is None or empty.
        """
        if not Tablet._is_valid_tablet(replicas):
            return None
        return Tablet(first_token, last_token, replicas)
25+
26+
# Experimental, this interface and use may change
27+
class Tablets(object):
28+
_tablets = {}
29+
30+
def __init__(self, tablets):
31+
self._tablets = tablets
32+
33+
def get_tablet_for_key(self, keyspace, table, t):
34+
tablet = self._tablets.get((keyspace, table), [])
35+
if tablet == []:
36+
return None
37+
38+
id = bisect_left(tablet, t.value, key = lambda tablet: tablet.last_token)
39+
if id < len(tablet) and t.value > tablet[id].first_token:
40+
return tablet[id]
41+
return None
42+
43+
# bisect.bisect_left implementation from Python 3.11, needed until support for
44+
# Python < 3.10 is dropped, it is needed to use `key` to extract last_token from
45+
# Tablet list - better solution performance-wise than materialize list of last_tokens
46+
def bisect_left(a, x, lo=0, hi=None, *, key=None):
    """Locate the leftmost insertion point for x in the sorted list a.

    The returned index i splits a so that every element of a[:i] compares
    less than x and every element of a[i:] compares greater than or equal
    to x; inserting at i therefore places x before any equal entries.

    lo and hi (defaulting to 0 and len(a)) restrict the search to a slice
    of a. When key is given, it is applied to each element of a before
    comparing with x; x itself is compared as-is.

    Raises ValueError if lo is negative.
    """
    if lo < 0:
        raise ValueError('lo must be non-negative')
    upper = len(a) if hi is None else hi
    lower = lo

    # Without a key, compare the elements themselves.
    extract = key if key is not None else (lambda item: item)

    # Only "<" is used, matching list.sort() and heapq.
    while lower < upper:
        middle = (lower + upper) // 2
        if extract(a[middle]) < x:
            lower = middle + 1
        else:
            upper = middle
    return lower

0 commit comments

Comments
 (0)