Skip to content

Commit 8357c53

Browse files
Merge pull request #16 from cucuridas/feature/mariadb-gtid-list-event
Feature/mariadb gtid list event
2 parents 92d3e66 + 09a7caa commit 8357c53

File tree

7 files changed

+106
-7
lines changed

7 files changed

+106
-7
lines changed

docker-compose.yml

+13
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,16 @@ services:
1515
ports:
1616
- 3307:3307
1717
command: mysqld --log-bin=mysql-bin.log --server-id 1 --binlog-format=row --gtid_mode=on --enforce-gtid-consistency=on --log_slave_updates -P 3307
18+
19+
mariadb-10.6:
20+
image: mariadb:10.6
21+
environment:
22+
MARIADB_ALLOW_EMPTY_ROOT_PASSWORD: 1
23+
ports:
24+
- "3308:3306"
25+
command: |
26+
--server-id=1
27+
--default-authentication-plugin=mysql_native_password
28+
--log-bin=master-bin
29+
--binlog-format=row
30+
--log-slave-updates=on

examples/mariadb_gtid/read_event.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import pymysql
22

33
from pymysqlreplication import BinLogStreamReader, gtid
4-
from pymysqlreplication.event import GtidEvent, RotateEvent, MariadbGtidEvent, QueryEvent
4+
from pymysqlreplication.event import GtidEvent, RotateEvent, MariadbGtidEvent, QueryEvent, MariadbGtidListEvent
55
from pymysqlreplication.row_event import WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent
66

77
MARIADB_SETTINGS = {
@@ -65,7 +65,8 @@ def query_server_id(self):
6565
RotateEvent,
6666
WriteRowsEvent,
6767
UpdateRowsEvent,
68-
DeleteRowsEvent
68+
DeleteRowsEvent,
69+
MariadbGtidListEvent
6970
],
7071
auto_position=gtid,
7172
is_mariadb=True

pymysqlreplication/binlogstream.py

+4-2
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@
1414
QueryEvent, RotateEvent, FormatDescriptionEvent,
1515
XidEvent, GtidEvent, StopEvent, XAPrepareEvent,
1616
BeginLoadQueryEvent, ExecuteLoadQueryEvent,
17-
HeartbeatLogEvent, NotImplementedEvent, MariadbGtidEvent)
17+
HeartbeatLogEvent, NotImplementedEvent, MariadbGtidEvent,
18+
MariadbGtidListEvent)
1819
from .exceptions import BinLogNotEnabled
1920
from .row_event import (
2021
UpdateRowsEvent, WriteRowsEvent, DeleteRowsEvent, TableMapEvent)
@@ -600,7 +601,8 @@ def _allowed_event_list(self, only_events, ignored_events,
600601
TableMapEvent,
601602
HeartbeatLogEvent,
602603
NotImplementedEvent,
603-
MariadbGtidEvent
604+
MariadbGtidEvent,
605+
MariadbGtidListEvent
604606
))
605607
if ignored_events is not None:
606608
for e in ignored_events:

pymysqlreplication/event.py

+39
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,45 @@ def _dump(self):
8989
def __repr__(self):
9090
return '<GtidEvent "%s">' % self.gtid
9191

92+
class MariadbGtidListEvent(BinLogEvent):
93+
"""
94+
GTID List event
95+
https://mariadb.com/kb/en/gtid_list_event/
96+
97+
Attributes:
98+
gtid_length: Number of GTIDs
99+
gtid_list: list of 'MariadbGtidObejct'
100+
101+
'MariadbGtidObejct' Attributes:
102+
domain_id: Replication Domain ID
103+
server_id: Server_ID
104+
gtid_seq_no: GTID sequence
105+
gtid: 'domain_id'+ 'server_id' + 'gtid_seq_no'
106+
"""
107+
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
108+
109+
super(MariadbGtidListEvent, self).__init__(from_packet, event_size, table_map, ctl_connection, **kwargs)
110+
111+
class MariadbGtidObejct(BinLogEvent):
112+
"""
113+
Information class of elements in GTID list
114+
"""
115+
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
116+
super(MariadbGtidObejct, self).__init__(from_packet, event_size, table_map, ctl_connection, **kwargs)
117+
self.domain_id = self.packet.read_uint32()
118+
self.server_id = self.packet.server_id
119+
self.gtid_seq_no = self.packet.read_uint64()
120+
self.gtid = "%d-%d-%d" % (self.domain_id, self.server_id, self.gtid_seq_no)
121+
122+
123+
self.gtid_length = self.packet.read_uint32()
124+
self.gtid_list = [MariadbGtidObejct(from_packet, event_size, table_map, ctl_connection, **kwargs) for i in range(self.gtid_length)]
125+
126+
127+
def _dump(self):
128+
super(MariadbGtidListEvent, self)._dump()
129+
print("GTID length:",self.gtid_length)
130+
print("GTID list: " + ",".join(list(map(lambda x: x.gtid,self.gtid_list))))
92131

