Commit c6cf46b

[FLINK-36879][runtime] Support to convert delete as insert in transform (#3804)

1 parent 2ad8006 commit c6cf46b

22 files changed: +618 -65 lines changed

docs/content.zh/docs/core-concept/transform.md

Lines changed: 16 additions & 0 deletions

@@ -39,10 +39,26 @@ To describe a transform rule, the following parameters can be used:
 | primary-keys | Sink table primary keys, separated by commas | optional |
 | partition-keys | Sink table partition keys, separated by commas | optional |
 | table-options | used to configure the table creation statement when automatically creating tables | optional |
+| converter-after-transform | used to add a converter to change DataChangeEvent after transform | optional |
 | description | Transform rule description | optional |

 Multiple rules can be declared in one single pipeline YAML file.

+## converter-after-transform
+
+`converter-after-transform` is used to change the DataChangeEvent after the other transform operations. The available values of this option are as follows.
+
+- SOFT_DELETE: The delete event will be converted into an insert event. This converter should be used together with the metadata column `__data_event_type__`, making it possible to implement soft deletes.
+
+For example, with the following transform, a delete event no longer removes data from the sink. Instead, it is converted into an insert record whose `op_type` column is set to `-D`.
+
+```yaml
+transform:
+  - source-table: \.*.\.*
+    projection: \*, __data_event_type__ AS op_type
+    converter-after-transform: SOFT_DELETE
+```
+
 # Metadata Fields
 ## Fields definition
 There are some hidden columns used to access metadata information. They will only take effect when explicitly referenced in the transform rules.

docs/content/docs/core-concept/transform.md

Lines changed: 25 additions & 9 deletions

@@ -31,18 +31,34 @@ What's more, it also helps users filter some unnecessary data during the synchro
 # Parameters
 To describe a transform rule, the following parameters can be used:

-| Parameter | Meaning | Optional/Required |
-|--------------|----------------------------------------------------|-------------------|
-| source-table | Source table id, supports regular expressions | required |
-| projection | Projection rule, supports syntax similar to the select clause in SQL | optional |
-| filter | Filter rule, supports syntax similar to the where clause in SQL | optional |
-| primary-keys | Sink table primary keys, separated by commas | optional |
-| partition-keys | Sink table partition keys, separated by commas | optional |
-| table-options | used to the configure table creation statement when automatically creating tables | optional |
-| description | Transform rule description | optional |
+| Parameter                 | Meaning                                                                            | Optional/Required |
+|---------------------------|------------------------------------------------------------------------------------|-------------------|
+| source-table              | Source table id, supports regular expressions                                      | required          |
+| projection                | Projection rule, supports syntax similar to the select clause in SQL               | optional          |
+| filter                    | Filter rule, supports syntax similar to the where clause in SQL                    | optional          |
+| primary-keys              | Sink table primary keys, separated by commas                                       | optional          |
+| partition-keys            | Sink table partition keys, separated by commas                                     | optional          |
+| table-options             | used to configure the table creation statement when automatically creating tables  | optional          |
+| converter-after-transform | used to add a converter to change DataChangeEvent after transform                  | optional          |
+| description               | Transform rule description                                                         | optional          |

 Multiple rules can be declared in one single pipeline YAML file.

+## converter-after-transform
+
+`converter-after-transform` is used to change the DataChangeEvent after the other transform operations. The available values of this option are as follows.
+
+- SOFT_DELETE: The delete event will be converted into an insert event. This converter should be used together with the metadata column `__data_event_type__`, making it possible to implement soft deletes.
+
+For example, with the following transform, a delete event no longer removes data from the sink. Instead, it is converted into an insert record whose `op_type` column is set to `-D`.
+
+```yaml
+transform:
+  - source-table: \.*.\.*
+    projection: \*, __data_event_type__ AS op_type
+    converter-after-transform: SOFT_DELETE
+```
+
 # Metadata Fields
 ## Fields definition
 There are some hidden columns used to access metadata information. They will only take effect when explicitly referenced in the transform rules.
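
As an aside on the semantics documented above: the sketch below illustrates, in terms of the `DataChangeEvent` API from flink-cdc-common, what SOFT_DELETE conceptually does. It is a minimal illustration under assumed factory-method names (`insertEvent`, `op()`, `before()`), not the converter implementation added elsewhere in this commit.

```java
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.OperationType;

public final class SoftDeleteSketch {

    // A delete event is re-emitted as an insert carrying the deleted
    // row's "before" image, so the sink keeps the row while the
    // projected __data_event_type__ column (op_type above) records "-D".
    public static DataChangeEvent softDelete(DataChangeEvent event) {
        if (event.op() == OperationType.DELETE) {
            return DataChangeEvent.insertEvent(event.tableId(), event.before());
        }
        // Inserts and updates pass through unchanged.
        return event;
    }
}
```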

flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java

Lines changed: 8 additions & 1 deletion

@@ -78,6 +78,8 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
     private static final String TRANSFORM_PROJECTION_KEY = "projection";
     private static final String TRANSFORM_FILTER_KEY = "filter";
     private static final String TRANSFORM_DESCRIPTION_KEY = "description";
+    private static final String TRANSFORM_CONVERTER_AFTER_TRANSFORM_KEY =
+            "converter-after-transform";

     // UDF related keys
     private static final String UDF_KEY = "user-defined-function";
@@ -316,6 +318,10 @@ private TransformDef toTransformDef(JsonNode transformNode) {
                 Optional.ofNullable(transformNode.get(TRANSFORM_DESCRIPTION_KEY))
                         .map(JsonNode::asText)
                         .orElse(null);
+        String postTransformConverter =
+                Optional.ofNullable(transformNode.get(TRANSFORM_CONVERTER_AFTER_TRANSFORM_KEY))
+                        .map(JsonNode::asText)
+                        .orElse(null);

         return new TransformDef(
                 sourceTable,
@@ -324,7 +330,8 @@ private TransformDef toTransformDef(JsonNode transformNode) {
                 primaryKeys,
                 partitionKeys,
                 tableOptions,
-                description);
+                description,
+                postTransformConverter);
     }

     private Configuration toPipelineConfig(JsonNode pipelineConfigNode) {
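
For orientation, here is a hypothetical round trip through the parser change above. The `parse(Path, Configuration)` entry point and the `PipelineDef.getTransforms()` accessor are assumptions based on the surrounding tests; treat this as a sketch, not the canonical API.

```java
import java.nio.file.Paths;

import org.apache.flink.cdc.cli.parser.YamlPipelineDefinitionParser;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.composer.definition.PipelineDef;

class ParserUsageSketch {
    // Parse a pipeline YAML that sets the new key, then read it back
    // from the first transform definition.
    static String readConverter() throws Exception {
        PipelineDef pipelineDef =
                new YamlPipelineDefinitionParser()
                        .parse(Paths.get("pipeline-definition-full.yaml"), new Configuration());
        // "SOFT_DELETE" when the key is present; null when it is omitted.
        return pipelineDef.getTransforms().get(0).getPostTransformConverter();
    }
}
```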

flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java

Lines changed: 15 additions & 7 deletions

@@ -159,7 +159,7 @@ void testValidTimeZone() throws Exception {
     }

     @Test
-    void testInvalidTimeZone() throws Exception {
+    void testInvalidTimeZone() {
         URL resource = Resources.getResource("definitions/pipeline-definition-minimized.yaml");
         YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser();
         assertThatThrownBy(
@@ -336,15 +336,17 @@ private void testSchemaEvolutionTypesParsing(
                                 "id",
                                 "product_name",
                                 "comment=app order",
-                                "project fields from source table"),
+                                "project fields from source table",
+                                "SOFT_DELETE"),
                         new TransformDef(
                                 "mydb.web_order_.*",
                                 "CONCAT(id, order_id) as uniq_id, *",
                                 "uniq_id > 10",
                                 null,
                                 null,
                                 null,
-                                "add new uniq_id for each row")),
+                                "add new uniq_id for each row",
+                                null)),
                 Collections.emptyList(),
                 Collections.singletonList(
                         new ModelDef(
@@ -402,6 +404,7 @@ void testParsingFullDefinitionFromString() throws Exception {
                         + "    partition-keys: product_name\n"
                         + "    table-options: comment=app order\n"
                         + "    description: project fields from source table\n"
+                        + "    converter-after-transform: SOFT_DELETE\n"
                         + "  - source-table: mydb.web_order_.*\n"
                         + "    projection: CONCAT(id, order_id) as uniq_id, *\n"
                         + "    filter: uniq_id > 10\n"
@@ -469,15 +472,17 @@ void testParsingFullDefinitionFromString() throws Exception {
                                 "id",
                                 "product_name",
                                 "comment=app order",
-                                "project fields from source table"),
+                                "project fields from source table",
+                                "SOFT_DELETE"),
                         new TransformDef(
                                 "mydb.web_order_.*",
                                 "CONCAT(id, order_id) as uniq_id, *",
                                 "uniq_id > 10",
                                 null,
                                 null,
                                 null,
-                                "add new uniq_id for each row")),
+                                "add new uniq_id for each row",
+                                null)),
                 Collections.emptyList(),
                 Collections.singletonList(
                         new ModelDef(
@@ -606,15 +611,17 @@ void testParsingFullDefinitionFromString() throws Exception {
                                 "id",
                                 "product_name",
                                 "comment=app order",
-                                "project fields from source table"),
+                                "project fields from source table",
+                                "SOFT_DELETE"),
                         new TransformDef(
                                 "mydb.web_order_.*",
                                 "CONCAT(id, order_id) as uniq_id, *",
                                 "uniq_id > 10",
                                 null,
                                 null,
                                 null,
-                                "add new uniq_id for each row")),
+                                "add new uniq_id for each row",
+                                null)),
                 Collections.emptyList(),
                 Configuration.fromMap(
                         ImmutableMap.<String, String>builder()
@@ -646,6 +653,7 @@ void testParsingFullDefinitionFromString() throws Exception {
                         null,
                         null,
                         null,
+                        null,
                         null)),
                 Arrays.asList(
                         new UdfDef(

flink-cdc-cli/src/test/resources/definitions/pipeline-definition-full-with-repsym.yaml

Lines changed: 1 addition & 0 deletions

@@ -49,6 +49,7 @@ transform:
     partition-keys: product_name
     table-options: comment=app order
     description: project fields from source table
+    converter-after-transform: SOFT_DELETE
   - source-table: mydb.web_order_.*
     projection: CONCAT(id, order_id) as uniq_id, *
     filter: uniq_id > 10

flink-cdc-cli/src/test/resources/definitions/pipeline-definition-full.yaml

Lines changed: 1 addition & 0 deletions

@@ -47,6 +47,7 @@ transform:
     partition-keys: product_name
     table-options: comment=app order
     description: project fields from source table
+    converter-after-transform: SOFT_DELETE
   - source-table: mydb.web_order_.*
     projection: CONCAT(id, order_id) as uniq_id, *
     filter: uniq_id > 10

flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/TransformDef.java

Lines changed: 15 additions & 3 deletions

@@ -50,6 +50,7 @@ public class TransformDef {
     private final String primaryKeys;
     private final String partitionKeys;
     private final String tableOptions;
+    private final String postTransformConverter;

     public TransformDef(
             String sourceTable,
@@ -58,14 +59,16 @@ public TransformDef(
             String primaryKeys,
             String partitionKeys,
             String tableOptions,
-            String description) {
+            String description,
+            String postTransformConverter) {
         this.sourceTable = sourceTable;
         this.projection = projection;
         this.filter = filter;
         this.primaryKeys = primaryKeys;
         this.partitionKeys = partitionKeys;
         this.tableOptions = tableOptions;
         this.description = description;
+        this.postTransformConverter = postTransformConverter;
     }

     public String getSourceTable() {
@@ -104,6 +107,10 @@ public String getTableOptions() {
         return tableOptions;
     }

+    public String getPostTransformConverter() {
+        return postTransformConverter;
+    }
+
     @Override
     public String toString() {
         return "TransformDef{"
@@ -119,6 +126,9 @@ public String toString() {
                 + ", description='"
                 + description
                 + '\''
+                + ", postTransformConverter='"
+                + postTransformConverter
+                + '\''
                 + '}';
     }

@@ -137,7 +147,8 @@ public boolean equals(Object o) {
                 && Objects.equals(description, that.description)
                 && Objects.equals(primaryKeys, that.primaryKeys)
                 && Objects.equals(partitionKeys, that.partitionKeys)
-                && Objects.equals(tableOptions, that.tableOptions);
+                && Objects.equals(tableOptions, that.tableOptions)
+                && Objects.equals(postTransformConverter, that.postTransformConverter);
     }

     @Override
@@ -149,6 +160,7 @@ public int hashCode() {
                 description,
                 primaryKeys,
                 partitionKeys,
-                tableOptions);
+                tableOptions,
+                postTransformConverter);
     }
 }
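
A short usage sketch of the extended constructor, grounded in the diff above and mirroring the docs example; the description string is illustrative. Every pre-existing call site in this commit passes null for the new trailing parameter, which preserves the old behavior (no converter applied).

```java
import org.apache.flink.cdc.composer.definition.TransformDef;

class TransformDefSketch {
    // Build a transform definition equivalent to the YAML example in the
    // docs change above: project __data_event_type__ as op_type and
    // convert deletes into inserts after the transform runs.
    static TransformDef softDeleteExample() {
        return new TransformDef(
                "\\.*.\\.*",                           // source-table
                "\\*, __data_event_type__ AS op_type", // projection
                null,                                  // filter
                null,                                  // primary-keys
                null,                                  // partition-keys
                null,                                  // table-options
                "soft delete example",                 // description (illustrative)
                "SOFT_DELETE");                        // converter-after-transform
    }
}
```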

flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java

Lines changed: 4 additions & 2 deletions

@@ -60,7 +60,8 @@ public DataStream<Event> translatePreTransform(
                     transform.getFilter().orElse(null),
                     transform.getPrimaryKeys(),
                     transform.getPartitionKeys(),
-                    transform.getTableOptions());
+                    transform.getTableOptions(),
+                    transform.getPostTransformConverter());
         }

         preTransformFunctionBuilder.addUdfFunctions(
@@ -91,7 +92,8 @@ public DataStream<Event> translatePostTransform(
                     transform.isValidFilter() ? transform.getFilter().get() : null,
                     transform.getPrimaryKeys(),
                     transform.getPartitionKeys(),
-                    transform.getTableOptions());
+                    transform.getTableOptions(),
+                    transform.getPostTransformConverter());
             }
         }
         postTransformFunctionBuilder.addTimezone(timezone);

flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java

Lines changed: 10 additions & 5 deletions

@@ -325,7 +325,8 @@ void testTransform(ValuesDataSink.SinkApi sinkApi) throws Exception {
                         "col1",
                         "col12",
                         "key1=value1",
-                        "");
+                        "",
+                        null);

         // Setup pipeline
         Configuration pipelineConfig = new Configuration();
@@ -387,7 +388,8 @@ void testOpTypeMetadataColumn(ValuesDataSink.SinkApi sinkApi) throws Exception {
                         "col1",
                         "col12",
                         "key1=value1",
-                        "");
+                        "",
+                        null);

         // Setup pipeline
         Configuration pipelineConfig = new Configuration();
@@ -449,7 +451,8 @@ void testTransformTwice(ValuesDataSink.SinkApi sinkApi) throws Exception {
                         "col1",
                         "col12",
                         "key1=value1",
-                        "");
+                        "",
+                        null);
         TransformDef transformDef2 =
                 new TransformDef(
                         "default_namespace.default_schema.table1",
@@ -458,7 +461,8 @@ void testTransformTwice(ValuesDataSink.SinkApi sinkApi) throws Exception {
                         null,
                         null,
                         null,
-                        "");
+                        "",
+                        null);
         // Setup pipeline
         Configuration pipelineConfig = new Configuration();
         pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
@@ -986,7 +990,8 @@ void testTransformMergingWithRoute() throws Exception {
                         null,
                         null,
                         null,
-                        ""));
+                        "",
+                        null));

         // Setup route
         TableId mergedTable = TableId.tableId("default_namespace", "default_schema", "merged");

flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java

Lines changed: 10 additions & 5 deletions

@@ -370,7 +370,8 @@ void testTransform(ValuesDataSink.SinkApi sinkApi) throws Exception {
                         "col1",
                         "col12",
                         "key1=value1",
-                        "");
+                        "",
+                        null);

         // Setup pipeline
         Configuration pipelineConfig = new Configuration();
@@ -430,7 +431,8 @@ void testOpTypeMetadataColumn(ValuesDataSink.SinkApi sinkApi) throws Exception {
                         "col1",
                         "col12",
                         "key1=value1",
-                        "");
+                        "",
+                        null);

         // Setup pipeline
         Configuration pipelineConfig = new Configuration();
@@ -490,7 +492,8 @@ void testTransformTwice(ValuesDataSink.SinkApi sinkApi) throws Exception {
                         "col1",
                         "col12",
                         "key1=value1",
-                        "");
+                        "",
+                        null);
         TransformDef transformDef2 =
                 new TransformDef(
                         "default_namespace.default_schema.table1",
@@ -499,7 +502,8 @@ void testTransformTwice(ValuesDataSink.SinkApi sinkApi) throws Exception {
                         null,
                         null,
                         null,
-                        "");
+                        "",
+                        null);
         // Setup pipeline
         Configuration pipelineConfig = new Configuration();
         pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
@@ -1020,7 +1024,8 @@ void testTransformMergingWithRoute() throws Exception {
                         null,
                         null,
                         null,
-                        ""));
+                        "",
+                        null));

         // Setup route
         TableId mergedTable = TableId.tableId("default_namespace", "default_schema", "merged");
