diff --git a/flink-core-api/src/main/java/org/apache/flink/api/connector/dsv2/Sink.java b/flink-core-api/src/main/java/org/apache/flink/api/connector/dsv2/Sink.java
new file mode 100644
index 00000000000000..3ab7403c0c5dd5
--- /dev/null
+++ b/flink-core-api/src/main/java/org/apache/flink/api/connector/dsv2/Sink.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.dsv2;
+
+import org.apache.flink.annotation.Experimental;
+
+/**
+ * Sink interface for DataStream api v2.
+ *
+ *
Note that this interface is just a placeholder because we haven't decided whether to use
+ * sink-v2 based sink or design a new sink connector API for DataStream V2.
+ */
+@Experimental
+public interface Sink {}
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/dsv2/DataStreamV2SinkUtils.java b/flink-core/src/main/java/org/apache/flink/api/connector/dsv2/DataStreamV2SinkUtils.java
new file mode 100644
index 00000000000000..4d35ac45380a1c
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/dsv2/DataStreamV2SinkUtils.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.dsv2;
+
+import org.apache.flink.annotation.Experimental;
+
+/** Utils to create the DataStream V2 supported {@link Sink}. */
+@Experimental
+public class DataStreamV2SinkUtils {
+ /**
+ * Wrap a sink-v2 based sink to a DataStream V2 supported sink.
+ *
+ * @param sink The sink-v2 based sink to wrap.
+ * @return The DataStream V2 supported sink.
+ */
+ public static Sink wrapSink(org.apache.flink.api.connector.sink2.Sink sink) {
+ return new WrappedSink<>(sink);
+ }
+}
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/dsv2/WrappedSink.java b/flink-core/src/main/java/org/apache/flink/api/connector/dsv2/WrappedSink.java
new file mode 100644
index 00000000000000..97c2d2eacc76e4
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/dsv2/WrappedSink.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.dsv2;
+
+import org.apache.flink.annotation.Internal;
+
+/** A simple {@link Sink} implementation that wrap a sink-v2 based sink. */
+@Internal
+public class WrappedSink implements Sink {
+ org.apache.flink.api.connector.sink2.Sink wrappedSink;
+
+ public WrappedSink(org.apache.flink.api.connector.sink2.Sink wrappedSink) {
+ this.wrappedSink = wrappedSink;
+ }
+
+ public org.apache.flink.api.connector.sink2.Sink getWrappedSink() {
+ return wrappedSink;
+ }
+}
diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/GlobalStream.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/GlobalStream.java
index cb8e0b83dd7c49..e7c975dfe26dc3 100644
--- a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/GlobalStream.java
+++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/GlobalStream.java
@@ -19,6 +19,7 @@
package org.apache.flink.datastream.api.stream;
import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.connector.dsv2.Sink;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
import org.apache.flink.datastream.api.function.TwoInputNonBroadcastStreamProcessFunction;
@@ -78,7 +79,7 @@ GlobalStream connectAndProcess(
*/
BroadcastStream broadcast();
- // TODO add toSink method.
+ void toSink(Sink sink);
/**
* This class represents a combination of two {@link GlobalStream}. It will be used as the
diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/KeyedPartitionStream.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/KeyedPartitionStream.java
index 40e15f08646e86..1c4ffd8a2a9889 100644
--- a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/KeyedPartitionStream.java
+++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/KeyedPartitionStream.java
@@ -19,6 +19,7 @@
package org.apache.flink.datastream.api.stream;
import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.connector.dsv2.Sink;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
import org.apache.flink.datastream.api.function.TwoInputBroadcastStreamProcessFunction;
@@ -194,7 +195,7 @@ KeyedPartitionStream connectAndProcess(
*/
BroadcastStream broadcast();
- // TODO add toSink method.
+ void toSink(Sink sink);
/**
* This class represents a combination of two {@link KeyedPartitionStream}. It will be used as
diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/NonKeyedPartitionStream.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/NonKeyedPartitionStream.java
index dd4a1ee2855d60..be21d312400b2c 100644
--- a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/NonKeyedPartitionStream.java
+++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/NonKeyedPartitionStream.java
@@ -19,6 +19,7 @@
package org.apache.flink.datastream.api.stream;
import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.connector.dsv2.Sink;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
import org.apache.flink.datastream.api.function.TwoInputBroadcastStreamProcessFunction;
@@ -100,7 +101,7 @@ NonKeyedPartitionStream connectAndProcess(
*/
BroadcastStream broadcast();
- // TODO add toSink method.
+ void toSink(Sink sink);
/**
* This interface represents a combination of two {@link NonKeyedPartitionStream}. It will be
diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/ExecutionEnvironmentImpl.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/ExecutionEnvironmentImpl.java
index a2dc28fe590901..4a3f2bdad0b63f 100644
--- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/ExecutionEnvironmentImpl.java
+++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/ExecutionEnvironmentImpl.java
@@ -48,6 +48,7 @@
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
import org.apache.flink.streaming.api.transformations.SourceTransformation;
+import org.apache.flink.streaming.runtime.translators.DataStreamV2SinkTransformationTranslator;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
@@ -87,6 +88,18 @@ public class ExecutionEnvironmentImpl implements ExecutionEnvironment {
*/
private static ExecutionEnvironmentFactory contextEnvironmentFactory = null;
+ static {
+ try {
+ // All transformation translator must be put to a map in StreamGraphGenerator, but
+ // streaming-java is not depend on process-function module, using reflect to handle
+ // this.
+ DataStreamV2SinkTransformationTranslator.registerSinkTransformationTranslator();
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Can not register process function transformation translator.");
+ }
+ }
+
/**
* Create and return an instance of {@link ExecutionEnvironment}.
*
@@ -202,6 +215,10 @@ public void setParallelism(int parallelism) {
executionConfig.setParallelism(parallelism);
}
+ public CheckpointConfig getCheckpointCfg() {
+ return checkpointCfg;
+ }
+
// -----------------------------------------------
// Internal Methods
// -----------------------------------------------
diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/GlobalStreamImpl.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/GlobalStreamImpl.java
index 187918a584dafa..3d977c54468e08 100644
--- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/GlobalStreamImpl.java
+++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/GlobalStreamImpl.java
@@ -19,6 +19,7 @@
package org.apache.flink.datastream.impl.stream;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.dsv2.Sink;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -36,6 +37,7 @@
import org.apache.flink.datastream.impl.utils.StreamUtils;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.SimpleUdfStreamOperatorFactory;
+import org.apache.flink.streaming.api.transformations.DataStreamV2SinkTransformation;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
@@ -98,7 +100,14 @@ public GlobalStream connectAndProcess(
return new GlobalStreamImpl<>(environment, outTransformation);
}
- // TODO add toSink method.
+ @Override
+ public void toSink(Sink sink) {
+ DataStreamV2SinkTransformation sinkTransformation =
+ StreamUtils.addSinkOperator(this, sink, getType());
+ // Operator parallelism should always be 1 for global stream.
+ // parallelismConfigured should be true to avoid overwritten by AdaptiveBatchScheduler.
+ sinkTransformation.setParallelism(1, true);
+ }
// ---------------------
// Partitioning
diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImpl.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImpl.java
index 2b9c3b378a5adf..5ec7c125d9fbc3 100644
--- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImpl.java
+++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImpl.java
@@ -19,6 +19,7 @@
package org.apache.flink.datastream.impl.stream;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.dsv2.Sink;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -325,7 +326,10 @@ public KeySelector getKeySelector() {
return keySelector;
}
- // TODO add toSink method.
+ @Override
+ public void toSink(Sink sink) {
+ StreamUtils.addSinkOperator(this, sink, getType());
+ }
// ---------------------
// Partitioning
diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/NonKeyedPartitionStreamImpl.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/NonKeyedPartitionStreamImpl.java
index 9cc9a52dd5f8c6..d987a868020d52 100644
--- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/NonKeyedPartitionStreamImpl.java
+++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/NonKeyedPartitionStreamImpl.java
@@ -19,6 +19,7 @@
package org.apache.flink.datastream.impl.stream;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.dsv2.Sink;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -132,7 +133,10 @@ public NonKeyedPartitionStream connectAndProcess(
return new NonKeyedPartitionStreamImpl<>(environment, outTransformation);
}
- // TODO add toSink method.
+ @Override
+ public void toSink(Sink sink) {
+ StreamUtils.addSinkOperator(this, sink, getType());
+ }
// ---------------------
// Partitioning
diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/utils/StreamUtils.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/utils/StreamUtils.java
index 0471c0f9a0a694..efe8927dba2a43 100644
--- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/utils/StreamUtils.java
+++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/utils/StreamUtils.java
@@ -19,6 +19,8 @@
package org.apache.flink.datastream.impl.utils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.dsv2.Sink;
+import org.apache.flink.api.connector.dsv2.WrappedSink;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -29,10 +31,13 @@
import org.apache.flink.datastream.api.function.TwoOutputStreamProcessFunction;
import org.apache.flink.datastream.impl.stream.AbstractDataStream;
import org.apache.flink.datastream.impl.stream.KeyedPartitionStreamImpl;
+import org.apache.flink.datastream.impl.stream.NonKeyedPartitionStreamImpl;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.SimpleUdfStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.transformations.DataStreamV2SinkTransformation;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
@@ -228,4 +233,56 @@ public static TwoInputTransformation getTwoInputT
return transform;
}
+
+ /** Construct and return a new DataStream with one input operator. */
+ public static AbstractDataStream transformOneInputOperator(
+ String operatorName,
+ AbstractDataStream inputStream,
+ TypeInformation outTypeInfo,
+ StreamOperatorFactory operatorFactory) {
+ // read the output type of the input Transform to coax out errors about MissingTypeInfo
+ inputStream.getTransformation().getOutputType();
+
+ OneInputTransformation resultTransform =
+ new OneInputTransformation<>(
+ inputStream.getTransformation(),
+ operatorName,
+ operatorFactory,
+ outTypeInfo,
+ inputStream.getEnvironment().getParallelism(),
+ false);
+
+ NonKeyedPartitionStreamImpl returnStream =
+ new NonKeyedPartitionStreamImpl<>(inputStream.getEnvironment(), resultTransform);
+
+ inputStream.getEnvironment().addOperator(resultTransform);
+
+ return returnStream;
+ }
+
+ /** Add sink operator to the input stream. */
+ public static DataStreamV2SinkTransformation addSinkOperator(
+ AbstractDataStream inputStream, Sink sink, TypeInformation typeInformation) {
+ // read the output type of the input Transform to coax out errors about MissingTypeInfo
+ inputStream.getTransformation().getOutputType();
+
+ if (!(sink instanceof WrappedSink)) {
+ throw new UnsupportedOperationException(
+ "Unsupported type of sink, please use DataStreamV2SinkUtils to wrap a sink-v2 sink first.");
+ }
+
+ org.apache.flink.api.connector.sink2.Sink innerSink =
+ ((WrappedSink) sink).getWrappedSink();
+
+ DataStreamV2SinkTransformation sinkTransformation =
+ new DataStreamV2SinkTransformation<>(
+ inputStream,
+ innerSink,
+ typeInformation,
+ "Sink",
+ inputStream.getEnvironment().getParallelism(),
+ false);
+ inputStream.getEnvironment().addOperator(sinkTransformation);
+ return sinkTransformation;
+ }
}
diff --git a/flink-datastream/src/main/java/org/apache/flink/streaming/api/transformations/DataStreamV2SinkTransformation.java b/flink-datastream/src/main/java/org/apache/flink/streaming/api/transformations/DataStreamV2SinkTransformation.java
new file mode 100644
index 00000000000000..ae702d9e2591e4
--- /dev/null
+++ b/flink-datastream/src/main/java/org/apache/flink/streaming/api/transformations/DataStreamV2SinkTransformation.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.transformations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.datastream.impl.stream.AbstractDataStream;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+
+import org.apache.commons.compress.utils.Lists;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The {@link Transformation} for DataStream V2 based sink.
+ *
+ * @param The input type of the {@link SinkWriter}
+ * @param The output type of the {@link Sink}
+ */
+@Internal
+public class DataStreamV2SinkTransformation
+ extends PhysicalTransformation {
+ private final AbstractDataStream inputStream;
+ private final Sink sink;
+ private final Transformation input;
+
+ private ChainingStrategy chainingStrategy;
+
+ public DataStreamV2SinkTransformation(
+ AbstractDataStream inputStream,
+ Sink sink,
+ TypeInformation outputType,
+ String name,
+ int parallelism,
+ boolean parallelismConfigured) {
+ super(name, outputType, parallelism, parallelismConfigured);
+ this.inputStream = checkNotNull(inputStream);
+ this.sink = checkNotNull(sink);
+ this.input = inputStream.getTransformation();
+ }
+
+ @Override
+ public void setChainingStrategy(ChainingStrategy strategy) {
+ chainingStrategy = checkNotNull(strategy);
+ }
+
+ @Override
+ public List> getTransitivePredecessors() {
+ final List> result = Lists.newArrayList();
+ result.add(this);
+ result.addAll(input.getTransitivePredecessors());
+ return result;
+ }
+
+ @Override
+ public List> getInputs() {
+ return Collections.singletonList(input);
+ }
+
+ @Nullable
+ public ChainingStrategy getChainingStrategy() {
+ return chainingStrategy;
+ }
+
+ public AbstractDataStream getInputStream() {
+ return inputStream;
+ }
+
+ public Sink getSink() {
+ return sink;
+ }
+}
diff --git a/flink-datastream/src/main/java/org/apache/flink/streaming/runtime/translators/DataStreamV2SinkTransformationTranslator.java b/flink-datastream/src/main/java/org/apache/flink/streaming/runtime/translators/DataStreamV2SinkTransformationTranslator.java
new file mode 100644
index 00000000000000..1884ff527a64e6
--- /dev/null
+++ b/flink-datastream/src/main/java/org/apache/flink/streaming/runtime/translators/DataStreamV2SinkTransformationTranslator.java
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.translators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.SupportsConcurrentExecutionAttempts;
+import org.apache.flink.api.common.operators.SlotSharingGroup;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SupportsCommitter;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.datastream.impl.ExecutionEnvironmentImpl;
+import org.apache.flink.datastream.impl.stream.AbstractDataStream;
+import org.apache.flink.datastream.impl.stream.NonKeyedPartitionStreamImpl;
+import org.apache.flink.datastream.impl.utils.StreamUtils;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
+import org.apache.flink.streaming.api.connector.sink2.StandardSinkTopologies;
+import org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology;
+import org.apache.flink.streaming.api.connector.sink2.SupportsPreCommitTopology;
+import org.apache.flink.streaming.api.connector.sink2.SupportsPreWriteTopology;
+import org.apache.flink.streaming.api.datastream.CustomSinkOperatorUidHashes;
+import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
+import org.apache.flink.streaming.api.graph.TransformationTranslator;
+import org.apache.flink.streaming.api.transformations.DataStreamV2SinkTransformation;
+import org.apache.flink.streaming.api.transformations.PartitionTransformation;
+import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
+import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
+import org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory;
+import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+
+import javax.annotation.Nullable;
+
+import java.lang.reflect.Field;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+
+/** The {@link TransformationTranslator} for the {@link DataStreamV2SinkTransformation}. */
+@Internal
+public class DataStreamV2SinkTransformationTranslator
+ implements TransformationTranslator