Skip to content

Commit 27c8899

Browse files
committed
[Api] Implement datastreams
1 parent 608ef40 commit 27c8899

File tree

6 files changed

+1211
-0
lines changed

6 files changed

+1211
-0
lines changed
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.process.impl.stream;
20+
21+
import org.apache.flink.api.common.typeinfo.TypeInformation;
22+
import org.apache.flink.api.dag.Transformation;
23+
import org.apache.flink.api.java.functions.KeySelector;
24+
import org.apache.flink.api.java.typeutils.TypeExtractor;
25+
import org.apache.flink.process.api.function.TwoInputBroadcastStreamProcessFunction;
26+
import org.apache.flink.process.api.stream.BroadcastStream;
27+
import org.apache.flink.process.api.stream.KeyedPartitionStream;
28+
import org.apache.flink.process.api.stream.NonKeyedPartitionStream;
29+
import org.apache.flink.process.impl.ExecutionEnvironmentImpl;
30+
import org.apache.flink.process.impl.operators.KeyedTwoInputBroadcastProcessOperator;
31+
import org.apache.flink.process.impl.operators.TwoInputBroadcastProcessOperator;
32+
import org.apache.flink.process.impl.utils.StreamUtils;
33+
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
34+
import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
35+
36+
/** The implementation of {@link BroadcastStream}. */
37+
public class BroadcastStreamImpl<T> extends DataStream<T> implements BroadcastStream<T> {
38+
public BroadcastStreamImpl(
39+
ExecutionEnvironmentImpl environment, Transformation<T> transformation) {
40+
this(
41+
environment,
42+
new PartitionTransformation<>(transformation, new BroadcastPartitioner<>()));
43+
}
44+
45+
private BroadcastStreamImpl(
46+
ExecutionEnvironmentImpl environment, PartitionTransformation<T> transformation) {
47+
super(environment, transformation);
48+
}
49+
50+
@Override
51+
public <K, T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
52+
KeyedPartitionStream<K, T_OTHER> other,
53+
TwoInputBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction) {
54+
TypeInformation<OUT> outTypeInfo =
55+
StreamUtils.getOutputTypeForTwoInputBroadcastProcessFunction(
56+
processFunction,
57+
getType(),
58+
((KeyedPartitionStreamImpl<K, T_OTHER>) other).getType());
59+
KeyedTwoInputBroadcastProcessOperator<K, T, T_OTHER, OUT> processOperator =
60+
new KeyedTwoInputBroadcastProcessOperator<>(processFunction);
61+
Transformation<OUT> outTransformation =
62+
StreamUtils.getTwoInputTransformation(
63+
"Broadcast-Keyed-TwoInput-Process",
64+
this,
65+
(KeyedPartitionStreamImpl<K, T_OTHER>) other,
66+
outTypeInfo,
67+
processOperator);
68+
return new NonKeyedPartitionStreamImpl<>(environment, outTransformation);
69+
}
70+
71+
@Override
72+
public <T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
73+
NonKeyedPartitionStream<T_OTHER> other,
74+
TwoInputBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction) {
75+
TypeInformation<OUT> outTypeInfo =
76+
StreamUtils.getOutputTypeForTwoInputBroadcastProcessFunction(
77+
processFunction,
78+
getType(),
79+
((NonKeyedPartitionStreamImpl<T_OTHER>) other).getType());
80+
TwoInputBroadcastProcessOperator<T, T_OTHER, OUT> processOperator =
81+
new TwoInputBroadcastProcessOperator<>(processFunction);
82+
Transformation<OUT> outTransformation =
83+
StreamUtils.getTwoInputTransformation(
84+
"Broadcast-TwoInput-Process",
85+
this,
86+
(NonKeyedPartitionStreamImpl<T_OTHER>) other,
87+
outTypeInfo,
88+
processOperator);
89+
return new NonKeyedPartitionStreamImpl<>(environment, outTransformation);
90+
}
91+
92+
@Override
93+
public <K, T_OTHER, OUT> KeyedPartitionStream<K, OUT> connectAndProcess(
94+
KeyedPartitionStream<K, T_OTHER> other,
95+
TwoInputBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction,
96+
KeySelector<OUT, K> newKeySelector) {
97+
TypeInformation<OUT> outTypeInfo =
98+
StreamUtils.getOutputTypeForTwoInputBroadcastProcessFunction(
99+
processFunction,
100+
getType(),
101+
((KeyedPartitionStreamImpl<K, T_OTHER>) other).getType());
102+
KeyedTwoInputBroadcastProcessOperator<K, T, T_OTHER, OUT> processOperator =
103+
new KeyedTwoInputBroadcastProcessOperator<>(processFunction);
104+
Transformation<OUT> outTransformation =
105+
StreamUtils.getTwoInputTransformation(
106+
"Broadcast-Keyed-TwoInput-Process",
107+
this,
108+
(KeyedPartitionStreamImpl<K, T_OTHER>) other,
109+
outTypeInfo,
110+
processOperator);
111+
112+
NonKeyedPartitionStreamImpl<OUT> outputStream =
113+
new NonKeyedPartitionStreamImpl<>(environment, outTransformation);
114+
// Construct a keyed stream directly without partitionTransformation to avoid shuffle.
115+
return new KeyedPartitionStreamImpl<>(
116+
outputStream,
117+
outTransformation,
118+
newKeySelector,
119+
TypeExtractor.getKeySelectorTypes(newKeySelector, outputStream.getType()));
120+
}
121+
}
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.process.impl.stream;
20+
21+
import org.apache.flink.annotation.Internal;
22+
import org.apache.flink.api.common.typeinfo.TypeInformation;
23+
import org.apache.flink.api.dag.Transformation;
24+
import org.apache.flink.process.impl.ExecutionEnvironmentImpl;
25+
import org.apache.flink.streaming.api.transformations.SideOutputTransformation;
26+
import org.apache.flink.util.OutputTag;
27+
import org.apache.flink.util.Preconditions;
28+
29+
import java.util.HashMap;
30+
import java.util.Map;
31+
32+
/**
33+
* Base class for all streams.
34+
*
35+
* <p>Note: This is only used for internal implementation. It must not leak to user face api.
36+
*/
37+
@Internal
38+
public abstract class DataStream<T> {
39+
protected final ExecutionEnvironmentImpl environment;
40+
41+
protected final Transformation<T> transformation;
42+
43+
/**
44+
* We keep track of the side outputs that were already requested and their types. With this, we
45+
* can catch the case when a side output with a matching id is requested for a different type
46+
* because this would lead to problems at runtime.
47+
*/
48+
protected final Map<OutputTag<?>, TypeInformation<?>> requestedSideOutputs = new HashMap<>();
49+
50+
public DataStream(ExecutionEnvironmentImpl environment, Transformation<T> transformation) {
51+
this.environment =
52+
Preconditions.checkNotNull(environment, "Execution Environment must not be null.");
53+
this.transformation =
54+
Preconditions.checkNotNull(
55+
transformation, "Stream Transformation must not be null.");
56+
}
57+
58+
public TypeInformation<T> getType() {
59+
return transformation.getOutputType();
60+
}
61+
62+
/** This is only used for internal implementation. It must not leak to user face api. */
63+
public Transformation<T> getTransformation() {
64+
return transformation;
65+
}
66+
67+
public ExecutionEnvironmentImpl getEnvironment() {
68+
return environment;
69+
}
70+
71+
public <X> Transformation<X> getSideOutputTransform(OutputTag<X> outputTag) {
72+
TypeInformation<?> type = requestedSideOutputs.get(outputTag);
73+
if (type != null && !type.equals(outputTag.getTypeInfo())) {
74+
throw new UnsupportedOperationException(
75+
"A side output with a matching id was "
76+
+ "already requested with a different type. This is not allowed, side output "
77+
+ "ids need to be unique.");
78+
}
79+
requestedSideOutputs.put(outputTag, outputTag.getTypeInfo());
80+
81+
return new SideOutputTransformation<>(getTransformation(), outputTag);
82+
}
83+
}

0 commit comments

Comments
 (0)