[FLINK-36794][cdc-composer/cli] Pipeline CDC connector supports multiple data sources
linjc13 committed Jan 17, 2025
1 parent 92081df commit 994d17a
Showing 13 changed files with 451 additions and 102 deletions.
42 changes: 40 additions & 2 deletions docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
@@ -50,9 +50,9 @@ The MySQL CDC Pipeline connector allows reading snapshot data and incremental data from a MySQL database
</table>
</div>

## Example
## Single Data Source Example

A pipeline that reads data from MySQL and synchronizes it to Doris can be defined as follows:
A pipeline that reads data from a single MySQL source and synchronizes it to Doris can be defined as follows:

```yaml
source:
@@ -77,6 +77,44 @@ pipeline:
parallelism: 4
```

## Multiple Data Source Example

A pipeline that reads data from multiple MySQL sources and synchronizes it to Doris can be defined as follows:

```yaml
sources:
- type: mysql
name: MySQL multiple Source1
hostname: 127.0.0.1
port: 3306
username: admin
password: pass
tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
server-id: 5400-5404
server-time-zone: Asia/Shanghai

- type: mysql
name: MySQL multiple Source2
hostname: 127.0.0.2
port: 3307
username: admin
password: pass
tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
server-id: 5405-5409
server-time-zone: Asia/Shanghai

sink:
type: doris
name: Doris Sink
fenodes: 127.0.0.1:8030
username: root
password: pass

pipeline:
name: MySQL to Doris Pipeline
parallelism: 4
```
## Connector Options
<div class="highlight">
42 changes: 40 additions & 2 deletions docs/content/docs/connectors/pipeline-connectors/mysql.md
@@ -51,9 +51,9 @@ You may need to configure the following dependencies manually, and pass it with
</table>
</div>

## Example
## Single Data Source Example

An example of the pipeline for reading data from MySQL and sink to Doris can be defined as follows:
An example of a pipeline that reads data from a single MySQL source and sinks to Doris can be defined as follows:

```yaml
source:
@@ -78,6 +78,44 @@ pipeline:
parallelism: 4
```

## Multiple Data Source Example

An example of a pipeline that reads data from multiple MySQL data sources and sinks to Doris can be defined as follows:

```yaml
sources:
- type: mysql
name: MySQL multiple Source1
hostname: 127.0.0.1
port: 3306
username: admin
password: pass
tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
server-id: 5400-5404
server-time-zone: Asia/Shanghai

- type: mysql
name: MySQL multiple Source2
hostname: 127.0.0.2
port: 3307
username: admin
password: pass
tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
server-id: 5405-5409
server-time-zone: Asia/Shanghai

sink:
type: doris
name: Doris Sink
fenodes: 127.0.0.1:8030
username: root
password: pass

pipeline:
name: MySQL to Doris Pipeline
parallelism: 4
```
## Connector Options
<div class="highlight">
@@ -40,6 +40,7 @@
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -55,6 +56,7 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {

// Parent node keys
private static final String SOURCE_KEY = "source";
private static final String MULTIPLE_SOURCE_KEY = "sources";
private static final String SINK_KEY = "sink";
private static final String ROUTE_KEY = "route";
private static final String TRANSFORM_KEY = "transform";
@@ -63,6 +65,7 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {

// Source / sink keys
private static final String TYPE_KEY = "type";
private static final String SOURCES = "sources";
private static final String NAME_KEY = "name";
private static final String INCLUDE_SCHEMA_EVOLUTION_TYPES = "include.schema.changes";
private static final String EXCLUDE_SCHEMA_EVOLUTION_TYPES = "exclude.schema.changes";
@@ -135,13 +138,20 @@ private PipelineDef parse(JsonNode pipelineDefJsonNode, Configuration globalPipe
SchemaChangeBehavior schemaChangeBehavior =
userPipelineConfig.get(PIPELINE_SCHEMA_CHANGE_BEHAVIOR);

// Source is required
SourceDef sourceDef =
toSourceDef(
checkNotNull(
pipelineDefJsonNode.get(SOURCE_KEY),
"Missing required field \"%s\" in pipeline definition",
SOURCE_KEY));
JsonNode multipleSourceNode = pipelineDefJsonNode.get(MULTIPLE_SOURCE_KEY);
List<SourceDef> sourceDefs = new ArrayList<>();
SourceDef sourceDef = null;
if (multipleSourceNode != null) {
Iterator<JsonNode> it = multipleSourceNode.elements();
while (it.hasNext()) {
JsonNode sourceNode = it.next();
getSourceDefs(sourceNode, sourceDefs);
}
} else {
JsonNode sourceNode = pipelineDefJsonNode.get(SOURCE_KEY);
// Source is required
sourceDef = getSourceDefs(sourceNode, sourceDefs);
}

// Sink is required
SinkDef sinkDef =
@@ -171,7 +181,25 @@ private PipelineDef parse(JsonNode pipelineDefJsonNode, Configuration globalPipe
pipelineConfig.addAll(userPipelineConfig);

return new PipelineDef(
sourceDef, sinkDef, routeDefs, transformDefs, udfDefs, modelDefs, pipelineConfig);
sourceDefs,
sourceDef,
sinkDef,
routeDefs,
transformDefs,
udfDefs,
modelDefs,
pipelineConfig);
}

private SourceDef getSourceDefs(JsonNode root, List<SourceDef> sourceDefs) {
SourceDef sourceDef =
toSourceDef(
checkNotNull(
root,
"Missing required field \"%s\" in pipeline definition",
SOURCE_KEY));
sourceDefs.add(sourceDef);
return sourceDef;
}

private SourceDef toSourceDef(JsonNode sourceNode) {
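To make the new key handling concrete, here is a minimal, self-contained sketch of the `sources`-versus-`source` resolution that the patched `YamlPipelineDefinitionParser` performs. It is not the project's parser: the class name `MultiSourceYamlDemo`, the inline YAML snippet, and the direct use of Jackson's YAML support (`jackson-databind` plus `jackson-dataformat-yaml` on the classpath) are assumptions made purely for illustration. Only the precedence logic — use the plural `sources` list when present, otherwise fall back to the required singular `source` node — mirrors the diff above.

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Illustrative sketch (not project code): resolves "sources" vs "source" like the patched parser. */
public class MultiSourceYamlDemo {

    private static final String SOURCE_KEY = "source";
    private static final String MULTIPLE_SOURCE_KEY = "sources";

    public static void main(String[] args) throws Exception {
        // Hypothetical pipeline definition using the new plural "sources" key.
        String yaml =
                "sources:\n"
                        + "  - type: mysql\n"
                        + "    name: MySQL multiple Source1\n"
                        + "    hostname: 127.0.0.1\n"
                        + "    port: 3306\n"
                        + "  - type: mysql\n"
                        + "    name: MySQL multiple Source2\n"
                        + "    hostname: 127.0.0.2\n"
                        + "    port: 3307\n"
                        + "sink:\n"
                        + "  type: doris\n";

        JsonNode root = new ObjectMapper(new YAMLFactory()).readTree(yaml);

        // Same precedence as the patch: iterate the "sources" list when it exists,
        // otherwise fall back to the required singular "source" node.
        List<JsonNode> sourceNodes = new ArrayList<>();
        JsonNode multipleSourceNode = root.get(MULTIPLE_SOURCE_KEY);
        if (multipleSourceNode != null) {
            Iterator<JsonNode> it = multipleSourceNode.elements();
            while (it.hasNext()) {
                sourceNodes.add(it.next());
            }
        } else {
            JsonNode singleSourceNode = root.get(SOURCE_KEY);
            if (singleSourceNode == null) {
                throw new IllegalArgumentException(
                        "Missing required field \"" + SOURCE_KEY + "\" in pipeline definition");
            }
            sourceNodes.add(singleSourceNode);
        }

        // One line per resolved source, e.g. "mysql -> MySQL multiple Source1".
        for (JsonNode sourceNode : sourceNodes) {
            System.out.println(
                    sourceNode.get("type").asText() + " -> " + sourceNode.get("name").asText());
        }
    }
}
```

Keeping the singular `source` key as the fallback branch, as the diff does, means existing single-source pipeline definitions continue to parse unchanged while multi-source definitions populate the new `List<SourceDef>`.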