Skip to content

Commit 296472b

Browse files
Merge pull request julien-duponchelle#257 from etern/master
Add XAPrepareEvent, parse last_committed & sequence_number of GtidEvent
2 parents a1c1295 + 98d1b70 commit 296472b

File tree

5 files changed

+70
-7
lines changed

5 files changed

+70
-7
lines changed

pymysqlreplication/binlogstream.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,11 @@
99
from pymysql.util import int2byte
1010

1111
from .packet import BinLogPacketWrapper
12-
from .constants.BINLOG import TABLE_MAP_EVENT, ROTATE_EVENT
12+
from .constants.BINLOG import TABLE_MAP_EVENT, ROTATE_EVENT, FORMAT_DESCRIPTION_EVENT
1313
from .gtid import GtidSet
1414
from .event import (
1515
QueryEvent, RotateEvent, FormatDescriptionEvent,
16-
XidEvent, GtidEvent, StopEvent,
16+
XidEvent, GtidEvent, StopEvent, XAPrepareEvent,
1717
BeginLoadQueryEvent, ExecuteLoadQueryEvent,
1818
HeartbeatLogEvent, NotImplementedEvent)
1919
from .exceptions import BinLogNotEnabled
@@ -219,6 +219,7 @@ def __init__(self, connection_settings, server_id,
219219
self.pymysql_wrapper = pymysql_wrapper
220220
else:
221221
self.pymysql_wrapper = pymysql.connect
222+
self.mysql_version = (0, 0, 0)
222223

223224
def close(self):
224225
if self.__connected_stream:
@@ -446,6 +447,7 @@ def fetchone(self):
446447

447448
binlog_event = BinLogPacketWrapper(pkt, self.table_map,
448449
self._ctl_connection,
450+
self.mysql_version,
449451
self.__use_checksum,
450452
self.__allowed_events_in_packet,
451453
self.__only_tables,
@@ -508,6 +510,9 @@ def fetchone(self):
508510
if binlog_event.event is None or (binlog_event.event.__class__ not in self.__allowed_events):
509511
continue
510512

513+
if binlog_event.event_type == FORMAT_DESCRIPTION_EVENT:
514+
self.mysql_version = binlog_event.event.mysql_version
515+
511516
return binlog_event.event
512517

513518
def _allowed_event_list(self, only_events, ignored_events,
@@ -520,6 +525,7 @@ def _allowed_event_list(self, only_events, ignored_events,
520525
RotateEvent,
521526
StopEvent,
522527
FormatDescriptionEvent,
528+
XAPrepareEvent,
523529
XidEvent,
524530
GtidEvent,
525531
BeginLoadQueryEvent,

pymysqlreplication/constants/BINLOG.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
GTID_LOG_EVENT = 0x21
3737
ANONYMOUS_GTID_LOG_EVENT = 0x22
3838
PREVIOUS_GTIDS_LOG_EVENT = 0x23
39+
XA_PREPARE_EVENT = 0x26
3940

4041
# INTVAR types
4142
INTVAR_INVALID_INT_EVENT = 0x00

pymysqlreplication/event.py

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
class BinLogEvent(object):
1111
def __init__(self, from_packet, event_size, table_map, ctl_connection,
12+
mysql_version=(0,0,0),
1213
only_tables=None,
1314
ignored_tables=None,
1415
only_schemas=None,
@@ -21,6 +22,7 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection,
2122
self.timestamp = self.packet.timestamp
2223
self.event_size = event_size
2324
self._ctl_connection = ctl_connection
25+
self.mysql_version = mysql_version
2426
self._fail_on_table_metadata_unavailable = fail_on_table_metadata_unavailable
2527
# The event have been fully processed, if processed is false
2628
# the event will be skipped
@@ -58,6 +60,11 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs)
5860
self.commit_flag = byte2int(self.packet.read(1)) == 1
5961
self.sid = self.packet.read(16)
6062
self.gno = struct.unpack('<Q', self.packet.read(8))[0]
63+
self.lt_type = byte2int(self.packet.read(1))
64+
65+
if self.mysql_version >= (5, 7):
66+
self.last_committed = struct.unpack('<Q', self.packet.read(8))[0]
67+
self.sequence_number = struct.unpack('<Q', self.packet.read(8))[0]
6168

6269
@property
6370
def gtid(self):
@@ -73,6 +80,9 @@ def gtid(self):
7380
def _dump(self):
7481
print("Commit: %s" % self.commit_flag)
7582
print("GTID_NEXT: %s" % self.gtid)
83+
if hasattr(self, "last_committed"):
84+
print("last_committed: %d" % self.last_committed)
85+
print("sequence_number: %d" % self.sequence_number)
7686

7787
def __repr__(self):
7888
return '<GtidEvent "%s">' % self.gtid
@@ -98,8 +108,49 @@ def dump(self):
98108
print()
99109

100110

111+
class XAPrepareEvent(BinLogEvent):
112+
"""An XA prepare event is generated for a XA prepared transaction.
113+
Like Xid_event it contans XID of the *prepared* transaction
114+
115+
Attributes:
116+
one_phase: current XA transaction commit method
117+
xid: serialized XID representation of XA transaction
118+
"""
119+
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
120+
super(XAPrepareEvent, self).__init__(from_packet, event_size, table_map,
121+
ctl_connection, **kwargs)
122+
123+
# one_phase is True: XA COMMIT ... ONE PHASE
124+
# one_phase is False: XA PREPARE
125+
self.one_phase = (self.packet.read(1) != b'\x00')
126+
self.xid_format_id = struct.unpack('<I', self.packet.read(4))[0]
127+
gtrid_length = struct.unpack('<I', self.packet.read(4))[0]
128+
bqual_length = struct.unpack('<I', self.packet.read(4))[0]
129+
self.xid_gtrid = self.packet.read(gtrid_length)
130+
self.xid_bqual = self.packet.read(bqual_length)
131+
132+
@property
133+
def xid(self):
134+
return self.xid_gtrid.decode() + self.xid_bqual.decode()
135+
136+
def _dump(self):
137+
print("One phase: %s" % self.one_phase)
138+
print("XID formatID: %d" % self.xid_format_id)
139+
print("XID: %s" % self.xid)
140+
141+
101142
class FormatDescriptionEvent(BinLogEvent):
102-
pass
143+
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
144+
super(FormatDescriptionEvent, self).__init__(from_packet, event_size, table_map,
145+
ctl_connection, **kwargs)
146+
self.binlog_version = struct.unpack('<H', self.packet.read(2))
147+
self.mysql_version_str = self.packet.read(50).rstrip(b'\0').decode()
148+
numbers = self.mysql_version_str.split('-')[0]
149+
self.mysql_version = tuple(map(int, numbers.split('.')))
150+
151+
def _dump(self):
152+
print("Binlog version: %s" % self.binlog_version)
153+
print("MySQL version: %s" % self.mysql_version_str)
103154

104155

105156
class StopEvent(BinLogEvent):

pymysqlreplication/packet.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ class BinLogPacketWrapper(object):
7171
constants.BEGIN_LOAD_QUERY_EVENT: event.BeginLoadQueryEvent,
7272
constants.EXECUTE_LOAD_QUERY_EVENT: event.ExecuteLoadQueryEvent,
7373
constants.HEARTBEAT_LOG_EVENT: event.HeartbeatLogEvent,
74+
constants.XA_PREPARE_EVENT: event.XAPrepareEvent,
7475
# row_event
7576
constants.UPDATE_ROWS_EVENT_V1: row_event.UpdateRowsEvent,
7677
constants.WRITE_ROWS_EVENT_V1: row_event.WriteRowsEvent,
@@ -85,7 +86,10 @@ class BinLogPacketWrapper(object):
8586

8687
}
8788

88-
def __init__(self, from_packet, table_map, ctl_connection, use_checksum,
89+
def __init__(self, from_packet, table_map,
90+
ctl_connection,
91+
mysql_version,
92+
use_checksum,
8993
allowed_events,
9094
only_tables,
9195
ignored_tables,
@@ -131,6 +135,7 @@ def __init__(self, from_packet, table_map, ctl_connection, use_checksum,
131135
return
132136
self.event = event_class(self, event_size_without_header, table_map,
133137
ctl_connection,
138+
mysql_version=mysql_version,
134139
only_tables=only_tables,
135140
ignored_tables=ignored_tables,
136141
only_schemas=only_schemas,

pymysqlreplication/tests/test_basic.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@ def ignoredEvents(self):
2525
return [GtidEvent]
2626

2727
def test_allowed_event_list(self):
28-
self.assertEqual(len(self.stream._allowed_event_list(None, None, False)), 14)
29-
self.assertEqual(len(self.stream._allowed_event_list(None, None, True)), 13)
30-
self.assertEqual(len(self.stream._allowed_event_list(None, [RotateEvent], False)), 13)
28+
self.assertEqual(len(self.stream._allowed_event_list(None, None, False)), 15)
29+
self.assertEqual(len(self.stream._allowed_event_list(None, None, True)), 14)
30+
self.assertEqual(len(self.stream._allowed_event_list(None, [RotateEvent], False)), 14)
3131
self.assertEqual(len(self.stream._allowed_event_list([RotateEvent], None, False)), 1)
3232

3333
def test_read_query_event(self):

0 commit comments

Comments
 (0)