Skip to content

Commit 5ee1f9b

Browse files
committed
Support merge create table event in source operator
1 parent 15522a6 commit 5ee1f9b

File tree

17 files changed

+1297
-44
lines changed

17 files changed

+1297
-44
lines changed

Diff for: flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/CreateTableEvent.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public SchemaChangeEventType getType() {
7979
}
8080

8181
@Override
82-
public SchemaChangeEvent copy(TableId newTableId) {
82+
public CreateTableEvent copy(TableId newTableId) {
8383
return new CreateTableEvent(newTableId, schema);
8484
}
8585
}

Diff for: flink-cdc-common/src/main/java/org/apache/flink/cdc/common/factories/Factory.java

+5
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020
import org.apache.flink.cdc.common.annotation.PublicEvolving;
2121
import org.apache.flink.cdc.common.configuration.ConfigOption;
2222
import org.apache.flink.cdc.common.configuration.Configuration;
23+
import org.apache.flink.cdc.common.route.RouteRule;
2324

25+
import java.util.List;
2426
import java.util.Set;
2527

2628
/**
@@ -72,6 +74,9 @@ interface Context {
7274
/** Returns the configuration of current pipeline. */
7375
Configuration getPipelineConfiguration();
7476

77+
/** Returns the configuration of current routers. */
78+
List<RouteRule> getRouteRules();
79+
7580
/**
7681
* Returns the class loader of the current session.
7782
*

Diff for: flink-cdc-common/src/main/java/org/apache/flink/cdc/common/factories/FactoryHelper.java

+20
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,11 @@
2020
import org.apache.flink.cdc.common.annotation.PublicEvolving;
2121
import org.apache.flink.cdc.common.configuration.ConfigOption;
2222
import org.apache.flink.cdc.common.configuration.Configuration;
23+
import org.apache.flink.cdc.common.route.RouteRule;
2324
import org.apache.flink.cdc.common.utils.Preconditions;
2425
import org.apache.flink.table.api.ValidationException;
2526

27+
import java.util.ArrayList;
2628
import java.util.Arrays;
2729
import java.util.HashSet;
2830
import java.util.List;
@@ -151,13 +153,26 @@ public static class DefaultContext implements Factory.Context {
151153
private final Configuration factoryConfiguration;
152154
private final ClassLoader classLoader;
153155
private final Configuration pipelineConfiguration;
156+
private final List<RouteRule> routeRules;
154157

155158
public DefaultContext(
156159
Configuration factoryConfiguration,
157160
Configuration pipelineConfiguration,
158161
ClassLoader classLoader) {
159162
this.factoryConfiguration = factoryConfiguration;
160163
this.pipelineConfiguration = pipelineConfiguration;
164+
this.routeRules = new ArrayList<>();
165+
this.classLoader = classLoader;
166+
}
167+
168+
public DefaultContext(
169+
Configuration factoryConfiguration,
170+
Configuration pipelineConfiguration,
171+
List<RouteRule> routeRules,
172+
ClassLoader classLoader) {
173+
this.factoryConfiguration = factoryConfiguration;
174+
this.pipelineConfiguration = pipelineConfiguration;
175+
this.routeRules = routeRules;
161176
this.classLoader = classLoader;
162177
}
163178

@@ -171,6 +186,11 @@ public Configuration getPipelineConfiguration() {
171186
return pipelineConfiguration;
172187
}
173188

189+
@Override
190+
public List<RouteRule> getRouteRules() {
191+
return routeRules;
192+
}
193+
174194
@Override
175195
public ClassLoader getClassLoader() {
176196
return classLoader;

Diff for: flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,8 @@ private void translate(StreamExecutionEnvironment env, PipelineDef pipelineDef)
136136
OperatorIDGenerator schemaOperatorIDGenerator =
137137
new OperatorIDGenerator(schemaOperatorTranslator.getSchemaOperatorUid());
138138
DataSource dataSource =
139-
sourceTranslator.createDataSource(pipelineDef.getSource(), pipelineDefConfig, env);
139+
sourceTranslator.createDataSource(
140+
pipelineDef.getSource(), pipelineDef.getRoute(), pipelineDefConfig, env);
140141
DataSink dataSink =
141142
sinkTranslator.createDataSink(pipelineDef.getSink(), pipelineDefConfig, env);
142143

Diff for: flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSourceTranslator.java

+29-1
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,12 @@
2323
import org.apache.flink.cdc.common.event.Event;
2424
import org.apache.flink.cdc.common.factories.DataSourceFactory;
2525
import org.apache.flink.cdc.common.factories.FactoryHelper;
26+
import org.apache.flink.cdc.common.route.RouteRule;
2627
import org.apache.flink.cdc.common.source.DataSource;
2728
import org.apache.flink.cdc.common.source.EventSourceProvider;
2829
import org.apache.flink.cdc.common.source.FlinkSourceFunctionProvider;
2930
import org.apache.flink.cdc.common.source.FlinkSourceProvider;
31+
import org.apache.flink.cdc.composer.definition.RouteDef;
3032
import org.apache.flink.cdc.composer.definition.SourceDef;
3133
import org.apache.flink.cdc.composer.flink.FlinkEnvironmentUtils;
3234
import org.apache.flink.cdc.composer.utils.FactoryDiscoveryUtils;
@@ -35,6 +37,9 @@
3537
import org.apache.flink.streaming.api.datastream.DataStreamSource;
3638
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
3739

40+
import java.util.List;
41+
import java.util.stream.Collectors;
42+
3843
/** Translator used to build {@link DataSource} which will generate a {@link DataStream}. */
3944
@Internal
4045
public class DataSourceTranslator {
@@ -76,14 +81,37 @@ public DataStreamSource<Event> translate(
7681
}
7782

7883
public DataSource createDataSource(
79-
SourceDef sourceDef, Configuration pipelineConfig, StreamExecutionEnvironment env) {
84+
SourceDef sourceDef,
85+
List<RouteDef> routes,
86+
Configuration pipelineConfig,
87+
StreamExecutionEnvironment env) {
8088
// Search the data source factory
8189
DataSourceFactory sourceFactory =
8290
FactoryDiscoveryUtils.getFactoryByIdentifier(
8391
sourceDef.getType(), DataSourceFactory.class);
8492
// Add source JAR to environment
8593
FactoryDiscoveryUtils.getJarPathByIdentifier(sourceFactory)
8694
.ifPresent(jar -> FlinkEnvironmentUtils.addJar(env, jar));
95+
96+
if (routes != null && !routes.isEmpty()) {
97+
List<RouteRule> routeRules =
98+
routes.stream()
99+
.map(
100+
routeDef ->
101+
new RouteRule(
102+
routeDef.getSourceTable(),
103+
routeDef.getSinkTable(),
104+
routeDef.getReplaceSymbol().isPresent()
105+
? routeDef.getReplaceSymbol().get()
106+
: null))
107+
.collect(Collectors.toList());
108+
return sourceFactory.createDataSource(
109+
new FactoryHelper.DefaultContext(
110+
sourceDef.getConfig(),
111+
pipelineConfig,
112+
routeRules,
113+
Thread.currentThread().getContextClassLoader()));
114+
}
87115
return sourceFactory.createDataSource(
88116
new FactoryHelper.DefaultContext(
89117
sourceDef.getConfig(),

0 commit comments

Comments
 (0)