83
83
from cassandra .marshal import int64_pack
84
84
from cassandra .tablets import Tablet , Tablets
85
85
from cassandra .timestamps import MonotonicTimestampGenerator
86
- from cassandra .util import _resolve_contact_points_to_string_map , Version
86
+ from cassandra .util import _resolve_contact_points_to_string_map , Version , maybe_add_timeout_to_query
87
87
88
88
from cassandra .datastax .insights .reporter import MonitorReporter
89
89
from cassandra .datastax .insights .util import version_supports_insights
@@ -3725,8 +3725,10 @@ def _try_connect(self, host):
3725
3725
3726
3726
sel_peers = self ._get_peers_query (self .PeersQueryType .PEERS , connection )
3727
3727
sel_local = self ._SELECT_LOCAL if self ._token_meta_enabled else self ._SELECT_LOCAL_NO_TOKENS
3728
- peers_query = QueryMessage (query = sel_peers , consistency_level = ConsistencyLevel .ONE )
3729
- local_query = QueryMessage (query = sel_local , consistency_level = ConsistencyLevel .ONE )
3728
+ peers_query = QueryMessage (query = maybe_add_timeout_to_query (sel_peers , self ._metadata_request_timeout ),
3729
+ consistency_level = ConsistencyLevel .ONE )
3730
+ local_query = QueryMessage (query = maybe_add_timeout_to_query (sel_local , self ._metadata_request_timeout ),
3731
+ consistency_level = ConsistencyLevel .ONE )
3730
3732
(peers_success , peers_result ), (local_success , local_result ) = connection .wait_for_responses (
3731
3733
peers_query , local_query , timeout = self ._timeout , fail_on_error = False )
3732
3734
@@ -3737,7 +3739,8 @@ def _try_connect(self, host):
3737
3739
# error with the peers v2 query, fallback to peers v1
3738
3740
self ._uses_peers_v2 = False
3739
3741
sel_peers = self ._get_peers_query (self .PeersQueryType .PEERS , connection )
3740
- peers_query = QueryMessage (query = sel_peers , consistency_level = ConsistencyLevel .ONE )
3742
+ peers_query = QueryMessage (query = maybe_add_timeout_to_query (sel_peers , self ._metadata_request_timeout ),
3743
+ consistency_level = ConsistencyLevel .ONE )
3741
3744
peers_result = connection .wait_for_response (
3742
3745
peers_query , timeout = self ._timeout )
3743
3746
@@ -3881,8 +3884,10 @@ def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
3881
3884
else :
3882
3885
log .debug ("[control connection] Refreshing node list and token map" )
3883
3886
sel_local = self ._SELECT_LOCAL
3884
- peers_query = QueryMessage (query = sel_peers , consistency_level = cl )
3885
- local_query = QueryMessage (query = sel_local , consistency_level = cl )
3887
+ peers_query = QueryMessage (query = maybe_add_timeout_to_query (sel_peers , self ._metadata_request_timeout ),
3888
+ consistency_level = cl )
3889
+ local_query = QueryMessage (query = maybe_add_timeout_to_query (sel_local , self ._metadata_request_timeout ),
3890
+ consistency_level = cl )
3886
3891
peers_result , local_result = connection .wait_for_responses (
3887
3892
peers_query , local_query , timeout = self ._timeout )
3888
3893
@@ -3937,8 +3942,9 @@ def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
3937
3942
# local rpc_address has not been queried yet, try to fetch it
3938
3943
# separately, which might fail because C* < 2.1.6 doesn't have rpc_address
3939
3944
# in system.local. See CASSANDRA-9436.
3940
- local_rpc_address_query = QueryMessage (query = self ._SELECT_LOCAL_NO_TOKENS_RPC_ADDRESS ,
3941
- consistency_level = ConsistencyLevel .ONE )
3945
+ local_rpc_address_query = QueryMessage (
3946
+ query = maybe_add_timeout_to_query (self ._SELECT_LOCAL_NO_TOKENS_RPC_ADDRESS , self ._metadata_request_timeout ),
3947
+ consistency_level = ConsistencyLevel .ONE )
3942
3948
success , local_rpc_address_result = connection .wait_for_response (
3943
3949
local_rpc_address_query , timeout = self ._timeout , fail_on_error = False )
3944
3950
if success :
@@ -4173,8 +4179,10 @@ def wait_for_schema_agreement(self, connection=None, preloaded_results=None, wai
4173
4179
select_peers_query = self ._get_peers_query (self .PeersQueryType .PEERS_SCHEMA , connection )
4174
4180
4175
4181
while elapsed < total_timeout :
4176
- peers_query = QueryMessage (query = select_peers_query , consistency_level = cl )
4177
- local_query = QueryMessage (query = self ._SELECT_SCHEMA_LOCAL , consistency_level = cl )
4182
+ peers_query = QueryMessage (query = maybe_add_timeout_to_query (select_peers_query , self ._metadata_request_timeout ),
4183
+ consistency_level = cl )
4184
+ local_query = QueryMessage (query = maybe_add_timeout_to_query (self ._SELECT_SCHEMA_LOCAL , self ._metadata_request_timeout ),
4185
+ consistency_level = cl )
4178
4186
try :
4179
4187
timeout = min (self ._timeout , total_timeout - elapsed )
4180
4188
peers_result , local_result = connection .wait_for_responses (
0 commit comments