Skip to content

Commit c711397

Browse files
committedApr 17, 2023
api: remove join and subscribe
This is a breaking change. Current join and subscribe implementations are rather useless. Connector does not provide any API to process incoming replication requests. The only supported scenario is to "connect as replica, skip everything that has been sent through replication, close on error". Current Tarantool team product strategy is to develop CDC features, including replication support in language connectors, as Enterprise edition products [1]. Since we don't plan to provide proper join and subscribe implementation in the open-source connector in the future, we decide to drop current half-baked implementation to not confuse new users. 1. tarantool/go-tarantool#203
1 parent 3f7a145 commit c711397

File tree

3 files changed

+1
-177
lines changed

3 files changed

+1
-177
lines changed
 

‎CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
2121
exceptions. `datetime.datetime` exceptions will
2222
be thrown instead of them.
2323
- Drop the support of `__eq__` operator for `pandas.Timestamp`.
24+
- **Breaking**: Remove `join` and `subscribe` connection methods.
2425

2526
## 0.12.1 - 2023-02-28
2627

‎tarantool/connection.py

-115
Original file line numberDiff line numberDiff line change
@@ -25,21 +25,17 @@
2525

2626
from tarantool.response import (
2727
unpacker_factory as default_unpacker_factory,
28-
Response,
2928
)
3029
from tarantool.request import (
3130
packer_factory as default_packer_factory,
3231
Request,
33-
# RequestOK,
3432
RequestCall,
3533
RequestDelete,
3634
RequestEval,
3735
RequestInsert,
38-
RequestJoin,
3936
RequestReplace,
4037
RequestPing,
4138
RequestSelect,
42-
RequestSubscribe,
4339
RequestUpdate,
4440
RequestUpsert,
4541
RequestAuthenticate,
@@ -60,8 +56,6 @@
6056
DEFAULT_SSL_CIPHERS,
6157
DEFAULT_SSL_PASSWORD,
6258
DEFAULT_SSL_PASSWORD_FILE,
63-
REQUEST_TYPE_OK,
64-
REQUEST_TYPE_ERROR,
6559
IPROTO_GREETING_SIZE,
6660
ITERATOR_EQ,
6761
ITERATOR_ALL,
@@ -1520,61 +1514,6 @@ def _get_auth_type(self):
15201514

15211515
return auth_type
15221516

1523-
def _join_v16(self, server_uuid):
1524-
"""
1525-
Execute a JOIN request for Tarantool 1.6 and older.
1526-
1527-
:param server_uuid: UUID of Tarantool server to join.
1528-
:type server_uuid: :obj:`str`
1529-
1530-
:raise: :exc:`~AssertionError`,
1531-
:exc:`~tarantool.error.DatabaseError`,
1532-
:exc:`~tarantool.error.SchemaError`,
1533-
:exc:`~tarantool.error.NetworkError`,
1534-
:exc:`~tarantool.error.SslError`
1535-
"""
1536-
1537-
request = RequestJoin(self, server_uuid)
1538-
self._socket.sendall(bytes(request))
1539-
1540-
while True:
1541-
resp = Response(self, self._read_response())
1542-
yield resp
1543-
if resp.code == REQUEST_TYPE_OK or resp.code >= REQUEST_TYPE_ERROR:
1544-
return
1545-
self.close() # close connection after JOIN
1546-
1547-
def _join_v17(self, server_uuid):
1548-
"""
1549-
Execute a JOIN request for Tarantool 1.7 and newer.
1550-
1551-
:param server_uuid: UUID of Tarantool server to join.
1552-
:type server_uuid: :obj:`str`
1553-
1554-
:raise: :exc:`~AssertionError`,
1555-
:exc:`~tarantool.error.DatabaseError`,
1556-
:exc:`~tarantool.error.SchemaError`,
1557-
:exc:`~tarantool.error.NetworkError`,
1558-
:exc:`~tarantool.error.SslError`
1559-
"""
1560-
1561-
request = RequestJoin(self, server_uuid)
1562-
self._socket.sendall(bytes(request))
1563-
state = JoinState.HANDSHAKE
1564-
while True:
1565-
resp = Response(self, self._read_response())
1566-
yield resp
1567-
if resp.code >= REQUEST_TYPE_ERROR:
1568-
return
1569-
if resp.code == REQUEST_TYPE_OK:
1570-
if state == JoinState.HANDSHAKE:
1571-
state = JoinState.INITIAL
1572-
elif state == JoinState.INITIAL:
1573-
state = JoinState.FINAL
1574-
elif state == JoinState.FINAL:
1575-
state = JoinState.DONE
1576-
return
1577-
15781517
def _ops_process(self, space, update_ops):
15791518
new_ops = []
15801519
for operation in update_ops:
@@ -1584,60 +1523,6 @@ def _ops_process(self, space, update_ops):
15841523
new_ops.append(operation)
15851524
return new_ops
15861525

1587-
def join(self, server_uuid):
1588-
"""
1589-
Execute a JOIN request: `join`_ a replicaset.
1590-
1591-
:param server_uuid: UUID of connector "server".
1592-
:type server_uuid: :obj:`str`
1593-
1594-
:raise: :exc:`~AssertionError`,
1595-
:exc:`~tarantool.error.DatabaseError`,
1596-
:exc:`~tarantool.error.SchemaError`,
1597-
:exc:`~tarantool.error.NetworkError`,
1598-
:exc:`~tarantool.error.SslError`
1599-
1600-
.. _join: https://www.tarantool.io/en/doc/latest/dev_guide/internals/box_protocol/#iproto-join-0x41
1601-
"""
1602-
1603-
self._opt_reconnect()
1604-
if self.version_id < version_id(1, 7, 0):
1605-
return self._join_v16(server_uuid)
1606-
return self._join_v17(server_uuid)
1607-
1608-
def subscribe(self, cluster_uuid, server_uuid, vclock=None):
1609-
"""
1610-
Execute a SUBSCRIBE request: `subscribe`_ to a replicaset
1611-
updates. Connection is closed after subscribing.
1612-
1613-
:param cluster_uuid: UUID of replicaset cluster.
1614-
:type cluster_uuid: :obj:`str`
1615-
1616-
:param server_uuid: UUID of connector "server".
1617-
:type server_uuid: :obj:`str`
1618-
1619-
:param vclock: Connector "server" vclock.
1620-
:type vclock: :obj:`dict` or :obj:`None`, optional
1621-
1622-
:raise: :exc:`~AssertionError`,
1623-
:exc:`~tarantool.error.DatabaseError`,
1624-
:exc:`~tarantool.error.SchemaError`,
1625-
:exc:`~tarantool.error.NetworkError`,
1626-
:exc:`~tarantool.error.SslError`
1627-
1628-
.. _subscribe: https://www.tarantool.io/en/doc/latest/dev_guide/internals/box_protocol/#iproto-subscribe-0x42
1629-
"""
1630-
1631-
vclock = vclock or {}
1632-
request = RequestSubscribe(self, cluster_uuid, server_uuid, vclock)
1633-
self._socket.sendall(bytes(request))
1634-
while True:
1635-
resp = Response(self, self._read_response())
1636-
yield resp
1637-
if resp.code >= REQUEST_TYPE_ERROR:
1638-
return
1639-
self.close() # close connection after SUBSCRIBE
1640-
16411526
def insert(self, space_name, values, *, on_push=None, on_push_ctx=None):
16421527
"""
16431528
Execute an INSERT request: `insert`_ a tuple to the space.

‎tarantool/request.py

-62
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,8 @@
2020
IPROTO_TUPLE,
2121
IPROTO_FUNCTION_NAME,
2222
IPROTO_ITERATOR,
23-
IPROTO_SERVER_UUID,
24-
IPROTO_CLUSTER_UUID,
25-
IPROTO_VCLOCK,
2623
IPROTO_EXPR,
2724
IPROTO_OPS,
28-
# IPROTO_INDEX_BASE,
2925
IPROTO_SCHEMA_ID,
3026
IPROTO_SQL_TEXT,
3127
IPROTO_SQL_BIND,
@@ -44,8 +40,6 @@
4440
REQUEST_TYPE_EXECUTE,
4541
REQUEST_TYPE_EVAL,
4642
REQUEST_TYPE_AUTHENTICATE,
47-
REQUEST_TYPE_JOIN,
48-
REQUEST_TYPE_SUBSCRIBE,
4943
REQUEST_TYPE_ID,
5044
AUTH_TYPE_CHAP_SHA1,
5145
AUTH_TYPE_PAP_SHA256,
@@ -587,62 +581,6 @@ def __init__(self, conn, space_no, index_no, tuple_value, op_list):
587581
self._body = request_body
588582

589583

590-
class RequestJoin(Request):
591-
"""
592-
Represents JOIN request.
593-
"""
594-
595-
request_type = REQUEST_TYPE_JOIN
596-
597-
def __init__(self, conn, server_uuid):
598-
"""
599-
:param conn: Request sender.
600-
:type conn: :class:`~tarantool.Connection`
601-
602-
:param server_uuid: UUID of connector "server".
603-
:type server_uuid: :obj:`str`
604-
"""
605-
606-
super().__init__(conn)
607-
request_body = self._dumps({IPROTO_SERVER_UUID: server_uuid})
608-
self._body = request_body
609-
610-
611-
class RequestSubscribe(Request):
612-
"""
613-
Represents SUBSCRIBE request.
614-
"""
615-
616-
request_type = REQUEST_TYPE_SUBSCRIBE
617-
618-
def __init__(self, conn, cluster_uuid, server_uuid, vclock):
619-
"""
620-
:param conn: Request sender.
621-
:type conn: :class:`~tarantool.Connection`
622-
623-
:param server_uuid: UUID of connector "server".
624-
:type server_uuid: :obj:`str`
625-
626-
:param server_uuid: UUID of connector "server".
627-
:type server_uuid: :obj:`str`
628-
629-
:param vclock: Connector "server" vclock.
630-
:type vclock: :obj:`dict`
631-
632-
:raise: :exc:`~AssertionError`
633-
"""
634-
635-
super().__init__(conn)
636-
assert isinstance(vclock, dict)
637-
638-
request_body = self._dumps({
639-
IPROTO_CLUSTER_UUID: cluster_uuid,
640-
IPROTO_SERVER_UUID: server_uuid,
641-
IPROTO_VCLOCK: vclock
642-
})
643-
self._body = request_body
644-
645-
646584
class RequestOK(Request):
647585
"""
648586
Represents OK acknowledgement.

0 commit comments

Comments
 (0)
Please sign in to comment.