Skip to content

Commit 18dd51f

Browse files
authored
[FLINK-34905][cdc-connector][mysql] Fix the default length of CHAR/BINARY data type of Add column DDL (#3145)
1 parent abb98ee commit 18dd51f

File tree

5 files changed

+100
-6
lines changed

5 files changed

+100
-6
lines changed

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/BinaryType.java

+20-1
Original file line numberDiff line numberDiff line change
@@ -65,13 +65,32 @@ public BinaryType() {
6565
this(DEFAULT_LENGTH);
6666
}
6767

68+
/** Helper constructor for {@link #ofEmptyLiteral()} and {@link #copy(boolean)}. */
69+
private BinaryType(int length, boolean isNullable) {
70+
super(isNullable, DataTypeRoot.BINARY);
71+
this.length = length;
72+
}
73+
74+
/**
75+
* The SQL standard defines that character string literals are allowed to be zero-length strings
76+
* (i.e., to contain no characters) even though it is not permitted to declare a type that is
77+
* zero. For consistent behavior, the same logic applies to binary strings.
78+
*
79+
* <p>This method enables this special kind of binary string.
80+
*
81+
* <p>Zero-length binary strings have no serializable string representation.
82+
*/
83+
public static BinaryType ofEmptyLiteral() {
84+
return new BinaryType(EMPTY_LITERAL_LENGTH, false);
85+
}
86+
6887
public int getLength() {
6988
return length;
7089
}
7190

7291
@Override
7392
public DataType copy(boolean isNullable) {
74-
return new BinaryType(isNullable, length);
93+
return new BinaryType(length, isNullable);
7594
}
7695

7796
@Override

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/CharType.java

+20-1
Original file line numberDiff line numberDiff line change
@@ -64,13 +64,32 @@ public CharType() {
6464
this(DEFAULT_LENGTH);
6565
}
6666

67+
/** Helper constructor for {@link #ofEmptyLiteral()} and {@link #copy(boolean)}. */
68+
private CharType(int length, boolean isNullable) {
69+
super(isNullable, DataTypeRoot.CHAR);
70+
this.length = length;
71+
}
72+
73+
/**
74+
* The SQL standard defines that character string literals are allowed to be zero-length strings
75+
* (i.e., to contain no characters) even though it is not permitted to declare a type that is
76+
* zero.
77+
*
78+
* <p>This method enables this special kind of character string.
79+
*
80+
* <p>Zero-length character strings have no serializable string representation.
81+
*/
82+
public static CharType ofEmptyLiteral() {
83+
return new CharType(EMPTY_LITERAL_LENGTH, false);
84+
}
85+
6786
public int getLength() {
6887
return length;
6988
}
7089

7190
@Override
7291
public DataType copy(boolean isNullable) {
73-
return new CharType(isNullable, length);
92+
return new CharType(length, isNullable);
7493
}
7594

7695
@Override

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlTypeUtils.java

+8-2
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.flink.cdc.connectors.mysql.utils;
1919

20+
import org.apache.flink.cdc.common.types.BinaryType;
21+
import org.apache.flink.cdc.common.types.CharType;
2022
import org.apache.flink.cdc.common.types.DataType;
2123
import org.apache.flink.cdc.common.types.DataTypes;
2224

@@ -198,7 +200,9 @@ private static DataType convertFromColumn(Column column) {
198200
? DataTypes.TIMESTAMP_LTZ(column.length())
199201
: DataTypes.TIMESTAMP_LTZ(0);
200202
case CHAR:
201-
return DataTypes.CHAR(column.length());
203+
return column.length() > 0
204+
? DataTypes.CHAR(column.length())
205+
: column.length() == 0 ? CharType.ofEmptyLiteral() : DataTypes.CHAR(1);
202206
case VARCHAR:
203207
return DataTypes.VARCHAR(column.length());
204208
case TINYTEXT:
@@ -218,7 +222,9 @@ private static DataType convertFromColumn(Column column) {
218222
case MULTILINESTRING:
219223
return DataTypes.STRING();
220224
case BINARY:
221-
return DataTypes.BINARY(column.length());
225+
return column.length() > 0
226+
? DataTypes.BINARY(column.length())
227+
: column.length() == 0 ? BinaryType.ofEmptyLiteral() : DataTypes.BINARY(1);
222228
case VARBINARY:
223229
return DataTypes.VARBINARY(column.length());
224230
case TINYBLOB:

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java

+44
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
import org.apache.flink.cdc.common.schema.Column;
3232
import org.apache.flink.cdc.common.schema.Schema;
3333
import org.apache.flink.cdc.common.source.FlinkSourceProvider;
34+
import org.apache.flink.cdc.common.types.BinaryType;
35+
import org.apache.flink.cdc.common.types.CharType;
3436
import org.apache.flink.cdc.common.types.DataType;
3537
import org.apache.flink.cdc.common.types.DataTypes;
3638
import org.apache.flink.cdc.common.types.RowType;
@@ -301,6 +303,48 @@ public void testParseAlterStatement() throws Exception {
301303
Collections.singletonList(
302304
new AddColumnEvent.ColumnWithPosition(
303305
Column.physicalColumn("cols5", DataTypes.BOOLEAN())))));
306+
statement.execute(
307+
String.format(
308+
"ALTER TABLE `%s`.`products` ADD COLUMN `cols6` BINARY(0) NULL;",
309+
inventoryDatabase.getDatabaseName()));
310+
expected.add(
311+
new AddColumnEvent(
312+
tableId,
313+
Collections.singletonList(
314+
new AddColumnEvent.ColumnWithPosition(
315+
Column.physicalColumn(
316+
"cols6", BinaryType.ofEmptyLiteral())))));
317+
statement.execute(
318+
String.format(
319+
"ALTER TABLE `%s`.`products` ADD COLUMN `cols7` BINARY NULL;",
320+
inventoryDatabase.getDatabaseName()));
321+
expected.add(
322+
new AddColumnEvent(
323+
tableId,
324+
Collections.singletonList(
325+
new AddColumnEvent.ColumnWithPosition(
326+
Column.physicalColumn("cols7", DataTypes.BINARY(1))))));
327+
statement.execute(
328+
String.format(
329+
"ALTER TABLE `%s`.`products` ADD COLUMN `cols8` CHAR(0) NULL;",
330+
inventoryDatabase.getDatabaseName()));
331+
expected.add(
332+
new AddColumnEvent(
333+
tableId,
334+
Collections.singletonList(
335+
new AddColumnEvent.ColumnWithPosition(
336+
Column.physicalColumn(
337+
"cols8", CharType.ofEmptyLiteral())))));
338+
statement.execute(
339+
String.format(
340+
"ALTER TABLE `%s`.`products` ADD COLUMN `cols9` CHAR NULL;",
341+
inventoryDatabase.getDatabaseName()));
342+
expected.add(
343+
new AddColumnEvent(
344+
tableId,
345+
Collections.singletonList(
346+
new AddColumnEvent.ColumnWithPosition(
347+
Column.physicalColumn("cols9", DataTypes.CHAR(1))))));
304348
}
305349
List<Event> actual = fetchResults(events, expected.size());
306350
assertThat(actual).isEqualTo(expected);

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/schema/DataTypeSerializer.java

+8-2
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,10 @@ public DataType deserialize(DataInputView source) throws IOException {
183183
boolean isNullable = source.readBoolean();
184184
switch (dataTypeClass) {
185185
case BINARY:
186-
return new BinaryType(isNullable, source.readInt());
186+
int binaryLength = source.readInt();
187+
return binaryLength == 0
188+
? BinaryType.ofEmptyLiteral()
189+
: new BinaryType(isNullable, binaryLength);
187190
case ARRAY:
188191
return new ArrayType(isNullable, this.deserialize(source));
189192
case BOOLEAN:
@@ -197,7 +200,10 @@ public DataType deserialize(DataInputView source) throws IOException {
197200
case VARBINARY:
198201
return new VarBinaryType(isNullable, source.readInt());
199202
case CHAR:
200-
return new CharType(isNullable, source.readInt());
203+
int charLength = source.readInt();
204+
return charLength == 0
205+
? CharType.ofEmptyLiteral()
206+
: new CharType(isNullable, charLength);
201207
case SMALLINT:
202208
return new SmallIntType(isNullable);
203209
case TIMESTAMP:

0 commit comments

Comments
 (0)