19
19
from __future__ import absolute_import
20
20
21
21
import atexit
22
+ import datetime
22
23
from binascii import hexlify
23
24
from collections import defaultdict
24
25
from collections .abc import Mapping
66
67
RESULT_KIND_SET_KEYSPACE , RESULT_KIND_ROWS ,
67
68
RESULT_KIND_SCHEMA_CHANGE , ProtocolHandler ,
68
69
RESULT_KIND_VOID , ProtocolException )
69
- from cassandra .metadata import Metadata , protect_name , murmur3 , _NodeInfo
70
+ from cassandra .metadata import Metadata , protect_name , murmur3 , _NodeInfo , SchemaQueryMessage
70
71
from cassandra .policies import (TokenAwarePolicy , DCAwareRoundRobinPolicy , SimpleConvictionPolicy ,
71
72
ExponentialReconnectionPolicy , HostDistance ,
72
73
RetryPolicy , IdentityTranslator , NoSpeculativeExecutionPlan ,
82
83
from cassandra .marshal import int64_pack
83
84
from cassandra .tablets import Tablet , Tablets
84
85
from cassandra .timestamps import MonotonicTimestampGenerator
85
- from cassandra .util import _resolve_contact_points_to_string_map , Version
86
+ from cassandra .util import _resolve_contact_points_to_string_map , Version , add_timeout_to_query
86
87
87
88
from cassandra .datastax .insights .reporter import MonitorReporter
88
89
from cassandra .datastax .insights .util import version_supports_insights
@@ -1033,6 +1034,12 @@ def default_retry_policy(self, policy):
1033
1034
or to disable the shardaware port (advanced shardaware)
1034
1035
"""
1035
1036
1037
+ metadata_request_timeout = datetime .timedelta (seconds = 2 )
1038
+ """
1039
+ Timeout for all queries used by driver it self.
1040
+ Supported only by Scylla clusters.
1041
+ """
1042
+
1036
1043
@property
1037
1044
def schema_metadata_enabled (self ):
1038
1045
"""
@@ -1148,7 +1155,9 @@ def __init__(self,
1148
1155
client_id = None ,
1149
1156
cloud = None ,
1150
1157
scylla_cloud = None ,
1151
- shard_aware_options = None ):
1158
+ shard_aware_options = None ,
1159
+ metadata_request_timeout : datetime .timedelta | None = datetime .timedelta (seconds = 2 ),
1160
+ ):
1152
1161
"""
1153
1162
``executor_threads`` defines the number of threads in a pool for handling asynchronous tasks such as
1154
1163
extablishing connection pools or refreshing metadata.
@@ -1240,6 +1249,8 @@ def __init__(self,
1240
1249
self .no_compact = no_compact
1241
1250
1242
1251
self .auth_provider = auth_provider
1252
+ if metadata_request_timeout is not None :
1253
+ self .metadata_request_timeout = metadata_request_timeout
1243
1254
1244
1255
if load_balancing_policy is not None :
1245
1256
if isinstance (load_balancing_policy , type ):
@@ -3549,6 +3560,7 @@ class PeersQueryType(object):
3549
3560
_is_shutdown = False
3550
3561
_timeout = None
3551
3562
_protocol_version = None
3563
+ _metadata_request_timeout : datetime .timedelta | None = None
3552
3564
3553
3565
_schema_event_refresh_window = None
3554
3566
_topology_event_refresh_window = None
@@ -3648,7 +3660,7 @@ def _reconnect_internal(self):
3648
3660
(conn , _ ) = self ._connect_host_in_lbp ()
3649
3661
if conn is not None :
3650
3662
return conn
3651
-
3663
+
3652
3664
# Try to re-resolve hostnames as a fallback when all hosts are unreachable
3653
3665
self ._cluster ._resolve_hostnames ()
3654
3666
@@ -3689,11 +3701,16 @@ def _try_connect(self, host):
3689
3701
"registering watchers and refreshing schema and topology" ,
3690
3702
connection )
3691
3703
3704
+ host .sharding_info = connection .features .sharding_info
3705
+
3692
3706
# Indirect way to determine if conencted to a ScyllaDB cluster, which does not support peers_v2
3693
3707
# If sharding information is available, it's a ScyllaDB cluster, so do not use peers_v2 table.
3694
3708
if connection .features .sharding_info is not None :
3695
3709
self ._uses_peers_v2 = False
3696
-
3710
+
3711
+ # Cassandra does not support "USING TIMEOUT"
3712
+ self ._metadata_request_timeout = None if connection .features .sharding_info is None \
3713
+ else datetime .timedelta (seconds = self ._cluster .control_connection_timeout )
3697
3714
self ._tablets_routing_v1 = connection .features .tablets_routing_v1
3698
3715
3699
3716
# use weak references in both directions
@@ -3710,8 +3727,10 @@ def _try_connect(self, host):
3710
3727
3711
3728
sel_peers = self ._get_peers_query (self .PeersQueryType .PEERS , connection )
3712
3729
sel_local = self ._SELECT_LOCAL if self ._token_meta_enabled else self ._SELECT_LOCAL_NO_TOKENS
3713
- peers_query = QueryMessage (query = sel_peers , consistency_level = ConsistencyLevel .ONE )
3714
- local_query = QueryMessage (query = sel_local , consistency_level = ConsistencyLevel .ONE )
3730
+ peers_query = SchemaQueryMessage (query = sel_peers , consistency_level = ConsistencyLevel .ONE ,
3731
+ custom_timeout = self ._metadata_request_timeout )
3732
+ local_query = SchemaQueryMessage (query = sel_local , consistency_level = ConsistencyLevel .ONE ,
3733
+ custom_timeout = self ._metadata_request_timeout )
3715
3734
(peers_success , peers_result ), (local_success , local_result ) = connection .wait_for_responses (
3716
3735
peers_query , local_query , timeout = self ._timeout , fail_on_error = False )
3717
3736
@@ -3830,7 +3849,12 @@ def _refresh_schema(self, connection, preloaded_results=None, schema_agreement_w
3830
3849
log .debug ("Skipping schema refresh due to lack of schema agreement" )
3831
3850
return False
3832
3851
3833
- self ._cluster .metadata .refresh (connection , self ._timeout , fetch_size = self ._schema_meta_page_size , ** kwargs )
3852
+ self ._cluster .metadata .refresh (
3853
+ connection ,
3854
+ self ._timeout ,
3855
+ fetch_size = self ._schema_meta_page_size ,
3856
+ metadata_request_timeout = self ._actual_metadata_request_timeout ,
3857
+ ** kwargs )
3834
3858
3835
3859
return True
3836
3860
@@ -3861,8 +3885,10 @@ def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
3861
3885
else :
3862
3886
log .debug ("[control connection] Refreshing node list and token map" )
3863
3887
sel_local = self ._SELECT_LOCAL
3864
- peers_query = QueryMessage (query = sel_peers , consistency_level = cl )
3865
- local_query = QueryMessage (query = sel_local , consistency_level = cl )
3888
+ peers_query = SchemaQueryMessage (query = sel_peers , consistency_level = cl ,
3889
+ custom_timeout = self ._metadata_request_timeout )
3890
+ local_query = SchemaQueryMessage (query = sel_local , consistency_level = cl ,
3891
+ custom_timeout = self ._metadata_request_timeout )
3866
3892
peers_result , local_result = connection .wait_for_responses (
3867
3893
peers_query , local_query , timeout = self ._timeout )
3868
3894
@@ -3917,8 +3943,11 @@ def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
3917
3943
# local rpc_address has not been queried yet, try to fetch it
3918
3944
# separately, which might fail because C* < 2.1.6 doesn't have rpc_address
3919
3945
# in system.local. See CASSANDRA-9436.
3920
- local_rpc_address_query = QueryMessage (query = self ._SELECT_LOCAL_NO_TOKENS_RPC_ADDRESS ,
3921
- consistency_level = ConsistencyLevel .ONE )
3946
+ local_rpc_address_query = SchemaQueryMessage (
3947
+ query = self ._SELECT_LOCAL_NO_TOKENS_RPC_ADDRESS ,
3948
+ consistency_level = ConsistencyLevel .ONE ,
3949
+ custom_timeout = self ._metadata_request_timeout ,
3950
+ )
3922
3951
success , local_rpc_address_result = connection .wait_for_response (
3923
3952
local_rpc_address_query , timeout = self ._timeout , fail_on_error = False )
3924
3953
if success :
@@ -4153,8 +4182,10 @@ def wait_for_schema_agreement(self, connection=None, preloaded_results=None, wai
4153
4182
select_peers_query = self ._get_peers_query (self .PeersQueryType .PEERS_SCHEMA , connection )
4154
4183
4155
4184
while elapsed < total_timeout :
4156
- peers_query = QueryMessage (query = select_peers_query , consistency_level = cl )
4157
- local_query = QueryMessage (query = self ._SELECT_SCHEMA_LOCAL , consistency_level = cl )
4185
+ peers_query = SchemaQueryMessage (query = select_peers_query , consistency_level = cl ,
4186
+ custom_timeout = self ._metadata_request_timeout )
4187
+ local_query = SchemaQueryMessage (query = self ._SELECT_SCHEMA_LOCAL , consistency_level = cl ,
4188
+ custom_timeout = self ._metadata_request_timeout )
4158
4189
try :
4159
4190
timeout = min (self ._timeout , total_timeout - elapsed )
4160
4191
peers_result , local_result = connection .wait_for_responses (
0 commit comments