Skip to content

Commit

Permalink
[Document][CDCSOURCE] Add parameter description and example for deb…
Browse files Browse the repository at this point in the history
…ezium. * (#2828)

* [Document][`CDCSOURCE`] Add the oplog parameter for debezium.skipped.operations

* [Document][`CDCSOURCE`] Add the oplog parameter for debezium.*
  • Loading branch information
stdnt-xiao authored Dec 27, 2023
1 parent 57758d3 commit b3f5fa5
Showing 1 changed file with 57 additions and 23 deletions.
80 changes: 57 additions & 23 deletions docs/docs/data_integration_guide/cdcsource_statements.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,30 +88,31 @@ plugins/flink-${flink-version}/dlink-client-${version}.jar

## 配置参数

| 配置项 | 是否必须 | 默认值 | 说明 |
| ------------------------------ | -------- | ------------- | ------------------------------------------------------------ |
| connector ||| 指定要使用的连接器 |
| hostname ||| 数据库服务器的 IP 地址或主机名 |
| port ||| 数据库服务器的端口号 |
| username ||| 连接到数据库服务器时要使用的数据库的用户名 |
| password ||| 连接到数据库服务器时要使用的数据库的密码 |
| scan.startup.mode || latest-offset | 消费者的可选启动模式,有效枚举为“initial”和“latest-offset” |
| database-name ||| 此参数非必填 |
| table-name ||| 只支持正则,示例:"test\\.student,test\\.score",所有表示例:"test\\..*" |
| source.* ||| 指定个性化的 CDC 配置,如 source.server-time-zone 即为 server-time-zone 配置参数。 |
| checkpoint ||| 单位 ms |
| parallelism ||| 任务并行度 |
| 配置项 | 是否必须 | 默认值 | 说明 |
|--------------------------------| -------- | ------------- |------------------------------------------------------------------------------------------------------------------------------|
| connector ||| 指定要使用的连接器 |
| hostname ||| 数据库服务器的 IP 地址或主机名 |
| port ||| 数据库服务器的端口号 |
| username ||| 连接到数据库服务器时要使用的数据库的用户名 |
| password ||| 连接到数据库服务器时要使用的数据库的密码 |
| scan.startup.mode || latest-offset | 消费者的可选启动模式,有效枚举为“initial”和“latest-offset” |
| database-name ||| 此参数非必填 |
| table-name ||| 只支持正则,示例:"test\\.student,test\\.score",所有表示例:"test\\..*" |
| source.* ||| 指定个性化的 CDC 配置,如 source.server-time-zone 即为 server-time-zone 配置参数。 |
| debezium.* ||| 支持debezium参数,示例:`'debezium.skipped.operations'='d'` 即过滤源数据库删除操作日志。 |
| checkpoint ||| 单位 ms |
| parallelism ||| 任务并行度 |
| sink.connector ||| 指定 sink 的类型,如 datastream-kafka、datastream-doris、datastream-hudi、kafka、doris、hudi、jdbc 等等,以 datastream- 开头的为 DataStream 的实现方式 |
| sink.sink.db ||| 目标数据源的库名,不指定时默认使用源数据源的库名 |
| sink.table.prefix ||| 目标表的表名前缀,如 ODS_ 即为所有的表名前拼接 ODS_ |
| sink.table.suffix ||| 目标表的表名后缀 |
| sink.table.upper || false | 目标表的表名全大写 |
| sink.table.lower || false | 目标表的表名全小写 |
| sink.auto.create || false | 目标数据源自动建表,目前只支持 Mysql,其他可自行扩展 |
| sink.timezone || UTC | 指定目标数据源的时区,在数据类型转换时自动生效 |
| sink.column.replace.line-break || false | 指定是否去除换行符,即在数据转换中进行 REGEXP_REPLACE(column, '\\n', '') |
| sink.* ||| 目标数据源的配置信息,同 FlinkSQL,使用 ${schemaName} 和 ${tableName} 可注入经过处理的源表名 |
| sink[N].* ||| N代表为多数据源写入, 默认从0开始到N, 其他配置参数信息参考sink.*的配置. |
| sink.sink.db ||| 目标数据源的库名,不指定时默认使用源数据源的库名 |
| sink.table.prefix ||| 目标表的表名前缀,如 ODS_ 即为所有的表名前拼接 ODS_ |
| sink.table.suffix ||| 目标表的表名后缀 |
| sink.table.upper || false | 目标表的表名全大写 |
| sink.table.lower || false | 目标表的表名全小写 |
| sink.auto.create || false | 目标数据源自动建表,目前只支持 Mysql,其他可自行扩展 |
| sink.timezone || UTC | 指定目标数据源的时区,在数据类型转换时自动生效 |
| sink.column.replace.line-break || false | 指定是否去除换行符,即在数据转换中进行 REGEXP_REPLACE(column, '\\n', '') |
| sink.* ||| 目标数据源的配置信息,同 FlinkSQL,使用 ${schemaName} 和 ${tableName} 可注入经过处理的源表名 |
| sink[N].* ||| N代表为多数据源写入, 默认从0开始到N, 其他配置参数信息参考sink.*的配置. |

## 经典示例

Expand Down Expand Up @@ -547,6 +548,39 @@ EXECUTE CDCSOURCE jobname WITH (
)
```

### 支持debezium参数

CDCSOURCE 支持 debezium.* 参数。该示例为将 mysql 整库同步到另一个 mysql 数据库,添加`'debezium.skipped.operations'='d'`参数,使得采集日志过滤掉源数据库删除操作,让目标库保留全量数据。

```sql
EXECUTE CDCSOURCE cdc_mysql WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'checkpoint' = '3000',
'scan.startup.mode' = 'initial',
'parallelism' = '1',
'table-name' = 'bigdata\.products,bigdata\.orders',
'debezium.skipped.operations'='d',
'sink.connector' = 'jdbc',
'sink.url' = 'jdbc:mysql://127.0.0.1:3306/test?characterEncoding=utf-8&useSSL=false',
'sink.username' = 'root',
'sink.password' = '123456',
'sink.sink.db' = 'test',
'sink.table.prefix' = 'test_',
'sink.table.lower' = 'true',
'sink.table-name' = '${tableName}',
'sink.driver' = 'com.mysql.jdbc.Driver',
'sink.sink.buffer-flush.interval' = '2s',
'sink.sink.buffer-flush.max-rows' = '100',
'sink.sink.max-retries' = '5',
'sink.auto.create' = 'true'
)
```


## 常见问题

### 如何确认整库同步任务提交成功
Expand Down

0 comments on commit b3f5fa5

Please sign in to comment.