Commit a7052d0

[FLINK-34548][API] Supports FLIP-27 Source

1 parent 36f48d3 · commit a7052d0

7 files changed: +278 -1 lines

New file: org/apache/flink/api/connector/dsv2/Source.java

+28

@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.dsv2;
+
+import org.apache.flink.annotation.Experimental;
+
+/**
+ * Source interface for DataStream API v2. Note that this is a temporary approach and will be
+ * removed in the future.
+ */
+@Experimental
+public interface Source<T> {}

New file: org/apache/flink/api/connector/dsv2/DataStreamV2SourceUtils.java

+52

@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.dsv2;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+
+/** Utils to create the DataStream V2 supported {@link Source}. */
+@Experimental
+public final class DataStreamV2SourceUtils {
+    /**
+     * Wraps a FLIP-27 based source into a DataStream V2 supported source.
+     *
+     * @param source The FLIP-27 based source to wrap.
+     * @return The DataStream V2 supported source.
+     */
+    public static <T> Source<T> wrapSource(
+            org.apache.flink.api.connector.source.Source<T, ?, ?> source) {
+        return new WrappedSource<>(source);
+    }
+
+    /**
+     * Creates a source that contains the given elements. The type of the data stream is that of
+     * the elements in the collection.
+     *
+     * @param data The collection of elements to create the source from.
+     * @param <T> The generic type of the returned data stream.
+     * @return The source representing the given collection.
+     */
+    public static <T> Source<T> fromData(Collection<T> data) {
+        Preconditions.checkNotNull(data, "Collection must not be null");
+        return new FromDataSource<>(data);
+    }
+}
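
For orientation, a minimal usage sketch of the two factory methods above. The class and method names in this sketch are illustrative and not part of the commit; the wrapped FLIP-27 source is assumed to be constructed elsewhere.

import org.apache.flink.api.connector.dsv2.DataStreamV2SourceUtils;
import org.apache.flink.api.connector.dsv2.Source;

import java.util.Arrays;
import java.util.List;

// Illustrative helper class; not part of this commit.
public class Dsv2SourceFactorySketch {

    // Build a DataStream V2 source from an in-memory collection.
    static Source<Integer> collectionSource() {
        List<Integer> data = Arrays.asList(1, 2, 3);
        return DataStreamV2SourceUtils.fromData(data);
    }

    // Wrap an already-constructed FLIP-27 source (for example a
    // DataGeneratorSource) into the DataStream V2 Source abstraction;
    // the concrete source instance is supplied by the caller.
    static <T> Source<T> wrapped(
            org.apache.flink.api.connector.source.Source<T, ?, ?> flip27Source) {
        return DataStreamV2SourceUtils.wrapSource(flip27Source);
    }
}
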
New file: org/apache/flink/api/connector/dsv2/FromDataSource.java

+37

@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.dsv2;
+
+import org.apache.flink.annotation.Internal;
+
+import java.util.Collection;
+
+/** DataStream v2 source that emits data from a pre-provided collection. */
+@Internal
+public class FromDataSource<T> implements Source<T> {
+    private final Collection<T> data;
+
+    public FromDataSource(Collection<T> data) {
+        this.data = data;
+    }
+
+    public Collection<T> getData() {
+        return data;
+    }
+}

New file: org/apache/flink/api/connector/dsv2/WrappedSource.java

+35

@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.dsv2;
+
+import org.apache.flink.annotation.Internal;
+
+/** A simple {@link Source} implementation that wraps a FLIP-27 source. */
+@Internal
+public class WrappedSource<T> implements Source<T> {
+    private final org.apache.flink.api.connector.source.Source<T, ?, ?> wrappedSource;
+
+    public WrappedSource(org.apache.flink.api.connector.source.Source<T, ?, ?> wrappedSource) {
+        this.wrappedSource = wrappedSource;
+    }
+
+    public org.apache.flink.api.connector.source.Source<T, ?, ?> getWrappedSource() {
+        return wrappedSource;
+    }
+}

Diff for: flink-datastream-api/src/main/java/org/apache/flink/datastream/api/ExecutionEnvironment.java

+3-1
@@ -20,6 +20,8 @@
 
 import org.apache.flink.annotation.Experimental;
 import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.connector.dsv2.Source;
+import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;
 
 /**
  * This is the context in which a program is executed.
@@ -49,5 +51,5 @@ static ExecutionEnvironment getInstance() throws ReflectiveOperationException {
     /** Set the execution mode for this environment. */
     ExecutionEnvironment setExecutionMode(RuntimeExecutionMode runtimeMode);
 
-    // TODO introduce method to add source
+    <OUT> NonKeyedPartitionStream<OUT> fromSource(Source<OUT> source, String sourceName);
 }
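
A short sketch of how the new entry point is meant to be called from user code, assuming the DataStreamV2SourceUtils factory added earlier in this commit; the body deliberately mirrors the test added at the end of the diff, and the class name here is illustrative.

import org.apache.flink.api.connector.dsv2.DataStreamV2SourceUtils;
import org.apache.flink.datastream.api.ExecutionEnvironment;
import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;

import java.util.Arrays;

// Illustrative driver class; not part of this commit.
public class FromSourceSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getInstance();

        // fromSource accepts any dsv2 Source plus a human-readable name.
        NonKeyedPartitionStream<Integer> stream =
                env.fromSource(
                        DataStreamV2SourceUtils.fromData(Arrays.asList(1, 2, 3)),
                        "collection-source");

        // Downstream processing and job submission use the existing
        // DataStream V2 APIs and are not shown here.
    }
}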

Diff for: flink-datastream/src/main/java/org/apache/flink/datastream/impl/ExecutionEnvironmentImpl.java

+98
@@ -20,24 +20,40 @@
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.dsv2.FromDataSource;
+import org.apache.flink.api.connector.dsv2.Source;
+import org.apache.flink.api.connector.dsv2.WrappedSource;
 import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.typeutils.MissingTypeInfo;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.datagen.functions.FromElementsGeneratorFunction;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
 import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.core.execution.PipelineExecutor;
 import org.apache.flink.core.execution.PipelineExecutorFactory;
 import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
 import org.apache.flink.datastream.api.ExecutionEnvironment;
+import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;
+import org.apache.flink.datastream.impl.stream.NonKeyedPartitionStreamImpl;
 import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
+import org.apache.flink.streaming.api.transformations.SourceTransformation;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -132,6 +148,40 @@ protected static void resetContextEnvironment() {
         contextEnvironmentFactory = null;
     }
 
+    @Override
+    public <OUT> NonKeyedPartitionStream<OUT> fromSource(Source<OUT> source, String sourceName) {
+        if (source instanceof WrappedSource) {
+            org.apache.flink.api.connector.source.Source<OUT, ?, ?> innerSource =
+                    ((WrappedSource<OUT>) source).getWrappedSource();
+            final TypeInformation<OUT> resolvedTypeInfo =
+                    getSourceTypeInfo(innerSource, sourceName);
+
+            SourceTransformation<OUT, ?, ?> sourceTransformation =
+                    new SourceTransformation<>(
+                            sourceName,
+                            innerSource,
+                            WatermarkStrategy.noWatermarks(),
+                            resolvedTypeInfo,
+                            getParallelism(),
+                            false);
+            return new NonKeyedPartitionStreamImpl<>(this, sourceTransformation);
+        } else if (source instanceof FromDataSource) {
+            Collection<OUT> data = ((FromDataSource<OUT>) source).getData();
+            TypeInformation<OUT> outType = extractTypeInfoFromCollection(data);
+
+            FromElementsGeneratorFunction<OUT> generatorFunction =
+                    new FromElementsGeneratorFunction<>(outType, executionConfig, data);
+
+            DataGeneratorSource<OUT> generatorSource =
+                    new DataGeneratorSource<>(generatorFunction, data.size(), outType);
+
+            return fromSource(new WrappedSource<>(generatorSource), "Collection Source");
+        } else {
+            throw new UnsupportedOperationException(
+                    "Unsupported type of source, you can use DataStreamV2SourceUtils to wrap a FLIP-27 based source.");
+        }
+    }
+
     public Configuration getConfiguration() {
         return this.configuration;
     }
@@ -156,6 +206,54 @@ public void setParallelism(int parallelism) {
     // Internal Methods
     // -----------------------------------------------
 
+    private static <OUT> TypeInformation<OUT> extractTypeInfoFromCollection(Collection<OUT> data) {
+        Preconditions.checkNotNull(data, "Collection must not be null");
+        if (data.isEmpty()) {
+            throw new IllegalArgumentException("Collection must not be empty");
+        }
+
+        OUT first = data.iterator().next();
+        if (first == null) {
+            throw new IllegalArgumentException("Collection must not contain null elements");
+        }
+
+        TypeInformation<OUT> typeInfo;
+        try {
+            typeInfo = TypeExtractor.getForObject(first);
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Could not create TypeInformation for type "
+                            + first.getClass()
+                            + "; please specify the TypeInformation manually via the version of the "
+                            + "method that explicitly accepts it as an argument.",
+                    e);
+        }
+        return typeInfo;
+    }
+
+    @SuppressWarnings("unchecked")
+    private static <OUT, T extends TypeInformation<OUT>> T getSourceTypeInfo(
+            org.apache.flink.api.connector.source.Source<OUT, ?, ?> source, String sourceName) {
+        TypeInformation<OUT> resolvedTypeInfo = null;
+        if (source instanceof ResultTypeQueryable) {
+            resolvedTypeInfo = ((ResultTypeQueryable<OUT>) source).getProducedType();
+        }
+        if (resolvedTypeInfo == null) {
+            try {
+                resolvedTypeInfo =
+                        TypeExtractor.createTypeInfo(
+                                org.apache.flink.api.connector.source.Source.class,
+                                source.getClass(),
+                                0,
+                                null,
+                                null);
+            } catch (final InvalidTypesException e) {
+                resolvedTypeInfo = (TypeInformation<OUT>) new MissingTypeInfo(sourceName, e);
+            }
+        }
+        return (T) resolvedTypeInfo;
+    }
+
     public void addOperator(Transformation<?> transformation) {
         checkNotNull(transformation, "transformation must not be null.");
         this.transformations.add(transformation);

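To make the FromDataSource branch above easier to follow: the collection path is effectively sugar over the FLIP-27 wrapping path. Below is a hedged sketch of the equivalent user-level construction, using the same constructors the implementation calls; the ExecutionConfig instance and the class name are stand-ins introduced for illustration only.

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.dsv2.DataStreamV2SourceUtils;
import org.apache.flink.api.connector.dsv2.Source;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.connector.datagen.functions.FromElementsGeneratorFunction;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;

import java.util.Arrays;
import java.util.List;

// Illustrative class; not part of this commit.
public class CollectionAsFlip27Sketch {
    public static Source<Integer> build() {
        List<Integer> data = Arrays.asList(1, 2, 3);

        // Derive TypeInformation from the first element, as
        // extractTypeInfoFromCollection does above.
        TypeInformation<Integer> outType = TypeExtractor.getForObject(data.get(0));

        // Re-emit the collection through the FLIP-27 DataGeneratorSource ...
        FromElementsGeneratorFunction<Integer> generatorFunction =
                new FromElementsGeneratorFunction<>(outType, new ExecutionConfig(), data);
        DataGeneratorSource<Integer> generatorSource =
                new DataGeneratorSource<>(generatorFunction, data.size(), outType);

        // ... and wrap it back into the dsv2 Source, which is what
        // fromSource(FromDataSource, ...) delegates to internally.
        return DataStreamV2SourceUtils.wrapSource(generatorSource);
    }
}
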
Diff for: flink-datastream/src/test/java/org/apache/flink/datastream/impl/ExecutionEnvironmentImplTest.java

+25
@@ -20,12 +20,20 @@
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.connector.dsv2.DataStreamV2SourceUtils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
 import org.apache.flink.datastream.api.ExecutionEnvironment;
+import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;
+import org.apache.flink.datastream.impl.stream.StreamTestUtils;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.graph.StreamNode;
 
 import org.junit.jupiter.api.Test;
 
+import java.util.Arrays;
+import java.util.Collection;
+
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link ExecutionEnvironmentImpl}. */
@@ -67,4 +75,21 @@ void testSetExecutionMode() {
         env.setExecutionMode(RuntimeExecutionMode.BATCH);
         assertThat(env.getExecutionMode()).isEqualTo(RuntimeExecutionMode.BATCH);
     }
+
+    @Test
+    void testFromSource() {
+        ExecutionEnvironmentImpl env =
+                new ExecutionEnvironmentImpl(
+                        new DefaultExecutorServiceLoader(), new Configuration(), null);
+        NonKeyedPartitionStream<Integer> source =
+                env.fromSource(
+                        DataStreamV2SourceUtils.fromData(Arrays.asList(1, 2, 3)), "test-source");
+        source.process(new StreamTestUtils.NoOpOneInputStreamProcessFunction());
+        StreamGraph streamGraph = StreamTestUtils.getStreamGraph(env);
+        Collection<StreamNode> nodes = streamGraph.getStreamNodes();
+        assertThat(nodes).hasSize(2);
+        Collection<Integer> sourceIDs = streamGraph.getSourceIDs();
+        StreamNode sourceNode = nodes.iterator().next();
+        assertThat(sourceIDs).containsExactly(sourceNode.getId());
+    }
 }
