Skip to content

Commit e59422d

Browse files
committed
[FLINK-34549][API] Implement applyToAllPartitions for non-partitioned context
1 parent a7f3780 commit e59422d

22 files changed

+778
-58
lines changed

Diff for: flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultNonPartitionedContext.java

+41 -3
Original file line number | Diff line number | Diff line change
@@ -18,23 +18,61 @@
1818

1919
package org.apache.flink.datastream.impl.context;
2020

21+
import org.apache.flink.datastream.api.common.Collector;
2122
import org.apache.flink.datastream.api.context.JobInfo;
2223
import org.apache.flink.datastream.api.context.NonPartitionedContext;
2324
import org.apache.flink.datastream.api.context.TaskInfo;
2425
import org.apache.flink.datastream.api.function.ApplyPartitionFunction;
2526
import org.apache.flink.metrics.MetricGroup;
2627

28+
import java.util.Set;
29+
2730
/** The default implementation of {@link NonPartitionedContext}. */
2831
public class DefaultNonPartitionedContext<OUT> implements NonPartitionedContext<OUT> {
2932
private final DefaultRuntimeContext context;
3033

31-
public DefaultNonPartitionedContext(DefaultRuntimeContext context) {
34+
private final DefaultPartitionedContext partitionedContext;
35+
36+
private final Collector<OUT> collector;
37+
38+
private final boolean isKeyed;
39+
40+
private final Set<Object> keySet;
41+
42+
public DefaultNonPartitionedContext(
43+
DefaultRuntimeContext context,
44+
DefaultPartitionedContext partitionedContext,
45+
Collector<OUT> collector,
46+
boolean isKeyed,
47+
Set<Object> keySet) {
3248
this.context = context;
49+
this.partitionedContext = partitionedContext;
50+
this.collector = collector;
51+
this.isKeyed = isKeyed;
52+
this.keySet = keySet;
3353
}
3454

3555
@Override
36-
public void applyToAllPartitions(ApplyPartitionFunction<OUT> applyPartitionFunction) {
37-
// TODO implements this method.
56+
public void applyToAllPartitions(ApplyPartitionFunction<OUT> applyPartitionFunction)
57+
throws Exception {
58+
if (isKeyed) {
59+
for (Object key : keySet) {
60+
partitionedContext
61+
.getStateManager()
62+
.executeInKeyContext(
63+
() -> {
64+
try {
65+
applyPartitionFunction.apply(collector, partitionedContext);
66+
} catch (Exception e) {
67+
throw new RuntimeException(e);
68+
}
69+
},
70+
key);
71+
}
72+
} else {
73+
// non-keyed operator has only one partition.
74+
applyPartitionFunction.apply(collector, partitionedContext);
75+
}
3876
}
3977

4078
@Override

Diff for: flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultTwoOutputNonPartitionedContext.java

+48 -4
Original file line number | Diff line number | Diff line change
@@ -18,25 +18,69 @@
1818

1919
package org.apache.flink.datastream.impl.context;
2020

21+
import org.apache.flink.datastream.api.common.Collector;
2122
import org.apache.flink.datastream.api.context.JobInfo;
2223
import org.apache.flink.datastream.api.context.TaskInfo;
2324
import org.apache.flink.datastream.api.context.TwoOutputNonPartitionedContext;
2425
import org.apache.flink.datastream.api.function.TwoOutputApplyPartitionFunction;
2526
import org.apache.flink.metrics.MetricGroup;
2627

28+
import java.util.Set;
29+
2730
/** The default implementation of {@link TwoOutputNonPartitionedContext}. */
2831
public class DefaultTwoOutputNonPartitionedContext<OUT1, OUT2>
2932
implements TwoOutputNonPartitionedContext<OUT1, OUT2> {
30-
private final DefaultRuntimeContext context;
33+
protected final DefaultRuntimeContext context;
34+
35+
private final DefaultPartitionedContext partitionedContext;
36+
37+
protected final Collector<OUT1> firstCollector;
38+
39+
protected final Collector<OUT2> secondCollector;
40+
41+
private final boolean isKeyed;
42+
43+
private final Set<Object> keySet;
3144

32-
public DefaultTwoOutputNonPartitionedContext(DefaultRuntimeContext context) {
45+
public DefaultTwoOutputNonPartitionedContext(
46+
DefaultRuntimeContext context,
47+
DefaultPartitionedContext partitionedContext,
48+
Collector<OUT1> firstCollector,
49+
Collector<OUT2> secondCollector,
50+
boolean isKeyed,
51+
Set<Object> keySet) {
3352
this.context = context;
53+
this.partitionedContext = partitionedContext;
54+
this.firstCollector = firstCollector;
55+
this.secondCollector = secondCollector;
56+
this.isKeyed = isKeyed;
57+
this.keySet = keySet;
3458
}
3559

3660
@Override
3761
public void applyToAllPartitions(
38-
TwoOutputApplyPartitionFunction<OUT1, OUT2> applyPartitionFunction) {
39-
// TODO implements this method.
62+
TwoOutputApplyPartitionFunction<OUT1, OUT2> applyPartitionFunction) throws Exception {
63+
if (isKeyed) {
64+
for (Object key : keySet) {
65+
partitionedContext
66+
.getStateManager()
67+
.executeInKeyContext(
68+
() -> {
69+
try {
70+
applyPartitionFunction.apply(
71+
firstCollector,
72+
secondCollector,
73+
partitionedContext);
74+
} catch (Exception e) {
75+
throw new RuntimeException(e);
76+
}
77+
},
78+
key);
79+
}
80+
} else {
81+
// non-keyed operator has only one partition.
82+
applyPartitionFunction.apply(firstCollector, secondCollector, partitionedContext);
83+
}
4084
}
4185

4286
@Override

Diff for: flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedProcessOperator.java

+32
Original file line number | Diff line number | Diff line change
@@ -19,26 +19,37 @@
1919
package org.apache.flink.datastream.impl.operators;
2020

2121
import org.apache.flink.api.java.functions.KeySelector;
22+
import org.apache.flink.datastream.api.context.NonPartitionedContext;
2223
import org.apache.flink.datastream.api.context.ProcessingTimeManager;
2324
import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
2425
import org.apache.flink.datastream.api.stream.KeyedPartitionStream;
2526
import org.apache.flink.datastream.impl.common.KeyCheckedOutputCollector;
2627
import org.apache.flink.datastream.impl.common.OutputCollector;
2728
import org.apache.flink.datastream.impl.common.TimestampCollector;
29+
import org.apache.flink.datastream.impl.context.DefaultNonPartitionedContext;
2830
import org.apache.flink.datastream.impl.context.DefaultProcessingTimeManager;
2931
import org.apache.flink.runtime.state.VoidNamespace;
3032
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
3133
import org.apache.flink.streaming.api.operators.InternalTimer;
3234
import org.apache.flink.streaming.api.operators.InternalTimerService;
3335
import org.apache.flink.streaming.api.operators.Triggerable;
36+
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
3437

3538
import javax.annotation.Nullable;
3639

40+
import java.util.HashSet;
41+
import java.util.Set;
42+
43+
import static org.apache.flink.util.Preconditions.checkNotNull;
44+
3745
/** Operator for {@link OneInputStreamProcessFunction} in {@link KeyedPartitionStream}. */
3846
public class KeyedProcessOperator<KEY, IN, OUT> extends ProcessOperator<IN, OUT>
3947
implements Triggerable<KEY, VoidNamespace> {
4048
private transient InternalTimerService<VoidNamespace> timerService;
4149

50+
// TODO Restore this keySet when the task is initialized from a checkpoint.
51+
private transient Set<Object> keySet;
52+
4253
@Nullable private final KeySelector<OUT, KEY> outKeySelector;
4354

4455
public KeyedProcessOperator(OneInputStreamProcessFunction<IN, OUT> userFunction) {
@@ -56,6 +67,7 @@ public KeyedProcessOperator(
5667
public void open() throws Exception {
5768
this.timerService =
5869
getInternalTimerService("processing timer", VoidNamespaceSerializer.INSTANCE, this);
70+
this.keySet = new HashSet<>();
5971
super.open();
6072
}
6173

@@ -95,4 +107,24 @@ public void onProcessingTime(InternalTimer<KEY, VoidNamespace> timer) throws Exc
95107
protected ProcessingTimeManager getProcessingTimeManager() {
96108
return new DefaultProcessingTimeManager(timerService);
97109
}
110+
111+
@Override
112+
protected NonPartitionedContext<OUT> getNonPartitionedContext() {
113+
return new DefaultNonPartitionedContext<>(
114+
context, partitionedContext, outputCollector, true, keySet);
115+
}
116+
117+
@Override
118+
@SuppressWarnings({"unchecked", "rawtypes"})
119+
public void setKeyContextElement1(StreamRecord record) throws Exception {
120+
setKeyContextElement(record, getStateKeySelector1());
121+
}
122+
123+
private <T> void setKeyContextElement(StreamRecord<T> record, KeySelector<T, ?> selector)
124+
throws Exception {
125+
checkNotNull(selector);
126+
Object key = selector.getKey(record.getValue());
127+
setCurrentKey(key);
128+
keySet.add(key);
129+
}
98130
}

Diff for: flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputBroadcastProcessOperator.java

+33
Original file line number | Diff line number | Diff line change
@@ -19,27 +19,36 @@
1919
package org.apache.flink.datastream.impl.operators;
2020

2121
import org.apache.flink.api.java.functions.KeySelector;
22+
import org.apache.flink.datastream.api.context.NonPartitionedContext;
2223
import org.apache.flink.datastream.api.context.ProcessingTimeManager;
2324
import org.apache.flink.datastream.api.function.TwoInputBroadcastStreamProcessFunction;
2425
import org.apache.flink.datastream.api.stream.KeyedPartitionStream;
2526
import org.apache.flink.datastream.impl.common.KeyCheckedOutputCollector;
2627
import org.apache.flink.datastream.impl.common.OutputCollector;
2728
import org.apache.flink.datastream.impl.common.TimestampCollector;
29+
import org.apache.flink.datastream.impl.context.DefaultNonPartitionedContext;
2830
import org.apache.flink.datastream.impl.context.DefaultProcessingTimeManager;
2931
import org.apache.flink.runtime.state.VoidNamespace;
3032
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
3133
import org.apache.flink.streaming.api.operators.InternalTimer;
3234
import org.apache.flink.streaming.api.operators.InternalTimerService;
3335
import org.apache.flink.streaming.api.operators.Triggerable;
36+
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
3437

3538
import javax.annotation.Nullable;
3639

40+
import java.util.HashSet;
41+
import java.util.Set;
42+
3743
/** Operator for {@link TwoInputBroadcastStreamProcessFunction} in {@link KeyedPartitionStream}. */
3844
public class KeyedTwoInputBroadcastProcessOperator<KEY, IN1, IN2, OUT>
3945
extends TwoInputBroadcastProcessOperator<IN1, IN2, OUT>
4046
implements Triggerable<KEY, VoidNamespace> {
4147
private transient InternalTimerService<VoidNamespace> timerService;
4248

49+
// TODO Restore this keySet when the task is initialized from a checkpoint.
50+
private transient Set<Object> keySet;
51+
4352
@Nullable private final KeySelector<OUT, KEY> outKeySelector;
4453

4554
public KeyedTwoInputBroadcastProcessOperator(
@@ -58,6 +67,7 @@ public KeyedTwoInputBroadcastProcessOperator(
5867
public void open() throws Exception {
5968
this.timerService =
6069
getInternalTimerService("processing timer", VoidNamespaceSerializer.INSTANCE, this);
70+
this.keySet = new HashSet<>();
6171
super.open();
6272
}
6373

@@ -96,4 +106,27 @@ public void onProcessingTime(InternalTimer<KEY, VoidNamespace> timer) throws Exc
96106
partitionedContext),
97107
timer.getKey());
98108
}
109+
110+
@Override
111+
protected NonPartitionedContext<OUT> getNonPartitionedContext() {
112+
return new DefaultNonPartitionedContext<>(
113+
context, partitionedContext, collector, true, keySet);
114+
}
115+
116+
@Override
117+
@SuppressWarnings({"unchecked", "rawtypes"})
118+
// Only elements from input1 are considered, because the other input is the broadcast side.
119+
public void setKeyContextElement1(StreamRecord record) throws Exception {
120+
setKeyContextElement(record, getStateKeySelector1());
121+
}
122+
123+
private <T> void setKeyContextElement(StreamRecord<T> record, KeySelector<T, ?> selector)
124+
throws Exception {
125+
if (selector == null) {
126+
return;
127+
}
128+
Object key = selector.getKey(record.getValue());
129+
setCurrentKey(key);
130+
keySet.add(key);
131+
}
99132
}

Diff for: flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputNonBroadcastProcessOperator.java

+38
Original file line number | Diff line number | Diff line change
@@ -19,21 +19,27 @@
1919
package org.apache.flink.datastream.impl.operators;
2020

2121
import org.apache.flink.api.java.functions.KeySelector;
22+
import org.apache.flink.datastream.api.context.NonPartitionedContext;
2223
import org.apache.flink.datastream.api.context.ProcessingTimeManager;
2324
import org.apache.flink.datastream.api.function.TwoInputNonBroadcastStreamProcessFunction;
2425
import org.apache.flink.datastream.api.stream.KeyedPartitionStream;
2526
import org.apache.flink.datastream.impl.common.KeyCheckedOutputCollector;
2627
import org.apache.flink.datastream.impl.common.OutputCollector;
2728
import org.apache.flink.datastream.impl.common.TimestampCollector;
29+
import org.apache.flink.datastream.impl.context.DefaultNonPartitionedContext;
2830
import org.apache.flink.datastream.impl.context.DefaultProcessingTimeManager;
2931
import org.apache.flink.runtime.state.VoidNamespace;
3032
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
3133
import org.apache.flink.streaming.api.operators.InternalTimer;
3234
import org.apache.flink.streaming.api.operators.InternalTimerService;
3335
import org.apache.flink.streaming.api.operators.Triggerable;
36+
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
3437

3538
import javax.annotation.Nullable;
3639

40+
import java.util.HashSet;
41+
import java.util.Set;
42+
3743
/**
3844
* Operator for {@link TwoInputNonBroadcastStreamProcessFunction} in {@link KeyedPartitionStream}.
3945
*/
@@ -42,6 +48,9 @@ public class KeyedTwoInputNonBroadcastProcessOperator<KEY, IN1, IN2, OUT>
4248
implements Triggerable<KEY, VoidNamespace> {
4349
private transient InternalTimerService<VoidNamespace> timerService;
4450

51+
// TODO Restore this keySet when the task is initialized from a checkpoint.
52+
private transient Set<Object> keySet;
53+
4554
@Nullable private final KeySelector<OUT, KEY> outKeySelector;
4655

4756
public KeyedTwoInputNonBroadcastProcessOperator(
@@ -60,6 +69,7 @@ public KeyedTwoInputNonBroadcastProcessOperator(
6069
public void open() throws Exception {
6170
this.timerService =
6271
getInternalTimerService("processing timer", VoidNamespaceSerializer.INSTANCE, this);
72+
this.keySet = new HashSet<>();
6373
super.open();
6474
}
6575

@@ -98,4 +108,32 @@ public void onProcessingTime(InternalTimer<KEY, VoidNamespace> timer) throws Exc
98108
partitionedContext),
99109
timer.getKey());
100110
}
111+
112+
@Override
113+
protected NonPartitionedContext<OUT> getNonPartitionedContext() {
114+
return new DefaultNonPartitionedContext<>(
115+
context, partitionedContext, collector, true, keySet);
116+
}
117+
118+
@Override
119+
@SuppressWarnings({"unchecked", "rawtypes"})
120+
public void setKeyContextElement1(StreamRecord record) throws Exception {
121+
setKeyContextElement(record, getStateKeySelector1());
122+
}
123+
124+
@Override
125+
@SuppressWarnings({"unchecked", "rawtypes"})
126+
public void setKeyContextElement2(StreamRecord record) throws Exception {
127+
setKeyContextElement(record, getStateKeySelector2());
128+
}
129+
130+
private <T> void setKeyContextElement(StreamRecord<T> record, KeySelector<T, ?> selector)
131+
throws Exception {
132+
if (selector == null) {
133+
return;
134+
}
135+
Object key = selector.getKey(record.getValue());
136+
setCurrentKey(key);
137+
keySet.add(key);
138+
}
101139
}

0 commit comments

Comments
 (0)