Skip to content

Commit 67c34b8

Browse files
Use tablets in token and shard awareness
Add a mechanism to parse system.tablets periodically. In TokenAwarePolicy, check whether the keyspace uses tablets and, if so, try to use them to find replicas. Make shard awareness work when tablets are in use. Everything is wrapped in an experimental setting, because tablets are still experimental in ScyllaDB and the tablets format may change.
1 parent 7c9df85 commit 67c34b8

File tree

8 files changed

+204
-15
lines changed

8 files changed

+204
-15
lines changed

cassandra/cluster.py

+96-2
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
from itertools import groupby, count, chain
2828
import json
2929
import logging
30+
import os
31+
import threading
3032
from warnings import warn
3133
from random import random
3234
import re
@@ -79,6 +81,7 @@
7981
named_tuple_factory, dict_factory, tuple_factory, FETCH_SIZE_UNSET,
8082
HostTargetingStatement)
8183
from cassandra.marshal import int64_pack
84+
from cassandra.tablets import Tablet, Tablets
8285
from cassandra.timestamps import MonotonicTimestampGenerator
8386
from cassandra.compat import Mapping
8487
from cassandra.util import _resolve_contact_points_to_string_map, Version
@@ -938,6 +941,17 @@ def default_retry_policy(self, policy):
938941
establish connection pools. This can cause a rush of connections and queries if not mitigated with this factor.
939942
"""
940943

944+
experimental_tablet_refresh_time = 60
945+
"""
946+
The interval at which the system.tablets table is polled.
947+
This field is experimental and may be changed / removed in the future.
948+
"""
949+
950+
experimental_tablet_feature_enabled = False
951+
"""
952+
If true, the experimental tablet feature is enabled; its interface and use may change.
953+
"""
954+
941955
prepare_on_all_hosts = True
942956
"""
943957
Specifies whether statements should be prepared on all hosts, or just one.
@@ -1124,6 +1138,8 @@ def __init__(self,
11241138
schema_metadata_page_size=1000,
11251139
address_translator=None,
11261140
status_event_refresh_window=2,
1141+
experimental_tablet_refresh_time=60, # This field is experimental and may be changed / removed in the future
1142+
experimental_tablet_feature_enabled=False, # This field is experimental and may be changed / removed in the future
11271143
prepare_on_all_hosts=True,
11281144
reprepare_on_up=True,
11291145
execution_profiles=None,
@@ -1364,6 +1380,8 @@ def __init__(self,
13641380
self.schema_event_refresh_window = schema_event_refresh_window
13651381
self.topology_event_refresh_window = topology_event_refresh_window
13661382
self.status_event_refresh_window = status_event_refresh_window
1383+
self.experimental_tablet_refresh_time = experimental_tablet_refresh_time
1384+
self.experimental_tablet_feature_enabled = experimental_tablet_feature_enabled
13671385
self.connect_timeout = connect_timeout
13681386
self.prepare_on_all_hosts = prepare_on_all_hosts
13691387
self.reprepare_on_up = reprepare_on_up
@@ -1416,7 +1434,7 @@ def __init__(self,
14161434
self.control_connection = ControlConnection(
14171435
self, self.control_connection_timeout,
14181436
self.schema_event_refresh_window, self.topology_event_refresh_window,
1419-
self.status_event_refresh_window,
1437+
self.status_event_refresh_window, self.experimental_tablet_refresh_time,
14201438
schema_metadata_enabled, token_metadata_enabled,
14211439
schema_meta_page_size=schema_metadata_page_size)
14221440

@@ -2381,6 +2399,19 @@ def add_prepared(self, query_id, prepared_statement):
23812399
with self._prepared_statement_lock:
23822400
self._prepared_statements[query_id] = prepared_statement
23832401

2402+
# Experimental, this interface and use may change
2403+
def check_tablets_support(self, connection, timeout):
    """
    Probe whether the node behind ``connection`` can serve the tablets query.

    The probe sends ``ControlConnection._SELECT_TABLETS`` at consistency
    level ``ONE`` and waits for the response. The response itself is
    discarded; only success or failure matters.

    :param connection: connection on which the probe query is sent.
    :param timeout: timeout forwarded to ``connection.wait_for_response``.
    :return: ``True`` if the query completed without raising, ``False``
        otherwise.
    """
    tablets_query = QueryMessage(query=ControlConnection._SELECT_TABLETS,
                                 consistency_level=ConsistencyLevel.ONE)
    try:
        connection.wait_for_response(tablets_query, timeout=timeout)
    except Exception:
        # Best-effort probe: any failure is reported as "not supported".
        # ``Exception`` (rather than a bare ``except``) lets
        # KeyboardInterrupt and SystemExit propagate.
        log.debug("Tablets probe query failed; treating tablets as unsupported",
                  exc_info=True)
        return False
    return True
2411+
2412+
# Experimental, this interface and use may change
2413+
def check_tablets_enabled(self):
    """
    Report whether the experimental tablet feature is switched on.

    :return: the value of ``experimental_tablet_feature_enabled`` on this
        object, returned unchanged.
    """
    return self.experimental_tablet_feature_enabled
23842415

23852416
class Session(object):
23862417
"""
@@ -3513,6 +3544,8 @@ class ControlConnection(object):
35133544
_SELECT_PEERS_NO_TOKENS_V2 = "SELECT host_id, peer, peer_port, data_center, rack, native_address, native_port, release_version, schema_version FROM system.peers_v2"
35143545
_SELECT_SCHEMA_PEERS_V2 = "SELECT host_id, peer, peer_port, native_address, native_port, schema_version FROM system.peers_v2"
35153546

3547+
_SELECT_TABLETS = "SELECT * FROM system.tablets"
3548+
35163549
_MINIMUM_NATIVE_ADDRESS_DSE_VERSION = Version("6.0.0")
35173550

35183551
class PeersQueryType(object):
@@ -3527,6 +3560,7 @@ class PeersQueryType(object):
35273560
_schema_event_refresh_window = None
35283561
_topology_event_refresh_window = None
35293562
_status_event_refresh_window = None
3563+
_tablet_refresh_time = 60
35303564

35313565
_schema_meta_enabled = True
35323566
_token_meta_enabled = True
@@ -3541,6 +3575,7 @@ def __init__(self, cluster, timeout,
35413575
schema_event_refresh_window,
35423576
topology_event_refresh_window,
35433577
status_event_refresh_window,
3578+
tablet_refresh_time=60,
35443579
schema_meta_enabled=True,
35453580
token_meta_enabled=True,
35463581
schema_meta_page_size=1000):
@@ -3553,6 +3588,7 @@ def __init__(self, cluster, timeout,
35533588
self._schema_event_refresh_window = schema_event_refresh_window
35543589
self._topology_event_refresh_window = topology_event_refresh_window
35553590
self._status_event_refresh_window = status_event_refresh_window
3591+
self._tablet_refresh_time = tablet_refresh_time
35563592
self._schema_meta_enabled = schema_meta_enabled
35573593
self._token_meta_enabled = token_meta_enabled
35583594
self._schema_meta_page_size = schema_meta_page_size
@@ -3658,6 +3694,9 @@ def _try_connect(self, host):
36583694
"SCHEMA_CHANGE": partial(_watch_callback, self_weakref, '_handle_schema_change')
36593695
}, register_timeout=self._timeout)
36603696

3697+
if self._cluster.check_tablets_enabled() and self._cluster.check_tablets_support(connection, self._timeout):
3698+
self._cluster.scheduler.schedule_unique(self._tablet_refresh_time, self._refresh_tablets, connection)
3699+
36613700
sel_peers = self._get_peers_query(self.PeersQueryType.PEERS, connection)
36623701
sel_local = self._SELECT_LOCAL if self._token_meta_enabled else self._SELECT_LOCAL_NO_TOKENS
36633702
peers_query = QueryMessage(query=sel_peers, consistency_level=ConsistencyLevel.ONE)
@@ -3679,6 +3718,11 @@ def _try_connect(self, host):
36793718
shared_results = (peers_result, local_result)
36803719
self._refresh_node_list_and_token_map(connection, preloaded_results=shared_results)
36813720
self._refresh_schema(connection, preloaded_results=shared_results, schema_agreement_wait=-1)
3721+
3722+
if self._cluster.check_tablets_enabled() and self._cluster.check_tablets_support(connection, self._timeout):
3723+
tablets_query = QueryMessage(query=self._SELECT_TABLETS, consistency_level=ConsistencyLevel.ONE)
3724+
tablets_result = connection.wait_for_response(tablets_query, timeout=self._timeout)
3725+
self._refresh_tablets(connection, preloaded_result=tablets_result)
36823726
except Exception:
36833727
connection.close()
36843728
raise
@@ -3964,12 +4008,59 @@ def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
39644008
log.debug("[control connection] Rebuilding token map due to topology changes")
39654009
self._cluster.metadata.rebuild_token_map(partitioner, token_map)
39664010

4011+
# Experimental, this interface and use may change
4012+
def _refresh_tablets(self, connection, preloaded_result=None,
                     force_token_rebuild=False):
    """
    Rebuild the tablet map from the tablets query and schedule the next refresh.

    Rows are read either from ``preloaded_result`` or by sending
    ``self._SELECT_TABLETS`` on ``connection``. Each valid row becomes a
    ``Tablet``; tablets are grouped by ``(keyspace_name, table_name)`` and
    stored through ``self._cluster.metadata._tablets.set_tablets``. The
    method then re-schedules itself on the cluster scheduler with a delay
    of ``self._tablet_refresh_time``.

    :param connection: connection used for the query when no preloaded
        result is given, and handed to the next scheduled refresh.
    :param preloaded_result: optional, already fetched response exposing
        ``column_names`` and ``parsed_rows``; used instead of querying
        when truthy.
    :param force_token_rebuild: accepted but not used by this method.
    :return: an empty string (kept from the original implementation).

    NOTE(review): if the query or the row conversion raises, the exception
    propagates before the re-scheduling call, so this path does not queue
    another refresh.
    """
    if preloaded_result:
        log.debug("[control connection] Refreshing tablet list using preloaded result")
        tablets_result = preloaded_result
    else:
        tablets_query = QueryMessage(query=self._SELECT_TABLETS,
                                     consistency_level=ConsistencyLevel.ONE)
        tablets_result = connection.wait_for_response(
            tablets_query, timeout=self._timeout)
    tablets_result = dict_factory(tablets_result.column_names, tablets_result.parsed_rows)

    tablets = {}
    for row in tablets_result:
        keyspace_name = row.get("keyspace_name")
        table_name = row.get("table_name")

        if not self._is_valid_tablet(row):
            # Identify the row by keyspace and table; the original logged
            # a peer broadcast RPC address lookup here instead.
            log.warning(
                "Found an invalid row for tablet (%s.%s). Ignoring tablet.",
                keyspace_name, table_name)
            continue

        tablet = Tablet()
        tablet.table_id = row.get("table_id")
        tablet.tablet_count = row.get("tablet_count")
        tablet.last_token = row.get("last_token")
        tablet.new_replicas = row.get("new_replicas")
        tablet.replicas = row.get("replicas")
        tablet.stage = row.get("stage")

        # Group tablets per (keyspace, table), keeping row order.
        tablets.setdefault((keyspace_name, table_name), []).append(tablet)

    # The container is created on the first refresh only.
    if self._cluster.metadata._tablets is None:
        self._cluster.metadata._tablets = Tablets()
    self._cluster.metadata._tablets.set_tablets(tablets)

    # Queue the next poll.
    self._cluster.scheduler.schedule_unique(self._tablet_refresh_time, self._refresh_tablets, connection)
    return ""
4052+
39674053
@staticmethod
39684054
def _is_valid_peer(row):
39694055
return bool(_NodeInfo.get_broadcast_rpc_address(row) and row.get("host_id") and
39704056
row.get("data_center") and row.get("rack") and
39714057
('tokens' not in row or row.get('tokens')))
39724058

4059+
# Experimental, this interface and use may change
4060+
@staticmethod
4061+
def _is_valid_tablet(row):
4062+
return bool(row.get("replicas") is not None and len(row.get("replicas")) != 0 and row.get("table_name") is not None)
4063+
39734064
def _update_location_info(self, host, datacenter, rack):
39744065
if host.datacenter == datacenter and host.rack == rack:
39754066
return False
@@ -4571,7 +4662,10 @@ def _query(self, host, message=None, cb=None):
45714662
connection = None
45724663
try:
45734664
# TODO get connectTimeout from cluster settings
4574-
connection, request_id = pool.borrow_connection(timeout=2.0, routing_key=self.query.routing_key if self.query else None)
4665+
if self.query:
4666+
connection, request_id = pool.borrow_connection(timeout=2.0, routing_key=self.query.routing_key, keyspace=self.query.keyspace, table=self.query.table)
4667+
else:
4668+
connection, request_id = pool.borrow_connection(timeout=2.0)
45754669
self._connection = connection
45764670
result_meta = self.prepared_statement.result_metadata if self.prepared_statement else []
45774671

cassandra/metadata.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import six
2424
from six.moves import zip
2525
import sys
26-
from threading import RLock
26+
from threading import RLock, Lock
2727
import struct
2828
import random
2929
import itertools
@@ -126,6 +126,7 @@ def __init__(self):
126126
self._hosts = {}
127127
self._host_id_by_endpoint = {}
128128
self._hosts_lock = RLock()
129+
self._tablets = None
129130

130131
def export_schema_as_string(self):
131132
"""

cassandra/policies.py

+20-1
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,10 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import bisect
1516
from itertools import islice, cycle, groupby, repeat
1617
import logging
18+
import os
1719
from random import randint, shuffle
1820
from threading import Lock
1921
import socket
@@ -335,6 +337,7 @@ class TokenAwarePolicy(LoadBalancingPolicy):
335337

336338
_child_policy = None
337339
_cluster_metadata = None
340+
_experimental_tablets_enabled = False
338341
shuffle_replicas = False
339342
"""
340343
Yield local replicas in a random order.
@@ -346,6 +349,7 @@ def __init__(self, child_policy, shuffle_replicas=False):
346349

347350
def populate(self, cluster, hosts):
348351
self._cluster_metadata = cluster.metadata
352+
self._experimental_tablets_enabled = cluster.check_tablets_enabled()
349353
self._child_policy.populate(cluster, hosts)
350354

351355
def check_supported(self):
@@ -376,7 +380,22 @@ def make_query_plan(self, working_keyspace=None, query=None):
376380
for host in child.make_query_plan(keyspace, query):
377381
yield host
378382
else:
379-
replicas = self._cluster_metadata.get_replicas(keyspace, routing_key)
383+
if self._experimental_tablets_enabled and self._cluster_metadata._tablets is not None:
384+
replicas = self._cluster_metadata._tablets.get_replicas_for_key(keyspace, query.table, self._cluster_metadata.token_map.token_class.from_key(routing_key))
385+
386+
if replicas != []:
387+
replicas_mapped = set(map(lambda r: r[0], replicas))
388+
child_plan = child.make_query_plan(keyspace, query)
389+
390+
replicas = []
391+
for host in child_plan:
392+
if host.host_id in replicas_mapped:
393+
replicas.append(host)
394+
else:
395+
replicas = self._cluster_metadata.get_replicas(keyspace, routing_key)
396+
else:
397+
replicas = self._cluster_metadata.get_replicas(keyspace, routing_key)
398+
380399
if self.shuffle_replicas:
381400
shuffle(replicas)
382401
for replica in replicas:

cassandra/pool.py

+19-5
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,11 @@
1515
"""
1616
Connection pooling and host management.
1717
"""
18+
import bisect
1819
from concurrent.futures import Future
1920
from functools import total_ordering
2021
import logging
22+
import os
2123
import socket
2224
import time
2325
import random
@@ -439,7 +441,7 @@ def __init__(self, host, host_distance, session):
439441

440442
log.debug("Finished initializing connection for host %s", self.host)
441443

442-
def _get_connection_for_routing_key(self, routing_key=None):
444+
def _get_connection_for_routing_key(self, routing_key=None, keyspace=None, table=None):
443445
if self.is_shutdown:
444446
raise ConnectionException(
445447
"Pool for %s is shutdown" % (self.host,), self.host)
@@ -450,7 +452,19 @@ def _get_connection_for_routing_key(self, routing_key=None):
450452
shard_id = None
451453
if not self._session.cluster.shard_aware_options.disable and self.host.sharding_info and routing_key:
452454
t = self._session.cluster.metadata.token_map.token_class.from_key(routing_key)
453-
shard_id = self.host.sharding_info.shard_id_from_token(t.value)
455+
456+
if self._session.cluster.check_tablets_enabled() and self._session.cluster._load_balancing_policy._cluster_metadata._tablets is not None and table is not None:
457+
if keyspace is None:
458+
keyspace = self._keyspace
459+
460+
replicas = self._session.cluster._load_balancing_policy._cluster_metadata._tablets.get_replicas_for_key(keyspace, table, t)
461+
462+
if replicas != []:
463+
shard_id = replicas[0][1]
464+
else:
465+
shard_id = self.host.sharding_info.shard_id_from_token(t.value)
466+
else:
467+
shard_id = self.host.sharding_info.shard_id_from_token(t.value)
454468

455469
conn = self._connections.get(shard_id)
456470

@@ -496,15 +510,15 @@ def _get_connection_for_routing_key(self, routing_key=None):
496510
return random.choice(active_connections)
497511
return random.choice(list(self._connections.values()))
498512

499-
def borrow_connection(self, timeout, routing_key=None):
500-
conn = self._get_connection_for_routing_key(routing_key)
513+
def borrow_connection(self, timeout, routing_key=None, keyspace=None, table=None):
514+
conn = self._get_connection_for_routing_key(routing_key, keyspace, table)
501515
start = time.time()
502516
remaining = timeout
503517
last_retry = False
504518
while True:
505519
if conn.is_closed:
506520
# The connection might have been closed in the meantime - if so, try again
507-
conn = self._get_connection_for_routing_key(routing_key)
521+
conn = self._get_connection_for_routing_key(routing_key, keyspace, table)
508522
with conn.lock:
509523
if (not conn.is_closed or last_retry) and conn.in_flight < conn.max_request_id:
510524
# On last retry we ignore connection status, since it is better to return closed connection than

cassandra/query.py

+11-1
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,13 @@ class Statement(object):
253253
.. versionadded:: 2.1.3
254254
"""
255255

256+
table = None
257+
"""
258+
The string name of the table this query acts on. This is used when the tablet
259+
experimental feature is enabled and, at the same time, :class:`~.TokenAwarePolicy`
260+
is configured in the profile load balancing policy.
261+
"""
262+
256263
custom_payload = None
257264
"""
258265
:ref:`custom_payload` to be passed to the server.
@@ -272,7 +279,7 @@ class Statement(object):
272279

273280
def __init__(self, retry_policy=None, consistency_level=None, routing_key=None,
274281
serial_consistency_level=None, fetch_size=FETCH_SIZE_UNSET, keyspace=None, custom_payload=None,
275-
is_idempotent=False):
282+
is_idempotent=False, table=None):
276283
if retry_policy and not hasattr(retry_policy, 'on_read_timeout'): # just checking one method to detect positional parameter errors
277284
raise ValueError('retry_policy should implement cassandra.policies.RetryPolicy')
278285
if retry_policy is not None:
@@ -286,6 +293,8 @@ def __init__(self, retry_policy=None, consistency_level=None, routing_key=None,
286293
self.fetch_size = fetch_size
287294
if keyspace is not None:
288295
self.keyspace = keyspace
296+
if table is not None:
297+
self.table = table
289298
if custom_payload is not None:
290299
self.custom_payload = custom_payload
291300
self.is_idempotent = is_idempotent
@@ -548,6 +557,7 @@ def __init__(self, prepared_statement, retry_policy=None, consistency_level=None
548557
meta = prepared_statement.column_metadata
549558
if meta:
550559
self.keyspace = meta[0].keyspace_name
560+
self.table = meta[0].table_name
551561

552562
Statement.__init__(self, retry_policy, consistency_level, routing_key,
553563
serial_consistency_level, fetch_size, keyspace, custom_payload,

0 commit comments

Comments
 (0)