Skip to content

Commit

Permalink
[FLINK-35730][cdc-cli] PipelineDefinitionParser supports parsing pipeline def in text format
Browse files Browse the repository at this point in the history

This closes apache#3444.
  • Loading branch information
aiwenmo authored and zhangchaoming.zcm committed Jan 3, 2025
1 parent a01f362 commit aae2048
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,10 @@ public interface PipelineDefinitionParser {
* the {@link PipelineDef}.
*/
PipelineDef parse(Path pipelineDefPath, Configuration globalPipelineConfig) throws Exception;

/**
 * Parses the specified pipeline definition string, merges the global configuration into it,
 * and generates the resulting {@link PipelineDef}.
 *
 * @param pipelineDefText the pipeline definition supplied as a text string (instead of a file
 *     path) — presumably YAML, matching the file-based overload; confirm against implementations
 * @param globalPipelineConfig global configuration to be merged with the definition's own
 *     pipeline configuration
 * @return the parsed {@link PipelineDef}
 * @throws Exception if the definition text cannot be parsed or required fields are missing
 */
PipelineDef parse(String pipelineDefText, Configuration globalPipelineConfig) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,39 +82,48 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
/**
 * Parses the pipeline definition file at the given path and merges the global configuration.
 *
 * <p>Reads the file into a JSON tree via the shared {@code mapper} and delegates to the
 * common {@code JsonNode}-based parsing routine.
 *
 * @param pipelineDefPath path of the pipeline definition file
 * @param globalPipelineConfig global configuration to merge into the pipeline configuration
 * @return the parsed {@link PipelineDef}
 * @throws Exception if the file cannot be read or the definition is invalid
 */
@Override
public PipelineDef parse(Path pipelineDefPath, Configuration globalPipelineConfig)
        throws Exception {
    // Fix: drop the stale `JsonNode root = mapper.readTree(...)` local left over from the
    // refactoring diff — the tree is read once and passed straight to the shared parser.
    return parse(mapper.readTree(pipelineDefPath.toFile()), globalPipelineConfig);
}

/**
 * Parses the pipeline definition supplied directly as a text string and merges the global
 * configuration.
 *
 * @param pipelineDefText the pipeline definition text; parsed with the shared {@code mapper}
 * @param globalPipelineConfig global configuration to merge into the pipeline configuration
 * @return the parsed {@link PipelineDef}
 * @throws Exception if the text cannot be parsed or the definition is invalid
 */
@Override
public PipelineDef parse(String pipelineDefText, Configuration globalPipelineConfig)
        throws Exception {
    // Same code path as the file-based overload: read the text into a JSON tree, then
    // delegate to the shared JsonNode-based parsing routine.
    return parse(mapper.readTree(pipelineDefText), globalPipelineConfig);
}

private PipelineDef parse(JsonNode pipelineDefJsonNode, Configuration globalPipelineConfig)
throws Exception {
// Source is required
SourceDef sourceDef =
toSourceDef(
checkNotNull(
root.get(SOURCE_KEY),
pipelineDefJsonNode.get(SOURCE_KEY),
"Missing required field \"%s\" in pipeline definition",
SOURCE_KEY));

// Sink is required
SinkDef sinkDef =
toSinkDef(
checkNotNull(
root.get(SINK_KEY),
pipelineDefJsonNode.get(SINK_KEY),
"Missing required field \"%s\" in pipeline definition",
SINK_KEY));

// Transforms are optional
List<TransformDef> transformDefs = new ArrayList<>();
Optional.ofNullable(root.get(TRANSFORM_KEY))
Optional.ofNullable(pipelineDefJsonNode.get(TRANSFORM_KEY))
.ifPresent(
node ->
node.forEach(
transform -> transformDefs.add(toTransformDef(transform))));

// Routes are optional
List<RouteDef> routeDefs = new ArrayList<>();
Optional.ofNullable(root.get(ROUTE_KEY))
Optional.ofNullable(pipelineDefJsonNode.get(ROUTE_KEY))
.ifPresent(node -> node.forEach(route -> routeDefs.add(toRouteDef(route))));

// Pipeline configs are optional
Configuration userPipelineConfig = toPipelineConfig(root.get(PIPELINE_KEY));
Configuration userPipelineConfig = toPipelineConfig(pipelineDefJsonNode.get(PIPELINE_KEY));

// Merge user config into global config
Configuration pipelineConfig = new Configuration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,57 @@ void testRouteWithReplacementSymbol() throws Exception {
.put("schema-operator.rpc-timeout", "1 h")
.build()));

@Test
void testParsingFullDefinitionFromString() throws Exception {
    // Exercises the new text-based parse(String, Configuration) overload with a full
    // inline definition (source, sink, route, transform, pipeline sections). The parsed
    // result must equal `fullDef`, the fixture built from the equivalent file-based
    // definition, proving both entry points produce the same PipelineDef.
    String pipelineDefText =
            "source:\n"
                    + " type: mysql\n"
                    + " name: source-database\n"
                    + " host: localhost\n"
                    + " port: 3306\n"
                    + " username: admin\n"
                    + " password: pass\n"
                    + " tables: adb.*, bdb.user_table_[0-9]+, [app|web]_order_.*\n"
                    + " chunk-column: app_order_.*:id,web_order:product_id\n"
                    + " capture-new-tables: true\n"
                    + "\n"
                    + "sink:\n"
                    + " type: kafka\n"
                    + " name: sink-queue\n"
                    + " bootstrap-servers: localhost:9092\n"
                    + " auto-create-table: true\n"
                    + "\n"
                    + "route:\n"
                    + " - source-table: mydb.default.app_order_.*\n"
                    + " sink-table: odsdb.default.app_order\n"
                    + " description: sync all sharding tables to one\n"
                    + " - source-table: mydb.default.web_order\n"
                    + " sink-table: odsdb.default.ods_web_order\n"
                    + " description: sync table to with given prefix ods_\n"
                    + "\n"
                    + "transform:\n"
                    + " - source-table: mydb.app_order_.*\n"
                    + " projection: id, order_id, TO_UPPER(product_name)\n"
                    + " filter: id > 10 AND order_id > 100\n"
                    + " primary-keys: id\n"
                    + " partition-keys: product_name\n"
                    + " table-options: comment=app order\n"
                    + " description: project fields from source table\n"
                    + " - source-table: mydb.web_order_.*\n"
                    + " projection: CONCAT(id, order_id) as uniq_id, *\n"
                    + " filter: uniq_id > 10\n"
                    + " description: add new uniq_id for each row\n"
                    + "\n"
                    + "pipeline:\n"
                    + " name: source-database-sync-pipe\n"
                    + " parallelism: 4\n"
                    + " schema.change.behavior: evolve\n"
                    + " schema-operator.rpc-timeout: 1 h";
    YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser();
    // Empty global Configuration: only the definition's own `pipeline:` section applies.
    PipelineDef pipelineDef = parser.parse(pipelineDefText, new Configuration());
    assertThat(pipelineDef).isEqualTo(fullDef);
}

private final PipelineDef fullDefWithGlobalConf =
new PipelineDef(
new SourceDef(
Expand Down

0 comments on commit aae2048

Please sign in to comment.