diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImpl.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImpl.java
index 8036297e14f17..6eaf7dbac3e53 100644
--- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImpl.java
+++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImpl.java
@@ -483,15 +483,11 @@ public Transformation getJoinTransformation(
             TwoInputNonBroadcastStreamProcessFunction processFunction,
             TypeInformation outTypeInfo) {
         ListStateDescriptor leftStateDesc =
-                new ListStateDescriptor<>(
-                        "join-left-state", TypeExtractor.createTypeInfo(getType().getTypeClass()));
+                new ListStateDescriptor<>("join-left-state", getType());
         ListStateDescriptor rightStateDesc =
                 new ListStateDescriptor<>(
                         "join-right-state",
-                        TypeExtractor.createTypeInfo(
-                                ((KeyedPartitionStreamImpl) other)
-                                        .getType()
-                                        .getTypeClass()));
+                        ((KeyedPartitionStreamImpl) other).getType());
         TwoInputNonBroadcastJoinProcessOperator joinProcessOperator =
                 new TwoInputNonBroadcastJoinProcessOperator<>(
                         processFunction, leftStateDesc, rightStateDesc);
diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/utils/StreamUtils.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/utils/StreamUtils.java
index a072e3e361c77..eccd4cff47c57 100644
--- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/utils/StreamUtils.java
+++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/utils/StreamUtils.java
@@ -441,9 +441,7 @@ public static Transformation transformOneInp
             KeySelector keySelector,
             TypeInformation keyType) {
         WindowAssigner assigner = internalWindowFunction.getAssigner();
-        ListStateDescriptor stateDesc =
-                new ListStateDescriptor<>(
-                        "window-state", TypeExtractor.createTypeInfo(inputType.getTypeClass()));
+        ListStateDescriptor stateDesc = new ListStateDescriptor<>("window-state", inputType);
 
         OneInputWindowProcessOperator windowProcessOperator =
                 new OneInputWindowProcessOperator<>(
@@ -474,14 +472,10 @@ Transformation transformTwoInputNonBroadcastWindow(
             TypeInformation keyType2) {
         WindowAssigner<?, W> assigner = internalWindowFunction.getAssigner();
         ListStateDescriptor leftStateDesc =
-                new ListStateDescriptor<>(
-                        "two-input-window-left-state",
-                        TypeExtractor.createTypeInfo(inputType1.getTypeClass()));
+                new ListStateDescriptor<>("two-input-window-left-state", inputType1);
         ListStateDescriptor rightStateDesc =
-                new ListStateDescriptor<>(
-                        "two-input-window-right-state",
-                        TypeExtractor.createTypeInfo(inputType2.getTypeClass()));
+                new ListStateDescriptor<>("two-input-window-right-state", inputType2);
 
         TwoInputNonBroadcastWindowProcessOperator windowProcessOperator =
                 new TwoInputNonBroadcastWindowProcessOperator<>(
@@ -519,9 +513,7 @@ Transformation transformTwoOutputWindow(
             TypeInformation keyType) {
         WindowAssigner assigner = internalWindowFunction.getAssigner();
         ListStateDescriptor stateDesc =
-                new ListStateDescriptor<>(
-                        "two-output-window-state",
-                        TypeExtractor.createTypeInfo(inputType.getTypeClass()));
+                new ListStateDescriptor<>("two-output-window-state", inputType);
 
         TwoOutputWindowProcessOperator windowProcessOperator =
                 new TwoOutputWindowProcessOperator<>(
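The state-descriptor changes above all follow the same pattern: the stream already carries a fully parameterized TypeInformation, and round-tripping it through TypeExtractor.createTypeInfo(getType().getTypeClass()) erases the generic parameters, which breaks state serialization for element types such as Tuple2. A minimal sketch of the difference, assuming only flink-core and flink-java on the classpath (the exact failure mode of the re-extraction may vary by Flink version):

import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;

public class StateTypeInfoDemo {
    public static void main(String[] args) {
        // The stream already knows its element type, including generic parameters.
        TypeInformation<Tuple2<String, Integer>> streamType =
                Types.TUPLE(Types.STRING, Types.INT);

        // Old approach: the raw Class loses the generic parameters, so the
        // extractor cannot reconstruct the tuple's field types.
        try {
            TypeInformation<?> reExtracted =
                    TypeExtractor.createTypeInfo(streamType.getTypeClass());
            System.out.println("re-extracted: " + reExtracted);
        } catch (Exception e) {
            System.out.println("re-extraction failed: " + e.getMessage());
        }

        // New approach: pass the existing TypeInformation straight through,
        // preserving Tuple2<String, Integer> for the list-state serializer.
        ListStateDescriptor<Tuple2<String, Integer>> stateDesc =
                new ListStateDescriptor<>("window-state", streamType);
        System.out.println("preserved: " + stateDesc.getName() + " -> " + streamType);
    }
}

Passing the TypeInformation through unchanged is also cheaper: no reflective extraction runs at graph-construction time.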
diff --git a/flink-datastream/src/main/java/org/apache/flink/streaming/runtime/translators/DataStreamV2SinkTransformationTranslator.java b/flink-datastream/src/main/java/org/apache/flink/streaming/runtime/translators/DataStreamV2SinkTransformationTranslator.java
index 1884ff527a64e..6e12ace73b8a8 100644
--- a/flink-datastream/src/main/java/org/apache/flink/streaming/runtime/translators/DataStreamV2SinkTransformationTranslator.java
+++ b/flink-datastream/src/main/java/org/apache/flink/streaming/runtime/translators/DataStreamV2SinkTransformationTranslator.java
@@ -36,6 +36,8 @@
 import org.apache.flink.streaming.api.connector.sink2.SupportsPreCommitTopology;
 import org.apache.flink.streaming.api.connector.sink2.SupportsPreWriteTopology;
 import org.apache.flink.streaming.api.datastream.CustomSinkOperatorUidHashes;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
 import org.apache.flink.streaming.api.graph.TransformationTranslator;
 import org.apache.flink.streaming.api.transformations.DataStreamV2SinkTransformation;
@@ -140,6 +142,40 @@ public SinkExpander(
             this.isBatchMode = isBatchMode;
         }
 
+        /**
+         * Checks whether the {@link Sink} will add a commit topology.
+         *
+         * @param sink the sink to check
+         * @return true if the {@link Sink} does not add a commit topology, false otherwise
+         */
+        private boolean checkSinkDoNotAddCommitTopology(Sink sink) {
+            SupportsCommitter committingSink = (SupportsCommitter) sink;
+            SupportsPreCommitTopology preCommittingSink =
+                    (SupportsPreCommitTopology) sink;
+            TypeInformation<CommittableMessage<?>> writeResultTypeInformation =
+                    CommittableMessageTypeInfo.of(preCommittingSink::getWriteResultSerializer);
+            Transformation<CommittableMessage<?>> dummyTransformation =
+                    new Transformation<>("dummy", writeResultTypeInformation, 1) {
+                        @Override
+                        protected List<Transformation<?>> getTransitivePredecessorsInternal() {
+                            return List.of();
+                        }
+
+                        @Override
+                        public List<Transformation<?>> getInputs() {
+                            return List.of();
+                        }
+                    };
+            DataStream<CommittableMessage<?>> committableMessageDataStream =
+                    preCommittingSink.addPreCommitTopology(
+                            new DataStream<>(
+                                    new StreamExecutionEnvironment(), dummyTransformation));
+            return committableMessageDataStream
+                    .getExecutionEnvironment()
+                    .getTransformations()
+                    .isEmpty();
+        }
+
         private void expand() {
             final int sizeBefore = executionEnvironment.getTransformations().size();
 
@@ -150,8 +186,17 @@ private void expand() {
                 throw new UnsupportedOperationException(
                         "Sink with pre-write topology is not supported for DataStream v2 atm.");
             } else if (sink instanceof SupportsPreCommitTopology) {
-                throw new UnsupportedOperationException(
-                        "Sink with pre-commit topology is not supported for DataStream v2 atm.");
+                if (sink.getClass()
+                        .getName()
+                        .equals("org.apache.flink.connector.file.sink.FileSink")) {
+                    if (!checkSinkDoNotAddCommitTopology(sink)) {
+                        throw new UnsupportedOperationException(
+                                "Sink with pre-commit topology is not supported for DataStream v2 atm.");
+                    }
+                } else {
+                    throw new UnsupportedOperationException(
+                            "Sink with pre-commit topology is not supported for DataStream v2 atm.");
+                }
             } else if (sink instanceof SupportsPostCommitTopology) {
                 throw new UnsupportedOperationException(
                         "Sink with post-commit topology is not supported for DataStream v2 atm.");
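The FileSink carve-out works by probing the sink: addPreCommitTopology is invoked with a dummy transformation bound to a throwaway StreamExecutionEnvironment, and if the environment ends up with no registered transformations, the sink's pre-commit topology is effectively the identity and can be tolerated. A self-contained sketch of that probing pattern, with all names hypothetical rather than Flink API:

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class TopologyProbe {
    /** A scratch "environment" that records every transformation added to it. */
    static final class ScratchEnv {
        final List<String> transformations = new ArrayList<>();
    }

    /**
     * Runs the hook against a fresh scratch environment and reports whether it
     * registered anything. An empty environment afterwards means the hook is a
     * no-op, which is the condition the translator checks for FileSink.
     */
    static boolean addsNothing(Consumer<ScratchEnv> hook) {
        ScratchEnv env = new ScratchEnv();
        hook.accept(env);
        return env.transformations.isEmpty();
    }

    public static void main(String[] args) {
        // An identity hook: registers nothing, so it is safe to accept.
        System.out.println(addsNothing(env -> {}));
        // A hook that actually extends the topology: must be rejected.
        System.out.println(addsNothing(env -> env.transformations.add("map")));
    }
}

Matching FileSink by its fully qualified class name, rather than with instanceof, presumably keeps flink-datastream free of a compile-time dependency on the file-connector module.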
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/JoinITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/extension/join/JoinITCase.java
similarity index 79%
rename from flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/JoinITCase.java
rename to flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/extension/join/JoinITCase.java
index 2c892feb6938b..a5bf9ee94400f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/JoinITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/extension/join/JoinITCase.java
@@ -16,14 +16,20 @@
  * limitations under the License.
  */
 
-package org.apache.flink.test.streaming.api.datastream;
+package org.apache.flink.test.streaming.api.datastream.extension.join;
 
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
 import org.apache.flink.api.connector.dsv2.DataStreamV2SourceUtils;
 import org.apache.flink.api.connector.dsv2.WrappedSink;
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.connector.sink2.SinkWriter;
 import org.apache.flink.api.connector.sink2.WriterInitContext;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.datastream.api.ExecutionEnvironment;
 import org.apache.flink.datastream.api.builtin.BuiltinFuncs;
 import org.apache.flink.datastream.api.common.Collector;
@@ -35,6 +41,8 @@
 import org.apache.flink.datastream.api.function.TwoInputNonBroadcastStreamProcessFunction;
 import org.apache.flink.datastream.api.stream.KeyedPartitionStream;
 import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
+import org.apache.flink.test.util.AbstractTestBase;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -42,16 +50,18 @@
 import java.io.IOException;
 import java.io.Serializable;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 
+import static org.apache.flink.test.util.TestBaseUtils.compareResultsByLinesInMemory;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test the Join extension on DataStream V2. */
-class JoinITCase implements Serializable {
+class JoinITCase extends AbstractTestBase implements Serializable {
 
     private transient ExecutionEnvironment env;
 
     private static List sinkResults;
@@ -299,6 +309,77 @@ void testJoinWithNonKeyedStream() throws Exception {
                         "key:2:1",
                         "key:2:2");
     }
 
+    @Test
+    void testJoinWithTuple() throws Exception {
+        final String resultPath = getTempDirPath("result");
+
+        NonKeyedPartitionStream<Tuple2<String, Integer>> source1 =
+                env.fromSource(
+                        DataStreamV2SourceUtils.fromData(
+                                Arrays.asList(
+                                        Tuple2.of("key", 0),
+                                        Tuple2.of("key", 1),
+                                        Tuple2.of("key", 2))),
+                        "source1");
+
+        NonKeyedPartitionStream<Tuple2<String, Integer>> source2 =
+                env.fromSource(
+                        DataStreamV2SourceUtils.fromData(
+                                Arrays.asList(
+                                        Tuple2.of("key", 0),
+                                        Tuple2.of("key", 1),
+                                        Tuple2.of("key", 2))),
+                        "source2");
+
+        NonKeyedPartitionStream<Tuple3<String, Integer, Integer>> joinedStream =
+                BuiltinFuncs.join(
+                        source1,
+                        (KeySelector<Tuple2<String, Integer>, String>) elem -> elem.f0,
+                        source2,
+                        (KeySelector<Tuple2<String, Integer>, String>) elem -> elem.f0,
+                        new JoinFunction<
+                                Tuple2<String, Integer>,
+                                Tuple2<String, Integer>,
+                                Tuple3<String, Integer, Integer>>() {
+
+                            @Override
+                            public void processRecord(
+                                    Tuple2<String, Integer> leftRecord,
+                                    Tuple2<String, Integer> rightRecord,
+                                    Collector<Tuple3<String, Integer, Integer>> output,
+                                    RuntimeContext ctx)
+                                    throws Exception {
+                                output.collect(
+                                        Tuple3.of(leftRecord.f0, leftRecord.f1, rightRecord.f1));
+                            }
+                        });
+
+        joinedStream.toSink(
+                new WrappedSink<>(
+                        FileSink.<Tuple3<String, Integer, Integer>>forRowFormat(
+                                        new Path(resultPath), new SimpleStringEncoder<>())
+                                .withRollingPolicy(
+                                        DefaultRollingPolicy.builder()
+                                                .withMaxPartSize(MemorySize.ofMebiBytes(1))
+                                                .withRolloverInterval(Duration.ofSeconds(10))
+                                                .build())
+                                .build()));
+
+        env.execute("testJoinWithTuple");
+
+        compareResultsByLinesInMemory(
+                "(key,0,0)\n"
+                        + "(key,0,1)\n"
+                        + "(key,0,2)\n"
+                        + "(key,1,0)\n"
+                        + "(key,1,1)\n"
+                        + "(key,1,2)\n"
+                        + "(key,2,0)\n"
+                        + "(key,2,1)\n"
+                        + "(key,2,2)\n",
+                resultPath);
+    }
+
     private static class KeyAndValue {
         public final String key;
         public final int value;
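The expected output of testJoinWithTuple follows directly from the join semantics: both sides share the single key "key", so the join degenerates into a cross product of the two three-element sides, 3 x 3 = 9 rows. A plain-Java sketch of that pairing logic (illustrative only, not part of the test):

import java.util.Arrays;
import java.util.List;

public class ExpectedJoinOutput {
    public static void main(String[] args) {
        List<Integer> left = Arrays.asList(0, 1, 2);   // values of source1 under key "key"
        List<Integer> right = Arrays.asList(0, 1, 2);  // values of source2 under key "key"
        // Every left record pairs with every right record that shares its key.
        for (int l : left) {
            for (int r : right) {
                System.out.printf("(key,%d,%d)%n", l, r);
            }
        }
    }
}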