Skip to content

Commit ad1f554

Browse files
yuxiqianWholeWorld-Timothyicchux
authored
[FLINK-35647][route] Support symbol replacement to enrich routing rules
This closes #3428. Co-authored-by: 张田 <[email protected]> Co-authored-by: yangshuaitong <[email protected]>
1 parent 302a691 commit ad1f554

File tree

16 files changed

+494
-80
lines changed

16 files changed

+494
-80
lines changed

docs/content.zh/docs/core-concept/route.md

+21-6
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,12 @@ under the License.
3030
# Parameters
3131
To describe a route, the follows are required:
3232

33-
| parameter | meaning | optional/required |
34-
|--------------|----------------------------------------------------|-------------------|
35-
| source-table | Source table id, supports regular expressions | required |
36-
| sink-table | Sink table id, supports regular expressions | required |
37-
| description | Routing rule description(a default value provided) | optional |
33+
| parameter | meaning | optional/required |
34+
|----------------|---------------------------------------------------------------------------------------------|-------------------|
35+
| source-table | Source table id, supports regular expressions | required |
36+
| sink-table | Sink table id, supports symbol replacement | required |
37+
| replace-symbol | Special symbol in sink-table for pattern replacing, will be replaced by original table name | optional |
38+
| description | Routing rule description(a default value provided) | optional |
3839

3940
A route module can contain a list of source-table/sink-table rules.
4041

@@ -71,4 +72,18 @@ route:
7172
- source-table: mydb.products
7273
sink-table: ods_db.ods_products
7374
description: sync products table to ods_products
74-
```
75+
```
76+
77+
## Pattern Replacement in routing rules
78+
79+
If you'd like to route source tables and rename them to sink tables with specific patterns, `replace-symbol` could be used to resemble source table names like this:
80+
81+
```yaml
82+
route:
83+
- source-table: source_db.\.*
84+
sink-table: sink_db.<>
85+
replace-symbol: <>
86+
description: route all tables in source_db to sink_db
87+
```
88+
89+
Then, all tables including `source_db.XXX` will be routed to `sink_db.XXX` without hassle.

docs/content/docs/core-concept/route.md

+21-6
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,12 @@ under the License.
3030
# Parameters
3131
To describe a route, the follows are required:
3232

33-
| parameter | meaning | optional/required |
34-
|--------------|----------------------------------------------------|-------------------|
35-
| source-table | Source table id, supports regular expressions | required |
36-
| sink-table | Sink table id, supports regular expressions | required |
37-
| description | Routing rule description(a default value provided) | optional |
33+
| parameter | meaning | optional/required |
34+
|----------------|---------------------------------------------------------------------------------------------|-------------------|
35+
| source-table | Source table id, supports regular expressions | required |
36+
| sink-table | Sink table id, supports symbol replacement | required |
37+
| replace-symbol | Special symbol in sink-table for pattern replacing, will be replaced by original table name | optional |
38+
| description | Routing rule description(a default value provided) | optional |
3839

3940
A route module can contain a list of source-table/sink-table rules.
4041

@@ -71,4 +72,18 @@ route:
7172
- source-table: mydb.products
7273
sink-table: ods_db.ods_products
7374
description: sync products table to ods_products
74-
```
75+
```
76+
77+
## Pattern Replacement in routing rules
78+
79+
If you'd like to route source tables and rename them to sink tables with specific patterns, `replace-symbol` could be used to resemble source table names like this:
80+
81+
```yaml
82+
route:
83+
- source-table: source_db.\.*
84+
sink-table: sink_db.<>
85+
replace-symbol: <>
86+
description: route all tables in source_db to sink_db
87+
```
88+
89+
Then, all tables including `source_db.XXX` will be routed to `sink_db.XXX` without hassle.

flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
5555
// Route keys
5656
private static final String ROUTE_SOURCE_TABLE_KEY = "source-table";
5757
private static final String ROUTE_SINK_TABLE_KEY = "sink-table";
58+
private static final String ROUTE_REPLACE_SYMBOL = "replace-symbol";
5859
private static final String ROUTE_DESCRIPTION_KEY = "description";
5960

6061
// Transform keys
@@ -164,11 +165,15 @@ private RouteDef toRouteDef(JsonNode routeNode) {
164165
"Missing required field \"%s\" in route configuration",
165166
ROUTE_SINK_TABLE_KEY)
166167
.asText();
168+
String replaceSymbol =
169+
Optional.ofNullable(routeNode.get(ROUTE_REPLACE_SYMBOL))
170+
.map(JsonNode::asText)
171+
.orElse(null);
167172
String description =
168173
Optional.ofNullable(routeNode.get(ROUTE_DESCRIPTION_KEY))
169174
.map(JsonNode::asText)
170175
.orElse(null);
171-
return new RouteDef(sourceTable, sinkTable, description);
176+
return new RouteDef(sourceTable, sinkTable, replaceSymbol, description);
172177
}
173178

