Skip to content

Commit 35ab5f8

Browse files
committed
Clarify DDL parsing methods & unify filter-project execution flow
1 parent 019869f commit 35ab5f8

13 files changed

+453
-322
lines changed

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java

+18-46
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public class PostTransformOperator extends AbstractStreamOperator<Event>
6666
/** keep the relationship of TableId and table information. */
6767
private final Map<TableId, TableInfo> tableInfoMap;
6868

69-
private transient Map<TransformProjection, TransformProjectionProcessor>
69+
private transient Map<TransformProjection, PostTransformProcessor>
7070
transformProjectionProcessorMap;
7171
private transient Map<TransformFilter, TransformFilterProcessor> transformFilterProcessorMap;
7272

@@ -217,7 +217,7 @@ private SchemaChangeEvent cacheSchema(SchemaChangeEvent event) throws Exception
217217
}
218218

219219
Schema projectedSchema = transformSchema(tableId, schema);
220-
tableInfoMap.put(tableId, TableInfo.of(tableId, schema, projectedSchema));
220+
tableInfoMap.put(tableId, TableInfo.of(tableId, projectedSchema, schema));
221221

222222
if (event instanceof CreateTableEvent) {
223223
return new CreateTableEvent(event.tableId(), projectedSchema);
@@ -231,7 +231,7 @@ private TableInfo getTableInfoFromSchemaEvolutionClient(TableId tableId) throws
231231
Optional<Schema> schemaOptional = schemaEvolutionClient.getLatestSchema(tableId);
232232
if (schemaOptional.isPresent()) {
233233
Schema projectedSchema = transformSchema(tableId, schemaOptional.get());
234-
tableInfo = TableInfo.of(tableId, schemaOptional.get(), projectedSchema);
234+
tableInfo = TableInfo.of(tableId, projectedSchema, schemaOptional.get());
235235
} else {
236236
throw new RuntimeException(
237237
"Could not find schema message from SchemaRegistry for " + tableId);
@@ -251,14 +251,12 @@ private Schema transformSchema(TableId tableId, Schema schema) throws Exception
251251
if (!transformProjectionProcessorMap.containsKey(transformProjection)) {
252252
transformProjectionProcessorMap.put(
253253
transformProjection,
254-
TransformProjectionProcessor.of(
255-
transformProjection, transformFilter));
254+
PostTransformProcessor.of(transformProjection, transformFilter));
256255
}
257-
TransformProjectionProcessor transformProjectionProcessor =
256+
PostTransformProcessor postTransformProcessor =
258257
transformProjectionProcessorMap.get(transformProjection);
259258
// update the columns of projection and add the column of projection into Schema
260-
newSchemas.add(
261-
transformProjectionProcessor.processSchemaChangeEvent(schema, false));
259+
newSchemas.add(postTransformProcessor.processSchemaChangeEvent(schema));
262260
}
263261
}
264262
}
@@ -289,36 +287,13 @@ private Optional<DataChangeEvent> processDataChangeEvent(DataChangeEvent dataCha
289287
long epochTime = System.currentTimeMillis();
290288
for (PostTransformers transform : transforms) {
291289
Selectors selectors = transform.getSelectors();
292-
boolean isPreProjection = transform.isContainFilteredComputedColumn();
290+
293291
if (selectors.isMatch(tableId)) {
294292
Optional<DataChangeEvent> dataChangeEventOptional = Optional.of(dataChangeEvent);
295293
Optional<TransformProjection> transformProjectionOptional =
296294
transform.getProjection();
297295
Optional<TransformFilter> transformFilterOptional = transform.getFilter();
298-
if (isPreProjection
299-
&& transformProjectionOptional.isPresent()
300-
&& transformProjectionOptional.get().isValid()) {
301-
TransformProjection transformProjection = transformProjectionOptional.get();
302-
if (!transformProjectionProcessorMap.containsKey(transformProjection)
303-
|| !transformProjectionProcessorMap
304-
.get(transformProjection)
305-
.hasTableInfo()) {
306-
transformProjectionProcessorMap.put(
307-
transformProjection,
308-
TransformProjectionProcessor.of(
309-
tableInfo,
310-
transformProjection,
311-
transformFilterOptional.orElse(null),
312-
timezone));
313-
}
314-
TransformProjectionProcessor transformProjectionProcessor =
315-
transformProjectionProcessorMap.get(transformProjection);
316-
dataChangeEventOptional =
317-
processProjection(
318-
transformProjectionProcessor,
319-
dataChangeEventOptional.get(),
320-
epochTime);
321-
}
296+
322297
if (transformFilterOptional.isPresent()
323298
&& transformFilterOptional.get().isVaild()) {
324299
TransformFilter transformFilter = transformFilterOptional.get();
@@ -335,8 +310,7 @@ private Optional<DataChangeEvent> processDataChangeEvent(DataChangeEvent dataCha
335310
dataChangeEventOptional.get(),
336311
epochTime);
337312
}
338-
if (!isPreProjection
339-
&& dataChangeEventOptional.isPresent()
313+
if (dataChangeEventOptional.isPresent()
340314
&& transformProjectionOptional.isPresent()
341315
&& transformProjectionOptional.get().isValid()) {
342316
TransformProjection transformProjection = transformProjectionOptional.get();
@@ -346,17 +320,17 @@ private Optional<DataChangeEvent> processDataChangeEvent(DataChangeEvent dataCha
346320
.hasTableInfo()) {
347321
transformProjectionProcessorMap.put(
348322
transformProjection,
349-
TransformProjectionProcessor.of(
323+
PostTransformProcessor.of(
350324
tableInfo,
351325
transformProjection,
352326
transformFilterOptional.orElse(null),
353327
timezone));
354328
}
355-
TransformProjectionProcessor transformProjectionProcessor =
329+
PostTransformProcessor postTransformProcessor =
356330
transformProjectionProcessorMap.get(transformProjection);
357331
dataChangeEventOptional =
358332
processProjection(
359-
transformProjectionProcessor,
333+
postTransformProcessor,
360334
dataChangeEventOptional.get(),
361335
epochTime);
362336
}
@@ -402,20 +376,18 @@ private Optional<DataChangeEvent> processFilter(
402376
}
403377

404378
private Optional<DataChangeEvent> processProjection(
405-
TransformProjectionProcessor transformProjectionProcessor,
379+
PostTransformProcessor postTransformProcessor,
406380
DataChangeEvent dataChangeEvent,
407-
long epochTime)
408-
throws Exception {
381+
long epochTime) {
409382
BinaryRecordData before = (BinaryRecordData) dataChangeEvent.before();
410383
BinaryRecordData after = (BinaryRecordData) dataChangeEvent.after();
411384
if (before != null) {
412385
BinaryRecordData projectedBefore =
413-
transformProjectionProcessor.processData(before, epochTime);
386+
postTransformProcessor.processData(before, epochTime);
414387
dataChangeEvent = DataChangeEvent.projectBefore(dataChangeEvent, projectedBefore);
415388
}
416389
if (after != null) {
417-
BinaryRecordData projectedAfter =
418-
transformProjectionProcessor.processData(after, epochTime);
390+
BinaryRecordData projectedAfter = postTransformProcessor.processData(after, epochTime);
419391
dataChangeEvent = DataChangeEvent.projectAfter(dataChangeEvent, projectedAfter);
420392
}
421393
return Optional.of(dataChangeEvent);
@@ -438,14 +410,14 @@ private Optional<DataChangeEvent> processPostProjection(
438410

439411
private BinaryRecordData projectRecord(TableInfo tableInfo, BinaryRecordData recordData) {
440412
List<Object> valueList = new ArrayList<>();
441-
RecordData.FieldGetter[] fieldGetters = tableInfo.getProjectedFieldGetters();
413+
RecordData.FieldGetter[] fieldGetters = tableInfo.getFieldGetters();
442414

443415
for (RecordData.FieldGetter fieldGetter : fieldGetters) {
444416
valueList.add(fieldGetter.getFieldOrNull(recordData));
445417
}
446418

447419
return tableInfo
448-
.getProjectedRecordDataGenerator()
420+
.getRecordDataGenerator()
449421
.generate(valueList.toArray(new Object[valueList.size()]));
450422
}
451423

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformProjectionProcessor.java flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformProcessor.java

+31-40
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import org.apache.flink.cdc.common.data.RecordData;
2121
import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
22-
import org.apache.flink.cdc.common.event.CreateTableEvent;
2322
import org.apache.flink.cdc.common.schema.Column;
2423
import org.apache.flink.cdc.common.schema.Schema;
2524
import org.apache.flink.cdc.runtime.parser.TransformParser;
@@ -48,16 +47,16 @@
4847
* and the user-defined expression computed columns.
4948
* </ul>
5049
*/
51-
public class TransformProjectionProcessor {
52-
private static final Logger LOG = LoggerFactory.getLogger(TransformProjectionProcessor.class);
50+
public class PostTransformProcessor {
51+
private static final Logger LOG = LoggerFactory.getLogger(PostTransformProcessor.class);
5352
private TableInfo tableInfo;
5453
private TableChangeInfo tableChangeInfo;
5554
private TransformProjection transformProjection;
5655
private @Nullable TransformFilter transformFilter;
5756
private String timezone;
5857
private Map<String, ProjectionColumnProcessor> projectionColumnProcessorMap;
5958

60-
public TransformProjectionProcessor(
59+
public PostTransformProcessor(
6160
TableInfo tableInfo,
6261
TableChangeInfo tableChangeInfo,
6362
TransformProjection transformProjection,
@@ -79,67 +78,59 @@ public boolean hasTableInfo() {
7978
return this.tableInfo != null;
8079
}
8180

82-
public static TransformProjectionProcessor of(
81+
public static PostTransformProcessor of(
8382
TableInfo tableInfo,
8483
TransformProjection transformProjection,
8584
TransformFilter transformFilter,
8685
String timezone) {
87-
return new TransformProjectionProcessor(
86+
return new PostTransformProcessor(
8887
tableInfo, null, transformProjection, transformFilter, timezone);
8988
}
9089

91-
public static TransformProjectionProcessor of(
90+
public static PostTransformProcessor of(
9291
TableChangeInfo tableChangeInfo,
9392
TransformProjection transformProjection,
9493
TransformFilter transformFilter) {
95-
return new TransformProjectionProcessor(
94+
return new PostTransformProcessor(
9695
null, tableChangeInfo, transformProjection, transformFilter, null);
9796
}
9897

99-
public static TransformProjectionProcessor of(
98+
public static PostTransformProcessor of(
10099
TransformProjection transformProjection, TransformFilter transformFilter) {
101-
return new TransformProjectionProcessor(
102-
null, null, transformProjection, transformFilter, null);
100+
return new PostTransformProcessor(null, null, transformProjection, transformFilter, null);
103101
}
104102

105-
public CreateTableEvent processCreateTableEvent(
106-
CreateTableEvent createTableEvent, boolean keepReferencedColumns) {
103+
public Schema processSchemaChangeEvent(Schema schema) {
107104
List<ProjectionColumn> projectionColumns =
108105
TransformParser.generateProjectionColumns(
109-
transformProjection.getProjection(),
110-
transformFilter != null ? transformFilter.getExpression() : null,
111-
createTableEvent.getSchema().getColumns(),
112-
keepReferencedColumns);
113-
transformProjection.setProjectionColumns(projectionColumns);
114-
List<Column> allColumnList = transformProjection.getAllColumnList();
115-
// add the column of projection into Schema
116-
Schema schema = createTableEvent.getSchema().copy(allColumnList);
117-
return new CreateTableEvent(createTableEvent.tableId(), schema);
118-
}
119-
120-
public Schema processSchemaChangeEvent(Schema schema, boolean keepReferencedColumns) {
121-
List<ProjectionColumn> projectionColumns =
122-
TransformParser.generateProjectionColumns(
123-
transformProjection.getProjection(),
124-
transformFilter != null ? transformFilter.getExpression() : null,
125-
schema.getColumns(),
126-
keepReferencedColumns);
106+
transformProjection.getProjection(), schema.getColumns());
127107
transformProjection.setProjectionColumns(projectionColumns);
128108
return schema.copy(
129109
projectionColumns.stream()
130110
.map(ProjectionColumn::getColumn)
131111
.collect(Collectors.toList()));
132112
}
133113

134-
public BinaryRecordData processFillDataField(BinaryRecordData data) {
114+
public BinaryRecordData processData(BinaryRecordData after, long epochTime) {
135115
List<Object> valueList = new ArrayList<>();
136-
for (Column column : tableChangeInfo.getTransformedSchema().getColumns()) {
116+
for (Column column : tableInfo.getSchema().getColumns()) {
137117
boolean isProjectionColumn = false;
138118
for (ProjectionColumn projectionColumn : transformProjection.getProjectionColumns()) {
139119
if (column.getName().equals(projectionColumn.getColumnName())
140-
&& !projectionColumn.isReferencedColumn()
141120
&& projectionColumn.isValidTransformedProjectionColumn()) {
142-
valueList.add(null);
121+
if (!projectionColumnProcessorMap.containsKey(
122+
projectionColumn.getColumnName())) {
123+
projectionColumnProcessorMap.put(
124+
projectionColumn.getColumnName(),
125+
ProjectionColumnProcessor.of(
126+
tableInfo, projectionColumn, timezone));
127+
}
128+
ProjectionColumnProcessor projectionColumnProcessor =
129+
projectionColumnProcessorMap.get(projectionColumn.getColumnName());
130+
valueList.add(
131+
DataTypeConverter.convert(
132+
projectionColumnProcessor.evaluate(after, epochTime),
133+
projectionColumn.getDataType()));
143134
isProjectionColumn = true;
144135
break;
145136
}
@@ -148,17 +139,17 @@ public BinaryRecordData processFillDataField(BinaryRecordData data) {
148139
valueList.add(
149140
getValueFromBinaryRecordData(
150141
column.getName(),
151-
data,
152-
tableChangeInfo.getOriginalSchema().getColumns(),
153-
tableChangeInfo.getFieldGetters()));
142+
after,
143+
tableInfo.getOriginalSchema().getColumns(),
144+
tableInfo.getOriginalFieldGetters()));
154145
}
155146
}
156-
return tableChangeInfo
147+
return tableInfo
157148
.getRecordDataGenerator()
158149
.generate(valueList.toArray(new Object[valueList.size()]));
159150
}
160151

161-
public BinaryRecordData processData(BinaryRecordData after, long epochTime) {
152+
public BinaryRecordData preProcessData(BinaryRecordData after, long epochTime) {
162153
List<Object> valueList = new ArrayList<>();
163154
for (Column column : tableInfo.getSchema().getColumns()) {
164155
boolean isProjectionColumn = false;

0 commit comments

Comments
 (0)