Skip to content

Commit b9853ae

Browse files
committed
[FLINK-34548][API] Implement datastreams
1 parent d44805c commit b9853ae

File tree

12 files changed

+1943
-0
lines changed

12 files changed

+1943
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.process.impl.stream;
20+
21+
import org.apache.flink.api.common.typeinfo.TypeInformation;
22+
import org.apache.flink.api.dag.Transformation;
23+
import org.apache.flink.api.java.functions.KeySelector;
24+
import org.apache.flink.api.java.typeutils.TypeExtractor;
25+
import org.apache.flink.process.api.function.TwoInputBroadcastStreamProcessFunction;
26+
import org.apache.flink.process.api.stream.BroadcastStream;
27+
import org.apache.flink.process.api.stream.KeyedPartitionStream;
28+
import org.apache.flink.process.api.stream.NonKeyedPartitionStream;
29+
import org.apache.flink.process.impl.ExecutionEnvironmentImpl;
30+
import org.apache.flink.process.impl.operators.KeyedTwoInputBroadcastProcessOperator;
31+
import org.apache.flink.process.impl.operators.TwoInputBroadcastProcessOperator;
32+
import org.apache.flink.process.impl.utils.StreamUtils;
33+
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
34+
import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
35+
36+
/** The implementation of {@link BroadcastStream}. */
37+
public class BroadcastStreamImpl<T> extends DataStream<T> implements BroadcastStream<T> {
38+
public BroadcastStreamImpl(
39+
ExecutionEnvironmentImpl environment, Transformation<T> transformation) {
40+
this(
41+
environment,
42+
new PartitionTransformation<>(transformation, new BroadcastPartitioner<>()));
43+
}
44+
45+
private BroadcastStreamImpl(
46+
ExecutionEnvironmentImpl environment, PartitionTransformation<T> transformation) {
47+
super(environment, transformation);
48+
}
49+
50+
@Override
51+
public <K, T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
52+
KeyedPartitionStream<K, T_OTHER> other,
53+
TwoInputBroadcastStreamProcessFunction<T_OTHER, T, OUT> processFunction) {
54+
TypeInformation<OUT> outTypeInfo =
55+
StreamUtils.getOutputTypeForTwoInputBroadcastProcessFunction(
56+
processFunction,
57+
((KeyedPartitionStreamImpl<K, T_OTHER>) other).getType(),
58+
getType());
59+
KeyedTwoInputBroadcastProcessOperator<K, T_OTHER, T, OUT> processOperator =
60+
new KeyedTwoInputBroadcastProcessOperator<>(processFunction);
61+
Transformation<OUT> outTransformation =
62+
StreamUtils.getTwoInputTransformation(
63+
"Broadcast-Keyed-TwoInput-Process",
64+
(KeyedPartitionStreamImpl<K, T_OTHER>) other,
65+
// we should always take the broadcast input as second input.
66+
this,
67+
outTypeInfo,
68+
processOperator);
69+
environment.addOperator(outTransformation);
70+
return new NonKeyedPartitionStreamImpl<>(environment, outTransformation);
71+
}
72+
73+
@Override
74+
public <T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
75+
NonKeyedPartitionStream<T_OTHER> other,
76+
TwoInputBroadcastStreamProcessFunction<T_OTHER, T, OUT> processFunction) {
77+
TypeInformation<OUT> outTypeInfo =
78+
StreamUtils.getOutputTypeForTwoInputBroadcastProcessFunction(
79+
processFunction,
80+
((NonKeyedPartitionStreamImpl<T_OTHER>) other).getType(),
81+
getType());
82+
TwoInputBroadcastProcessOperator<T_OTHER, T, OUT> processOperator =
83+
new TwoInputBroadcastProcessOperator<>(processFunction);
84+
Transformation<OUT> outTransformation =
85+
StreamUtils.getTwoInputTransformation(
86+
"Broadcast-TwoInput-Process",
87+
(NonKeyedPartitionStreamImpl<T_OTHER>) other,
88+
// we should always take the broadcast input as second input.
89+
this,
90+
outTypeInfo,
91+
processOperator);
92+
environment.addOperator(outTransformation);
93+
return new NonKeyedPartitionStreamImpl<>(environment, outTransformation);
94+
}
95+
96+
@Override
97+
public <K, T_OTHER, OUT> KeyedPartitionStream<K, OUT> connectAndProcess(
98+
KeyedPartitionStream<K, T_OTHER> other,
99+
TwoInputBroadcastStreamProcessFunction<T_OTHER, T, OUT> processFunction,
100+
KeySelector<OUT, K> newKeySelector) {
101+
TypeInformation<OUT> outTypeInfo =
102+
StreamUtils.getOutputTypeForTwoInputBroadcastProcessFunction(
103+
processFunction,
104+
((KeyedPartitionStreamImpl<K, T_OTHER>) other).getType(),
105+
getType());
106+
KeyedTwoInputBroadcastProcessOperator<K, T_OTHER, T, OUT> processOperator =
107+
new KeyedTwoInputBroadcastProcessOperator<>(processFunction);
108+
Transformation<OUT> outTransformation =
109+
StreamUtils.getTwoInputTransformation(
110+
"Broadcast-Keyed-TwoInput-Process",
111+
(KeyedPartitionStreamImpl<K, T_OTHER>) other,
112+
// we should always take the broadcast input as second input.
113+
this,
114+
outTypeInfo,
115+
processOperator);
116+
117+
NonKeyedPartitionStreamImpl<OUT> outputStream =
118+
new NonKeyedPartitionStreamImpl<>(environment, outTransformation);
119+
environment.addOperator(outTransformation);
120+
// Construct a keyed stream directly without partitionTransformation to avoid shuffle.
121+
return new KeyedPartitionStreamImpl<>(
122+
outputStream,
123+
outTransformation,
124+
newKeySelector,
125+
TypeExtractor.getKeySelectorTypes(newKeySelector, outputStream.getType()));
126+
}
127+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.process.impl.stream;
20+
21+
import org.apache.flink.annotation.Internal;
22+
import org.apache.flink.api.common.typeinfo.TypeInformation;
23+
import org.apache.flink.api.dag.Transformation;
24+
import org.apache.flink.process.impl.ExecutionEnvironmentImpl;
25+
import org.apache.flink.streaming.api.transformations.SideOutputTransformation;
26+
import org.apache.flink.util.OutputTag;
27+
import org.apache.flink.util.Preconditions;
28+
29+
import java.util.HashMap;
30+
import java.util.Map;
31+
32+
/**
33+
* Base class for all streams.
34+
*
35+
* <p>Note: This is only used for internal implementation. It must not leak to user face api.
36+
*/
37+
@Internal
38+
public abstract class DataStream<T> {
39+
protected final ExecutionEnvironmentImpl environment;
40+
41+
protected final Transformation<T> transformation;
42+
43+
/**
44+
* We keep track of the side outputs that were already requested and their types. With this, we
45+
* can catch the case when a side output with a matching id is requested for a different type
46+
* because this would lead to problems at runtime.
47+
*/
48+
protected final Map<OutputTag<?>, TypeInformation<?>> requestedSideOutputs = new HashMap<>();
49+
50+
public DataStream(ExecutionEnvironmentImpl environment, Transformation<T> transformation) {
51+
this.environment =
52+
Preconditions.checkNotNull(environment, "Execution Environment must not be null.");
53+
this.transformation =
54+
Preconditions.checkNotNull(
55+
transformation, "Stream Transformation must not be null.");
56+
}
57+
58+
public TypeInformation<T> getType() {
59+
return transformation.getOutputType();
60+
}
61+
62+
/** This is only used for internal implementation. It must not leak to user face api. */
63+
public Transformation<T> getTransformation() {
64+
return transformation;
65+
}
66+
67+
public ExecutionEnvironmentImpl getEnvironment() {
68+
return environment;
69+
}
70+
71+
public <X> Transformation<X> getSideOutputTransform(OutputTag<X> outputTag) {
72+
TypeInformation<?> type = requestedSideOutputs.get(outputTag);
73+
if (type != null && !type.equals(outputTag.getTypeInfo())) {
74+
throw new UnsupportedOperationException(
75+
"A side output with a matching id was "
76+
+ "already requested with a different type. This is not allowed, side output "
77+
+ "ids need to be unique.");
78+
}
79+
requestedSideOutputs.put(outputTag, outputTag.getTypeInfo());
80+
81+
return new SideOutputTransformation<>(getTransformation(), outputTag);
82+
}
83+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.process.impl.stream;
20+
21+
import org.apache.flink.api.common.typeinfo.TypeInformation;
22+
import org.apache.flink.api.dag.Transformation;
23+
import org.apache.flink.api.java.functions.KeySelector;
24+
import org.apache.flink.api.java.tuple.Tuple2;
25+
import org.apache.flink.process.api.function.OneInputStreamProcessFunction;
26+
import org.apache.flink.process.api.function.TwoInputNonBroadcastStreamProcessFunction;
27+
import org.apache.flink.process.api.function.TwoOutputStreamProcessFunction;
28+
import org.apache.flink.process.api.stream.BroadcastStream;
29+
import org.apache.flink.process.api.stream.GlobalStream;
30+
import org.apache.flink.process.api.stream.KeyedPartitionStream;
31+
import org.apache.flink.process.api.stream.NonKeyedPartitionStream;
32+
import org.apache.flink.process.impl.ExecutionEnvironmentImpl;
33+
import org.apache.flink.process.impl.operators.ProcessOperator;
34+
import org.apache.flink.process.impl.operators.TwoInputNonBroadcastProcessOperator;
35+
import org.apache.flink.process.impl.operators.TwoOutputProcessOperator;
36+
import org.apache.flink.process.impl.utils.StreamUtils;
37+
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
38+
import org.apache.flink.streaming.api.operators.SimpleUdfStreamOperatorFactory;
39+
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
40+
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
41+
import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
42+
import org.apache.flink.util.OutputTag;
43+
44+
/** The implementation of {@link GlobalStream}. */
45+
public class GlobalStreamImpl<T> extends DataStream<T> implements GlobalStream<T> {
46+
public GlobalStreamImpl(
47+
ExecutionEnvironmentImpl environment, Transformation<T> transformation) {
48+
super(environment, transformation);
49+
}
50+
51+
@Override
52+
public <OUT> GlobalStream<OUT> process(OneInputStreamProcessFunction<T, OUT> processFunction) {
53+
TypeInformation<OUT> outType =
54+
StreamUtils.getOutputTypeForOneInputProcessFunction(processFunction, getType());
55+
ProcessOperator<T, OUT> operator = new ProcessOperator<>(processFunction);
56+
return transform("Global Process", outType, operator);
57+
}
58+
59+
@Override
60+
public <OUT1, OUT2> TwoGlobalStreams<OUT1, OUT2> process(
61+
TwoOutputStreamProcessFunction<T, OUT1, OUT2> processFunction) {
62+
Tuple2<TypeInformation<OUT1>, TypeInformation<OUT2>> twoOutputType =
63+
StreamUtils.getOutputTypesForTwoOutputProcessFunction(processFunction, getType());
64+
TypeInformation<OUT1> firstOutputType = twoOutputType.f0;
65+
TypeInformation<OUT2> secondTOutputType = twoOutputType.f1;
66+
OutputTag<OUT2> secondOutputTag = new OutputTag<OUT2>("Second-Output", secondTOutputType);
67+
68+
TwoOutputProcessOperator<T, OUT1, OUT2> operator =
69+
new TwoOutputProcessOperator<>(processFunction, secondOutputTag);
70+
GlobalStreamImpl<OUT1> firstStream =
71+
transform("Two-Output-Operator", firstOutputType, operator);
72+
GlobalStreamImpl<OUT2> secondStream =
73+
new GlobalStreamImpl<>(
74+
environment, firstStream.getSideOutputTransform(secondOutputTag));
75+
return TwoGlobalStreamsImpl.of(firstStream, secondStream);
76+
}
77+
78+
@Override
79+
public <T_OTHER, OUT> GlobalStream<OUT> connectAndProcess(
80+
GlobalStream<T_OTHER> other,
81+
TwoInputNonBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction) {
82+
TypeInformation<OUT> outTypeInfo =
83+
StreamUtils.getOutputTypeForTwoInputNonBroadcastProcessFunction(
84+
processFunction, getType(), ((GlobalStreamImpl<T_OTHER>) other).getType());
85+
TwoInputNonBroadcastProcessOperator<T, T_OTHER, OUT> processOperator =
86+
new TwoInputNonBroadcastProcessOperator<>(processFunction);
87+
Transformation<OUT> outTransformation =
88+
StreamUtils.getTwoInputTransformation(
89+
"Global-Global-TwoInput-Process",
90+
this,
91+
(GlobalStreamImpl<T_OTHER>) other,
92+
outTypeInfo,
93+
processOperator);
94+
// Operator parallelism should always be 1 for global stream.
95+
// parallelismConfigured should be true to avoid overwritten by AdaptiveBatchScheduler.
96+
outTransformation.setParallelism(1, true);
97+
environment.addOperator(outTransformation);
98+
return new GlobalStreamImpl<>(environment, outTransformation);
99+
}
100+
101+
// TODO add toSink method.
102+
103+
// ---------------------
104+
// Partitioning
105+
// ---------------------
106+
107+
@Override
108+
public <K> KeyedPartitionStream<K, T> keyBy(KeySelector<T, K> keySelector) {
109+
return new KeyedPartitionStreamImpl<>(this, keySelector);
110+
}
111+
112+
@Override
113+
public NonKeyedPartitionStream<T> shuffle() {
114+
return new NonKeyedPartitionStreamImpl<>(
115+
environment,
116+
new PartitionTransformation<>(getTransformation(), new ShufflePartitioner<>()));
117+
}
118+
119+
@Override
120+
public BroadcastStream<T> broadcast() {
121+
return new BroadcastStreamImpl<>(environment, getTransformation());
122+
}
123+
124+
private <R> GlobalStreamImpl<R> transform(
125+
String operatorName,
126+
TypeInformation<R> outputTypeInfo,
127+
OneInputStreamOperator<T, R> operator) {
128+
// read the output type of the input Transform to coax out errors about MissingTypeInfo
129+
transformation.getOutputType();
130+
131+
OneInputTransformation<T, R> resultTransform =
132+
new OneInputTransformation<>(
133+
this.transformation,
134+
operatorName,
135+
SimpleUdfStreamOperatorFactory.of(operator),
136+
outputTypeInfo,
137+
// Operator parallelism should always be 1 for global stream.
138+
1,
139+
// parallelismConfigured should be true to avoid overwritten by
140+
// AdaptiveBatchScheduler.
141+
true);
142+
143+
GlobalStreamImpl<R> returnStream = new GlobalStreamImpl<>(environment, resultTransform);
144+
145+
environment.addOperator(resultTransform);
146+
147+
return returnStream;
148+
}
149+
150+
private static class TwoGlobalStreamsImpl<OUT1, OUT2> implements TwoGlobalStreams<OUT1, OUT2> {
151+
152+
private final GlobalStream<OUT1> firstStream;
153+
154+
private final GlobalStream<OUT2> secondStream;
155+
156+
public static <OUT1, OUT2> TwoGlobalStreamsImpl<OUT1, OUT2> of(
157+
GlobalStreamImpl<OUT1> firstStream, GlobalStreamImpl<OUT2> secondStream) {
158+
return new TwoGlobalStreamsImpl<>(firstStream, secondStream);
159+
}
160+
161+
private TwoGlobalStreamsImpl(
162+
GlobalStreamImpl<OUT1> firstStream, GlobalStreamImpl<OUT2> secondStream) {
163+
this.firstStream = firstStream;
164+
this.secondStream = secondStream;
165+
}
166+
167+
@Override
168+
public GlobalStream<OUT1> getFirst() {
169+
return firstStream;
170+
}
171+
172+
@Override
173+
public GlobalStream<OUT2> getSecond() {
174+
return secondStream;
175+
}
176+
}
177+
}

0 commit comments

Comments
 (0)