Skip to content

Commit d859c50

Browse files
committed
Fix CI
1 parent 27d32b9 commit d859c50

File tree

3 files changed

+32
-14
lines changed

3 files changed

+32
-14
lines changed

flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ public void testHeteroSchemaTransform() throws Exception {
144144
List<String> expectedEvents =
145145
Arrays.asList(
146146
String.format(
147-
"CreateTableEvent{tableId=%s.terminus, schema=columns={`ID` INT NOT NULL,`VERSION` STRING}, primaryKeys=ID, options=()}",
147+
"CreateTableEvent{tableId=%s.terminus, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, options=()}",
148148
transformRenameDatabase.getDatabaseName()),
149149
String.format(
150150
"DataChangeEvent{tableId=%s.terminus, before=[], after=[1009, 8.1], op=INSERT, meta=()}",

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java

+27-9
Original file line numberDiff line numberDiff line change
@@ -262,14 +262,19 @@ public static List<ProjectionColumn> generateProjectionColumns(
262262
.map(
263263
cName -> {
264264
DataType columnType;
265-
if (relDataTypeMap.containsKey(cName)) {
265+
if (rawDataTypeMap.containsKey(cName)) {
266+
columnType = rawDataTypeMap.get(cName);
267+
} else if (relDataTypeMap.containsKey(cName)) {
266268
columnType =
267269
DataTypeConverter
268270
.convertCalciteRelDataTypeToDataType(
269271
relDataTypeMap.get(
270272
cName));
271273
} else {
272-
columnType = rawDataTypeMap.get(cName);
274+
throw new RuntimeException(
275+
String.format(
276+
"Failed to deduce column %s type",
277+
cName));
273278
}
274279
return ProjectionColumn.of(
275280
cName,
@@ -322,9 +327,17 @@ public static List<ProjectionColumn> generateProjectionColumns(
322327
} else if (sqlNode instanceof SqlIdentifier) {
323328
SqlIdentifier sqlIdentifier = (SqlIdentifier) sqlNode;
324329
String columnName = sqlIdentifier.names.get(sqlIdentifier.names.size() - 1);
325-
DataType columnType =
326-
DataTypeConverter.convertCalciteRelDataTypeToDataType(
327-
relDataTypeMap.get(columnName));
330+
DataType columnType;
331+
if (rawDataTypeMap.containsKey(columnName)) {
332+
columnType = rawDataTypeMap.get(columnName);
333+
} else if (relDataTypeMap.containsKey(columnName)) {
334+
columnType =
335+
DataTypeConverter.convertCalciteRelDataTypeToDataType(
336+
relDataTypeMap.get(columnName));
337+
} else {
338+
throw new RuntimeException(
339+
String.format("Failed to deduce column %s type", columnName));
340+
}
328341
if (isMetadataColumn(columnName)) {
329342
projectionColumns.add(
330343
ProjectionColumn.of(
@@ -354,17 +367,22 @@ public static List<ProjectionColumn> generateProjectionColumns(
354367
parseColumnNameList(sqlFilter.getWhere()).stream()
355368
.map(
356369
cName -> {
357-
if (relDataTypeMap.containsKey(cName)) {
370+
if (rawDataTypeMap.containsKey(cName)) {
371+
// filter expression references an existed column
372+
return ProjectionColumn.of(
373+
cName, rawDataTypeMap.get(cName));
374+
} else if (relDataTypeMap.containsKey(cName)) {
358375
// filter expression references a calculated column
359376
return ProjectionColumn.of(
360377
cName,
361378
DataTypeConverter
362379
.convertCalciteRelDataTypeToDataType(
363380
relDataTypeMap.get(cName)));
364381
} else {
365-
// filter expression references an existed column
366-
return ProjectionColumn.of(
367-
cName, rawDataTypeMap.get(cName));
382+
throw new RuntimeException(
383+
String.format(
384+
"Failed to deduce column %s type",
385+
cName));
368386
}
369387
})
370388
.collect(Collectors.toSet()));

flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,9 @@ public class PostTransformOperatorTest {
7676
Schema.newBuilder()
7777
.physicalColumn("col1", DataTypes.STRING())
7878
.physicalColumn("identifier_name", DataTypes.STRING())
79-
.physicalColumn("__namespace_name__", DataTypes.STRING())
80-
.physicalColumn("__schema_name__", DataTypes.STRING())
81-
.physicalColumn("__table_name__", DataTypes.STRING())
79+
.physicalColumn("__namespace_name__", DataTypes.STRING().notNull())
80+
.physicalColumn("__schema_name__", DataTypes.STRING().notNull())
81+
.physicalColumn("__table_name__", DataTypes.STRING().notNull())
8282
.primaryKey("col1")
8383
.build();
8484

@@ -400,7 +400,7 @@ void testMetadataTransform() throws Exception {
400400
PostTransformOperator.newBuilder()
401401
.addTransform(
402402
METADATA_TABLEID.identifier(),
403-
"*, __namespace_name__ || '.' || __schema_name__ || '.' || __table_name__ identifier_name, __namespace_name__, __schema_name__, __table_name__",
403+
"col1, __namespace_name__ || '.' || __schema_name__ || '.' || __table_name__ identifier_name, __namespace_name__, __schema_name__, __table_name__",
404404
" __table_name__ = 'metadata_table' ")
405405
.build();
406406
EventOperatorTestHarness<PostTransformOperator, Event>

0 commit comments

Comments
 (0)