Skip to content

Support iproto feature discovery #243

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
datetime: Timestamp('2022-03-31 00:00:00'), tz: ""
```

- Support iproto feature discovery (#206).

### Changed
- Bump msgpack requirement to 1.0.4 (PR #223).
The only reason of this bump is various vulnerability fixes,
Expand Down
63 changes: 61 additions & 2 deletions tarantool/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@
RequestUpdate,
RequestUpsert,
RequestAuthenticate,
RequestExecute
RequestExecute,
RequestProtocolVersion,
)
from tarantool.space import Space
from tarantool.const import (
Expand All @@ -55,7 +56,14 @@
REQUEST_TYPE_ERROR,
IPROTO_GREETING_SIZE,
ITERATOR_EQ,
ITERATOR_ALL
ITERATOR_ALL,
CONNECTOR_IPROTO_VERSION,
CONNECTOR_FEATURES,
IPROTO_FEATURE_STREAMS,
IPROTO_FEATURE_TRANSACTIONS,
IPROTO_FEATURE_ERROR_EXTENSION,
IPROTO_FEATURE_WATCHERS,
IPROTO_FEATURE_GRACEFUL_SHUTDOWN,
)
from tarantool.error import (
Error,
Expand Down Expand Up @@ -498,6 +506,15 @@ def __init__(self, host, port,
self.ssl_cert_file = ssl_cert_file
self.ssl_ca_file = ssl_ca_file
self.ssl_ciphers = ssl_ciphers
self._protocol_version = None
self._features = {
IPROTO_FEATURE_STREAMS: False,
IPROTO_FEATURE_TRANSACTIONS: False,
IPROTO_FEATURE_ERROR_EXTENSION: False,
IPROTO_FEATURE_WATCHERS: False,
IPROTO_FEATURE_GRACEFUL_SHUTDOWN: False,
}

if connect_now:
self.connect()

Expand Down Expand Up @@ -686,6 +703,7 @@ def connect(self):
self.wrap_socket_ssl()
self.handshake()
self.load_schema()
self._check_features()
except SslError as e:
raise e
except Exception as e:
Expand Down Expand Up @@ -1602,3 +1620,44 @@ def execute(self, query, params=None):
request = RequestExecute(self, query, params)
response = self._send_request(request)
return response

def _check_features(self):
"""
Execute an ID request: inform the server about the protocol
version and features connector support and get server-side
information about it.

After executing this request, the connector will choose a
protocol version and features supported both by connector and
server.

:raise: :exc:`~AssertionError`,
:exc:`~tarantool.error.DatabaseError`,
:exc:`~tarantool.error.SchemaError`,
:exc:`~tarantool.error.NetworkError`,
:exc:`~tarantool.error.SslError`
"""

try:
request = RequestProtocolVersion(self,
CONNECTOR_IPROTO_VERSION,
CONNECTOR_FEATURES)
response = self._send_request(request)
server_protocol_version = response.protocol_version
server_features = response.features
except DatabaseError as exc:
ER_UNKNOWN_REQUEST_TYPE = 48
if exc.code == ER_UNKNOWN_REQUEST_TYPE:
server_protocol_version = None
server_features = []
else:
raise exc

if server_protocol_version is not None:
self._protocol_version = min(server_protocol_version,
CONNECTOR_IPROTO_VERSION)

# Intercept lists of features
features_list = [val for val in CONNECTOR_FEATURES if val in server_features]
for val in features_list:
self._features[val] = True
47 changes: 31 additions & 16 deletions tarantool/const.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# pylint: disable=C0301,W0105,W0401,W0614

IPROTO_CODE = 0x00
IPROTO_REQUEST_TYPE = 0x00
IPROTO_SYNC = 0x01
# replication keys (header)
IPROTO_SERVER_ID = 0x02
Expand Down Expand Up @@ -35,25 +35,29 @@
IPROTO_SQL_INFO = 0x42
IPROTO_SQL_INFO_ROW_COUNT = 0x00
IPROTO_SQL_INFO_AUTOINCREMENT_IDS = 0x01
#
IPROTO_VERSION = 0x54
IPROTO_FEATURES = 0x55

IPROTO_GREETING_SIZE = 128
IPROTO_BODY_MAX_LEN = 2147483648

REQUEST_TYPE_OK = 0
REQUEST_TYPE_SELECT = 1
REQUEST_TYPE_INSERT = 2
REQUEST_TYPE_REPLACE = 3
REQUEST_TYPE_UPDATE = 4
REQUEST_TYPE_DELETE = 5
REQUEST_TYPE_CALL16 = 6
REQUEST_TYPE_AUTHENTICATE = 7
REQUEST_TYPE_EVAL = 8
REQUEST_TYPE_UPSERT = 9
REQUEST_TYPE_CALL = 10
REQUEST_TYPE_EXECUTE = 11
REQUEST_TYPE_PING = 64
REQUEST_TYPE_JOIN = 65
REQUEST_TYPE_SUBSCRIBE = 66
REQUEST_TYPE_OK = 0x00
REQUEST_TYPE_SELECT = 0x01
REQUEST_TYPE_INSERT = 0x02
REQUEST_TYPE_REPLACE = 0x03
REQUEST_TYPE_UPDATE = 0x04
REQUEST_TYPE_DELETE = 0x05
REQUEST_TYPE_CALL16 = 0x06
REQUEST_TYPE_AUTHENTICATE = 0x07
REQUEST_TYPE_EVAL = 0x08
REQUEST_TYPE_UPSERT = 0x09
REQUEST_TYPE_CALL = 0x0a
REQUEST_TYPE_EXECUTE = 0x0b
REQUEST_TYPE_PING = 0x40
REQUEST_TYPE_JOIN = 0x41
REQUEST_TYPE_SUBSCRIBE = 0x42
REQUEST_TYPE_ID = 0x49
REQUEST_TYPE_ERROR = 1 << 15

SPACE_SCHEMA = 272
Expand Down Expand Up @@ -85,6 +89,12 @@
ITERATOR_OVERLAPS = 10
ITERATOR_NEIGHBOR = 11

IPROTO_FEATURE_STREAMS = 0
IPROTO_FEATURE_TRANSACTIONS = 1
IPROTO_FEATURE_ERROR_EXTENSION = 2
IPROTO_FEATURE_WATCHERS = 3
IPROTO_FEATURE_GRACEFUL_SHUTDOWN = 4

# Default value for connection timeout (seconds)
CONNECTION_TIMEOUT = None
# Default value for socket timeout (seconds)
Expand Down Expand Up @@ -113,3 +123,8 @@
POOL_INSTANCE_RECONNECT_MAX_ATTEMPTS = 0
# Default delay between attempts to reconnect (seconds)
POOL_INSTANCE_RECONNECT_DELAY = 0

# Tarantool 2.10 protocol version is 3
CONNECTOR_IPROTO_VERSION = 3
# List of connector-supported features
CONNECTOR_FEATURES = []
20 changes: 18 additions & 2 deletions tarantool/error.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,24 @@ class DatabaseError(Error):
Exception raised for errors that are related to the database.
"""

def __init__(self, *args):
"""
:param args: ``(code, message)`` or ``(message,)``.
:type args: :obj:`tuple`
"""

super().__init__(*args)

if (len(args) == 2) and isinstance(args[0], int) and isinstance(args[1], (str, bytes)):
self.code = args[0]
self.message = args[1]
elif (len(args) == 1) and isinstance(args[0], (str, bytes)):
self.code = 0
self.message = args[0]
else:
self.code = 0
self.message = ''


class DataError(DatabaseError):
"""
Expand Down Expand Up @@ -206,8 +224,6 @@ def __init__(self, message, schema_version):
"""

super(SchemaReloadException, self).__init__(109, message)
self.code = 109
self.message = message
self.schema_version = schema_version

def __str__(self):
Expand Down
47 changes: 41 additions & 6 deletions tarantool/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

from tarantool.error import DatabaseError
from tarantool.const import (
IPROTO_CODE,
IPROTO_REQUEST_TYPE,
IPROTO_SYNC,
IPROTO_SPACE_ID,
IPROTO_INDEX_ID,
Expand All @@ -33,6 +33,8 @@
IPROTO_SCHEMA_ID,
IPROTO_SQL_TEXT,
IPROTO_SQL_BIND,
IPROTO_VERSION,
IPROTO_FEATURES,
REQUEST_TYPE_OK,
REQUEST_TYPE_PING,
REQUEST_TYPE_SELECT,
Expand All @@ -47,9 +49,14 @@
REQUEST_TYPE_EVAL,
REQUEST_TYPE_AUTHENTICATE,
REQUEST_TYPE_JOIN,
REQUEST_TYPE_SUBSCRIBE
REQUEST_TYPE_SUBSCRIBE,
REQUEST_TYPE_ID,
)
from tarantool.response import (
Response,
ResponseExecute,
ResponseProtocolVersion,
)
from tarantool.response import Response, ResponseExecute
from tarantool.utils import (
strxor,
)
Expand Down Expand Up @@ -161,7 +168,7 @@ def header(self, length):
"""

self._sync = self.conn.generate_sync()
header = self._dumps({IPROTO_CODE: self.request_type,
header = self._dumps({IPROTO_REQUEST_TYPE: self.request_type,
IPROTO_SYNC: self._sync,
IPROTO_SCHEMA_ID: self.conn.schema_version})

Expand Down Expand Up @@ -259,7 +266,7 @@ def header(self, length):
self._sync = self.conn.generate_sync()
# Set IPROTO_SCHEMA_ID: 0 to avoid SchemaReloadException
# It is ok to use 0 in auth every time.
header = self._dumps({IPROTO_CODE: self.request_type,
header = self._dumps({IPROTO_REQUEST_TYPE: self.request_type,
IPROTO_SYNC: self._sync,
IPROTO_SCHEMA_ID: 0})

Expand Down Expand Up @@ -618,7 +625,7 @@ def __init__(self, conn, sync):
"""

super(RequestOK, self).__init__(conn)
request_body = self._dumps({IPROTO_CODE: self.request_type,
request_body = self._dumps({IPROTO_REQUEST_TYPE: self.request_type,
IPROTO_SYNC: sync})
self._body = request_body

Expand Down Expand Up @@ -656,3 +663,31 @@ def __init__(self, conn, sql, args):

self._body = request_body
self.response_class = ResponseExecute

class RequestProtocolVersion(Request):
"""
Represents ID request: inform the server about the protocol
version and features connector support.
"""

request_type = REQUEST_TYPE_ID

def __init__(self, conn, protocol_version, features):
"""
:param conn: Request sender.
:type conn: :class:`~tarantool.Connection`

:param protocol_version: Connector protocol version.
:type protocol_version: :obj:`int`

:param features: List of supported features.
:type features: :obj:`list`
"""

super(RequestProtocolVersion, self).__init__(conn)

request_body = self._dumps({IPROTO_VERSION: protocol_version,
IPROTO_FEATURES: features})

self._body = request_body
self.response_class = ResponseProtocolVersion
40 changes: 37 additions & 3 deletions tarantool/response.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,17 @@
import msgpack

from tarantool.const import (
IPROTO_CODE,
IPROTO_REQUEST_TYPE,
IPROTO_DATA,
IPROTO_ERROR,
IPROTO_SYNC,
IPROTO_SCHEMA_ID,
REQUEST_TYPE_ERROR,
IPROTO_SQL_INFO,
IPROTO_SQL_INFO_ROW_COUNT,
IPROTO_SQL_INFO_AUTOINCREMENT_IDS
IPROTO_SQL_INFO_AUTOINCREMENT_IDS,
IPROTO_VERSION,
IPROTO_FEATURES,
)
from tarantool.error import (
DatabaseError,
Expand Down Expand Up @@ -93,7 +95,7 @@ def __init__(self, conn, response):

self.conn = conn
self._sync = header.get(IPROTO_SYNC, 0)
self._code = header[IPROTO_CODE]
self._code = header[IPROTO_REQUEST_TYPE]
self._body = {}
self._schema_version = header.get(IPROTO_SCHEMA_ID, None)
try:
Expand Down Expand Up @@ -324,3 +326,35 @@ def affected_row_count(self):
return None

return info.get(IPROTO_SQL_INFO_ROW_COUNT)


class ResponseProtocolVersion(Response):
"""
Represents an ID request response: information about server protocol
version and features it supports.
"""

@property
def protocol_version(self):
"""
Server protocol version.

:rtype: :obj:`int` or :obj:`None`
"""

if self._return_code != 0:
return None
return self._body.get(IPROTO_VERSION)

@property
def features(self):
"""
Server supported features.

:rtype: :obj:`list`
"""

if self._return_code != 0:
return []
return self._body.get(IPROTO_FEATURES)

7 changes: 2 additions & 5 deletions test/suites/lib/skip.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,11 @@ def wrapper(self, *args, **kwargs):

assert srv is not None

self.__class__.tnt_version = re.match(
r'[\d.]+', srv.admin('box.info.version')[0]
).group()
self.__class__.tnt_version = srv.admin.tnt_version

tnt_version = pkg_resources.parse_version(self.tnt_version)
support_version = pkg_resources.parse_version(REQUIRED_TNT_VERSION)

if tnt_version < support_version:
if self.tnt_version < support_version:
self.skipTest('Tarantool %s %s' % (self.tnt_version, msg))

if func.__name__ != 'setUp':
Expand Down
Loading