Skip to content

Commit fcc447b

Browse files
sean-k1mjs1995mikanizstarcat37heehehe
authored
Feature/optional meta data extraction (#471)
Co-authored-by: mjs1995 <[email protected]> Co-authored-by: mikaniz <[email protected]> Co-authored-by: starcat37 <[email protected]> Co-authored-by: heehehe <[email protected]> Co-authored-by: dongwook-chan <[email protected]>
1 parent ab0e20e commit fcc447b

File tree

6 files changed

+494
-4
lines changed

6 files changed

+494
-4
lines changed

pymysqlreplication/binlogstream.py

+13
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,7 @@ def __connect_to_ctl(self):
262262
self._ctl_connection_settings["autocommit"] = True
263263
self._ctl_connection = self.pymysql_wrapper(**self._ctl_connection_settings)
264264
self._ctl_connection._get_table_information = self.__get_table_information
265+
self._ctl_connection._get_dbms = self.__get_dbms
265266
self.__connected_ctl = True
266267

267268
def __checksum_enabled(self):
@@ -674,5 +675,17 @@ def __get_table_information(self, schema, table):
674675
else:
675676
raise error
676677

678+
def __get_dbms(self):
679+
if not self.__connected_ctl:
680+
self.__connect_to_ctl()
681+
682+
cur = self._ctl_connection.cursor()
683+
cur.execute("SELECT VERSION();")
684+
version_info = cur.fetchone().get('VERSION()', '')
685+
686+
if 'MariaDB' in version_info:
687+
return 'mariadb'
688+
return 'mysql'
689+
677690
def __iter__(self):
678691
return iter(self.fetchone, None)

pymysqlreplication/constants/BINLOG.py

+3
Original file line numberDiff line numberDiff line change
@@ -49,3 +49,6 @@
4949
MARIADB_GTID_EVENT = 0xa2
5050
MARIADB_GTID_GTID_LIST_EVENT = 0xa3
5151
MARIADB_START_ENCRYPTION_EVENT = 0xa4
52+
53+
# Common-Footer
54+
BINLOG_CHECKSUM_LEN = 4

pymysqlreplication/packet.py

+3
Original file line numberDiff line numberDiff line change
@@ -500,3 +500,6 @@ def read_string(self):
500500
string += char
501501

502502
return string
503+
504+
def bytes_to_read(self):
505+
return len(self.packet._data) - self.packet._position

pymysqlreplication/row_event.py

+288-1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import json
77

88
from pymysql.charset import charset_by_name
9+
from enum import Enum
910

1011
from .event import BinLogEvent
1112
from .exceptions import TableMetadataUnavailableError
@@ -552,7 +553,7 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs)
552553
super().__init__(from_packet, event_size,
553554
table_map, ctl_connection, **kwargs)
554555
if self._processed:
555-
#Body
556+
# Body
556557
self.columns_present_bitmap = self.packet.read(
557558
(self.number_of_columns + 7) / 8)
558559
self.columns_present_bitmap2 = self.packet.read(
@@ -577,6 +578,40 @@ def _dump(self):
577578
row["before_values"][key],
578579
row["after_values"][key]))
579580

581+
class OptionalMetaData:
582+
def __init__(self):
583+
self.unsigned_column_list = []
584+
self.default_charset_collation = None
585+
self.charset_collation = {}
586+
self.column_charset = []
587+
self.column_name_list = []
588+
self.set_str_value_list = []
589+
self.set_enum_str_value_list = []
590+
self.geometry_type_list = []
591+
self.simple_primary_key_list = []
592+
self.primary_keys_with_prefix = {}
593+
self.enum_and_set_default_charset = None
594+
self.enum_and_set_charset_collation = {}
595+
self.enum_and_set_default_column_charset_list = []
596+
self.charset_collation_list = []
597+
self.enum_and_set_collation_list = []
598+
self.visibility_list = []
599+
600+
def dump(self):
601+
print("=== %s ===" % self.__class__.__name__)
602+
print("unsigned_column_list: %s" % self.unsigned_column_list)
603+
print("default_charset_collation: %s" % self.default_charset_collation)
604+
print("charset_collation: %s" % self.charset_collation)
605+
print("column_charset: %s" % self.column_charset)
606+
print("column_name_list: %s" % self.column_name_list)
607+
print("set_str_value_list : %s" % self.set_str_value_list)
608+
print("set_enum_str_value_list : %s" % self.set_enum_str_value_list)
609+
print("geometry_type_list : %s" % self.geometry_type_list)
610+
print("simple_primary_key_list: %s" % self.simple_primary_key_list)
611+
print("primary_keys_with_prefix: %s" % self.primary_keys_with_prefix)
612+
print("visibility_list: %s" % self.visibility_list)
613+
print("charset_collation_list: %s" % self.charset_collation_list)
614+
print("enum_and_set_collation_list: %s" % self.enum_and_set_collation_list)
580615

