From 2c5e06ec381dcc0e5fc818a176d43cfe45883154 Mon Sep 17 00:00:00 2001 From: AidenPerce Date: Fri, 5 Jan 2024 09:09:39 +0800 Subject: [PATCH 1/3] rebase from ververica to apache --- .../source/assigners/MySqlChunkSplitter.java | 11 ++- .../mysql/source/utils/ObjectUtils.java | 48 +++++++++ .../mysql/source/utils/StatementUtils.java | 68 +++++++++++++ .../mysql/source/utils/ObjectUtilsTest.java | 99 +++++++++++++++++++ .../src/test/resources/ddl/customer.sql | 31 ++++++ 5 files changed, 255 insertions(+), 2 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java index ec0bab77fe7..471521a4499 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java @@ -182,7 +182,14 @@ private MySqlSnapshotSplit splitOneUnevenlySizedChunk(MySqlPartition partition, chunkSize); // may sleep a while to avoid DDOS on MySQL server maySleep(nextChunkId, tableId); - if (chunkEnd != null && ObjectUtils.compare(chunkEnd, minMaxOfSplitColumn[1]) <= 0) { + if (chunkEnd != null + && ObjectUtils.dbCompare( + chunkEnd, + minMaxOfSplitColumn[1], + jdbcConnection, + tableId, + splitColumn.name()) + <= 0) { nextChunkStart = ChunkSplitterState.ChunkBound.middleOf(chunkEnd); return createSnapshotSplit( jdbcConnection, @@ -348,7 +355,7 @@ private Object nextChunkEnd( return null; } } - if (ObjectUtils.compare(chunkEnd, max) >= 0) { + if (ObjectUtils.dbCompare(chunkEnd, max, jdbcConnection, tableId, splitColumnName) >= 0) { return null; } else { return chunkEnd; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/ObjectUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/ObjectUtils.java index a3a7281a857..a1b437e113f 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/ObjectUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/ObjectUtils.java @@ -17,8 +17,14 @@ package org.apache.flink.cdc.connectors.mysql.source.utils; +import io.debezium.jdbc.JdbcConnection; +import io.debezium.relational.TableId; +import org.apache.commons.lang3.StringUtils; + import java.math.BigDecimal; import java.math.BigInteger; +import java.sql.SQLException; +import java.util.Objects; /** Utilities for operation on {@link Object}. */ public class ObjectUtils { @@ -89,6 +95,48 @@ public static int compare(Object obj1, Object obj2) { } } + /** + * Compares two comparable objects. When the compared objects are not instance of {@link + * Comparable} or instace of String, we will compare them in db, ignore the charset of table + * effects the table-split result exmaple: primaryKey: + * ['0000','1111','2222','3333','4444','aaaa','bbbb','cccc','ZZZZ'] collate: utf8mb4_general_ci + * when chunkSize = 3 we want : split1: ['0000','1111'] split2: ['3333','4444'] split3: + * ['aaaa','bbbb'] split3: ['cccc','ZZZZ'] but if we use a table with + * COLLATE='utf8mb4_general_ci' and compare them with {@link ObjectUtils#compare(Object, + * Object)}, we whill get: split1: ['0000','1111'] split2: ['3333','4444'] split3: + * ['aaaa','bbbb','cccc','ZZZZ'....] the split3 whill contains all of the remain rows + * + * @return The value {@code 0} if {@code num1} is equal to the {@code num2}; a value less than + * {@code 0} if the {@code num1} is numerically less than the {@code num2}; and a value + * greater than {@code 0} if the {@code num1} is numerically greater than the {@code num2}. + * @throws ClassCastException if the compared objects are not instance of {@link Comparable} or + * not mutually comparable (for example, strings and integers). + */ + @SuppressWarnings("unchecked") + public static int dbCompare( + Object obj1, + Object obj2, + JdbcConnection jdbcConnection, + TableId tableId, + String splitColumn) + throws SQLException { + if (Objects.equals(obj1, obj2)) { + return 0; + } + if (obj1 instanceof String && obj2 instanceof String) { + // if instance of String, we will compare them in db, to ignore effects of the charset + // and collation of the column + String columnCollation = + StatementUtils.queryColumnCollation(jdbcConnection, tableId, splitColumn); + if (!StringUtils.isEmpty(columnCollation)) { + return StatementUtils.compareValueByQuery( + obj1, obj2, jdbcConnection, columnCollation); + } + // cannot get column collation, use default compare + } + return compare(obj1, obj2); + } + /** * Compares two Double numeric object. * diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/StatementUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/StatementUtils.java index 64787eb9ac7..ef856af8eb9 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/StatementUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/StatementUtils.java @@ -322,4 +322,72 @@ private static String buildSelectWithBoundaryRowLimits( private static String quotedTableIdString(TableId tableId) { return tableId.toQuotedString('`'); } + + /** query 'collate' of the column. */ + public static String queryColumnCollation( + JdbcConnection jdbc, TableId tableId, String columnName) throws SQLException { + final String template = "SHOW FULL COLUMNS FROM %s WHERE Field = '%s';"; + String querySql = String.format(template, quotedTableIdString(tableId), columnName); + return jdbc.queryAndMap( + querySql, + rs -> { + if (rs.next()) { + return rs.getString("collation"); + } + throw new SQLException( + String.format("No result returned for query: %s", querySql)); + }); + } + + /** + * @param jdbc + * @param o1 + * @param o2 + * @return the value {@code 0} if {@code str1} is equal to the {@code str2} in mysql; a value + * {@code -1} if the {@code str1} is less than the {@code str2} in mysql; and a value {@code + * 1} if the {@code str1} is numerically greater than the {@code str2} in mysql. + * @throws SQLException + */ + public static int compareValueByQuery( + Object o1, Object o2, JdbcConnection jdbc, String columnCollation) throws SQLException { + // if str1.equals(str2) we don't need to query mysql + if (o1 != null && o1.equals(o2)) { + return 0; + } + String charset = columnCollation.split("_")[0]; + + final String compareQueryTemplate = + "SELECT " + + "CASE WHEN _%s ? COLLATE %s > _%s ? COLLATE %s THEN 1 " + + "WHEN _%s ? COLLATE %s < _%s ? COLLATE %s COLLATE %s THEN -1 " + + "ELSE 0 END"; + String compareSql = + String.format( + compareQueryTemplate, + charset, + columnCollation, + charset, + columnCollation, + charset, + columnCollation, + charset, + columnCollation, + columnCollation); + + return jdbc.prepareQueryAndMap( + compareSql, + (preparedStatement) -> { + preparedStatement.setObject(1, o1); + preparedStatement.setObject(2, o2); + preparedStatement.setObject(3, o1); + preparedStatement.setObject(4, o2); + }, + rs -> { + if (!rs.next()) { + throw new SQLException( + String.format("No result returned for query: %s", compareSql)); + } + return rs.getInt(1); + }); + } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/ObjectUtilsTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/ObjectUtilsTest.java index 8a735847b0e..53600fa2d70 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/ObjectUtilsTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/ObjectUtilsTest.java @@ -17,12 +17,55 @@ package org.apache.flink.cdc.connectors.mysql.source.utils; +import com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils; +import com.ververica.cdc.connectors.mysql.source.MySqlSourceTestBase; +import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig; +import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory; +import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase; +import io.debezium.connector.mysql.MySqlConnection; +import io.debezium.relational.TableId; +import org.junit.AfterClass; +import org.junit.BeforeClass; import org.junit.Test; +import org.junit.jupiter.api.Assertions; import java.math.BigDecimal; +import java.sql.SQLException; +import java.time.ZoneId; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; import static org.junit.Assert.assertEquals; +/** Tests for {@link ObjectUtils}. */ +public class ObjectUtilsTest extends MySqlSourceTestBase { + + private static final UniqueDatabase customerDatabase = + new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", "mysqlpw"); + private static MySqlConnection mySqlConnection; + + @BeforeClass + public static void init() throws SQLException { + customerDatabase.createAndInitialize(); + Map properties = new HashMap<>(); + properties.put("database.hostname", MYSQL_CONTAINER.getHost()); + properties.put("database.port", String.valueOf(MYSQL_CONTAINER.getDatabasePort())); + properties.put("database.user", customerDatabase.getUsername()); + properties.put("database.password", customerDatabase.getPassword()); + properties.put("database.serverTimezone", ZoneId.of("UTC").toString()); + io.debezium.config.Configuration configuration = + io.debezium.config.Configuration.from(properties); + mySqlConnection = DebeziumUtils.createMySqlConnection(configuration, new Properties()); + } + + @AfterClass + public static void afterClass() throws Exception { + if (mySqlConnection != null) { + mySqlConnection.close(); + } + } /** Tests for {@link org.apache.flink.cdc.connectors.mysql.source.utils.ObjectUtils}. */ public class ObjectUtilsTest { @@ -42,4 +85,60 @@ public void testMinus() { new BigDecimal("99.12344"), ObjectUtils.minus(new BigDecimal("100.12345"), new BigDecimal("1.00001"))); } + + @Test + public void testDbCompare() throws SQLException { + TableId tableId = + new TableId( + customerDatabase.getDatabaseName(), + null, + "unevenly_shopping_cart_utf8_bin"); + int resultBin1 = + ObjectUtils.dbCompare("aaaa", "zzzz", mySqlConnection, tableId, "pk_varchar"); + int resultBin2 = + ObjectUtils.dbCompare("zzzz", "ZZZZ", mySqlConnection, tableId, "pk_varchar"); + int resultBin3 = ObjectUtils.dbCompare("9", "a", mySqlConnection, tableId, "pk_varchar"); + Assertions.assertEquals(-1, resultBin1); + Assertions.assertEquals(1, resultBin2); + Assertions.assertEquals(-1, resultBin3); + + TableId tableId2 = + new TableId( + customerDatabase.getDatabaseName(), null, "unevenly_shopping_cart_utf8_ci"); + int resultCi1 = + ObjectUtils.dbCompare("aaaa", "zzzz", mySqlConnection, tableId2, "pk_varchar"); + int resultCi2 = + ObjectUtils.dbCompare("zzzz", "ZZZZ", mySqlConnection, tableId2, "pk_varchar"); + int resultCi3 = + ObjectUtils.dbCompare("ZZZZ", "aaaa", mySqlConnection, tableId2, "pk_varchar"); + int resultCi4 = ObjectUtils.dbCompare("9", "a", mySqlConnection, tableId2, "pk_varchar"); + Assertions.assertEquals(-1, resultCi1); + Assertions.assertEquals(0, resultCi2); + Assertions.assertEquals(1, resultCi3); + Assertions.assertEquals(-1, resultCi4); + } + + public static MySqlSourceConfig getConfig( + UniqueDatabase database, + String[] captureTables, + int splitSize, + boolean skipSnapshotBackfill) { + String[] captureTableIds = + Arrays.stream(captureTables) + .map(tableName -> database.getDatabaseName() + "." + tableName) + .toArray(String[]::new); + + return new MySqlSourceConfigFactory() + .databaseList(database.getDatabaseName()) + .tableList(captureTableIds) + .serverId("1001-1002") + .hostname(MYSQL_CONTAINER.getHost()) + .port(MYSQL_CONTAINER.getDatabasePort()) + .username(database.getUsername()) + .splitSize(splitSize) + .fetchSize(2) + .password(database.getPassword()) + .skipSnapshotBackfill(skipSnapshotBackfill) + .createConfig(0); + } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/customer.sql b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/customer.sql index e4df63f1a33..885b2e6f1cb 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/customer.sql +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/customer.sql @@ -326,3 +326,34 @@ CREATE TABLE default_value_test ( INSERT INTO default_value_test VALUES (1,'user1','Shanghai',123567), (2,'user2','Shanghai',123567); + +-- create a table to test unevently-split a table with varchar primary key +CREATE TABLE unevenly_shopping_cart_utf8_bin ( + pk_varchar varchar(100) NOT NULL PRIMARY KEY, + name VARCHAR(255) NOT NULL DEFAULT 'flink', + address VARCHAR(1024), + phone_number INTEGER DEFAULT ' 123 ' +) DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; + +INSERT INTO unevenly_shopping_cart_utf8_bin +VALUES ('1111','user1','Shanghai',123567), + ('2222','user2','Shanghai',123567), + ('3333','user1','Shanghai',123567), + ('aaaa','user2','Shanghai',123567), + ('cccc','user1','Shanghai',123567), + ('zzzz','user2','Shanghai',123567); + +CREATE TABLE unevenly_shopping_cart_utf8_ci ( + pk_varchar varchar(100) NOT NULL PRIMARY KEY, + name VARCHAR(255) NOT NULL DEFAULT 'flink', + address VARCHAR(1024), + phone_number INTEGER DEFAULT ' 123 ' +) DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci; + +INSERT INTO unevenly_shopping_cart_utf8_ci +VALUES ('1111','user1','Shanghai',123567), + ('2222','user2','Shanghai',123567), + ('3333','user1','Shanghai',123567), + ('aaaa','user2','Shanghai',123567), + ('cccc','user1','Shanghai',123567), + ('zzzz','user2','Shanghai',123567); From 197e5bdddd29c857ee482a22cd6d8c16be06f779 Mon Sep 17 00:00:00 2001 From: AidenPerce Date: Mon, 6 May 2024 11:00:19 +0800 Subject: [PATCH 2/3] [fix] repair a snapshot-split bug: resolved the conflict of org.apache.flink.cdc.connectors.mysql.source.utils.ObjectUtilsTest --- .../mysql/source/utils/ObjectUtilsTest.java | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/ObjectUtilsTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/ObjectUtilsTest.java index 53600fa2d70..dc6839006a3 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/ObjectUtilsTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/ObjectUtilsTest.java @@ -17,11 +17,11 @@ package org.apache.flink.cdc.connectors.mysql.source.utils; -import com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils; -import com.ververica.cdc.connectors.mysql.source.MySqlSourceTestBase; -import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig; -import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory; -import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase; +import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils; +import org.apache.flink.cdc.connectors.mysql.source.MySqlSourceTestBase; +import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig; +import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory; +import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase; import io.debezium.connector.mysql.MySqlConnection; import io.debezium.relational.TableId; import org.junit.AfterClass; @@ -66,8 +66,6 @@ public static void afterClass() throws Exception { mySqlConnection.close(); } } -/** Tests for {@link org.apache.flink.cdc.connectors.mysql.source.utils.ObjectUtils}. */ -public class ObjectUtilsTest { @Test public void testMinus() { From 075cde323bf25e56c4d6c79187fa46451a936d26 Mon Sep 17 00:00:00 2001 From: AidenPerce Date: Mon, 6 May 2024 15:31:38 +0800 Subject: [PATCH 3/3] [fix] repair a snapshot-split bug: resolved the conflict of org.apache.flink.cdc.connectors.mysql.source.utils.ObjectUtilsTest --- .../flink/cdc/connectors/mysql/source/utils/ObjectUtilsTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/ObjectUtilsTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/ObjectUtilsTest.java index dc6839006a3..697024afb91 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/ObjectUtilsTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/ObjectUtilsTest.java @@ -22,6 +22,7 @@ import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig; import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory; import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase; + import io.debezium.connector.mysql.MySqlConnection; import io.debezium.relational.TableId; import org.junit.AfterClass;