Skip to content

improve performance and documentation, skip the first 2 events after reconnected #363

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 87 additions & 39 deletions pymysqlreplication/binlogstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import pymysql
import struct
import time
from distutils.version import LooseVersion

from pymysql.constants.COMMAND import COM_BINLOG_DUMP, COM_REGISTER_SLAVE
Expand Down Expand Up @@ -30,6 +31,7 @@
# 2006 MySQL server has gone away
MYSQL_EXPECTED_ERROR_CODES = [2013, 2006]

PYMYSQL_VERSION_LT_06 = pymysql.__version__ < LooseVersion("0.6")

class ReportSlave(object):

Expand Down Expand Up @@ -142,20 +144,48 @@ def __init__(self, connection_settings, server_id,
slave_heartbeat=None,
is_mariadb=False):
"""
Attributes:
Parameters:
connection_settings: a dict of parameters passed to `pymysql.connect`
or `pymysql_wrapper`, of which "db" parameter is not necessary
pymysql_wrapper: custom replacement for `pymysql.connect`
ctl_connection_settings: Connection settings for cluster holding
schema information
resume_stream: Start for event from position or the latest event of
binlog or from older available event
schema information, which could be None, in which case
`connection_settings` will be used as ctl_connection_settings,
except for that "db" will be replaced to "information_schema"
resume_stream: True or False. control the start point of the returned
events, only works when `auto_position` is None.
`fetchone` will fetch data from:
1.the begining of `log_file`: if `resume_stream` is False
2.`log_pos` of `log_file`: if resume_stream is True, and it's
the first time to fetch the data
3.the event right next to the last fetched event: when resume_stream
is True and it's not the first time to fetch data
note: the log position will be set back to the begging of `log_file`
each time the client is disconnected and then reconnected
to the mysql server (OperationalError 2006/2013) if resume_stream
is False. so it's suggested to set resume_stream to True.

blocking: When master has finished reading/sending binlog it will
send EOF instead of blocking connection.
only_events: Array of allowed events
ignored_events: Array of ignored events
log_file: Set replication start log file
log_file: Set replication start log file. if ether `log_file` or
`log_pos` is None, and auto_position is None, then log_pos
and log_file will be set as the values returned by the query
"SHOW MASTER STATUS"
log_pos: Set replication start log pos (resume_stream should be
true)
true). if ether `log_file` or `log_pos` is None, and auto_position
is None, then log_pos and log_file will be set as the values
returned by the query "SHOW MASTER STATUS", and log_pos will
be set as 4 (the start position of any log file) if resume_stream
is a false value
end_log_pos: Set replication end log pos
auto_position: Use master_auto_position gtid to set position
auto_position: a string of replicated GTIDs. all the events except
for thoses included in `auto_position` and those purged by
the source server will be sent to the client. a valid `auto_position`
looks like:
19d69c1e-ae97-4b8c-a1ef-9e12ba966457:1-3:8-10,
1c2aad49-ae92-409a-b4df-d05a03e4702e:42-47:80-100:130-140
only_tables: An array with the tables you want to watch (only works
in binlog_format ROW)
ignored_tables: An array with the tables you want to skip
Expand All @@ -177,13 +207,17 @@ def __init__(self, connection_settings, server_id,
for semantics
is_mariadb: Flag to indicate it's a MariaDB server, used with auto_position
to point to Mariadb specific GTID.

Notes:
the log position will be set back to the begging of `log_file`
each time the client is disconnected and then auto-reconnected
to the mysql server (OperationalError 2006/2013) if resume_stream
is False. so it's suggested to set resume_stream to True.
"""

self.__connection_settings = connection_settings
self.__connection_settings.setdefault("charset", "utf8")

self.__connected_stream = False
self.__connected_ctl = False
self.__resume_stream = resume_stream
self.__blocking = blocking
self._ctl_connection_settings = ctl_connection_settings
Expand Down Expand Up @@ -232,22 +266,24 @@ def __init__(self, connection_settings, server_id,
def close(self):
if self.__connected_stream:
self._stream_connection.close()
self.__connected_stream = False
if self.__connected_ctl:
if getattr(self, '_ctl_connection', None):
# break reference cycle between stream reader and underlying
# mysql connection object
self._ctl_connection._get_table_information = None
self._ctl_connection.close()
self.__connected_ctl = False
if self._ctl_connection.open:
self._ctl_connection.close()

def __connect_to_ctl(self):
def __connect_to_ctl(self, force_reconnect=False):
if self.__connected_ctl:
if not force_reconnect:
return
self._ctl_connection.close()
if not self._ctl_connection_settings:
self._ctl_connection_settings = dict(self.__connection_settings)
self._ctl_connection_settings["db"] = "information_schema"
self._ctl_connection_settings["cursorclass"] = DictCursor
self._ctl_connection = self.pymysql_wrapper(**self._ctl_connection_settings)
self._ctl_connection._get_table_information = self.__get_table_information
self.__connected_ctl = True

def __checksum_enabled(self):
"""Return True if binlog-checksum = CRC32. Only for MySQL > 5.6"""
Expand All @@ -269,7 +305,7 @@ def _register_slave(self):

packet = self.report_slave.encoded(self.__server_id)

if pymysql.__version__ < LooseVersion("0.6"):
if PYMYSQL_VERSION_LT_06:
self._stream_connection.wfile.write(packet)
self._stream_connection.wfile.flush()
self._stream_connection.read_packet()
Expand All @@ -278,12 +314,29 @@ def _register_slave(self):
self._stream_connection._next_seq_id = 1
self._stream_connection._read_packet()

def __connect_to_stream(self):
@property
def __connected_stream(self):
return bool(getattr(self, '_stream_connection', None) and \
self._stream_connection.open)

@property
def __connected_ctl(self):
return bool(getattr(self, '_ctl_connection', None) and \
self._ctl_connection.open)

def __connect_to_stream(self, force_reconnect=False):
if self.__connected_stream:
if not force_reconnect:
return
self._stream_connection.close()

# log_pos (4) -- position in the binlog-file to start the stream with
# flags (2) BINLOG_DUMP_NON_BLOCK (0 or 1)
# server_id (4) -- server id of this slave
# log_file (string.EOF) -- filename of the binlog on the master
self._stream_connection = self.pymysql_wrapper(**self.__connection_settings)
if PYMYSQL_VERSION_LT_06:
self._stream_connection._read_packet = self._stream_connection.read_packet

self.__use_checksum = self.__checksum_enabled()

Expand All @@ -305,9 +358,7 @@ def __connect_to_stream(self):
4294967))
# If heartbeat is too low, the connection will disconnect before,
# this is also the behavior in mysql
heartbeat = float(min(net_timeout/2., self.slave_heartbeat))
if heartbeat > 4294967:
heartbeat = 4294967
heartbeat = float(min(net_timeout/2., self.slave_heartbeat, 4294967))

