
Commit 2dabfc0

[FLINK-35730][cdc-cli] PipelineDefinitionParser supports parsing pipeline def in text format
This closes #3444.
1 parent af02ce1 commit 2dabfc0

File tree

3 files changed (+72, -6 lines)


flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/PipelineDefinitionParser.java

+6
@@ -30,4 +30,10 @@ public interface PipelineDefinitionParser {
      * the {@link PipelineDef}.
      */
     PipelineDef parse(Path pipelineDefPath, Configuration globalPipelineConfig) throws Exception;
+
+    /**
+     * Parse the specified pipeline definition string, merge global configurations, then generate
+     * the {@link PipelineDef}.
+     */
+    PipelineDef parse(String pipelineDefText, Configuration globalPipelineConfig) throws Exception;
 }
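
With the new String overload in place, a caller that already holds the pipeline definition as YAML text (for example, one received over an API rather than read from a file) can parse it directly. A minimal sketch of such a call, assuming the usual flink-cdc package locations for Configuration and PipelineDef and an illustrative "values" source/sink:

import org.apache.flink.cdc.cli.parser.PipelineDefinitionParser;
import org.apache.flink.cdc.cli.parser.YamlPipelineDefinitionParser;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.composer.definition.PipelineDef;

public class ParsePipelineFromTextSketch {
    public static void main(String[] args) throws Exception {
        // Pipeline definition held in memory as a YAML string instead of a file on disk.
        String yaml =
                "source:\n"
                        + "  type: values\n"
                        + "sink:\n"
                        + "  type: values\n";

        PipelineDefinitionParser parser = new YamlPipelineDefinitionParser();

        // The new overload parses the text directly; an empty global configuration is merged in.
        PipelineDef pipelineDef = parser.parse(yaml, new Configuration());
        System.out.println(pipelineDef);
    }
}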

flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java

+15-6
@@ -82,39 +82,48 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
     @Override
     public PipelineDef parse(Path pipelineDefPath, Configuration globalPipelineConfig)
             throws Exception {
-        JsonNode root = mapper.readTree(pipelineDefPath.toFile());
+        return parse(mapper.readTree(pipelineDefPath.toFile()), globalPipelineConfig);
+    }
+
+    @Override
+    public PipelineDef parse(String pipelineDefText, Configuration globalPipelineConfig)
+            throws Exception {
+        return parse(mapper.readTree(pipelineDefText), globalPipelineConfig);
+    }
 
+    private PipelineDef parse(JsonNode pipelineDefJsonNode, Configuration globalPipelineConfig)
+            throws Exception {
         // Source is required
         SourceDef sourceDef =
                 toSourceDef(
                         checkNotNull(
-                                root.get(SOURCE_KEY),
+                                pipelineDefJsonNode.get(SOURCE_KEY),
                                 "Missing required field \"%s\" in pipeline definition",
                                 SOURCE_KEY));
 
         // Sink is required
         SinkDef sinkDef =
                 toSinkDef(
                         checkNotNull(
-                                root.get(SINK_KEY),
+                                pipelineDefJsonNode.get(SINK_KEY),
                                 "Missing required field \"%s\" in pipeline definition",
                                 SINK_KEY));
 
         // Transforms are optional
         List<TransformDef> transformDefs = new ArrayList<>();
-        Optional.ofNullable(root.get(TRANSFORM_KEY))
+        Optional.ofNullable(pipelineDefJsonNode.get(TRANSFORM_KEY))
                 .ifPresent(
                         node ->
                                 node.forEach(
                                         transform -> transformDefs.add(toTransformDef(transform))));
 
         // Routes are optional
         List<RouteDef> routeDefs = new ArrayList<>();
-        Optional.ofNullable(root.get(ROUTE_KEY))
+        Optional.ofNullable(pipelineDefJsonNode.get(ROUTE_KEY))
                 .ifPresent(node -> node.forEach(route -> routeDefs.add(toRouteDef(route))));
 
         // Pipeline configs are optional
-        Configuration userPipelineConfig = toPipelineConfig(root.get(PIPELINE_KEY));
+        Configuration userPipelineConfig = toPipelineConfig(pipelineDefJsonNode.get(PIPELINE_KEY));
 
         // Merge user config into global config
         Configuration pipelineConfig = new Configuration();
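
Because both public overloads now funnel into the same private parse(JsonNode, Configuration) method, parsing a definition from its Path and from its contents as a String should produce equal PipelineDef objects. A small sketch of that equivalence check follows; the file name and the Java 8-style file reading are illustrative assumptions:

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

import org.apache.flink.cdc.cli.parser.YamlPipelineDefinitionParser;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.composer.definition.PipelineDef;

public class OverloadEquivalenceSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical definition file; point this at a real pipeline YAML to run the check.
        Path defPath = Paths.get("pipeline-definition.yaml");
        String defText = new String(Files.readAllBytes(defPath), StandardCharsets.UTF_8);

        YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser();
        Configuration globalConfig = new Configuration();

        // Both calls delegate to the shared JsonNode-based parse shown in the diff above.
        PipelineDef fromFile = parser.parse(defPath, globalConfig);
        PipelineDef fromText = parser.parse(defText, globalConfig);

        System.out.println(fromFile.equals(fromText)); // expected: true
    }
}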

flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java

+51
@@ -238,6 +238,57 @@ void testRouteWithReplacementSymbol() throws Exception {
                                     .put("schema-operator.rpc-timeout", "1 h")
                                     .build()));
 
+    @Test
+    void testParsingFullDefinitionFromString() throws Exception {
+        String pipelineDefText =
+                "source:\n"
+                        + "  type: mysql\n"
+                        + "  name: source-database\n"
+                        + "  host: localhost\n"
+                        + "  port: 3306\n"
+                        + "  username: admin\n"
+                        + "  password: pass\n"
+                        + "  tables: adb.*, bdb.user_table_[0-9]+, [app|web]_order_.*\n"
+                        + "  chunk-column: app_order_.*:id,web_order:product_id\n"
+                        + "  capture-new-tables: true\n"
+                        + "\n"
+                        + "sink:\n"
+                        + "  type: kafka\n"
+                        + "  name: sink-queue\n"
+                        + "  bootstrap-servers: localhost:9092\n"
+                        + "  auto-create-table: true\n"
+                        + "\n"
+                        + "route:\n"
+                        + "  - source-table: mydb.default.app_order_.*\n"
+                        + "    sink-table: odsdb.default.app_order\n"
+                        + "    description: sync all sharding tables to one\n"
+                        + "  - source-table: mydb.default.web_order\n"
+                        + "    sink-table: odsdb.default.ods_web_order\n"
+                        + "    description: sync table to with given prefix ods_\n"
+                        + "\n"
+                        + "transform:\n"
+                        + "  - source-table: mydb.app_order_.*\n"
+                        + "    projection: id, order_id, TO_UPPER(product_name)\n"
+                        + "    filter: id > 10 AND order_id > 100\n"
+                        + "    primary-keys: id\n"
+                        + "    partition-keys: product_name\n"
+                        + "    table-options: comment=app order\n"
+                        + "    description: project fields from source table\n"
+                        + "  - source-table: mydb.web_order_.*\n"
+                        + "    projection: CONCAT(id, order_id) as uniq_id, *\n"
+                        + "    filter: uniq_id > 10\n"
+                        + "    description: add new uniq_id for each row\n"
+                        + "\n"
+                        + "pipeline:\n"
+                        + "  name: source-database-sync-pipe\n"
+                        + "  parallelism: 4\n"
+                        + "  schema.change.behavior: evolve\n"
+                        + "  schema-operator.rpc-timeout: 1 h";
+        YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser();
+        PipelineDef pipelineDef = parser.parse(pipelineDefText, new Configuration());
+        assertThat(pipelineDef).isEqualTo(fullDef);
+    }
+
     private final PipelineDef fullDefWithGlobalConf =
             new PipelineDef(
                     new SourceDef(