581616
class TableMapEvent(BinLogEvent):
582617
"""This event describes the structure of a table.
@@ -633,6 +668,7 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs)
633668
else:
634669
self.column_schemas = self._ctl_connection._get_table_information(self.schema, self.table)
635670

671+
self.dbms = self._ctl_connection._get_dbms()
636672
ordinal_pos_loc = 0
637673

638674
if self.column_count != 0:
@@ -675,6 +711,8 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs)
675711
# ith column is nullable if (i - 1)th bit is set to True, not nullable otherwise
676712
## Refer to definition of and call to row.event._is_null() to interpret bitmap corresponding to columns
677713
self.null_bitmask = self.packet.read((self.column_count + 7) / 8)
714+
# optional meta Data
715+
self.optional_metadata = self._get_optional_meta_data()
678716

679717
def get_table(self):
680718
return self.table_obj
@@ -685,3 +723,252 @@ def _dump(self):
685723
print("Schema: %s" % (self.schema))
686724
print("Table: %s" % (self.table))
687725
print("Columns: %s" % (self.column_count))
726+
self.optional_metadata.dump()
727+
728+
def _get_optional_meta_data(self):
729+
"""
730+
DEFAULT_CHARSET and COLUMN_CHARSET don't appear together,
731+
and ENUM_AND_SET_DEFAULT_CHARSET and ENUM_AND_SET_COLUMN_CHARSET don't appear together.
732+
They are just alternative ways to pack character set information.
733+
When binlogging, it logs character sets in the way that occupies least storage.
734+
735+
TLV format data (TYPE, LENGTH, VALUE)
736+
"""
737+
optional_metadata = OptionalMetaData()
738+
while self.packet.bytes_to_read() > BINLOG.BINLOG_CHECKSUM_LEN:
739+
option_metadata_type = self.packet.read(1)[0]
740+
length = self.packet.read_length_coded_binary()
741+
field_type: MetadataFieldType = MetadataFieldType.by_index(option_metadata_type)
742+
743+
if field_type == MetadataFieldType.SIGNEDNESS:
744+
signed_column_list = self._convert_include_non_numeric_column(
745+
self._read_bool_list(length, True))
746+
optional_metadata.unsigned_column_list = signed_column_list
747+
748+
elif field_type == MetadataFieldType.DEFAULT_CHARSET:
749+
optional_metadata.default_charset_collation, optional_metadata.charset_collation = self._read_default_charset(
750+
length)
751+
optional_metadata.charset_collation_list = self._parsed_column_charset_by_default_charset(
752+
optional_metadata.default_charset_collation,
753+
optional_metadata.charset_collation,
754+
self._is_character_column)
755+
756+
elif field_type == MetadataFieldType.COLUMN_CHARSET:
757+
optional_metadata.column_charset = self._read_ints(length)
758+
optional_metadata.charset_collation_list = self._parsed_column_charset_by_column_charset(
759+
optional_metadata.column_charset, self._is_character_column)
760+
761+
elif field_type == MetadataFieldType.COLUMN_NAME:
762+
optional_metadata.column_name_list = self._read_column_names(length)
763+
764+
elif field_type == MetadataFieldType.SET_STR_VALUE:
765+
optional_metadata.set_str_value_list = self._read_type_values(length)
766+
767+
elif field_type == MetadataFieldType.ENUM_STR_VALUE:
768+
optional_metadata.set_enum_str_value_list = self._read_type_values(length)
769+
770+
elif field_type == MetadataFieldType.GEOMETRY_TYPE:
771+
optional_metadata.geometry_type_list = self._read_ints(length)
772+
773+
elif field_type == MetadataFieldType.SIMPLE_PRIMARY_KEY:
774+
optional_metadata.simple_primary_key_list = self._read_ints(length)
775+
776+
elif field_type == MetadataFieldType.PRIMARY_KEY_WITH_PREFIX:
777+
optional_metadata.primary_keys_with_prefix = self._read_primary_keys_with_prefix(length)
778+
779+
elif field_type == MetadataFieldType.ENUM_AND_SET_DEFAULT_CHARSET:
780+
optional_metadata.enum_and_set_default_charset, optional_metadata.enum_and_set_charset_collation = self._read_default_charset(
781+
length)
782+
783+
optional_metadata.enum_and_set_collation_list = self._parsed_column_charset_by_default_charset(
784+
optional_metadata.enum_and_set_default_charset,
785+
optional_metadata.enum_and_set_charset_collation,
786+
self._is_enum_or_set_column)
787+
788+
elif field_type == MetadataFieldType.ENUM_AND_SET_COLUMN_CHARSET:
789+
optional_metadata.enum_and_set_default_column_charset_list = self._read_ints(length)
790+
791+
optional_metadata.enum_and_set_collation_list = self._parsed_column_charset_by_column_charset(
792+
optional_metadata.enum_and_set_default_column_charset_list, self._is_enum_or_set_column)
793+
794+
elif field_type == MetadataFieldType.VISIBILITY:
795+
optional_metadata.visibility_list = self._read_bool_list(length, False)
796+
797+
return optional_metadata
798+
799+
def _convert_include_non_numeric_column(self, signedness_bool_list):
800+
# The incoming order of columns in the packet represents the indices of the numeric columns.
801+
# Thus, it transforms non-numeric columns to align with the sorting.
802+
bool_list = []
803+
position = 0
804+
for i in range(self.column_count):
805+
column_type = self.columns[i].type
806+
if self._is_numeric_column(column_type):
807+
if signedness_bool_list[position]:
808+
bool_list.append(True)
809+
else:
810+
bool_list.append(False)
811+
position += 1
812+
else:
813+
bool_list.append(False)
814+
815+
return bool_list
816+
817+
def _parsed_column_charset_by_default_charset(self, default_charset_collation: int, column_charset_collation: dict,
818+
column_type_detect_function):
819+
column_charset = []
820+
for i in range(self.column_count):
821+
column_type = self.columns[i].type
822+
if not column_type_detect_function(column_type, dbms=self.dbms):
823+
continue
824+
elif i not in column_charset_collation.keys():
825+
column_charset.append(default_charset_collation)
826+
else:
827+
column_charset.append(column_charset_collation[i])
828+
829+
return column_charset
830+
831+
def _parsed_column_charset_by_column_charset(self, column_charset_list: list, column_type_detect_function):
832+
column_charset = []
833+
position = 0
834+
if len(column_charset_list) == 0:
835+
return
836+
for i in range(self.column_count):
837+
column_type = self.columns[i].type
838+
if not column_type_detect_function(column_type, dbms=self.dbms):
839+
continue
840+
else:
841+
column_charset.append(column_charset_list[position])
842+
position += 1
843+
844+
return column_charset
845+
846+
def _read_bool_list(self, read_byte_length, signedness_flag):
847+
# if signedness_flag true
848+
# The order of the index in the packet is only the index between the numeric_columns.
849+
# Therefore, we need to use numeric_column_count when calculating bits.
850+
bool_list = []
851+
bytes_data = self.packet.read(read_byte_length)
852+
853+
byte = 0
854+
byte_idx = 0
855+
bit_idx = 0
856+
857+
for i in range(self.column_count):
858+
column_type = self.columns[i].type
859+
if not self._is_numeric_column(column_type) and signedness_flag:
860+
continue
861+
if bit_idx == 0:
862+
byte = bytes_data[byte_idx]
863+
byte_idx += 1
864+
bool_list.append((byte & (0b10000000 >> bit_idx)) != 0)
865+
bit_idx = (bit_idx + 1) % 8
866+
return bool_list
867+
868+
def _read_default_charset(self, length):
869+
charset = {}
870+
read_until = self.packet.read_bytes + length
871+
if self.packet.read_bytes >= read_until:
872+
return
873+
default_charset_collation = self.packet.read_length_coded_binary()
874+
while self.packet.read_bytes < read_until:
875+
column_index = self.packet.read_length_coded_binary()
876+
charset_collation = self.packet.read_length_coded_binary()
877+
charset[column_index] = charset_collation
878+
879+
return default_charset_collation, charset
880+
881+
def _read_ints(self, length):
882+
result = []
883+
read_until = self.packet.read_bytes + length
884+
while self.packet.read_bytes < read_until:
885+
result.append(self.packet.read_length_coded_binary())
886+
return result
887+
888+
def _read_column_names(self, length):
889+
result = []
890+
read_until = self.packet.read_bytes + length
891+
while self.packet.read_bytes < read_until:
892+
result.append(self.packet.read_variable_length_string().decode())
893+
return result
894+
895+
def _read_type_values(self, length):
896+
result = []
897+
read_until = self.packet.read_bytes + length
898+
if self.packet.read_bytes >= read_until:
899+
return
900+
while self.packet.read_bytes < read_until:
901+
type_value_list = []
902+
value_count = self.packet.read_length_coded_binary()
903+
for i in range(value_count):
904+
value = self.packet.read_variable_length_string()
905+
decode_value = ""
906+
try:
907+
decode_value = value.decode()
908+
except UnicodeDecodeError:
909+
# ignore not utf-8 decode type
910+
pass
911+
type_value_list.append(decode_value)
912+
result.append(type_value_list)
913+
return result
914+
915+
def _read_primary_keys_with_prefix(self, length):
916+
ints = self._read_ints(length)
917+
result = {}
918+
for i in range(0, len(ints), 2):
919+
result[ints[i]] = ints[i + 1]
920+
return result
921+
922+
@staticmethod
923+
def _is_character_column(column_type, dbms='mysql'):
924+
if column_type in [FIELD_TYPE.STRING, FIELD_TYPE.VAR_STRING, FIELD_TYPE.VARCHAR, FIELD_TYPE.BLOB]:
925+
return True
926+
if column_type == FIELD_TYPE.GEOMETRY and dbms == 'mariadb':
927+
return True
928+
return False
929+
930+
@staticmethod
931+
def _is_enum_column(column_type):
932+
if column_type == FIELD_TYPE.ENUM:
933+
return True
934+
return False
935+
936+
@staticmethod
937+
def _is_set_column(column_type):
938+
if column_type == FIELD_TYPE.SET:
939+
return True
940+
return False
941+
942+
@staticmethod
943+
def _is_enum_or_set_column(column_type, dbms='mysql'):
944+
if column_type in [FIELD_TYPE.ENUM, FIELD_TYPE.SET]:
945+
return True
946+
return False
947+
948+
@staticmethod
949+
def _is_numeric_column(column_type):
950+
if column_type in [FIELD_TYPE.TINY, FIELD_TYPE.SHORT, FIELD_TYPE.INT24, FIELD_TYPE.LONG,
951+
FIELD_TYPE.LONGLONG, FIELD_TYPE.NEWDECIMAL, FIELD_TYPE.FLOAT,
952+
FIELD_TYPE.DOUBLE,
953+
FIELD_TYPE.YEAR]:
954+
return True
955+
return False
956+
957+
class MetadataFieldType(Enum):
958+
SIGNEDNESS = 1 # Signedness of numeric columns
959+
DEFAULT_CHARSET = 2 # Charsets of character columns
960+
COLUMN_CHARSET = 3 # Charsets of character columns
961+
COLUMN_NAME = 4 # Names of columns
962+
SET_STR_VALUE = 5 # The string values of SET columns
963+
ENUM_STR_VALUE = 6 # The string values in ENUM columns
964+
GEOMETRY_TYPE = 7 # The real type of geometry columns
965+
SIMPLE_PRIMARY_KEY = 8 # The primary key without any prefix
966+
PRIMARY_KEY_WITH_PREFIX = 9 # The primary key with some prefix
967+
ENUM_AND_SET_DEFAULT_CHARSET = 10 # Charsets of ENUM and SET columns
968+
ENUM_AND_SET_COLUMN_CHARSET = 11 # Charsets of ENUM and SET columns
969+
VISIBILITY = 12
970+
UNKNOWN_METADATA_FIELD_TYPE = 128
971+
972+
@staticmethod
973+
def by_index(index):
974+
return MetadataFieldType(index)

pymysqlreplication/tests/base.py

+7
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,13 @@ def isMySQL80AndMore(self):
6363
version = float(self.getMySQLVersion().rsplit('.', 1)[0])
6464
return version >= 8.0
6565

66+
def isMySQL8014AndMore(self):
67+
version = float(self.getMySQLVersion().rsplit(".", 1)[0])
68+
version_detail = int(self.getMySQLVersion().rsplit(".", 1)[1])
69+
if version > 8.0:
70+
return True
71+
return version == 8.0 and version_detail >= 14
72+
6673
def isMariaDB(self):
6774
if self.__is_mariaDB is None:
6875
self.__is_mariaDB = "MariaDB" in self.execute("SELECT VERSION()").fetchone()[0]

0 commit comments

Comments
 (0)