[fix-issue-2676] repair a snapshot-split bug: #2968

Closed
wants to merge 4 commits into from
@@ -182,7 +182,14 @@ private MySqlSnapshotSplit splitOneUnevenlySizedChunk(MySqlPartition partition,
chunkSize);
// may sleep a while to avoid DDOS on MySQL server
maySleep(nextChunkId, tableId);
if (chunkEnd != null && ObjectUtils.compare(chunkEnd, minMaxOfSplitColumn[1]) <= 0) {
if (chunkEnd != null
&& ObjectUtils.dbCompare(
chunkEnd,
minMaxOfSplitColumn[1],
jdbcConnection,
tableId,
splitColumn.name())
<= 0) {
nextChunkStart = ChunkSplitterState.ChunkBound.middleOf(chunkEnd);
return createSnapshotSplit(
jdbcConnection,
@@ -348,7 +355,7 @@ private Object nextChunkEnd(
return null;
}
}
if (ObjectUtils.compare(chunkEnd, max) >= 0) {
if (ObjectUtils.dbCompare(chunkEnd, max, jdbcConnection, tableId, splitColumnName) >= 0) {
return null;
} else {
return chunkEnd;
@@ -17,8 +17,14 @@

package org.apache.flink.cdc.connectors.mysql.source.utils;

import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.TableId;
import org.apache.commons.lang3.StringUtils;

import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.SQLException;
import java.util.Objects;

/** Utilities for operation on {@link Object}. */
public class ObjectUtils {
@@ -89,6 +95,48 @@ public static int compare(Object obj1, Object obj2) {
}
}

/**
* Compares two comparable objects. When the compared objects are not instance of {@link
* Comparable} or instance of String, we will compare them in db, ignore the charset of table
* effects the table-split result example: primaryKey:
* ['0000','1111','2222','3333','4444','aaaa','bbbb','cccc','ZZZZ'] collate: utf8mb4_general_ci
* when chunkSize = 3 we want : split1: ['0000','1111'] split2: ['3333','4444'] split3:
* ['aaaa','bbbb'] split3: ['cccc','ZZZZ'] but if we use a table with
* COLLATE='utf8mb4_general_ci' and compare them with {@link ObjectUtils#compare(Object,
* Object)}, we whill get: split1: ['0000','1111'] split2: ['3333','4444'] split3:
* ['aaaa','bbbb','cccc','ZZZZ'....] the split3 whill contains all of the remain rows
Comment on lines +106 to +107
Contributor
Suggested change
* Object)}, we whill get: split1: ['0000','1111'] split2: ['3333','4444'] split3:
* ['aaaa','bbbb','cccc','ZZZZ'....] the split3 whill contains all of the remain rows
* Object)}, we will get: split1: ['0000','1111'] split2: ['3333','4444'] split3:
* ['aaaa','bbbb','cccc','ZZZZ'....] the split3 will contains all of the remain rows

*
* @return The value {@code 0} if {@code num1} is equal to the {@code num2}; a value less than
* {@code 0} if the {@code num1} is numerically less than the {@code num2}; and a value
* greater than {@code 0} if the {@code num1} is numerically greater than the {@code num2}.
* @throws ClassCastException if the compared objects are not instance of {@link Comparable} or
* not <i>mutually comparable</i> (for example, strings and integers).
*/
@SuppressWarnings("unchecked")
Contributor

Is there a reason for this suppression?

public static int dbCompare(
Object obj1,
Object obj2,
JdbcConnection jdbcConnection,
TableId tableId,
String splitColumn)
throws SQLException {
Contributor

Should this function throw an SQLException, given that there is fallback logic (the normal compare(obj1, obj2))?
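
To make the question above concrete, here is a minimal sketch of what "fall back instead of throwing" could look like. It is not the PR's code: `FallbackCompare`, `SqlComparison`, and `compareWithFallback` are hypothetical names, and the sketch assumes that silently switching to the local comparison is acceptable, which may not hold if the two orderings disagree.

```java
import java.sql.SQLException;
import java.util.Comparator;

/** Hypothetical sketch: a database-backed comparison that falls back to a local one. */
public class FallbackCompare {

    /** Hypothetical stand-in for a comparison that may need a database round trip. */
    @FunctionalInterface
    public interface SqlComparison<T> {
        int compare(T left, T right) throws SQLException;
    }

    /**
     * Tries the database-backed comparison first. If it throws SQLException, the local
     * comparator is used instead, so the caller never sees the checked exception.
     */
    public static <T> int compareWithFallback(
            T left, T right, SqlComparison<T> dbComparison, Comparator<T> localComparison) {
        try {
            return dbComparison.compare(left, right);
        } catch (SQLException e) {
            // Assumption: falling back is acceptable here. In a real splitter this could hide
            // a collation mismatch, so logging or rethrowing may be the safer choice.
            return localComparison.compare(left, right);
        }
    }

    public static void main(String[] args) {
        // Simulate a database comparison that always fails.
        SqlComparison<String> failing =
                (a, b) -> {
                    throw new SQLException("connection lost");
                };
        int result = compareWithFallback("aaaa", "zzzz", failing, Comparator.naturalOrder());
        System.out.println("fallback result is negative: " + (result < 0));
    }
}
```

The trade-off is that a swallowed exception keeps the method signature simple but can reintroduce the ordering problem the PR is trying to fix, which is one argument for keeping `throws SQLException`.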

if (Objects.equals(obj1, obj2)) {
return 0;
}
if (obj1 instanceof String && obj2 instanceof String) {
// if instance of String, we will compare them in db, to ignore effects of the charset
// and collation of the column
String columnCollation =
StatementUtils.queryColumnCollation(jdbcConnection, tableId, splitColumn);
Comment on lines +129 to +130
Contributor

IIUC, dbCompare might be called many times during chunk splitting, and my concern is that executing a JDBC query on every call might significantly degrade performance. Is it possible to implement a native comparison that is compatible with MySQL?

Author

@AidenPerce AidenPerce May 8, 2024

Yeah, you are right. During chunk splitting, the number of database queries has gone from one to three. But at this stage I cannot fully cover the different MySQL versions and the various database character sets and collations, so for now I chose a relatively simple approach to solve it.

Contributor

Got it, this solution looks reasonable.
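
One way to soften the cost raised in this thread, offered as a sketch rather than as what the PR does: if the collation of a split column does not change while a table is being split (an assumption), the collation lookup could be memoized per table and column. `CollationCache` and its `loader` parameter are hypothetical names; the loader stands in for a call such as `StatementUtils.queryColumnCollation` and is assumed to handle checked exceptions itself.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;

/** Hypothetical sketch: remembers the collation of each (table, column) pair. */
public class CollationCache {

    private final Map<Map.Entry<String, String>, String> cache = new ConcurrentHashMap<>();
    private final BiFunction<String, String, String> loader;

    /** @param loader looks up the collation for a table and column, e.g. via a JDBC query */
    public CollationCache(BiFunction<String, String, String> loader) {
        this.loader = loader;
    }

    /** Returns the cached collation, calling the loader only on the first request per key. */
    public String collationOf(String table, String column) {
        // A null result from the loader is not cached, so the lookup is retried next time.
        return cache.computeIfAbsent(
                new SimpleImmutableEntry<>(table, column),
                key -> loader.apply(key.getKey(), key.getValue()));
    }

    public static void main(String[] args) {
        AtomicInteger lookups = new AtomicInteger();
        CollationCache cache =
                new CollationCache(
                        (table, column) -> {
                            lookups.incrementAndGet(); // counts simulated database queries
                            return "utf8mb4_general_ci";
                        });
        cache.collationOf("customer.unevenly_shopping_cart_utf8_ci", "pk_varchar");
        cache.collationOf("customer.unevenly_shopping_cart_utf8_ci", "pk_varchar");
        System.out.println("loader calls: " + lookups.get());
    }
}
```

This only removes the repeated collation lookup. The comparison query itself would still run on every call, so it reduces the extra round trips without eliminating them.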

if (!StringUtils.isEmpty(columnCollation)) {
return StatementUtils.compareValueByQuery(
obj1, obj2, jdbcConnection, columnCollation);
}
// cannot get column collation, use default compare
}
return compare(obj1, obj2);
}

/**
* Compares two Double numeric object.
*
@@ -322,4 +322,72 @@ private static String buildSelectWithBoundaryRowLimits(
private static String quotedTableIdString(TableId tableId) {
return tableId.toQuotedString('`');
}

/** query 'collate' of the column. */
public static String queryColumnCollation(
JdbcConnection jdbc, TableId tableId, String columnName) throws SQLException {
final String template = "SHOW FULL COLUMNS FROM %s WHERE Field = '%s';";
String querySql = String.format(template, quotedTableIdString(tableId), columnName);
return jdbc.queryAndMap(
querySql,
rs -> {
if (rs.next()) {
return rs.getString("collation");
}
throw new SQLException(
String.format("No result returned for query: %s", querySql));
});
}

/**
* @param jdbc
* @param o1
* @param o2
* @return the value {@code 0} if {@code str1} is equal to the {@code str2} in mysql; a value
* {@code -1} if the {@code str1} is less than the {@code str2} in mysql; and a value {@code
* 1} if the {@code str1} is numerically greater than the {@code str2} in mysql.
* @throws SQLException
*/
public static int compareValueByQuery(
Object o1, Object o2, JdbcConnection jdbc, String columnCollation) throws SQLException {
// if str1.equals(str2) we don't need to query mysql
if (o1 != null && o1.equals(o2)) {
return 0;
}
String charset = columnCollation.split("_")[0];

final String compareQueryTemplate =
"SELECT "
+ "CASE WHEN _%s ? COLLATE %s > _%s ? COLLATE %s THEN 1 "
+ "WHEN _%s ? COLLATE %s < _%s ? COLLATE %s COLLATE %s THEN -1 "
+ "ELSE 0 END";
String compareSql =
String.format(
compareQueryTemplate,
charset,
columnCollation,
charset,
columnCollation,
charset,
columnCollation,
charset,
columnCollation,
columnCollation);

return jdbc.prepareQueryAndMap(
compareSql,
(preparedStatement) -> {
preparedStatement.setObject(1, o1);
preparedStatement.setObject(2, o2);
preparedStatement.setObject(3, o1);
preparedStatement.setObject(4, o2);
},
rs -> {
if (!rs.next()) {
throw new SQLException(
String.format("No result returned for query: %s", compareSql));
}
return rs.getInt(1);
});
}
}
@@ -17,14 +17,56 @@

package org.apache.flink.cdc.connectors.mysql.source.utils;

import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSourceTestBase;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;

import io.debezium.connector.mysql.MySqlConnection;
import io.debezium.relational.TableId;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;

import java.math.BigDecimal;
import java.sql.SQLException;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import static org.junit.Assert.assertEquals;

/** Tests for {@link org.apache.flink.cdc.connectors.mysql.source.utils.ObjectUtils}. */
public class ObjectUtilsTest {
/** Tests for {@link ObjectUtils}. */
public class ObjectUtilsTest extends MySqlSourceTestBase {

private static final UniqueDatabase customerDatabase =
new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", "mysqlpw");
private static MySqlConnection mySqlConnection;

@BeforeClass
public static void init() throws SQLException {
customerDatabase.createAndInitialize();
Map<String, String> properties = new HashMap<>();
properties.put("database.hostname", MYSQL_CONTAINER.getHost());
properties.put("database.port", String.valueOf(MYSQL_CONTAINER.getDatabasePort()));
properties.put("database.user", customerDatabase.getUsername());
properties.put("database.password", customerDatabase.getPassword());
properties.put("database.serverTimezone", ZoneId.of("UTC").toString());
io.debezium.config.Configuration configuration =
io.debezium.config.Configuration.from(properties);
mySqlConnection = DebeziumUtils.createMySqlConnection(configuration, new Properties());
}

@AfterClass
public static void afterClass() throws Exception {
if (mySqlConnection != null) {
mySqlConnection.close();
}
}

@Test
public void testMinus() {
@@ -42,4 +84,60 @@ public void testMinus() {
new BigDecimal("99.12344"),
ObjectUtils.minus(new BigDecimal("100.12345"), new BigDecimal("1.00001")));
}

@Test
public void testDbCompare() throws SQLException {
TableId tableId =
new TableId(
customerDatabase.getDatabaseName(),
null,
"unevenly_shopping_cart_utf8_bin");
int resultBin1 =
ObjectUtils.dbCompare("aaaa", "zzzz", mySqlConnection, tableId, "pk_varchar");
int resultBin2 =
ObjectUtils.dbCompare("zzzz", "ZZZZ", mySqlConnection, tableId, "pk_varchar");
int resultBin3 = ObjectUtils.dbCompare("9", "a", mySqlConnection, tableId, "pk_varchar");
Assertions.assertEquals(-1, resultBin1);
Assertions.assertEquals(1, resultBin2);
Assertions.assertEquals(-1, resultBin3);

TableId tableId2 =
new TableId(
customerDatabase.getDatabaseName(), null, "unevenly_shopping_cart_utf8_ci");
int resultCi1 =
ObjectUtils.dbCompare("aaaa", "zzzz", mySqlConnection, tableId2, "pk_varchar");
int resultCi2 =
ObjectUtils.dbCompare("zzzz", "ZZZZ", mySqlConnection, tableId2, "pk_varchar");
int resultCi3 =
ObjectUtils.dbCompare("ZZZZ", "aaaa", mySqlConnection, tableId2, "pk_varchar");
int resultCi4 = ObjectUtils.dbCompare("9", "a", mySqlConnection, tableId2, "pk_varchar");
Assertions.assertEquals(-1, resultCi1);
Assertions.assertEquals(0, resultCi2);
Assertions.assertEquals(1, resultCi3);
Assertions.assertEquals(-1, resultCi4);
}

public static MySqlSourceConfig getConfig(
UniqueDatabase database,
String[] captureTables,
int splitSize,
boolean skipSnapshotBackfill) {
String[] captureTableIds =
Arrays.stream(captureTables)
.map(tableName -> database.getDatabaseName() + "." + tableName)
.toArray(String[]::new);

return new MySqlSourceConfigFactory()
.databaseList(database.getDatabaseName())
.tableList(captureTableIds)
.serverId("1001-1002")
.hostname(MYSQL_CONTAINER.getHost())
.port(MYSQL_CONTAINER.getDatabasePort())
.username(database.getUsername())
.splitSize(splitSize)
.fetchSize(2)
.password(database.getPassword())
.skipSnapshotBackfill(skipSnapshotBackfill)
.createConfig(0);
}
}
@@ -326,3 +326,34 @@ CREATE TABLE default_value_test (
INSERT INTO default_value_test
VALUES (1,'user1','Shanghai',123567),
(2,'user2','Shanghai',123567);

-- create tables to test unevenly splitting a table with a varchar primary key
CREATE TABLE unevenly_shopping_cart_utf8_bin (
pk_varchar varchar(100) NOT NULL PRIMARY KEY,
name VARCHAR(255) NOT NULL DEFAULT 'flink',
address VARCHAR(1024),
phone_number INTEGER DEFAULT ' 123 '
) DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

INSERT INTO unevenly_shopping_cart_utf8_bin
VALUES ('1111','user1','Shanghai',123567),
('2222','user2','Shanghai',123567),
('3333','user1','Shanghai',123567),
('aaaa','user2','Shanghai',123567),
('cccc','user1','Shanghai',123567),
('zzzz','user2','Shanghai',123567);

CREATE TABLE unevenly_shopping_cart_utf8_ci (
pk_varchar varchar(100) NOT NULL PRIMARY KEY,
name VARCHAR(255) NOT NULL DEFAULT 'flink',
address VARCHAR(1024),
phone_number INTEGER DEFAULT ' 123 '
) DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;

INSERT INTO unevenly_shopping_cart_utf8_ci
VALUES ('1111','user1','Shanghai',123567),
('2222','user2','Shanghai',123567),
('3333','user1','Shanghai',123567),
('aaaa','user2','Shanghai',123567),
('cccc','user1','Shanghai',123567),
('zzzz','user2','Shanghai',123567);
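
The two fixture tables above differ only in collation, which is what `testDbCompare` relies on. The following sketch reproduces the expected signs without a database. It rests on two assumptions: that `String.compareTo` matches `utf8mb4_bin` for these ASCII-only values, and that `String.CASE_INSENSITIVE_ORDER` is a rough stand-in for `utf8mb4_general_ci` on the same values. Neither is a MySQL-compatible comparison in general, and `CollationOrderingDemo` is a hypothetical name.

```java
import java.util.Comparator;

/** Hypothetical sketch: approximates the two fixture collations for ASCII-only strings. */
public class CollationOrderingDemo {

    // Assumption: code-unit order matches utf8mb4_bin for the ASCII values used here.
    static final Comparator<String> BINARY_LIKE = Comparator.naturalOrder();

    // Assumption: a rough stand-in for utf8mb4_general_ci, valid only for ASCII letters/digits.
    static final Comparator<String> CASE_INSENSITIVE_LIKE = String.CASE_INSENSITIVE_ORDER;

    /** Normalizes a comparison result to -1, 0, or 1, like the CASE expression in the PR. */
    static int sign(Comparator<String> comparator, String left, String right) {
        return Integer.signum(comparator.compare(left, right));
    }

    public static void main(String[] args) {
        String[][] pairs = {{"aaaa", "zzzz"}, {"zzzz", "ZZZZ"}, {"ZZZZ", "aaaa"}, {"9", "a"}};
        for (String[] pair : pairs) {
            System.out.printf(
                    "%s vs %s: binary-like=%d, case-insensitive-like=%d%n",
                    pair[0],
                    pair[1],
                    sign(BINARY_LIKE, pair[0], pair[1]),
                    sign(CASE_INSENSITIVE_LIKE, pair[0], pair[1]));
        }
    }
}
```

The pair `ZZZZ` and `aaaa` is the interesting one: the binary-like order puts `ZZZZ` first while the case-insensitive order puts it last, which is the kind of disagreement that leads a locally computed chunk boundary to diverge from the order the database returns rows in.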