Skip to content

Commit 3a80802

Browse files
committed
[FLINK-34548][API] Supports sink-v2 Sink
1 parent de4bac7 commit 3a80802

File tree

16 files changed

+707
-6
lines changed

16 files changed

+707
-6
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.api.connector.v2;
20+
21+
import org.apache.flink.annotation.Experimental;
22+
23+
/**
24+
* Sink interface for DataStream API v2. Note that this is a temporary approach and will be removed
25+
* in the future.
26+
*/
27+
@Experimental
28+
public interface Sink<T> {}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.api.connector.v2;
20+
21+
import org.apache.flink.annotation.Experimental;
22+
23+
/** Utils to convert the sink-v2 based sink to a DataStream v2 Source. */
24+
@Experimental
25+
public class SinkUtils {
26+
public static <T> Sink<T> wrapSink(org.apache.flink.api.connector.sink2.Sink<T> sink) {
27+
return new WrappedSink<>(sink);
28+
}
29+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.api.connector.v2;
20+
21+
import org.apache.flink.annotation.Internal;
22+
23+
/** A simple {@link Sink} implementation that wrap a sink-v2 based sink. */
24+
@Internal
25+
public class WrappedSink<T> implements Sink<T> {
26+
org.apache.flink.api.connector.sink2.Sink<T> wrappedSink;
27+
28+
public WrappedSink(org.apache.flink.api.connector.sink2.Sink<T> wrappedSink) {
29+
this.wrappedSink = wrappedSink;
30+
}
31+
32+
public org.apache.flink.api.connector.sink2.Sink<T> getWrappedSink() {
33+
return wrappedSink;
34+
}
35+
}

Diff for: flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/process/api/stream/GlobalStream.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.flink.process.api.stream;
2020

2121
import org.apache.flink.annotation.Experimental;
22+
import org.apache.flink.api.connector.v2.Sink;
2223
import org.apache.flink.api.java.functions.KeySelector;
2324
import org.apache.flink.process.api.function.OneInputStreamProcessFunction;
2425
import org.apache.flink.process.api.function.TwoInputNonBroadcastStreamProcessFunction;
@@ -78,7 +79,7 @@ <T_OTHER, OUT> GlobalStream<OUT> connectAndProcess(
7879
*/
7980
BroadcastStream<T> broadcast();
8081

81-
// TODO add toSink method.
82+
/**
 * Adds the given sink to this stream.
 *
 * <p>NOTE(review): the implementation in this change only accepts sinks created through
 * {@code SinkUtils#wrapSink} and runs the sink of a global stream with parallelism 1.
 *
 * @param sink the sink to add to this stream.
 */
void toSink(Sink<T> sink);
8283

8384
/**
8485
* This class represents a combination of two {@link GlobalStream}. It will be used as the

Diff for: flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/process/api/stream/KeyedPartitionStream.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.flink.process.api.stream;
2020

2121
import org.apache.flink.annotation.Experimental;
22+
import org.apache.flink.api.connector.v2.Sink;
2223
import org.apache.flink.api.java.functions.KeySelector;
2324
import org.apache.flink.process.api.function.OneInputStreamProcessFunction;
2425
import org.apache.flink.process.api.function.TwoInputBroadcastStreamProcessFunction;
@@ -168,7 +169,7 @@ <T_OTHER, OUT> KeyedPartitionStream<K, OUT> connectAndProcess(
168169
*/
169170
BroadcastStream<T> broadcast();
170171

171-
// TODO add toSink method.
172+
/**
 * Adds the given sink to this stream.
 *
 * <p>NOTE(review): the implementation in this change only accepts sinks created through
 * {@code SinkUtils#wrapSink}.
 *
 * @param sink the sink to add to this stream.
 */
void toSink(Sink<T> sink);
172173

173174
/**
174175
* This class represents a combination of two {@link KeyedPartitionStream}. It will be used as

Diff for: flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/process/api/stream/NonKeyedPartitionStream.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.flink.process.api.stream;
2020

2121
import org.apache.flink.annotation.Experimental;
22+
import org.apache.flink.api.connector.v2.Sink;
2223
import org.apache.flink.api.java.functions.KeySelector;
2324
import org.apache.flink.process.api.function.OneInputStreamProcessFunction;
2425
import org.apache.flink.process.api.function.TwoInputBroadcastStreamProcessFunction;
@@ -100,7 +101,7 @@ <T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
100101
*/
101102
BroadcastStream<T> broadcast();
102103

103-
// TODO add toSink method.
104+
/**
 * Adds the given sink to this stream.
 *
 * <p>NOTE(review): the implementation in this change only accepts sinks created through
 * {@code SinkUtils#wrapSink}.
 *
 * @param sink the sink to add to this stream.
 */
void toSink(Sink<T> sink);
104105

105106
/**
106107
* This interface represents a combination of two {@link NonKeyedPartitionStream}. It will be

Diff for: flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/ExecutionEnvironmentImpl.java

+21
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.apache.flink.streaming.api.graph.StreamGraph;
4949
import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
5050
import org.apache.flink.streaming.api.transformations.SourceTransformation;
51+
import org.apache.flink.streaming.runtime.translators.ProcessFunctionSinkTransformationTranslator;
5152
import org.apache.flink.util.ExceptionUtils;
5253
import org.apache.flink.util.FlinkException;
5354
import org.apache.flink.util.Preconditions;
@@ -76,6 +77,18 @@ public class ExecutionEnvironmentImpl implements ExecutionEnvironment {
7677

7778
private final PipelineExecutorServiceLoader executorServiceLoader;
7879

80+
static {
    try {
        // Every transformation translator has to be put into a map in StreamGraphGenerator.
        // Because streaming-java does not depend on the process-function module, the sink
        // translator is registered here at class-initialization time through the helper below
        // (which, per the original author's note, relies on reflection).
        ProcessFunctionSinkTransformationTranslator.registerSinkTransformationTranslator();
    } catch (Exception e) {
        // Keep the original exception as the cause; otherwise the real reason for the failed
        // registration is lost when class initialization fails.
        throw new RuntimeException(
                "Can not register process function transformation translator.", e);
    }
}
91+
7992
/**
8093
* The environment of the context (local by default, cluster if invoked through command line).
8194
*/
@@ -180,6 +193,14 @@ public List<Transformation<?>> getTransformations() {
180193
return transformations;
181194
}
182195

196+
public void setParallelism(int parallelism) {
197+
executionConfig.setParallelism(parallelism);
198+
}
199+
200+
public CheckpointConfig getCheckpointCfg() {
201+
return checkpointCfg;
202+
}
203+
183204
// -----------------------------------------------
184205
// Internal Methods
185206
// -----------------------------------------------

Diff for: flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/stream/GlobalStreamImpl.java

+10-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.flink.process.impl.stream;
2020

2121
import org.apache.flink.api.common.typeinfo.TypeInformation;
22+
import org.apache.flink.api.connector.v2.Sink;
2223
import org.apache.flink.api.dag.Transformation;
2324
import org.apache.flink.api.java.functions.KeySelector;
2425
import org.apache.flink.api.java.tuple.Tuple2;
@@ -38,6 +39,7 @@
3839
import org.apache.flink.streaming.api.operators.SimpleUdfStreamOperatorFactory;
3940
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
4041
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
42+
import org.apache.flink.streaming.api.transformations.ProcessFunctionSinkTransformation;
4143
import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
4244
import org.apache.flink.util.OutputTag;
4345

@@ -98,7 +100,14 @@ public <T_OTHER, OUT> GlobalStream<OUT> connectAndProcess(
98100
return new GlobalStreamImpl<>(environment, outTransformation);
99101
}
100102

101-
// TODO add toSink method.
103+
@Override
104+
public void toSink(Sink<T> sink) {
105+
ProcessFunctionSinkTransformation<T, T> sinkTransformation =
106+
StreamUtils.addSinkOperator(this, sink, getType());
107+
// Operator parallelism should always be 1 for global stream.
108+
// parallelismConfigured should be true to avoid overwritten by AdaptiveBatchScheduler.
109+
sinkTransformation.setParallelism(1, true);
110+
}
102111

103112
// ---------------------
104113
// Partitioning

Diff for: flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/stream/KeyedPartitionStreamImpl.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.flink.process.impl.stream;
2020

2121
import org.apache.flink.api.common.typeinfo.TypeInformation;
22+
import org.apache.flink.api.connector.v2.Sink;
2223
import org.apache.flink.api.dag.Transformation;
2324
import org.apache.flink.api.java.functions.KeySelector;
2425
import org.apache.flink.api.java.tuple.Tuple2;
@@ -322,7 +323,10 @@ public KeySelector<V, K> getKeySelector() {
322323
return keySelector;
323324
}
324325

325-
// TODO add toSink method.
326+
@Override
327+
public void toSink(Sink<V> sink) {
328+
StreamUtils.addSinkOperator(this, sink, getType());
329+
}
326330

327331
// ---------------------
328332
// Partitioning

Diff for: flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/stream/NonKeyedPartitionStreamImpl.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.flink.process.impl.stream;
2020

2121
import org.apache.flink.api.common.typeinfo.TypeInformation;
22+
import org.apache.flink.api.connector.v2.Sink;
2223
import org.apache.flink.api.dag.Transformation;
2324
import org.apache.flink.api.java.functions.KeySelector;
2425
import org.apache.flink.api.java.tuple.Tuple2;
@@ -132,7 +133,10 @@ public <T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
132133
return new NonKeyedPartitionStreamImpl<>(environment, outTransformation);
133134
}
134135

135-
// TODO add toSink method.
136+
@Override
137+
public void toSink(Sink<T> sink) {
138+
StreamUtils.addSinkOperator(this, sink, getType());
139+
}
136140

137141
// ---------------------
138142
// Partitioning

Diff for: flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/utils/StreamUtils.java

+57
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
package org.apache.flink.process.impl.utils;
2020

2121
import org.apache.flink.api.common.typeinfo.TypeInformation;
22+
import org.apache.flink.api.connector.v2.Sink;
23+
import org.apache.flink.api.connector.v2.WrappedSink;
2224
import org.apache.flink.api.java.Utils;
2325
import org.apache.flink.api.java.functions.KeySelector;
2426
import org.apache.flink.api.java.tuple.Tuple2;
@@ -29,11 +31,14 @@
2931
import org.apache.flink.process.api.function.TwoOutputStreamProcessFunction;
3032
import org.apache.flink.process.impl.stream.DataStream;
3133
import org.apache.flink.process.impl.stream.KeyedPartitionStreamImpl;
34+
import org.apache.flink.process.impl.stream.NonKeyedPartitionStreamImpl;
3235
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
3336
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
3437
import org.apache.flink.streaming.api.operators.SimpleUdfStreamOperatorFactory;
38+
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
3539
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
3640
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
41+
import org.apache.flink.streaming.api.transformations.ProcessFunctionSinkTransformation;
3742
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
3843

3944
/** Utils to handle things like extract type information for DataStream. */
@@ -225,4 +230,56 @@ public static <IN1, IN2, OUT> TwoInputTransformation<IN1, IN2, OUT> getTwoInputT
225230

226231
return transform;
227232
}
233+
234+
/** Construct and return a new DataStream with one input operator. */
235+
public static <T, R> DataStream<R> transformOneInputOperator(
236+
String operatorName,
237+
DataStream<T> inputStream,
238+
TypeInformation<R> outTypeInfo,
239+
StreamOperatorFactory<R> operatorFactory) {
240+
// read the output type of the input Transform to coax out errors about MissingTypeInfo
241+
inputStream.getTransformation().getOutputType();
242+
243+
OneInputTransformation<T, R> resultTransform =
244+
new OneInputTransformation<>(
245+
inputStream.getTransformation(),
246+
operatorName,
247+
operatorFactory,
248+
outTypeInfo,
249+
inputStream.getEnvironment().getParallelism(),
250+
false);
251+
252+
NonKeyedPartitionStreamImpl<R> returnStream =
253+
new NonKeyedPartitionStreamImpl<>(inputStream.getEnvironment(), resultTransform);
254+
255+
inputStream.getEnvironment().addOperator(resultTransform);
256+
257+
return returnStream;
258+
}
259+
260+
/** Add sink operator to the input stream. */
261+
public static <T> ProcessFunctionSinkTransformation<T, T> addSinkOperator(
262+
DataStream<T> inputStream, Sink<T> sink, TypeInformation<T> typeInformation) {
263+
// read the output type of the input Transform to coax out errors about MissingTypeInfo
264+
inputStream.getTransformation().getOutputType();
265+
266+
if (!(sink instanceof WrappedSink)) {
267+
throw new UnsupportedOperationException(
268+
"Unsupported type of sink, please use SinkUtils to wrap a sink-v2 sink first.");
269+
}
270+
271+
org.apache.flink.api.connector.sink2.Sink<T> innerSink =
272+
((WrappedSink<T>) sink).getWrappedSink();
273+
274+
ProcessFunctionSinkTransformation<T, T> sinkTransformation =
275+
new ProcessFunctionSinkTransformation<>(
276+
inputStream,
277+
innerSink,
278+
typeInformation,
279+
"Sink",
280+
inputStream.getEnvironment().getParallelism(),
281+
false);
282+
inputStream.getEnvironment().addOperator(sinkTransformation);
283+
return sinkTransformation;
284+
}
228285
}

0 commit comments

Comments
 (0)