93132
class MariadbGtidEvent(BinLogEvent):
94133
"""

pymysqlreplication/packet.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ class BinLogPacketWrapper(object):
8686
constants.MARIADB_ANNOTATE_ROWS_EVENT: event.NotImplementedEvent,
8787
constants.MARIADB_BINLOG_CHECKPOINT_EVENT: event.NotImplementedEvent,
8888
constants.MARIADB_GTID_EVENT: event.MariadbGtidEvent,
89-
constants.MARIADB_GTID_GTID_LIST_EVENT: event.NotImplementedEvent,
89+
constants.MARIADB_GTID_GTID_LIST_EVENT: event.MariadbGtidListEvent,
9090
constants.MARIADB_START_ENCRYPTION_EVENT: event.NotImplementedEvent
9191
}
9292

pymysqlreplication/tests/base.py

+30-1
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ def isMySQL80AndMore(self):
6767

6868
def isMariaDB(self):
6969
if self.__is_mariaDB is None:
70-
self.__is_mariaDB = "MariaDB" in self.execute("SELECT VERSION()").fetchone()
70+
self.__is_mariaDB = "MariaDB" in self.execute("SELECT VERSION()").fetchone()[0]
7171
return self.__is_mariaDB
7272

7373
@property
@@ -121,3 +121,32 @@ def bin_log_basename(self):
121121
bin_log_basename = cursor.fetchone()[0]
122122
bin_log_basename = bin_log_basename.split("/")[-1]
123123
return bin_log_basename
124+
125+
126+
class PyMySQLReplicationMariaDbTestCase(PyMySQLReplicationTestCase):
127+
def setUp(self):
128+
# default
129+
self.database = {
130+
"host": "localhost",
131+
"user": "root",
132+
"passwd": "",
133+
"port": 3308,
134+
"use_unicode": True,
135+
"charset": "utf8",
136+
"db": "pymysqlreplication_test"
137+
}
138+
139+
self.conn_control = None
140+
db = copy.copy(self.database)
141+
db["db"] = None
142+
self.connect_conn_control(db)
143+
self.execute("DROP DATABASE IF EXISTS pymysqlreplication_test")
144+
self.execute("CREATE DATABASE pymysqlreplication_test")
145+
db = copy.copy(self.database)
146+
self.connect_conn_control(db)
147+
self.stream = None
148+
self.resetBinLog()
149+
150+
151+
152+

pymysqlreplication/tests/test_basic.py

+16-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
from pymysqlreplication.constants.BINLOG import *
1818
from pymysqlreplication.row_event import *
1919

20-
__all__ = ["TestBasicBinLogStreamReader", "TestMultipleRowBinLogStreamReader", "TestCTLConnectionSettings", "TestGtidBinLogStreamReader"]
20+
__all__ = ["TestBasicBinLogStreamReader", "TestMultipleRowBinLogStreamReader", "TestCTLConnectionSettings", "TestGtidBinLogStreamReader","TestMariadbBinlogStreaReader"]
2121

2222

2323
class TestBasicBinLogStreamReader(base.PyMySQLReplicationTestCase):
@@ -1002,6 +1002,21 @@ def test_parsing(self):
10021002
gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:1-:1")
10031003
gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac::1")
10041004

1005+
class TestMariadbBinlogStreaReader(base.PyMySQLReplicationMariaDbTestCase):
1006+
1007+
def test_gtid_list_event(self):
1008+
event = self.stream.fetchone()
1009+
self.assertEqual(event.position, 4)
1010+
1011+
#FormatDescriptionEvent
1012+
event = self.stream.fetchone()
1013+
self.assertEqual(event.event_type,15)
1014+
self.assertIsInstance(event,FormatDescriptionEvent)
1015+
1016+
#MariadbAnnotateRowsEvent
1017+
event = self.stream.fetchone()
1018+
self.assertEqual(event.event_type,163)
1019+
self.assertIsInstance(event,MariadbGtidListEvent)
10051020

10061021
if __name__ == "__main__":
10071022
import unittest

0 commit comments

Comments
 (0)