Skip to content

Commit

Permalink
Clarify DDL parsing methods & unify filter-project execution flow
Browse files Browse the repository at this point in the history
  • Loading branch information
yuxiqian committed May 7, 2024
1 parent 019869f commit 615547f
Show file tree
Hide file tree
Showing 13 changed files with 453 additions and 322 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public class PostTransformOperator extends AbstractStreamOperator<Event>
/** keep the relationship of TableId and table information. */
private final Map<TableId, TableInfo> tableInfoMap;

private transient Map<TransformProjection, TransformProjectionProcessor>
private transient Map<TransformProjection, PostTransformProcessor>
transformProjectionProcessorMap;
private transient Map<TransformFilter, TransformFilterProcessor> transformFilterProcessorMap;

Expand Down Expand Up @@ -217,7 +217,7 @@ private SchemaChangeEvent cacheSchema(SchemaChangeEvent event) throws Exception
}

Schema projectedSchema = transformSchema(tableId, schema);
tableInfoMap.put(tableId, TableInfo.of(tableId, schema, projectedSchema));
tableInfoMap.put(tableId, TableInfo.of(tableId, projectedSchema, schema));

if (event instanceof CreateTableEvent) {
return new CreateTableEvent(event.tableId(), projectedSchema);
Expand All @@ -231,7 +231,7 @@ private TableInfo getTableInfoFromSchemaEvolutionClient(TableId tableId) throws
Optional<Schema> schemaOptional = schemaEvolutionClient.getLatestSchema(tableId);
if (schemaOptional.isPresent()) {
Schema projectedSchema = transformSchema(tableId, schemaOptional.get());
tableInfo = TableInfo.of(tableId, schemaOptional.get(), projectedSchema);
tableInfo = TableInfo.of(tableId, projectedSchema, schemaOptional.get());
} else {
throw new RuntimeException(
"Could not find schema message from SchemaRegistry for " + tableId);
Expand All @@ -251,14 +251,12 @@ private Schema transformSchema(TableId tableId, Schema schema) throws Exception
if (!transformProjectionProcessorMap.containsKey(transformProjection)) {
transformProjectionProcessorMap.put(
transformProjection,
TransformProjectionProcessor.of(
transformProjection, transformFilter));
PostTransformProcessor.of(transformProjection, transformFilter));
}
TransformProjectionProcessor transformProjectionProcessor =
PostTransformProcessor postTransformProcessor =
transformProjectionProcessorMap.get(transformProjection);
// update the columns of projection and add the column of projection into Schema
newSchemas.add(
transformProjectionProcessor.processSchemaChangeEvent(schema, false));
newSchemas.add(postTransformProcessor.processSchemaChangeEvent(schema));
}
}
}
Expand Down Expand Up @@ -289,36 +287,13 @@ private Optional<DataChangeEvent> processDataChangeEvent(DataChangeEvent dataCha
long epochTime = System.currentTimeMillis();
for (PostTransformers transform : transforms) {
Selectors selectors = transform.getSelectors();
boolean isPreProjection = transform.isContainFilteredComputedColumn();

if (selectors.isMatch(tableId)) {
Optional<DataChangeEvent> dataChangeEventOptional = Optional.of(dataChangeEvent);
Optional<TransformProjection> transformProjectionOptional =
transform.getProjection();
Optional<TransformFilter> transformFilterOptional = transform.getFilter();
if (isPreProjection
&& transformProjectionOptional.isPresent()
&& transformProjectionOptional.get().isValid()) {
TransformProjection transformProjection = transformProjectionOptional.get();
if (!transformProjectionProcessorMap.containsKey(transformProjection)
|| !transformProjectionProcessorMap
.get(transformProjection)
.hasTableInfo()) {
transformProjectionProcessorMap.put(
transformProjection,
TransformProjectionProcessor.of(
tableInfo,
transformProjection,
transformFilterOptional.orElse(null),
timezone));
}
TransformProjectionProcessor transformProjectionProcessor =
transformProjectionProcessorMap.get(transformProjection);
dataChangeEventOptional =
processProjection(
transformProjectionProcessor,
dataChangeEventOptional.get(),
epochTime);
}

if (transformFilterOptional.isPresent()
&& transformFilterOptional.get().isVaild()) {
TransformFilter transformFilter = transformFilterOptional.get();
Expand All @@ -335,8 +310,7 @@ private Optional<DataChangeEvent> processDataChangeEvent(DataChangeEvent dataCha
dataChangeEventOptional.get(),
epochTime);
}
if (!isPreProjection
&& dataChangeEventOptional.isPresent()
if (dataChangeEventOptional.isPresent()
&& transformProjectionOptional.isPresent()
&& transformProjectionOptional.get().isValid()) {
TransformProjection transformProjection = transformProjectionOptional.get();
Expand All @@ -346,17 +320,17 @@ private Optional<DataChangeEvent> processDataChangeEvent(DataChangeEvent dataCha
.hasTableInfo()) {
transformProjectionProcessorMap.put(
transformProjection,
TransformProjectionProcessor.of(
PostTransformProcessor.of(
tableInfo,
transformProjection,
transformFilterOptional.orElse(null),
timezone));
}
TransformProjectionProcessor transformProjectionProcessor =
PostTransformProcessor postTransformProcessor =
transformProjectionProcessorMap.get(transformProjection);
dataChangeEventOptional =
processProjection(
transformProjectionProcessor,
postTransformProcessor,
dataChangeEventOptional.get(),
epochTime);
}
Expand Down Expand Up @@ -402,20 +376,18 @@ private Optional<DataChangeEvent> processFilter(
}

private Optional<DataChangeEvent> processProjection(
TransformProjectionProcessor transformProjectionProcessor,
PostTransformProcessor postTransformProcessor,
DataChangeEvent dataChangeEvent,
long epochTime)
throws Exception {
long epochTime) {
BinaryRecordData before = (BinaryRecordData) dataChangeEvent.before();
BinaryRecordData after = (BinaryRecordData) dataChangeEvent.after();
if (before != null) {
BinaryRecordData projectedBefore =
transformProjectionProcessor.processData(before, epochTime);
postTransformProcessor.processData(before, epochTime);
dataChangeEvent = DataChangeEvent.projectBefore(dataChangeEvent, projectedBefore);
}
if (after != null) {
BinaryRecordData projectedAfter =
transformProjectionProcessor.processData(after, epochTime);
BinaryRecordData projectedAfter = postTransformProcessor.processData(after, epochTime);
dataChangeEvent = DataChangeEvent.projectAfter(dataChangeEvent, projectedAfter);
}
return Optional.of(dataChangeEvent);
Expand All @@ -438,14 +410,14 @@ private Optional<DataChangeEvent> processPostProjection(

private BinaryRecordData projectRecord(TableInfo tableInfo, BinaryRecordData recordData) {
List<Object> valueList = new ArrayList<>();
RecordData.FieldGetter[] fieldGetters = tableInfo.getProjectedFieldGetters();
RecordData.FieldGetter[] fieldGetters = tableInfo.getFieldGetters();

for (RecordData.FieldGetter fieldGetter : fieldGetters) {
valueList.add(fieldGetter.getFieldOrNull(recordData));
}

return tableInfo
.getProjectedRecordDataGenerator()
.getRecordDataGenerator()
.generate(valueList.toArray(new Object[valueList.size()]));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.runtime.parser.TransformParser;
Expand Down Expand Up @@ -48,16 +47,16 @@
* and the user-defined expression computed columns.
* </ul>
*/
public class TransformProjectionProcessor {
private static final Logger LOG = LoggerFactory.getLogger(TransformProjectionProcessor.class);
public class PostTransformProcessor {
private static final Logger LOG = LoggerFactory.getLogger(PostTransformProcessor.class);
private TableInfo tableInfo;
private TableChangeInfo tableChangeInfo;
private TransformProjection transformProjection;
private @Nullable TransformFilter transformFilter;
private String timezone;
private Map<String, ProjectionColumnProcessor> projectionColumnProcessorMap;

public TransformProjectionProcessor(
public PostTransformProcessor(
TableInfo tableInfo,
TableChangeInfo tableChangeInfo,
TransformProjection transformProjection,
Expand All @@ -79,67 +78,59 @@ public boolean hasTableInfo() {
return this.tableInfo != null;
}

public static TransformProjectionProcessor of(
public static PostTransformProcessor of(
TableInfo tableInfo,
TransformProjection transformProjection,
TransformFilter transformFilter,
String timezone) {
return new TransformProjectionProcessor(
return new PostTransformProcessor(
tableInfo, null, transformProjection, transformFilter, timezone);
}

public static TransformProjectionProcessor of(
public static PostTransformProcessor of(
TableChangeInfo tableChangeInfo,
TransformProjection transformProjection,
TransformFilter transformFilter) {
return new TransformProjectionProcessor(
return new PostTransformProcessor(
null, tableChangeInfo, transformProjection, transformFilter, null);
}

public static TransformProjectionProcessor of(
public static PostTransformProcessor of(
TransformProjection transformProjection, TransformFilter transformFilter) {
return new TransformProjectionProcessor(
null, null, transformProjection, transformFilter, null);
return new PostTransformProcessor(null, null, transformProjection, transformFilter, null);
}

public CreateTableEvent processCreateTableEvent(
CreateTableEvent createTableEvent, boolean keepReferencedColumns) {
public Schema processSchemaChangeEvent(Schema schema) {
List<ProjectionColumn> projectionColumns =
TransformParser.generateProjectionColumns(
transformProjection.getProjection(),
transformFilter != null ? transformFilter.getExpression() : null,
createTableEvent.getSchema().getColumns(),
keepReferencedColumns);
transformProjection.setProjectionColumns(projectionColumns);
List<Column> allColumnList = transformProjection.getAllColumnList();
// add the column of projection into Schema
Schema schema = createTableEvent.getSchema().copy(allColumnList);
return new CreateTableEvent(createTableEvent.tableId(), schema);
}

public Schema processSchemaChangeEvent(Schema schema, boolean keepReferencedColumns) {
List<ProjectionColumn> projectionColumns =
TransformParser.generateProjectionColumns(
transformProjection.getProjection(),
transformFilter != null ? transformFilter.getExpression() : null,
schema.getColumns(),
keepReferencedColumns);
transformProjection.getProjection(), schema.getColumns());
transformProjection.setProjectionColumns(projectionColumns);
return schema.copy(
projectionColumns.stream()
.map(ProjectionColumn::getColumn)
.collect(Collectors.toList()));
}

public BinaryRecordData processFillDataField(BinaryRecordData data) {
public BinaryRecordData processData(BinaryRecordData after, long epochTime) {
List<Object> valueList = new ArrayList<>();
for (Column column : tableChangeInfo.getTransformedSchema().getColumns()) {
for (Column column : tableInfo.getSchema().getColumns()) {
boolean isProjectionColumn = false;
for (ProjectionColumn projectionColumn : transformProjection.getProjectionColumns()) {
if (column.getName().equals(projectionColumn.getColumnName())
&& !projectionColumn.isReferencedColumn()
&& projectionColumn.isValidTransformedProjectionColumn()) {
valueList.add(null);
if (!projectionColumnProcessorMap.containsKey(
projectionColumn.getColumnName())) {
projectionColumnProcessorMap.put(
projectionColumn.getColumnName(),
ProjectionColumnProcessor.of(
tableInfo, projectionColumn, timezone));
}
ProjectionColumnProcessor projectionColumnProcessor =
projectionColumnProcessorMap.get(projectionColumn.getColumnName());
valueList.add(
DataTypeConverter.convert(
projectionColumnProcessor.evaluate(after, epochTime),
projectionColumn.getDataType()));
isProjectionColumn = true;
break;
}
Expand All @@ -148,17 +139,17 @@ public BinaryRecordData processFillDataField(BinaryRecordData data) {
valueList.add(
getValueFromBinaryRecordData(
column.getName(),
data,
tableChangeInfo.getOriginalSchema().getColumns(),
tableChangeInfo.getFieldGetters()));
after,
tableInfo.getOriginalSchema().getColumns(),
tableInfo.getOriginalFieldGetters()));
}
}
return tableChangeInfo
return tableInfo
.getRecordDataGenerator()
.generate(valueList.toArray(new Object[valueList.size()]));
}

public BinaryRecordData processData(BinaryRecordData after, long epochTime) {
public BinaryRecordData preProcessData(BinaryRecordData after, long epochTime) {
List<Object> valueList = new ArrayList<>();
for (Column column : tableInfo.getSchema().getColumns()) {
boolean isProjectionColumn = false;
Expand Down
Loading

0 comments on commit 615547f

Please sign in to comment.