Skip to content

Commit 31a7c1d

Browse files
committed
Cleanup: remove redundant reference column prefix
1 parent d859c50 commit 31a7c1d

File tree

12 files changed

+475
-218
lines changed

12 files changed

+475
-218
lines changed

Diff for: flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -114,12 +114,12 @@ public void testSyncWholeDatabase() throws Exception {
114114
submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar);
115115
waitUntilJobRunning(Duration.ofSeconds(30));
116116
LOG.info("Pipeline job is running");
117-
waitUtilSpecificEvent(
117+
waitUntilSpecificEvent(
118118
String.format(
119119
"DataChangeEvent{tableId=%s.customers, before=[], after=[104, user_4, Shanghai, 123567891234], op=INSERT, meta=()}",
120120
mysqlInventoryDatabase.getDatabaseName()),
121121
60000L);
122-
waitUtilSpecificEvent(
122+
waitUntilSpecificEvent(
123123
String.format(
124124
"DataChangeEvent{tableId=%s.products, before=[], after=[109, spare tire, 24 inch spare tire, 22.2, null, null, null], op=INSERT, meta=()}",
125125
mysqlInventoryDatabase.getDatabaseName()),
@@ -201,7 +201,7 @@ public void testSyncWholeDatabase() throws Exception {
201201
throw e;
202202
}
203203

204-
waitUtilSpecificEvent(
204+
waitUntilSpecificEvent(
205205
String.format(
206206
"DataChangeEvent{tableId=%s.products, before=[111, scooter, Big 2-wheel scooter , 5.17, null, null, null, 1], after=[], op=DELETE, meta=()}",
207207
mysqlInventoryDatabase.getDatabaseName()),
@@ -246,7 +246,7 @@ private void validateResult(List<String> expectedEvents) {
246246
}
247247
}
248248

249-
private void waitUtilSpecificEvent(String event, long timeout) throws Exception {
249+
private void waitUntilSpecificEvent(String event, long timeout) throws Exception {
250250
boolean result = false;
251251
long endTimeout = System.currentTimeMillis() + timeout;
252252
while (System.currentTimeMillis() < endTimeout) {

Diff for: flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java

+152-20
Original file line numberDiff line numberDiff line change
@@ -129,13 +129,13 @@ public void testHeteroSchemaTransform() throws Exception {
129129
submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar);
130130
waitUntilJobRunning(Duration.ofSeconds(30));
131131
LOG.info("Pipeline job is running");
132-
waitUtilSpecificEvent(
132+
waitUntilSpecificEvent(
133133
String.format(
134134
"DataChangeEvent{tableId=%s.terminus, before=[], after=[1011, 11], op=INSERT, meta=()}",
135135
transformRenameDatabase.getDatabaseName()),
136136
6000L);
137137

138-
waitUtilSpecificEvent(
138+
waitUntilSpecificEvent(
139139
String.format(
140140
"DataChangeEvent{tableId=%s.terminus, before=[], after=[2014, 14], op=INSERT, meta=()}",
141141
transformRenameDatabase.getDatabaseName()),
@@ -185,19 +185,19 @@ public void testHeteroSchemaTransform() throws Exception {
185185
throw e;
186186
}
187187

188-
waitUtilSpecificEvent(
188+
waitUntilSpecificEvent(
189189
String.format(
190190
"DataChangeEvent{tableId=%s.terminus, before=[], after=[3007, 7], op=INSERT, meta=()}",
191191
transformRenameDatabase.getDatabaseName()),
192192
6000L);
193193

194-
waitUtilSpecificEvent(
194+
waitUntilSpecificEvent(
195195
String.format(
196196
"DataChangeEvent{tableId=%s.terminus, before=[1009, 8.1], after=[1009, 100], op=UPDATE, meta=()}",
197197
transformRenameDatabase.getDatabaseName()),
198198
6000L);
199199

200-
waitUtilSpecificEvent(
200+
waitUntilSpecificEvent(
201201
String.format(
202202
"DataChangeEvent{tableId=%s.terminus, before=[2011, 11], after=[], op=DELETE, meta=()}",
203203
transformRenameDatabase.getDatabaseName()),
@@ -249,37 +249,43 @@ public void testAssortedSchemaTransform() throws Exception {
249249
submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar);
250250
waitUntilJobRunning(Duration.ofSeconds(30));
251251
LOG.info("Pipeline job is running");
252-
waitUtilSpecificEvent(
252+
253+
waitUntilSpecificEvent(
254+
String.format(
255+
"CreateTableEvent{tableId=%s.terminus, schema=columns={`ID` INT NOT NULL,`VERSION` STRING,`NAME` STRING}, primaryKeys=ID, options=()}",
256+
transformRenameDatabase.getDatabaseName()),
257+
6000L);
258+
waitUntilSpecificEvent(
253259
String.format(
254260
"DataChangeEvent{tableId=%s.terminus, before=[], after=[1008, v8, alice], op=INSERT, meta=()}",
255261
transformRenameDatabase.getDatabaseName()),
256262
6000L);
257263

258-
waitUtilSpecificEvent(
264+
waitUntilSpecificEvent(
259265
String.format(
260266
"DataChangeEvent{tableId=%s.terminus, before=[], after=[1009, v8.1, bob], op=INSERT, meta=()}",
261267
transformRenameDatabase.getDatabaseName()),
262268
6000L);
263269

264-
waitUtilSpecificEvent(
270+
waitUntilSpecificEvent(
265271
String.format(
266272
"DataChangeEvent{tableId=%s.terminus, before=[], after=[2011, v11, eva], op=INSERT, meta=()}",
267273
transformRenameDatabase.getDatabaseName()),
268274
6000L);
269275

270-
waitUtilSpecificEvent(
276+
waitUntilSpecificEvent(
271277
String.format(
272278
"DataChangeEvent{tableId=%s.terminus, before=[], after=[2012, v12, fred], op=INSERT, meta=()}",
273279
transformRenameDatabase.getDatabaseName()),
274280
6000L);
275281

276-
waitUtilSpecificEvent(
282+
waitUntilSpecificEvent(
277283
String.format(
278284
"DataChangeEvent{tableId=%s.terminus, before=[], after=[2013, v13, gus], op=INSERT, meta=()}",
279285
transformRenameDatabase.getDatabaseName()),
280286
6000L);
281287

282-
waitUtilSpecificEvent(
288+
waitUntilSpecificEvent(
283289
String.format(
284290
"DataChangeEvent{tableId=%s.terminus, before=[], after=[2014, v14, henry], op=INSERT, meta=()}",
285291
transformRenameDatabase.getDatabaseName()),
@@ -305,19 +311,19 @@ public void testAssortedSchemaTransform() throws Exception {
305311
throw e;
306312
}
307313

308-
waitUtilSpecificEvent(
314+
waitUntilSpecificEvent(
309315
String.format(
310316
"DataChangeEvent{tableId=%s.terminus, before=[1009, v8.1, bob], after=[1009, v100, bob], op=UPDATE, meta=()}",
311317
transformRenameDatabase.getDatabaseName()),
312318
6000L);
313319

314-
waitUtilSpecificEvent(
320+
waitUntilSpecificEvent(
315321
String.format(
316322
"DataChangeEvent{tableId=%s.terminus, before=[], after=[3007, v7, iina], op=INSERT, meta=()}",
317323
transformRenameDatabase.getDatabaseName()),
318324
6000L);
319325

320-
waitUtilSpecificEvent(
326+
waitUntilSpecificEvent(
321327
String.format(
322328
"DataChangeEvent{tableId=%s.terminus, before=[2011, v11, eva], after=[], op=DELETE, meta=()}",
323329
transformRenameDatabase.getDatabaseName()),
@@ -327,17 +333,143 @@ public void testAssortedSchemaTransform() throws Exception {
327333
System.out.println(stdout);
328334
}
329335

330-
private void validateResult(List<String> expectedEvents) {
336+
@Test
337+
public void testWildcardSchemaTransform() throws Exception {
338+
String pipelineJob =
339+
String.format(
340+
"source:\n"
341+
+ " type: mysql\n"
342+
+ " hostname: %s\n"
343+
+ " port: 3306\n"
344+
+ " username: %s\n"
345+
+ " password: %s\n"
346+
+ " tables: %s.\\.*\n"
347+
+ " server-id: 5400-5404\n"
348+
+ " server-time-zone: UTC\n"
349+
+ "\n"
350+
+ "sink:\n"
351+
+ " type: values\n"
352+
+ "transform:\n"
353+
+ " - source-table: %s.TABLEALPHA\n"
354+
+ " projection: \\*, CONCAT('v', VERSION) AS VERSION, LOWER(NAMEALPHA) AS NAME\n"
355+
+ " filter: AGEALPHA < 19\n"
356+
+ " - source-table: %s.TABLEBETA\n"
357+
+ " projection: \\*, CONCAT('v', VERSION) AS VERSION, LOWER(NAMEBETA) AS NAME\n"
358+
+ "\n"
359+
+ "pipeline:\n"
360+
+ " parallelism: 1",
361+
INTER_CONTAINER_MYSQL_ALIAS,
362+
MYSQL_TEST_USER,
363+
MYSQL_TEST_PASSWORD,
364+
transformRenameDatabase.getDatabaseName(),
365+
transformRenameDatabase.getDatabaseName(),
366+
transformRenameDatabase.getDatabaseName(),
367+
transformRenameDatabase.getDatabaseName(),
368+
transformRenameDatabase.getDatabaseName());
369+
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
370+
Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
371+
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
372+
submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar);
373+
waitUntilJobRunning(Duration.ofSeconds(30));
374+
LOG.info("Pipeline job is running");
375+
376+
waitUntilSpecificEvent(
377+
String.format(
378+
"CreateTableEvent{tableId=%s.TABLEALPHA, schema=columns={`ID` INT NOT NULL,`VERSION` STRING,`PRICEALPHA` INT,`AGEALPHA` INT,`NAMEALPHA` VARCHAR(128),`NAME` STRING}, primaryKeys=ID, options=()}",
379+
transformRenameDatabase.getDatabaseName()),
380+
6000L);
381+
382+
waitUntilSpecificEvent(
383+
String.format(
384+
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1008, v8, 199, 17, Alice, alice], op=INSERT, meta=()}",
385+
transformRenameDatabase.getDatabaseName()),
386+
6000L);
387+
388+
waitUntilSpecificEvent(
389+
String.format(
390+
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1009, v8.1, 0, 18, Bob, bob], op=INSERT, meta=()}",
391+
transformRenameDatabase.getDatabaseName()),
392+
6000L);
393+
394+
waitUntilSpecificEvent(
395+
String.format(
396+
"CreateTableEvent{tableId=%s.TABLEBETA, schema=columns={`ID` INT NOT NULL,`VERSION` STRING,`CODENAMESBETA` VARCHAR(17),`AGEBETA` INT,`NAMEBETA` VARCHAR(128),`NAME` STRING}, primaryKeys=ID, options=()}",
397+
transformRenameDatabase.getDatabaseName()),
398+
6000L);
399+
400+
waitUntilSpecificEvent(
401+
String.format(
402+
"DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2011, v11, Big Sur, 21, Eva, eva], op=INSERT, meta=()}",
403+
transformRenameDatabase.getDatabaseName()),
404+
6000L);
405+
406+
waitUntilSpecificEvent(
407+
String.format(
408+
"DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2012, v12, Monterey, 22, Fred, fred], op=INSERT, meta=()}",
409+
transformRenameDatabase.getDatabaseName()),
410+
6000L);
411+
412+
waitUntilSpecificEvent(
413+
String.format(
414+
"DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2013, v13, Ventura, 23, Gus, gus], op=INSERT, meta=()}",
415+
transformRenameDatabase.getDatabaseName()),
416+
6000L);
417+
418+
waitUntilSpecificEvent(
419+
String.format(
420+
"DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2014, v14, Sonoma, 24, Henry, henry], op=INSERT, meta=()}",
421+
transformRenameDatabase.getDatabaseName()),
422+
6000L);
423+
424+
LOG.info("Begin incremental reading stage.");
425+
// generate binlogs
426+
String mysqlJdbcUrl =
427+
String.format(
428+
"jdbc:mysql://%s:%s/%s",
429+
MYSQL.getHost(),
430+
MYSQL.getDatabasePort(),
431+
transformRenameDatabase.getDatabaseName());
432+
try (Connection conn =
433+
DriverManager.getConnection(
434+
mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
435+
Statement stat = conn.createStatement()) {
436+
stat.execute("UPDATE TABLEALPHA SET VERSION='100' WHERE id=1009;");
437+
stat.execute("INSERT INTO TABLEALPHA VALUES (3007, '7', 79, 16, 'IINA');");
438+
stat.execute("DELETE FROM TABLEBETA WHERE id=2011;");
439+
} catch (SQLException e) {
440+
LOG.error("Update table for CDC failed.", e);
441+
throw e;
442+
}
443+
444+
waitUntilSpecificEvent(
445+
String.format(
446+
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[1009, v8.1, 0, 18, Bob, bob], after=[1009, v100, 0, 18, Bob, bob], op=UPDATE, meta=()}",
447+
transformRenameDatabase.getDatabaseName()),
448+
6000L);
449+
450+
waitUntilSpecificEvent(
451+
String.format(
452+
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[3007, v7, 79, 16, IINA, iina], op=INSERT, meta=()}",
453+
transformRenameDatabase.getDatabaseName()),
454+
6000L);
455+
456+
waitUntilSpecificEvent(
457+
String.format(
458+
"DataChangeEvent{tableId=%s.TABLEBETA, before=[2011, v11, Big Sur, 21, Eva, eva], after=[], op=DELETE, meta=()}",
459+
transformRenameDatabase.getDatabaseName()),
460+
6000L);
461+
331462
String stdout = taskManagerConsumer.toUtf8String();
463+
System.out.println(stdout);
464+
}
465+
466+
private void validateResult(List<String> expectedEvents) throws Exception {
332467
for (String event : expectedEvents) {
333-
if (!stdout.contains(event)) {
334-
throw new RuntimeException(
335-
"failed to get specific event: " + event + " from stdout: " + stdout);
336-
}
468+
waitUntilSpecificEvent(event, 6000L);
337469
}
338470
}
339471

340-
private void waitUtilSpecificEvent(String event, long timeout) throws Exception {
472+
private void waitUntilSpecificEvent(String event, long timeout) throws Exception {
341473
boolean result = false;
342474
long endTimeout = System.currentTimeMillis() + timeout;
343475
while (System.currentTimeMillis() < endTimeout) {

Diff for: flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java

+9-14
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,6 @@
5050
import java.util.concurrent.ConcurrentHashMap;
5151
import java.util.stream.Collectors;
5252

53-
import static org.apache.flink.cdc.runtime.operators.transform.ProjectionColumn.REFERENCED_COLUMN_PREFIX;
54-
5553
/**
5654
* A data process function that performs column filtering, calculated column evaluation & final
5755
* projection.
@@ -218,11 +216,11 @@ private SchemaChangeEvent cacheSchema(SchemaChangeEvent event) throws Exception
218216
getTableInfoFromSchemaEvolutionClient(tableId).getSchema(), event);
219217
}
220218

221-
Schema newSchema = transformSchema(tableId, schema);
222-
tableInfoMap.put(tableId, TableInfo.of(tableId, schema));
219+
Schema projectedSchema = transformSchema(tableId, schema);
220+
tableInfoMap.put(tableId, TableInfo.of(tableId, schema, projectedSchema));
223221

224222
if (event instanceof CreateTableEvent) {
225-
return new CreateTableEvent(event.tableId(), newSchema);
223+
return new CreateTableEvent(event.tableId(), projectedSchema);
226224
}
227225
return event;
228226
}
@@ -232,7 +230,8 @@ private TableInfo getTableInfoFromSchemaEvolutionClient(TableId tableId) throws
232230
if (tableInfo == null) {
233231
Optional<Schema> schemaOptional = schemaEvolutionClient.getLatestSchema(tableId);
234232
if (schemaOptional.isPresent()) {
235-
tableInfo = TableInfo.of(tableId, schemaOptional.get());
233+
Schema projectedSchema = transformSchema(tableId, schemaOptional.get());
234+
tableInfo = TableInfo.of(tableId, schemaOptional.get(), projectedSchema);
236235
} else {
237236
throw new RuntimeException(
238237
"Could not find schema message from SchemaRegistry for " + tableId);
@@ -290,7 +289,7 @@ private Optional<DataChangeEvent> processDataChangeEvent(DataChangeEvent dataCha
290289
long epochTime = System.currentTimeMillis();
291290
for (PostTransformers transform : transforms) {
292291
Selectors selectors = transform.getSelectors();
293-
Boolean isPreProjection = transform.isContainFilteredComputedColumn();
292+
boolean isPreProjection = transform.isContainFilteredComputedColumn();
294293
if (selectors.isMatch(tableId)) {
295294
Optional<DataChangeEvent> dataChangeEventOptional = Optional.of(dataChangeEvent);
296295
Optional<TransformProjection> transformProjectionOptional =
@@ -439,14 +438,10 @@ private Optional<DataChangeEvent> processPostProjection(
439438

440439
private BinaryRecordData projectRecord(TableInfo tableInfo, BinaryRecordData recordData) {
441440
List<Object> valueList = new ArrayList<>();
442-
List<String> columns = tableInfo.getSchema().getColumnNames();
443-
RecordData.FieldGetter[] fieldGetters = tableInfo.getFieldGetters();
441+
RecordData.FieldGetter[] fieldGetters = tableInfo.getProjectedFieldGetters();
444442

445-
for (int i = 0; i < recordData.getArity(); i++) {
446-
if (columns.get(i).startsWith(REFERENCED_COLUMN_PREFIX)) {
447-
continue;
448-
}
449-
valueList.add(fieldGetters[i].getFieldOrNull(recordData));
443+
for (RecordData.FieldGetter fieldGetter : fieldGetters) {
444+
valueList.add(fieldGetter.getFieldOrNull(recordData));
450445
}
451446

452447
return tableInfo

Diff for: flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumn.java

-13
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,6 @@ public class ProjectionColumn implements Serializable {
4949
private final List<String> originalColumnNames;
5050
private TransformExpressionKey transformExpressionKey;
5151
private boolean isReferencedColumn;
52-
private boolean isCalculatedColumn;
53-
54-
public static final String REFERENCED_COLUMN_PREFIX = "__referenced_column__prefix__";
5552

5653
public ProjectionColumn(
5754
Column column,
@@ -74,20 +71,10 @@ public ProjectionColumn copy() {
7471
}
7572

7673
public Column getColumn() {
77-
return isReferencedColumn
78-
? column.copy(REFERENCED_COLUMN_PREFIX + column.getName())
79-
: column;
80-
}
81-
82-
public Column getRawColumn() {
8374
return column;
8475
}
8576

8677
public String getColumnName() {
87-
return isReferencedColumn ? REFERENCED_COLUMN_PREFIX + column.getName() : column.getName();
88-
}
89-
90-
public String getRawColumnName() {
9178
return column.getName();
9279
}
9380

0 commit comments

Comments
 (0)