From 36eaa8f3c6afa1448cb91b5e5ba6fdc95d057876 Mon Sep 17 00:00:00 2001 From: SML0127 Date: Fri, 15 Mar 2024 12:53:53 +0900 Subject: [PATCH 1/3] ADD: implemented checkBinlogRowValueOptions within MySqlValidator --- .../cdc/connectors/mysql/MySqlValidator.java | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/MySqlValidator.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/MySqlValidator.java index 514b11b9699..e0cad75256e 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/MySqlValidator.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/MySqlValidator.java @@ -50,6 +50,7 @@ public class MySqlValidator implements Validator { private static final String BINLOG_FORMAT_ROW = "ROW"; private static final String BINLOG_FORMAT_IMAGE_FULL = "FULL"; + private static final String BINLOG_ROW_VALUE_OPTIONS = ""; private final Properties dbzProperties; private final MySqlSourceConfig sourceConfig; @@ -70,6 +71,7 @@ public void validate() { checkVersion(connection); checkBinlogFormat(connection); checkBinlogRowImage(connection); + checkBinlogRowValueOptions(connection); checkTimeZone(connection); } catch (SQLException ex) { throw new TableException( @@ -159,6 +161,27 @@ private void checkBinlogRowImage(JdbcConnection connection) throws SQLException } } + /** Check whether the binlog row value options is empty. */ + private void checkBinlogRowValueOptions(JdbcConnection connection) throws SQLException { + String rowValueOptions = + connection + .queryAndMap( + "SHOW GLOBAL VARIABLES LIKE 'binlog_row_value_options'", + rs -> rs.next() ? rs.getString(2) : "") + .trim() + .toUpperCase(); + // This setting was introduced in MySQL 8.0+ with default of empty string '' + // For older versions, assume empty string '' + if (!rowValueOptions.equals(BINLOG_ROW_VALUE_OPTIONS)) { + throw new ValidationException( + String.format( + "The MySQL server is configured with binlog_row_value_options %s rather than %s, which is " + + "required for this connector to work properly. Change the MySQL configuration to use a " + + "binlog_row_image='' and restart the connector.", + rowValueOptions, BINLOG_ROW_VALUE_OPTIONS)); + } + } + /** Check whether the server timezone matches the configured timezone. */ private void checkTimeZone(JdbcConnection connection) throws SQLException { String timeZoneProperty = dbzProperties.getProperty("database.serverTimezone"); From ae1d27ad73fd627b1d63378335a0319c58a34dbb Mon Sep 17 00:00:00 2001 From: SML0127 Date: Wed, 10 Apr 2024 19:48:58 +0900 Subject: [PATCH 2/3] EDIT: updated log message for checkBinlogRowValueOptions --- .../apache/flink/cdc/connectors/mysql/MySqlValidator.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/MySqlValidator.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/MySqlValidator.java index e0cad75256e..cc9fbd72f2e 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/MySqlValidator.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/MySqlValidator.java @@ -175,9 +175,9 @@ private void checkBinlogRowValueOptions(JdbcConnection connection) throws SQLExc if (!rowValueOptions.equals(BINLOG_ROW_VALUE_OPTIONS)) { throw new ValidationException( String.format( - "The MySQL server is configured with binlog_row_value_options %s rather than %s, which is " - + "required for this connector to work properly. Change the MySQL configuration to use a " - + "binlog_row_image='' and restart the connector.", + "The MySQL server is configured with binlog_row_value_options=%s, which is possible to cause losing some binlog events" + + "for the mysql cdc connector. Please remove the binlog_row_value_options setting in the MySQL server and rerun the job." + + "See more details at https://dev.mysql.com/doc/refman/8.0/en/replication-features-json.html.", rowValueOptions, BINLOG_ROW_VALUE_OPTIONS)); } } From 7efd972e1de7fabe7ecce54ca707a39136442509 Mon Sep 17 00:00:00 2001 From: SML0127 Date: Thu, 11 Apr 2024 13:18:49 +0900 Subject: [PATCH 3/3] EDIT: renamed BINLOG_ROW_VALUE_OPTIONS -> DEFAULT_BINLOG_ROW_VALUE_OPTIONS --- .../flink/cdc/connectors/mysql/MySqlValidator.java | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/MySqlValidator.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/MySqlValidator.java index cc9fbd72f2e..f97ef8ea7b6 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/MySqlValidator.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/MySqlValidator.java @@ -50,7 +50,7 @@ public class MySqlValidator implements Validator { private static final String BINLOG_FORMAT_ROW = "ROW"; private static final String BINLOG_FORMAT_IMAGE_FULL = "FULL"; - private static final String BINLOG_ROW_VALUE_OPTIONS = ""; + private static final String DEFAULT_BINLOG_ROW_VALUE_OPTIONS = ""; private final Properties dbzProperties; private final MySqlSourceConfig sourceConfig; @@ -167,18 +167,21 @@ private void checkBinlogRowValueOptions(JdbcConnection connection) throws SQLExc connection .queryAndMap( "SHOW GLOBAL VARIABLES LIKE 'binlog_row_value_options'", - rs -> rs.next() ? rs.getString(2) : "") + rs -> + rs.next() + ? rs.getString(2) + : DEFAULT_BINLOG_ROW_VALUE_OPTIONS) .trim() .toUpperCase(); // This setting was introduced in MySQL 8.0+ with default of empty string '' // For older versions, assume empty string '' - if (!rowValueOptions.equals(BINLOG_ROW_VALUE_OPTIONS)) { + if (!DEFAULT_BINLOG_ROW_VALUE_OPTIONS.equals(rowValueOptions)) { throw new ValidationException( String.format( - "The MySQL server is configured with binlog_row_value_options=%s, which is possible to cause losing some binlog events" + "The MySQL server is configured with binlog_row_value_options=%s, which is possible to cause losing some binlog events " + "for the mysql cdc connector. Please remove the binlog_row_value_options setting in the MySQL server and rerun the job." + "See more details at https://dev.mysql.com/doc/refman/8.0/en/replication-features-json.html.", - rowValueOptions, BINLOG_ROW_VALUE_OPTIONS)); + rowValueOptions)); } }