Skip to content

Commit 6a345aa

Browse files
authored
[FLINK-34545][pipeline-connector/ob]Add OceanBase pipeline connector to Flink CDC
This closes #3360.
1 parent dca94bd commit 6a345aa

File tree

34 files changed

+4634
-3
lines changed

34 files changed

+4634
-3
lines changed

.github/workflows/flink_cdc_base.yml

+2-1
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ env:
5757
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks,\
5858
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka,\
5959
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon,\
60-
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch"
60+
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch,\
61+
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oceanbase"
6162

6263
MODULES_MYSQL: "\
6364
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc,\
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,349 @@
1+
---
2+
title: "OceanBase"
3+
weight: 7
4+
type: docs
5+
aliases:
6+
- /connectors/pipeline-connectors/oceanbase
7+
---
8+
<!--
9+
Licensed to the Apache Software Foundation (ASF) under one
10+
or more contributor license agreements. See the NOTICE file
11+
distributed with this work for additional information
12+
regarding copyright ownership. The ASF licenses this file
13+
to you under the Apache License, Version 2.0 (the
14+
"License"); you may not use this file except in compliance
15+
with the License. You may obtain a copy of the License at
16+
17+
http://www.apache.org/licenses/LICENSE-2.0
18+
19+
Unless required by applicable law or agreed to in writing,
20+
software distributed under the License is distributed on an
21+
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
22+
KIND, either express or implied. See the License for the
23+
specific language governing permissions and limitations
24+
under the License.
25+
-->
26+
27+
# OceanBase Connector
28+
29+
OceanBase Pipeline 连接器可以用作 Pipeline 的 *Data Sink*,将数据写入[OceanBase](https://github.com/oceanbase/oceanbase)。 本文档介绍如何设置 OceanBase Pipeline 连接器。
30+
31+
## 连接器的功能
32+
* 自动建表
33+
* 表结构变更同步
34+
* 数据实时同步
35+
36+
## 示例
37+
38+
从 MySQL 读取数据同步到 OceanBase 的 Pipeline 可以定义如下:
39+
40+
```yaml
41+
source:
42+
type: mysql
43+
hostname: mysql
44+
port: 3306
45+
username: mysqluser
46+
password: mysqlpw
47+
tables: mysql_2_oceanbase_test_17l13vc.\.*
48+
server-id: 5400-5404
49+
server-time-zone: UTC
50+
51+
sink:
52+
type: oceanbase
53+
url: jdbc:mysql://oceanbase:2881/test
54+
username: root@test
55+
password:
56+
57+
pipeline:
58+
name: MySQL to OceanBase Pipeline
59+
parallelism: 1
60+
```
61+
62+
## 连接器配置项
63+
64+
<div class="highlight">
65+
<table class="colwidths-auto docutils">
66+
<thead>
67+
<tr>
68+
<th>参数名</th>
69+
<th>是否必需</th>
70+
<th>默认值</th>
71+
<th>类型</th>
72+
<th>描述</th>
73+
</tr>
74+
</thead>
75+
<tbody>
76+
<tr>
77+
<td>type</td>
78+
<td>required</td>
79+
<td style="word-wrap: break-word;">(none)</td>
80+
<td>String</td>
81+
<td>指定要使用的连接器, 这里需要设置成 <code>'oceanbase'</code>.</td>
82+
</tr>
83+
<tr>
84+
<td>url</td>
85+
<td>是</td>
86+
<td></td>
87+
<td>String</td>
88+
<td>数据库的 JDBC url。</td>
89+
</tr>
90+
<tr>
91+
<td>username</td>
92+
<td>是</td>
93+
<td></td>
94+
<td>String</td>
95+
<td>连接用户名。</td>
96+
</tr>
97+
<tr>
98+
<td>password</td>
99+
<td>是</td>
100+
<td></td>
101+
<td>String</td>
102+
<td>连接密码。</td>
103+
</tr>
104+
<tr>
105+
<td>schema-name</td>
106+
<td>否</td>
107+
<td></td>
108+
<td>String</td>
109+
<td>连接的 schema 名或 db 名。</td>
110+
</tr>
111+
<tr>
112+
<td>table-name</td>
113+
<td>否</td>
114+
<td></td>
115+
<td>String</td>
116+
<td>表名。</td>
117+
</tr>
118+
<tr>
119+
<td>driver-class-name</td>
120+
<td>否</td>
121+
<td>com.mysql.cj.jdbc.Driver</td>
122+
<td>String</td>
123+
<td>驱动类名,默认为 'com.mysql.cj.jdbc.Driver'。同时该connector并不包含对应驱动,需手动引入。</td>
124+
</tr>
125+
<tr>
126+
<td>druid-properties</td>
127+
<td>否</td>
128+
<td></td>
129+
<td>String</td>
130+
<td>Druid 连接池属性,多个值用分号分隔。</td>
131+
</tr>
132+
<tr>
133+
<td>sync-write</td>
134+
<td>否</td>
135+
<td>false</td>
136+
<td>Boolean</td>
137+
<td>是否开启同步写,设置为 true 时将不使用 buffer 直接写入数据库。</td>
138+
</tr>
139+
<tr>
140+
<td>buffer-flush.interval</td>
141+
<td>否</td>
142+
<td>1s</td>
143+
<td>Duration</td>
144+
<td>缓冲区刷新周期。设置为 '0' 时将关闭定期刷新。</td>
145+
</tr>
146+
<tr>
147+
<td>buffer-flush.buffer-size</td>
148+
<td>否</td>
149+
<td>1000</td>
150+
<td>Integer</td>
151+
<td>缓冲区大小。</td>
152+
</tr>
153+
<tr>
154+
<td>max-retries</td>
155+
<td>否</td>
156+
<td>3</td>
157+
<td>Integer</td>
158+
<td>失败重试次数。</td>
159+
</tr>
160+
<tr>
161+
<td>memstore-check.enabled</td>
162+
<td>否</td>
163+
<td>true</td>
164+
<td>Boolean</td>
165+
<td>是否开启内存检查。</td>
166+
</tr>
167+
<tr>
168+
<td>memstore-check.threshold</td>
169+
<td>否</td>
170+
<td>0.9</td>
171+
<td>Double</td>
172+
<td>内存使用的阈值相对最大限制值的比例。</td>
173+
</tr>
174+
<tr>
175+
<td>memstore-check.interval</td>
176+
<td>否</td>
177+
<td>30s</td>
178+
<td>Duration</td>
179+
<td>内存使用检查周期。</td>
180+
</tr>
181+
<tr>
182+
<td>partition.enabled</td>
183+
<td>否</td>
184+
<td>false</td>
185+
<td>Boolean</td>
186+
<td>是否启用分区计算功能,按照分区来写数据。仅当 'sync-write' 和 'direct-load.enabled' 都为 false 时生效。</td>
187+
</tr>
188+
<tr>
189+
<td>direct-load.enabled</td>
190+
<td>否</td>
191+
<td>false</td>
192+
<td>Boolean</td>
193+
<td>是否开启旁路导入。需要注意旁路导入需要将 sink 的并发度设置为1。</td>
194+
</tr>
195+
<tr>
196+
<td>direct-load.host</td>
197+
<td>否</td>
198+
<td></td>
199+
<td>String</td>
200+
<td>旁路导入使用的域名或 IP 地址,开启旁路导入时为必填项。</td>
201+
</tr>
202+
<tr>
203+
<td>direct-load.port</td>
204+
<td>否</td>
205+
<td>2882</td>
206+
<td>Integer</td>
207+
<td>旁路导入使用的 RPC 端口,开启旁路导入时为必填项。</td>
208+
</tr>
209+
<tr>
210+
<td>direct-load.parallel</td>
211+
<td>否</td>
212+
<td>8</td>
213+
<td>Integer</td>
214+
<td>旁路导入任务的并发度。</td>
215+
</tr>
216+
<tr>
217+
<td>direct-load.max-error-rows</td>
218+
<td>否</td>
219+
<td>0</td>
220+
<td>Long</td>
221+
<td>旁路导入任务最大可容忍的错误行数目。</td>
222+
</tr>
223+
<tr>
224+
<td>direct-load.dup-action</td>
225+
<td>否</td>
226+
<td>REPLACE</td>
227+
<td>STRING</td>
228+
<td>旁路导入任务中主键重复时的处理策略。可以是 'STOP_ON_DUP'(本次导入失败),'REPLACE'(替换)或 'IGNORE'(忽略)。</td>
229+
</tr>
230+
<tr>
231+
<td>direct-load.timeout</td>
232+
<td>否</td>
233+
<td>7d</td>
234+
<td>Duration</td>
235+
<td>旁路导入任务的超时时间。</td>
236+
</tr>
237+
<tr>
238+
<td>direct-load.heartbeat-timeout</td>
239+
<td>否</td>
240+
<td>30s</td>
241+
<td>Duration</td>
242+
<td>旁路导入任务客户端的心跳超时时间。</td>
243+
</tr>
244+
</tbody>
245+
</table>
246+
</div>
247+
248+
## 使用说明
249+
250+
* 暂仅支持OceanBase的MySQL租户
251+
252+
* at-least-once语义保证,暂不支持 exactly-once
253+
254+
* 对于自动建表
255+
* 没有分区键
256+
257+
* 对于表结构变更同步
258+
* 暂只支持新增列、重命名列
259+
* 新增列只能添加到最后一列
260+
261+
* 对于数据同步,pipeline 连接器使用 [OceanBase Sink 连接器](https://github.com/oceanbase/flink-connector-oceanbase)
262+
将数据写入 OceanBase,具体可以参考 [Sink 文档](https://github.com/oceanbase/flink-connector-oceanbase/blob/main/docs/sink/flink-connector-oceanbase.md)。
263+
264+
## 数据类型映射
265+
<div class="wy-table-responsive">
266+
<table class="colwidths-auto docutils">
267+
<thead>
268+
<tr>
269+
<th class="text-left">CDC type</th>
270+
<th class="text-left">OceanBase type under MySQL tenant</th>
271+
<th class="text-left" style="width:60%;">NOTE</th>
272+
</tr>
273+
</thead>
274+
<tbody>
275+
<tr>
276+
<td>TINYINT</td>
277+
<td>TINYINT</td>
278+
<td></td>
279+
</tr>
280+
<tr>
281+
<td>SMALLINT</td>
282+
<td>SMALLINT</td>
283+
<td></td>
284+
</tr>
285+
<tr>
286+
<td>INT</td>
287+
<td>INT</td>
288+
<td></td>
289+
</tr>
290+
<tr>
291+
<td>BIGINT</td>
292+
<td>BIGINT</td>
293+
<td></td>
294+
</tr>
295+
<tr>
296+
<td>FLOAT</td>
297+
<td>FLOAT</td>
298+
<td></td>
299+
</tr>
300+
<tr>
301+
<td>DOUBLE</td>
302+
<td>DOUBLE</td>
303+
<td></td>
304+
</tr>
305+
<tr>
306+
<td>DECIMAL(p, s)</td>
307+
<td>DECIMAL(p, s)</td>
308+
<td></td>
309+
</tr>
310+
<tr>
311+
<td>BOOLEAN</td>
312+
<td>BOOLEAN</td>
313+
<td></td>
314+
</tr>
315+
<tr>
316+
<td>DATE</td>
317+
<td>DATE</td>
318+
<td></td>
319+
</tr>
320+
<tr>
321+
<td>TIMESTAMP</td>
322+
<td>TIMESTAMP</td>
323+
<td></td>
324+
</tr>
325+
<tr>
326+
<td>TIMESTAMP_LTZ</td>
327+
<td>TIMESTAMP</td>
328+
<td></td>
329+
</tr>
330+
<tr>
331+
<td>CHAR(n) where n <= 256</td>
332+
<td>CHAR(n)</td>
333+
<td></td>
334+
</tr>
335+
<tr>
336+
<td>CHAR(n) where n > 256</td>
337+
<td>VARCHAR(n)</td>
338+
<td></td>
339+
</tr>
340+
<tr>
341+
<td>VARCHAR(n)</td>
342+
<td>VARCHAR(n)</td>
343+
<td></td>
344+
</tr>
345+
</tbody>
346+
</table>
347+
</div>
348+
349+
{{< top >}}

docs/content.zh/docs/connectors/pipeline-connectors/overview.md

+1
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ Flink CDC 提供了可用于 YAML 作业的 Pipeline Source 和 Sink 连接器
3838
| [MySQL]({{< ref "docs/connectors/pipeline-connectors/mysql" >}}) | Source | <li> [MySQL](https://dev.mysql.com/doc): 5.6, 5.7, 8.0.x <li> [RDS MySQL](https://www.aliyun.com/product/rds/mysql): 5.6, 5.7, 8.0.x <li> [PolarDB MySQL](https://www.aliyun.com/product/polardb): 5.6, 5.7, 8.0.x <li> [Aurora MySQL](https://aws.amazon.com/cn/rds/aurora): 5.6, 5.7, 8.0.x <li> [MariaDB](https://mariadb.org): 10.x <li> [PolarDB X](https://github.com/ApsaraDB/galaxysql): 2.0.1 |
3939
| [Paimon]({{< ref "docs/connectors/pipeline-connectors/paimon" >}}) | Sink | <li> [Paimon](https://paimon.apache.org/): 0.6, 0.7, 0.8 |
4040
| [StarRocks]({{< ref "docs/connectors/pipeline-connectors/starrocks" >}}) | Sink | <li> [StarRocks](https://www.starrocks.io/): 2.x, 3.x |
41+
| [OceanBase]({{< ref "docs/connectors/pipeline-connectors/oceanbase" >}}) | Sink | <li> [OceanBase](https://www.oceanbase.com/): 3.x, 4.x |
4142

4243
## Develop Your Own Connector
4344

0 commit comments

Comments
 (0)