diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/BinaryType.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/BinaryType.java index df478da616d..491f1b14a15 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/BinaryType.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/BinaryType.java @@ -65,13 +65,32 @@ public BinaryType() { this(DEFAULT_LENGTH); } + /** Helper constructor for {@link #ofEmptyLiteral()} and {@link #copy(boolean)}. */ + private BinaryType(int length, boolean isNullable) { + super(isNullable, DataTypeRoot.BINARY); + this.length = length; + } + + /** + * The SQL standard defines that character string literals are allowed to be zero-length strings + * (i.e., to contain no characters) even though it is not permitted to declare a type that is + * zero. For consistent behavior, the same logic applies to binary strings. + * + *
This method enables this special kind of binary string. + * + *
Zero-length binary strings have no serializable string representation. + */ + public static BinaryType ofEmptyLiteral() { + return new BinaryType(EMPTY_LITERAL_LENGTH, false); + } + public int getLength() { return length; } @Override public DataType copy(boolean isNullable) { - return new BinaryType(isNullable, length); + return new BinaryType(length, isNullable); } @Override diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/CharType.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/CharType.java index a8ccaa322e6..8c2079649d6 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/CharType.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/CharType.java @@ -64,13 +64,32 @@ public CharType() { this(DEFAULT_LENGTH); } + /** Helper constructor for {@link #ofEmptyLiteral()} and {@link #copy(boolean)}. */ + private CharType(int length, boolean isNullable) { + super(isNullable, DataTypeRoot.CHAR); + this.length = length; + } + + /** + * The SQL standard defines that character string literals are allowed to be zero-length strings + * (i.e., to contain no characters) even though it is not permitted to declare a type that is + * zero. + * + *
This method enables this special kind of character string. + * + *
Zero-length character strings have no serializable string representation.
+ */
+ public static CharType ofEmptyLiteral() {
+ return new CharType(EMPTY_LITERAL_LENGTH, false);
+ }
+
public int getLength() {
return length;
}
@Override
public DataType copy(boolean isNullable) {
- return new CharType(isNullable, length);
+ return new CharType(length, isNullable);
}
@Override
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlTypeUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlTypeUtils.java
index a3bb16ea343..0e70ed6d91f 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlTypeUtils.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlTypeUtils.java
@@ -17,6 +17,8 @@
package org.apache.flink.cdc.connectors.mysql.utils;
+import org.apache.flink.cdc.common.types.BinaryType;
+import org.apache.flink.cdc.common.types.CharType;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
@@ -198,7 +200,9 @@ private static DataType convertFromColumn(Column column) {
? DataTypes.TIMESTAMP_LTZ(column.length())
: DataTypes.TIMESTAMP_LTZ(0);
case CHAR:
- return DataTypes.CHAR(column.length());
+ return column.length() > 0
+ ? DataTypes.CHAR(column.length())
+ : column.length() == 0 ? CharType.ofEmptyLiteral() : DataTypes.CHAR(1);
case VARCHAR:
return DataTypes.VARCHAR(column.length());
case TINYTEXT:
@@ -218,7 +222,9 @@ private static DataType convertFromColumn(Column column) {
case MULTILINESTRING:
return DataTypes.STRING();
case BINARY:
- return DataTypes.BINARY(column.length());
+ return column.length() > 0
+ ? DataTypes.BINARY(column.length())
+ : column.length() == 0 ? BinaryType.ofEmptyLiteral() : DataTypes.BINARY(1);
case VARBINARY:
return DataTypes.VARBINARY(column.length());
case TINYBLOB:
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
index e09ac58b405..5eb6ce0e598 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
@@ -31,6 +31,8 @@
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.source.FlinkSourceProvider;
+import org.apache.flink.cdc.common.types.BinaryType;
+import org.apache.flink.cdc.common.types.CharType;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.types.RowType;
@@ -301,6 +303,48 @@ public void testParseAlterStatement() throws Exception {
Collections.singletonList(
new AddColumnEvent.ColumnWithPosition(
Column.physicalColumn("cols5", DataTypes.BOOLEAN())))));
+ statement.execute(
+ String.format(
+ "ALTER TABLE `%s`.`products` ADD COLUMN `cols6` BINARY(0) NULL;",
+ inventoryDatabase.getDatabaseName()));
+ expected.add(
+ new AddColumnEvent(
+ tableId,
+ Collections.singletonList(
+ new AddColumnEvent.ColumnWithPosition(
+ Column.physicalColumn(
+ "cols6", BinaryType.ofEmptyLiteral())))));
+ statement.execute(
+ String.format(
+ "ALTER TABLE `%s`.`products` ADD COLUMN `cols7` BINARY NULL;",
+ inventoryDatabase.getDatabaseName()));
+ expected.add(
+ new AddColumnEvent(
+ tableId,
+ Collections.singletonList(
+ new AddColumnEvent.ColumnWithPosition(
+ Column.physicalColumn("cols7", DataTypes.BINARY(1))))));
+ statement.execute(
+ String.format(
+ "ALTER TABLE `%s`.`products` ADD COLUMN `cols8` CHAR(0) NULL;",
+ inventoryDatabase.getDatabaseName()));
+ expected.add(
+ new AddColumnEvent(
+ tableId,
+ Collections.singletonList(
+ new AddColumnEvent.ColumnWithPosition(
+ Column.physicalColumn(
+ "cols8", CharType.ofEmptyLiteral())))));
+ statement.execute(
+ String.format(
+ "ALTER TABLE `%s`.`products` ADD COLUMN `cols9` CHAR NULL;",
+ inventoryDatabase.getDatabaseName()));
+ expected.add(
+ new AddColumnEvent(
+ tableId,
+ Collections.singletonList(
+ new AddColumnEvent.ColumnWithPosition(
+ Column.physicalColumn("cols9", DataTypes.CHAR(1))))));
}
List