Merge branch 'apache:master' into master
dingxin-tech authored Jan 22, 2025
2 parents 542e6cd + 3e16a66 commit d964add
Showing 84 changed files with 1,789 additions and 294 deletions.
9 changes: 8 additions & 1 deletion .github/labeler.yml
@@ -39,10 +39,13 @@ e2e-tests:
- flink-cdc-e2e-tests/**/*
migration-tests:
- flink-cdc-migration-tests/**/*
add-ons:
- flink-cdc-pipeline-model/**/*
- flink-cdc-pipeline-udf-examples/**/*
base:
- flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/**/*
debezium:
- flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-debezium/**/*
- flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/**/*
connector-test-util:
- flink-cdc-connect/flink-cdc-source-connectors/flink-connector-test-util/**/*
db2-cdc-connector:
@@ -86,3 +89,7 @@ starrocks-pipeline-connector:
- flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/**/*
elasticsearch-pipeline-connector:
- flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/**/*
oceanbase-pipeline-connector:
- flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oceanbase/**/*
maxcompute-pipeline-connector:
- flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/**/*
6 changes: 3 additions & 3 deletions .github/workflows/close_stale.yml
@@ -41,11 +41,11 @@ jobs:
close-issue-message: >
This issue has been closed because Flink CDC doesn't use GitHub issue trackers.
# Stale PRs
days-before-pr-stale: 60
days-before-pr-close: 30
days-before-pr-stale: 120
days-before-pr-close: 60
stale-pr-message: >
This pull request has been automatically marked as stale because it has not had recent
activity for 60 days. It will be closed in 30 days if no further activity occurs.
activity for 120 days. It will be closed in 60 days if no further activity occurs.
close-pr-message: >
This pull request has been closed because it has not had recent activity. You can reopen it
if you intend to continue your work, and anyone who is interested in it is encouraged to continue
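For reference, here is a minimal sketch of how these inputs sit inside the stale workflow's step after this change; the action version, job name, and runner are assumptions, not taken from this diff:

```yaml
# Hypothetical excerpt of .github/workflows/close_stale.yml; the actions/stale
# version and surrounding job layout are assumed for illustration.
jobs:
  stale:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/stale@v9
        with:
          # Stale PRs
          days-before-pr-stale: 120   # mark a PR stale after 120 days without activity
          days-before-pr-close: 60    # close it 60 days after it was marked stale
          stale-pr-message: >
            This pull request has been automatically marked as stale because it has not had recent
            activity for 120 days. It will be closed in 60 days if no further activity occurs.
```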
2 changes: 1 addition & 1 deletion .github/workflows/flink_cdc_base.yml
@@ -148,7 +148,7 @@ jobs:
maven-version: 3.8.6

- name: Compile and test
timeout-minutes: 60
timeout-minutes: 90
run: |
set -o pipefail
2 changes: 1 addition & 1 deletion .github/workflows/flink_cdc_ci_nightly.yml
@@ -16,7 +16,7 @@
name: Flink CDC CI Nightly
on:
schedule:
- cron: '0 0 * * *' # Deploy every day
- cron: '43 0 * * *' # Run daily, but not at 00:00 UTC to avoid job failure due to network throttle
workflow_dispatch:

concurrency:
7 changes: 7 additions & 0 deletions docs/content.zh/docs/connectors/pipeline-connectors/kafka.md
@@ -143,6 +143,13 @@ Pipeline Connector Options
<td>String</td>
<td>Custom headers for Kafka records. Each header is separated by ',', and the key and value are separated by ':'. For example, headers can be set like 'key1:value1,key2:value2'. </td>
</tr>
<tr>
<td>sink.tableId-to-topic.mapping</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Custom mapping from upstream table names to downstream Kafka topic names. Each mapping is separated by `;`, and the upstream TableId and the downstream Kafka topic name are separated by `:`. For example, `sink.tableId-to-topic.mapping` can be set to `mydb.mytable1:topic1;mydb.mytable2:topic2`. </td>
</tr>
</tbody>
</table>
</div>
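As an illustration, a minimal sketch of a pipeline sink block using this option; the broker address and table/topic names are placeholders, not values from this change:

```yaml
# Hypothetical Kafka pipeline sink definition; connection details are placeholders.
sink:
  type: kafka
  properties.bootstrap.servers: localhost:9092
  # Mappings are separated by ';'; each upstream TableId and downstream topic by ':'.
  sink.tableId-to-topic.mapping: mydb.mytable1:topic1;mydb.mytable2:topic2
```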
7 changes: 7 additions & 0 deletions docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
@@ -305,6 +305,13 @@ pipeline:
<td>Boolean</td>
<td>Whether to enable synchronizing table and column comments; disabled by default. Note: enabling this feature affects memory usage.</td>
</tr>
<tr>
<td>treat-tinyint1-as-boolean.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Whether to treat the TINYINT(1) type as Boolean; defaults to true.</td>
</tr>
</tbody>
</table>
</div>
10 changes: 10 additions & 0 deletions docs/content.zh/docs/connectors/pipeline-connectors/overview.md
@@ -40,6 +40,16 @@ Flink CDC provides Pipeline Source and Sink connectors that can be used in YAML jobs
| [StarRocks]({{< ref "docs/connectors/pipeline-connectors/starrocks" >}}) | Sink | <li> [StarRocks](https://www.starrocks.io/): 2.x, 3.x |
| [OceanBase]({{< ref "docs/connectors/pipeline-connectors/oceanbase" >}}) | Sink | <li> [OceanBase](https://www.oceanbase.com/): 3.x, 4.x |

## Supported Flink Versions
The following table shows the version mapping between Flink<sup>®</sup> CDC Pipeline Connectors and Flink<sup>®</sup>.

| Flink<sup>®</sup> CDC Version | Flink<sup>®</sup> Version | Pipeline Source | Pipeline Sink | Notes |
|:---:|:---:|:---:|:---:|:---|
| <font color="DarkCyan">3.0.x</font> | <font color="MediumVioletRed">1.17.\*</font>, <font color="MediumVioletRed">1.18.\*</font> | MySQL | StarRocks,Doris | |
| <font color="DarkCyan">3.1.x</font> | <font color="MediumVioletRed">1.17.\*</font>, <font color="MediumVioletRed">1.18.\*</font>, <font color="MediumVioletRed">1.19.\*</font> | MySQL | StarRocks,Doris,Paimon,Kafka | Only flink-cdc 3.1.1 supports flink 1.19.* |
| <font color="DarkCyan">3.2.x</font> | <font color="MediumVioletRed">1.17.\*</font>, <font color="MediumVioletRed">1.18.\*</font>, <font color="MediumVioletRed">1.19.\*</font> | MySQL | StarRocks,Doris,Paimon,Kafka,ElasticSearch | |


## Develop Your Own Connector

If the existing connectors cannot meet your requirements, you can develop your own connector to integrate your external system into Flink CDC data pipelines. See [Flink CDC APIs]({{< ref "docs/developer-guide/understand-flink-cdc-api" >}}) to learn how to develop your own connector.
13 changes: 13 additions & 0 deletions docs/content.zh/docs/faq/faq.md
@@ -207,6 +207,19 @@ restart-strategy.fixed-delay.delay= 30s
1. The tableList option requires table names qualified with the database name, rather than the table names used in the DataStream API. For the MySQL CDC source, the tableList option value should look like 'my_db.my_table'.
2. If you want to synchronize the whole my_db database while excluding the products and orders tables, the tableList option value should look like 'my_db.(?!products|orders).*'.

### Q16: A MySQL source table has a TINYINT(1) column and some rows contain values greater than 1, yet the downstream of the pipeline job receives the data as true/false. Why?
This is because the MySQL connection parameter `tinyInt1isBit` defaults to `true`, and Flink CDC versions before 3.3.0 did not handle this parameter, so TINYINT(1) data was parsed as boolean values.
To convert it to the actual values, upgrade CDC to 3.3.0+ and add the configuration `treat-tinyint1-as-boolean.enabled: false` to the source node.
For example:
```yaml
source:
type: mysql
...
treat-tinyint1-as-boolean.enabled: false

sink:
type: ...
```
## Postgres CDC FAQ
### Q1: The disk usage of the PG server is high and the WAL is not released. What is the cause?
8 changes: 4 additions & 4 deletions docs/content.zh/docs/get-started/quickstart/mysql-to-doris.md
@@ -33,11 +33,11 @@ under the License.
Prepare a Linux or macOS machine with Docker installed.

### Prepare a Flink Standalone Cluster
1. Download [Flink 1.18.0](https://archive.apache.org/dist/flink/flink-1.18.0/flink-1.18.0-bin-scala_2.12.tgz) and extract it to get the flink-1.18.0 directory.
Use the following command to switch to the Flink directory and set FLINK_HOME to the directory where flink-1.18.0 is located.
1. Download [Flink 1.19.1](https://archive.apache.org/dist/flink/flink-1.19.1/flink-1.19.1-bin-scala_2.12.tgz) and extract it to get the flink-1.19.1 directory.
Use the following command to switch to the Flink directory and set FLINK_HOME to the directory where flink-1.19.1 is located.

```shell
cd flink-1.18.0
cd flink-1.19.1
```

2. Enable checkpointing by appending the following parameters to the conf/flink-conf.yaml configuration file, performing a checkpoint every 3 seconds.
@@ -338,7 +338,7 @@ Flink CDC provides configuration for routing the schema/data of source tables to other table names,
```shell
docker-compose down
```
Run the following command in the Flink directory `flink-1.18.0` to stop the Flink cluster:
Run the following command in the Flink directory `flink-1.19.1` to stop the Flink cluster:

```shell
./bin/stop-cluster.sh
@@ -33,11 +33,11 @@ under the License.
Prepare a Linux or macOS machine with Docker installed.

### Prepare a Flink Standalone Cluster
1. Download [Flink 1.18.0](https://archive.apache.org/dist/flink/flink-1.18.0/flink-1.18.0-bin-scala_2.12.tgz) and extract it to get the flink-1.18.0 directory.
1. Download [Flink 1.19.1](https://archive.apache.org/dist/flink/flink-1.19.1/flink-1.19.1-bin-scala_2.12.tgz) and extract it to get the flink-1.19.1 directory.
Use the following command to switch to the Flink directory and set FLINK_HOME to the directory where flink-1.18.0 is located.

```shell
cd flink-1.18.0
cd flink-1.19.1
```

2. Enable checkpointing by appending the following parameters to the conf/flink-conf.yaml configuration file, performing a checkpoint every 3 seconds.
@@ -306,7 +306,7 @@ Flink CDC provides configuration for routing the schema/data of source tables to other table names,
docker-compose down
```

Run the following command in the Flink directory `flink-1.18.0` to stop the Flink cluster:
Run the following command in the Flink directory `flink-1.19.1` to stop the Flink cluster:

```shell
./bin/stop-cluster.sh
14 changes: 13 additions & 1 deletion docs/content/docs/connectors/flink-sources/mysql-cdc.md
@@ -401,7 +401,19 @@ During a snapshot operation, the connector will query each included table to pro
Schema change events are applied to a "shadow" table and then swapped with the original table later.
<br>
This is an experimental feature, and subject to change in the future.
</td>
</td>
</tr>
<tr>
<td>use.legacy.json.format</td>
<td>optional</td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Whether to use legacy JSON format to cast JSON type data in binlog. <br>
It determines whether to use the legacy JSON format when retrieving JSON type data in binlog.
If the user configures 'use.legacy.json.format' = 'true', whitespace before values and after commas in the JSON type data is removed. For example,
JSON type data {"key1": "value1", "key2": "value2"} in binlog would be converted to {"key1":"value1","key2":"value2"}.
When 'use.legacy.json.format' = 'false', the data would be converted to {"key1": "value1", "key2": "value2"}, with whitespace before values and after commas preserved.
</td>
</tr>
</tbody>
</table>
7 changes: 7 additions & 0 deletions docs/content/docs/connectors/flink-sources/postgres-cdc.md
@@ -387,6 +387,13 @@ The following format metadata can be exposed as read-only (VIRTUAL) columns in a
<td>TIMESTAMP_LTZ(3) NOT NULL</td>
<td>It indicates the time that the change was made in the database. <br>If the record is read from snapshot of the table instead of the change stream, the value is always 0.</td>
</tr>
<tr>
<td>row_kind</td>
<td>STRING NOT NULL</td>
<td>It indicates the row kind of the changelog. Note: the downstream SQL operator may fail to compare records due to this newly added column when processing row retractions if
the source operator chooses to output the 'row_kind' column for each record. It is recommended to use this metadata column only in simple synchronization jobs.
<br>'+I' means INSERT message, '-D' means DELETE message, '-U' means UPDATE_BEFORE message and '+U' means UPDATE_AFTER message.</td>
</tr>
</tbody>
</table>

7 changes: 7 additions & 0 deletions docs/content/docs/connectors/pipeline-connectors/kafka.md
@@ -141,6 +141,13 @@ Pipeline Connector Options
<td>String</td>
<td>Custom headers for each Kafka record. Each header is separated by ',', and the key and value are separated by ':'. For example, we can set headers like 'key1:value1,key2:value2'. </td>
</tr>
<tr>
<td>sink.tableId-to-topic.mapping</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Custom mappings from upstream tableIds to downstream Kafka topics. Each mapping is separated by `;`, and the upstream tableId and the downstream Kafka topic are separated by `:`. For example, we can set `sink.tableId-to-topic.mapping` to `mydb.mytable1:topic1;mydb.mytable2:topic2`. </td>
</tr>
</tbody>
</table>
</div>
19 changes: 19 additions & 0 deletions docs/content/docs/connectors/pipeline-connectors/mysql.md
@@ -313,6 +313,25 @@ pipeline:
<td>Whether to enable including table and column comments; by default it is false. If set to true, the table and column comments will be sent.<br>
Note: enabling this option has implications on memory usage.</td>
</tr>
<tr>
<td>treat-tinyint1-as-boolean.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Whether to treat TINYINT(1) as boolean; by default it is true.</td>
</tr>
<tr>
<td>use.legacy.json.format</td>
<td>optional</td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Whether to use legacy JSON format to cast JSON type data in binlog. <br>
It determines whether to use the legacy JSON format when retrieving JSON type data in binlog.
If the user configures 'use.legacy.json.format' = 'true', whitespace before values and after commas in the JSON type data is removed. For example,
JSON type data {"key1": "value1", "key2": "value2"} in binlog would be converted to {"key1":"value1","key2":"value2"}.
When 'use.legacy.json.format' = 'false', the data would be converted to {"key1": "value1", "key2": "value2"}, with whitespace before values and after commas preserved.
</td>
</tr>
</tbody>
</table>
</div>
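For illustration, a minimal sketch of a MySQL pipeline source that sets both of the options added here; the hostname, credentials, and table pattern are placeholders:

```yaml
# Hypothetical MySQL pipeline source definition; connection details are placeholders.
source:
  type: mysql
  hostname: localhost
  port: 3306
  username: flinkcdc
  password: "***"
  tables: mydb.\.*
  # Emit TINYINT(1) columns as numeric values instead of booleans.
  treat-tinyint1-as-boolean.enabled: false
  # Keep whitespace in JSON values read from the binlog (non-legacy format).
  use.legacy.json.format: false
```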
10 changes: 10 additions & 0 deletions docs/content/docs/connectors/pipeline-connectors/overview.md
@@ -43,6 +43,16 @@ definition.
| [Paimon]({{< ref "docs/connectors/pipeline-connectors/paimon" >}}) | Sink | <li> [Paimon](https://paimon.apache.org/): 0.6, 0.7, 0.8 |
| [StarRocks]({{< ref "docs/connectors/pipeline-connectors/starrocks" >}}) | Sink | <li> [StarRocks](https://www.starrocks.io/): 2.x, 3.x |

## Supported Flink Versions
The following table shows the version mapping between Flink<sup>®</sup> CDC Pipeline Connectors and Flink<sup>®</sup>

| Flink<sup>®</sup> CDC Version | Flink<sup>®</sup> Version | Pipeline Source | Pipeline Sink | Notes |
|:-----------------------------------:|:----------------------------------------------------------------------------------------------------------------------------------------:|:-----------------:|:------------------------------------------:|:-----------------------------------------:|
| <font color="DarkCyan">3.0.x</font> | <font color="MediumVioletRed">1.17.\*</font>, <font color="MediumVioletRed">1.18.\*</font> | Mysql | StarRocks,Doris | |
| <font color="DarkCyan">3.1.x</font> | <font color="MediumVioletRed">1.17.\*</font>, <font color="MediumVioletRed">1.18.\*</font>, <font color="MediumVioletRed">1.19.\*</font> | Mysql | StarRocks,Doris,Paimon,Kafka | only flink-cdc 3.1.1 support flink 1.19.* |
| <font color="DarkCyan">3.2.x</font> | <font color="MediumVioletRed">1.17.\*</font>, <font color="MediumVioletRed">1.18.\*</font>, <font color="MediumVioletRed">1.19.\*</font> | Mysql | StarRocks,Doris,Paimon,Kafka,ElasticSearch | |


## Develop Your Own Connector

If provided connectors cannot fulfill your requirement, you can always develop
13 changes: 13 additions & 0 deletions docs/content/docs/faq/faq.md
@@ -210,6 +210,19 @@ The reason for this problem is that the reading of the full volume phase of the
1. The `tableList` option requires table name with database name rather than table name in DataStream API. For MySQL CDC source, the `tableList` option value should look like ‘my_db.my_table’.
2. If you need to synchronize the whole mydb database excluding the products and orders tables, the `tableList` option value should look like 'my_db.(?!products|orders).*'.

### Q16: In a MySQL source table, there is a TINYINT(1) column where some rows contain values greater than 1. However, the downstream of the pipeline job receives this data as true/false. Why does this happen?
This is because the default value of the MySQL connection parameter `tinyInt1isBit` is `true`, and Flink CDC versions before 3.3.0 did not handle this parameter, which caused TINYINT(1) data to be interpreted as boolean values.
To convert it to the actual values, please upgrade your CDC version to 3.3.0+ and add the configuration `treat-tinyint1-as-boolean.enabled: false` to the source node.
For example:
```yaml
source:
type: mysql
...
treat-tinyint1-as-boolean.enabled: false

sink:
type: ...
```
## Postgres CDC FAQ
### Q1: It is found that the disk utilization rate of PG server is high. What is the reason why wal is not released?
8 changes: 4 additions & 4 deletions docs/content/docs/get-started/quickstart/mysql-to-doris.md
@@ -35,11 +35,11 @@ without a single line of Java/Scala code or IDE installation.
Prepare a Linux or MacOS computer with Docker installed.

### Prepare Flink Standalone cluster
1. Download [Flink 1.18.0](https://archive.apache.org/dist/flink/flink-1.18.0/flink-1.18.0-bin-scala_2.12.tgz) ,unzip and get flink-1.18.0 directory.
Use the following command to navigate to the Flink directory and set FLINK_HOME to the directory where flink-1.18.0 is located.
1. Download [Flink 1.19.1](https://archive.apache.org/dist/flink/flink-1.19.1/flink-1.19.1-bin-scala_2.12.tgz) ,unzip and get flink-1.19.1 directory.
Use the following command to navigate to the Flink directory and set FLINK_HOME to the directory where flink-1.19.1 is located.

```shell
cd flink-1.18.0
cd flink-1.19.1
```

2. Enable checkpointing by appending the following parameters to the conf/flink-conf.yaml configuration file to perform a checkpoint every 3 seconds.
@@ -343,7 +343,7 @@ After finishing the tutorial, run the following command to stop all containers i
```shell
docker-compose down
```
Run the following command to stop the Flink cluster in the directory of Flink `flink-1.18.0`:
Run the following command to stop the Flink cluster in the directory of Flink `flink-1.19.1`:

```shell
./bin/stop-cluster.sh