Skip to content

Commit

Permalink
[FLINK-34548][API] Supports FLIP-27 Source
Browse files Browse the repository at this point in the history
  • Loading branch information
reswqa committed Mar 18, 2024
1 parent 58a5c2a commit 70b9ca2
Show file tree
Hide file tree
Showing 7 changed files with 271 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.connector.v2;

import org.apache.flink.annotation.Experimental;

/**
 * Source interface for DataStream API v2.
 *
 * <p>This interface declares no methods; it only serves as the common type accepted when adding a
 * source to an execution environment. Note that this is a temporary approach and will be removed
 * in the future.
 *
 * @param <T> The element type associated with this source.
 */
@Experimental
public interface Source<T> {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.connector.v2;

import org.apache.flink.annotation.Internal;

import java.util.Collection;

/**
 * DataStream v2 source that emits data from a pre-provided collection.
 *
 * @param <T> The type of the elements held by this source.
 */
@Internal
public class FromDataSource<T> implements Source<T> {
    /** The elements to emit; stored by reference (no defensive copy is made). */
    private final Collection<T> data;

    /**
     * Creates a source backed by the given collection.
     *
     * @param data The collection of elements to emit; must not be null.
     * @throws NullPointerException if {@code data} is null.
     */
    public FromDataSource(Collection<T> data) {
        // Fail fast here instead of letting a null collection surface later in a distant code path.
        this.data = java.util.Objects.requireNonNull(data, "Collection must not be null");
    }

