Skip to content

Commit 0ea9f9f

Browse files
committed
Add XAPrepareEvent, parse last_committed & sequence_number of GtidEvent
1 parent 5068e51 commit 0ea9f9f

File tree

4 files changed

+40
-1
lines changed

4 files changed

+40
-1
lines changed

pymysqlreplication/binlogstream.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
from .gtid import GtidSet
1313
from .event import (
1414
QueryEvent, RotateEvent, FormatDescriptionEvent,
15-
XidEvent, GtidEvent, StopEvent,
15+
XidEvent, GtidEvent, StopEvent, XAPrepareEvent,
1616
BeginLoadQueryEvent, ExecuteLoadQueryEvent,
1717
HeartbeatLogEvent, NotImplementedEvent)
1818
from .exceptions import BinLogNotEnabled
@@ -502,6 +502,7 @@ def _allowed_event_list(self, only_events, ignored_events,
502502
RotateEvent,
503503
StopEvent,
504504
FormatDescriptionEvent,
505+
XAPrepareEvent,
505506
XidEvent,
506507
GtidEvent,
507508
BeginLoadQueryEvent,

pymysqlreplication/constants/BINLOG.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
GTID_LOG_EVENT = 0x21
3737
ANONYMOUS_GTID_LOG_EVENT = 0x22
3838
PREVIOUS_GTIDS_LOG_EVENT = 0x23
39+
XA_PREPARE_EVENT = 0x26
3940

4041
# INTVAR types
4142
INTVAR_INVALID_INT_EVENT = 0x00

pymysqlreplication/event.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,9 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs)
5858
self.commit_flag = byte2int(self.packet.read(1)) == 1
5959
self.sid = self.packet.read(16)
6060
self.gno = struct.unpack('<Q', self.packet.read(8))[0]
61+
self.lt_type = byte2int(self.packet.read(1))
62+
self.last_committed = struct.unpack('<Q', self.packet.read(8))[0]
63+
self.sequence_number = struct.unpack('<Q', self.packet.read(8))[0]
6164

6265
@property
6366
def gtid(self):
@@ -73,6 +76,8 @@ def gtid(self):
7376
def _dump(self):
7477
print("Commit: %s" % self.commit_flag)
7578
print("GTID_NEXT: %s" % self.gtid)
79+
print("last_committed: %d" % self.last_committed)
80+
print("sequence_number: %d" % self.sequence_number)
7681

7782
def __repr__(self):
7883
return '<GtidEvent "%s">' % self.gtid
@@ -98,6 +103,37 @@ def dump(self):
98103
print()
99104

100105

106+
class XAPrepareEvent(BinLogEvent):
107+
"""An XA prepare event is generated for a XA prepared transaction.
108+
Like Xid_event it contans XID of the *prepared* transaction
109+
110+
Attributes:
111+
one_phase: current XA transaction commit method
112+
xid: serialized XID representation of XA transaction
113+
"""
114+
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
115+
super(XAPrepareEvent, self).__init__(from_packet, event_size, table_map,
116+
ctl_connection, **kwargs)
117+
118+
# one_phase is True: XA COMMIT ... ONE PHASE
119+
# one_phase is False: XA PREPARE
120+
self.one_phase = (self.packet.read(1) != b'\x00')
121+
self.xid_format_id = struct.unpack('<I', self.packet.read(4))[0]
122+
gtrid_length = struct.unpack('<I', self.packet.read(4))[0]
123+
bqual_length = struct.unpack('<I', self.packet.read(4))[0]
124+
self.xid_gtrid = self.packet.read(gtrid_length)
125+
self.xid_bqual = self.packet.read(bqual_length)
126+
127+
@property
128+
def xid(self):
129+
return self.xid_gtrid.decode() + self.xid_bqual.decode()
130+
131+
def _dump(self):
132+
print("One phase: %s" % self.one_phase)
133+
print("XID formatID: %d" % self.xid_format_id)
134+
print("XID: %s" % self.xid)
135+
136+
101137
class FormatDescriptionEvent(BinLogEvent):
102138
pass
103139

pymysqlreplication/packet.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ class BinLogPacketWrapper(object):
7171
constants.BEGIN_LOAD_QUERY_EVENT: event.BeginLoadQueryEvent,
7272
constants.EXECUTE_LOAD_QUERY_EVENT: event.ExecuteLoadQueryEvent,
7373
constants.HEARTBEAT_LOG_EVENT: event.HeartbeatLogEvent,
74+
constants.XA_PREPARE_EVENT: event.XAPrepareEvent,
7475
# row_event
7576
constants.UPDATE_ROWS_EVENT_V1: row_event.UpdateRowsEvent,
7677
constants.WRITE_ROWS_EVENT_V1: row_event.WriteRowsEvent,

0 commit comments

Comments
 (0)