Commit a31a225

Add unified transform operator tests and fix metadata- and wildcard-related issues
1 parent 35ab5f8 commit a31a225

File tree

8 files changed, +935 -6 lines changed

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java

Lines changed: 16 additions & 0 deletions
@@ -27,7 +27,10 @@
 import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.schema.Schema;
 
+import javax.annotation.Nullable;
+
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -56,6 +59,19 @@ public static List<RecordData.FieldGetter> createFieldGetters(List<Column> colum
         return fieldGetters;
     }
 
+    /** Restore original data fields from RecordData structure. */
+    public static List<Object> restoreOriginalData(
+            @Nullable RecordData recordData, List<RecordData.FieldGetter> fieldGetters) {
+        if (recordData == null) {
+            return Collections.emptyList();
+        }
+        List<Object> actualFields = new ArrayList<>();
+        for (RecordData.FieldGetter fieldGetter : fieldGetters) {
+            actualFields.add(fieldGetter.getFieldOrNull(recordData));
+        }
+        return actualFields;
+    }
+
     /** apply SchemaChangeEvent to the old schema and return the schema after changing. */
     public static Schema applySchemaChangeEvent(Schema schema, SchemaChangeEvent event) {
         if (event instanceof AddColumnEvent) {
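
The new SchemaUtils#restoreOriginalData helper flattens a RecordData payload back into a plain list of Java objects through pre-built field getters, and returns an empty list when the record is null (for example, the missing before-image of an insert). A minimal usage sketch, assuming the schema and record fixtures used by the operator tests in this commit (BinaryRecordDataGenerator and BinaryStringData are test utilities, not part of the helper; imports omitted):

    // Sketch: restore the plain field values of a record produced by the test generator.
    Schema schema =
            Schema.newBuilder()
                    .physicalColumn("col1", DataTypes.STRING())
                    .physicalColumn("col2", DataTypes.STRING())
                    .build();
    List<RecordData.FieldGetter> fieldGetters =
            SchemaUtils.createFieldGetters(schema.getColumns());

    BinaryRecordDataGenerator generator =
            new BinaryRecordDataGenerator((RowType) schema.toRowDataType());
    RecordData record =
            generator.generate(
                    new Object[] {new BinaryStringData("1"), new BinaryStringData("Alice")});

    // Yields ["1", "Alice"]; would yield Collections.emptyList() if record were null.
    List<Object> fields = SchemaUtils.restoreOriginalData(record, fieldGetters);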

flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java

Lines changed: 42 additions & 0 deletions
@@ -463,6 +463,48 @@ public void testWildcardSchemaTransform() throws Exception {
         System.out.println(stdout);
     }
 
+    @Test
+    public void testWildcardWithMetadataColumnTransform() throws Exception {
+        String pipelineJob =
+                String.format(
+                        "source:\n"
+                                + "  type: mysql\n"
+                                + "  hostname: %s\n"
+                                + "  port: 3306\n"
+                                + "  username: %s\n"
+                                + "  password: %s\n"
+                                + "  tables: %s.\\.*\n"
+                                + "  server-id: 5400-5404\n"
+                                + "  server-time-zone: UTC\n"
+                                + "\n"
+                                + "sink:\n"
+                                + "  type: values\n"
+                                + "transform:\n"
+                                + "  - source-table: %s.TABLEALPHA\n"
+                                + "    projection: \\*, __namespace_name__ || '.' || __schema_name__ || '.' || __table_name__ AS identifier_name\n"
+                                + "transform:\n"
+                                + "  - source-table: %s.TABLEBETA\n"
+                                + "    projection: \\*, __namespace_name__ || '.' || __schema_name__ || '.' || __table_name__ AS identifier_name\n"
+                                + "\n"
+                                + "pipeline:\n"
+                                + "  parallelism: 1",
+                        INTER_CONTAINER_MYSQL_ALIAS,
+                        MYSQL_TEST_USER,
+                        MYSQL_TEST_PASSWORD,
+                        transformRenameDatabase.getDatabaseName(),
+                        transformRenameDatabase.getDatabaseName(),
+                        transformRenameDatabase.getDatabaseName());
+        Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
+        Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
+        Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
+        submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar);
+        waitUntilJobRunning(Duration.ofSeconds(30));
+        LOG.info("Pipeline job is running");
+
+        Thread.sleep(10000L);
+        System.out.println(taskManagerConsumer.toUtf8String());
+    }
+
     private void validateResult(List<String> expectedEvents) throws Exception {
         for (String event : expectedEvents) {
             waitUntilSpecificEvent(event, 6000L);
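
For reference, the projection used in this pipeline appends a computed identifier_name column by concatenating the three metadata columns of each source table. A trivial sketch of the equivalent string concatenation, using the table identifier that also appears in the operator tests below:

    // __namespace_name__ || '.' || __schema_name__ || '.' || __table_name__ AS identifier_name
    String namespace = "my_company";
    String schemaName = "my_branch";
    String tableName = "metadata_table";
    String identifierName = namespace + "." + schemaName + "." + tableName;
    // -> "my_company.my_branch.metadata_table"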

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java

Lines changed: 6 additions & 0 deletions
@@ -67,6 +67,12 @@ public static PreTransformOperator.Builder newBuilder() {
     public static class Builder {
         private final List<TransformRule> transformRules = new ArrayList<>();
 
+        public PreTransformOperator.Builder addTransform(
+                String tableInclusions, @Nullable String projection, @Nullable String filter) {
+            transformRules.add(new TransformRule(tableInclusions, projection, filter, "", "", ""));
+            return this;
+        }
+
         public PreTransformOperator.Builder addTransform(
                 String tableInclusions,
                 @Nullable String projection,
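
The new overload lets callers supply only a table pattern, projection, and filter, defaulting the primary-key, partition-key, and table-option arguments to empty strings; the unit tests added in this commit call it directly. A hedged usage sketch (the table name, projection, and filter below are hypothetical examples, not values from this commit):

    // Hypothetical usage of the new three-argument Builder#addTransform overload.
    PreTransformOperator operator =
            PreTransformOperator.newBuilder()
                    .addTransform(
                            "mydb.orders",                                // table inclusion pattern (hypothetical)
                            "*, id || '-' || __table_name__ AS row_tag",  // projection (hypothetical)
                            "id > 0")                                     // filter (hypothetical)
                    .build();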

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java

Lines changed: 31 additions & 1 deletion
@@ -49,6 +49,7 @@
 import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.calcite.sql.parser.SqlParseException;
 import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
 import org.apache.calcite.sql.util.SqlOperatorTables;
 import org.apache.calcite.sql.validate.SqlConformanceEnum;
@@ -166,6 +167,11 @@ public static List<Column> generateReferencedColumns(
                 if (SqlKind.AS.equals(sqlBasicCall.getOperator().kind)) {
                     referencedColumnNames.addAll(
                             parseColumnNameList(sqlBasicCall.getOperandList().get(0)));
+                } else {
+                    throw new ParseException(
+                            "Unrecognized projection expression: "
+                                    + sqlBasicCall
+                                    + ". Should be <EXPR> AS <IDENTIFIER>");
                 }
             } else if (sqlNode instanceof SqlIdentifier) {
                 SqlIdentifier sqlIdentifier = (SqlIdentifier) sqlNode;
@@ -189,6 +195,26 @@ public static List<Column> generateReferencedColumns(
                 .collect(Collectors.toList());
     }
 
+    // Expands wildcard character * to full column list.
+    // For example, given projection expression "a AS new_a, *, c as new_c"
+    // and schema [a, b, c], expand it to [a as new_a, a, b, c, c as new_c].
+    // This step is necessary since passing wildcard to sqlToRel will capture
+    // unexpected metadata columns.
+    private static void expandWildcard(SqlSelect sqlSelect, List<Column> columns) {
+        List<SqlNode> expandedNodes = new ArrayList<>();
+        for (SqlNode sqlNode : sqlSelect.getSelectList().getList()) {
+            if (sqlNode instanceof SqlIdentifier && ((SqlIdentifier) sqlNode).isStar()) {
+                expandedNodes.addAll(
+                        columns.stream()
+                                .map(c -> new SqlIdentifier(c.getName(), SqlParserPos.QUOTED_ZERO))
+                                .collect(Collectors.toList()));
+            } else {
+                expandedNodes.add(sqlNode);
+            }
+        }
+        sqlSelect.setSelectList(new SqlNodeList(expandedNodes, SqlParserPos.ZERO));
+    }
+
     // Returns projected columns based on given projection expression.
     // For example, given projection expression "a, b, c, upper(a) as d, b as e" and columns array
     // [a, b, c, x, y, z], returns projection column array [a, b, c, d, e].
@@ -201,6 +227,7 @@ public static List<ProjectionColumn> generateProjectionColumns(
         if (sqlSelect.getSelectList().isEmpty()) {
             return new ArrayList<>();
         }
+        expandWildcard(sqlSelect, columns);
         RelNode relNode = sqlToRel(columns, sqlSelect);
         Map<String, RelDataType> relDataTypeMap =
                 relNode.getRowType().getFieldList().stream()
@@ -268,7 +295,10 @@ public static List<ProjectionColumn> generateProjectionColumns(
                     projectionColumns.add(projectionColumn);
                 }
             } else {
-                throw new ParseException("Unrecognized projection: " + sqlBasicCall.toString());
+                throw new ParseException(
+                        "Unrecognized projection expression: "
+                                + sqlBasicCall
+                                + ". Should be <EXPR> AS <IDENTIFIER>");
             }
         } else if (sqlNode instanceof SqlIdentifier) {
             SqlIdentifier sqlIdentifier = (SqlIdentifier) sqlNode;
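
The expansion rewrites the select list before Calcite's sqlToRel sees it, so that * only ever covers the physical columns of the upstream schema and never the injected metadata columns. A simplified, framework-free sketch of the same idea on plain strings (the class and method names here are hypothetical illustrations, not the actual Calcite node handling):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    // Simplified illustration of the wildcard expansion performed by expandWildcard:
    // every "*" entry in a projection list is replaced by the physical column names,
    // while explicit expressions are kept in place.
    public final class WildcardExpansionSketch {

        static List<String> expand(List<String> projection, List<String> physicalColumns) {
            List<String> expanded = new ArrayList<>();
            for (String item : projection) {
                if ("*".equals(item.trim())) {
                    expanded.addAll(physicalColumns);
                } else {
                    expanded.add(item);
                }
            }
            return expanded;
        }

        public static void main(String[] args) {
            // Mirrors the example in the code comment: "a AS new_a, *, c as new_c" with schema [a, b, c].
            List<String> projection = Arrays.asList("a AS new_a", "*", "c AS new_c");
            List<String> schema = Arrays.asList("a", "b", "c");
            // Prints [a AS new_a, a, b, c, c AS new_c]
            System.out.println(expand(projection, schema));
        }
    }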

flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java

Lines changed: 11 additions & 5 deletions
@@ -71,6 +71,11 @@ public class PostTransformOperatorTest {
     private static final TableId METADATA_TABLEID =
             TableId.tableId("my_company", "my_branch", "metadata_table");
     private static final Schema METADATA_SCHEMA =
+            Schema.newBuilder()
+                    .physicalColumn("col1", DataTypes.STRING())
+                    .primaryKey("col1")
+                    .build();
+    private static final Schema EXPECTED_METADATA_SCHEMA =
             Schema.newBuilder()
                     .physicalColumn("col1", DataTypes.STRING())
                     .physicalColumn("identifier_name", DataTypes.STRING())
@@ -420,7 +425,7 @@ void testMetadataTransform() throws Exception {
                 PostTransformOperator.newBuilder()
                         .addTransform(
                                 METADATA_TABLEID.identifier(),
-                                "col1, __namespace_name__ || '.' || __schema_name__ || '.' || __table_name__ identifier_name, __namespace_name__, __schema_name__, __table_name__",
+                                "*, __namespace_name__ || '.' || __schema_name__ || '.' || __table_name__ identifier_name, __namespace_name__, __schema_name__, __table_name__",
                                 " __table_name__ = 'metadata_table' ")
                         .build();
         EventOperatorTestHarness<PostTransformOperator, Event>
@@ -432,16 +437,17 @@ void testMetadataTransform() throws Exception {
         CreateTableEvent createTableEvent = new CreateTableEvent(METADATA_TABLEID, METADATA_SCHEMA);
         BinaryRecordDataGenerator recordDataGenerator =
                 new BinaryRecordDataGenerator(((RowType) METADATA_SCHEMA.toRowDataType()));
+        BinaryRecordDataGenerator expectedRecordDataGenerator =
+                new BinaryRecordDataGenerator(((RowType) EXPECTED_METADATA_SCHEMA.toRowDataType()));
         // Insert
         DataChangeEvent insertEvent =
                 DataChangeEvent.insertEvent(
                         METADATA_TABLEID,
-                        recordDataGenerator.generate(
-                                new Object[] {new BinaryStringData("1"), null, null, null, null}));
+                        recordDataGenerator.generate(new Object[] {new BinaryStringData("1")}));
         DataChangeEvent insertEventExpect =
                 DataChangeEvent.insertEvent(
                         METADATA_TABLEID,
-                        recordDataGenerator.generate(
+                        expectedRecordDataGenerator.generate(
                                 new Object[] {
                                     new BinaryStringData("1"),
                                     new BinaryStringData("my_company.my_branch.metadata_table"),
@@ -454,7 +460,7 @@ void testMetadataTransform() throws Exception {
                         transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
                 .isEqualTo(
                         new StreamRecord<>(
-                                new CreateTableEvent(METADATA_TABLEID, METADATA_SCHEMA)));
+                                new CreateTableEvent(METADATA_TABLEID, EXPECTED_METADATA_SCHEMA)));
         transform.processElement(new StreamRecord<>(insertEvent));
         Assertions.assertThat(
                 transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())

flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperatorTest.java

Lines changed: 57 additions & 0 deletions
@@ -133,6 +133,37 @@ public class PreTransformOperatorTest {
                     .options(ImmutableMap.of("key1", "value1", "key2", "value2"))
                     .build();
 
+    private static final TableId METADATA_TABLEID =
+            TableId.tableId("my_company", "my_branch", "metadata_table");
+    private static final Schema METADATA_SCHEMA =
+            Schema.newBuilder()
+                    .physicalColumn("id", DataTypes.STRING().notNull())
+                    .physicalColumn("age", DataTypes.INT())
+                    .physicalColumn("name", DataTypes.STRING())
+                    .primaryKey("id")
+                    .options(ImmutableMap.of("key1", "value1", "key2", "value2"))
+                    .build();
+
+    private static final Schema EXPECTED_METADATA_SCHEMA =
+            Schema.newBuilder()
+                    .physicalColumn("id", DataTypes.STRING().notNull())
+                    .physicalColumn("age", DataTypes.INT())
+                    .physicalColumn("name", DataTypes.STRING())
+                    .primaryKey("id")
+                    .options(ImmutableMap.of("key1", "value1", "key2", "value2"))
+                    .build();
+
+    private static final TableId METADATA_AS_TABLEID =
+            TableId.tableId("my_company", "my_branch", "metadata_as_table");
+    private static final Schema METADATA_AS_SCHEMA =
+            Schema.newBuilder()
+                    .physicalColumn("sid", DataTypes.INT())
+                    .physicalColumn("name", DataTypes.STRING())
+                    .physicalColumn("name_upper", DataTypes.STRING())
+                    .physicalColumn("tbname", DataTypes.STRING())
+                    .primaryKey("sid")
+                    .build();
+
     @Test
     void testEventTransform() throws Exception {
         PreTransformOperator transform =
@@ -460,4 +491,30 @@ public void testWildcardTransformColumn() throws Exception {
                         transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
                 .isEqualTo(new StreamRecord<>(updateEventExpect));
     }
+
+    @Test
+    void testMetadataTransform() throws Exception {
+        PreTransformOperator transform =
+                PreTransformOperator.newBuilder()
+                        .addTransform(
+                                METADATA_TABLEID.identifier(),
+                                "*, __namespace_name__ || '.' || __schema_name__ || '.' || __table_name__ identifier_name, __namespace_name__, __schema_name__, __table_name__",
+                                " __table_name__ = 'metadata_table' ")
+                        .build();
+
+        EventOperatorTestHarness<PreTransformOperator, Event>
+                transformFunctionEventEventOperatorTestHarness =
+                        new EventOperatorTestHarness<>(transform, 1);
+        // Initialization
+        transformFunctionEventEventOperatorTestHarness.open();
+        // Create table
+        CreateTableEvent createTableEvent = new CreateTableEvent(METADATA_TABLEID, METADATA_SCHEMA);
+        transform.processElement(new StreamRecord<>(createTableEvent));
+
+        Assertions.assertThat(
+                        transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isEqualTo(
+                        new StreamRecord<>(
+                                new CreateTableEvent(METADATA_TABLEID, EXPECTED_METADATA_SCHEMA)));
+    }
 }
