From 70b9ca2e271f3fad754866e74ff7df615cca0911 Mon Sep 17 00:00:00 2001 From: Weijie Guo Date: Wed, 21 Feb 2024 11:22:43 +0800 Subject: [PATCH] [FLINK-34548][API] Supports FLIP-27 Source --- .../apache/flink/api/connector/v2/Source.java | 28 ++++++ .../api/connector/v2/FromDataSource.java | 37 +++++++ .../flink/api/connector/v2/SourceUtils.java | 46 +++++++++ .../flink/api/connector/v2/WrappedSource.java | 35 +++++++ .../process/api/ExecutionEnvironment.java | 4 +- .../impl/ExecutionEnvironmentImpl.java | 98 +++++++++++++++++++ .../impl/ExecutionEnvironmentImplTest.java | 24 +++++ 7 files changed, 271 insertions(+), 1 deletion(-) create mode 100644 flink-core-api/src/main/java/org/apache/flink/api/connector/v2/Source.java create mode 100644 flink-core/src/main/java/org/apache/flink/api/connector/v2/FromDataSource.java create mode 100644 flink-core/src/main/java/org/apache/flink/api/connector/v2/SourceUtils.java create mode 100644 flink-core/src/main/java/org/apache/flink/api/connector/v2/WrappedSource.java diff --git a/flink-core-api/src/main/java/org/apache/flink/api/connector/v2/Source.java b/flink-core-api/src/main/java/org/apache/flink/api/connector/v2/Source.java new file mode 100644 index 00000000000000..6653835c1a5290 --- /dev/null +++ b/flink-core-api/src/main/java/org/apache/flink/api/connector/v2/Source.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.connector.v2; + +import org.apache.flink.annotation.Experimental; + +/** + * Source interface for DataStream api v2. Note that this is a temporary approach, will be removed + * in the future. + */ +@Experimental +public interface Source {} diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/v2/FromDataSource.java b/flink-core/src/main/java/org/apache/flink/api/connector/v2/FromDataSource.java new file mode 100644 index 00000000000000..12658dfd7f3793 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/connector/v2/FromDataSource.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.connector.v2; + +import org.apache.flink.annotation.Internal; + +import java.util.Collection; + +/** DataStream v2 source that emit data from pre-provided data. */ +@Internal +public class FromDataSource implements Source { + private final Collection data; + + public FromDataSource(Collection data) { + this.data = data; + } + + public Collection getData() { + return data; + } +} diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/v2/SourceUtils.java b/flink-core/src/main/java/org/apache/flink/api/connector/v2/SourceUtils.java new file mode 100644 index 00000000000000..08a46f35964ad6 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/connector/v2/SourceUtils.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.connector.v2; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.util.Preconditions; + +import java.util.Collection; + +/** Utils to convert a FLIP-27 based source to a DataStream v2 Source. */ +@Experimental +public final class SourceUtils { + public static Source wrapSource( + org.apache.flink.api.connector.source.Source source) { + return new WrappedSource<>(source); + } + + /** + * Creates a source that contains the given elements.The type of the data stream is that of the + * elements in the collection. + * + * @param data The collection of elements to create the source from. + * @param The generic type of the returned data stream. + * @return The source representing the given collection + */ + public static Source fromData(Collection data) { + Preconditions.checkNotNull(data, "Collection must not be null"); + return new FromDataSource<>(data); + } +} diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/v2/WrappedSource.java b/flink-core/src/main/java/org/apache/flink/api/connector/v2/WrappedSource.java new file mode 100644 index 00000000000000..b7e2ab21da9122 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/connector/v2/WrappedSource.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.connector.v2; + +import org.apache.flink.annotation.Internal; + +/** A simple {@link Source} implementation that wrap a FLIP-27 source. */ +@Internal +public class WrappedSource implements Source { + org.apache.flink.api.connector.source.Source wrappedSource; + + public WrappedSource(org.apache.flink.api.connector.source.Source wrappedSource) { + this.wrappedSource = wrappedSource; + } + + public org.apache.flink.api.connector.source.Source getWrappedSource() { + return wrappedSource; + } +} diff --git a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/process/api/ExecutionEnvironment.java b/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/process/api/ExecutionEnvironment.java index ea1215089d595b..838ad21c129d18 100644 --- a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/process/api/ExecutionEnvironment.java +++ b/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/process/api/ExecutionEnvironment.java @@ -20,6 +20,8 @@ import org.apache.flink.annotation.Experimental; import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.api.connector.v2.Source; +import org.apache.flink.process.api.stream.NonKeyedPartitionStream; /** * This is the context in which a program is executed. @@ -49,5 +51,5 @@ static ExecutionEnvironment getExecutionEnvironment() throws ReflectiveOperation /** Set the execution mode for this environment. */ ExecutionEnvironment setExecutionMode(RuntimeExecutionMode runtimeMode); - // TODO introduce method to add source + NonKeyedPartitionStream fromSource(Source source, String sourceName); } diff --git a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/ExecutionEnvironmentImpl.java b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/ExecutionEnvironmentImpl.java index bb226c3e1ca6ab..a9279040e71bba 100644 --- a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/ExecutionEnvironmentImpl.java +++ b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/ExecutionEnvironmentImpl.java @@ -20,24 +20,40 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.functions.InvalidTypesException; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.v2.FromDataSource; +import org.apache.flink.api.connector.v2.Source; +import org.apache.flink.api.connector.v2.WrappedSource; import org.apache.flink.api.dag.Transformation; +import org.apache.flink.api.java.typeutils.MissingTypeInfo; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.configuration.ExecutionOptions; import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.connector.datagen.functions.FromElementsGeneratorFunction; +import org.apache.flink.connector.datagen.source.DataGeneratorSource; import org.apache.flink.core.execution.DefaultExecutorServiceLoader; import org.apache.flink.core.execution.JobClient; import org.apache.flink.core.execution.PipelineExecutor; import org.apache.flink.core.execution.PipelineExecutorFactory; import org.apache.flink.core.execution.PipelineExecutorServiceLoader; import org.apache.flink.process.api.ExecutionEnvironment; +import org.apache.flink.process.api.stream.NonKeyedPartitionStream; +import org.apache.flink.process.impl.stream.NonKeyedPartitionStreamImpl; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.graph.StreamGraph; import org.apache.flink.streaming.api.graph.StreamGraphGenerator; +import org.apache.flink.streaming.api.transformations.SourceTransformation; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -119,6 +135,40 @@ protected static void resetContextEnvironment() { contextEnvironmentFactory = null; } + @Override + public NonKeyedPartitionStream fromSource(Source source, String sourceName) { + if (source instanceof WrappedSource) { + org.apache.flink.api.connector.source.Source innerSource = + ((WrappedSource) source).getWrappedSource(); + final TypeInformation resolvedTypeInfo = + getSourceTypeInfo(innerSource, sourceName); + + SourceTransformation sourceTransformation = + new SourceTransformation<>( + sourceName, + innerSource, + WatermarkStrategy.noWatermarks(), + resolvedTypeInfo, + getParallelism(), + false); + return new NonKeyedPartitionStreamImpl<>(this, sourceTransformation); + } else if (source instanceof FromDataSource) { + Collection data = ((FromDataSource) source).getData(); + TypeInformation outType = extractTypeInfoFromCollection(data); + + FromElementsGeneratorFunction generatorFunction = + new FromElementsGeneratorFunction<>(outType, executionConfig, data); + + DataGeneratorSource generatorSource = + new DataGeneratorSource<>(generatorFunction, data.size(), outType); + + return fromSource(new WrappedSource<>(generatorSource), "Collection Source"); + } else { + throw new UnsupportedOperationException( + "Unsupported type of sink, you could use SourceUtils to wrap a FLIP-27 based source."); + } + } + public Configuration getConfiguration() { return this.configuration; } @@ -143,6 +193,54 @@ public void setParallelism(int parallelism) { // Internal Methods // ----------------------------------------------- + private static TypeInformation extractTypeInfoFromCollection(Collection data) { + Preconditions.checkNotNull(data, "Collection must not be null"); + if (data.isEmpty()) { + throw new IllegalArgumentException("Collection must not be empty"); + } + + OUT first = data.iterator().next(); + if (first == null) { + throw new IllegalArgumentException("Collection must not contain null elements"); + } + + TypeInformation typeInfo; + try { + typeInfo = TypeExtractor.getForObject(first); + } catch (Exception e) { + throw new RuntimeException( + "Could not create TypeInformation for type " + + first.getClass() + + "; please specify the TypeInformation manually via the version of the " + + "method that explicitly accepts it as an argument.", + e); + } + return typeInfo; + } + + @SuppressWarnings("unchecked") + private static > T getSourceTypeInfo( + org.apache.flink.api.connector.source.Source source, String sourceName) { + TypeInformation resolvedTypeInfo = null; + if (source instanceof ResultTypeQueryable) { + resolvedTypeInfo = ((ResultTypeQueryable) source).getProducedType(); + } + if (resolvedTypeInfo == null) { + try { + resolvedTypeInfo = + TypeExtractor.createTypeInfo( + org.apache.flink.api.connector.source.Source.class, + source.getClass(), + 0, + null, + null); + } catch (final InvalidTypesException e) { + resolvedTypeInfo = (TypeInformation) new MissingTypeInfo(sourceName, e); + } + } + return (T) resolvedTypeInfo; + } + public void addOperator(Transformation transformation) { checkNotNull(transformation, "transformation must not be null."); this.transformations.add(transformation); diff --git a/flink-process-function-parent/flink-process-function/src/test/java/org/apache/flink/process/impl/ExecutionEnvironmentImplTest.java b/flink-process-function-parent/flink-process-function/src/test/java/org/apache/flink/process/impl/ExecutionEnvironmentImplTest.java index c358b76d6c0786..e2eda95c2edea0 100644 --- a/flink-process-function-parent/flink-process-function/src/test/java/org/apache/flink/process/impl/ExecutionEnvironmentImplTest.java +++ b/flink-process-function-parent/flink-process-function/src/test/java/org/apache/flink/process/impl/ExecutionEnvironmentImplTest.java @@ -20,12 +20,20 @@ import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.connector.v2.SourceUtils; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.execution.DefaultExecutorServiceLoader; import org.apache.flink.process.api.ExecutionEnvironment; +import org.apache.flink.process.api.stream.NonKeyedPartitionStream; +import org.apache.flink.process.impl.stream.StreamTestUtils; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.streaming.api.graph.StreamNode; import org.junit.jupiter.api.Test; +import java.util.Arrays; +import java.util.Collection; + import static org.assertj.core.api.Assertions.assertThat; /** Tests for {@link ExecutionEnvironmentImpl}. */ @@ -67,4 +75,20 @@ void testSetExecutionMode() { env.setExecutionMode(RuntimeExecutionMode.BATCH); assertThat(env.getExecutionMode()).isEqualTo(RuntimeExecutionMode.BATCH); } + + @Test + void testFromSource() { + ExecutionEnvironmentImpl env = + new ExecutionEnvironmentImpl( + new DefaultExecutorServiceLoader(), new Configuration(), null); + NonKeyedPartitionStream source = + env.fromSource(SourceUtils.fromData(Arrays.asList(1, 2, 3)), "test-source"); + source.process(new StreamTestUtils.NoOpOneInputStreamProcessFunction()); + StreamGraph streamGraph = StreamTestUtils.getStreamGraph(env); + Collection nodes = streamGraph.getStreamNodes(); + assertThat(nodes).hasSize(2); + Collection sourceIDs = streamGraph.getSourceIDs(); + StreamNode sourceNode = nodes.iterator().next(); + assertThat(sourceIDs).containsExactly(sourceNode.getId()); + } }