diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlTypeUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlTypeUtils.java index 04a04fa151c..f92ff1c4140 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlTypeUtils.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlTypeUtils.java @@ -198,7 +198,7 @@ private static DataType convertFromColumn(Column column) { ? DataTypes.TIMESTAMP_LTZ(column.length()) : DataTypes.TIMESTAMP_LTZ(0); case CHAR: - return DataTypes.CHAR(column.length()); + return DataTypes.CHAR(column.length() > 0 ? column.length() : 1); case VARCHAR: return DataTypes.VARCHAR(column.length()); case TINYTEXT: diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java index 99e18bc436b..da820475e62 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java @@ -311,6 +311,16 @@ public void testParseAlterStatement() throws Exception { Collections.singletonList( new AddColumnEvent.ColumnWithPosition( Column.physicalColumn("cols6", DataTypes.BINARY(1)))))); + statement.execute( + String.format( + "ALTER TABLE `%s`.`products` ADD COLUMN `cols7` CHAR NULL;", + inventoryDatabase.getDatabaseName())); + expected.add( + new AddColumnEvent( + tableId, + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn("cols7", DataTypes.CHAR(1)))))); } List actual = fetchResults(events, expected.size()); assertThat(actual).isEqualTo(expected);