
Commit 15522a6: Support regular batch mode
Parent: 891ffe9

File tree: 11 files changed (+968, -15 lines)


flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java

Lines changed: 8 additions & 2 deletions
@@ -166,7 +166,7 @@ private void translate(StreamExecutionEnvironment env, PipelineDef pipelineDef)
                         pipelineDef.getModels(),
                         dataSource.supportedMetadataColumns());
 
-        if (isParallelMetadataSource || isBatchMode) {
+        if (isParallelMetadataSource) {
             // Translate a distributed topology for sources with distributed tables
             // PostTransform -> Partitioning
             DataStream<PartitioningEvent> partitionedStream =
@@ -195,6 +195,7 @@ private void translate(StreamExecutionEnvironment env, PipelineDef pipelineDef)
                     schemaOperatorTranslator.translateRegular(
                             stream,
                             parallelism,
+                            isBatchMode,
                             dataSink.getMetadataApplier()
                                     .setAcceptedSchemaEvolutionTypes(
                                             pipelineDef
@@ -208,13 +209,18 @@ private void translate(StreamExecutionEnvironment env, PipelineDef pipelineDef)
                             stream,
                             parallelism,
                             parallelism,
+                            isBatchMode,
                             schemaOperatorIDGenerator.generate(),
                             dataSink.getDataChangeEventHashFunctionProvider(parallelism));
         }
 
         // Schema Operator -> Sink -> X
         sinkTranslator.translate(
-                pipelineDef.getSink(), stream, dataSink, schemaOperatorIDGenerator.generate());
+                pipelineDef.getSink(),
+                stream,
+                dataSink,
+                isBatchMode,
+                schemaOperatorIDGenerator.generate());
     }
 
     private void addFrameworkJars() {
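The behavioral core of this hunk: batch mode no longer forces the distributed topology (the old `isParallelMetadataSource || isBatchMode` guard); batch jobs now ride the regular topology, with `isBatchMode` threaded into the schema, partitioning, and sink translators instead. A minimal sketch of that selection logic, using illustrative names rather than the actual composer internals:

// Sketch only: hypothetical stand-in for the topology choice in
// FlinkPipelineComposer.translate(...) after this commit.
class TopologyChoice {
    static String choose(boolean isParallelMetadataSource, boolean isBatchMode) {
        if (isParallelMetadataSource) {
            // Distributed topology is now reserved for parallel-metadata sources.
            return "distributed";
        }
        // Batch shares the regular operator chain; operators switch on the flag.
        return isBatchMode ? "regular + batch operators" : "regular + streaming operators";
    }
}

The apparent design choice is that batch support reuses the regular operator chain rather than maintaining a separate topology.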

flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java

Lines changed: 25 additions & 8 deletions
@@ -33,6 +33,7 @@
 import org.apache.flink.cdc.composer.definition.SinkDef;
 import org.apache.flink.cdc.composer.flink.FlinkEnvironmentUtils;
 import org.apache.flink.cdc.composer.utils.FactoryDiscoveryUtils;
+import org.apache.flink.cdc.runtime.operators.sink.DataBatchSinkFunctionOperator;
 import org.apache.flink.cdc.runtime.operators.sink.DataSinkFunctionOperator;
 import org.apache.flink.cdc.runtime.operators.sink.DataSinkWriterOperatorFactory;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
@@ -46,6 +47,7 @@
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;
 import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
 
@@ -82,20 +84,29 @@ public void translate(
             DataStream<Event> input,
             DataSink dataSink,
             OperatorID schemaOperatorID) {
+        translate(sinkDef, input, dataSink, false, schemaOperatorID);
+    }
+
+    public void translate(
+            SinkDef sinkDef,
+            DataStream<Event> input,
+            DataSink dataSink,
+            boolean isBatchMode,
+            OperatorID schemaOperatorID) {
         // Get sink provider
         EventSinkProvider eventSinkProvider = dataSink.getEventSinkProvider();
         String sinkName = generateSinkName(sinkDef);
         if (eventSinkProvider instanceof FlinkSinkProvider) {
             // Sink V2
             FlinkSinkProvider sinkProvider = (FlinkSinkProvider) eventSinkProvider;
             Sink<Event> sink = sinkProvider.getSink();
-            sinkTo(input, sink, sinkName, schemaOperatorID);
+            sinkTo(input, sink, sinkName, isBatchMode, schemaOperatorID);
         } else if (eventSinkProvider instanceof FlinkSinkFunctionProvider) {
             // SinkFunction
             FlinkSinkFunctionProvider sinkFunctionProvider =
                     (FlinkSinkFunctionProvider) eventSinkProvider;
             SinkFunction<Event> sinkFunction = sinkFunctionProvider.getSinkFunction();
-            sinkTo(input, sinkFunction, sinkName, schemaOperatorID);
+            sinkTo(input, sinkFunction, sinkName, isBatchMode, schemaOperatorID);
         }
     }
 
@@ -104,6 +115,7 @@ void sinkTo(
             DataStream<Event> input,
             Sink<Event> sink,
             String sinkName,
+            boolean isBatchMode,
             OperatorID schemaOperatorID) {
         DataStream<Event> stream = input;
         // Pre-write topology
@@ -112,22 +124,27 @@ void sinkTo(
         }
 
         if (sink instanceof TwoPhaseCommittingSink) {
-            addCommittingTopology(sink, stream, sinkName, schemaOperatorID);
+            addCommittingTopology(sink, stream, sinkName, isBatchMode, schemaOperatorID);
         } else {
             stream.transform(
                     SINK_WRITER_PREFIX + sinkName,
                     CommittableMessageTypeInfo.noOutput(),
-                    new DataSinkWriterOperatorFactory<>(sink, schemaOperatorID));
+                    new DataSinkWriterOperatorFactory<>(sink, isBatchMode, schemaOperatorID));
         }
     }
 
     private void sinkTo(
             DataStream<Event> input,
             SinkFunction<Event> sinkFunction,
             String sinkName,
+            boolean isBatchMode,
             OperatorID schemaOperatorID) {
-        DataSinkFunctionOperator sinkOperator =
-                new DataSinkFunctionOperator(sinkFunction, schemaOperatorID);
+        StreamSink<Event> sinkOperator;
+        if (isBatchMode) {
+            sinkOperator = new DataBatchSinkFunctionOperator(sinkFunction, schemaOperatorID);
+        } else {
+            sinkOperator = new DataSinkFunctionOperator(sinkFunction, schemaOperatorID);
+        }
         final StreamExecutionEnvironment executionEnvironment = input.getExecutionEnvironment();
         PhysicalTransformation<Event> transformation =
                 new LegacySinkTransformation<>(
@@ -143,6 +160,7 @@ private <CommT> void addCommittingTopology(
             Sink<Event> sink,
             DataStream<Event> inputStream,
             String sinkName,
+            boolean isBatchMode,
             OperatorID schemaOperatorID) {
         TypeInformation<CommittableMessage<CommT>> typeInformation =
                 CommittableMessageTypeInfo.of(() -> getCommittableSerializer(sink));
@@ -158,8 +176,7 @@ private <CommT> void addCommittingTopology(
             ((WithPreCommitTopology<Event, CommT>) sink).addPreCommitTopology(written);
         }
 
-        // TODO: Hard coding stream mode and checkpoint
-        boolean isBatchMode = false;
+        // TODO: Hard coding checkpoint
         boolean isCheckpointingEnabled = true;
         DataStream<CommittableMessage<CommT>> committed =
                 preCommitted.transform(
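Two patterns worth noting in this file: the old `translate(...)` signature survives as a thin delegate passing `isBatchMode = false`, so existing callers compile unchanged, and the private `sinkTo(...)` widens its local variable from `DataSinkFunctionOperator` to the common supertype `StreamSink<Event>` so either operator variant fits. A condensed sketch of the latter, with hypothetical stand-in classes rather than the real operators:

// Stand-ins for StreamSink<Event>, DataBatchSinkFunctionOperator, and
// DataSinkFunctionOperator; illustrative only.
class StreamSinkBase {}
class BatchSinkOperator extends StreamSinkBase {}
class StreamingSinkOperator extends StreamSinkBase {}

class SinkOperatorChoice {
    static StreamSinkBase create(boolean isBatchMode) {
        // Declared as the shared supertype so either variant can be assigned.
        return isBatchMode ? new BatchSinkOperator() : new StreamingSinkOperator();
    }
}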

flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/PartitioningTranslator.java

Lines changed: 27 additions & 3 deletions
@@ -26,6 +26,7 @@
 import org.apache.flink.cdc.runtime.partitioning.PartitioningEvent;
 import org.apache.flink.cdc.runtime.partitioning.PartitioningEventKeySelector;
 import org.apache.flink.cdc.runtime.partitioning.PostPartitionProcessor;
+import org.apache.flink.cdc.runtime.partitioning.RegularPrePartitionBatchOperator;
 import org.apache.flink.cdc.runtime.partitioning.RegularPrePartitionOperator;
 import org.apache.flink.cdc.runtime.typeutils.EventTypeInfo;
 import org.apache.flink.cdc.runtime.typeutils.PartitioningEventTypeInfo;
@@ -46,11 +47,34 @@ public DataStream<Event> translateRegular(
             int downstreamParallelism,
             OperatorID schemaOperatorID,
             HashFunctionProvider<DataChangeEvent> hashFunctionProvider) {
+        return translateRegular(
+                input,
+                upstreamParallelism,
+                downstreamParallelism,
+                false,
+                schemaOperatorID,
+                hashFunctionProvider);
+    }
+
+    public DataStream<Event> translateRegular(
+            DataStream<Event> input,
+            int upstreamParallelism,
+            int downstreamParallelism,
+            boolean isBatchMode,
+            OperatorID schemaOperatorID,
+            HashFunctionProvider<DataChangeEvent> hashFunctionProvider) {
         return input.transform(
-                        "PrePartition",
+                        isBatchMode ? "BatchPrePartition" : "PrePartition",
                         new PartitioningEventTypeInfo(),
-                        new RegularPrePartitionOperator(
-                                schemaOperatorID, downstreamParallelism, hashFunctionProvider))
+                        isBatchMode
+                                ? new RegularPrePartitionBatchOperator(
+                                        schemaOperatorID,
+                                        downstreamParallelism,
+                                        hashFunctionProvider)
+                                : new RegularPrePartitionOperator(
+                                        schemaOperatorID,
+                                        downstreamParallelism,
+                                        hashFunctionProvider))
                 .setParallelism(upstreamParallelism)
                 .partitionCustom(new EventPartitioner(), new PartitioningEventKeySelector())
                 .map(new PostPartitionProcessor(), new EventTypeInfo())
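Here both the operator name and the operator implementation derive from the same flag, so the rendered job graph makes the execution mode visible. A small sketch of the pattern with placeholder types (the real classes are `RegularPrePartitionBatchOperator` and `RegularPrePartitionOperator`):

// Illustrative placeholders, not the flink-cdc partitioning operators.
interface PrePartition {}
class BatchPrePartition implements PrePartition {}
class StreamingPrePartition implements PrePartition {}

class PrePartitionChoice {
    // Name and implementation are always chosen consistently from one flag.
    static String operatorName(boolean isBatchMode) {
        return isBatchMode ? "BatchPrePartition" : "PrePartition";
    }

    static PrePartition operator(boolean isBatchMode) {
        return isBatchMode ? new BatchPrePartition() : new StreamingPrePartition();
    }
}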

flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/SchemaOperatorTranslator.java

Lines changed: 44 additions & 2 deletions
@@ -24,6 +24,7 @@
 import org.apache.flink.cdc.common.sink.MetadataApplier;
 import org.apache.flink.cdc.common.utils.Preconditions;
 import org.apache.flink.cdc.composer.definition.RouteDef;
+import org.apache.flink.cdc.runtime.operators.schema.regular.SchemaBatchOperator;
 import org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator;
 import org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperatorFactory;
 import org.apache.flink.cdc.runtime.partitioning.PartitioningEvent;
@@ -59,8 +60,26 @@ public DataStream<Event> translateRegular(
             int parallelism,
             MetadataApplier metadataApplier,
             List<RouteDef> routes) {
-        return addRegularSchemaOperator(
-                input, parallelism, metadataApplier, routes, schemaChangeBehavior, timezone);
+        return translateRegular(input, parallelism, false, metadataApplier, routes);
+    }
+
+    public DataStream<Event> translateRegular(
+            DataStream<Event> input,
+            int parallelism,
+            boolean isBatchMode,
+            MetadataApplier metadataApplier,
+            List<RouteDef> routes) {
+
+        return isBatchMode
+                ? addRegularSchemaBatchOperator(
+                        input, parallelism, metadataApplier, routes, timezone)
+                : addRegularSchemaOperator(
+                        input,
+                        parallelism,
+                        metadataApplier,
+                        routes,
+                        schemaChangeBehavior,
+                        timezone);
     }
 
     public DataStream<Event> translateDistributed(
@@ -105,6 +124,29 @@ private DataStream<Event> addRegularSchemaOperator(
         return stream;
     }
 
+    private DataStream<Event> addRegularSchemaBatchOperator(
+            DataStream<Event> input,
+            int parallelism,
+            MetadataApplier metadataApplier,
+            List<RouteDef> routes,
+            String timezone) {
+        List<RouteRule> routingRules = new ArrayList<>();
+        for (RouteDef route : routes) {
+            routingRules.add(
+                    new RouteRule(
+                            route.getSourceTable(),
+                            route.getSinkTable(),
+                            route.getReplaceSymbol().orElse(null)));
+        }
+        SingleOutputStreamOperator<Event> stream =
+                input.transform(
+                        "SchemaBatchOperator",
+                        new EventTypeInfo(),
+                        new SchemaBatchOperator(routingRules, metadataApplier, timezone));
+        stream.uid(schemaOperatorUid).setParallelism(parallelism);
+        return stream;
+    }
+
     private DataStream<Event> addDistributedSchemaOperator(
             DataStream<PartitioningEvent> input,
             int parallelism,
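The new `addRegularSchemaBatchOperator(...)` first flattens the `RouteDef` list into `RouteRule`s, mapping an absent replace symbol to `null`, before wiring up the `SchemaBatchOperator`. A runnable sketch of that conversion, with simplified stand-in records in place of the real flink-cdc classes of the same names:

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// RouteDef/RouteRule below are simplified stand-ins, not the real classes.
public class RouteMappingSketch {
    record RouteDef(String sourceTable, String sinkTable, Optional<String> replaceSymbol) {}
    record RouteRule(String sourceTable, String sinkTable, String replaceSymbol) {}

    static List<RouteRule> toRules(List<RouteDef> routes) {
        List<RouteRule> rules = new ArrayList<>();
        for (RouteDef route : routes) {
            // An absent replace symbol is stored as null, matching the commit.
            rules.add(new RouteRule(
                    route.sourceTable(),
                    route.sinkTable(),
                    route.replaceSymbol().orElse(null)));
        }
        return rules;
    }

    public static void main(String[] args) {
        List<RouteRule> rules =
                toRules(List.of(new RouteDef("app.orders", "sink.orders", Optional.empty())));
        System.out.println(rules.size()); // 1
    }
}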

flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslatorTest.java

Lines changed: 1 addition & 0 deletions
@@ -53,6 +53,7 @@ void testPreWriteWithoutCommitSink() {
                 inputStream,
                 mockPreWriteWithoutCommitSink,
                 "testPreWriteWithoutCommitSink",
+                false,
                 new OperatorID());
 
         // Check if the `addPreWriteTopology` is called, and the uid is set when the transformation

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaManager.java

Lines changed: 6 additions & 0 deletions
@@ -73,6 +73,12 @@ public SchemaManager() {
         behavior = SchemaChangeBehavior.EVOLVE;
     }
 
+    public SchemaManager(SchemaChangeBehavior schemaChangeBehavior) {
+        evolvedSchemas = new ConcurrentHashMap<>();
+        originalSchemas = new ConcurrentHashMap<>();
+        behavior = schemaChangeBehavior;
+    }
+
     public SchemaManager(
             Map<TableId, SortedMap<Integer, Schema>> originalSchemas,
             Map<TableId, SortedMap<Integer, Schema>> evolvedSchemas,
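The added constructor mirrors the no-arg one but takes the `SchemaChangeBehavior` from the caller instead of defaulting to `EVOLVE`, presumably for the new batch path. A condensed sketch with simplified types (the real maps are keyed by `TableId`, and the enum values here are stand-ins):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified stand-in for SchemaManager; constructor delegation shown here is
// a sketch of the intent, not the exact source layout.
public class SchemaManagerSketch {
    enum SchemaChangeBehavior { EVOLVE, TRY_EVOLVE, LENIENT, IGNORE, EXCEPTION }

    private final Map<String, Object> evolvedSchemas = new ConcurrentHashMap<>();
    private final Map<String, Object> originalSchemas = new ConcurrentHashMap<>();
    private final SchemaChangeBehavior behavior;

    public SchemaManagerSketch() {
        this(SchemaChangeBehavior.EVOLVE); // existing default behavior
    }

    public SchemaManagerSketch(SchemaChangeBehavior behavior) {
        this.behavior = behavior; // caller-supplied behavior, new in this commit
    }
}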
