diff --git a/pymysqlreplication/binlogstream.py b/pymysqlreplication/binlogstream.py index bb219d65..45406ad3 100644 --- a/pymysqlreplication/binlogstream.py +++ b/pymysqlreplication/binlogstream.py @@ -136,7 +136,8 @@ def __init__(self, connection_settings, server_id, ctl_connection_settings=None, report_slave=None, slave_uuid=None, pymysql_wrapper=None, fail_on_table_metadata_unavailable=False, - slave_heartbeat=None): + slave_heartbeat=None, + date_tostr=None): """ Attributes: ctl_connection_settings: Connection settings for cluster holding schema information @@ -164,6 +165,10 @@ def __init__(self, connection_settings, server_id, ctl_connection_settings=None, on replication resumption (in case many event to skip in binlog). See MASTER_HEARTBEAT_PERIOD in mysql documentation for semantics + date_tostr: False by default, convert MySQL.datetime to string not python.datetime. + handle '0000-00-00 00:00:00' to str rather than None, to keep data consistent for applying + timestamp does not convert, but also set 0 to '0000-00-00' rather than '1970-01-01 08:01:00' + ref: https://dev.mysql.com/doc/refman/5.7/en/datetime.html """ self.__connection_settings = connection_settings @@ -176,6 +181,7 @@ def __init__(self, connection_settings, server_id, ctl_connection_settings=None, self._ctl_connection_settings = ctl_connection_settings if ctl_connection_settings: self._ctl_connection_settings.setdefault("charset", "utf8") + self.date_tostr = date_tostr self.__only_tables = only_tables self.__ignored_tables = ignored_tables @@ -227,6 +233,13 @@ def __connect_to_ctl(self): self._ctl_connection_settings = dict(self.__connection_settings) self._ctl_connection_settings["db"] = "information_schema" self._ctl_connection_settings["cursorclass"] = DictCursor + if self.date_tostr: + from pymysql.converters import conversions # decoders + myconv = conversions.copy() + conv_date_tostr = {12: str, 18: str, 10: str} # see FIELD_TYPE.py + myconv.update(conv_date_tostr) + self._ctl_connection_settings.setdefault("conv", myconv) + self._ctl_connection = self.pymysql_wrapper(**self._ctl_connection_settings) self._ctl_connection._get_table_information = self.__get_table_information self.__connected_ctl = True diff --git a/pymysqlreplication/row_event.py b/pymysqlreplication/row_event.py index 45308cff..b167c3fd 100644 --- a/pymysqlreplication/row_event.py +++ b/pymysqlreplication/row_event.py @@ -25,6 +25,11 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs) self.__ignored_tables = kwargs["ignored_tables"] self.__only_schemas = kwargs["only_schemas"] self.__ignored_schemas = kwargs["ignored_schemas"] + if ctl_connection.decoders.get(FIELD_TYPE.DATETIME2) is str: # also affect DATETIME, DATE, see binlogstream.py:__connect_to_ctl() + # map datetime to string, in case of inproper convertions + self.__date_tostr = 1 + else: + self.__date_tostr = 0 #Header self.table_id = self._read_table_id() @@ -141,8 +146,11 @@ def _read_column_data(self, cols_bitmap): elif column.type == FIELD_TYPE.DATE: values[name] = self.__read_date() elif column.type == FIELD_TYPE.TIMESTAMP: - values[name] = datetime.datetime.fromtimestamp( - self.packet.read_uint32()) + time_valid = self.packet.read_uint32() + if time_valid: + values[name] = datetime.datetime.fromtimestamp(time_valid) + else: + values[name] = '0000-00-00 00:00:00' # For new date format: elif column.type == FIELD_TYPE.DATETIME2: @@ -150,9 +158,13 @@ def _read_column_data(self, cols_bitmap): elif column.type == FIELD_TYPE.TIME2: values[name] = self.__read_time2(column) elif column.type == FIELD_TYPE.TIMESTAMP2: - values[name] = self.__add_fsp_to_time( - datetime.datetime.fromtimestamp( - self.packet.read_int_be_by_size(4)), column) + time_valid = self.packet.read_int_be_by_size(4) + if time_valid: + values[name] = self.__add_fsp_to_time( + datetime.datetime.fromtimestamp(time_valid), column) + else: + # if timestamp is 0, only this value, no other or microseconds + values[name] = self.__add_fsp_to_time('0000-00-00 00:00:00', column) elif column.type == FIELD_TYPE.LONGLONG: if unsigned: values[name] = self.packet.read_uint64() @@ -194,8 +206,12 @@ def __add_fsp_to_time(self, time, column): """ microsecond = self.__read_fsp(column) if microsecond > 0: - time = time.replace(microsecond=microsecond) - return time + if isinstance(time ,str): + return "{0}.{1:0>6}".format(time, microsecond) + else: + return time.replace(microsecond=microsecond) + else: + return time def __read_fsp(self, column): read = 0 @@ -280,13 +296,18 @@ def __read_time2(self, column): def __read_date(self): time = self.packet.read_uint24() - if time == 0: # nasty mysql 0000-00-00 dates - return None year = (time & ((1 << 15) - 1) << 9) >> 9 month = (time & ((1 << 4) - 1) << 5) >> 5 day = (time & ((1 << 5) - 1)) - if year == 0 or month == 0 or day == 0: + + if self.__date_tostr: + fmt = "{0:0>4}-{1:0>2}-{2:0>2}" + date_str = fmt.format(year, month, day) + return date_str + + if time == 0 or year == 0 or month == 0 or day == 0: + # nasty mysql 0000-00-00 dates return None date = datetime.date( @@ -298,8 +319,6 @@ def __read_date(self): def __read_datetime(self): value = self.packet.read_uint64() - if value == 0: # nasty mysql 0000-00-00 dates - return None date = value / 1000000 time = int(value % 1000000) @@ -307,16 +326,26 @@ def __read_datetime(self): year = int(date / 10000) month = int((date % 10000) / 100) day = int(date % 100) - if year == 0 or month == 0 or day == 0: + hour=int(time / 10000) + minute=int((time % 10000) / 100) + second=int(time % 100) + + if self.__date_tostr: + fmt = "{0:0>4}-{1:0>2}-{2:0>2} {3:0>2}:{4:0>2}:{5:0>2}" + datetime_str = fmt.format(year, month, day, hour, minute, second) + return datetime_str + + if value == 0 or year == 0 or month == 0 or day == 0: + # nasty mysql 0000-00-00 dates return None date = datetime.datetime( year=year, month=month, day=day, - hour=int(time / 10000), - minute=int((time % 10000) / 100), - second=int(time % 100)) + hour=hour, + minute=minute, + second=second) return date def __read_datetime2(self, column): @@ -333,16 +362,27 @@ def __read_datetime2(self, column): """ data = self.packet.read_int_be_by_size(5) year_month = self.__read_binary_slice(data, 1, 17, 40) - try: - t = datetime.datetime( - year=int(year_month / 13), - month=year_month % 13, - day=self.__read_binary_slice(data, 18, 5, 40), - hour=self.__read_binary_slice(data, 23, 5, 40), - minute=self.__read_binary_slice(data, 28, 6, 40), - second=self.__read_binary_slice(data, 34, 6, 40)) - except ValueError: - return None + year = int(year_month / 13) + month = year_month % 13 + day = self.__read_binary_slice(data, 18, 5, 40) + hour = self.__read_binary_slice(data, 23, 5, 40) + minute = self.__read_binary_slice(data, 28, 6, 40) + second = self.__read_binary_slice(data, 34, 6, 40) + + if self.__date_tostr: + fmt = "{0:0>4}-{1:0>2}-{2:0>2} {3:0>2}:{4:0>2}:{5:0>2}" + t = fmt.format(year, month, day, hour, minute, second) # datetime_str + else: + try: + t = datetime.datetime( + year=year, + month=month, + day=day, + hour=hour, + minute=minute, + second=second) + except ValueError: + return None return self.__add_fsp_to_time(t, column) def __read_new_decimal(self, column):