174179
private TransformDef toTransformDef(JsonNode transformNode) {

flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java

+80-1
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,15 @@ void testInvalidTimeZone() throws Exception {
166166
+ "Or use 'UTC' without time zone and daylight saving time.");
167167
}
168168

169+
@Test
170+
void testRouteWithReplacementSymbol() throws Exception {
171+
URL resource =
172+
Resources.getResource("definitions/pipeline-definition-full-with-repsym.yaml");
173+
YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser();
174+
PipelineDef pipelineDef = parser.parse(Paths.get(resource.toURI()), new Configuration());
175+
assertThat(pipelineDef).isEqualTo(fullDefWithRouteRepSym);
176+
}
177+
169178
private final PipelineDef fullDef =
170179
new PipelineDef(
171180
new SourceDef(
@@ -197,10 +206,12 @@ void testInvalidTimeZone() throws Exception {
197206
new RouteDef(
198207
"mydb.default.app_order_.*",
199208
"odsdb.default.app_order",
209+
null,
200210
"sync all sharding tables to one"),
201211
new RouteDef(
202212
"mydb.default.web_order",
203213
"odsdb.default.ods_web_order",
214+
null,
204215
"sync table to with given prefix ods_")),
205216
Arrays.asList(
206217
new TransformDef(
@@ -258,10 +269,12 @@ void testInvalidTimeZone() throws Exception {
258269
new RouteDef(
259270
"mydb.default.app_order_.*",
260271
"odsdb.default.app_order",
272+
null,
261273
"sync all sharding tables to one"),
262274
new RouteDef(
263275
"mydb.default.web_order",
264276
"odsdb.default.ods_web_order",
277+
null,
265278
"sync table to with given prefix ods_")),
266279
Arrays.asList(
267280
new TransformDef(
@@ -312,7 +325,10 @@ void testInvalidTimeZone() throws Exception {
312325
.build())),
313326
Collections.singletonList(
314327
new RouteDef(
315-
"mydb.default.app_order_.*", "odsdb.default.app_order", null)),
328+
"mydb.default.app_order_.*",
329+
"odsdb.default.app_order",
330+
null,
331+
null)),
316332
Collections.emptyList(),
317333
Configuration.fromMap(
318334
ImmutableMap.<String, String>builder()
@@ -326,4 +342,67 @@ void testInvalidTimeZone() throws Exception {
326342
Collections.emptyList(),
327343
Collections.emptyList(),
328344
Configuration.fromMap(Collections.singletonMap("parallelism", "1")));
345+
346+
private final PipelineDef fullDefWithRouteRepSym =
347+
new PipelineDef(
348+
new SourceDef(
349+
"mysql",
350+
"source-database",
351+
Configuration.fromMap(
352+
ImmutableMap.<String, String>builder()
353+
.put("host", "localhost")
354+
.put("port", "3306")
355+
.put("username", "admin")
356+
.put("password", "pass")
357+
.put(
358+
"tables",
359+
"adb.*, bdb.user_table_[0-9]+, [app|web]_order_.*")
360+
.put(
361+
"chunk-column",
362+
"app_order_.*:id,web_order:product_id")
363+
.put("capture-new-tables", "true")
364+
.build())),
365+
new SinkDef(
366+
"kafka",
367+
"sink-queue",
368+
Configuration.fromMap(
369+
ImmutableMap.<String, String>builder()
370+
.put("bootstrap-servers", "localhost:9092")
371+
.put("auto-create-table", "true")
372+
.build())),
373+
Arrays.asList(
374+
new RouteDef(
375+
"mydb.default.app_order_.*",
376+
"odsdb.default.app_order_<>",
377+
"<>",
378+
"sync all sharding tables to one"),
379+
new RouteDef(
380+
"mydb.default.web_order",
381+
"odsdb.default.ods_web_order_>_<",
382+
">_<",
383+
"sync table to with given prefix ods_")),
384+
Arrays.asList(
385+
new TransformDef(
386+
"mydb.app_order_.*",
387+
"id, order_id, TO_UPPER(product_name)",
388+
"id > 10 AND order_id > 100",
389+
"id",
390+
"product_name",
391+
"comment=app order",
392+
"project fields from source table"),
393+
new TransformDef(
394+
"mydb.web_order_.*",
395+
"CONCAT(id, order_id) as uniq_id, *",
396+
"uniq_id > 10",
397+
null,
398+
null,
399+
null,
400+
"add new uniq_id for each row")),
401+
Configuration.fromMap(
402+
ImmutableMap.<String, String>builder()
403+
.put("name", "source-database-sync-pipe")
404+
.put("parallelism", "4")
405+
.put("schema.change.behavior", "evolve")
406+
.put("schema-operator.rpc-timeout", "1 h")
407+
.build()));
329408
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
################################################################################
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
################################################################################
17+
source:
18+
type: mysql
19+
name: source-database
20+
host: localhost
21+
port: 3306
22+
username: admin
23+
password: pass
24+
tables: adb.*, bdb.user_table_[0-9]+, [app|web]_order_.*
25+
chunk-column: app_order_.*:id,web_order:product_id
26+
capture-new-tables: true
27+
28+
sink:
29+
type: kafka
30+
name: sink-queue
31+
bootstrap-servers: localhost:9092
32+
auto-create-table: true
33+
34+
route:
35+
- source-table: mydb.default.app_order_.*
36+
sink-table: odsdb.default.app_order_<>
37+
replace-symbol: "<>"
38+
description: sync all sharding tables to one
39+
- source-table: mydb.default.web_order
40+
sink-table: odsdb.default.ods_web_order_>_<
41+
replace-symbol: ">_<"
42+
description: sync table to with given prefix ods_
43+
44+
transform:
45+
- source-table: mydb.app_order_.*
46+
projection: id, order_id, TO_UPPER(product_name)
47+
filter: id > 10 AND order_id > 100
48+
primary-keys: id
49+
partition-keys: product_name
50+
table-options: comment=app order
51+
description: project fields from source table
52+
- source-table: mydb.web_order_.*
53+
projection: CONCAT(id, order_id) as uniq_id, *
54+
filter: uniq_id > 10
55+
description: add new uniq_id for each row
56+
57+
pipeline:
58+
name: source-database-sync-pipe
59+
parallelism: 4
60+
schema.change.behavior: evolve
61+
schema-operator.rpc-timeout: 1 h
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.common.route;
19+
20+
import java.io.Serializable;
21+
22+
/** Definition of a routing rule with replacement symbol. */
23+
public class RouteRule implements Serializable {
24+
25+
private static final long serialVersionUID = 1L;
26+
27+
public RouteRule(String sourceTable, String sinkTable, String replaceSymbol) {
28+
this.sourceTable = sourceTable;
29+
this.sinkTable = sinkTable;
30+
this.replaceSymbol = replaceSymbol;
31+
}
32+
33+
public String sourceTable;
34+
public String sinkTable;
35+
public String replaceSymbol;
36+
}

flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/RouteDef.java

+15-2
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,17 @@
3636
public class RouteDef {
3737
private final String sourceTable;
3838
private final String sinkTable;
39+
private final String replaceSymbol;
3940
@Nullable private final String description;
4041

41-
public RouteDef(String sourceTable, String sinkTable, @Nullable String description) {
42+
public RouteDef(
43+
String sourceTable,
44+
String sinkTable,
45+
@Nullable String replaceSymbol,
46+
@Nullable String description) {
4247
this.sourceTable = sourceTable;
4348
this.sinkTable = sinkTable;
49+
this.replaceSymbol = replaceSymbol;
4450
this.description = description;
4551
}
4652

@@ -52,6 +58,10 @@ public String getSinkTable() {
5258
return sinkTable;
5359
}
5460

61+
public Optional<String> getReplaceSymbol() {
62+
return Optional.ofNullable(replaceSymbol);
63+
}
64+
5565
public Optional<String> getDescription() {
5666
return Optional.ofNullable(description);
5767
}
@@ -63,6 +73,8 @@ public String toString() {
6373
+ sourceTable
6474
+ ", sinkTable="
6575
+ sinkTable
76+
+ ", replaceSymbol="
77+
+ replaceSymbol
6678
+ ", description='"
6779
+ description
6880
+ '\''
@@ -80,11 +92,12 @@ public boolean equals(Object o) {
8092
RouteDef routeDef = (RouteDef) o;
8193
return Objects.equals(sourceTable, routeDef.sourceTable)
8294
&& Objects.equals(sinkTable, routeDef.sinkTable)
95+
&& Objects.equals(replaceSymbol, routeDef.replaceSymbol)
8396
&& Objects.equals(description, routeDef.description);
8497
}
8598

8699
@Override
87100
public int hashCode() {
88-
return Objects.hash(sourceTable, sinkTable, description);
101+
return Objects.hash(sourceTable, sinkTable, replaceSymbol, description);
89102
}
90103
}

0 commit comments

Comments
 (0)