
Commit de4bac7

[FLINK-34548][API] Supports FLIP-27 Source

1 parent b9853ae

File tree: 7 files changed (+271, -1 lines)
Source.java (new file)
Lines changed: 28 additions & 0 deletions
@@ -0,0 +1,28 @@

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.api.connector.v2;

import org.apache.flink.annotation.Experimental;

/**
 * Source interface for DataStream API v2. Note that this is a temporary approach and will be
 * removed in the future.
 */
@Experimental
public interface Source<T> {}
FromDataSource.java (new file)
Lines changed: 37 additions & 0 deletions
@@ -0,0 +1,37 @@

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.api.connector.v2;

import org.apache.flink.annotation.Internal;

import java.util.Collection;

/** DataStream v2 source that emits elements from a pre-provided collection. */
@Internal
public class FromDataSource<T> implements Source<T> {
    private final Collection<T> data;

    public FromDataSource(Collection<T> data) {
        this.data = data;
    }

    public Collection<T> getData() {
        return data;
    }
}
SourceUtils.java (new file)
Lines changed: 46 additions & 0 deletions
@@ -0,0 +1,46 @@

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.api.connector.v2;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.util.Preconditions;

import java.util.Collection;

/** Utils to convert a FLIP-27 based source to a DataStream v2 Source. */
@Experimental
public final class SourceUtils {
    public static <T> Source<T> wrapSource(
            org.apache.flink.api.connector.source.Source<T, ?, ?> source) {
        return new WrappedSource<>(source);
    }

    /**
     * Creates a source that contains the given elements. The type of the data stream is that of
     * the elements in the collection.
     *
     * @param data The collection of elements to create the source from.
     * @param <T> The generic type of the returned data stream.
     * @return The source representing the given collection.
     */
    public static <T> Source<T> fromData(Collection<T> data) {
        Preconditions.checkNotNull(data, "Collection must not be null");
        return new FromDataSource<>(data);
    }
}
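
For orientation, a minimal usage sketch of wrapSource (not part of this commit): it wraps a FLIP-27 DataGeneratorSource from flink-connector-datagen, the same connector the runtime implementation further below relies on. The class and variable names here are illustrative only.

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.v2.Source;
import org.apache.flink.api.connector.v2.SourceUtils;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;

public class SourceUtilsUsageSketch {
    public static void main(String[] args) {
        // A bounded FLIP-27 source that generates 100 strings.
        DataGeneratorSource<String> flip27Source =
                new DataGeneratorSource<>(index -> "value-" + index, 100, Types.STRING);

        // Wrap it into the DataStream v2 Source type introduced by this commit.
        Source<String> v2Source = SourceUtils.wrapSource(flip27Source);
    }
}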
WrappedSource.java (new file)
Lines changed: 35 additions & 0 deletions
@@ -0,0 +1,35 @@

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.api.connector.v2;

import org.apache.flink.annotation.Internal;

/** A simple {@link Source} implementation that wraps a FLIP-27 source. */
@Internal
public class WrappedSource<T> implements Source<T> {
    org.apache.flink.api.connector.source.Source<T, ?, ?> wrappedSource;

    public WrappedSource(org.apache.flink.api.connector.source.Source<T, ?, ?> wrappedSource) {
        this.wrappedSource = wrappedSource;
    }

    public org.apache.flink.api.connector.source.Source<T, ?, ?> getWrappedSource() {
        return wrappedSource;
    }
}

flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/process/api/ExecutionEnvironment.java

Lines changed: 3 additions & 1 deletion
@@ -20,6 +20,8 @@

 import org.apache.flink.annotation.Experimental;
 import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.connector.v2.Source;
+import org.apache.flink.process.api.stream.NonKeyedPartitionStream;

 /**
  * This is the context in which a program is executed.
@@ -45,5 +47,5 @@ static ExecutionEnvironment getExecutionEnvironment() throws ReflectiveOperation

     ExecutionEnvironment setExecutionMode(RuntimeExecutionMode runtimeMode);

-    // TODO introduce method to add source
+    <OUT> NonKeyedPartitionStream<OUT> fromSource(Source<OUT> source, String sourceName);
 }
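
A hedged sketch of how user code might call the new fromSource method, assuming an environment obtained via getExecutionEnvironment() as declared above; downstream processing and job submission are omitted, and the commented-out wrapSource call stands in for any FLIP-27 source a user already has.

import org.apache.flink.api.connector.v2.SourceUtils;
import org.apache.flink.process.api.ExecutionEnvironment;
import org.apache.flink.process.api.stream.NonKeyedPartitionStream;

import java.util.Arrays;

public class FromSourceUsageSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Bounded stream built from an in-memory collection.
        NonKeyedPartitionStream<Integer> numbers =
                env.fromSource(SourceUtils.fromData(Arrays.asList(1, 2, 3)), "Collection Source");

        // Any FLIP-27 source can be used once wrapped; myFlip27Source is a placeholder:
        // NonKeyedPartitionStream<String> records =
        //         env.fromSource(SourceUtils.wrapSource(myFlip27Source), "Wrapped Source");
    }
}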

flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/ExecutionEnvironmentImpl.java

Lines changed: 98 additions & 0 deletions
@@ -20,24 +20,40 @@

 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.v2.FromDataSource;
+import org.apache.flink.api.connector.v2.Source;
+import org.apache.flink.api.connector.v2.WrappedSource;
 import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.typeutils.MissingTypeInfo;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.datagen.functions.FromElementsGeneratorFunction;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
 import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.core.execution.PipelineExecutor;
 import org.apache.flink.core.execution.PipelineExecutorFactory;
 import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
 import org.apache.flink.process.api.ExecutionEnvironment;
+import org.apache.flink.process.api.stream.NonKeyedPartitionStream;
+import org.apache.flink.process.impl.stream.NonKeyedPartitionStreamImpl;
 import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
+import org.apache.flink.streaming.api.transformations.SourceTransformation;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;

 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -114,6 +130,40 @@ protected static void resetContextEnvironment() {
         contextEnvironmentFactory = null;
     }

+    @Override
+    public <OUT> NonKeyedPartitionStream<OUT> fromSource(Source<OUT> source, String sourceName) {
+        if (source instanceof WrappedSource) {
+            org.apache.flink.api.connector.source.Source<OUT, ?, ?> innerSource =
+                    ((WrappedSource<OUT>) source).getWrappedSource();
+            final TypeInformation<OUT> resolvedTypeInfo =
+                    getSourceTypeInfo(innerSource, sourceName);
+
+            SourceTransformation<OUT, ?, ?> sourceTransformation =
+                    new SourceTransformation<>(
+                            sourceName,
+                            innerSource,
+                            WatermarkStrategy.noWatermarks(),
+                            resolvedTypeInfo,
+                            getParallelism(),
+                            false);
+            return new NonKeyedPartitionStreamImpl<>(this, sourceTransformation);
+        } else if (source instanceof FromDataSource) {
+            Collection<OUT> data = ((FromDataSource<OUT>) source).getData();
+            TypeInformation<OUT> outType = extractTypeInfoFromCollection(data);
+
+            FromElementsGeneratorFunction<OUT> generatorFunction =
+                    new FromElementsGeneratorFunction<>(outType, executionConfig, data);
+
+            DataGeneratorSource<OUT> generatorSource =
+                    new DataGeneratorSource<>(generatorFunction, data.size(), outType);
+
+            return fromSource(new WrappedSource<>(generatorSource), "Collection Source");
+        } else {
+            throw new UnsupportedOperationException(
+                    "Unsupported type of source, you could use SourceUtils to wrap a FLIP-27 based source.");
+        }
+    }
+
     public Configuration getConfiguration() {
         return this.configuration;
     }
@@ -134,6 +184,54 @@ public List<Transformation<?>> getTransformations() {
     // Internal Methods
     // -----------------------------------------------

+    private static <OUT> TypeInformation<OUT> extractTypeInfoFromCollection(Collection<OUT> data) {
+        Preconditions.checkNotNull(data, "Collection must not be null");
+        if (data.isEmpty()) {
+            throw new IllegalArgumentException("Collection must not be empty");
+        }
+
+        OUT first = data.iterator().next();
+        if (first == null) {
+            throw new IllegalArgumentException("Collection must not contain null elements");
+        }
+
+        TypeInformation<OUT> typeInfo;
+        try {
+            typeInfo = TypeExtractor.getForObject(first);
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Could not create TypeInformation for type "
+                            + first.getClass()
+                            + "; please specify the TypeInformation manually via the version of the "
+                            + "method that explicitly accepts it as an argument.",
+                    e);
+        }
+        return typeInfo;
+    }
+
+    @SuppressWarnings("unchecked")
+    private static <OUT, T extends TypeInformation<OUT>> T getSourceTypeInfo(
+            org.apache.flink.api.connector.source.Source<OUT, ?, ?> source, String sourceName) {
+        TypeInformation<OUT> resolvedTypeInfo = null;
+        if (source instanceof ResultTypeQueryable) {
+            resolvedTypeInfo = ((ResultTypeQueryable<OUT>) source).getProducedType();
+        }
+        if (resolvedTypeInfo == null) {
+            try {
+                resolvedTypeInfo =
+                        TypeExtractor.createTypeInfo(
+                                org.apache.flink.api.connector.source.Source.class,
+                                source.getClass(),
+                                0,
+                                null,
+                                null);
+            } catch (final InvalidTypesException e) {
+                resolvedTypeInfo = (TypeInformation<OUT>) new MissingTypeInfo(sourceName, e);
+            }
+        }
+        return (T) resolvedTypeInfo;
+    }
+
     public void addOperator(Transformation<?> transformation) {
         checkNotNull(transformation, "transformation must not be null.");
         this.transformations.add(transformation);
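
One detail worth noting in getSourceTypeInfo above: a wrapped FLIP-27 source that implements ResultTypeQueryable short-circuits the TypeExtractor fallback. Below is a small illustrative sketch (not from the commit), assuming DataGeneratorSource advertises its produced type this way since it is constructed with an explicit TypeInformation.

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;

public class TypeResolutionSketch {
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        DataGeneratorSource<String> source =
                new DataGeneratorSource<>(index -> "value-" + index, 10, Types.STRING);

        // Mirrors the first branch of getSourceTypeInfo: prefer the type the source
        // itself reports over reflective extraction.
        if (source instanceof ResultTypeQueryable) {
            TypeInformation<String> produced =
                    ((ResultTypeQueryable<String>) source).getProducedType();
            System.out.println(produced); // prints the String type info
        }
    }
}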

flink-process-function-parent/flink-process-function/src/test/java/org/apache/flink/process/impl/ExecutionEnvironmentImplTest.java

Lines changed: 24 additions & 0 deletions
@@ -19,12 +19,20 @@
 package org.apache.flink.process.impl;

 import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.connector.v2.SourceUtils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
 import org.apache.flink.process.api.ExecutionEnvironment;
+import org.apache.flink.process.api.stream.NonKeyedPartitionStream;
+import org.apache.flink.process.impl.stream.StreamTestUtils;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.graph.StreamNode;

 import org.junit.jupiter.api.Test;

+import java.util.Arrays;
+import java.util.Collection;
+
 import static org.assertj.core.api.Assertions.assertThat;

 /** Tests for {@link ExecutionEnvironmentImpl}. */
@@ -57,4 +65,20 @@ void testAddOperator() {
         env.addOperator(t2);
         assertThat(env.getTransformations()).containsExactly(t1, t2);
     }
+
+    @Test
+    void testFromSource() {
+        ExecutionEnvironmentImpl env =
+                new ExecutionEnvironmentImpl(
+                        new DefaultExecutorServiceLoader(), new Configuration(), null);
+        NonKeyedPartitionStream<Integer> source =
+                env.fromSource(SourceUtils.fromData(Arrays.asList(1, 2, 3)), "test-source");
+        source.process(new StreamTestUtils.NoOpOneInputStreamProcessFunction());
+        StreamGraph streamGraph = StreamTestUtils.getStreamGraph(env);
+        Collection<StreamNode> nodes = streamGraph.getStreamNodes();
+        assertThat(nodes).hasSize(2);
+        Collection<Integer> sourceIDs = streamGraph.getSourceIDs();
+        StreamNode sourceNode = nodes.iterator().next();
+        assertThat(sourceIDs).containsExactly(sourceNode.getId());
+    }
 }
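
The new test exercises only the fromData path. A hedged companion sketch (not part of the commit) of a test for the wrapSource path could look roughly like the following, assuming the same StreamTestUtils helpers apply, that NoOpOneInputStreamProcessFunction accepts an Integer stream as in testFromSource, and with an additional import for DataGeneratorSource:

    @Test
    void testFromWrappedFlip27Source() {
        ExecutionEnvironmentImpl env =
                new ExecutionEnvironmentImpl(
                        new DefaultExecutorServiceLoader(), new Configuration(), null);
        // A bounded FLIP-27 source producing ten integers, wrapped into a v2 Source.
        DataGeneratorSource<Integer> generatorSource =
                new DataGeneratorSource<>(index -> index.intValue(), 10, Types.INT);
        NonKeyedPartitionStream<Integer> source =
                env.fromSource(SourceUtils.wrapSource(generatorSource), "wrapped-source");
        source.process(new StreamTestUtils.NoOpOneInputStreamProcessFunction());

        StreamGraph streamGraph = StreamTestUtils.getStreamGraph(env);
        // Same shape as testFromSource: one source node plus one process node.
        assertThat(streamGraph.getStreamNodes()).hasSize(2);
    }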
