diff --git a/CHANGELOG.md b/CHANGELOG.md index e02a335d..eb9a1ee9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -134,6 +134,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 datetime: Timestamp('2022-03-31 00:00:00'), tz: "" ``` +- Support iproto feature discovery (#206). + ### Changed - Bump msgpack requirement to 1.0.4 (PR #223). The only reason of this bump is various vulnerability fixes, diff --git a/tarantool/connection.py b/tarantool/connection.py index f971367e..c86a89db 100644 --- a/tarantool/connection.py +++ b/tarantool/connection.py @@ -37,7 +37,8 @@ RequestUpdate, RequestUpsert, RequestAuthenticate, - RequestExecute + RequestExecute, + RequestProtocolVersion, ) from tarantool.space import Space from tarantool.const import ( @@ -55,7 +56,14 @@ REQUEST_TYPE_ERROR, IPROTO_GREETING_SIZE, ITERATOR_EQ, - ITERATOR_ALL + ITERATOR_ALL, + CONNECTOR_IPROTO_VERSION, + CONNECTOR_FEATURES, + IPROTO_FEATURE_STREAMS, + IPROTO_FEATURE_TRANSACTIONS, + IPROTO_FEATURE_ERROR_EXTENSION, + IPROTO_FEATURE_WATCHERS, + IPROTO_FEATURE_GRACEFUL_SHUTDOWN, ) from tarantool.error import ( Error, @@ -498,6 +506,15 @@ def __init__(self, host, port, self.ssl_cert_file = ssl_cert_file self.ssl_ca_file = ssl_ca_file self.ssl_ciphers = ssl_ciphers + self._protocol_version = None + self._features = { + IPROTO_FEATURE_STREAMS: False, + IPROTO_FEATURE_TRANSACTIONS: False, + IPROTO_FEATURE_ERROR_EXTENSION: False, + IPROTO_FEATURE_WATCHERS: False, + IPROTO_FEATURE_GRACEFUL_SHUTDOWN: False, + } + if connect_now: self.connect() @@ -686,6 +703,7 @@ def connect(self): self.wrap_socket_ssl() self.handshake() self.load_schema() + self._check_features() except SslError as e: raise e except Exception as e: @@ -1602,3 +1620,44 @@ def execute(self, query, params=None): request = RequestExecute(self, query, params) response = self._send_request(request) return response + + def _check_features(self): + """ + Execute an ID request: inform the server about the protocol + version and features connector support and get server-side + information about it. + + After executing this request, the connector will choose a + protocol version and features supported both by connector and + server. + + :raise: :exc:`~AssertionError`, + :exc:`~tarantool.error.DatabaseError`, + :exc:`~tarantool.error.SchemaError`, + :exc:`~tarantool.error.NetworkError`, + :exc:`~tarantool.error.SslError` + """ + + try: + request = RequestProtocolVersion(self, + CONNECTOR_IPROTO_VERSION, + CONNECTOR_FEATURES) + response = self._send_request(request) + server_protocol_version = response.protocol_version + server_features = response.features + except DatabaseError as exc: + ER_UNKNOWN_REQUEST_TYPE = 48 + if exc.code == ER_UNKNOWN_REQUEST_TYPE: + server_protocol_version = None + server_features = [] + else: + raise exc + + if server_protocol_version is not None: + self._protocol_version = min(server_protocol_version, + CONNECTOR_IPROTO_VERSION) + + # Intercept lists of features + features_list = [val for val in CONNECTOR_FEATURES if val in server_features] + for val in features_list: + self._features[val] = True diff --git a/tarantool/const.py b/tarantool/const.py index 8035f479..52d5ea81 100644 --- a/tarantool/const.py +++ b/tarantool/const.py @@ -1,6 +1,6 @@ # pylint: disable=C0301,W0105,W0401,W0614 -IPROTO_CODE = 0x00 +IPROTO_REQUEST_TYPE = 0x00 IPROTO_SYNC = 0x01 # replication keys (header) IPROTO_SERVER_ID = 0x02 @@ -35,25 +35,29 @@ IPROTO_SQL_INFO = 0x42 IPROTO_SQL_INFO_ROW_COUNT = 0x00 IPROTO_SQL_INFO_AUTOINCREMENT_IDS = 0x01 +# +IPROTO_VERSION = 0x54 +IPROTO_FEATURES = 0x55 IPROTO_GREETING_SIZE = 128 IPROTO_BODY_MAX_LEN = 2147483648 -REQUEST_TYPE_OK = 0 -REQUEST_TYPE_SELECT = 1 -REQUEST_TYPE_INSERT = 2 -REQUEST_TYPE_REPLACE = 3 -REQUEST_TYPE_UPDATE = 4 -REQUEST_TYPE_DELETE = 5 -REQUEST_TYPE_CALL16 = 6 -REQUEST_TYPE_AUTHENTICATE = 7 -REQUEST_TYPE_EVAL = 8 -REQUEST_TYPE_UPSERT = 9 -REQUEST_TYPE_CALL = 10 -REQUEST_TYPE_EXECUTE = 11 -REQUEST_TYPE_PING = 64 -REQUEST_TYPE_JOIN = 65 -REQUEST_TYPE_SUBSCRIBE = 66 +REQUEST_TYPE_OK = 0x00 +REQUEST_TYPE_SELECT = 0x01 +REQUEST_TYPE_INSERT = 0x02 +REQUEST_TYPE_REPLACE = 0x03 +REQUEST_TYPE_UPDATE = 0x04 +REQUEST_TYPE_DELETE = 0x05 +REQUEST_TYPE_CALL16 = 0x06 +REQUEST_TYPE_AUTHENTICATE = 0x07 +REQUEST_TYPE_EVAL = 0x08 +REQUEST_TYPE_UPSERT = 0x09 +REQUEST_TYPE_CALL = 0x0a +REQUEST_TYPE_EXECUTE = 0x0b +REQUEST_TYPE_PING = 0x40 +REQUEST_TYPE_JOIN = 0x41 +REQUEST_TYPE_SUBSCRIBE = 0x42 +REQUEST_TYPE_ID = 0x49 REQUEST_TYPE_ERROR = 1 << 15 SPACE_SCHEMA = 272 @@ -85,6 +89,12 @@ ITERATOR_OVERLAPS = 10 ITERATOR_NEIGHBOR = 11 +IPROTO_FEATURE_STREAMS = 0 +IPROTO_FEATURE_TRANSACTIONS = 1 +IPROTO_FEATURE_ERROR_EXTENSION = 2 +IPROTO_FEATURE_WATCHERS = 3 +IPROTO_FEATURE_GRACEFUL_SHUTDOWN = 4 + # Default value for connection timeout (seconds) CONNECTION_TIMEOUT = None # Default value for socket timeout (seconds) @@ -113,3 +123,8 @@ POOL_INSTANCE_RECONNECT_MAX_ATTEMPTS = 0 # Default delay between attempts to reconnect (seconds) POOL_INSTANCE_RECONNECT_DELAY = 0 + +# Tarantool 2.10 protocol version is 3 +CONNECTOR_IPROTO_VERSION = 3 +# List of connector-supported features +CONNECTOR_FEATURES = [] diff --git a/tarantool/error.py b/tarantool/error.py index b2da32e7..c8690a0b 100644 --- a/tarantool/error.py +++ b/tarantool/error.py @@ -41,6 +41,24 @@ class DatabaseError(Error): Exception raised for errors that are related to the database. """ + def __init__(self, *args): + """ + :param args: ``(code, message)`` or ``(message,)``. + :type args: :obj:`tuple` + """ + + super().__init__(*args) + + if (len(args) == 2) and isinstance(args[0], int) and isinstance(args[1], (str, bytes)): + self.code = args[0] + self.message = args[1] + elif (len(args) == 1) and isinstance(args[0], (str, bytes)): + self.code = 0 + self.message = args[0] + else: + self.code = 0 + self.message = '' + class DataError(DatabaseError): """ @@ -206,8 +224,6 @@ def __init__(self, message, schema_version): """ super(SchemaReloadException, self).__init__(109, message) - self.code = 109 - self.message = message self.schema_version = schema_version def __str__(self): diff --git a/tarantool/request.py b/tarantool/request.py index 68d49714..164047cd 100644 --- a/tarantool/request.py +++ b/tarantool/request.py @@ -13,7 +13,7 @@ from tarantool.error import DatabaseError from tarantool.const import ( - IPROTO_CODE, + IPROTO_REQUEST_TYPE, IPROTO_SYNC, IPROTO_SPACE_ID, IPROTO_INDEX_ID, @@ -33,6 +33,8 @@ IPROTO_SCHEMA_ID, IPROTO_SQL_TEXT, IPROTO_SQL_BIND, + IPROTO_VERSION, + IPROTO_FEATURES, REQUEST_TYPE_OK, REQUEST_TYPE_PING, REQUEST_TYPE_SELECT, @@ -47,9 +49,14 @@ REQUEST_TYPE_EVAL, REQUEST_TYPE_AUTHENTICATE, REQUEST_TYPE_JOIN, - REQUEST_TYPE_SUBSCRIBE + REQUEST_TYPE_SUBSCRIBE, + REQUEST_TYPE_ID, +) +from tarantool.response import ( + Response, + ResponseExecute, + ResponseProtocolVersion, ) -from tarantool.response import Response, ResponseExecute from tarantool.utils import ( strxor, ) @@ -161,7 +168,7 @@ def header(self, length): """ self._sync = self.conn.generate_sync() - header = self._dumps({IPROTO_CODE: self.request_type, + header = self._dumps({IPROTO_REQUEST_TYPE: self.request_type, IPROTO_SYNC: self._sync, IPROTO_SCHEMA_ID: self.conn.schema_version}) @@ -259,7 +266,7 @@ def header(self, length): self._sync = self.conn.generate_sync() # Set IPROTO_SCHEMA_ID: 0 to avoid SchemaReloadException # It is ok to use 0 in auth every time. - header = self._dumps({IPROTO_CODE: self.request_type, + header = self._dumps({IPROTO_REQUEST_TYPE: self.request_type, IPROTO_SYNC: self._sync, IPROTO_SCHEMA_ID: 0}) @@ -618,7 +625,7 @@ def __init__(self, conn, sync): """ super(RequestOK, self).__init__(conn) - request_body = self._dumps({IPROTO_CODE: self.request_type, + request_body = self._dumps({IPROTO_REQUEST_TYPE: self.request_type, IPROTO_SYNC: sync}) self._body = request_body @@ -656,3 +663,31 @@ def __init__(self, conn, sql, args): self._body = request_body self.response_class = ResponseExecute + +class RequestProtocolVersion(Request): + """ + Represents ID request: inform the server about the protocol + version and features connector support. + """ + + request_type = REQUEST_TYPE_ID + + def __init__(self, conn, protocol_version, features): + """ + :param conn: Request sender. + :type conn: :class:`~tarantool.Connection` + + :param protocol_version: Connector protocol version. + :type protocol_version: :obj:`int` + + :param features: List of supported features. + :type features: :obj:`list` + """ + + super(RequestProtocolVersion, self).__init__(conn) + + request_body = self._dumps({IPROTO_VERSION: protocol_version, + IPROTO_FEATURES: features}) + + self._body = request_body + self.response_class = ResponseProtocolVersion diff --git a/tarantool/response.py b/tarantool/response.py index 2832fba9..ce7320f7 100644 --- a/tarantool/response.py +++ b/tarantool/response.py @@ -9,7 +9,7 @@ import msgpack from tarantool.const import ( - IPROTO_CODE, + IPROTO_REQUEST_TYPE, IPROTO_DATA, IPROTO_ERROR, IPROTO_SYNC, @@ -17,7 +17,9 @@ REQUEST_TYPE_ERROR, IPROTO_SQL_INFO, IPROTO_SQL_INFO_ROW_COUNT, - IPROTO_SQL_INFO_AUTOINCREMENT_IDS + IPROTO_SQL_INFO_AUTOINCREMENT_IDS, + IPROTO_VERSION, + IPROTO_FEATURES, ) from tarantool.error import ( DatabaseError, @@ -93,7 +95,7 @@ def __init__(self, conn, response): self.conn = conn self._sync = header.get(IPROTO_SYNC, 0) - self._code = header[IPROTO_CODE] + self._code = header[IPROTO_REQUEST_TYPE] self._body = {} self._schema_version = header.get(IPROTO_SCHEMA_ID, None) try: @@ -324,3 +326,35 @@ def affected_row_count(self): return None return info.get(IPROTO_SQL_INFO_ROW_COUNT) + + +class ResponseProtocolVersion(Response): + """ + Represents an ID request response: information about server protocol + version and features it supports. + """ + + @property + def protocol_version(self): + """ + Server protocol version. + + :rtype: :obj:`int` or :obj:`None` + """ + + if self._return_code != 0: + return None + return self._body.get(IPROTO_VERSION) + + @property + def features(self): + """ + Server supported features. + + :rtype: :obj:`list` + """ + + if self._return_code != 0: + return [] + return self._body.get(IPROTO_FEATURES) + diff --git a/test/suites/lib/skip.py b/test/suites/lib/skip.py index 71bfce13..b34a445b 100644 --- a/test/suites/lib/skip.py +++ b/test/suites/lib/skip.py @@ -28,14 +28,11 @@ def wrapper(self, *args, **kwargs): assert srv is not None - self.__class__.tnt_version = re.match( - r'[\d.]+', srv.admin('box.info.version')[0] - ).group() + self.__class__.tnt_version = srv.admin.tnt_version - tnt_version = pkg_resources.parse_version(self.tnt_version) support_version = pkg_resources.parse_version(REQUIRED_TNT_VERSION) - if tnt_version < support_version: + if self.tnt_version < support_version: self.skipTest('Tarantool %s %s' % (self.tnt_version, msg)) if func.__name__ != 'setUp': diff --git a/test/suites/lib/tarantool_admin.py b/test/suites/lib/tarantool_admin.py index 82a8aa47..33a6d61f 100644 --- a/test/suites/lib/tarantool_admin.py +++ b/test/suites/lib/tarantool_admin.py @@ -1,5 +1,7 @@ import socket import yaml +import re +import pkg_resources class TarantoolAdmin(object): @@ -8,6 +10,7 @@ def __init__(self, host, port): self.port = port self.is_connected = False self.socket = None + self._tnt_version = None def connect(self): self.socket = socket.create_connection((self.host, self.port)) @@ -62,3 +65,16 @@ def execute(self, command): break return yaml.safe_load(res) + + @property + def tnt_version(self): + if self._tnt_version is not None: + return self._tnt_version + + raw_version = re.match( + r'[\d.]+', self.execute('box.info.version')[0] + ).group() + + self._tnt_version = pkg_resources.parse_version(raw_version) + + return self._tnt_version diff --git a/test/suites/test_protocol.py b/test/suites/test_protocol.py index 9442b0b0..61ac3cd8 100644 --- a/test/suites/test_protocol.py +++ b/test/suites/test_protocol.py @@ -1,13 +1,36 @@ import sys +import pkg_resources import unittest -from tarantool.utils import greeting_decode, version_id import uuid +import tarantool +from tarantool.utils import greeting_decode, version_id + +from .lib.tarantool_server import TarantoolServer + +from tarantool.const import ( + IPROTO_FEATURE_STREAMS, + IPROTO_FEATURE_TRANSACTIONS, + IPROTO_FEATURE_ERROR_EXTENSION, + IPROTO_FEATURE_WATCHERS, + IPROTO_FEATURE_GRACEFUL_SHUTDOWN, +) + class TestSuite_Protocol(unittest.TestCase): @classmethod def setUpClass(self): print(' PROTOCOL '.center(70, '='), file=sys.stderr) print('-' * 70, file=sys.stderr) + self.srv = TarantoolServer() + self.srv.script = 'test/suites/box.lua' + self.srv.start() + self.con = tarantool.Connection(self.srv.host, self.srv.args['primary']) + self.adm = self.srv.admin + + def setUp(self): + # prevent a remote tarantool from clean our session + if self.srv.is_started(): + self.srv.touch_lock() def test_00_greeting_1_6(self): buf = "Tarantool 1.6.6 \n" + \ @@ -45,3 +68,28 @@ def test_03_greeting_1_6_7(self): self.assertEqual(greeting.uuid, uuid.UUID('52dc2837-8001-48fe-bdce-c493c04599ce')) self.assertIsNotNone(greeting.salt) + + def test_04_protocol(self): + # First Tarantool protocol version (1) was introduced between + # 2.10.0-beta1 and 2.10.0-beta2. Versions 2 and 3 were also + # introduced between 2.10.0-beta1 and 2.10.0-beta2. Version 4 + # was introduced between 2.10.0-beta2 and 2.10.0-rc1 and reverted + # back to version 3 in the same version interval. + # Tarantool 2.10.3 still has version 3. + if self.adm.tnt_version >= pkg_resources.parse_version('2.10.0'): + self.assertTrue(self.con._protocol_version >= 3) + else: + self.assertIsNone(self.con._protocol_version) + + self.assertEqual(self.con._features[IPROTO_FEATURE_STREAMS], False) + self.assertEqual(self.con._features[IPROTO_FEATURE_TRANSACTIONS], False) + self.assertEqual(self.con._features[IPROTO_FEATURE_ERROR_EXTENSION], False) + self.assertEqual(self.con._features[IPROTO_FEATURE_WATCHERS], False) + self.assertEqual(self.con._features[IPROTO_FEATURE_GRACEFUL_SHUTDOWN], False) + + @classmethod + def tearDownClass(self): + self.con.close() + self.srv.stop() + self.srv.clean() +