Skip to content

Commit d0457f7

Browse files
authored
[FLINK-35314][cdc] Add Flink CDC pipeline transform user document (#3308)
1 parent 8d2d2a6 commit d0457f7

File tree

2 files changed

+482
-2
lines changed

2 files changed

+482
-2
lines changed

docs/content.zh/docs/core-concept/transform.md

Lines changed: 241 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,5 +28,245 @@ under the License.
2828
**Transform** module helps users delete and expand data columns based on the data columns in the table.
2929
What's more, it also helps users filter some unnecessary data during the synchronization process.
3030

31+
# Parameters
32+
To describe a transform rule, the following parameters can be used:
33+
34+
| Parameter | Meaning | Optional/Required |
35+
|--------------|----------------------------------------------------|-------------------|
36+
| source-table | Source table id, supports regular expressions | required |
37+
| projection | Projection rule, supports syntax similar to the select clause in SQL | optional |
38+
| filter | Filter rule, supports syntax similar to the where clause in SQL | optional |
39+
| primary-keys | Sink table primary keys, separated by commas | optional |
40+
| partition-keys | Sink table partition keys, separated by commas | optional |
41+
| table-options | used to the configure table creation statement when automatically creating tables | optional |
42+
| description | Transform rule description | optional |
43+
44+
Multiple rules can be declared in one single pipeline YAML file.
45+
46+
# Metadata Fields
47+
## Fields definition
48+
There are some hidden columns used to access metadata information. They will only take effect when explicitly referenced in the transform rules.
49+
50+
| Field | Data Type | Description |
51+
|--------------------|-----------|----------------------------------------------|
52+
| __namespace_name__ | String | Name of the namespace that contains the row. |
53+
| __schema_name__ | String | Name of the schema that contains the row. |
54+
| __table_name__ | String | Name of the table that contains the row. |
55+
56+
## Metadata relationship
57+
58+
| Type | Namespace | SchemaName | Table |
59+
|----------------------|-----------|------------|-------|
60+
| JDBC | Catalog | Schema | Table |
61+
| Debezium | Catalog | Schema | Table |
62+
| MySQL | Database | - | Table |
63+
| Postgres | Database | Schema | Table |
64+
| Oracle | - | Schema | Table |
65+
| Microsoft SQL Server | Database | Schema | Table |
66+
| StarRocks | Database | - | Table |
67+
| Doris | Database | - | Table |
68+
69+
# Functions
70+
Flink CDC uses [Calcite](https://calcite.apache.org/) to parse expressions and [Janino script](https://www.janino.net/) to evaluate expressions with function call.
71+
72+
## Comparison Functions
73+
74+
| Function | Janino Code | Description |
75+
|----------------------|-----------------------------|-----------------------------------------------------------------|
76+
| value1 = value2 | valueEquals(value1, value2) | Returns TRUE if value1 is equal to value2; returns FALSE if value1 or value2 is NULL. |
77+
| value1 <> value2 | !valueEquals(value1, value2) | Returns TRUE if value1 is not equal to value2; returns FALSE if value1 or value2 is NULL. |
78+
| value1 > value2 | value1 > value2 | Returns TRUE if value1 is greater than value2; returns FALSE if value1 or value2 is NULL. |
79+
| value1 >= value2 | value1 >= value2 | Returns TRUE if value1 is greater than or equal to value2; returns FALSE if value1 or value2 is NULL. |
80+
| value1 < value2 | value1 < value2 | Returns TRUE if value1 is less than value2; returns FALSE if value1 or value2 is NULL. |
81+
| value1 <= value2 | value1 <= value2 | Returns TRUE if value1 is less than or equal to value2; returns FALSE if value1 or value2 is NULL. |
82+
| value IS NULL | null == value | Returns TRUE if value is NULL. |
83+
| value IS NOT NULL | null != value | Returns TRUE if value is not NULL. |
84+
| value1 BETWEEN value2 AND value3 | betweenAsymmetric(value1, value2, value3) | Returns TRUE if value1 is greater than or equal to value2 and less than or equal to value3. |
85+
| value1 NOT BETWEEN value2 AND value3 | notBetweenAsymmetric(value1, value2, value3) | Returns TRUE if value1 is less than value2 or greater than value3. |
86+
| string1 LIKE string2 | like(string1, string2) | Returns TRUE if string1 matches pattern string2. |
87+
| string1 NOT LIKE string2 | notLike(string1, string2) | Returns TRUE if string1 does not match pattern string2. |
88+
| value1 IN (value2 [, value3]* ) | in(value1, value2 [, value3]*) | Returns TRUE if value1 exists in the given list (value2, value3, …). |
89+
| value1 NOT IN (value2 [, value3]* ) | notIn(value1, value2 [, value3]*) | Returns TRUE if value1 does not exist in the given list (value2, value3, …). |
90+
91+
## Logical Functions
92+
93+
| Function | Janino Code | Description |
94+
|----------------------|-----------------------------|-----------------------------------------------------------------|
95+
| boolean1 OR boolean2 | boolean1 &#124;&#124; boolean2 | Returns TRUE if BOOLEAN1 is TRUE or BOOLEAN2 is TRUE. |
96+
| boolean1 AND boolean2 | boolean1 && boolean2 | Returns TRUE if BOOLEAN1 and BOOLEAN2 are both TRUE. |
97+
| NOT boolean | !boolean | Returns TRUE if boolean is FALSE; returns FALSE if boolean is TRUE. |
98+
| boolean IS FALSE | false == boolean | Returns TRUE if boolean is FALSE; returns FALSE if boolean is TRUE. |
99+
| boolean IS NOT FALSE | true == boolean | Returns TRUE if BOOLEAN is TRUE; returns FALSE if BOOLEAN is FALSE. |
100+
| boolean IS TRUE | true == boolean | Returns TRUE if BOOLEAN is TRUE; returns FALSE if BOOLEAN is FALSE. |
101+
| boolean IS NOT TRUE | false == boolean | Returns TRUE if boolean is FALSE; returns FALSE if boolean is TRUE. |
102+
103+
## Arithmetic Functions
104+
105+
| Function | Janino Code | Description |
106+
|----------------------|-----------------------------|-----------------------------------------------------------------|
107+
| numeric1 + numeric2 | numeric1 + numeric2 | Returns NUMERIC1 plus NUMERIC2. |
108+
| numeric1 - numeric2 | numeric1 - numeric2 | Returns NUMERIC1 minus NUMERIC2. |
109+
| numeric1 * numeric2 | numeric1 * numeric2 | Returns NUMERIC1 multiplied by NUMERIC2. |
110+
| numeric1 / numeric2 | numeric1 / numeric2 | Returns NUMERIC1 divided by NUMERIC2. |
111+
| numeric1 % numeric2 | numeric1 % numeric2 | Returns the remainder (modulus) of numeric1 divided by numeric2. |
112+
| ABS(numeric) | abs(numeric) | Returns the absolute value of numeric. |
113+
| CEIL(numeric) | ceil(numeric) | Rounds numeric up, and returns the smallest number that is greater than or equal to numeric. |
114+
| FLOOR(numeric) | floor(numeric) | Rounds numeric down, and returns the largest number that is less than or equal to numeric. |
115+
| ROUND(numeric, int) | round(numeric) | Returns a number rounded to INT decimal places for NUMERIC. |
116+
| UUID() | uuid() | Returns an UUID (Universally Unique Identifier) string (e.g., "3d3c68f7-f608-473f-b60c-b0c44ad4cc4e") according to RFC 4122 type 4 (pseudo randomly generated) UUID. |
117+
118+
## String Functions
119+
120+
| Function | Janino Code | Description |
121+
| -------------------- | ------------------------ | ------------------------------------------------- |
122+
| string1 &#124;&#124; string2 | concat(string1, string2) | Returns the concatenation of STRING1 and STRING2. |
123+
| CHAR_LENGTH(string) | charLength(string) | Returns the number of characters in STRING. |
124+
| UPPER(string) | upper(string) | Returns string in uppercase. |
125+
| LOWER(string) | lower(string) | Returns string in lowercase. |
126+
| TRIM(string1) | trim('BOTH',string1) | Returns a string that removes whitespaces at both sides. |
127+
| REGEXP_REPLACE(string1, string2, string3) | regexpReplace(string1, string2, string3) | Returns a string from STRING1 with all the substrings that match a regular expression STRING2 consecutively being replaced with STRING3. E.g., 'foobar'.regexpReplace('oo\|ar', '') returns "fb". |
128+
| SUBSTRING(string FROM integer1 [ FOR integer2 ]) | substring(string,integer1,integer2) | Returns a substring of STRING starting from position INT1 with length INT2 (to the end by default). |
129+
| CONCAT(string1, string2,…) | concat(string1, string2,…) | Returns a string that concatenates string1, string2, …. E.g., CONCAT('AA', 'BB', 'CC') returns 'AABBCC'. |
130+
131+
## Temporal Functions
132+
133+
| Function | Janino Code | Description |
134+
| -------------------- | ------------------------ | ------------------------------------------------- |
135+
| LOCALTIME | localtime() | Returns the current SQL time in the local time zone, the return type is TIME(0). |
136+
| LOCALTIMESTAMP | localtimestamp() | Returns the current SQL timestamp in local time zone, the return type is TIMESTAMP(3). |
137+
| CURRENT_TIME | currentTime() | Returns the current SQL time in the local time zone, this is a synonym of LOCAL_TIME. |
138+
| CURRENT_DATE | currentDate() | Returns the current SQL date in the local time zone. |
139+
| CURRENT_TIMESTAMP | currentTimestamp() | Returns the current SQL timestamp in the local time zone, the return type is TIMESTAMP_LTZ(3). |
140+
| NOW() | now() | Returns the current SQL timestamp in the local time zone, this is a synonym of CURRENT_TIMESTAMP. |
141+
| DATE_FORMAT(timestamp, string) | dateFormat(timestamp, string) | Converts timestamp to a value of string in the format specified by the date format string. The format string is compatible with Java's SimpleDateFormat. |
142+
| TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2) | timestampDiff(timepointunit, timepoint1, timepoint2) | Returns the (signed) number of timepointunit between timepoint1 and timepoint2. The unit for the interval is given by the first argument, which should be one of the following values: SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR. |
143+
| TO_DATE(string1[, string2]) | toDate(string1[, string2]) | Converts a date string string1 with format string2 (by default 'yyyy-MM-dd') to a date. |
144+
| TO_TIMESTAMP(string1[, string2]) | toTimestamp(string1[, string2]) | Converts date time string string1 with format string2 (by default: 'yyyy-MM-dd HH:mm:ss') to a timestamp, without time zone. |
145+
146+
## Conditional Functions
147+
148+
| Function | Janino Code | Description |
149+
| -------------------- | ------------------------ | ------------------------------------------------- |
150+
| CASE value WHEN value1_1 [, value1_2]* THEN RESULT1 (WHEN value2_1 [, value2_2 ]* THEN result_2)* (ELSE result_z) END | Nested ternary expression | Returns resultX when the first time value is contained in (valueX_1, valueX_2, …). When no value matches, returns result_z if it is provided and returns NULL otherwise. |
151+
| CASE WHEN condition1 THEN result1 (WHEN condition2 THEN result2)* (ELSE result_z) END | Nested ternary expression | Returns resultX when the first conditionX is met. When no condition is met, returns result_z if it is provided and returns NULL otherwise. |
152+
| COALESCE(value1 [, value2]*) | coalesce(Object... objects) | Returns the first argument that is not NULL.If all arguments are NULL, it returns NULL as well. The return type is the least restrictive, common type of all of its arguments. The return type is nullable if all arguments are nullable as well. |
153+
| IF(condition, true_value, false_value) | condition ? true_value : false_value | Returns the true_value if condition is met, otherwise false_value. E.g., IF(5 > 3, 5, 3) returns 5. |
154+
31155
# Example
32-
This feature will support soon.
156+
## Add computed columns
157+
Evaluation expressions can be used to generate new columns. For example, if we want to append two computed columns based on the table `web_order` in the database `mydb`, we may define a transform rule as follows:
158+
159+
```yaml
160+
transform:
161+
- source-table: mydb.web_order
162+
projection: id, order_id, UPPER(product_name) as product_name, localtimestamp as new_timestamp
163+
description: append calculated columns based on source table
164+
```
165+
166+
## Reference metadata columns
167+
We may reference metadata column in projection expressions. For example, given a table `web_order` in the database `mydb`, we may define a transform rule as follows:
168+
169+
```yaml
170+
transform:
171+
- source-table: mydb.web_order
172+
projection: id, order_id, __namespace_name__ || '.' || __schema_name__ || '.' || __table_name__ identifier_name
173+
description: access metadata columns from source table
174+
```
175+
176+
## Use wildcard character to project all fields
177+
A wildcard character (`*`) can be used to reference all fields in a table. For example, given two tables `web_order` and `app_order` in the database `mydb`, we may define a transform rule as follows:
178+
179+
```yaml
180+
transform:
181+
- source-table: mydb.web_order
182+
projection: \*, UPPER(product_name) as product_name
183+
description: project fields with wildcard character from source table
184+
- source-table: mydb.app_order
185+
projection: UPPER(product_name) as product_name, *
186+
description: project fields with wildcard character from source table
187+
```
188+
Notice: When `*` character presents at the beginning of expressions, an escaping backslash is required.
189+
190+
## Add filter rule
191+
Use reference columns when adding filtering rules to the table `web_order` in the database `mydb`, we may define a transform rule as follows:
192+
193+
```yaml
194+
transform:
195+
- source-table: mydb.web_order
196+
filter: id > 10 AND order_id > 100
197+
description: filtering rows from source table
198+
```
199+
200+
Computed columns can be used in filtering conditions, too. For example, given a table `web_order` in the database `mydb`, we may define a transform rule as follows:
201+
202+
```yaml
203+
transform:
204+
- source-table: mydb.web_order
205+
projection: id, order_id, UPPER(province) as new_province
206+
filter: new_province = 'SHANGHAI'
207+
description: filtering rows based on computed columns
208+
```
209+
210+
## Reassign primary key
211+
We can reassign the primary key in transform rules. For example, given a table `web_order` in the database `mydb`, we may define a transform rule as follows:
212+
213+
```yaml
214+
transform:
215+
- source-table: mydb.web_order
216+
projection: id, order_id
217+
primary-keys: order_id
218+
description: reassign primary key example
219+
```
220+
221+
Composite primary keys are also supported:
222+
223+
```yaml
224+
transform:
225+
- source-table: mydb.web_order
226+
projection: id, order_id, UPPER(product_name) as product_name
227+
primary-keys: order_id, product_name
228+
description: reassign composite primary keys example
229+
```
230+
231+
## Reassign partition key
232+
We can reassign the partition key in transform rules. For example, given a table web_order in the database mydb, we may define a transform rule as follows:
233+
234+
```yaml
235+
transform:
236+
- source-table: mydb.web_order
237+
projection: id, order_id, UPPER(product_name) as product_name
238+
partition-keys: product_name
239+
description: reassign partition key example
240+
```
241+
242+
## Specify table creation configuration
243+
Extra options can be defined in a transform rule, and will be applied when creating downstream tables. Given a table `web_order` in the database `mydb`, we may define a transform rule as follows:
244+
245+
```yaml
246+
transform:
247+
- source-table: mydb.web_order
248+
projection: id, order_id, UPPER(product_name) as product_name
249+
table-options: comment=web order
250+
description: auto creating table options example
251+
```
252+
Tips: The format of table-options is `key1=value1,key2=value2`.
253+
254+
## Classification mapping
255+
Multiple transform rules can be defined to classify input data rows and apply different processings. For example, we may define a transform rule as follows:
256+
257+
```yaml
258+
transform:
259+
- source-table: mydb.web_order
260+
projection: id, order_id
261+
filter: UPPER(province) = 'SHANGHAI'
262+
description: classification mapping example
263+
- source-table: mydb.web_order
264+
projection: order_id as id, id as order_id
265+
filter: UPPER(province) = 'BEIJING'
266+
description: classification mapping example
267+
```
268+
269+
# Known limitations
270+
* Currently, transform doesn't work with route rules. It will be supported in future versions.
271+
* Computed columns cannot reference trimmed columns that do not present in final projection results. This will be fixed in future versions.
272+
* Regular matching of tables with different schemas is not supported. If necessary, multiple rules need to be written.

0 commit comments

Comments
 (0)