|
30 | 30 | "TestStatementConnectionSetting",
|
31 | 31 | "TestRowsQueryLogEvents",
|
32 | 32 | "TestOptionalMetaData",
|
| 33 | + "TestColumnValueNoneSources", |
33 | 34 | ]
|
34 | 35 |
|
35 | 36 |
|
@@ -603,71 +604,6 @@ def create_binlog_packet_wrapper(pkt):
|
603 | 604 | self.assertEqual(binlog_event.event._is_event_valid, True)
|
604 | 605 | self.assertNotEqual(wrong_event.event._is_event_valid, True)
|
605 | 606 |
|
606 |
| - def test_get_none(self): |
607 |
| - self.stream.close() |
608 |
| - self.stream = BinLogStreamReader( |
609 |
| - self.database, |
610 |
| - server_id=1024, |
611 |
| - resume_stream=False, |
612 |
| - only_events=[WriteRowsEvent], |
613 |
| - ) |
614 |
| - query = "CREATE TABLE null_operation_update_example (col1 INT, col2 INT);" |
615 |
| - self.execute(query) |
616 |
| - query = ( |
617 |
| - "INSERT INTO null_operation_update_example (col1, col2) VALUES (NULL, 1);" |
618 |
| - ) |
619 |
| - self.execute(query) |
620 |
| - self.execute("COMMIT") |
621 |
| - write_rows_event = self.stream.fetchone() |
622 |
| - self.assertIsInstance(write_rows_event, WriteRowsEvent) |
623 |
| - |
624 |
| - none_sources = write_rows_event.rows[0].get("none_sources") |
625 |
| - if none_sources: |
626 |
| - self.assertEqual(none_sources["col1"], "null") |
627 |
| - |
628 |
| - def test_get_none_invalid(self): |
629 |
| - self.execute("SET SESSION SQL_MODE='ALLOW_INVALID_DATES'") |
630 |
| - self.execute( |
631 |
| - "CREATE TABLE test_table (col0 INT, col1 VARCHAR(10), col2 DATETIME, col3 DATE, col4 SET('a', 'b', 'c'))" |
632 |
| - ) |
633 |
| - self.execute( |
634 |
| - "INSERT INTO test_table VALUES (NULL, NULL, '0000-00-00 00:00:00', NULL, NULL)" |
635 |
| - ) |
636 |
| - self.resetBinLog() |
637 |
| - self.execute( |
638 |
| - "UPDATE test_table SET col1 = NULL, col2 = NULL, col3='0000-00-00', col4='d' WHERE col0 IS NULL" |
639 |
| - ) |
640 |
| - self.execute("COMMIT") |
641 |
| - |
642 |
| - self.assertIsInstance(self.stream.fetchone(), RotateEvent) |
643 |
| - self.assertIsInstance(self.stream.fetchone(), FormatDescriptionEvent) |
644 |
| - self.assertIsInstance(self.stream.fetchone(), QueryEvent) |
645 |
| - self.assertIsInstance(self.stream.fetchone(), TableMapEvent) |
646 |
| - |
647 |
| - event = self.stream.fetchone() |
648 |
| - if self.isMySQL56AndMore(): |
649 |
| - self.assertEqual(event.event_type, UPDATE_ROWS_EVENT_V2) |
650 |
| - else: |
651 |
| - self.assertEqual(event.event_type, UPDATE_ROWS_EVENT_V1) |
652 |
| - self.assertIsInstance(event, UpdateRowsEvent) |
653 |
| - |
654 |
| - before_none_sources = event.rows[0].get("before_none_sources") |
655 |
| - after_none_sources = event.rows[0].get("after_none_sources") |
656 |
| - |
657 |
| - if before_none_sources: |
658 |
| - self.assertEqual(before_none_sources["col0"], "null") |
659 |
| - self.assertEqual(before_none_sources["col1"], "null") |
660 |
| - self.assertEqual(before_none_sources["col2"], "out of datetime2 range") |
661 |
| - self.assertEqual(before_none_sources["col3"], "null") |
662 |
| - self.assertEqual(before_none_sources["col4"], "null") |
663 |
| - |
664 |
| - if after_none_sources: |
665 |
| - self.assertEqual(after_none_sources["col0"], "null") |
666 |
| - self.assertEqual(after_none_sources["col1"], "null") |
667 |
| - self.assertEqual(after_none_sources["col2"], "null") |
668 |
| - self.assertEqual(after_none_sources["col3"], "out of date range") |
669 |
| - self.assertEqual(after_none_sources["col4"], "empty set") |
670 |
| - |
671 | 607 |
|
672 | 608 | class TestMultipleRowBinLogStreamReader(base.PyMySQLReplicationTestCase):
|
673 | 609 | def setUp(self):
|
@@ -1808,6 +1744,85 @@ def tearDown(self):
|
1808 | 1744 | super(TestOptionalMetaData, self).tearDown()
|
1809 | 1745 |
|
1810 | 1746 |
|
| 1747 | +class TestColumnValueNoneSources(base.PyMySQLReplicationTestCase): |
| 1748 | + def setUp(self): |
| 1749 | + super(TestColumnValueNoneSources, self).setUp() |
| 1750 | + self.stream.close() |
| 1751 | + self.stream = BinLogStreamReader( |
| 1752 | + self.database, |
| 1753 | + server_id=1024, |
| 1754 | + only_events=(TableMapEvent,), |
| 1755 | + ) |
| 1756 | + if not self.isMySQL8014AndMore(): |
| 1757 | + self.skipTest("Mysql version is under 8.0.14 - pass TestOptionalMetaData") |
| 1758 | + self.execute("SET GLOBAL binlog_row_metadata='FULL';") |
| 1759 | + |
| 1760 | + def test_get_none(self): |
| 1761 | + self.stream.close() |
| 1762 | + self.stream = BinLogStreamReader( |
| 1763 | + self.database, |
| 1764 | + server_id=1024, |
| 1765 | + resume_stream=False, |
| 1766 | + only_events=[WriteRowsEvent], |
| 1767 | + ) |
| 1768 | + query = "CREATE TABLE null_operation_update_example (col1 INT, col2 INT);" |
| 1769 | + self.execute(query) |
| 1770 | + query = ( |
| 1771 | + "INSERT INTO null_operation_update_example (col1, col2) VALUES (NULL, 1);" |
| 1772 | + ) |
| 1773 | + self.execute(query) |
| 1774 | + self.execute("COMMIT") |
| 1775 | + write_rows_event = self.stream.fetchone() |
| 1776 | + self.assertIsInstance(write_rows_event, WriteRowsEvent) |
| 1777 | + |
| 1778 | + none_sources = write_rows_event.rows[0].get("none_sources") |
| 1779 | + if none_sources: |
| 1780 | + self.assertEqual(none_sources["col1"], "null") |
| 1781 | + |
| 1782 | + def test_get_none_invalid(self): |
| 1783 | + self.execute("SET SESSION SQL_MODE='ALLOW_INVALID_DATES'") |
| 1784 | + self.execute( |
| 1785 | + "CREATE TABLE test_table (col0 INT, col1 VARCHAR(10), col2 DATETIME, col3 DATE, col4 SET('a', 'b', 'c'))" |
| 1786 | + ) |
| 1787 | + self.execute( |
| 1788 | + "INSERT INTO test_table VALUES (NULL, NULL, '0000-00-00 00:00:00', NULL, NULL)" |
| 1789 | + ) |
| 1790 | + self.resetBinLog() |
| 1791 | + self.execute( |
| 1792 | + "UPDATE test_table SET col1 = NULL, col2 = NULL, col3='0000-00-00', col4='d' WHERE col0 IS NULL" |
| 1793 | + ) |
| 1794 | + self.execute("COMMIT") |
| 1795 | + |
| 1796 | + self.assertIsInstance(self.stream.fetchone(), RotateEvent) |
| 1797 | + self.assertIsInstance(self.stream.fetchone(), FormatDescriptionEvent) |
| 1798 | + self.assertIsInstance(self.stream.fetchone(), QueryEvent) |
| 1799 | + self.assertIsInstance(self.stream.fetchone(), TableMapEvent) |
| 1800 | + |
| 1801 | + event = self.stream.fetchone() |
| 1802 | + if self.isMySQL56AndMore(): |
| 1803 | + self.assertEqual(event.event_type, UPDATE_ROWS_EVENT_V2) |
| 1804 | + else: |
| 1805 | + self.assertEqual(event.event_type, UPDATE_ROWS_EVENT_V1) |
| 1806 | + self.assertIsInstance(event, UpdateRowsEvent) |
| 1807 | + |
| 1808 | + before_none_sources = event.rows[0].get("before_none_sources") |
| 1809 | + after_none_sources = event.rows[0].get("after_none_sources") |
| 1810 | + |
| 1811 | + if before_none_sources: |
| 1812 | + self.assertEqual(before_none_sources["col0"], "null") |
| 1813 | + self.assertEqual(before_none_sources["col1"], "null") |
| 1814 | + self.assertEqual(before_none_sources["col2"], "out of datetime2 range") |
| 1815 | + self.assertEqual(before_none_sources["col3"], "null") |
| 1816 | + self.assertEqual(before_none_sources["col4"], "null") |
| 1817 | + |
| 1818 | + if after_none_sources: |
| 1819 | + self.assertEqual(after_none_sources["col0"], "null") |
| 1820 | + self.assertEqual(after_none_sources["col1"], "null") |
| 1821 | + self.assertEqual(after_none_sources["col2"], "null") |
| 1822 | + self.assertEqual(after_none_sources["col3"], "out of date range") |
| 1823 | + self.assertEqual(after_none_sources["col4"], "empty set") |
| 1824 | + |
| 1825 | + |
1811 | 1826 | if __name__ == "__main__":
|
1812 | 1827 | import unittest
|
1813 | 1828 |
|
|
0 commit comments