19
19
from __future__ import absolute_import
20
20
21
21
import atexit
22
+ import datetime
22
23
from binascii import hexlify
23
24
from collections import defaultdict
24
25
from collections .abc import Mapping
82
83
from cassandra .marshal import int64_pack
83
84
from cassandra .tablets import Tablet , Tablets
84
85
from cassandra .timestamps import MonotonicTimestampGenerator
85
- from cassandra .util import _resolve_contact_points_to_string_map , Version
86
+ from cassandra .util import _resolve_contact_points_to_string_map , Version , maybe_add_timeout_to_query
86
87
87
88
from cassandra .datastax .insights .reporter import MonitorReporter
88
89
from cassandra .datastax .insights .util import version_supports_insights
@@ -1033,6 +1034,12 @@ def default_retry_policy(self, policy):
1033
1034
or to disable the shardaware port (advanced shardaware)
1034
1035
"""
1035
1036
1037
+ metadata_request_timeout = datetime .timedelta (seconds = 2 )
1038
+ """
1039
+ Timeout for all queries used by driver it self.
1040
+ Supported only by Scylla clusters.
1041
+ """
1042
+
1036
1043
@property
1037
1044
def schema_metadata_enabled (self ):
1038
1045
"""
@@ -1148,7 +1155,9 @@ def __init__(self,
1148
1155
client_id = None ,
1149
1156
cloud = None ,
1150
1157
scylla_cloud = None ,
1151
- shard_aware_options = None ):
1158
+ shard_aware_options = None ,
1159
+ metadata_request_timeout = None ,
1160
+ ):
1152
1161
"""
1153
1162
``executor_threads`` defines the number of threads in a pool for handling asynchronous tasks such as
1154
1163
extablishing connection pools or refreshing metadata.
@@ -1240,6 +1249,8 @@ def __init__(self,
1240
1249
self .no_compact = no_compact
1241
1250
1242
1251
self .auth_provider = auth_provider
1252
+ if metadata_request_timeout is not None :
1253
+ self .metadata_request_timeout = metadata_request_timeout
1243
1254
1244
1255
if load_balancing_policy is not None :
1245
1256
if isinstance (load_balancing_policy , type ):
@@ -3549,6 +3560,7 @@ class PeersQueryType(object):
3549
3560
_is_shutdown = False
3550
3561
_timeout = None
3551
3562
_protocol_version = None
3563
+ _metadata_request_timeout = None
3552
3564
3553
3565
_schema_event_refresh_window = None
3554
3566
_topology_event_refresh_window = None
@@ -3648,7 +3660,7 @@ def _reconnect_internal(self):
3648
3660
(conn , _ ) = self ._connect_host_in_lbp ()
3649
3661
if conn is not None :
3650
3662
return conn
3651
-
3663
+
3652
3664
# Try to re-resolve hostnames as a fallback when all hosts are unreachable
3653
3665
self ._cluster ._resolve_hostnames ()
3654
3666
@@ -3693,7 +3705,10 @@ def _try_connect(self, host):
3693
3705
# If sharding information is available, it's a ScyllaDB cluster, so do not use peers_v2 table.
3694
3706
if connection .features .sharding_info is not None :
3695
3707
self ._uses_peers_v2 = False
3696
-
3708
+
3709
+ # Cassandra does not support "USING TIMEOUT"
3710
+ self ._metadata_request_timeout = None if connection .features .sharding_info is None \
3711
+ else datetime .timedelta (seconds = self ._cluster .control_connection_timeout )
3697
3712
self ._tablets_routing_v1 = connection .features .tablets_routing_v1
3698
3713
3699
3714
# use weak references in both directions
@@ -3710,8 +3725,10 @@ def _try_connect(self, host):
3710
3725
3711
3726
sel_peers = self ._get_peers_query (self .PeersQueryType .PEERS , connection )
3712
3727
sel_local = self ._SELECT_LOCAL if self ._token_meta_enabled else self ._SELECT_LOCAL_NO_TOKENS
3713
- peers_query = QueryMessage (query = sel_peers , consistency_level = ConsistencyLevel .ONE )
3714
- local_query = QueryMessage (query = sel_local , consistency_level = ConsistencyLevel .ONE )
3728
+ peers_query = QueryMessage (query = maybe_add_timeout_to_query (sel_peers , self ._metadata_request_timeout ),
3729
+ consistency_level = ConsistencyLevel .ONE )
3730
+ local_query = QueryMessage (query = maybe_add_timeout_to_query (sel_local , self ._metadata_request_timeout ),
3731
+ consistency_level = ConsistencyLevel .ONE )
3715
3732
(peers_success , peers_result ), (local_success , local_result ) = connection .wait_for_responses (
3716
3733
peers_query , local_query , timeout = self ._timeout , fail_on_error = False )
3717
3734
@@ -3722,7 +3739,8 @@ def _try_connect(self, host):
3722
3739
# error with the peers v2 query, fallback to peers v1
3723
3740
self ._uses_peers_v2 = False
3724
3741
sel_peers = self ._get_peers_query (self .PeersQueryType .PEERS , connection )
3725
- peers_query = QueryMessage (query = sel_peers , consistency_level = ConsistencyLevel .ONE )
3742
+ peers_query = QueryMessage (query = maybe_add_timeout_to_query (sel_peers , self ._metadata_request_timeout ),
3743
+ consistency_level = ConsistencyLevel .ONE )
3726
3744
peers_result = connection .wait_for_response (
3727
3745
peers_query , timeout = self ._timeout )
3728
3746
@@ -3830,7 +3848,12 @@ def _refresh_schema(self, connection, preloaded_results=None, schema_agreement_w
3830
3848
log .debug ("Skipping schema refresh due to lack of schema agreement" )
3831
3849
return False
3832
3850
3833
- self ._cluster .metadata .refresh (connection , self ._timeout , fetch_size = self ._schema_meta_page_size , ** kwargs )
3851
+ self ._cluster .metadata .refresh (
3852
+ connection ,
3853
+ self ._timeout ,
3854
+ fetch_size = self ._schema_meta_page_size ,
3855
+ metadata_request_timeout = self ._metadata_request_timeout ,
3856
+ ** kwargs )
3834
3857
3835
3858
return True
3836
3859
@@ -3861,8 +3884,10 @@ def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
3861
3884
else :
3862
3885
log .debug ("[control connection] Refreshing node list and token map" )
3863
3886
sel_local = self ._SELECT_LOCAL
3864
- peers_query = QueryMessage (query = sel_peers , consistency_level = cl )
3865
- local_query = QueryMessage (query = sel_local , consistency_level = cl )
3887
+ peers_query = QueryMessage (query = maybe_add_timeout_to_query (sel_peers , self ._metadata_request_timeout ),
3888
+ consistency_level = cl )
3889
+ local_query = QueryMessage (query = maybe_add_timeout_to_query (sel_local , self ._metadata_request_timeout ),
3890
+ consistency_level = cl )
3866
3891
peers_result , local_result = connection .wait_for_responses (
3867
3892
peers_query , local_query , timeout = self ._timeout )
3868
3893
@@ -3917,8 +3942,9 @@ def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
3917
3942
# local rpc_address has not been queried yet, try to fetch it
3918
3943
# separately, which might fail because C* < 2.1.6 doesn't have rpc_address
3919
3944
# in system.local. See CASSANDRA-9436.
3920
- local_rpc_address_query = QueryMessage (query = self ._SELECT_LOCAL_NO_TOKENS_RPC_ADDRESS ,
3921
- consistency_level = ConsistencyLevel .ONE )
3945
+ local_rpc_address_query = QueryMessage (
3946
+ query = maybe_add_timeout_to_query (self ._SELECT_LOCAL_NO_TOKENS_RPC_ADDRESS , self ._metadata_request_timeout ),
3947
+ consistency_level = ConsistencyLevel .ONE )
3922
3948
success , local_rpc_address_result = connection .wait_for_response (
3923
3949
local_rpc_address_query , timeout = self ._timeout , fail_on_error = False )
3924
3950
if success :
@@ -4153,8 +4179,10 @@ def wait_for_schema_agreement(self, connection=None, preloaded_results=None, wai
4153
4179
select_peers_query = self ._get_peers_query (self .PeersQueryType .PEERS_SCHEMA , connection )
4154
4180
4155
4181
while elapsed < total_timeout :
4156
- peers_query = QueryMessage (query = select_peers_query , consistency_level = cl )
4157
- local_query = QueryMessage (query = self ._SELECT_SCHEMA_LOCAL , consistency_level = cl )
4182
+ peers_query = QueryMessage (query = maybe_add_timeout_to_query (select_peers_query , self ._metadata_request_timeout ),
4183
+ consistency_level = cl )
4184
+ local_query = QueryMessage (query = maybe_add_timeout_to_query (self ._SELECT_SCHEMA_LOCAL , self ._metadata_request_timeout ),
4185
+ consistency_level = cl )
4158
4186
try :
4159
4187
timeout = min (self ._timeout , total_timeout - elapsed )
4160
4188
peers_result , local_result = connection .wait_for_responses (
0 commit comments