Skip to content

Commit 08547b7

Browse files
committed
[API] Implement applyToAllPartitions for non-partitioned context
1 parent 1a232ec commit 08547b7

30 files changed

+977
-63
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.process.impl.context;
20+
21+
import java.util.Iterator;
22+
23+
/**
24+
* This class maintains the context of all input keys. It will be notified once a key is selected.
25+
*/
26+
public interface AllKeysContext {
27+
/** This method should be invoked once a key is selected. */
28+
void onKeySelected(Object newKey);
29+
30+
/** Get the iterator of all keys. */
31+
Iterator<Object> getAllKeysIter();
32+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.process.impl.context;
20+
21+
import org.apache.flink.process.api.common.Collector;
22+
import org.apache.flink.process.api.context.NonPartitionedContext;
23+
import org.apache.flink.process.api.function.ApplyPartitionFunction;
24+
25+
import java.util.Iterator;
26+
27+
/**
28+
* {@link NonPartitionedContext} for keyed operator. This will take care of the key context when
29+
* apply to all keyed partitions.
30+
*/
31+
public class DefaultKeyedNonPartitionedContext<OUT> extends DefaultNonPartitionedContext<OUT> {
32+
private final AllKeysContext allKeysContext;
33+
34+
public DefaultKeyedNonPartitionedContext(
35+
AllKeysContext allKeysContext,
36+
DefaultRuntimeContext context,
37+
Collector<OUT> collector) {
38+
super(context, collector);
39+
this.allKeysContext = allKeysContext;
40+
}
41+
42+
@Override
43+
public void applyToAllPartitions(ApplyPartitionFunction<OUT> applyPartitionFunction)
44+
throws Exception {
45+
// for keyed operator, each key corresponds to a partition.
46+
for (Iterator<Object> it = allKeysContext.getAllKeysIter(); it.hasNext(); ) {
47+
Object key = it.next();
48+
context.getStateManager().setCurrentKey(key);
49+
applyPartitionFunction.apply(collector, context);
50+
context.getStateManager().resetCurrentKey();
51+
}
52+
}
53+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.process.impl.context;
20+
21+
import org.apache.flink.process.api.common.Collector;
22+
import org.apache.flink.process.api.function.TwoOutputApplyPartitionFunction;
23+
24+
import java.util.Iterator;
25+
26+
public class DefaultKeyedTwoOutputNonPartitionedContext<OUT1, OUT2>
27+
extends DefaultTwoOutputNonPartitionedContext<OUT1, OUT2> {
28+
private final AllKeysContext allKeysContext;
29+
30+
public DefaultKeyedTwoOutputNonPartitionedContext(
31+
AllKeysContext allKeysContext,
32+
DefaultRuntimeContext context,
33+
Collector<OUT1> firstCollector,
34+
Collector<OUT2> secondCollector) {
35+
super(context, firstCollector, secondCollector);
36+
this.allKeysContext = allKeysContext;
37+
}
38+
39+
@Override
40+
public void applyToAllPartitions(
41+
TwoOutputApplyPartitionFunction<OUT1, OUT2> applyPartitionFunction) throws Exception {
42+
// for keyed operator, each key corresponds to a partition.
43+
for (Iterator<Object> it = allKeysContext.getAllKeysIter(); it.hasNext(); ) {
44+
Object key = it.next();
45+
context.getStateManager().setCurrentKey(key);
46+
applyPartitionFunction.apply(firstCollector, secondCollector, context);
47+
context.getStateManager().resetCurrentKey();
48+
}
49+
}
50+
}

Diff for: flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/context/DefaultNonPartitionedContext.java

+10-4
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.flink.process.impl.context;
2020

2121
import org.apache.flink.metrics.groups.OperatorMetricGroup;
22+
import org.apache.flink.process.api.common.Collector;
2223
import org.apache.flink.process.api.context.JobInfo;
2324
import org.apache.flink.process.api.context.NonPartitionedContext;
2425
import org.apache.flink.process.api.context.ProcessingTimeManager;
@@ -29,15 +30,20 @@
2930

3031
/** The default implementation of {@link NonPartitionedContext}. */
3132
public class DefaultNonPartitionedContext<OUT> implements NonPartitionedContext<OUT> {
32-
private final DefaultRuntimeContext context;
33+
protected final DefaultRuntimeContext context;
3334

34-
public DefaultNonPartitionedContext(DefaultRuntimeContext context) {
35+
protected final Collector<OUT> collector;
36+
37+
public DefaultNonPartitionedContext(DefaultRuntimeContext context, Collector<OUT> collector) {
3538
this.context = context;
39+
this.collector = collector;
3640
}
3741

3842
@Override
39-
public void applyToAllPartitions(ApplyPartitionFunction<OUT> applyPartitionFunction) {
40-
// TODO implements this method.
43+
public void applyToAllPartitions(ApplyPartitionFunction<OUT> applyPartitionFunction)
44+
throws Exception {
45+
// non-keyed operator has only one partition.
46+
applyPartitionFunction.apply(collector, context);
4147
}
4248

4349
@Override

Diff for: flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/context/DefaultTwoOutputNonPartitionedContext.java

+14-4
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.flink.process.impl.context;
2020

2121
import org.apache.flink.metrics.groups.OperatorMetricGroup;
22+
import org.apache.flink.process.api.common.Collector;
2223
import org.apache.flink.process.api.context.JobInfo;
2324
import org.apache.flink.process.api.context.ProcessingTimeManager;
2425
import org.apache.flink.process.api.context.StateManager;
@@ -30,16 +31,25 @@
3031
/** The default implementation of {@link TwoOutputNonPartitionedContext}. */
3132
public class DefaultTwoOutputNonPartitionedContext<OUT1, OUT2>
3233
implements TwoOutputNonPartitionedContext<OUT1, OUT2> {
33-
private final DefaultRuntimeContext context;
34+
protected final DefaultRuntimeContext context;
3435

35-
public DefaultTwoOutputNonPartitionedContext(DefaultRuntimeContext context) {
36+
protected final Collector<OUT1> firstCollector;
37+
38+
protected final Collector<OUT2> secondCollector;
39+
40+
public DefaultTwoOutputNonPartitionedContext(
41+
DefaultRuntimeContext context,
42+
Collector<OUT1> firstCollector,
43+
Collector<OUT2> secondCollector) {
3644
this.context = context;
45+
this.firstCollector = firstCollector;
46+
this.secondCollector = secondCollector;
3747
}
3848

3949
@Override
4050
public void applyToAllPartitions(
41-
TwoOutputApplyPartitionFunction<OUT1, OUT2> applyPartitionFunction) {
42-
// TODO implements this method.
51+
TwoOutputApplyPartitionFunction<OUT1, OUT2> applyPartitionFunction) throws Exception {
52+
applyPartitionFunction.apply(firstCollector, secondCollector, context);
4353
}
4454

4555
@Override
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.process.impl.context;
20+
21+
import java.util.HashSet;
22+
import java.util.Iterator;
23+
24+
/** This {@link AllKeysContext} will listen and store all keys it has seen. */
25+
public class StoreAllKeysContext implements AllKeysContext {
26+
/** Used to store all the keys seen by this input. */
27+
private final HashSet<Object> allKeys = new HashSet<>();
28+
29+
@Override
30+
public void onKeySelected(Object newKey) {
31+
allKeys.add(newKey);
32+
}
33+
34+
/** Get the iterator of all keys. */
35+
public Iterator<Object> getAllKeysIter() {
36+
return allKeys.iterator();
37+
}
38+
}

Diff for: flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/operators/KeyedProcessOperator.java

+29
Original file line numberDiff line numberDiff line change
@@ -19,28 +19,37 @@
1919
package org.apache.flink.process.impl.operators;
2020

2121
import org.apache.flink.api.java.functions.KeySelector;
22+
import org.apache.flink.process.api.context.NonPartitionedContext;
2223
import org.apache.flink.process.api.context.ProcessingTimeManager;
2324
import org.apache.flink.process.api.function.OneInputStreamProcessFunction;
2425
import org.apache.flink.process.api.stream.KeyedPartitionStream;
2526
import org.apache.flink.process.impl.common.KeyCheckedOutputCollector;
2627
import org.apache.flink.process.impl.common.OutputCollector;
2728
import org.apache.flink.process.impl.common.TimestampCollector;
29+
import org.apache.flink.process.impl.context.AllKeysContext;
30+
import org.apache.flink.process.impl.context.DefaultKeyedNonPartitionedContext;
2831
import org.apache.flink.process.impl.context.DefaultProcessingTimeManager;
32+
import org.apache.flink.process.impl.context.StoreAllKeysContext;
2933
import org.apache.flink.runtime.state.VoidNamespace;
3034
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
3135
import org.apache.flink.streaming.api.operators.InternalTimer;
3236
import org.apache.flink.streaming.api.operators.InternalTimerService;
3337
import org.apache.flink.streaming.api.operators.Triggerable;
38+
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
3439

3540
import javax.annotation.Nullable;
3641

3742
import java.util.Optional;
3843

44+
import static org.apache.flink.util.Preconditions.checkNotNull;
45+
3946
/** Operator for {@link OneInputStreamProcessFunction} in {@link KeyedPartitionStream}. */
4047
public class KeyedProcessOperator<KEY, IN, OUT> extends ProcessOperator<IN, OUT>
4148
implements Triggerable<KEY, VoidNamespace> {
4249
private transient InternalTimerService<VoidNamespace> timerService;
4350

51+
private transient AllKeysContext allKeysContext;
52+
4453
@Nullable private final KeySelector<OUT, KEY> outKeySelector;
4554

4655
public KeyedProcessOperator(OneInputStreamProcessFunction<IN, OUT> userFunction) {
@@ -58,6 +67,7 @@ public KeyedProcessOperator(
5867
public void open() throws Exception {
5968
this.timerService =
6069
getInternalTimerService("processing timer", VoidNamespaceSerializer.INSTANCE, this);
70+
this.allKeysContext = new StoreAllKeysContext();
6171
super.open();
6272
}
6373

@@ -69,6 +79,11 @@ protected TimestampCollector<OUT> getOutputCollector() {
6979
: new OutputCollector<>(output);
7080
}
7181

82+
@Override
83+
protected NonPartitionedContext<OUT> getNonPartitionedContext() {
84+
return new DefaultKeyedNonPartitionedContext<>(allKeysContext, context, outputCollector);
85+
}
86+
7287
@Override
7388
protected Optional<Object> currentKey() {
7489
return Optional.ofNullable(getCurrentKey());
@@ -91,4 +106,18 @@ public void onProcessingTime(InternalTimer<KEY, VoidNamespace> timer) throws Exc
91106
protected ProcessingTimeManager getProcessingTimeManager() {
92107
return new DefaultProcessingTimeManager(timerService);
93108
}
109+
110+
@Override
111+
@SuppressWarnings({"unchecked", "rawtypes"})
112+
public void setKeyContextElement1(StreamRecord record) throws Exception {
113+
setKeyContextElement(record, getStateKeySelector1());
114+
}
115+
116+
private <T> void setKeyContextElement(StreamRecord<T> record, KeySelector<T, ?> selector)
117+
throws Exception {
118+
checkNotNull(selector);
119+
Object key = selector.getKey(record.getValue());
120+
setCurrentKey(key);
121+
allKeysContext.onKeySelected(key);
122+
}
94123
}

Diff for: flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/operators/KeyedTwoInputBroadcastProcessOperator.java

+30
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,23 @@
1919
package org.apache.flink.process.impl.operators;
2020

2121
import org.apache.flink.api.java.functions.KeySelector;
22+
import org.apache.flink.process.api.context.NonPartitionedContext;
2223
import org.apache.flink.process.api.context.ProcessingTimeManager;
2324
import org.apache.flink.process.api.function.TwoInputBroadcastStreamProcessFunction;
2425
import org.apache.flink.process.api.stream.KeyedPartitionStream;
2526
import org.apache.flink.process.impl.common.KeyCheckedOutputCollector;
2627
import org.apache.flink.process.impl.common.OutputCollector;
2728
import org.apache.flink.process.impl.common.TimestampCollector;
29+
import org.apache.flink.process.impl.context.AllKeysContext;
30+
import org.apache.flink.process.impl.context.DefaultKeyedNonPartitionedContext;
2831
import org.apache.flink.process.impl.context.DefaultProcessingTimeManager;
32+
import org.apache.flink.process.impl.context.StoreAllKeysContext;
2933
import org.apache.flink.runtime.state.VoidNamespace;
3034
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
3135
import org.apache.flink.streaming.api.operators.InternalTimer;
3236
import org.apache.flink.streaming.api.operators.InternalTimerService;
3337
import org.apache.flink.streaming.api.operators.Triggerable;
38+
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
3439

3540
import javax.annotation.Nullable;
3641

@@ -42,6 +47,8 @@ public class KeyedTwoInputBroadcastProcessOperator<KEY, IN1, IN2, OUT>
4247
implements Triggerable<KEY, VoidNamespace> {
4348
private transient InternalTimerService<VoidNamespace> timerService;
4449

50+
private transient AllKeysContext allKeysContext;
51+
4552
@Nullable private final KeySelector<OUT, KEY> outKeySelector;
4653

4754
public KeyedTwoInputBroadcastProcessOperator(
@@ -60,6 +67,7 @@ public KeyedTwoInputBroadcastProcessOperator(
6067
public void open() throws Exception {
6168
this.timerService =
6269
getInternalTimerService("processing timer", VoidNamespaceSerializer.INSTANCE, this);
70+
this.allKeysContext = new StoreAllKeysContext();
6371
super.open();
6472
}
6573

@@ -71,6 +79,11 @@ protected TimestampCollector<OUT> getOutputCollector() {
7179
new OutputCollector<>(output), outKeySelector, () -> (KEY) getCurrentKey());
7280
}
7381

82+
@Override
83+
protected NonPartitionedContext<OUT> getNonPartitionedContext() {
84+
return new DefaultKeyedNonPartitionedContext<>(allKeysContext, context, collector);
85+
}
86+
7487
@Override
7588
protected Optional<Object> currentKey() {
7689
return Optional.ofNullable(getCurrentKey());
@@ -92,4 +105,21 @@ public void onProcessingTime(InternalTimer<KEY, VoidNamespace> timer) throws Exc
92105
userFunction.onProcessingTimer(timer.getTimestamp(), getOutputCollector(), context);
93106
context.getStateManager().resetCurrentKey();
94107
}
108+
109+
@Override
110+
@SuppressWarnings({"unchecked", "rawtypes"})
111+
// Only element from input1 should be considered as the other side is broadcast input.
112+
public void setKeyContextElement1(StreamRecord record) throws Exception {
113+
setKeyContextElement(record, getStateKeySelector1());
114+
}
115+
116+
private <T> void setKeyContextElement(StreamRecord<T> record, KeySelector<T, ?> selector)
117+
throws Exception {
118+
if (selector == null) {
119+
return;
120+
}
121+
Object key = selector.getKey(record.getValue());
122+
setCurrentKey(key);
123+
allKeysContext.onKeySelected(key);
124+
}
95125
}

0 commit comments

Comments
 (0)