Skip to content

Commit f36ba79

Browse files
Use RateLimitReached error
Now, the connection negotiates protocol features and uses them later in decoding. RateLimitReached is used instead of deafault.
1 parent 3ca24b2 commit f36ba79

File tree

2 files changed

+22
-9
lines changed

2 files changed

+22
-9
lines changed

cassandra/connection.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
import random
3232
import itertools
3333

34+
from cassandra.protocol_features import ProtocolFeatures
35+
3436
if 'gevent.monkey' in sys.modules:
3537
from gevent.queue import Queue, Empty
3638
else:
@@ -772,6 +774,8 @@ class Connection(object):
772774

773775
_on_orphaned_stream_released = None
774776

777+
features = None
778+
775779
@property
776780
def _iobuf(self):
777781
# backward compatibility, to avoid any change in the reactors
@@ -1263,7 +1267,7 @@ def process_msg(self, header, body):
12631267
return
12641268

12651269
try:
1266-
response = decoder(header.version, self.user_type_map, stream_id,
1270+
response = decoder(header.version, self.features, self.user_type_map, stream_id,
12671271
header.flags, header.opcode, body, self.decompressor, result_metadata)
12681272
except Exception as exc:
12691273
log.exception("Error decoding response from Cassandra. "
@@ -1338,6 +1342,11 @@ def _handle_options_response(self, options_response):
13381342
remote_supported_compressions = options_response.options['COMPRESSION']
13391343
self._product_type = options_response.options.get('PRODUCT_TYPE', [None])[0]
13401344

1345+
protocol_features = ProtocolFeatures.parse_from_supported(options_response.options)
1346+
options = {}
1347+
protocol_features.add_startup_options(options)
1348+
self.features = protocol_features
1349+
13411350
if self.cql_version:
13421351
if self.cql_version not in supported_cql_versions:
13431352
raise ProtocolError(
@@ -1388,13 +1397,14 @@ def _handle_options_response(self, options_response):
13881397
self._compressor, self.decompressor = \
13891398
locally_supported_compressions[compression_type]
13901399

1391-
self._send_startup_message(compression_type, no_compact=self.no_compact)
1400+
self._send_startup_message(compression_type, no_compact=self.no_compact, extra_options=options)
13921401

13931402
@defunct_on_error
1394-
def _send_startup_message(self, compression=None, no_compact=False):
1403+
def _send_startup_message(self, compression=None, no_compact=False, extra_options=None):
13951404
log.debug("Sending StartupMessage on %s", self)
13961405
opts = {'DRIVER_NAME': DRIVER_NAME,
1397-
'DRIVER_VERSION': DRIVER_VERSION}
1406+
'DRIVER_VERSION': DRIVER_VERSION,
1407+
**extra_options}
13981408
if compression:
13991409
opts['COMPRESSION'] = compression
14001410
if no_compact:

cassandra/protocol.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -126,10 +126,13 @@ def __init__(self, code, message, info):
126126
self.info = info
127127

128128
@classmethod
129-
def recv_body(cls, f, protocol_version, *args):
129+
def recv_body(cls, f, protocol_version, protocol_features, *args):
130130
code = read_int(f)
131131
msg = read_string(f)
132-
subcls = error_classes.get(code, cls)
132+
if code == protocol_features.rate_limit_error:
133+
subcls = RateLimitReachedException
134+
else:
135+
subcls = error_classes.get(code, cls)
133136
extra_info = subcls.recv_error_info(f, protocol_version)
134137
return subcls(code=code, message=msg, info=extra_info)
135138

@@ -751,7 +754,7 @@ def recv(self, f, protocol_version, user_type_map, result_metadata):
751754
raise DriverException("Unknown RESULT kind: %d" % self.kind)
752755

753756
@classmethod
754-
def recv_body(cls, f, protocol_version, user_type_map, result_metadata):
757+
def recv_body(cls, f, protocol_version, protocol_features, user_type_map, result_metadata):
755758
kind = read_int(f)
756759
msg = cls(kind)
757760
msg.recv(f, protocol_version, user_type_map, result_metadata)
@@ -1160,7 +1163,7 @@ def _write_header(f, version, flags, stream_id, opcode, length):
11601163
write_int(f, length)
11611164

11621165
@classmethod
1163-
def decode_message(cls, protocol_version, user_type_map, stream_id, flags, opcode, body,
1166+
def decode_message(cls, protocol_version, protocol_features, user_type_map, stream_id, flags, opcode, body,
11641167
decompressor, result_metadata):
11651168
"""
11661169
Decodes a native protocol message body
@@ -1206,7 +1209,7 @@ def decode_message(cls, protocol_version, user_type_map, stream_id, flags, opcod
12061209
log.warning("Unknown protocol flags set: %02x. May cause problems.", flags)
12071210

12081211
msg_class = cls.message_types_by_opcode[opcode]
1209-
msg = msg_class.recv_body(body, protocol_version, user_type_map, result_metadata)
1212+
msg = msg_class.recv_body(body, protocol_version, protocol_features, user_type_map, result_metadata)
12101213
msg.stream_id = stream_id
12111214
msg.trace_id = trace_id
12121215
msg.custom_payload = custom_payload

0 commit comments

Comments
 (0)