# master_heartbeat_period is nanoseconds
heartbeat = int(heartbeat * 1000000000)
Expand Down Expand Up @@ -454,35 +505,33 @@ def __connect_to_stream(self):
# encoded_data
prelude += gtid_set.encoded()

if pymysql.__version__ < LooseVersion("0.6"):
if PYMYSQL_VERSION_LT_06:
self._stream_connection.wfile.write(prelude)
self._stream_connection.wfile.flush()
else:
self._stream_connection._write_bytes(prelude)
self._stream_connection._next_seq_id = 1
self.__connected_stream = True

def fetchone(self):
def fetchone(self, force_reconnect=False):
self.__prefetch(force_reconnect=force_reconnect)
return self.__fetchone()

def __prefetch(self, force_reconnect=False):
self.__connect_to_ctl(force_reconnect=force_reconnect)
self.__connect_to_stream(force_reconnect=force_reconnect)

def __fetchone(self):
# let `__fetchone` be as light weight as possible.
while True:
if self.end_log_pos and self.is_past_end_log_pos:
return None

if not self.__connected_stream:
self.__connect_to_stream()

if not self.__connected_ctl:
self.__connect_to_ctl()

try:
if pymysql.__version__ < LooseVersion("0.6"):
pkt = self._stream_connection.read_packet()
else:
pkt = self._stream_connection._read_packet()
pkt = self._stream_connection._read_packet()
except pymysql.OperationalError as error:
code, message = error.args
if code in MYSQL_EXPECTED_ERROR_CODES:
self._stream_connection.close()
self.__connected_stream = False
self.__connect_to_stream(force_reconnect=True)
continue
raise

Expand Down Expand Up @@ -597,9 +646,8 @@ def _allowed_event_list(self, only_events, ignored_events,

def __get_table_information(self, schema, table):
for i in range(1, 3):
self.__connect_to_ctl()
try:
if not self.__connected_ctl:
self.__connect_to_ctl()

cur = self._ctl_connection.cursor()
cur.execute("""
Expand All @@ -617,10 +665,10 @@ def __get_table_information(self, schema, table):
except pymysql.OperationalError as error:
code, message = error.args
if code in MYSQL_EXPECTED_ERROR_CODES:
self.__connected_ctl = False
continue
else:
raise error

def __iter__(self):
return iter(self.fetchone, None)
self.__prefetch(force_reconnect=False)
return iter(self.__fetchone, None)
18 changes: 16 additions & 2 deletions pymysqlreplication/tests/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -503,21 +503,35 @@ def test_end_log_pos(self):

binlog = self.execute("SHOW BINARY LOGS").fetchone()[0]

self.stream.close()
self.stream = BinLogStreamReader(
self.database,
server_id=1024,
log_pos=0,
log_file=binlog)
# fetch several events then get the end position of
# the last event
# do not use a fixed int as end position, cause that
# may be an invalid position
for i in range(13):
_ = self.stream.fetchone()
binlog = self.stream.log_file
end_position = self.stream.log_pos
self.stream.close()
self.stream = BinLogStreamReader(
self.database,
server_id=1024,
log_pos=0,
log_file=binlog,
end_log_pos=888)
end_log_pos=end_position)

last_log_pos = 0
last_event_type = 0
for event in self.stream:
last_log_pos = self.stream.log_pos
last_event_type = event.event_type

self.assertEqual(last_log_pos, 888)
self.assertEqual(last_log_pos, end_position)
self.assertEqual(last_event_type, TABLE_MAP_EVENT)

class TestMultipleRowBinLogStreamReader(base.PyMySQLReplicationTestCase):
Expand Down
2 changes: 1 addition & 1 deletion pymysqlreplication/tests/test_data_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -730,7 +730,7 @@ def test_null_bitmask(self):
column_type = "INT"
column_definition.append(column_type)

nullability = "NOT NULL" if not RowsEvent.__is_null(bit_mask, i) else ""
nullability = "NOT NULL" if not RowsEvent._RowsEvent__is_null(None, bit_mask, i) else ""
column_definition.append(nullability)

columns.append(" ".join(column_definition))
Expand Down