Skip to content

Commit e4a000f

Browse files
committed
Introduce metadata_request_timeout configuration option
This option allows user to control timeout for driver internal queries. Idea is to make driver queries more resilient and being independent of user queries.
1 parent f112165 commit e4a000f

File tree

4 files changed

+49
-26
lines changed

4 files changed

+49
-26
lines changed

cassandra/cluster.py

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from __future__ import absolute_import
2020

2121
import atexit
22+
import datetime
2223
from binascii import hexlify
2324
from collections import defaultdict
2425
from collections.abc import Mapping
@@ -1033,6 +1034,12 @@ def default_retry_policy(self, policy):
10331034
or to disable the shardaware port (advanced shardaware)
10341035
"""
10351036

1037+
metadata_request_timeout = datetime.timedelta(seconds=2)
1038+
"""
1039+
Timeout for all queries used by driver it self.
1040+
Supported only by Scylla clusters.
1041+
"""
1042+
10361043
@property
10371044
def schema_metadata_enabled(self):
10381045
"""
@@ -1148,7 +1155,9 @@ def __init__(self,
11481155
client_id=None,
11491156
cloud=None,
11501157
scylla_cloud=None,
1151-
shard_aware_options=None):
1158+
shard_aware_options=None,
1159+
metadata_request_timeout=None,
1160+
):
11521161
"""
11531162
``executor_threads`` defines the number of threads in a pool for handling asynchronous tasks such as
11541163
extablishing connection pools or refreshing metadata.
@@ -1240,6 +1249,8 @@ def __init__(self,
12401249
self.no_compact = no_compact
12411250

12421251
self.auth_provider = auth_provider
1252+
if metadata_request_timeout is not None:
1253+
self.metadata_request_timeout = metadata_request_timeout
12431254

12441255
if load_balancing_policy is not None:
12451256
if isinstance(load_balancing_policy, type):
@@ -3549,6 +3560,7 @@ class PeersQueryType(object):
35493560
_is_shutdown = False
35503561
_timeout = None
35513562
_protocol_version = None
3563+
_metadata_request_timeout = None
35523564

35533565
_schema_event_refresh_window = None
35543566
_topology_event_refresh_window = None
@@ -3648,7 +3660,7 @@ def _reconnect_internal(self):
36483660
(conn, _) = self._connect_host_in_lbp()
36493661
if conn is not None:
36503662
return conn
3651-
3663+
36523664
# Try to re-resolve hostnames as a fallback when all hosts are unreachable
36533665
self._cluster._resolve_hostnames()
36543666

@@ -3693,7 +3705,10 @@ def _try_connect(self, host):
36933705
# If sharding information is available, it's a ScyllaDB cluster, so do not use peers_v2 table.
36943706
if connection.features.sharding_info is not None:
36953707
self._uses_peers_v2 = False
3696-
3708+
3709+
# Cassandra does not support "USING TIMEOUT"
3710+
self._metadata_request_timeout = None if connection.features.sharding_info is None \
3711+
else datetime.timedelta(seconds=self._cluster.control_connection_timeout)
36973712
self._tablets_routing_v1 = connection.features.tablets_routing_v1
36983713

36993714
# use weak references in both directions
@@ -3830,7 +3845,12 @@ def _refresh_schema(self, connection, preloaded_results=None, schema_agreement_w
38303845
log.debug("Skipping schema refresh due to lack of schema agreement")
38313846
return False
38323847

3833-
self._cluster.metadata.refresh(connection, self._timeout, fetch_size=self._schema_meta_page_size, **kwargs)
3848+
self._cluster.metadata.refresh(
3849+
connection,
3850+
self._timeout,
3851+
fetch_size=self._schema_meta_page_size,
3852+
metadata_request_timeout=self._metadata_request_timeout,
3853+
**kwargs)
38343854

38353855
return True
38363856

cassandra/metadata.py

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -134,11 +134,12 @@ def export_schema_as_string(self):
134134
"""
135135
return "\n\n".join(ks.export_as_string() for ks in self.keyspaces.values())
136136

