[FLINK-37264][datastream] Fix bugs in JoinExtension for ds v2
codenohup authored and reswqa committed Feb 7, 2025
1 parent bfea5b5 commit b8fce8e
Showing 4 changed files with 136 additions and 22 deletions.
File 1 of 4

@@ -483,15 +483,11 @@ public <T_OTHER, OUT> Transformation<OUT> getJoinTransformation(
             TwoInputNonBroadcastStreamProcessFunction<V, T_OTHER, OUT> processFunction,
             TypeInformation<OUT> outTypeInfo) {
         ListStateDescriptor<V> leftStateDesc =
-                new ListStateDescriptor<>(
-                        "join-left-state", TypeExtractor.createTypeInfo(getType().getTypeClass()));
+                new ListStateDescriptor<>("join-left-state", getType());
         ListStateDescriptor<T_OTHER> rightStateDesc =
                 new ListStateDescriptor<>(
                         "join-right-state",
-                        TypeExtractor.createTypeInfo(
-                                ((KeyedPartitionStreamImpl<Object, T_OTHER>) other)
-                                        .getType()
-                                        .getTypeClass()));
+                        ((KeyedPartitionStreamImpl<Object, T_OTHER>) other).getType());
         TwoInputNonBroadcastJoinProcessOperator<K, V, T_OTHER, OUT> joinProcessOperator =
                 new TwoInputNonBroadcastJoinProcessOperator<>(
                         processFunction, leftStateDesc, rightStateDesc);
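Why this fixes the join path: the old code re-derived the state type via TypeExtractor.createTypeInfo(getType().getTypeClass()), but getTypeClass() returns the erased class, so generic parameters such as the field types of a Tuple2 are lost and extraction cannot reconstruct them. The fix hands the stream's existing TypeInformation to the descriptor unchanged. A minimal standalone sketch of the fixed pattern (not part of the commit; the class name and tuple types are illustrative, and it assumes the org.apache.flink.api.common.state descriptor used here):

import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;

public class JoinStateDescriptorSketch {
    public static void main(String[] args) {
        // The stream already carries complete type information,
        // including the tuple's per-field types and serializers.
        TypeInformation<Tuple2<String, Integer>> streamType =
                Types.TUPLE(Types.STRING, Types.INT);

        // Old (buggy) path: streamType.getTypeClass() erases to Tuple2.class,
        // and TypeExtractor cannot recover <String, Integer> from that.
        // New path: reuse the TypeInformation as-is.
        ListStateDescriptor<Tuple2<String, Integer>> leftStateDesc =
                new ListStateDescriptor<>("join-left-state", streamType);

        System.out.println(leftStateDesc.getName()); // prints: join-left-state
    }
}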
File 2 of 4

@@ -441,9 +441,7 @@ public static <K, IN, OUT, W extends Window> Transformation<OUT> transformOneInp
             KeySelector<IN, K> keySelector,
             TypeInformation<K> keyType) {
         WindowAssigner<IN, W> assigner = internalWindowFunction.getAssigner();
-        ListStateDescriptor<IN> stateDesc =
-                new ListStateDescriptor<>(
-                        "window-state", TypeExtractor.createTypeInfo(inputType.getTypeClass()));
+        ListStateDescriptor<IN> stateDesc = new ListStateDescriptor<>("window-state", inputType);
 
         OneInputWindowProcessOperator<K, IN, OUT, W> windowProcessOperator =
                 new OneInputWindowProcessOperator<>(
@@ -474,14 +472,10 @@ Transformation<OUT> transformTwoInputNonBroadcastWindow(
             TypeInformation<K> keyType2) {
         WindowAssigner<TaggedUnion<IN1, IN2>, W> assigner = internalWindowFunction.getAssigner();
         ListStateDescriptor<IN1> leftStateDesc =
-                new ListStateDescriptor<>(
-                        "two-input-window-left-state",
-                        TypeExtractor.createTypeInfo(inputType1.getTypeClass()));
+                new ListStateDescriptor<>("two-input-window-left-state", inputType1);
 
         ListStateDescriptor<IN2> rightStateDesc =
-                new ListStateDescriptor<>(
-                        "two-input-window-right-state",
-                        TypeExtractor.createTypeInfo(inputType2.getTypeClass()));
+                new ListStateDescriptor<>("two-input-window-right-state", inputType2);
 
         TwoInputNonBroadcastWindowProcessOperator<K, IN1, IN2, OUT, W> windowProcessOperator =
                 new TwoInputNonBroadcastWindowProcessOperator<>(
@@ -519,9 +513,7 @@ Transformation<OUT1> transformTwoOutputWindow(
             TypeInformation<K> keyType) {
         WindowAssigner<IN, W> assigner = internalWindowFunction.getAssigner();
         ListStateDescriptor<IN> stateDesc =
-                new ListStateDescriptor<>(
-                        "two-output-window-state",
-                        TypeExtractor.createTypeInfo(inputType.getTypeClass()));
+                new ListStateDescriptor<>("two-output-window-state", inputType);
 
         TwoOutputWindowProcessOperator<K, IN, OUT1, OUT2, W> windowProcessOperator =
                 new TwoOutputWindowProcessOperator<>(
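The three window paths had the same bug, so they get the same fix: pass the already-known inputType through instead of round-tripping it through the erased class. For intuition, a hedged standalone demo (not part of the commit; the class name is illustrative) of what TypeExtractor does when handed a raw tuple class. Expect an InvalidTypesException or a degraded generic type, which is why the old descriptors broke for tuple-typed streams:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;

public class RawTupleExtractionSketch {
    public static void main(String[] args) {
        try {
            // Tuple2.class no longer knows its field types, so the
            // extractor has nothing to work with.
            TypeInformation<Tuple2> info = TypeExtractor.createTypeInfo(Tuple2.class);
            System.out.println("Extracted: " + info);
        } catch (Exception e) {
            // This failure mode is what the join/window fix avoids by
            // passing the stream's TypeInformation through directly.
            System.out.println("Re-extraction failed: " + e.getMessage());
        }
    }
}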
File 3 of 4

@@ -36,6 +36,8 @@
 import org.apache.flink.streaming.api.connector.sink2.SupportsPreCommitTopology;
 import org.apache.flink.streaming.api.connector.sink2.SupportsPreWriteTopology;
 import org.apache.flink.streaming.api.datastream.CustomSinkOperatorUidHashes;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
 import org.apache.flink.streaming.api.graph.TransformationTranslator;
 import org.apache.flink.streaming.api.transformations.DataStreamV2SinkTransformation;
@@ -140,6 +142,40 @@ public SinkExpander(
             this.isBatchMode = isBatchMode;
         }
 
+        /**
+         * Checks whether the {@link Sink} will add a commit topology.
+         *
+         * @param sink the sink to check
+         * @return true if the {@link Sink} does not add a commit topology, false otherwise
+         */
+        private <CommT, WriteResultT> boolean checkSinkDoNotAddCommitTopology(Sink<T> sink) {
+            SupportsCommitter<CommT> committingSink = (SupportsCommitter<CommT>) sink;
+            SupportsPreCommitTopology<WriteResultT, CommT> preCommittingSink =
+                    (SupportsPreCommitTopology<WriteResultT, CommT>) sink;
+            TypeInformation<CommittableMessage<WriteResultT>> writeResultTypeInformation =
+                    CommittableMessageTypeInfo.of(preCommittingSink::getWriteResultSerializer);
+            Transformation<CommittableMessage<WriteResultT>> dummyTransformation =
+                    new Transformation<>("dummy", writeResultTypeInformation, 1) {
+                        @Override
+                        protected List<Transformation<?>> getTransitivePredecessorsInternal() {
+                            return List.of();
+                        }
+
+                        @Override
+                        public List<Transformation<?>> getInputs() {
+                            return List.of();
+                        }
+                    };
+            DataStream<CommittableMessage<CommT>> committableMessageDataStream =
+                    preCommittingSink.addPreCommitTopology(
+                            new DataStream<>(
+                                    new StreamExecutionEnvironment(), dummyTransformation));
+            return committableMessageDataStream
+                    .getExecutionEnvironment()
+                    .getTransformations()
+                    .isEmpty();
+        }
+
         private void expand() {
 
             final int sizeBefore = executionEnvironment.getTransformations().size();
@@ -150,8 +186,17 @@ private void expand() {
                 throw new UnsupportedOperationException(
                         "Sink with pre-write topology is not supported for DataStream v2 atm.");
             } else if (sink instanceof SupportsPreCommitTopology) {
-                throw new UnsupportedOperationException(
-                        "Sink with pre-commit topology is not supported for DataStream v2 atm.");
+                if (sink.getClass()
+                        .getName()
+                        .equals("org.apache.flink.connector.file.sink.FileSink")) {
+                    if (!checkSinkDoNotAddCommitTopology(sink)) {
+                        throw new UnsupportedOperationException(
+                                "Sink with pre-commit topology is not supported for DataStream v2 atm.");
+                    }
+                } else {
+                    throw new UnsupportedOperationException(
+                            "Sink with pre-commit topology is not supported for DataStream v2 atm.");
+                }
             } else if (sink instanceof SupportsPostCommitTopology) {
                 throw new UnsupportedOperationException(
                         "Sink with post-commit topology is not supported for DataStream v2 atm.");
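What the new branch does: FileSink implements SupportsPreCommitTopology, but it only registers extra transformations when file compaction is enabled; a plain row-format FileSink's addPreCommitTopology is a pass-through. checkSinkDoNotAddCommitTopology probes for this by feeding addPreCommitTopology a dummy transformation on a scratch StreamExecutionEnvironment and checking whether any transformations were registered. A hedged sketch of the two FileSink configurations the check distinguishes (builder calls are from flink-connector-files as best recalled; the output path, size threshold, and compactor choice are illustrative, not from the commit):

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.sink.compactor.ConcatFileCompactor;
import org.apache.flink.connector.file.sink.compactor.FileCompactStrategy;
import org.apache.flink.core.fs.Path;

public class FileSinkPreCommitSketch {
    public static void main(String[] args) {
        // Accepted by dsv2 after this commit: addPreCommitTopology is a
        // no-op here, so the probe sees an empty transformation list.
        FileSink<String> plain =
                FileSink.forRowFormat(new Path("/tmp/out"), new SimpleStringEncoder<String>())
                        .build();

        // Still rejected: enabling compaction makes the sink register
        // pre-commit transformations, which dsv2 cannot translate yet.
        FileSink<String> compacting =
                FileSink.forRowFormat(new Path("/tmp/out"), new SimpleStringEncoder<String>())
                        .enableCompact(
                                FileCompactStrategy.Builder.newBuilder()
                                        .setSizeThreshold(1024)
                                        .build(),
                                new ConcatFileCompactor())
                        .build();
    }
}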
File 4 of 4

@@ -16,14 +16,20 @@
  * limitations under the License.
  */
 
-package org.apache.flink.test.streaming.api.datastream;
+package org.apache.flink.test.streaming.api.datastream.extension.join;
 
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
 import org.apache.flink.api.connector.dsv2.DataStreamV2SourceUtils;
 import org.apache.flink.api.connector.dsv2.WrappedSink;
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.connector.sink2.SinkWriter;
 import org.apache.flink.api.connector.sink2.WriterInitContext;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.datastream.api.ExecutionEnvironment;
 import org.apache.flink.datastream.api.builtin.BuiltinFuncs;
 import org.apache.flink.datastream.api.common.Collector;
@@ -35,23 +41,27 @@
 import org.apache.flink.datastream.api.function.TwoInputNonBroadcastStreamProcessFunction;
 import org.apache.flink.datastream.api.stream.KeyedPartitionStream;
 import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
+import org.apache.flink.test.util.AbstractTestBase;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 
+import static org.apache.flink.test.util.TestBaseUtils.compareResultsByLinesInMemory;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test the Join extension on DataStream V2. */
-class JoinITCase implements Serializable {
+class JoinITCase extends AbstractTestBase implements Serializable {
     private transient ExecutionEnvironment env;
     private static List<String> sinkResults;
 
@@ -299,6 +309,77 @@ void testJoinWithNonKeyedStream() throws Exception {
                 "key:2:1", "key:2:2");
     }
 
+    @Test
+    void testJoinWithTuple() throws Exception {
+        final String resultPath = getTempDirPath("result");
+
+        NonKeyedPartitionStream<Tuple2<String, Integer>> source1 =
+                env.fromSource(
+                        DataStreamV2SourceUtils.fromData(
+                                Arrays.asList(
+                                        Tuple2.of("key", 0),
+                                        Tuple2.of("key", 1),
+                                        Tuple2.of("key", 2))),
+                        "source1");
+
+        NonKeyedPartitionStream<Tuple2<String, Integer>> source2 =
+                env.fromSource(
+                        DataStreamV2SourceUtils.fromData(
+                                Arrays.asList(
+                                        Tuple2.of("key", 0),
+                                        Tuple2.of("key", 1),
+                                        Tuple2.of("key", 2))),
+                        "source2");
+
+        NonKeyedPartitionStream<Tuple3<String, Integer, Integer>> joinedStream =
+                BuiltinFuncs.join(
+                        source1,
+                        (KeySelector<Tuple2<String, Integer>, String>) elem -> elem.f0,
+                        source2,
+                        (KeySelector<Tuple2<String, Integer>, String>) elem -> elem.f0,
+                        new JoinFunction<
+                                Tuple2<String, Integer>,
+                                Tuple2<String, Integer>,
+                                Tuple3<String, Integer, Integer>>() {
+
+                            @Override
+                            public void processRecord(
+                                    Tuple2<String, Integer> leftRecord,
+                                    Tuple2<String, Integer> rightRecord,
+                                    Collector<Tuple3<String, Integer, Integer>> output,
+                                    RuntimeContext ctx)
+                                    throws Exception {
+                                output.collect(
+                                        Tuple3.of(leftRecord.f0, leftRecord.f1, rightRecord.f1));
+                            }
+                        });
+
+        joinedStream.toSink(
+                new WrappedSink<>(
+                        FileSink.<Tuple3<String, Integer, Integer>>forRowFormat(
+                                        new Path(resultPath), new SimpleStringEncoder<>())
+                                .withRollingPolicy(
+                                        DefaultRollingPolicy.builder()
+                                                .withMaxPartSize(MemorySize.ofMebiBytes(1))
+                                                .withRolloverInterval(Duration.ofSeconds(10))
+                                                .build())
+                                .build()));
+
+        env.execute("testJoinWithTuple");
+
+        compareResultsByLinesInMemory(
+                "(key,0,0)\n"
+                        + "(key,0,1)\n"
+                        + "(key,0,2)\n"
+                        + "(key,1,0)\n"
+                        + "(key,1,1)\n"
+                        + "(key,1,2)\n"
+                        + "(key,2,0)\n"
+                        + "(key,2,1)\n"
+                        + "(key,2,2)\n",
+                resultPath);
+    }
+
     private static class KeyAndValue {
         public final String key;
         public final int value;
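The new testJoinWithTuple case exercises both fixes at once: the joined streams are Tuple2/Tuple3 typed, which is exactly the generic shape the old TypeExtractor.createTypeInfo(raw class) descriptors could not handle, and the results go through a plain row-format FileSink, which the relaxed pre-commit check above now admits. Assuming the standard Maven setup of the containing test module, something like mvn test -Dtest=JoinITCase should run it.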
