From 6be5a77aaa8e81ccd25b88b288f7cf1f473f3b7f Mon Sep 17 00:00:00 2001 From: Julien Duponchelle Date: Mon, 10 Mar 2025 22:14:45 +0100 Subject: [PATCH] Allow forcing the encoding for old MySQL releases For MySQL before 5.XX we guess the encoding; this allows it to be set manually. --- pymysqlreplication/binlogstream.py | 43 ++++++++++++++------------ pymysqlreplication/event.py | 10 +++--- pymysqlreplication/packet.py | 6 ++-- pymysqlreplication/row_event.py | 20 ++++++------ pymysqlreplication/tests/test_basic.py | 18 +++++------ 5 files changed, 53 insertions(+), 44 deletions(-) diff --git a/pymysqlreplication/binlogstream.py b/pymysqlreplication/binlogstream.py index 0ba4ca5c..29233b4a 100644 --- a/pymysqlreplication/binlogstream.py +++ b/pymysqlreplication/binlogstream.py @@ -1,43 +1,43 @@ -import struct import logging -from packaging.version import Version +import struct import pymysql +from packaging.version import Version from pymysql.constants.COMMAND import COM_BINLOG_DUMP, COM_REGISTER_SLAVE from pymysql.cursors import DictCursor -from .constants.BINLOG import TABLE_MAP_EVENT, ROTATE_EVENT, FORMAT_DESCRIPTION_EVENT +from .constants.BINLOG import FORMAT_DESCRIPTION_EVENT, ROTATE_EVENT, TABLE_MAP_EVENT from .event import ( - QueryEvent, - RotateEvent, - FormatDescriptionEvent, - XidEvent, - GtidEvent, - StopEvent, - XAPrepareEvent, BeginLoadQueryEvent, ExecuteLoadQueryEvent, + FormatDescriptionEvent, + GtidEvent, HeartbeatLogEvent, - NotImplementedEvent, - MariadbGtidEvent, MariadbAnnotateRowsEvent, - RandEvent, + MariadbBinLogCheckPointEvent, + MariadbGtidEvent, + MariadbGtidListEvent, MariadbStartEncryptionEvent, + NotImplementedEvent, + PreviousGtidsEvent, + QueryEvent, + RandEvent, + RotateEvent, RowsQueryLogEvent, - MariadbGtidListEvent, - MariadbBinLogCheckPointEvent, + StopEvent, UserVarEvent, - PreviousGtidsEvent, + XAPrepareEvent, + XidEvent, ) from .exceptions import BinLogNotEnabled from .gtid import GtidSet from .packet import BinLogPacketWrapper
from .row_event import ( - UpdateRowsEvent, - WriteRowsEvent, DeleteRowsEvent, - TableMapEvent, PartialUpdateRowsEvent, + TableMapEvent, + UpdateRowsEvent, + WriteRowsEvent, ) try: @@ -185,6 +185,7 @@ def __init__( slave_heartbeat=None, is_mariadb=False, annotate_rows_event=False, + force_encoding=None, ignore_decode_errors=False, verify_checksum=False, enable_logging=True, @@ -225,6 +226,8 @@ def __init__( to point to Mariadb specific GTID. annotate_rows_event: Parameter value to enable annotate rows event in mariadb, used with 'is_mariadb' + force_encoding: Force the encoding to decode a string this for MySQL 5.XXX This is the charset + to use example latin-1 ignore_decode_errors: If true, any decode errors encountered when reading column data will be ignored. verify_checksum: If true, verify events read from the binary log by examining checksums. @@ -252,6 +255,7 @@ def __init__( only_events, ignored_events, filter_non_implemented_events ) self.__ignore_decode_errors = ignore_decode_errors + self.__force_encoding = force_encoding self.__verify_checksum = verify_checksum self.__optional_meta_data = False @@ -628,6 +632,7 @@ def fetchone(self): self.__ignored_schemas, self.__freeze_schema, self.__ignore_decode_errors, + self.__force_encoding, self.__verify_checksum, self.__optional_meta_data, ) diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index 868866de..51fe402a 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -1,15 +1,15 @@ import binascii -import struct import datetime import decimal -import zlib +import json import logging +import struct +import zlib +from typing import Optional, Union from pymysqlreplication.constants.STATUS_VAR_KEY import * from pymysqlreplication.exceptions import StatusVariableMismatch from pymysqlreplication.util.bytes import parse_decimal_from_bytes -from typing import Union, Optional -import json class BinLogEvent(object): @@ -26,6 +26,7 @@ def __init__( ignored_schemas=None, 
freeze_schema=False, ignore_decode_errors=False, + force_encoding=None, verify_checksum=False, optional_meta_data=False, ): @@ -37,6 +38,7 @@ def __init__( self._ctl_connection = ctl_connection self.mysql_version = mysql_version self._ignore_decode_errors = ignore_decode_errors + self._force_encoding = force_encoding self._verify_checksum = verify_checksum self._is_event_valid = None # The event have been fully processed, if processed is false diff --git a/pymysqlreplication/packet.py b/pymysqlreplication/packet.py index b70628fa..8ed07a6e 100644 --- a/pymysqlreplication/packet.py +++ b/pymysqlreplication/packet.py @@ -1,7 +1,7 @@ from pymysqlreplication import constants, event, row_event -from pymysqlreplication.json_binary import parse_json, JsonDiff, JsonDiffOperation -from pymysqlreplication.util.bytes import * from pymysqlreplication.constants import BINLOG +from pymysqlreplication.json_binary import JsonDiff, JsonDiffOperation, parse_json +from pymysqlreplication.util.bytes import * # Constants from PyMYSQL source code NULL_COLUMN = 251 @@ -72,6 +72,7 @@ def __init__( ignored_schemas, freeze_schema, ignore_decode_errors, + force_encoding, verify_checksum, optional_meta_data, ): @@ -125,6 +126,7 @@ def __init__( ignored_schemas=ignored_schemas, freeze_schema=freeze_schema, ignore_decode_errors=ignore_decode_errors, + force_encoding=force_encoding, verify_checksum=verify_checksum, optional_meta_data=optional_meta_data, ) diff --git a/pymysqlreplication/row_event.py b/pymysqlreplication/row_event.py index 11429f74..bffb548f 100644 --- a/pymysqlreplication/row_event.py +++ b/pymysqlreplication/row_event.py @@ -1,18 +1,15 @@ -import struct -import decimal import datetime +import decimal +import struct +from enum import Enum from pymysql.charset import charset_by_name -from enum import Enum -from .event import BinLogEvent -from .constants import FIELD_TYPE -from .constants import BINLOG -from .constants import CHARSET -from .constants import NONE_SOURCE +from 
.bitmap import BitCount, BitGet from .column import Column +from .constants import BINLOG, CHARSET, FIELD_TYPE, NONE_SOURCE +from .event import BinLogEvent from .table import Table -from .bitmap import BitCount, BitGet class RowsEvent(BinLogEvent): @@ -332,7 +329,10 @@ def __read_string(self, size, column): else: # MYSQL 5.xx Version Goes Here # We don't know encoding type So apply Default Utf-8 - string = string.decode(errors=decode_errors) + if self._force_encoding: + string = string.decode(encoding=self._force_encoding, errors=decode_errors) + else: + string = string.decode(errors=decode_errors) return string def __read_bit(self, column): diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index fc3b635a..0cf41bef 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -1,19 +1,19 @@ import io import time import unittest +from unittest.mock import patch + +from pymysql.protocol import MysqlPacket -from pymysqlreplication.json_binary import JsonDiff, JsonDiffOperation -from pymysqlreplication.tests import base from pymysqlreplication import BinLogStreamReader -from pymysqlreplication.gtid import GtidSet, Gtid -from pymysqlreplication.event import * from pymysqlreplication.constants.BINLOG import * from pymysqlreplication.constants.NONE_SOURCE import * -from pymysqlreplication.row_event import * +from pymysqlreplication.event import * +from pymysqlreplication.gtid import Gtid, GtidSet +from pymysqlreplication.json_binary import JsonDiff, JsonDiffOperation from pymysqlreplication.packet import BinLogPacketWrapper -from pymysql.protocol import MysqlPacket -from unittest.mock import patch - +from pymysqlreplication.row_event import * +from pymysqlreplication.tests import base __all__ = [ "TestBasicBinLogStreamReader", @@ -2198,4 +2198,4 @@ def test_json_partial_update_two_column(self): if __name__ == "__main__": import unittest - unittest.main() + unittest.main() \ No newline at end 
of file