From b3f5fa5549fccb657198e8f7acd4057fef8e3a54 Mon Sep 17 00:00:00 2001 From: stdnt-xiao Date: Wed, 27 Dec 2023 21:13:12 +0800 Subject: [PATCH] [Document][`CDCSOURCE`] Add parameter description and example for debezium. * (#2828) * [Document][`CDCSOURCE`] Add the oplog parameter for debezium.skipped.operations * [Document][`CDCSOURCE`] Add the oplog parameter for debezium.* --- .../cdcsource_statements.md | 80 +++++++++++++------ 1 file changed, 57 insertions(+), 23 deletions(-) diff --git a/docs/docs/data_integration_guide/cdcsource_statements.md b/docs/docs/data_integration_guide/cdcsource_statements.md index c94a535515..d3ec177cf1 100644 --- a/docs/docs/data_integration_guide/cdcsource_statements.md +++ b/docs/docs/data_integration_guide/cdcsource_statements.md @@ -88,30 +88,31 @@ plugins/flink-${flink-version}/dlink-client-${version}.jar ## 配置参数 -| 配置项 | 是否必须 | 默认值 | 说明 | -| ------------------------------ | -------- | ------------- | ------------------------------------------------------------ | -| connector | 是 | 无 | 指定要使用的连接器 | -| hostname | 是 | 无 | 数据库服务器的 IP 地址或主机名 | -| port | 是 | 无 | 数据库服务器的端口号 | -| username | 是 | 无 | 连接到数据库服务器时要使用的数据库的用户名 | -| password | 是 | 无 | 连接到数据库服务器时要使用的数据库的密码 | -| scan.startup.mode | 否 | latest-offset | 消费者的可选启动模式,有效枚举为“initial”和“latest-offset” | -| database-name | 否 | 无 | 此参数非必填 | -| table-name | 否 | 无 | 只支持正则,示例:"test\\.student,test\\.score",所有表示例:"test\\..*" | -| source.* | 否 | 无 | 指定个性化的 CDC 配置,如 source.server-time-zone 即为 server-time-zone 配置参数。 | -| checkpoint | 否 | 无 | 单位 ms | -| parallelism | 否 | 无 | 任务并行度 | +| 配置项 | 是否必须 | 默认值 | 说明 | +|--------------------------------| -------- | ------------- |------------------------------------------------------------------------------------------------------------------------------| +| connector | 是 | 无 | 指定要使用的连接器 | +| hostname | 是 | 无 | 数据库服务器的 IP 地址或主机名 | +| port | 是 | 无 | 数据库服务器的端口号 | +| username | 是 | 无 | 连接到数据库服务器时要使用的数据库的用户名 | +| password | 是 | 无 | 连接到数据库服务器时要使用的数据库的密码 | +| scan.startup.mode | 否 | latest-offset | 消费者的可选启动模式,有效枚举为“initial”和“latest-offset” | +| database-name | 否 | 无 | 此参数非必填 | +| table-name | 否 | 无 | 只支持正则,示例:"test\\.student,test\\.score",所有表示例:"test\\..*" | +| source.* | 否 | 无 | 指定个性化的 CDC 配置,如 source.server-time-zone 即为 server-time-zone 配置参数。 | +| debezium.* | 否 | 无 | 支持debezium参数,示例:`'debezium.skipped.operations'='d'` 即过滤源数据库删除操作日志。 | +| checkpoint | 否 | 无 | 单位 ms | +| parallelism | 否 | 无 | 任务并行度 | | sink.connector | 是 | 无 | 指定 sink 的类型,如 datastream-kafka、datastream-doris、datastream-hudi、kafka、doris、hudi、jdbc 等等,以 datastream- 开头的为 DataStream 的实现方式 | -| sink.sink.db | 否 | 无 | 目标数据源的库名,不指定时默认使用源数据源的库名 | -| sink.table.prefix | 否 | 无 | 目标表的表名前缀,如 ODS_ 即为所有的表名前拼接 ODS_ | -| sink.table.suffix | 否 | 无 | 目标表的表名后缀 | -| sink.table.upper | 否 | false | 目标表的表名全大写 | -| sink.table.lower | 否 | false | 目标表的表名全小写 | -| sink.auto.create | 否 | false | 目标数据源自动建表,目前只支持 Mysql,其他可自行扩展 | -| sink.timezone | 否 | UTC | 指定目标数据源的时区,在数据类型转换时自动生效 | -| sink.column.replace.line-break | 否 | false | 指定是否去除换行符,即在数据转换中进行 REGEXP_REPLACE(column, '\\n', '') | -| sink.* | 否 | 无 | 目标数据源的配置信息,同 FlinkSQL,使用 ${schemaName} 和 ${tableName} 可注入经过处理的源表名 | -| sink[N].* | 否 | 无 | N代表为多数据源写入, 默认从0开始到N, 其他配置参数信息参考sink.*的配置. | +| sink.sink.db | 否 | 无 | 目标数据源的库名,不指定时默认使用源数据源的库名 | +| sink.table.prefix | 否 | 无 | 目标表的表名前缀,如 ODS_ 即为所有的表名前拼接 ODS_ | +| sink.table.suffix | 否 | 无 | 目标表的表名后缀 | +| sink.table.upper | 否 | false | 目标表的表名全大写 | +| sink.table.lower | 否 | false | 目标表的表名全小写 | +| sink.auto.create | 否 | false | 目标数据源自动建表,目前只支持 Mysql,其他可自行扩展 | +| sink.timezone | 否 | UTC | 指定目标数据源的时区,在数据类型转换时自动生效 | +| sink.column.replace.line-break | 否 | false | 指定是否去除换行符,即在数据转换中进行 REGEXP_REPLACE(column, '\\n', '') | +| sink.* | 否 | 无 | 目标数据源的配置信息,同 FlinkSQL,使用 ${schemaName} 和 ${tableName} 可注入经过处理的源表名 | +| sink[N].* | 否 | 无 | N代表为多数据源写入, 默认从0开始到N, 其他配置参数信息参考sink.*的配置. | ## 经典示例 @@ -547,6 +548,39 @@ EXECUTE CDCSOURCE jobname WITH ( ) ``` +### 支持debezium参数 + +CDCSOURCE 支持 debezium.* 参数。该示例为将 mysql 整库同步到另一个 mysql 数据库,添加`'debezium.skipped.operations'='d'`参数,使得采集日志过滤掉源数据库删除操作,让目标库保留全量数据。 + +```sql +EXECUTE CDCSOURCE cdc_mysql WITH ( + 'connector' = 'mysql-cdc', + 'hostname' = '127.0.0.1', + 'port' = '3306', + 'username' = 'root', + 'password' = '123456', + 'checkpoint' = '3000', + 'scan.startup.mode' = 'initial', + 'parallelism' = '1', + 'table-name' = 'bigdata\.products,bigdata\.orders', + 'debezium.skipped.operations'='d', + 'sink.connector' = 'jdbc', + 'sink.url' = 'jdbc:mysql://127.0.0.1:3306/test?characterEncoding=utf-8&useSSL=false', + 'sink.username' = 'root', + 'sink.password' = '123456', + 'sink.sink.db' = 'test', + 'sink.table.prefix' = 'test_', + 'sink.table.lower' = 'true', + 'sink.table-name' = '${tableName}', + 'sink.driver' = 'com.mysql.jdbc.Driver', + 'sink.sink.buffer-flush.interval' = '2s', + 'sink.sink.buffer-flush.max-rows' = '100', + 'sink.sink.max-retries' = '5', + 'sink.auto.create' = 'true' +) +``` + + ## 常见问题 ### 如何确认整库同步任务提交成功