Skip to content

Commit 68838e0

Browse files
Merge remote-tracking branch 'origin/master'
2 parents b306dd7 + 296472b commit 68838e0

File tree

4 files changed

+67
-4
lines changed

4 files changed

+67
-4
lines changed

pymysqlreplication/binlogstream.py

+8-2
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,11 @@
88
from pymysql.cursors import DictCursor
99

1010
from .packet import BinLogPacketWrapper
11-
from .constants.BINLOG import TABLE_MAP_EVENT, ROTATE_EVENT
11+
from .constants.BINLOG import TABLE_MAP_EVENT, ROTATE_EVENT, FORMAT_DESCRIPTION_EVENT
1212
from .gtid import GtidSet
1313
from .event import (
1414
QueryEvent, RotateEvent, FormatDescriptionEvent,
15-
XidEvent, GtidEvent, StopEvent,
15+
XidEvent, GtidEvent, StopEvent, XAPrepareEvent,
1616
BeginLoadQueryEvent, ExecuteLoadQueryEvent,
1717
HeartbeatLogEvent, NotImplementedEvent, MariadbGtidEvent)
1818
from .exceptions import BinLogNotEnabled
@@ -232,6 +232,7 @@ def __init__(self, connection_settings, server_id,
232232
self.pymysql_wrapper = pymysql_wrapper
233233
else:
234234
self.pymysql_wrapper = pymysql.connect
235+
self.mysql_version = (0, 0, 0)
235236

236237
def close(self):
237238
if self.__connected_stream:
@@ -505,6 +506,7 @@ def fetchone(self):
505506

506507
binlog_event = BinLogPacketWrapper(pkt, self.table_map,
507508
self._ctl_connection,
509+
self.mysql_version,
508510
self.__use_checksum,
509511
self.__allowed_events_in_packet,
510512
self.__only_tables,
@@ -572,6 +574,9 @@ def fetchone(self):
572574
if binlog_event.event is None or (binlog_event.event.__class__ not in self.__allowed_events):
573575
continue
574576

577+
if binlog_event.event_type == FORMAT_DESCRIPTION_EVENT:
578+
self.mysql_version = binlog_event.event.mysql_version
579+
575580
return binlog_event.event
576581

577582
def _allowed_event_list(self, only_events, ignored_events,
@@ -584,6 +589,7 @@ def _allowed_event_list(self, only_events, ignored_events,
584589
RotateEvent,
585590
StopEvent,
586591
FormatDescriptionEvent,
592+
XAPrepareEvent,
587593
XidEvent,
588594
GtidEvent,
589595
BeginLoadQueryEvent,

pymysqlreplication/constants/BINLOG.py

+1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
GTID_LOG_EVENT = 0x21
3737
ANONYMOUS_GTID_LOG_EVENT = 0x22
3838
PREVIOUS_GTIDS_LOG_EVENT = 0x23
39+
XA_PREPARE_EVENT = 0x26
3940

4041
# INTVAR types
4142
INTVAR_INVALID_INT_EVENT = 0x00

pymysqlreplication/event.py

+52-1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
class BinLogEvent(object):
1111
def __init__(self, from_packet, event_size, table_map, ctl_connection,
12+
mysql_version=(0,0,0),
1213
only_tables=None,
1314
ignored_tables=None,
1415
only_schemas=None,
@@ -22,6 +23,7 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection,
2223
self.timestamp = self.packet.timestamp
2324
self.event_size = event_size
2425
self._ctl_connection = ctl_connection
26+
self.mysql_version = mysql_version
2527
self._fail_on_table_metadata_unavailable = fail_on_table_metadata_unavailable
2628
self._ignore_decode_errors = ignore_decode_errors
2729
# The event have been fully processed, if processed is false
@@ -60,6 +62,11 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs)
6062
self.commit_flag = struct.unpack("!B", self.packet.read(1))[0] == 1
6163
self.sid = self.packet.read(16)
6264
self.gno = struct.unpack('<Q', self.packet.read(8))[0]
65+
self.lt_type = byte2int(self.packet.read(1))
66+
67+
if self.mysql_version >= (5, 7):
68+
self.last_committed = struct.unpack('<Q', self.packet.read(8))[0]
69+
self.sequence_number = struct.unpack('<Q', self.packet.read(8))[0]
6370

6471
@property
6572
def gtid(self):
@@ -75,6 +82,9 @@ def gtid(self):
7582
def _dump(self):
7683
print("Commit: %s" % self.commit_flag)
7784
print("GTID_NEXT: %s" % self.gtid)
85+
if hasattr(self, "last_committed"):
86+
print("last_committed: %d" % self.last_committed)
87+
print("sequence_number: %d" % self.sequence_number)
7888

7989
def __repr__(self):
8090
return '<GtidEvent "%s">' % self.gtid
@@ -121,8 +131,49 @@ def dump(self):
121131
print()
122132

123133

134+
class XAPrepareEvent(BinLogEvent):
135+
"""An XA prepare event is generated for a XA prepared transaction.
136+
Like Xid_event it contans XID of the *prepared* transaction
137+
138+
Attributes:
139+
one_phase: current XA transaction commit method
140+
xid: serialized XID representation of XA transaction
141+
"""
142+
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
143+
super(XAPrepareEvent, self).__init__(from_packet, event_size, table_map,
144+
ctl_connection, **kwargs)
145+
146+
# one_phase is True: XA COMMIT ... ONE PHASE
147+
# one_phase is False: XA PREPARE
148+
self.one_phase = (self.packet.read(1) != b'\x00')
149+
self.xid_format_id = struct.unpack('<I', self.packet.read(4))[0]
150+
gtrid_length = struct.unpack('<I', self.packet.read(4))[0]
151+
bqual_length = struct.unpack('<I', self.packet.read(4))[0]
152+
self.xid_gtrid = self.packet.read(gtrid_length)
153+
self.xid_bqual = self.packet.read(bqual_length)
154+
155+
@property
156+
def xid(self):
157+
return self.xid_gtrid.decode() + self.xid_bqual.decode()
158+
159+
def _dump(self):
160+
print("One phase: %s" % self.one_phase)
161+
print("XID formatID: %d" % self.xid_format_id)
162+
print("XID: %s" % self.xid)
163+
164+
124165
class FormatDescriptionEvent(BinLogEvent):
125-
pass
166+
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
167+
super(FormatDescriptionEvent, self).__init__(from_packet, event_size, table_map,
168+
ctl_connection, **kwargs)
169+
self.binlog_version = struct.unpack('<H', self.packet.read(2))
170+
self.mysql_version_str = self.packet.read(50).rstrip(b'\0').decode()
171+
numbers = self.mysql_version_str.split('-')[0]
172+
self.mysql_version = tuple(map(int, numbers.split('.')))
173+
174+
def _dump(self):
175+
print("Binlog version: %s" % self.binlog_version)
176+
print("MySQL version: %s" % self.mysql_version_str)
126177

127178

128179
class StopEvent(BinLogEvent):

pymysqlreplication/packet.py

+6-1
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ class BinLogPacketWrapper(object):
6969
constants.BEGIN_LOAD_QUERY_EVENT: event.BeginLoadQueryEvent,
7070
constants.EXECUTE_LOAD_QUERY_EVENT: event.ExecuteLoadQueryEvent,
7171
constants.HEARTBEAT_LOG_EVENT: event.HeartbeatLogEvent,
72+
constants.XA_PREPARE_EVENT: event.XAPrepareEvent,
7273
# row_event
7374
constants.UPDATE_ROWS_EVENT_V1: row_event.UpdateRowsEvent,
7475
constants.WRITE_ROWS_EVENT_V1: row_event.WriteRowsEvent,
@@ -89,7 +90,10 @@ class BinLogPacketWrapper(object):
8990
constants.MARIADB_START_ENCRYPTION_EVENT: event.NotImplementedEvent
9091
}
9192

92-
def __init__(self, from_packet, table_map, ctl_connection, use_checksum,
93+
def __init__(self, from_packet, table_map,
94+
ctl_connection,
95+
mysql_version,
96+
use_checksum,
9397
allowed_events,
9498
only_tables,
9599
ignored_tables,
@@ -136,6 +140,7 @@ def __init__(self, from_packet, table_map, ctl_connection, use_checksum,
136140
return
137141
self.event = event_class(self, event_size_without_header, table_map,
138142
ctl_connection,
143+
mysql_version=mysql_version,
139144
only_tables=only_tables,
140145
ignored_tables=ignored_tables,
141146
only_schemas=only_schemas,

0 commit comments

Comments
 (0)