From 214d37853339d9381ef239b338638fc351f60a18 Mon Sep 17 00:00:00 2001 From: zhongqs Date: Tue, 19 Mar 2024 20:23:05 +0800 Subject: [PATCH 1/2] [hotfix][cdc-connector][mysql] Fix the default length of CHAR/BINARY data type of Add column DDL --- .../mysql/utils/MySqlTypeUtils.java | 6 ++++-- .../mysql/source/MySqlPipelineITCase.java | 20 +++++++++++++++++++ 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlTypeUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlTypeUtils.java index a3bb16ea343..e38553a0fca 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlTypeUtils.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlTypeUtils.java @@ -198,7 +198,7 @@ private static DataType convertFromColumn(Column column) { ? DataTypes.TIMESTAMP_LTZ(column.length()) : DataTypes.TIMESTAMP_LTZ(0); case CHAR: - return DataTypes.CHAR(column.length()); + return column.length() > 0 ? DataTypes.CHAR(column.length()) : DataTypes.CHAR(1); case VARCHAR: return DataTypes.VARCHAR(column.length()); case TINYTEXT: @@ -218,7 +218,9 @@ private static DataType convertFromColumn(Column column) { case MULTILINESTRING: return DataTypes.STRING(); case BINARY: - return DataTypes.BINARY(column.length()); + return column.length() > 0 + ? 
DataTypes.BINARY(column.length()) + : DataTypes.BINARY(1); case VARBINARY: return DataTypes.VARBINARY(column.length()); case TINYBLOB: diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java index e09ac58b405..da820475e62 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java @@ -301,6 +301,26 @@ public void testParseAlterStatement() throws Exception { Collections.singletonList( new AddColumnEvent.ColumnWithPosition( Column.physicalColumn("cols5", DataTypes.BOOLEAN()))))); + statement.execute( + String.format( + "ALTER TABLE `%s`.`products` ADD COLUMN `cols6` BINARY NULL;", + inventoryDatabase.getDatabaseName())); + expected.add( + new AddColumnEvent( + tableId, + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn("cols6", DataTypes.BINARY(1)))))); + statement.execute( + String.format( + "ALTER TABLE `%s`.`products` ADD COLUMN `cols7` CHAR NULL;", + inventoryDatabase.getDatabaseName())); + expected.add( + new AddColumnEvent( + tableId, + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn("cols7", DataTypes.CHAR(1)))))); } List actual = fetchResults(events, expected.size()); assertThat(actual).isEqualTo(expected); From a93e001e61feeccea2e72ad6afadc8fd921cd311 Mon Sep 17 00:00:00 2001 From: Qishang Zhong Date: Wed, 10 Apr 2024 21:27:02 +0800 Subject: [PATCH 2/2] [FLINK-34905] Support `CHAR(0)` and 
`BINARY(0)` in ALTER SQL --- .../flink/cdc/common/types/BinaryType.java | 21 +++++++++++- .../flink/cdc/common/types/CharType.java | 21 +++++++++++- .../mysql/utils/MySqlTypeUtils.java | 8 +++-- .../mysql/source/MySqlPipelineITCase.java | 32 ++++++++++++++++--- .../serializer/schema/DataTypeSerializer.java | 10 ++++-- 5 files changed, 82 insertions(+), 10 deletions(-) diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/BinaryType.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/BinaryType.java index df478da616d..491f1b14a15 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/BinaryType.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/BinaryType.java @@ -65,13 +65,32 @@ public BinaryType() { this(DEFAULT_LENGTH); } + /** Helper constructor for {@link #ofEmptyLiteral()} and {@link #copy(boolean)}. */ + private BinaryType(int length, boolean isNullable) { + super(isNullable, DataTypeRoot.BINARY); + this.length = length; + } + + /** + * The SQL standard defines that character string literals are allowed to be zero-length strings + * (i.e., to contain no characters) even though it is not permitted to declare a type that is + * zero. For consistent behavior, the same logic applies to binary strings. + * + *

<p>This method enables this special kind of binary string. + * + * <p>
Zero-length binary strings have no serializable string representation. + */ + public static BinaryType ofEmptyLiteral() { + return new BinaryType(EMPTY_LITERAL_LENGTH, false); + } + public int getLength() { return length; } @Override public DataType copy(boolean isNullable) { - return new BinaryType(isNullable, length); + return new BinaryType(length, isNullable); } @Override diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/CharType.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/CharType.java index a8ccaa322e6..8c2079649d6 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/CharType.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/CharType.java @@ -64,13 +64,32 @@ public CharType() { this(DEFAULT_LENGTH); } + /** Helper constructor for {@link #ofEmptyLiteral()} and {@link #copy(boolean)}. */ + private CharType(int length, boolean isNullable) { + super(isNullable, DataTypeRoot.CHAR); + this.length = length; + } + + /** + * The SQL standard defines that character string literals are allowed to be zero-length strings + * (i.e., to contain no characters) even though it is not permitted to declare a type that is + * zero. + * + *

<p>This method enables this special kind of character string. + * + * <p>
Zero-length character strings have no serializable string representation. + */ + public static CharType ofEmptyLiteral() { + return new CharType(EMPTY_LITERAL_LENGTH, false); + } + public int getLength() { return length; } @Override public DataType copy(boolean isNullable) { - return new CharType(isNullable, length); + return new CharType(length, isNullable); } @Override diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlTypeUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlTypeUtils.java index e38553a0fca..0e70ed6d91f 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlTypeUtils.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlTypeUtils.java @@ -17,6 +17,8 @@ package org.apache.flink.cdc.connectors.mysql.utils; +import org.apache.flink.cdc.common.types.BinaryType; +import org.apache.flink.cdc.common.types.CharType; import org.apache.flink.cdc.common.types.DataType; import org.apache.flink.cdc.common.types.DataTypes; @@ -198,7 +200,9 @@ private static DataType convertFromColumn(Column column) { ? DataTypes.TIMESTAMP_LTZ(column.length()) : DataTypes.TIMESTAMP_LTZ(0); case CHAR: - return column.length() > 0 ? DataTypes.CHAR(column.length()) : DataTypes.CHAR(1); + return column.length() > 0 + ? DataTypes.CHAR(column.length()) + : column.length() == 0 ? CharType.ofEmptyLiteral() : DataTypes.CHAR(1); case VARCHAR: return DataTypes.VARCHAR(column.length()); case TINYTEXT: @@ -220,7 +224,7 @@ private static DataType convertFromColumn(Column column) { case BINARY: return column.length() > 0 ? 
DataTypes.BINARY(column.length()) - : DataTypes.BINARY(1); + : column.length() == 0 ? BinaryType.ofEmptyLiteral() : DataTypes.BINARY(1); case VARBINARY: return DataTypes.VARBINARY(column.length()); case TINYBLOB: diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java index da820475e62..5eb6ce0e598 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java @@ -31,6 +31,8 @@ import org.apache.flink.cdc.common.schema.Column; import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.common.source.FlinkSourceProvider; +import org.apache.flink.cdc.common.types.BinaryType; +import org.apache.flink.cdc.common.types.CharType; import org.apache.flink.cdc.common.types.DataType; import org.apache.flink.cdc.common.types.DataTypes; import org.apache.flink.cdc.common.types.RowType; @@ -303,24 +305,46 @@ public void testParseAlterStatement() throws Exception { Column.physicalColumn("cols5", DataTypes.BOOLEAN()))))); statement.execute( String.format( - "ALTER TABLE `%s`.`products` ADD COLUMN `cols6` BINARY NULL;", + "ALTER TABLE `%s`.`products` ADD COLUMN `cols6` BINARY(0) NULL;", inventoryDatabase.getDatabaseName())); expected.add( new AddColumnEvent( tableId, Collections.singletonList( new AddColumnEvent.ColumnWithPosition( - Column.physicalColumn("cols6", DataTypes.BINARY(1)))))); + Column.physicalColumn( + "cols6", BinaryType.ofEmptyLiteral()))))); + statement.execute( + 
String.format( + "ALTER TABLE `%s`.`products` ADD COLUMN `cols7` BINARY NULL;", + inventoryDatabase.getDatabaseName())); + expected.add( + new AddColumnEvent( + tableId, + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn("cols7", DataTypes.BINARY(1)))))); + statement.execute( + String.format( + "ALTER TABLE `%s`.`products` ADD COLUMN `cols8` CHAR(0) NULL;", + inventoryDatabase.getDatabaseName())); + expected.add( + new AddColumnEvent( + tableId, + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn( + "cols8", CharType.ofEmptyLiteral()))))); statement.execute( String.format( - "ALTER TABLE `%s`.`products` ADD COLUMN `cols7` CHAR NULL;", + "ALTER TABLE `%s`.`products` ADD COLUMN `cols9` CHAR NULL;", inventoryDatabase.getDatabaseName())); expected.add( new AddColumnEvent( tableId, Collections.singletonList( new AddColumnEvent.ColumnWithPosition( - Column.physicalColumn("cols7", DataTypes.CHAR(1)))))); + Column.physicalColumn("cols9", DataTypes.CHAR(1)))))); } List actual = fetchResults(events, expected.size()); assertThat(actual).isEqualTo(expected); diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/schema/DataTypeSerializer.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/schema/DataTypeSerializer.java index d977ef276cf..5d6779cc0f8 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/schema/DataTypeSerializer.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/schema/DataTypeSerializer.java @@ -183,7 +183,10 @@ public DataType deserialize(DataInputView source) throws IOException { boolean isNullable = source.readBoolean(); switch (dataTypeClass) { case BINARY: - return new BinaryType(isNullable, source.readInt()); + int binaryLength = source.readInt(); + return binaryLength == 0 + ? 
BinaryType.ofEmptyLiteral() + : new BinaryType(isNullable, binaryLength); case ARRAY: return new ArrayType(isNullable, this.deserialize(source)); case BOOLEAN: @@ -197,7 +200,10 @@ public DataType deserialize(DataInputView source) throws IOException { case VARBINARY: return new VarBinaryType(isNullable, source.readInt()); case CHAR: - return new CharType(isNullable, source.readInt()); + int charLength = source.readInt(); + return charLength == 0 + ? CharType.ofEmptyLiteral() + : new CharType(isNullable, charLength); case SMALLINT: return new SmallIntType(isNullable); case TIMESTAMP: