Skip to content

Distinguish ambiguous column value of None #86

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 20 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 57 additions & 13 deletions pymysqlreplication/row_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs)
self.__ignored_tables = kwargs["ignored_tables"]
self.__only_schemas = kwargs["only_schemas"]
self.__ignored_schemas = kwargs["ignored_schemas"]
self.none_sources = {}

#Header
self.table_id = self._read_table_id()
Expand Down Expand Up @@ -123,11 +124,14 @@ def _read_column_data(self, cols_bitmap):

def __read_values_name(self, column, null_bitmap, null_bitmap_index, cols_bitmap, unsigned, zerofill,
fixed_binary_length, i):
name = self.table_map[self.table_id].columns[i].name

if BitGet(cols_bitmap, i) == 0:
self.none_sources[name] = 'cols_bitmap'
return None

if self._is_null(null_bitmap, null_bitmap_index):
self.none_sources[name] = 'null'
return None

if column.type == FIELD_TYPE.TINY:
Expand Down Expand Up @@ -182,18 +186,27 @@ def __read_values_name(self, column, null_bitmap, null_bitmap_index, cols_bitmap
elif column.type == FIELD_TYPE.BLOB:
return self.__read_string(column.length_size, column)
elif column.type == FIELD_TYPE.DATETIME:
return self.__read_datetime()
ret = self.__read_datetime()
if ret is None:
self.none_sources[name] = 'out of datetime range'
return ret
elif column.type == FIELD_TYPE.TIME:
return self.__read_time()
elif column.type == FIELD_TYPE.DATE:
return self.__read_date()
ret = self.__read_date()
if ret is None:
self.none_sources[name] = 'out of date range'
return ret
elif column.type == FIELD_TYPE.TIMESTAMP:
return datetime.datetime.fromtimestamp(
self.packet.read_uint32())

# For new date format:
elif column.type == FIELD_TYPE.DATETIME2:
return self.__read_datetime2(column)
ret = self.__read_datetime2(column)
if ret is None:
self.none_sources[name] = 'out of datetime2 range'
return ret
elif column.type == FIELD_TYPE.TIME2:
return self.__read_time2(column)
elif column.type == FIELD_TYPE.TIMESTAMP2:
Expand All @@ -217,10 +230,14 @@ def __read_values_name(self, column, null_bitmap, null_bitmap_index, cols_bitmap
# We read set columns as a bitmap telling us which options
# are enabled
bit_mask = self.packet.read_uint_by_size(column.size)
return set(
set_value = set(
val for idx, val in enumerate(column.set_values)
if bit_mask & 2 ** idx
) or None
)
if not set_value:
self.none_sources[column.name] = "empty set"
return None
return set_value

elif column.type == FIELD_TYPE.BIT:
return self.__read_bit(column)
Expand Down Expand Up @@ -459,6 +476,16 @@ def __read_binary_slice(self, binary, start, size, data_length):
mask = ((1 << size) - 1)
return binary & mask

def _categorize_none(self, column_data):
result = {}
for column_name, value in column_data.items():
if value is not None:
continue

category = self.none_sources.get(column_name, "null")
result[column_name] = category
return result

def _dump(self):
super()._dump()
print("Table: %s.%s" % (self.schema, self.table))
Expand Down Expand Up @@ -498,6 +525,8 @@ def _fetch_one_row(self):
row = {}

row["values"] = self._read_column_data(self.columns_present_bitmap)
row["category_of_none"] = self._categorize_none(row["values"])

return row

def _dump(self):
Expand All @@ -506,8 +535,8 @@ def _dump(self):
for row in self.rows:
print("--")
for key in row["values"]:
print("*", key, ":", row["values"][key])

print("*", key, ":", row["values"][key],
"(%s)" % row["category_of_none"][key] if key in row["category_of_none"] else "")

class WriteRowsEvent(RowsEvent):
"""This event is triggered when a row in database is added
Expand All @@ -526,6 +555,8 @@ def _fetch_one_row(self):
row = {}

row["values"] = self._read_column_data(self.columns_present_bitmap)
row["category_of_none"] = self._categorize_none(row["values"])

return row

def _dump(self):
Expand All @@ -534,7 +565,8 @@ def _dump(self):
for row in self.rows:
print("--")
for key in row["values"]:
print("*", key, ":", row["values"][key])
print("*", key, ":", row["values"][key],
"(%s)" % row["category_of_none"][key] if key in row["category_of_none"] else "")


class UpdateRowsEvent(RowsEvent):
Expand Down Expand Up @@ -562,8 +594,9 @@ def _fetch_one_row(self):
row = {}

row["before_values"] = self._read_column_data(self.columns_present_bitmap)

row["after_values"] = self._read_column_data(self.columns_present_bitmap2)
row['before_category_of_none'] = self._categorize_none(row["before_values"])
row['after_values'] = self._read_column_data(self.columns_present_bitmap2)
row['after_category_of_none'] = self._categorize_none(row["after_values"])
return row

def _dump(self):
Expand All @@ -573,10 +606,21 @@ def _dump(self):
for row in self.rows:
print("--")
for key in row["before_values"]:
print("*%s:%s=>%s" % (key,
row["before_values"][key],
row["after_values"][key]))
if key in row["before_category_of_none"]:
before_value_info = "%s(%s)" % (row["before_values"][key],
row["before_category_of_none"][key])
else:
before_value_info = row["before_values"][key]

if key in row["after_category_of_none"]:
after_value_info = "%s(%s)" % (row["after_values"][key],
row["after_category_of_none"][key])
else:
after_value_info = row["after_values"][key]

print("*%s:%s=>%s" % (key,
before_value_info,
after_value_info))

class TableMapEvent(BinLogEvent):
"""This event describes the structure of a table.
Expand Down
18 changes: 18 additions & 0 deletions pymysqlreplication/tests/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,24 @@ def create_binlog_packet_wrapper(pkt):
self.assertEqual(binlog_event.event._is_event_valid, True)
self.assertNotEqual(wrong_event.event._is_event_valid, True)

def test_categorize_none(self):
self.stream.close()
self.stream = BinLogStreamReader(
self.database,
server_id=1024,
resume_stream=False,
only_events = [WriteRowsEvent]
)
query = "CREATE TABLE null_operation_update_example (col1 INT, col2 INT);"
self.execute(query)
query = "INSERT INTO null_operation_update_example (col1, col2) VALUES (NULL, 1);"
self.execute(query)
self.execute("COMMIT")
write_rows_event = self.stream.fetchone()
self.assertIsInstance(write_rows_event, WriteRowsEvent)
self.assertEqual(write_rows_event.rows[0]['category_of_none']['col1'], 'null')



class TestMultipleRowBinLogStreamReader(base.PyMySQLReplicationTestCase):
def ignoredEvents(self):
Expand Down