Skip to content

Commit 98d1b70

Browse files
committed
Get mysql version from FormatDescriptionEvent, and it's used while parsing GtidEvent
1 parent 3e34d85 commit 98d1b70

File tree

3 files changed

+31
-7
lines changed

3 files changed

+31
-7
lines changed

pymysqlreplication/binlogstream.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from pymysql.util import int2byte
99

1010
from .packet import BinLogPacketWrapper
11-
from .constants.BINLOG import TABLE_MAP_EVENT, ROTATE_EVENT
11+
from .constants.BINLOG import TABLE_MAP_EVENT, ROTATE_EVENT, FORMAT_DESCRIPTION_EVENT
1212
from .gtid import GtidSet
1313
from .event import (
1414
QueryEvent, RotateEvent, FormatDescriptionEvent,
@@ -211,6 +211,7 @@ def __init__(self, connection_settings, server_id, ctl_connection_settings=None,
211211
self.pymysql_wrapper = pymysql_wrapper
212212
else:
213213
self.pymysql_wrapper = pymysql.connect
214+
self.mysql_version = (0, 0, 0)
214215

215216
def close(self):
216217
if self.__connected_stream:
@@ -428,6 +429,7 @@ def fetchone(self):
428429

429430
binlog_event = BinLogPacketWrapper(pkt, self.table_map,
430431
self._ctl_connection,
432+
self.mysql_version,
431433
self.__use_checksum,
432434
self.__allowed_events_in_packet,
433435
self.__only_tables,
@@ -490,6 +492,9 @@ def fetchone(self):
490492
if binlog_event.event is None or (binlog_event.event.__class__ not in self.__allowed_events):
491493
continue
492494

495+
if binlog_event.event_type == FORMAT_DESCRIPTION_EVENT:
496+
self.mysql_version = binlog_event.event.mysql_version
497+
493498
return binlog_event.event
494499

495500
def _allowed_event_list(self, only_events, ignored_events,

pymysqlreplication/event.py

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
class BinLogEvent(object):
1111
def __init__(self, from_packet, event_size, table_map, ctl_connection,
12+
mysql_version=(0,0,0),
1213
only_tables=None,
1314
ignored_tables=None,
1415
only_schemas=None,
@@ -21,6 +22,7 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection,
2122
self.timestamp = self.packet.timestamp
2223
self.event_size = event_size
2324
self._ctl_connection = ctl_connection
25+
self.mysql_version = mysql_version
2426
self._fail_on_table_metadata_unavailable = fail_on_table_metadata_unavailable
2527
# The event have been fully processed, if processed is false
2628
# the event will be skipped
@@ -59,8 +61,10 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs)
5961
self.sid = self.packet.read(16)
6062
self.gno = struct.unpack('<Q', self.packet.read(8))[0]
6163
self.lt_type = byte2int(self.packet.read(1))
62-
self.last_committed = struct.unpack('<Q', self.packet.read(8))[0]
63-
self.sequence_number = struct.unpack('<Q', self.packet.read(8))[0]
64+
65+
if self.mysql_version >= (5, 7):
66+
self.last_committed = struct.unpack('<Q', self.packet.read(8))[0]
67+
self.sequence_number = struct.unpack('<Q', self.packet.read(8))[0]
6468

6569
@property
6670
def gtid(self):
@@ -76,8 +80,9 @@ def gtid(self):
7680
def _dump(self):
7781
print("Commit: %s" % self.commit_flag)
7882
print("GTID_NEXT: %s" % self.gtid)
79-
print("last_committed: %d" % self.last_committed)
80-
print("sequence_number: %d" % self.sequence_number)
83+
if hasattr(self, "last_committed"):
84+
print("last_committed: %d" % self.last_committed)
85+
print("sequence_number: %d" % self.sequence_number)
8186

8287
def __repr__(self):
8388
return '<GtidEvent "%s">' % self.gtid
@@ -135,7 +140,17 @@ def _dump(self):
135140

136141

137142
class FormatDescriptionEvent(BinLogEvent):
138-
pass
143+
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
144+
super(FormatDescriptionEvent, self).__init__(from_packet, event_size, table_map,
145+
ctl_connection, **kwargs)
146+
self.binlog_version = struct.unpack('<H', self.packet.read(2))
147+
self.mysql_version_str = self.packet.read(50).rstrip(b'\0').decode()
148+
numbers = self.mysql_version_str.split('-')[0]
149+
self.mysql_version = tuple(map(int, numbers.split('.')))
150+
151+
def _dump(self):
152+
print("Binlog version: %s" % self.binlog_version)
153+
print("MySQL version: %s" % self.mysql_version_str)
139154

140155

141156
class StopEvent(BinLogEvent):

pymysqlreplication/packet.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,10 @@ class BinLogPacketWrapper(object):
8686

8787
}
8888

89-
def __init__(self, from_packet, table_map, ctl_connection, use_checksum,
89+
def __init__(self, from_packet, table_map,
90+
ctl_connection,
91+
mysql_version,
92+
use_checksum,
9093
allowed_events,
9194
only_tables,
9295
ignored_tables,
@@ -132,6 +135,7 @@ def __init__(self, from_packet, table_map, ctl_connection, use_checksum,
132135
return
133136
self.event = event_class(self, event_size_without_header, table_map,
134137
ctl_connection,
138+
mysql_version=mysql_version,
135139
only_tables=only_tables,
136140
ignored_tables=ignored_tables,
137141
only_schemas=only_schemas,

0 commit comments

Comments
 (0)