diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/MySqlValidator.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/MySqlValidator.java index 514b11b9699..e0cad75256e 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/MySqlValidator.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/MySqlValidator.java @@ -50,6 +50,7 @@ public class MySqlValidator implements Validator { private static final String BINLOG_FORMAT_ROW = "ROW"; private static final String BINLOG_FORMAT_IMAGE_FULL = "FULL"; + private static final String BINLOG_ROW_VALUE_OPTIONS = ""; private final Properties dbzProperties; private final MySqlSourceConfig sourceConfig; @@ -70,6 +71,7 @@ public void validate() { checkVersion(connection); checkBinlogFormat(connection); checkBinlogRowImage(connection); + checkBinlogRowValueOptions(connection); checkTimeZone(connection); } catch (SQLException ex) { throw new TableException( @@ -159,6 +161,27 @@ private void checkBinlogRowImage(JdbcConnection connection) throws SQLException } } + /** Check whether the binlog row value options is empty. */ + private void checkBinlogRowValueOptions(JdbcConnection connection) throws SQLException { + String rowValueOptions = + connection + .queryAndMap( + "SHOW GLOBAL VARIABLES LIKE 'binlog_row_value_options'", + rs -> rs.next() ? rs.getString(2) : "") + .trim() + .toUpperCase(); + // This setting was introduced in MySQL 8.0+ with default of empty string '' + // For older versions, assume empty string '' + if (!rowValueOptions.equals(BINLOG_ROW_VALUE_OPTIONS)) { + throw new ValidationException( + String.format( + "The MySQL server is configured with binlog_row_value_options %s rather than %s, which is " + + "required for this connector to work properly. Change the MySQL configuration to use a " + + "binlog_row_image='' and restart the connector.", + rowValueOptions, BINLOG_ROW_VALUE_OPTIONS)); + } + } + /** Check whether the server timezone matches the configured timezone. */ private void checkTimeZone(JdbcConnection connection) throws SQLException { String timeZoneProperty = dbzProperties.getProperty("database.serverTimezone");