Skip to content

Feature/mariadb gtid list event #16

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,16 @@ services:
ports:
- 3307:3307
command: mysqld --log-bin=mysql-bin.log --server-id 1 --binlog-format=row --gtid_mode=on --enforce-gtid-consistency=on --log_slave_updates -P 3307

mariadb-10.6:
image: mariadb:10.6
environment:
MARIADB_ALLOW_EMPTY_ROOT_PASSWORD: 1
ports:
- "3308:3306"
command: |
--server-id=1
--default-authentication-plugin=mysql_native_password
--log-bin=master-bin
--binlog-format=row
--log-slave-updates=on

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

사소한 것이지만 깃으로 형상 관리중인 파일은 맨 마지막에 빈칸 하나 넣어주세요~

5 changes: 3 additions & 2 deletions examples/mariadb_gtid/read_event.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import pymysql

from pymysqlreplication import BinLogStreamReader, gtid
from pymysqlreplication.event import GtidEvent, RotateEvent, MariadbGtidEvent, QueryEvent
from pymysqlreplication.event import GtidEvent, RotateEvent, MariadbGtidEvent, QueryEvent, MariadbGtidListEvent
from pymysqlreplication.row_event import WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent

MARIADB_SETTINGS = {
Expand Down Expand Up @@ -65,7 +65,8 @@ def query_server_id(self):
RotateEvent,
WriteRowsEvent,
UpdateRowsEvent,
DeleteRowsEvent
DeleteRowsEvent,
MariadbGtidListEvent
],
auto_position=gtid,
is_mariadb=True
Expand Down
6 changes: 4 additions & 2 deletions pymysqlreplication/binlogstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
QueryEvent, RotateEvent, FormatDescriptionEvent,
XidEvent, GtidEvent, StopEvent, XAPrepareEvent,
BeginLoadQueryEvent, ExecuteLoadQueryEvent,
HeartbeatLogEvent, NotImplementedEvent, MariadbGtidEvent)
HeartbeatLogEvent, NotImplementedEvent, MariadbGtidEvent,
MariadbGtidListEvent)
from .exceptions import BinLogNotEnabled
from .row_event import (
UpdateRowsEvent, WriteRowsEvent, DeleteRowsEvent, TableMapEvent)
Expand Down Expand Up @@ -600,7 +601,8 @@ def _allowed_event_list(self, only_events, ignored_events,
TableMapEvent,
HeartbeatLogEvent,
NotImplementedEvent,
MariadbGtidEvent
MariadbGtidEvent,
MariadbGtidListEvent
))
if ignored_events is not None:
for e in ignored_events:
Expand Down
39 changes: 39 additions & 0 deletions pymysqlreplication/event.py

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

크... Pythonic하게 잘 짜셨네요 👍🏻

Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,45 @@ def _dump(self):
def __repr__(self):
return '<GtidEvent "%s">' % self.gtid

class MariadbGtidListEvent(BinLogEvent):
"""
GTID List event
https://mariadb.com/kb/en/gtid_list_event/

Attributes:
gtid_length: Number of GTIDs
gtid_list: list of 'MariadbGtidObejct'

'MariadbGtidObejct' Attributes:
domain_id: Replication Domain ID
server_id: Server_ID
gtid_seq_no: GTID sequence
gtid: 'domain_id'+ 'server_id' + 'gtid_seq_no'
"""
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):

super(MariadbGtidListEvent, self).__init__(from_packet, event_size, table_map, ctl_connection, **kwargs)

class MariadbGtidObejct(BinLogEvent):
"""
Information class of elements in GTID list
"""
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
super(MariadbGtidObejct, self).__init__(from_packet, event_size, table_map, ctl_connection, **kwargs)
self.domain_id = self.packet.read_uint32()
self.server_id = self.packet.server_id

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

여기에서도 unsined int 4 bytes 읽어주셔야(read_uint32()) 합니다 ㅜㅜ

self.gtid_seq_no = self.packet.read_uint64()
self.gtid = "%d-%d-%d" % (self.domain_id, self.server_id, self.gtid_seq_no)


self.gtid_length = self.packet.read_uint32()
self.gtid_list = [MariadbGtidObejct(from_packet, event_size, table_map, ctl_connection, **kwargs) for i in range(self.gtid_length)]