137-
def refresh(self, connection, timeout, target_type=None, change_type=None, fetch_size=None, **kwargs):
137+
def refresh(self, connection, timeout, target_type=None, change_type=None, fetch_size=None,
138+
metadata_request_timeout=None, **kwargs):
138139

139140
server_version = self.get_host(connection.original_endpoint).release_version
140141
dse_version = self.get_host(connection.original_endpoint).dse_version
141-
parser = get_schema_parser(connection, server_version, dse_version, timeout, fetch_size)
142+
parser = get_schema_parser(connection, server_version, dse_version, timeout, metadata_request_timeout, fetch_size)
142143

143144
if not target_type:
144145
self._rebuild_all(parser)
@@ -1946,11 +1947,11 @@ def export_as_string(self):
19461947

19471948

19481949
class _SchemaParser(object):
1949-
1950-
def __init__(self, connection, timeout, fetch_size):
1950+
def __init__(self, connection, timeout, fetch_size, metadata_request_timeout):
19511951
self.connection = connection
19521952
self.timeout = timeout
19531953
self.fetch_size = fetch_size
1954+
self.metadata_request_timeout = metadata_request_timeout
19541955

19551956
def _handle_results(self, success, result, expected_failures=tuple(), query_msg=None, timeout=None):
19561957
"""
@@ -2054,8 +2055,8 @@ class SchemaParserV22(_SchemaParser):
20542055
"compression",
20552056
"default_time_to_live")
20562057

2057-
def __init__(self, connection, timeout, fetch_size):
2058-
super(SchemaParserV22, self).__init__(connection, timeout, fetch_size)
2058+
def __init__(self, connection, timeout, fetch_size, metadata_request_timeout):
2059+
super(SchemaParserV22, self).__init__(connection, timeout, fetch_size, metadata_request_timeout)
20592060
self.keyspaces_result = []
20602061
self.tables_result = []
20612062
self.columns_result = []
@@ -2575,8 +2576,8 @@ class SchemaParserV3(SchemaParserV22):
25752576
'read_repair_chance',
25762577
'speculative_retry')
25772578

2578-
def __init__(self, connection, timeout, fetch_size):
2579-
super(SchemaParserV3, self).__init__(connection, timeout, fetch_size)
2579+
def __init__(self, connection, timeout, fetch_size, metadata_request_timeout):
2580+
super(SchemaParserV3, self).__init__(connection, timeout, fetch_size, metadata_request_timeout)
25802581
self.indexes_result = []
25812582
self.keyspace_table_index_rows = defaultdict(lambda: defaultdict(list))
25822583
self.keyspace_view_rows = defaultdict(list)
@@ -2860,8 +2861,8 @@ class SchemaParserV4(SchemaParserV3):
28602861
_SELECT_VIRTUAL_TABLES = 'SELECT * from system_virtual_schema.tables'
28612862
_SELECT_VIRTUAL_COLUMNS = 'SELECT * from system_virtual_schema.columns'
28622863

2863-
def __init__(self, connection, timeout, fetch_size):
2864-
super(SchemaParserV4, self).__init__(connection, timeout, fetch_size)
2864+
def __init__(self, connection, timeout, fetch_size, metadata_request_timeout):
2865+
super(SchemaParserV4, self).__init__(connection, timeout, fetch_size, metadata_request_timeout)
28652866
self.virtual_keyspaces_rows = defaultdict(list)
28662867
self.virtual_tables_rows = defaultdict(list)
28672868
self.virtual_columns_rows = defaultdict(lambda: defaultdict(list))
@@ -2995,8 +2996,8 @@ class SchemaParserDSE68(SchemaParserDSE67):
29952996

29962997
_table_metadata_class = TableMetadataDSE68
29972998

2998-
def __init__(self, connection, timeout, fetch_size):
2999-
super(SchemaParserDSE68, self).__init__(connection, timeout, fetch_size)
2999+
def __init__(self, connection, timeout, fetch_size, metadata_request_timeout):
3000+
super(SchemaParserDSE68, self).__init__(connection, timeout, fetch_size, metadata_request_timeout)
30003001
self.keyspace_table_vertex_rows = defaultdict(lambda: defaultdict(list))
30013002
self.keyspace_table_edge_rows = defaultdict(lambda: defaultdict(list))
30023003

@@ -3361,25 +3362,25 @@ def __init__(
33613362
self.to_clustering_columns = to_clustering_columns
33623363

33633364

3364-
def get_schema_parser(connection, server_version, dse_version, timeout, fetch_size=None):
3365+
def get_schema_parser(connection, server_version, dse_version, timeout, metadata_request_timeout, fetch_size=None):
33653366
version = Version(server_version)
33663367
if dse_version:
33673368
v = Version(dse_version)
33683369
if v >= Version('6.8.0'):
3369-
return SchemaParserDSE68(connection, timeout, fetch_size)
3370+
return SchemaParserDSE68(connection, timeout, fetch_size, metadata_request_timeout)
33703371
elif v >= Version('6.7.0'):
3371-
return SchemaParserDSE67(connection, timeout, fetch_size)
3372+
return SchemaParserDSE67(connection, timeout, fetch_size, metadata_request_timeout)
33723373
elif v >= Version('6.0.0'):
3373-
return SchemaParserDSE60(connection, timeout, fetch_size)
3374+
return SchemaParserDSE60(connection, timeout, fetch_size, metadata_request_timeout)
33743375

33753376
if version >= Version('4-a'):
3376-
return SchemaParserV4(connection, timeout, fetch_size)
3377+
return SchemaParserV4(connection, timeout, fetch_size, metadata_request_timeout)
33773378
elif version >= Version('3.0.0'):
3378-
return SchemaParserV3(connection, timeout, fetch_size)
3379+
return SchemaParserV3(connection, timeout, fetch_size, metadata_request_timeout)
33793380
else:
33803381
# we could further specialize by version. Right now just refactoring the
33813382
# multi-version parser we have as of C* 2.2.0rc1.
3382-
return SchemaParserV22(connection, timeout, fetch_size)
3383+
return SchemaParserV22(connection, timeout, fetch_size, metadata_request_timeout)
33833384

33843385

33853386
def _cql_from_cass_type(cass_type):

tests/integration/standard/test_metadata.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,8 @@ def test_basic_table_meta_properties(self):
243243
cc,
244244
self.cluster.metadata.get_host(cc.host).release_version,
245245
self.cluster.metadata.get_host(cc.host).dse_version,
246-
1
246+
1,
247+
None,
247248
)
248249

249250
for option in tablemeta.options:
@@ -1968,7 +1969,8 @@ def setup_class(cls):
19681969
connection,
19691970
cls.cluster.metadata.get_host(connection.host).release_version,
19701971
cls.cluster.metadata.get_host(connection.host).dse_version,
1971-
timeout=20
1972+
20,
1973+
None,
19721974
).__class__
19731975
cls.cluster.control_connection.reconnect = Mock()
19741976

tests/unit/test_metadata.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -618,7 +618,7 @@ def test_build_index_as_cql(self):
618618
column_meta.table.name = 'table_name_here'
619619
column_meta.table.keyspace_name = 'keyspace_name_here'
620620
column_meta.table.columns = {column_meta.name: column_meta}
621-
parser = get_schema_parser(Mock(), '2.1.0', None, 0.1)
621+
parser = get_schema_parser(Mock(), '2.1.0', None, 0.1, None)
622622

623623
row = {'index_name': 'index_name_here', 'index_type': 'index_type_here'}
624624
index_meta = parser._build_index_metadata(column_meta, row)

0 commit comments

Comments
 (0)