Skip to content

Commit f9f2610

Browse files
Use tablets in token and shard awareness
Add mechanism to parse system.tablets periodically. In TokenAwarePolicy check if keyspace uses tablets if so try to use them to find replicas. Make shard awareness work when using tablets. Everything is wrapped in experimental setting, because tablets are still experimental in ScyllaDB and changes in the tablets format are possible.
1 parent 7c9df85 commit f9f2610

File tree

6 files changed

+206
-16
lines changed

6 files changed

+206
-16
lines changed

cassandra/cluster.py

+84-4
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
from itertools import groupby, count, chain
2828
import json
2929
import logging
30+
import os
31+
import threading
3032
from warnings import warn
3133
from random import random
3234
import re
@@ -73,7 +75,7 @@
7375
NeverRetryPolicy)
7476
from cassandra.pool import (Host, _ReconnectionHandler, _HostReconnectionHandler,
7577
HostConnectionPool, HostConnection,
76-
NoConnectionsAvailable)
78+
NoConnectionsAvailable, Tablet)
7779
from cassandra.query import (SimpleStatement, PreparedStatement, BoundStatement,
7880
BatchStatement, bind_params, QueryTrace, TraceUnavailable,
7981
named_tuple_factory, dict_factory, tuple_factory, FETCH_SIZE_UNSET,
@@ -139,6 +141,9 @@ def _is_gevent_monkey_patched():
139141
except ImportError:
140142
from cassandra.io.asyncorereactor import AsyncoreConnection as DefaultConnection # NOQA
141143

144+
# If true tablet experimental feature is enabled, its interface and use may change
145+
EXPERIMENTAL_TABLETS = os.environ.get('EXPERIMENTAL_TABLETS', '').lower() == 'true'
146+
142147
# Forces load of utf8 encoding module to avoid deadlock that occurs
143148
# if code that is being imported tries to import the module in a seperate
144149
# thread.
@@ -938,6 +943,12 @@ def default_retry_policy(self, policy):
938943
establish connection pools. This can cause a rush of connections and queries if not mitigated with this factor.
939944
"""
940945

946+
tablet_refresh_time = 60
947+
"""
948+
The time at which the system.tablets table is polled periodically.
949+
This field is experimental and may be changed / removed in the future.
950+
"""
951+
941952
prepare_on_all_hosts = True
942953
"""
943954
Specifies whether statements should be prepared on all hosts, or just one.
@@ -1124,6 +1135,7 @@ def __init__(self,
11241135
schema_metadata_page_size=1000,
11251136
address_translator=None,
11261137
status_event_refresh_window=2,
1138+
tablet_refresh_time=60, # This field is experimental and may be changed / removed in the future
11271139
prepare_on_all_hosts=True,
11281140
reprepare_on_up=True,
11291141
execution_profiles=None,
@@ -1364,6 +1376,7 @@ def __init__(self,
13641376
self.schema_event_refresh_window = schema_event_refresh_window
13651377
self.topology_event_refresh_window = topology_event_refresh_window
13661378
self.status_event_refresh_window = status_event_refresh_window
1379+
self.tablet_refresh_time = tablet_refresh_time
13671380
self.connect_timeout = connect_timeout
13681381
self.prepare_on_all_hosts = prepare_on_all_hosts
13691382
self.reprepare_on_up = reprepare_on_up
@@ -1416,7 +1429,7 @@ def __init__(self,
14161429
self.control_connection = ControlConnection(
14171430
self, self.control_connection_timeout,
14181431
self.schema_event_refresh_window, self.topology_event_refresh_window,
1419-
self.status_event_refresh_window,
1432+
self.status_event_refresh_window, self.tablet_refresh_time,
14201433
schema_metadata_enabled, token_metadata_enabled,
14211434
schema_meta_page_size=schema_metadata_page_size)
14221435

@@ -2381,6 +2394,15 @@ def add_prepared(self, query_id, prepared_statement):
23812394
with self._prepared_statement_lock:
23822395
self._prepared_statements[query_id] = prepared_statement
23832396

2397+
# Experimental, this interface and use may change
2398+
def check_tablets_support(self, connection, timeout):
2399+
try:
2400+
sel_tablets = "SELECT * FROM system.tablets"
2401+
tablets_query = QueryMessage(query=sel_tablets, consistency_level=ConsistencyLevel.ONE)
2402+
connection.wait_for_response(tablets_query, timeout=timeout)
2403+
except:
2404+
return False
2405+
return True
23842406

23852407
class Session(object):
23862408
"""
@@ -3513,6 +3535,8 @@ class ControlConnection(object):
35133535
_SELECT_PEERS_NO_TOKENS_V2 = "SELECT host_id, peer, peer_port, data_center, rack, native_address, native_port, release_version, schema_version FROM system.peers_v2"
35143536
_SELECT_SCHEMA_PEERS_V2 = "SELECT host_id, peer, peer_port, native_address, native_port, schema_version FROM system.peers_v2"
35153537

3538+
_SELECT_TABLETS = "SELECT * FROM system.tablets"
3539+
35163540
_MINIMUM_NATIVE_ADDRESS_DSE_VERSION = Version("6.0.0")
35173541

35183542
class PeersQueryType(object):
@@ -3527,6 +3551,7 @@ class PeersQueryType(object):
35273551
_schema_event_refresh_window = None
35283552
_topology_event_refresh_window = None
35293553
_status_event_refresh_window = None
3554+
_tablet_refresh_time = 60
35303555

35313556
_schema_meta_enabled = True
35323557
_token_meta_enabled = True
@@ -3541,6 +3566,7 @@ def __init__(self, cluster, timeout,
35413566
schema_event_refresh_window,
35423567
topology_event_refresh_window,
35433568
status_event_refresh_window,
3569+
tablet_refresh_time=60,
35443570
schema_meta_enabled=True,
35453571
token_meta_enabled=True,
35463572
schema_meta_page_size=1000):
@@ -3553,6 +3579,7 @@ def __init__(self, cluster, timeout,
35533579
self._schema_event_refresh_window = schema_event_refresh_window
35543580
self._topology_event_refresh_window = topology_event_refresh_window
35553581
self._status_event_refresh_window = status_event_refresh_window
3582+
self._tablet_refresh_time = tablet_refresh_time
35563583
self._schema_meta_enabled = schema_meta_enabled
35573584
self._token_meta_enabled = token_meta_enabled
35583585
self._schema_meta_page_size = schema_meta_page_size
@@ -3616,7 +3643,7 @@ def _reconnect_internal(self):
36163643
raise DriverException("[control connection] Reconnection in progress during shutdown")
36173644

36183645
raise NoHostAvailable("Unable to connect to any servers", errors)
3619-
3646+
36203647
def _try_connect(self, host):
36213648
"""
36223649
Creates a new Connection, registers for pushed events, and refreshes
@@ -3656,8 +3683,12 @@ def _try_connect(self, host):
36563683
"TOPOLOGY_CHANGE": partial(_watch_callback, self_weakref, '_handle_topology_change'),
36573684
"STATUS_CHANGE": partial(_watch_callback, self_weakref, '_handle_status_change'),
36583685
"SCHEMA_CHANGE": partial(_watch_callback, self_weakref, '_handle_schema_change')
3686+
# "TABLET_CHANGE": partial(_watch_callback, self_weakref, '_handle_tablet_change')
36593687
}, register_timeout=self._timeout)
36603688

3689+
if EXPERIMENTAL_TABLETS and self._cluster.check_tablets_support(connection, self._timeout):
3690+
self._cluster.scheduler.schedule_unique(self._tablet_refresh_time, self._refresh_tablets, connection)
3691+
36613692
sel_peers = self._get_peers_query(self.PeersQueryType.PEERS, connection)
36623693
sel_local = self._SELECT_LOCAL if self._token_meta_enabled else self._SELECT_LOCAL_NO_TOKENS
36633694
peers_query = QueryMessage(query=sel_peers, consistency_level=ConsistencyLevel.ONE)
@@ -3679,6 +3710,11 @@ def _try_connect(self, host):
36793710
shared_results = (peers_result, local_result)
36803711
self._refresh_node_list_and_token_map(connection, preloaded_results=shared_results)
36813712
self._refresh_schema(connection, preloaded_results=shared_results, schema_agreement_wait=-1)
3713+
3714+
if EXPERIMENTAL_TABLETS and self._cluster.check_tablets_support(connection, self._timeout):
3715+
tablets_query = QueryMessage(query=self._SELECT_TABLETS, consistency_level=ConsistencyLevel.ONE)
3716+
tablets_result = connection.wait_for_response(tablets_query, timeout=self._timeout)
3717+
self._refresh_tablets(connection, preloaded_result=tablets_result)
36823718
except Exception:
36833719
connection.close()
36843720
raise
@@ -3964,12 +4000,53 @@ def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
39644000
log.debug("[control connection] Rebuilding token map due to topology changes")
39654001
self._cluster.metadata.rebuild_token_map(partitioner, token_map)
39664002

4003+
# Experimental, this interface and use may change
4004+
def _refresh_tablets(self, connection, preloaded_result=None,
4005+
force_token_rebuild=False):
4006+
if preloaded_result:
4007+
log.debug("[control connection] Refreshing tablet list using preloaded result")
4008+
tablets_result = preloaded_result
4009+
else:
4010+
sel_tablets = self._SELECT_TABLETS
4011+
tablets_query = QueryMessage(query=sel_tablets, consistency_level=ConsistencyLevel.ONE)
4012+
tablets_result = connection.wait_for_response(
4013+
tablets_query, timeout=self._timeout)
4014+
tablets_result = dict_factory(tablets_result.column_names, tablets_result.parsed_rows)
4015+
4016+
tablets = []
4017+
for row in tablets_result:
4018+
if not self._is_valid_tablet(row):
4019+
log.warning(
4020+
"Found an invalid row for tablet (%s). Ignoring tablet." %
4021+
_NodeInfo.get_broadcast_rpc_address(row))
4022+
continue
4023+
4024+
tablet = Tablet()
4025+
tablet.keyspace_name = row.get("keyspace_name")
4026+
tablet.table_name = row.get("table_name")
4027+
tablet.table_id = row.get("table_id")
4028+
tablet.tablet_count = row.get("tablet_count")
4029+
tablet.last_token = row.get("last_token")
4030+
tablet.new_replicas = row.get("new_replicas")
4031+
tablet.replicas = row.get("replicas")
4032+
tablet.stage = row.get("stage")
4033+
4034+
tablets.append(tablet)
4035+
4036+
self._cluster.metadata.set_tablets(tablets)
4037+
return ""
4038+
39674039
@staticmethod
39684040
def _is_valid_peer(row):
39694041
return bool(_NodeInfo.get_broadcast_rpc_address(row) and row.get("host_id") and
39704042
row.get("data_center") and row.get("rack") and
39714043
('tokens' not in row or row.get('tokens')))
39724044

4045+
# Experimental, this interface and use may change
4046+
@staticmethod
4047+
def _is_valid_tablet(row):
4048+
return bool(row.get("replicas") is not None and len(row.get("replicas")) != 0 and row.get("table_name") is not None)
4049+
39734050
def _update_location_info(self, host, datacenter, rack):
39744051
if host.datacenter == datacenter and host.rack == rack:
39754052
return False
@@ -4571,7 +4648,10 @@ def _query(self, host, message=None, cb=None):
45714648
connection = None
45724649
try:
45734650
# TODO get connectTimeout from cluster settings
4574-
connection, request_id = pool.borrow_connection(timeout=2.0, routing_key=self.query.routing_key if self.query else None)
4651+
if self.query:
4652+
connection, request_id = pool.borrow_connection(timeout=2.0, routing_key=self.query.routing_key, keyspace=self.query.keyspace, table=self.query.table)
4653+
else:
4654+
connection, request_id = pool.borrow_connection(timeout=2.0)
45754655
self._connection = connection
45764656
result_meta = self.prepared_statement.result_metadata if self.prepared_statement else []
45774657

cassandra/metadata.py

+18
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,8 @@ def __init__(self):
126126
self._hosts = {}
127127
self._host_id_by_endpoint = {}
128128
self._hosts_lock = RLock()
129+
self._tablets = []
130+
self._tablets_lock = RLock()
129131

130132
def export_schema_as_string(self):
131133
"""
@@ -391,6 +393,22 @@ def all_hosts_items(self):
391393
with self._hosts_lock:
392394
return list(self._hosts.items())
393395

396+
# Experimental, this interface and use may change
397+
def all_tablets(self):
398+
"""
399+
Returns a list of all known :class`.Tablet` instances in the cluster.
400+
"""
401+
with self._tablets_lock:
402+
return self._tablets
403+
404+
# Experimental, this interface and use may change
405+
def set_tablets(self, tablets):
406+
with self._tablets_lock:
407+
self._tablets = tablets
408+
409+
# Experimental, this interface and use may change
410+
def use_tablets(self):
411+
return self.all_tablets() != []
394412

395413
REPLICATION_STRATEGY_CLASS_PREFIX = "org.apache.cassandra.locator."
396414

cassandra/policies.py

+31-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
from itertools import islice, cycle, groupby, repeat
1616
import logging
17+
import os
1718
from random import randint, shuffle
1819
from threading import Lock
1920
import socket
@@ -376,7 +377,36 @@ def make_query_plan(self, working_keyspace=None, query=None):
376377
for host in child.make_query_plan(keyspace, query):
377378
yield host
378379
else:
379-
replicas = self._cluster_metadata.get_replicas(keyspace, routing_key)
380+
if self._cluster_metadata.use_tablets():
381+
tablets = self._cluster_metadata.all_tablets()
382+
383+
l = -1
384+
for i, tablet in enumerate(tablets):
385+
if tablet.keyspace_name == keyspace and tablet.table_name == query.table:
386+
l = i
387+
break
388+
389+
if l != -1:
390+
r = l + tablets[l].tablet_count
391+
t = self._cluster_metadata.token_map.token_class.from_key(routing_key)
392+
while l < r:
393+
m = (l + r) // 2
394+
if tablets[m].last_token < t.value:
395+
l = m + 1
396+
else:
397+
r = m
398+
399+
replicas = []
400+
replicas_mapped = set(map(lambda r: r[0], tablets[l].replicas))
401+
child_plan = child.make_query_plan(keyspace, query)
402+
for host in child_plan:
403+
if host.host_id in replicas_mapped:
404+
replicas.append(host)
405+
else:
406+
replicas = self._cluster_metadata.get_replicas(keyspace, routing_key)
407+
else:
408+
replicas = self._cluster_metadata.get_replicas(keyspace, routing_key)
409+
380410
if self.shuffle_replicas:
381411
shuffle(replicas)
382412
for replica in replicas:

cassandra/pool.py

+57-5
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from concurrent.futures import Future
1919
from functools import total_ordering
2020
import logging
21+
import os
2122
import socket
2223
import time
2324
import random
@@ -252,6 +253,32 @@ def __repr__(self):
252253
dc = (" %s" % (self._datacenter,)) if self._datacenter else ""
253254
return "<%s: %s%s>" % (self.__class__.__name__, self.endpoint, dc)
254255

256+
# Experimental, this interface and use may change
257+
class Tablet(object):
258+
"""
259+
Represents a single ScyllaDB tablet.
260+
"""
261+
keyspace_name = ""
262+
table_id = None
263+
last_token = 0
264+
table_name = ""
265+
tablet_count = 0
266+
new_replicas = None
267+
replicas = None
268+
stage = ""
269+
270+
lock = None
271+
272+
def __init__(self, keyspace_name = "", table_id = None, last_token = 0, table_name = "", tablet_count = 0, new_replicas = None, replicas = None, stage = ""):
273+
self.keyspace_name = keyspace_name
274+
self.table_id = table_id
275+
self.table_name = table_name
276+
self.tablet_count = tablet_count
277+
self.last_token = last_token
278+
self.new_replicas = new_replicas
279+
self.replicas = replicas
280+
self.stage = stage
281+
self.lock = RLock()
255282

256283
class _ReconnectionHandler(object):
257284
"""
@@ -439,7 +466,7 @@ def __init__(self, host, host_distance, session):
439466

440467
log.debug("Finished initializing connection for host %s", self.host)
441468

442-
def _get_connection_for_routing_key(self, routing_key=None):
469+
def _get_connection_for_routing_key(self, routing_key=None, keyspace=None, table=None):
443470
if self.is_shutdown:
444471
raise ConnectionException(
445472
"Pool for %s is shutdown" % (self.host,), self.host)
@@ -450,7 +477,32 @@ def _get_connection_for_routing_key(self, routing_key=None):
450477
shard_id = None
451478
if not self._session.cluster.shard_aware_options.disable and self.host.sharding_info and routing_key:
452479
t = self._session.cluster.metadata.token_map.token_class.from_key(routing_key)
453-
shard_id = self.host.sharding_info.shard_id_from_token(t.value)
480+
481+
if self._session.cluster._load_balancing_policy._cluster_metadata.use_tablets() and table is not None:
482+
tablets = self._session.cluster._load_balancing_policy._cluster_metadata.all_tablets()
483+
484+
if keyspace is None:
485+
keyspace = self._keyspace
486+
487+
l = -1
488+
for i, tablet in enumerate(tablets):
489+
if tablet.keyspace_name == keyspace and tablet.table_name == table:
490+
l = i
491+
break
492+
493+
if l != -1:
494+
r = l + tablets[l].tablet_count
495+
while l < r:
496+
m = (l + r) // 2
497+
if tablets[m].last_token < t.value:
498+
l = m + 1
499+
else:
500+
r = m
501+
shard_id = tablets[l].replicas[0][1]
502+
else:
503+
shard_id = self.host.sharding_info.shard_id_from_token(t.value)
504+
else:
505+
shard_id = self.host.sharding_info.shard_id_from_token(t.value)
454506

455507
conn = self._connections.get(shard_id)
456508

@@ -496,15 +548,15 @@ def _get_connection_for_routing_key(self, routing_key=None):
496548
return random.choice(active_connections)
497549
return random.choice(list(self._connections.values()))
498550

499-
def borrow_connection(self, timeout, routing_key=None):
500-
conn = self._get_connection_for_routing_key(routing_key)
551+
def borrow_connection(self, timeout, routing_key=None, keyspace=None, table=None):
552+
conn = self._get_connection_for_routing_key(routing_key, keyspace, table)
501553
start = time.time()
502554
remaining = timeout
503555
last_retry = False
504556
while True:
505557
if conn.is_closed:
506558
# The connection might have been closed in the meantime - if so, try again
507-
conn = self._get_connection_for_routing_key(routing_key)
559+
conn = self._get_connection_for_routing_key(routing_key, keyspace, table)
508560
with conn.lock:
509561
if (not conn.is_closed or last_retry) and conn.in_flight < conn.max_request_id:
510562
# On last retry we ignore connection status, since it is better to return closed connection than

0 commit comments

Comments
 (0)