Skip to content

Commit

Permalink
Cleanup: remove redundant reference column prefix
Browse files Browse the repository at this point in the history
  • Loading branch information
yuxiqian committed May 6, 2024
1 parent 461e797 commit c448e89
Show file tree
Hide file tree
Showing 12 changed files with 475 additions and 218 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,12 @@ public void testSyncWholeDatabase() throws Exception {
submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar);
waitUntilJobRunning(Duration.ofSeconds(30));
LOG.info("Pipeline job is running");
waitUtilSpecificEvent(
waitUntilSpecificEvent(
String.format(
"DataChangeEvent{tableId=%s.customers, before=[], after=[104, user_4, Shanghai, 123567891234], op=INSERT, meta=()}",
mysqlInventoryDatabase.getDatabaseName()),
60000L);
waitUtilSpecificEvent(
waitUntilSpecificEvent(
String.format(
"DataChangeEvent{tableId=%s.products, before=[], after=[109, spare tire, 24 inch spare tire, 22.2, null, null, null], op=INSERT, meta=()}",
mysqlInventoryDatabase.getDatabaseName()),
Expand Down Expand Up @@ -201,7 +201,7 @@ public void testSyncWholeDatabase() throws Exception {
throw e;
}

waitUtilSpecificEvent(
waitUntilSpecificEvent(
String.format(
"DataChangeEvent{tableId=%s.products, before=[111, scooter, Big 2-wheel scooter , 5.17, null, null, null, 1], after=[], op=DELETE, meta=()}",
mysqlInventoryDatabase.getDatabaseName()),
Expand Down Expand Up @@ -246,7 +246,7 @@ private void validateResult(List<String> expectedEvents) {
}
}

private void waitUtilSpecificEvent(String event, long timeout) throws Exception {
private void waitUntilSpecificEvent(String event, long timeout) throws Exception {
boolean result = false;
long endTimeout = System.currentTimeMillis() + timeout;
while (System.currentTimeMillis() < endTimeout) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,13 @@ public void testHeteroSchemaTransform() throws Exception {
submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar);
waitUntilJobRunning(Duration.ofSeconds(30));
LOG.info("Pipeline job is running");
waitUtilSpecificEvent(
waitUntilSpecificEvent(
String.format(
"DataChangeEvent{tableId=%s.terminus, before=[], after=[1011, 11], op=INSERT, meta=()}",
transformRenameDatabase.getDatabaseName()),
6000L);

waitUtilSpecificEvent(
waitUntilSpecificEvent(
String.format(
"DataChangeEvent{tableId=%s.terminus, before=[], after=[2014, 14], op=INSERT, meta=()}",
transformRenameDatabase.getDatabaseName()),
Expand Down Expand Up @@ -185,19 +185,19 @@ public void testHeteroSchemaTransform() throws Exception {
throw e;
}

waitUtilSpecificEvent(
waitUntilSpecificEvent(
String.format(
"DataChangeEvent{tableId=%s.terminus, before=[], after=[3007, 7], op=INSERT, meta=()}",
transformRenameDatabase.getDatabaseName()),
6000L);

waitUtilSpecificEvent(
waitUntilSpecificEvent(
String.format(
"DataChangeEvent{tableId=%s.terminus, before=[1009, 8.1], after=[1009, 100], op=UPDATE, meta=()}",
transformRenameDatabase.getDatabaseName()),
6000L);

waitUtilSpecificEvent(
waitUntilSpecificEvent(
String.format(
"DataChangeEvent{tableId=%s.terminus, before=[2011, 11], after=[], op=DELETE, meta=()}",
transformRenameDatabase.getDatabaseName()),
Expand Down Expand Up @@ -249,37 +249,43 @@ public void testAssortedSchemaTransform() throws Exception {
submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar);
waitUntilJobRunning(Duration.ofSeconds(30));
LOG.info("Pipeline job is running");
waitUtilSpecificEvent(

waitUntilSpecificEvent(
String.format(
"CreateTableEvent{tableId=%s.terminus, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, options=()}",
transformRenameDatabase.getDatabaseName()),
6000L);
waitUntilSpecificEvent(
String.format(
"DataChangeEvent{tableId=%s.terminus, before=[], after=[1008, v8, alice], op=INSERT, meta=()}",
transformRenameDatabase.getDatabaseName()),
6000L);

waitUtilSpecificEvent(
waitUntilSpecificEvent(
String.format(
"DataChangeEvent{tableId=%s.terminus, before=[], after=[1009, v8.1, bob], op=INSERT, meta=()}",
transformRenameDatabase.getDatabaseName()),
6000L);

waitUtilSpecificEvent(
waitUntilSpecificEvent(
String.format(
"DataChangeEvent{tableId=%s.terminus, before=[], after=[2011, v11, eva], op=INSERT, meta=()}",
transformRenameDatabase.getDatabaseName()),
6000L);

waitUtilSpecificEvent(
waitUntilSpecificEvent(
String.format(
"DataChangeEvent{tableId=%s.terminus, before=[], after=[2012, v12, fred], op=INSERT, meta=()}",
transformRenameDatabase.getDatabaseName()),
6000L);

waitUtilSpecificEvent(
waitUntilSpecificEvent(
String.format(
"DataChangeEvent{tableId=%s.terminus, before=[], after=[2013, v13, gus], op=INSERT, meta=()}",
transformRenameDatabase.getDatabaseName()),
6000L);

waitUtilSpecificEvent(
waitUntilSpecificEvent(
String.format(
"DataChangeEvent{tableId=%s.terminus, before=[], after=[2014, v14, henry], op=INSERT, meta=()}",
transformRenameDatabase.getDatabaseName()),
Expand All @@ -305,19 +311,19 @@ public void testAssortedSchemaTransform() throws Exception {
throw e;
}

waitUtilSpecificEvent(
waitUntilSpecificEvent(
String.format(
"DataChangeEvent{tableId=%s.terminus, before=[1009, v8.1, bob], after=[1009, v100, bob], op=UPDATE, meta=()}",
transformRenameDatabase.getDatabaseName()),
6000L);

waitUtilSpecificEvent(
waitUntilSpecificEvent(
String.format(
"DataChangeEvent{tableId=%s.terminus, before=[], after=[3007, v7, iina], op=INSERT, meta=()}",
transformRenameDatabase.getDatabaseName()),
6000L);

waitUtilSpecificEvent(
waitUntilSpecificEvent(
String.format(
"DataChangeEvent{tableId=%s.terminus, before=[2011, v11, eva], after=[], op=DELETE, meta=()}",
transformRenameDatabase.getDatabaseName()),
Expand All @@ -327,17 +333,143 @@ public void testAssortedSchemaTransform() throws Exception {
System.out.println(stdout);
}

private void validateResult(List<String> expectedEvents) {
@Test
public void testWildcardSchemaTransform() throws Exception {
String pipelineJob =
String.format(
"source:\n"
+ " type: mysql\n"
+ " hostname: %s\n"
+ " port: 3306\n"
+ " username: %s\n"
+ " password: %s\n"
+ " tables: %s.\\.*\n"
+ " server-id: 5400-5404\n"
+ " server-time-zone: UTC\n"
+ "\n"
+ "sink:\n"
+ " type: values\n"
+ "transform:\n"
+ " - source-table: %s.TABLEALPHA\n"
+ " projection: \\*, CONCAT('v', VERSION) AS VERSION, LOWER(NAMEALPHA) AS NAME\n"
+ " filter: AGEALPHA < 19\n"
+ " - source-table: %s.TABLEBETA\n"
+ " projection: \\*, CONCAT('v', VERSION) AS VERSION, LOWER(NAMEBETA) AS NAME\n"
+ "\n"
+ "pipeline:\n"
+ " parallelism: 1",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
transformRenameDatabase.getDatabaseName(),
transformRenameDatabase.getDatabaseName(),
transformRenameDatabase.getDatabaseName(),
transformRenameDatabase.getDatabaseName(),
transformRenameDatabase.getDatabaseName());
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar);
waitUntilJobRunning(Duration.ofSeconds(30));
LOG.info("Pipeline job is running");

waitUntilSpecificEvent(
String.format(
"CreateTableEvent{tableId=%s.TABLEALPHA, schema=columns={`ID` INT NOT NULL,`VERSION` STRING,`PRICEALPHA` INT,`AGEALPHA` INT,`NAMEALPHA` VARCHAR(128),`NAME` STRING}, primaryKeys=ID, options=()}",
transformRenameDatabase.getDatabaseName()),
6000L);

waitUntilSpecificEvent(
String.format(
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1008, v8, 199, 17, Alice, alice], op=INSERT, meta=()}",
transformRenameDatabase.getDatabaseName()),
6000L);

waitUntilSpecificEvent(
String.format(
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1009, v8.1, 0, 18, Bob, bob], op=INSERT, meta=()}",
transformRenameDatabase.getDatabaseName()),
6000L);

waitUntilSpecificEvent(
String.format(
"CreateTableEvent{tableId=%s.TABLEBETA, schema=columns={`ID` INT NOT NULL,`VERSION` STRING,`CODENAMESBETA` VARCHAR(17),`AGEBETA` INT,`NAMEBETA` VARCHAR(128),`NAME` STRING}, primaryKeys=ID, options=()}",
transformRenameDatabase.getDatabaseName()),
6000L);

waitUntilSpecificEvent(
String.format(
"DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2011, v11, Big Sur, 21, Eva, eva], op=INSERT, meta=()}",
transformRenameDatabase.getDatabaseName()),
6000L);

waitUntilSpecificEvent(
String.format(
"DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2012, v12, Monterey, 22, Fred, fred], op=INSERT, meta=()}",
transformRenameDatabase.getDatabaseName()),
6000L);

waitUntilSpecificEvent(
String.format(
"DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2013, v13, Ventura, 23, Gus, gus], op=INSERT, meta=()}",
transformRenameDatabase.getDatabaseName()),
6000L);

waitUntilSpecificEvent(
String.format(
"DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2014, v14, Sonoma, 24, Henry, henry], op=INSERT, meta=()}",
transformRenameDatabase.getDatabaseName()),
6000L);

LOG.info("Begin incremental reading stage.");
// generate binlogs
String mysqlJdbcUrl =
String.format(
"jdbc:mysql://%s:%s/%s",
MYSQL.getHost(),
MYSQL.getDatabasePort(),
transformRenameDatabase.getDatabaseName());
try (Connection conn =
DriverManager.getConnection(
mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
Statement stat = conn.createStatement()) {
stat.execute("UPDATE TABLEALPHA SET VERSION='100' WHERE id=1009;");
stat.execute("INSERT INTO TABLEALPHA VALUES (3007, '7', 79, 16, 'IINA');");
stat.execute("DELETE FROM TABLEBETA WHERE id=2011;");
} catch (SQLException e) {
LOG.error("Update table for CDC failed.", e);
throw e;
}

waitUntilSpecificEvent(
String.format(
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[1009, v8.1, 0, 18, Bob, bob], after=[1009, v100, 0, 18, Bob, bob], op=UPDATE, meta=()}",
transformRenameDatabase.getDatabaseName()),
6000L);

waitUntilSpecificEvent(
String.format(
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[3007, v7, 79, 16, IINA, iina], op=INSERT, meta=()}",
transformRenameDatabase.getDatabaseName()),
6000L);

waitUntilSpecificEvent(
String.format(
"DataChangeEvent{tableId=%s.TABLEBETA, before=[2011, v11, Big Sur, 21, Eva, eva], after=[], op=DELETE, meta=()}",
transformRenameDatabase.getDatabaseName()),
6000L);

String stdout = taskManagerConsumer.toUtf8String();
System.out.println(stdout);
}

private void validateResult(List<String> expectedEvents) throws Exception {
for (String event : expectedEvents) {
if (!stdout.contains(event)) {
throw new RuntimeException(
"failed to get specific event: " + event + " from stdout: " + stdout);
}
waitUntilSpecificEvent(event, 6000L);
}
}

private void waitUtilSpecificEvent(String event, long timeout) throws Exception {
private void waitUntilSpecificEvent(String event, long timeout) throws Exception {
boolean result = false;
long endTimeout = System.currentTimeMillis() + timeout;
while (System.currentTimeMillis() < endTimeout) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import static org.apache.flink.cdc.runtime.operators.transform.ProjectionColumn.REFERENCED_COLUMN_PREFIX;

/**
* A data process function that performs column filtering, calculated column evaluation & final
* projection.
Expand Down Expand Up @@ -218,11 +216,11 @@ private SchemaChangeEvent cacheSchema(SchemaChangeEvent event) throws Exception
getTableInfoFromSchemaEvolutionClient(tableId).getSchema(), event);
}

Schema newSchema = transformSchema(tableId, schema);
tableInfoMap.put(tableId, TableInfo.of(tableId, schema));
Schema projectedSchema = transformSchema(tableId, schema);
tableInfoMap.put(tableId, TableInfo.of(tableId, schema, projectedSchema));

if (event instanceof CreateTableEvent) {
return new CreateTableEvent(event.tableId(), newSchema);
return new CreateTableEvent(event.tableId(), projectedSchema);
}
return event;
}
Expand All @@ -232,7 +230,8 @@ private TableInfo getTableInfoFromSchemaEvolutionClient(TableId tableId) throws
if (tableInfo == null) {
Optional<Schema> schemaOptional = schemaEvolutionClient.getLatestSchema(tableId);
if (schemaOptional.isPresent()) {
tableInfo = TableInfo.of(tableId, schemaOptional.get());
Schema projectedSchema = transformSchema(tableId, schemaOptional.get());
tableInfo = TableInfo.of(tableId, schemaOptional.get(), projectedSchema);
} else {
throw new RuntimeException(
"Could not find schema message from SchemaRegistry for " + tableId);
Expand Down Expand Up @@ -290,7 +289,7 @@ private Optional<DataChangeEvent> processDataChangeEvent(DataChangeEvent dataCha
long epochTime = System.currentTimeMillis();
for (PostTransformers transform : transforms) {
Selectors selectors = transform.getSelectors();
Boolean isPreProjection = transform.isContainFilteredComputedColumn();
boolean isPreProjection = transform.isContainFilteredComputedColumn();
if (selectors.isMatch(tableId)) {
Optional<DataChangeEvent> dataChangeEventOptional = Optional.of(dataChangeEvent);
Optional<TransformProjection> transformProjectionOptional =
Expand Down Expand Up @@ -439,14 +438,10 @@ private Optional<DataChangeEvent> processPostProjection(

private BinaryRecordData projectRecord(TableInfo tableInfo, BinaryRecordData recordData) {
List<Object> valueList = new ArrayList<>();
List<String> columns = tableInfo.getSchema().getColumnNames();
RecordData.FieldGetter[] fieldGetters = tableInfo.getFieldGetters();
RecordData.FieldGetter[] fieldGetters = tableInfo.getProjectedFieldGetters();

for (int i = 0; i < recordData.getArity(); i++) {
if (columns.get(i).startsWith(REFERENCED_COLUMN_PREFIX)) {
continue;
}
valueList.add(fieldGetters[i].getFieldOrNull(recordData));
for (RecordData.FieldGetter fieldGetter : fieldGetters) {
valueList.add(fieldGetter.getFieldOrNull(recordData));
}

return tableInfo
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,6 @@ public class ProjectionColumn implements Serializable {
private final List<String> originalColumnNames;
private TransformExpressionKey transformExpressionKey;
private boolean isReferencedColumn;
private boolean isCalculatedColumn;

public static final String REFERENCED_COLUMN_PREFIX = "__referenced_column__prefix__";

public ProjectionColumn(
Column column,
Expand All @@ -74,20 +71,10 @@ public ProjectionColumn copy() {
}

public Column getColumn() {
return isReferencedColumn
? column.copy(REFERENCED_COLUMN_PREFIX + column.getName())
: column;
}

public Column getRawColumn() {
return column;
}

public String getColumnName() {
return isReferencedColumn ? REFERENCED_COLUMN_PREFIX + column.getName() : column.getName();
}

public String getRawColumnName() {
return column.getName();
}

Expand Down
Loading

0 comments on commit c448e89

Please sign in to comment.