def _dump(self):
super(MariadbGtidListEvent, self)._dump()
print("GTID length:",self.gtid_length)
print("GTID list: " + ",".join(list(map(lambda x: x.gtid,self.gtid_list))))

class MariadbGtidEvent(BinLogEvent):
"""
Expand Down
2 changes: 1 addition & 1 deletion pymysqlreplication/packet.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class BinLogPacketWrapper(object):
constants.MARIADB_ANNOTATE_ROWS_EVENT: event.NotImplementedEvent,
constants.MARIADB_BINLOG_CHECKPOINT_EVENT: event.NotImplementedEvent,
constants.MARIADB_GTID_EVENT: event.MariadbGtidEvent,
constants.MARIADB_GTID_GTID_LIST_EVENT: event.NotImplementedEvent,
constants.MARIADB_GTID_GTID_LIST_EVENT: event.MariadbGtidListEvent,
constants.MARIADB_START_ENCRYPTION_EVENT: event.NotImplementedEvent
}

Expand Down
31 changes: 30 additions & 1 deletion pymysqlreplication/tests/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def isMySQL80AndMore(self):

def isMariaDB(self):
if self.__is_mariaDB is None:
self.__is_mariaDB = "MariaDB" in self.execute("SELECT VERSION()").fetchone()
self.__is_mariaDB = "MariaDB" in self.execute("SELECT VERSION()").fetchone()[0]

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍🏻👍🏼👍🏽👍🏾👍🏿

return self.__is_mariaDB

@property
Expand Down Expand Up @@ -121,3 +121,32 @@ def bin_log_basename(self):
bin_log_basename = cursor.fetchone()[0]
bin_log_basename = bin_log_basename.split("/")[-1]
return bin_log_basename


class PyMySQLReplicationMariaDbTestCase(PyMySQLReplicationTestCase):
def setUp(self):
# default
self.database = {
"host": "localhost",
"user": "root",
"passwd": "",
"port": 3308,
"use_unicode": True,
"charset": "utf8",
"db": "pymysqlreplication_test"
}

self.conn_control = None
db = copy.copy(self.database)
db["db"] = None
self.connect_conn_control(db)
self.execute("DROP DATABASE IF EXISTS pymysqlreplication_test")
self.execute("CREATE DATABASE pymysqlreplication_test")
db = copy.copy(self.database)
self.connect_conn_control(db)
self.stream = None
self.resetBinLog()




Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

사소한거긴 하지만 파일 마지막에는 빈 라인 하나만 넣어주세요~
Too many blank lines (3) (E303)

17 changes: 16 additions & 1 deletion pymysqlreplication/tests/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from pymysqlreplication.constants.BINLOG import *
from pymysqlreplication.row_event import *

__all__ = ["TestBasicBinLogStreamReader", "TestMultipleRowBinLogStreamReader", "TestCTLConnectionSettings", "TestGtidBinLogStreamReader"]
__all__ = ["TestBasicBinLogStreamReader", "TestMultipleRowBinLogStreamReader", "TestCTLConnectionSettings", "TestGtidBinLogStreamReader","TestMariadbBinlogStreaReader"]


class TestBasicBinLogStreamReader(base.PyMySQLReplicationTestCase):
Expand Down Expand Up @@ -1002,6 +1002,21 @@ def test_parsing(self):
gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:1-:1")
gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac::1")

class TestMariadbBinlogStreaReader(base.PyMySQLReplicationMariaDbTestCase):

def test_gtid_list_event(self):
event = self.stream.fetchone()
self.assertEqual(event.position, 4)

#FormatDescriptionEvent
event = self.stream.fetchone()
self.assertEqual(event.event_type,15)
self.assertIsInstance(event,FormatDescriptionEvent)

#MariadbAnnotateRowsEvent

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

주석의 이벤트명 틀린 것 같습니다!

event = self.stream.fetchone()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

최소한 1개의 필드값에 대해서라도 assert 해줘야할 것 같아요
이벤트 발생시켜서 트랜잭션 번호 올린다음에 구현해주신 MariadbGtidObejct가 잘 들어왔는지
assert하면 될 것 같습니다.

self.assertEqual(event.event_type,163)
self.assertIsInstance(event,MariadbGtidListEvent)

if __name__ == "__main__":
import unittest
Expand Down