Skip to content

Commit

Permalink
[FLINK-34905][cdc-connector][mysql] Fix the default length of CHAR/BI…
Browse files Browse the repository at this point in the history
…NARY data type of Add column DDL (apache#3145)
  • Loading branch information
zhongqishang authored and zhangchaoming.zcm committed Jan 3, 2025
1 parent d9eb607 commit 1cf1e8e
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,32 @@ public BinaryType() {
this(DEFAULT_LENGTH);
}

/** Helper constructor for {@link #ofEmptyLiteral()} and {@link #copy(boolean)}. */
private BinaryType(int length, boolean isNullable) {
super(isNullable, DataTypeRoot.BINARY);
this.length = length;
}

/**
* The SQL standard defines that character string literals are allowed to be zero-length strings
* (i.e., to contain no characters) even though it is not permitted to declare a type that is
* zero. For consistent behavior, the same logic applies to binary strings.
*
* <p>This method enables this special kind of binary string.
*
* <p>Zero-length binary strings have no serializable string representation.
*/
public static BinaryType ofEmptyLiteral() {
return new BinaryType(EMPTY_LITERAL_LENGTH, false);
}

public int getLength() {
return length;
}

@Override
public DataType copy(boolean isNullable) {
return new BinaryType(isNullable, length);
return new BinaryType(length, isNullable);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,32 @@ public CharType() {
this(DEFAULT_LENGTH);
}

/** Helper constructor for {@link #ofEmptyLiteral()} and {@link #copy(boolean)}. */
private CharType(int length, boolean isNullable) {
super(isNullable, DataTypeRoot.CHAR);
this.length = length;
}

/**
* The SQL standard defines that character string literals are allowed to be zero-length strings
* (i.e., to contain no characters) even though it is not permitted to declare a type that is
* zero.
*
* <p>This method enables this special kind of character string.
*
* <p>Zero-length character strings have no serializable string representation.
*/
public static CharType ofEmptyLiteral() {
return new CharType(EMPTY_LITERAL_LENGTH, false);
}

public int getLength() {
return length;
}

@Override
public DataType copy(boolean isNullable) {
return new CharType(isNullable, length);
return new CharType(length, isNullable);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.flink.cdc.connectors.mysql.utils;

import org.apache.flink.cdc.common.types.BinaryType;
import org.apache.flink.cdc.common.types.CharType;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;

Expand Down Expand Up @@ -198,7 +200,9 @@ private static DataType convertFromColumn(Column column) {
? DataTypes.TIMESTAMP_LTZ(column.length())
: DataTypes.TIMESTAMP_LTZ(0);
case CHAR:
return DataTypes.CHAR(column.length());
return column.length() > 0
? DataTypes.CHAR(column.length())
: column.length() == 0 ? CharType.ofEmptyLiteral() : DataTypes.CHAR(1);
case VARCHAR:
return DataTypes.VARCHAR(column.length());
case TINYTEXT:
Expand All @@ -218,7 +222,9 @@ private static DataType convertFromColumn(Column column) {
case MULTILINESTRING:
return DataTypes.STRING();
case BINARY:
return DataTypes.BINARY(column.length());
return column.length() > 0
? DataTypes.BINARY(column.length())
: column.length() == 0 ? BinaryType.ofEmptyLiteral() : DataTypes.BINARY(1);
case VARBINARY:
return DataTypes.VARBINARY(column.length());
case TINYBLOB:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.source.FlinkSourceProvider;
import org.apache.flink.cdc.common.types.BinaryType;
import org.apache.flink.cdc.common.types.CharType;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.types.RowType;
Expand Down Expand Up @@ -301,6 +303,48 @@ public void testParseAlterStatement() throws Exception {
Collections.singletonList(
new AddColumnEvent.ColumnWithPosition(
Column.physicalColumn("cols5", DataTypes.BOOLEAN())))));
statement.execute(
String.format(
"ALTER TABLE `%s`.`products` ADD COLUMN `cols6` BINARY(0) NULL;",
inventoryDatabase.getDatabaseName()));
expected.add(
new AddColumnEvent(
tableId,
Collections.singletonList(
new AddColumnEvent.ColumnWithPosition(
Column.physicalColumn(
"cols6", BinaryType.ofEmptyLiteral())))));
statement.execute(
String.format(
"ALTER TABLE `%s`.`products` ADD COLUMN `cols7` BINARY NULL;",
inventoryDatabase.getDatabaseName()));
expected.add(
new AddColumnEvent(
tableId,
Collections.singletonList(
new AddColumnEvent.ColumnWithPosition(
Column.physicalColumn("cols7", DataTypes.BINARY(1))))));
statement.execute(
String.format(
"ALTER TABLE `%s`.`products` ADD COLUMN `cols8` CHAR(0) NULL;",
inventoryDatabase.getDatabaseName()));
expected.add(
new AddColumnEvent(
tableId,
Collections.singletonList(
new AddColumnEvent.ColumnWithPosition(
Column.physicalColumn(
"cols8", CharType.ofEmptyLiteral())))));
statement.execute(
String.format(
"ALTER TABLE `%s`.`products` ADD COLUMN `cols9` CHAR NULL;",
inventoryDatabase.getDatabaseName()));
expected.add(
new AddColumnEvent(
tableId,
Collections.singletonList(
new AddColumnEvent.ColumnWithPosition(
Column.physicalColumn("cols9", DataTypes.CHAR(1))))));
}
List<Event> actual = fetchResults(events, expected.size());
assertThat(actual).isEqualTo(expected);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,10 @@ public DataType deserialize(DataInputView source) throws IOException {
boolean isNullable = source.readBoolean();
switch (dataTypeClass) {
case BINARY:
return new BinaryType(isNullable, source.readInt());
int binaryLength = source.readInt();
return binaryLength == 0
? BinaryType.ofEmptyLiteral()
: new BinaryType(isNullable, binaryLength);
case ARRAY:
return new ArrayType(isNullable, this.deserialize(source));
case BOOLEAN:
Expand All @@ -197,7 +200,10 @@ public DataType deserialize(DataInputView source) throws IOException {
case VARBINARY:
return new VarBinaryType(isNullable, source.readInt());
case CHAR:
return new CharType(isNullable, source.readInt());
int charLength = source.readInt();
return charLength == 0
? CharType.ofEmptyLiteral()
: new CharType(isNullable, charLength);
case SMALLINT:
return new SmallIntType(isNullable);
case TIMESTAMP:
Expand Down

0 comments on commit 1cf1e8e

Please sign in to comment.