    /**
     * Returns the collection this source was created with.
     *
     * @return The same collection instance that was passed to the constructor.
     */
    public Collection<T> getData() {
        return data;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.connector.v2;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.util.Preconditions;

import java.util.Collection;

/** Utils to convert a FLIP-27 based source to a DataStream v2 Source. */
@Experimental
public final class SourceUtils {

    /** Static utility class; not meant to be instantiated. */
    private SourceUtils() {}

    /**
     * Wraps a FLIP-27 based source into a DataStream v2 {@link Source}.
     *
     * @param source The FLIP-27 based source to wrap; must not be null.
     * @param <T> The first type parameter of the wrapped source, carried over to the returned
     *     source.
     * @return The DataStream v2 source wrapping the given FLIP-27 based source.
     */
    public static <T> Source<T> wrapSource(
            org.apache.flink.api.connector.source.Source<T, ?, ?> source) {
        // Reject null eagerly, mirroring the validation done in fromData.
        Preconditions.checkNotNull(source, "Source must not be null");
        return new WrappedSource<>(source);
    }

    /**
     * Creates a source that contains the given elements. The type of the data stream is that of
     * the elements in the collection.
     *
     * @param data The collection of elements to create the source from; must not be null.
     * @param <T> The generic type of the returned data stream.
     * @return The source representing the given collection.
     */
    public static <T> Source<T> fromData(Collection<T> data) {
        Preconditions.checkNotNull(data, "Collection must not be null");
        return new FromDataSource<>(data);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.connector.v2;

import org.apache.flink.annotation.Internal;

/**
 * A simple {@link Source} implementation that wraps a FLIP-27 source.
 *
 * @param <T> The first type parameter of the wrapped FLIP-27 source.
 */
@Internal
public class WrappedSource<T> implements Source<T> {
    /** The wrapped FLIP-27 source; assigned once in the constructor and never reassigned. */
    private final org.apache.flink.api.connector.source.Source<T, ?, ?> wrappedSource;

    /**
     * Creates a wrapper around the given FLIP-27 source.
     *
     * @param wrappedSource The FLIP-27 source to wrap.
     */
    public WrappedSource(org.apache.flink.api.connector.source.Source<T, ?, ?> wrappedSource) {
        this.wrappedSource = wrappedSource;
    }

    /**
     * Returns the FLIP-27 source held by this wrapper.
     *
     * @return The same source instance that was passed to the constructor.
     */
    public org.apache.flink.api.connector.source.Source<T, ?, ?> getWrappedSource() {
        return wrappedSource;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.connector.v2.Source;
import org.apache.flink.process.api.stream.NonKeyedPartitionStream;

/**
* This is the context in which a program is executed.
Expand Down Expand Up @@ -49,5 +51,5 @@ static ExecutionEnvironment getExecutionEnvironment() throws ReflectiveOperation
/**
 * Set the execution mode for this environment.
 *
 * @param runtimeMode The {@link RuntimeExecutionMode} to apply.
 * @return An {@link ExecutionEnvironment}; presumably this instance so that calls can be chained
 *     (TODO confirm against the implementation).
 */
ExecutionEnvironment setExecutionMode(RuntimeExecutionMode runtimeMode);

/**
 * Creates a {@link NonKeyedPartitionStream} from the given {@link Source}.
 *
 * @param source The source to create the stream from.
 * @param sourceName The name of the source.
 * @param <OUT> The element type shared by the source and the returned stream.
 * @return A non-keyed partition stream with elements of type {@code OUT}.
 */
<OUT> NonKeyedPartitionStream<OUT> fromSource(Source<OUT> source, String sourceName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,40 @@

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.v2.FromDataSource;
import org.apache.flink.api.connector.v2.Source;
import org.apache.flink.api.connector.v2.WrappedSource;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.typeutils.MissingTypeInfo;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.datagen.functions.FromElementsGeneratorFunction;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.PipelineExecutor;
import org.apache.flink.core.execution.PipelineExecutorFactory;
import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
import org.apache.flink.process.api.ExecutionEnvironment;
import org.apache.flink.process.api.stream.NonKeyedPartitionStream;
import org.apache.flink.process.impl.stream.NonKeyedPartitionStreamImpl;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
import org.apache.flink.streaming.api.transformations.SourceTransformation;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -119,6 +135,40 @@ protected static void resetContextEnvironment() {
contextEnvironmentFactory = null;
}

/**
 * Creates a {@link NonKeyedPartitionStream} from the given DataStream v2 {@link Source}.
 *
 * <p>Two kinds of sources are handled:
 *
 * <ul>
 *   <li>{@link WrappedSource}: the wrapped FLIP-27 source is put into a {@link
 *       SourceTransformation} using {@link WatermarkStrategy#noWatermarks()} and the parallelism
 *       returned by {@code getParallelism()}.
 *   <li>{@link FromDataSource}: the collection is turned into a {@link DataGeneratorSource} that
 *       is then handled like a wrapped source.
 * </ul>
 *
 * @param source The source to create the stream from.
 * @param sourceName The name given to the created source transformation. For a {@link
 *     FromDataSource}, {@code "Collection Source"} is used when this is null.
 * @param <OUT> The element type of the source and of the returned stream.
 * @return The stream backed by the created source transformation.
 * @throws UnsupportedOperationException if the source is neither a {@link WrappedSource} nor a
 *     {@link FromDataSource}.
 */
@Override
public <OUT> NonKeyedPartitionStream<OUT> fromSource(Source<OUT> source, String sourceName) {
    if (source instanceof WrappedSource) {
        org.apache.flink.api.connector.source.Source<OUT, ?, ?> innerSource =
                ((WrappedSource<OUT>) source).getWrappedSource();
        final TypeInformation<OUT> resolvedTypeInfo =
                getSourceTypeInfo(innerSource, sourceName);

        SourceTransformation<OUT, ?, ?> sourceTransformation =
                new SourceTransformation<>(
                        sourceName,
                        innerSource,
                        WatermarkStrategy.noWatermarks(),
                        resolvedTypeInfo,
                        getParallelism(),
                        false);
        return new NonKeyedPartitionStreamImpl<>(this, sourceTransformation);
    } else if (source instanceof FromDataSource) {
        Collection<OUT> data = ((FromDataSource<OUT>) source).getData();
        TypeInformation<OUT> outType = extractTypeInfoFromCollection(data);

        FromElementsGeneratorFunction<OUT> generatorFunction =
                new FromElementsGeneratorFunction<>(outType, executionConfig, data);

        DataGeneratorSource<OUT> generatorSource =
                new DataGeneratorSource<>(generatorFunction, data.size(), outType);

        // Honor the caller-provided name instead of silently discarding it; keep the previous
        // default for callers that pass null.
        final String effectiveName = sourceName != null ? sourceName : "Collection Source";
        return fromSource(new WrappedSource<>(generatorSource), effectiveName);
    } else {
        throw new UnsupportedOperationException(
                "Unsupported type of source, you could use SourceUtils to wrap a FLIP-27 based source.");
    }
}

/**
 * Returns the {@link Configuration} held by this environment.
 *
 * @return The configuration field of this instance.
 */
public Configuration getConfiguration() {
    return configuration;
}
Expand All @@ -143,6 +193,54 @@ public void setParallelism(int parallelism) {
// Internal Methods
// -----------------------------------------------

/**
 * Derives the {@link TypeInformation} of a collection from its first element.
 *
 * @param data The collection to inspect; must be non-null, non-empty, and its first element must
 *     not be null.
 * @param <OUT> The element type of the collection.
 * @return The type information extracted from the first element.
 * @throws IllegalArgumentException if the collection is empty or its first element is null.
 * @throws RuntimeException if the type information cannot be extracted from the first element.
 */
private static <OUT> TypeInformation<OUT> extractTypeInfoFromCollection(Collection<OUT> data) {
    Preconditions.checkNotNull(data, "Collection must not be null");
    if (data.isEmpty()) {
        throw new IllegalArgumentException("Collection must not be empty");
    }

    // Only the first element is inspected; it acts as the sample for type extraction.
    final OUT sample = data.iterator().next();
    if (sample == null) {
        throw new IllegalArgumentException("Collection must not contain null elements");
    }

    try {
        return TypeExtractor.getForObject(sample);
    } catch (Exception e) {
        throw new RuntimeException(
                "Could not create TypeInformation for type "
                        + sample.getClass()
                        + "; please specify the TypeInformation manually via the version of the "
                        + "method that explicitly accepts it as an argument.",
                e);
    }
}

/**
 * Resolves the {@link TypeInformation} for the given FLIP-27 source.
 *
 * <p>The type declared through {@link ResultTypeQueryable} is preferred when the source
 * implements it and reports a non-null type. Otherwise the type is extracted from the source
 * class; if that extraction fails with an {@link InvalidTypesException}, a {@link
 * MissingTypeInfo} carrying the source name and the failure is returned instead of throwing.
 *
 * @param source The FLIP-27 source whose type should be resolved.
 * @param sourceName The name recorded in the {@link MissingTypeInfo} on extraction failure.
 * @param <OUT> The first type parameter of the source.
 * @param <T> The concrete type information type expected by the caller.
 * @return The resolved type information, or a {@link MissingTypeInfo} placeholder.
 */
@SuppressWarnings("unchecked")
private static <OUT, T extends TypeInformation<OUT>> T getSourceTypeInfo(
        org.apache.flink.api.connector.source.Source<OUT, ?, ?> source, String sourceName) {
    TypeInformation<OUT> typeInfo =
            source instanceof ResultTypeQueryable
                    ? ((ResultTypeQueryable<OUT>) source).getProducedType()
                    : null;
    if (typeInfo != null) {
        return (T) typeInfo;
    }

    // No explicitly declared type: fall back to extracting it from the source class.
    try {
        typeInfo =
                TypeExtractor.createTypeInfo(
                        org.apache.flink.api.connector.source.Source.class,
                        source.getClass(),
                        0,
                        null,
                        null);
    } catch (final InvalidTypesException e) {
        typeInfo = (TypeInformation<OUT>) new MissingTypeInfo(sourceName, e);
    }
    return (T) typeInfo;
}

public void addOperator(Transformation<?> transformation) {
checkNotNull(transformation, "transformation must not be null.");
this.transformations.add(transformation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,20 @@

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.v2.SourceUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
import org.apache.flink.process.api.ExecutionEnvironment;
import org.apache.flink.process.api.stream.NonKeyedPartitionStream;
import org.apache.flink.process.impl.stream.StreamTestUtils;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamNode;

import org.junit.jupiter.api.Test;

import java.util.Arrays;
import java.util.Collection;

import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link ExecutionEnvironmentImpl}. */
Expand Down Expand Up @@ -67,4 +75,20 @@ void testSetExecutionMode() {
env.setExecutionMode(RuntimeExecutionMode.BATCH);
assertThat(env.getExecutionMode()).isEqualTo(RuntimeExecutionMode.BATCH);
}

/** Verifies that a collection-backed source ends up as the only source node of the graph. */
@Test
void testFromSource() {
    final ExecutionEnvironmentImpl environment =
            new ExecutionEnvironmentImpl(
                    new DefaultExecutorServiceLoader(), new Configuration(), null);

    // Build a minimal pipeline: collection-backed source followed by a no-op process function.
    final NonKeyedPartitionStream<Integer> stream =
            environment.fromSource(
                    SourceUtils.fromData(Arrays.asList(1, 2, 3)), "test-source");
    stream.process(new StreamTestUtils.NoOpOneInputStreamProcessFunction());

    final StreamGraph graph = StreamTestUtils.getStreamGraph(environment);
    final Collection<StreamNode> streamNodes = graph.getStreamNodes();
    assertThat(streamNodes).hasSize(2);

    // The first node returned by the graph is expected to be the single registered source.
    final Collection<Integer> registeredSourceIds = graph.getSourceIDs();
    final StreamNode firstNode = streamNodes.iterator().next();
    assertThat(registeredSourceIds).containsExactly(firstNode.getId());
}
}

0 comments on commit 70b9ca2

Please sign in to comment.