From aabeb54f64a28a212fc584ae081364f0f4a8105a Mon Sep 17 00:00:00 2001
From: Weijie Guo
Date: Fri, 22 Mar 2024 17:44:43 +0800
Subject: [PATCH] [FLINK-34548][API] Supports sink-v2 Sink

---
 .../apache/flink/api/connector/dsv2/Sink.java |  30 ++
 .../connector/dsv2/DataStreamV2SinkUtils.java |  35 ++
 .../flink/api/connector/dsv2/WrappedSink.java |  35 ++
 .../datastream/api/stream/GlobalStream.java   |   3 +-
 .../api/stream/KeyedPartitionStream.java      |   3 +-
 .../api/stream/NonKeyedPartitionStream.java   |   3 +-
 .../impl/ExecutionEnvironmentImpl.java        |  17 +
 .../impl/stream/GlobalStreamImpl.java         |  11 +-
 .../impl/stream/KeyedPartitionStreamImpl.java |   6 +-
 .../stream/NonKeyedPartitionStreamImpl.java   |   6 +-
 .../datastream/impl/utils/StreamUtils.java    |  57 +++
 .../DataStreamV2SinkTransformation.java       |  96 +++++
 ...aStreamV2SinkTransformationTranslator.java | 367 ++++++++++++++++++
 .../impl/stream/GlobalStreamImplTest.java     |  16 +
 .../stream/KeyedPartitionStreamImplTest.java  |  16 +
 .../NonKeyedPartitionStreamImplTest.java      |  16 +
 16 files changed, 711 insertions(+), 6 deletions(-)
 create mode 100644 flink-core-api/src/main/java/org/apache/flink/api/connector/dsv2/Sink.java
 create mode 100644 flink-core/src/main/java/org/apache/flink/api/connector/dsv2/DataStreamV2SinkUtils.java
 create mode 100644 flink-core/src/main/java/org/apache/flink/api/connector/dsv2/WrappedSink.java
 create mode 100644 flink-datastream/src/main/java/org/apache/flink/streaming/api/transformations/DataStreamV2SinkTransformation.java
 create mode 100644 flink-datastream/src/main/java/org/apache/flink/streaming/runtime/translators/DataStreamV2SinkTransformationTranslator.java

diff --git a/flink-core-api/src/main/java/org/apache/flink/api/connector/dsv2/Sink.java b/flink-core-api/src/main/java/org/apache/flink/api/connector/dsv2/Sink.java
new file mode 100644
index 00000000000000..3ab7403c0c5dd5
--- /dev/null
+++ b/flink-core-api/src/main/java/org/apache/flink/api/connector/dsv2/Sink.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.dsv2;
+
+import org.apache.flink.annotation.Experimental;
+
+/**
+ * Sink interface for DataStream API V2.
+ *
+ * <p>Note that this interface is currently only a placeholder: it has not yet been decided
+ * whether to reuse the sink-v2 based sink or to design a new sink connector API for DataStream
+ * V2.
+ */
+@Experimental
+public interface Sink<T> {}
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/dsv2/DataStreamV2SinkUtils.java b/flink-core/src/main/java/org/apache/flink/api/connector/dsv2/DataStreamV2SinkUtils.java
new file mode 100644
index 00000000000000..4d35ac45380a1c
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/dsv2/DataStreamV2SinkUtils.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.dsv2;
+
+import org.apache.flink.annotation.Experimental;
+
+/** Utils to create the {@link Sink}s supported by DataStream V2. */
+@Experimental
+public class DataStreamV2SinkUtils {
+    /**
+     * Wrap a sink-v2 based sink into a DataStream V2 supported sink.
+     *
+     * @param sink The sink-v2 based sink to wrap.
+     * @return The DataStream V2 supported sink.
+     */
+    public static <T> Sink<T> wrapSink(org.apache.flink.api.connector.sink2.Sink<T> sink) {
+        return new WrappedSink<>(sink);
+    }
+}
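For reviewers, a minimal usage sketch of the utility above (the stream parameter is assumed to come from an upstream DataStream V2 pipeline; `DiscardingSink` is the same sink-v2 stand-in that the tests at the end of this patch use):

```java
import org.apache.flink.api.connector.dsv2.DataStreamV2SinkUtils;
import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;

class WrapSinkExample {
    // A sink-v2 sink must be wrapped before DataStream V2's toSink() accepts it.
    static void attachSink(NonKeyedPartitionStream<String> stream) {
        stream.toSink(DataStreamV2SinkUtils.wrapSink(new DiscardingSink<>()));
    }
}
```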
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/dsv2/WrappedSink.java b/flink-core/src/main/java/org/apache/flink/api/connector/dsv2/WrappedSink.java
new file mode 100644
index 00000000000000..97c2d2eacc76e4
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/dsv2/WrappedSink.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.dsv2;
+
+import org.apache.flink.annotation.Internal;
+
+/** A simple {@link Sink} implementation that wraps a sink-v2 based sink. */
+@Internal
+public class WrappedSink<T> implements Sink<T> {
+    org.apache.flink.api.connector.sink2.Sink<T> wrappedSink;
+
+    public WrappedSink(org.apache.flink.api.connector.sink2.Sink<T> wrappedSink) {
+        this.wrappedSink = wrappedSink;
+    }
+
+    public org.apache.flink.api.connector.sink2.Sink<T> getWrappedSink() {
+        return wrappedSink;
+    }
+}
diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/GlobalStream.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/GlobalStream.java
index cb8e0b83dd7c49..e7c975dfe26dc3 100644
--- a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/GlobalStream.java
+++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/GlobalStream.java
@@ -19,6 +19,7 @@
 package org.apache.flink.datastream.api.stream;
 
 import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.connector.dsv2.Sink;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
 import org.apache.flink.datastream.api.function.TwoInputNonBroadcastStreamProcessFunction;
@@ -78,7 +79,7 @@ GlobalStream connectAndProcess(
      */
     BroadcastStream<T> broadcast();
 
-    // TODO add toSink method.
+    void toSink(Sink<T> sink);
 
     /**
      * This class represents a combination of two {@link GlobalStream}. It will be used as the
diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/KeyedPartitionStream.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/KeyedPartitionStream.java
index 40e15f08646e86..1c4ffd8a2a9889 100644
--- a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/KeyedPartitionStream.java
+++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/KeyedPartitionStream.java
@@ -19,6 +19,7 @@
 package org.apache.flink.datastream.api.stream;
 
 import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.connector.dsv2.Sink;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
 import org.apache.flink.datastream.api.function.TwoInputBroadcastStreamProcessFunction;
@@ -194,7 +195,7 @@ KeyedPartitionStream connectAndProcess(
      */
     BroadcastStream<V> broadcast();
 
-    // TODO add toSink method.
+    void toSink(Sink<V> sink);
 
     /**
      * This class represents a combination of two {@link KeyedPartitionStream}. It will be used as
diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/NonKeyedPartitionStream.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/NonKeyedPartitionStream.java
index dd4a1ee2855d60..be21d312400b2c 100644
--- a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/NonKeyedPartitionStream.java
+++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/NonKeyedPartitionStream.java
@@ -19,6 +19,7 @@
 package org.apache.flink.datastream.api.stream;
 
 import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.connector.dsv2.Sink;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
 import org.apache.flink.datastream.api.function.TwoInputBroadcastStreamProcessFunction;
@@ -100,7 +101,7 @@ NonKeyedPartitionStream connectAndProcess(
      */
     BroadcastStream<T> broadcast();
 
-    // TODO add toSink method.
+    void toSink(Sink<T> sink);
 
     /**
      * This interface represents a combination of two {@link NonKeyedPartitionStream}. It will be
diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/ExecutionEnvironmentImpl.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/ExecutionEnvironmentImpl.java
index a2dc28fe590901..4a3f2bdad0b63f 100644
--- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/ExecutionEnvironmentImpl.java
+++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/ExecutionEnvironmentImpl.java
@@ -48,6 +48,7 @@
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
 import org.apache.flink.streaming.api.transformations.SourceTransformation;
+import org.apache.flink.streaming.runtime.translators.DataStreamV2SinkTransformationTranslator;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
@@ -87,6 +88,18 @@ public class ExecutionEnvironmentImpl implements ExecutionEnvironment {
      */
    private static ExecutionEnvironmentFactory contextEnvironmentFactory = null;
 
+    static {
+        try {
+            // All transformation translators must be registered in a map inside
+            // StreamGraphGenerator, but flink-streaming-java does not depend on the
+            // process-function module, so reflection is used to register this one.
+            DataStreamV2SinkTransformationTranslator.registerSinkTransformationTranslator();
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Cannot register the process function transformation translator.", e);
+        }
+    }
+
     /**
      * Create and return an instance of {@link ExecutionEnvironment}.
      *
@@ -202,6 +215,10 @@ public void setParallelism(int parallelism) {
         executionConfig.setParallelism(parallelism);
     }
 
+    public CheckpointConfig getCheckpointCfg() {
+        return checkpointCfg;
+    }
+
     // -----------------------------------------------
     // Internal Methods
     // -----------------------------------------------
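The static block above mutates a map that `StreamGraphGenerator` exposes only as an unmodifiable view, which is why `registerSinkTransformationTranslator()` (further down in this patch) reaches for a backing field named `m`. A standalone sketch of that trick, under the assumption (implied by the translator code) that the translator map is built via `Collections.unmodifiableMap`:

```java
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class UnmodifiableMapBackdoor {
    @SuppressWarnings("unchecked")
    public static void main(String[] args) throws Exception {
        Map<String, String> backing = new HashMap<>();
        Map<String, String> view = Collections.unmodifiableMap(backing);

        // java.util.Collections$UnmodifiableMap stores the wrapped map in field "m";
        // mutating the backing map makes new entries visible through the view.
        Field m = view.getClass().getDeclaredField("m");
        m.setAccessible(true); // JDK 16+ needs --add-opens java.base/java.util=ALL-UNNAMED
        ((Map<String, String>) m.get(view)).put("sink", "registered");

        System.out.println(view.get("sink")); // prints "registered"
    }
}
```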
diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/GlobalStreamImpl.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/GlobalStreamImpl.java
index 187918a584dafa..3d977c54468e08 100644
--- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/GlobalStreamImpl.java
+++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/GlobalStreamImpl.java
@@ -19,6 +19,7 @@
 package org.apache.flink.datastream.impl.stream;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.dsv2.Sink;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -36,6 +37,7 @@
 import org.apache.flink.datastream.impl.utils.StreamUtils;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.SimpleUdfStreamOperatorFactory;
+import org.apache.flink.streaming.api.transformations.DataStreamV2SinkTransformation;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
 import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
@@ -98,7 +100,14 @@ public GlobalStream connectAndProcess(
         return new GlobalStreamImpl<>(environment, outTransformation);
     }
 
-    // TODO add toSink method.
+    @Override
+    public void toSink(Sink<T> sink) {
+        DataStreamV2SinkTransformation<T, T> sinkTransformation =
+                StreamUtils.addSinkOperator(this, sink, getType());
+        // The sink's parallelism must always be 1 for a global stream; parallelismConfigured
+        // is set to true so that the AdaptiveBatchScheduler cannot overwrite it.
+        sinkTransformation.setParallelism(1, true);
+    }
 
     // ---------------------
     //  Partitioning
diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImpl.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImpl.java
index 2b9c3b378a5adf..5ec7c125d9fbc3 100644
--- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImpl.java
+++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImpl.java
@@ -19,6 +19,7 @@
 package org.apache.flink.datastream.impl.stream;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.dsv2.Sink;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -325,7 +326,10 @@ public KeySelector getKeySelector() {
         return keySelector;
     }
 
-    // TODO add toSink method.
+    @Override
+    public void toSink(Sink<V> sink) {
+        StreamUtils.addSinkOperator(this, sink, getType());
+    }
 
     // ---------------------
     //  Partitioning
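A quick sketch, in the style of this patch's own tests, of the behavior described by the comment in `GlobalStreamImpl#toSink` above (it reuses the `StreamTestUtils` and `TestingTransformation` helpers that appear in the test diffs below; this is a hypothetical companion test, not part of the patch):

```java
void globalSinkIsPinnedToParallelismOne() throws Exception {
    ExecutionEnvironmentImpl env = StreamTestUtils.getEnv();
    GlobalStreamImpl<Integer> stream =
            new GlobalStreamImpl<>(env, new TestingTransformation<>("t1", Types.INT, 1));
    stream.toSink(DataStreamV2SinkUtils.wrapSink(new DiscardingSink<>()));
    // Parallelism is 1 regardless of the environment default, and
    // parallelismConfigured shields it from the AdaptiveBatchScheduler.
    assertThat(env.getTransformations().get(0).getParallelism()).isOne();
}
```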
diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/NonKeyedPartitionStreamImpl.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/NonKeyedPartitionStreamImpl.java
index 9cc9a52dd5f8c6..d987a868020d52 100644
--- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/NonKeyedPartitionStreamImpl.java
+++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/NonKeyedPartitionStreamImpl.java
@@ -19,6 +19,7 @@
 package org.apache.flink.datastream.impl.stream;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.dsv2.Sink;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -132,7 +133,10 @@ public NonKeyedPartitionStream connectAndProcess(
         return new NonKeyedPartitionStreamImpl<>(environment, outTransformation);
     }
 
-    // TODO add toSink method.
+    @Override
+    public void toSink(Sink<T> sink) {
+        StreamUtils.addSinkOperator(this, sink, getType());
+    }
 
     // ---------------------
     //  Partitioning
diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/utils/StreamUtils.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/utils/StreamUtils.java
index 0471c0f9a0a694..efe8927dba2a43 100644
--- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/utils/StreamUtils.java
+++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/utils/StreamUtils.java
@@ -19,6 +19,8 @@
 package org.apache.flink.datastream.impl.utils;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.dsv2.Sink;
+import org.apache.flink.api.connector.dsv2.WrappedSink;
 import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -29,10 +31,13 @@
 import org.apache.flink.datastream.api.function.TwoOutputStreamProcessFunction;
 import org.apache.flink.datastream.impl.stream.AbstractDataStream;
 import org.apache.flink.datastream.impl.stream.KeyedPartitionStreamImpl;
+import org.apache.flink.datastream.impl.stream.NonKeyedPartitionStreamImpl;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
 import org.apache.flink.streaming.api.operators.SimpleUdfStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.transformations.DataStreamV2SinkTransformation;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
 
@@ -228,4 +233,56 @@ public static TwoInputTransformation getTwoInputTransformation(
 
         return transform;
     }
+
+    /** Construct and return a new DataStream with one input operator. */
+    public static <T, R> AbstractDataStream<R> transformOneInputOperator(
+            String operatorName,
+            AbstractDataStream<T> inputStream,
+            TypeInformation<R> outTypeInfo,
+            StreamOperatorFactory<R> operatorFactory) {
+        // read the output type of the input Transform to coax out errors about MissingTypeInfo
+        inputStream.getTransformation().getOutputType();
+
+        OneInputTransformation<T, R> resultTransform =
+                new OneInputTransformation<>(
+                        inputStream.getTransformation(),
+                        operatorName,
+                        operatorFactory,
+                        outTypeInfo,
+                        inputStream.getEnvironment().getParallelism(),
+                        false);
+
+        NonKeyedPartitionStreamImpl<R> returnStream =
+                new NonKeyedPartitionStreamImpl<>(inputStream.getEnvironment(), resultTransform);
+
+        inputStream.getEnvironment().addOperator(resultTransform);
+
+        return returnStream;
+    }
+
+    /** Add a sink operator to the input stream. */
+    public static <T> DataStreamV2SinkTransformation<T, T> addSinkOperator(
+            AbstractDataStream<T> inputStream, Sink<T> sink, TypeInformation<T> typeInformation) {
+        // read the output type of the input Transform to coax out errors about MissingTypeInfo
+        inputStream.getTransformation().getOutputType();
+
+        if (!(sink instanceof WrappedSink)) {
+            throw new UnsupportedOperationException(
+                    "Unsupported type of sink, please use DataStreamV2SinkUtils to wrap a sink-v2 sink first.");
+        }
+
+        org.apache.flink.api.connector.sink2.Sink<T> innerSink =
+                ((WrappedSink<T>) sink).getWrappedSink();
+
+        DataStreamV2SinkTransformation<T, T> sinkTransformation =
+                new DataStreamV2SinkTransformation<>(
+                        inputStream,
+                        innerSink,
+                        typeInformation,
+                        "Sink",
+                        inputStream.getEnvironment().getParallelism(),
+                        false);
+        inputStream.getEnvironment().addOperator(sinkTransformation);
+        return sinkTransformation;
+    }
 }
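The guard in `addSinkOperator` above means hand-rolled `Sink` implementations fail fast at graph construction time; a sketch of that failure mode (the anonymous sink and the stream parameter are hypothetical):

```java
import org.apache.flink.api.connector.dsv2.Sink;
import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;

class UnwrappedSinkExample {
    static void failsAtGraphConstruction(NonKeyedPartitionStream<String> stream) {
        Sink<String> custom = new Sink<String>() {}; // not a WrappedSink
        try {
            stream.toSink(custom);
        } catch (UnsupportedOperationException expected) {
            // "Unsupported type of sink, please use DataStreamV2SinkUtils to wrap ..."
        }
    }
}
```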
diff --git a/flink-datastream/src/main/java/org/apache/flink/streaming/api/transformations/DataStreamV2SinkTransformation.java b/flink-datastream/src/main/java/org/apache/flink/streaming/api/transformations/DataStreamV2SinkTransformation.java
new file mode 100644
index 00000000000000..ae702d9e2591e4
--- /dev/null
+++ b/flink-datastream/src/main/java/org/apache/flink/streaming/api/transformations/DataStreamV2SinkTransformation.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.transformations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.datastream.impl.stream.AbstractDataStream;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+
+import org.apache.commons.compress.utils.Lists;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The {@link Transformation} for DataStream V2 based sink.
+ *
+ * @param <InputT> The input type of the {@link SinkWriter}
+ * @param <OutputT> The output type of the {@link Sink}
+ */
+@Internal
+public class DataStreamV2SinkTransformation<InputT, OutputT>
+        extends PhysicalTransformation<OutputT> {
+    private final AbstractDataStream<InputT> inputStream;
+    private final Sink<InputT> sink;
+    private final Transformation<InputT> input;
+
+    private ChainingStrategy chainingStrategy;
+
+    public DataStreamV2SinkTransformation(
+            AbstractDataStream<InputT> inputStream,
+            Sink<InputT> sink,
+            TypeInformation<OutputT> outputType,
+            String name,
+            int parallelism,
+            boolean parallelismConfigured) {
+        super(name, outputType, parallelism, parallelismConfigured);
+        this.inputStream = checkNotNull(inputStream);
+        this.sink = checkNotNull(sink);
+        this.input = inputStream.getTransformation();
+    }
+
+    @Override
+    public void setChainingStrategy(ChainingStrategy strategy) {
+        chainingStrategy = checkNotNull(strategy);
+    }
+
+    @Override
+    public List<Transformation<?>> getTransitivePredecessors() {
+        final List<Transformation<?>> result = Lists.newArrayList();
+        result.add(this);
+        result.addAll(input.getTransitivePredecessors());
+        return result;
+    }
+
+    @Override
+    public List<Transformation<?>> getInputs() {
+        return Collections.singletonList(input);
+    }
+
+    @Nullable
+    public ChainingStrategy getChainingStrategy() {
+        return chainingStrategy;
+    }
+
+    public AbstractDataStream<InputT> getInputStream() {
+        return inputStream;
+    }
+
+    public Sink<InputT> getSink() {
+        return sink;
+    }
+}
diff --git a/flink-datastream/src/main/java/org/apache/flink/streaming/runtime/translators/DataStreamV2SinkTransformationTranslator.java b/flink-datastream/src/main/java/org/apache/flink/streaming/runtime/translators/DataStreamV2SinkTransformationTranslator.java
new file mode 100644
index 00000000000000..1884ff527a64e6
--- /dev/null
+++ b/flink-datastream/src/main/java/org/apache/flink/streaming/runtime/translators/DataStreamV2SinkTransformationTranslator.java
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.flink.streaming.runtime.translators; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.SupportsConcurrentExecutionAttempts; +import org.apache.flink.api.common.operators.SlotSharingGroup; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.sink2.SupportsCommitter; +import org.apache.flink.api.dag.Transformation; +import org.apache.flink.datastream.impl.ExecutionEnvironmentImpl; +import org.apache.flink.datastream.impl.stream.AbstractDataStream; +import org.apache.flink.datastream.impl.stream.NonKeyedPartitionStreamImpl; +import org.apache.flink.datastream.impl.utils.StreamUtils; +import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; +import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo; +import org.apache.flink.streaming.api.connector.sink2.StandardSinkTopologies; +import org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology; +import org.apache.flink.streaming.api.connector.sink2.SupportsPreCommitTopology; +import org.apache.flink.streaming.api.connector.sink2.SupportsPreWriteTopology; +import org.apache.flink.streaming.api.datastream.CustomSinkOperatorUidHashes; +import org.apache.flink.streaming.api.graph.StreamGraphGenerator; +import org.apache.flink.streaming.api.graph.TransformationTranslator; +import org.apache.flink.streaming.api.transformations.DataStreamV2SinkTransformation; +import org.apache.flink.streaming.api.transformations.PartitionTransformation; +import org.apache.flink.streaming.api.transformations.PhysicalTransformation; +import org.apache.flink.streaming.api.transformations.StreamExchangeMode; +import org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory; +import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory; +import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner; + +import javax.annotation.Nullable; + +import java.lang.reflect.Field; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.BiConsumer; +import java.util.function.Function; + +/** The {@link TransformationTranslator} for the {@link DataStreamV2SinkTransformation}. 
+ */
+@Internal
+public class DataStreamV2SinkTransformationTranslator<Input, Output>
+        implements TransformationTranslator<
+                Output, DataStreamV2SinkTransformation<Input, Output>> {
+
+    private static final String COMMITTER_NAME = "Committer";
+    private static final String WRITER_NAME = "Writer";
+
+    @Override
+    public Collection<Integer> translateForBatch(
+            DataStreamV2SinkTransformation<Input, Output> transformation, Context context) {
+        return translateInternal(transformation, context, true);
+    }
+
+    @Override
+    public Collection<Integer> translateForStreaming(
+            DataStreamV2SinkTransformation<Input, Output> transformation, Context context) {
+        return translateInternal(transformation, context, false);
+    }
+
+    private Collection<Integer> translateInternal(
+            DataStreamV2SinkTransformation<Input, Output> transformation,
+            Context context,
+            boolean batch) {
+        SinkExpander<Input> expander =
+                new SinkExpander<>(
+                        transformation.getInputStream(),
+                        transformation.getSink(),
+                        transformation,
+                        context,
+                        batch);
+        expander.expand();
+        return Collections.emptyList();
+    }
+
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public static void registerSinkTransformationTranslator() throws Exception {
+        final Field translatorMapField =
+                StreamGraphGenerator.class.getDeclaredField("translatorMap");
+        translatorMapField.setAccessible(true);
+        final Map<Class<? extends Transformation>, TransformationTranslator<?, ? extends Transformation>>
+                translatorMap =
+                        (Map<Class<? extends Transformation>,
+                                        TransformationTranslator<?, ? extends Transformation>>)
+                                translatorMapField.get(null);
+        // translatorMap is an unmodifiable view; fetch its mutable backing map (field "m")
+        // and register the new translator there.
+        final Field underlyingMapField = translatorMap.getClass().getDeclaredField("m");
+        underlyingMapField.setAccessible(true);
+        final Map<Class<? extends Transformation>, TransformationTranslator<?, ? extends Transformation>>
+                underlyingMap =
+                        (Map<Class<? extends Transformation>,
+                                        TransformationTranslator<?, ? extends Transformation>>)
+                                underlyingMapField.get(translatorMap);
+
+        underlyingMap.put(
+                DataStreamV2SinkTransformation.class,
+                new DataStreamV2SinkTransformationTranslator<>());
+    }
+
+    /**
+     * Expands the sink into a sub-topology. Currently, user-defined topologies are not
+     * supported; that is, expanded sub-topologies contain only writers and committers.
+     */
+ */ + private static class SinkExpander { + private final DataStreamV2SinkTransformation transformation; + private final Sink sink; + private final Context context; + private final AbstractDataStream inputStream; + private final ExecutionEnvironmentImpl executionEnvironment; + private final boolean isCheckpointingEnabled; + private final boolean isBatchMode; + + public SinkExpander( + AbstractDataStream inputStream, + Sink sink, + DataStreamV2SinkTransformation transformation, + Context context, + boolean isBatchMode) { + this.inputStream = inputStream; + this.executionEnvironment = inputStream.getEnvironment(); + this.isCheckpointingEnabled = + executionEnvironment.getCheckpointCfg().isCheckpointingEnabled(); + this.transformation = transformation; + this.sink = sink; + this.context = context; + this.isBatchMode = isBatchMode; + } + + private void expand() { + + final int sizeBefore = executionEnvironment.getTransformations().size(); + + AbstractDataStream prewritten = inputStream; + + if (sink instanceof SupportsPreWriteTopology) { + throw new UnsupportedOperationException( + "Sink with pre-write topology is not supported for DataStream v2 atm."); + } else if (sink instanceof SupportsPreCommitTopology) { + throw new UnsupportedOperationException( + "Sink with pre-commit topology is not supported for DataStream v2 atm."); + } else if (sink instanceof SupportsPostCommitTopology) { + throw new UnsupportedOperationException( + "Sink with post-commit topology is not supported for DataStream v2 atm."); + } + + if (sink instanceof SupportsCommitter) { + addCommittingTopology(sink, prewritten); + } else { + adjustTransformations( + prewritten, + input -> + StreamUtils.transformOneInputOperator( + WRITER_NAME, + input, + CommittableMessageTypeInfo.noOutput(), + new SinkWriterOperatorFactory<>(sink)), + sink instanceof SupportsConcurrentExecutionAttempts); + } + + final List> sinkTransformations = + executionEnvironment + .getTransformations() + .subList(sizeBefore, executionEnvironment.getTransformations().size()); + sinkTransformations.forEach(context::transform); + + // Remove all added sink sub-transformations to avoid duplications and allow additional + // expansions + while (executionEnvironment.getTransformations().size() > sizeBefore) { + executionEnvironment + .getTransformations() + .remove(executionEnvironment.getTransformations().size() - 1); + } + } + + private void addCommittingTopology( + Sink sink, AbstractDataStream inputStream) { + SupportsCommitter committingSink = (SupportsCommitter) sink; + TypeInformation> committableTypeInformation = + CommittableMessageTypeInfo.of(committingSink::getCommittableSerializer); + + adjustTransformations( + addWriter(sink, inputStream, committableTypeInformation), + stream -> + StreamUtils.transformOneInputOperator( + COMMITTER_NAME, + stream, + committableTypeInformation, + new CommitterOperatorFactory<>( + committingSink, isBatchMode, isCheckpointingEnabled)), + false); + } + + private AbstractDataStream> addWriter( + Sink sink, + AbstractDataStream inputStream, + TypeInformation> typeInformation) { + AbstractDataStream> written = + adjustTransformations( + inputStream, + input -> + StreamUtils.transformOneInputOperator( + WRITER_NAME, + input, + typeInformation, + new SinkWriterOperatorFactory<>(sink)), + sink instanceof SupportsConcurrentExecutionAttempts); + + return addFailOverRegion(written); + } + + /** Adds a batch exchange that materializes the output first. 
+        private <I> AbstractDataStream<I> addFailOverRegion(AbstractDataStream<I> input) {
+            return new NonKeyedPartitionStreamImpl<>(
+                    input.getEnvironment(),
+                    new PartitionTransformation<>(
+                            input.getTransformation(),
+                            new ForwardPartitioner<>(),
+                            StreamExchangeMode.BATCH));
+        }
+
+        private <I, R> R adjustTransformations(
+                AbstractDataStream<I> inputStream,
+                Function<AbstractDataStream<I>, R> action,
+                boolean supportsConcurrentExecutionAttempts) {
+            int numTransformsBefore = executionEnvironment.getTransformations().size();
+            R result = action.apply(inputStream);
+            List<Transformation<?>> transformations = executionEnvironment.getTransformations();
+            List<Transformation<?>> expandedTransformations =
+                    transformations.subList(numTransformsBefore, transformations.size());
+
+            final CustomSinkOperatorUidHashes operatorsUidHashes =
+                    CustomSinkOperatorUidHashes.DEFAULT;
+            for (Transformation<?> subTransformation : expandedTransformations) {
+                // Set the operator uid hashes to support stateful upgrades without prior uids
+                setOperatorUidHashIfPossible(
+                        subTransformation, WRITER_NAME, operatorsUidHashes.getWriterUidHash());
+                setOperatorUidHashIfPossible(
+                        subTransformation,
+                        COMMITTER_NAME,
+                        operatorsUidHashes.getCommitterUidHash());
+                setOperatorUidHashIfPossible(
+                        subTransformation,
+                        StandardSinkTopologies.GLOBAL_COMMITTER_TRANSFORMATION_NAME,
+                        operatorsUidHashes.getGlobalCommitterUidHash());
+
+                concatUid(
+                        subTransformation,
+                        Transformation::getUid,
+                        Transformation::setUid,
+                        subTransformation.getName());
+
+                concatProperty(
+                        subTransformation,
+                        Transformation::getCoLocationGroupKey,
+                        Transformation::setCoLocationGroupKey);
+
+                concatProperty(subTransformation, Transformation::getName, Transformation::setName);
+
+                concatProperty(
+                        subTransformation,
+                        Transformation::getDescription,
+                        Transformation::setDescription);
+
+                Optional<SlotSharingGroup> ssg = transformation.getSlotSharingGroup();
+
+                if (ssg.isPresent() && !subTransformation.getSlotSharingGroup().isPresent()) {
+                    subTransformation.setSlotSharingGroup(ssg.get());
+                }
+
+                // Since customized topologies are not supported, inheriting the parallelism
+                // value from the sink transformation is enough.
+                subTransformation.setParallelism(transformation.getParallelism());
+
+                if (subTransformation.getMaxParallelism() < 0
+                        && transformation.getMaxParallelism() > 0) {
+                    subTransformation.setMaxParallelism(transformation.getMaxParallelism());
+                }
+
+                if (subTransformation instanceof PhysicalTransformation) {
+                    PhysicalTransformation<?> physicalSubTransformation =
+                            (PhysicalTransformation<?>) subTransformation;
+
+                    if (transformation.getChainingStrategy() != null) {
+                        physicalSubTransformation.setChainingStrategy(
+                                transformation.getChainingStrategy());
+                    }
+
+                    // Override supportsConcurrentExecutionAttempts on the sub-transformation,
+                    // because specifying it at a finer granularity is not allowed yet.
+                    physicalSubTransformation.setSupportsConcurrentExecutionAttempts(
+                            supportsConcurrentExecutionAttempts);
+                }
+            }
+
+            return result;
+        }
+
+        private void setOperatorUidHashIfPossible(
+                Transformation<?> transformation,
+                String writerName,
+                @Nullable String operatorUidHash) {
+            if (operatorUidHash == null || !transformation.getName().equals(writerName)) {
+                return;
+            }
+            transformation.setUidHash(operatorUidHash);
+        }
+
+        private void concatUid(
+                Transformation<?> subTransformation,
+                Function<Transformation<?>, String> getter,
+                BiConsumer<Transformation<?>, String> setter,
+                @Nullable String transformationName) {
+            if (transformationName != null && getter.apply(transformation) != null) {
+                // Use the same uid pattern as for Sink V1. We deliberately decided to use the
+                // uid pattern of Flink 1.13 because 1.14 did not have a dedicated committer
+                // operator.
+                if (transformationName.equals(COMMITTER_NAME)) {
+                    final String committerFormat = "Sink Committer: %s";
+                    setter.accept(
+                            subTransformation,
+                            String.format(committerFormat, getter.apply(transformation)));
+                    return;
+                }
+                // Set the writer operator uid to the sink's uid to support state migrations
+                if (transformationName.equals(WRITER_NAME)) {
+                    setter.accept(subTransformation, getter.apply(transformation));
+                    return;
+                }
+
+                // Use the same uid pattern as for Sink V1 in Flink 1.14.
+                if (transformationName.equals(
+                        StandardSinkTopologies.GLOBAL_COMMITTER_TRANSFORMATION_NAME)) {
+                    final String committerFormat = "Sink %s Global Committer";
+                    setter.accept(
+                            subTransformation,
+                            String.format(committerFormat, getter.apply(transformation)));
+                    return;
+                }
+            }
+            concatProperty(subTransformation, getter, setter);
+        }
+
+        private void concatProperty(
+                Transformation<?> subTransformation,
+                Function<Transformation<?>, String> getter,
+                BiConsumer<Transformation<?>, String> setter) {
+            if (getter.apply(transformation) != null && getter.apply(subTransformation) != null) {
+                setter.accept(
+                        subTransformation,
+                        getter.apply(transformation) + ": " + getter.apply(subTransformation));
+            }
+        }
+    }
+}
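A worked example of the uid/name rules in `concatUid`/`concatProperty` above, with the sink transformation's user-supplied uid and name assumed to be "kafka-out" purely for illustration:

```java
// Writer:    uid  = "kafka-out"                  (sink uid inherited for state migration)
//            name = "kafka-out: Writer"
// Committer: uid  = "Sink Committer: kafka-out"  (Flink 1.13 Sink V1 pattern)
//            name = "kafka-out: Committer"
```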
diff --git a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/stream/GlobalStreamImplTest.java b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/stream/GlobalStreamImplTest.java
index c63edd74872a9d..22e7843d94fa49 100644
--- a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/stream/GlobalStreamImplTest.java
+++ b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/stream/GlobalStreamImplTest.java
@@ -19,12 +19,15 @@
 package org.apache.flink.datastream.impl.stream;
 
 import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.connector.dsv2.DataStreamV2SinkUtils;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.datastream.impl.ExecutionEnvironmentImpl;
 import org.apache.flink.datastream.impl.TestingTransformation;
 import org.apache.flink.datastream.impl.stream.StreamTestUtils.NoOpOneInputStreamProcessFunction;
 import org.apache.flink.datastream.impl.stream.StreamTestUtils.NoOpTwoInputBroadcastStreamProcessFunction;
 import org.apache.flink.datastream.impl.stream.StreamTestUtils.NoOpTwoOutputStreamProcessFunction;
+import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
+import org.apache.flink.streaming.api.transformations.DataStreamV2SinkTransformation;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
 
 import org.junit.jupiter.api.Test;
@@ -72,4 +75,17 @@ void testPartitioning() throws Exception {
         assertThat(shuffleTransform).isInstanceOf(PartitionTransformation.class);
         assertThat(transformations.get(2).getParallelism()).isOne();
     }
+
+    @Test
+    void testToSink() throws Exception {
+        ExecutionEnvironmentImpl env = StreamTestUtils.getEnv();
+        GlobalStreamImpl<Integer> stream =
+                new GlobalStreamImpl<>(env, new TestingTransformation<>("t1", Types.INT, 1));
+        stream.toSink(DataStreamV2SinkUtils.wrapSink(new DiscardingSink<>()));
+        List<Transformation<?>> transformations = env.getTransformations();
+        assertThat(transformations)
+                .hasSize(1)
+                .element(0)
+                .isInstanceOf(DataStreamV2SinkTransformation.class);
+    }
 }
diff --git a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImplTest.java b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImplTest.java
index 12a7361af466bc..0822292143988c 100644
--- a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImplTest.java
+++ b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImplTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.datastream.impl.stream;
 
 import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.connector.dsv2.DataStreamV2SinkUtils;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.datastream.api.stream.KeyedPartitionStream;
@@ -27,6 +28,8 @@
 import org.apache.flink.datastream.impl.TestingTransformation;
 import org.apache.flink.datastream.impl.stream.StreamTestUtils.NoOpOneInputStreamProcessFunction;
 import org.apache.flink.datastream.impl.stream.StreamTestUtils.NoOpTwoOutputStreamProcessFunction;
+import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
+import org.apache.flink.streaming.api.transformations.DataStreamV2SinkTransformation;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
 import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
@@ -133,6 +136,19 @@ void testConnectBroadcastStream() throws Exception {
         assertProcessType(transformations.get(1), TwoInputTransformation.class, Types.LONG);
     }
 
+    @Test
+    void testToSink() throws Exception {
+        ExecutionEnvironmentImpl env = StreamTestUtils.getEnv();
+        KeyedPartitionStream<Integer, Integer> stream = createKeyedStream(env);
+        stream.toSink(DataStreamV2SinkUtils.wrapSink(new DiscardingSink<>()));
+        List<Transformation<?>> transformations = env.getTransformations();
+        assertThat(transformations)
+                .hasSize(1)
+                .element(0)
+                .isInstanceOf(DataStreamV2SinkTransformation.class);
+    }
+
     private static KeyedPartitionStream<Integer, Integer> createKeyedStream(
             ExecutionEnvironmentImpl env) {
         NonKeyedPartitionStreamImpl<Integer> stream =
diff --git a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/stream/NonKeyedPartitionStreamImplTest.java b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/stream/NonKeyedPartitionStreamImplTest.java
index 04a9ae8d3fbe12..c203a72d05a201 100644
--- a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/stream/NonKeyedPartitionStreamImplTest.java
+++ b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/stream/NonKeyedPartitionStreamImplTest.java
@@ -19,10 +19,13 @@
 package org.apache.flink.datastream.impl.stream;
 
 import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.connector.dsv2.DataStreamV2SinkUtils;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;
 import org.apache.flink.datastream.impl.ExecutionEnvironmentImpl;
 import org.apache.flink.datastream.impl.TestingTransformation;
+import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
+import org.apache.flink.streaming.api.transformations.DataStreamV2SinkTransformation;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
 import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
@@ -115,4 +118,17 @@ void testConnectBroadcastStream() throws Exception {
         assertThat(transformations).hasSize(1);
         assertProcessType(transformations.get(0), TwoInputTransformation.class, Types.LONG);
     }
+
+    @Test
+    void testToSink() throws Exception {
+        ExecutionEnvironmentImpl env = StreamTestUtils.getEnv();
+        NonKeyedPartitionStreamImpl<Integer> stream =
+                new NonKeyedPartitionStreamImpl<>(
+                        env, new TestingTransformation<>("t1", Types.INT, 1));
+        stream.toSink(DataStreamV2SinkUtils.wrapSink(new DiscardingSink<>()));
+        List<Transformation<?>> transformations = env.getTransformations();
+        assertThat(transformations)
+                .hasSize(1)
+                .element(0)
+                .isInstanceOf(DataStreamV2SinkTransformation.class);
+    }
 }