diff --git a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/process/api/context/RuntimeContext.java b/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/process/api/context/RuntimeContext.java index dd9b2c8a3be0b..bdcd731ca1a37 100644 --- a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/process/api/context/RuntimeContext.java +++ b/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/process/api/context/RuntimeContext.java @@ -37,4 +37,7 @@ public interface RuntimeContext { /** Get the {@link StateManager} of this process function. */ StateManager getStateManager(); + + /** Get the {@link ProcessingTimeManager} of this process function. */ + ProcessingTimeManager getProcessingTimeManager(); } diff --git a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/process/api/function/OneInputStreamProcessFunction.java b/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/process/api/function/OneInputStreamProcessFunction.java index c5860e06baccb..22de1e7b513cb 100644 --- a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/process/api/function/OneInputStreamProcessFunction.java +++ b/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/process/api/function/OneInputStreamProcessFunction.java @@ -42,4 +42,13 @@ public interface OneInputStreamProcessFunction extends ProcessFunction * @param ctx, the context in which this function is executed. */ default void endInput(NonPartitionedContext ctx) {} + + /** + * Callback for processing timer. + * + * @param timestamp when this callback is triggered. + * @param output to emit record. + * @param ctx, runtime context in which this function is executed. + */ + default void onProcessingTimer(long timestamp, Collector output, RuntimeContext ctx) {} } diff --git a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/process/api/function/TwoInputBroadcastStreamProcessFunction.java b/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/process/api/function/TwoInputBroadcastStreamProcessFunction.java index 1ad3da28e70f5..b0388a75ec1ec 100644 --- a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/process/api/function/TwoInputBroadcastStreamProcessFunction.java +++ b/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/process/api/function/TwoInputBroadcastStreamProcessFunction.java @@ -65,4 +65,13 @@ default void endNonBroadcastInput(NonPartitionedContext ctx) {} * @param ctx, the context in which this function is executed. */ default void endBroadcastInput(NonPartitionedContext ctx) {} + + /** + * Callback for processing timer. + * + * @param timestamp when this callback is triggered. + * @param output to emit record. + * @param ctx, runtime context in which this function is executed. + */ + default void onProcessingTimer(long timestamp, Collector output, RuntimeContext ctx) {} } diff --git a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/process/api/function/TwoInputNonBroadcastStreamProcessFunction.java b/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/process/api/function/TwoInputNonBroadcastStreamProcessFunction.java index 19e2a2ba78701..d0d80392ef764 100644 --- a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/process/api/function/TwoInputNonBroadcastStreamProcessFunction.java +++ b/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/process/api/function/TwoInputNonBroadcastStreamProcessFunction.java @@ -61,4 +61,13 @@ default void endFirstInput(NonPartitionedContext ctx) {} * @param ctx, the context in which this function is executed. */ default void endSecondInput(NonPartitionedContext ctx) {} + + /** + * Callback for processing timer. + * + * @param timestamp when this callback is triggered. + * @param output to emit record. + * @param ctx, runtime context in which this function is executed. + */ + default void onProcessingTimer(long timestamp, Collector output, RuntimeContext ctx) {} } diff --git a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/process/api/function/TwoOutputStreamProcessFunction.java b/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/process/api/function/TwoOutputStreamProcessFunction.java index b26ca5bab6e8c..e4c5afd9545bb 100644 --- a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/process/api/function/TwoOutputStreamProcessFunction.java +++ b/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/process/api/function/TwoOutputStreamProcessFunction.java @@ -45,4 +45,15 @@ void processRecord( * @param ctx, the context in which this function is executed. */ default void endInput(TwoOutputNonPartitionedContext ctx) {} + + /** + * Callback for processing timer. + * + * @param timestamp when this callback is triggered. + * @param output1 to emit record. + * @param output2 to emit record. + * @param ctx, runtime context in which this function is executed. + */ + default void onProcessingTimer( + long timestamp, Collector output1, Collector output2, RuntimeContext ctx) {} } diff --git a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/context/DefaultNonPartitionedContext.java b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/context/DefaultNonPartitionedContext.java index 63ab902fddcb9..3f247f1cd51bf 100644 --- a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/context/DefaultNonPartitionedContext.java +++ b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/context/DefaultNonPartitionedContext.java @@ -20,6 +20,7 @@ import org.apache.flink.process.api.context.JobInfo; import org.apache.flink.process.api.context.NonPartitionedContext; +import org.apache.flink.process.api.context.ProcessingTimeManager; import org.apache.flink.process.api.context.StateManager; import org.apache.flink.process.api.context.TaskInfo; import org.apache.flink.process.api.function.ApplyPartitionFunction; @@ -52,4 +53,10 @@ public StateManager getStateManager() { // state is partition-aware, so it's always empty in non-partitioned context. return EmptyStateManager.INSTANCE; } + + @Override + public ProcessingTimeManager getProcessingTimeManager() { + // processing timer is partition-aware, so it's not supported in non-partitioned context. + return UnsupportedProcessingTimeManager.INSTANCE; + } } diff --git a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/context/DefaultProcessingTimeManager.java b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/context/DefaultProcessingTimeManager.java new file mode 100644 index 0000000000000..8844898f51fc8 --- /dev/null +++ b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/context/DefaultProcessingTimeManager.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.process.impl.context; + +import org.apache.flink.process.api.context.ProcessingTimeManager; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.streaming.api.operators.InternalTimerService; + +/** The default implementation of {@link ProcessingTimeManager}. */ +public class DefaultProcessingTimeManager implements ProcessingTimeManager { + private final InternalTimerService timerService; + + public DefaultProcessingTimeManager(InternalTimerService timerService) { + this.timerService = timerService; + } + + @Override + public void registerProcessingTimer(long timestamp) { + timerService.registerProcessingTimeTimer(VoidNamespace.INSTANCE, timestamp); + } + + @Override + public void deleteProcessingTimeTimer(long timestamp) { + timerService.deleteProcessingTimeTimer(VoidNamespace.INSTANCE, timestamp); + } + + @Override + public long currentProcessingTime() { + return timerService.currentProcessingTime(); + } +} diff --git a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/context/DefaultRuntimeContext.java b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/context/DefaultRuntimeContext.java index 71e186b334d44..d825935833401 100644 --- a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/context/DefaultRuntimeContext.java +++ b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/context/DefaultRuntimeContext.java @@ -19,6 +19,7 @@ package org.apache.flink.process.impl.context; import org.apache.flink.process.api.context.JobInfo; +import org.apache.flink.process.api.context.ProcessingTimeManager; import org.apache.flink.process.api.context.RuntimeContext; import org.apache.flink.process.api.context.TaskInfo; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; @@ -34,15 +35,19 @@ public class DefaultRuntimeContext implements RuntimeContext { private final DefaultStateManager stateManager; + private final ProcessingTimeManager processingTimeManager; + public DefaultRuntimeContext( StreamingRuntimeContext operatorContext, int parallelism, int maxParallelism, String taskName, - Supplier> currentKeySupplier) { + Supplier> currentKeySupplier, + ProcessingTimeManager processingTimeManager) { this.jobInfo = new DefaultJobInfo(operatorContext); this.taskInfo = new DefaultTaskInfo(parallelism, maxParallelism, taskName); this.stateManager = new DefaultStateManager(currentKeySupplier); + this.processingTimeManager = processingTimeManager; } @Override @@ -59,4 +64,8 @@ public TaskInfo getTaskInfo() { public DefaultStateManager getStateManager() { return stateManager; } + + public ProcessingTimeManager getProcessingTimeManager() { + return processingTimeManager; + } } diff --git a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/context/DefaultTwoOutputNonPartitionedContext.java b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/context/DefaultTwoOutputNonPartitionedContext.java index 1f05461226a77..4ed475069694b 100644 --- a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/context/DefaultTwoOutputNonPartitionedContext.java +++ b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/context/DefaultTwoOutputNonPartitionedContext.java @@ -19,6 +19,7 @@ package org.apache.flink.process.impl.context; import org.apache.flink.process.api.context.JobInfo; +import org.apache.flink.process.api.context.ProcessingTimeManager; import org.apache.flink.process.api.context.StateManager; import org.apache.flink.process.api.context.TaskInfo; import org.apache.flink.process.api.context.TwoOutputNonPartitionedContext; @@ -52,4 +53,10 @@ public StateManager getStateManager() { // state is partition-aware, so it's always empty in non-partitioned context. return EmptyStateManager.INSTANCE; } + + @Override + public ProcessingTimeManager getProcessingTimeManager() { + // processing timer is partition-aware, so it's not supported in non-partitioned context. + return UnsupportedProcessingTimeManager.INSTANCE; + } } diff --git a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/context/UnsupportedProcessingTimeManager.java b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/context/UnsupportedProcessingTimeManager.java new file mode 100644 index 0000000000000..91f9e2cfb2c78 --- /dev/null +++ b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/context/UnsupportedProcessingTimeManager.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.process.impl.context; + +import org.apache.flink.process.api.context.ProcessingTimeManager; + +/** + * The special implementation of {@link ProcessingTimeManager}, all its methods are not supported. + * This is used for context that can not define the key. + */ +public class UnsupportedProcessingTimeManager implements ProcessingTimeManager { + public static UnsupportedProcessingTimeManager INSTANCE = + new UnsupportedProcessingTimeManager(); + + private UnsupportedProcessingTimeManager() {} + + @Override + public void registerProcessingTimer(long timestamp) { + throw new UnsupportedOperationException( + "Register processing timer is unsupported in current context."); + } + + @Override + public void deleteProcessingTimeTimer(long timestamp) { + throw new UnsupportedOperationException( + "Delete processing timer is unsupported in current context."); + } + + @Override + public long currentProcessingTime() { + throw new UnsupportedOperationException( + "Get current processing time is unsupported in current context."); + } +} diff --git a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/operators/KeyedProcessOperator.java b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/operators/KeyedProcessOperator.java index 44888a063b9f7..b6095caecfd78 100644 --- a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/operators/KeyedProcessOperator.java +++ b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/operators/KeyedProcessOperator.java @@ -19,18 +19,27 @@ package org.apache.flink.process.impl.operators; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.process.api.context.ProcessingTimeManager; import org.apache.flink.process.api.function.OneInputStreamProcessFunction; import org.apache.flink.process.api.stream.KeyedPartitionStream; import org.apache.flink.process.impl.common.KeyCheckedOutputCollector; import org.apache.flink.process.impl.common.OutputCollector; import org.apache.flink.process.impl.common.TimestampCollector; +import org.apache.flink.process.impl.context.DefaultProcessingTimeManager; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.Triggerable; import javax.annotation.Nullable; import java.util.Optional; /** Operator for {@link OneInputStreamProcessFunction} in {@link KeyedPartitionStream}. */ -public class KeyedProcessOperator extends ProcessOperator { +public class KeyedProcessOperator extends ProcessOperator + implements Triggerable { + private transient InternalTimerService timerService; @Nullable private final KeySelector outKeySelector; @@ -45,6 +54,13 @@ public KeyedProcessOperator( this.outKeySelector = outKeySelector; } + @Override + public void open() throws Exception { + this.timerService = + getInternalTimerService("processing timer", VoidNamespaceSerializer.INSTANCE, this); + super.open(); + } + @Override protected TimestampCollector getOutputCollector() { return outKeySelector != null @@ -57,4 +73,22 @@ protected TimestampCollector getOutputCollector() { protected Optional currentKey() { return Optional.ofNullable(getCurrentKey()); } + + @Override + public void onEventTime(InternalTimer timer) throws Exception { + // do nothing at the moment. + } + + @Override + public void onProcessingTime(InternalTimer timer) throws Exception { + // align the key context with the registered timer. + context.getStateManager().setCurrentKey(timer.getKey()); + userFunction.onProcessingTimer(timer.getTimestamp(), getOutputCollector(), context); + context.getStateManager().resetCurrentKey(); + } + + @Override + protected ProcessingTimeManager getProcessingTimeManager() { + return new DefaultProcessingTimeManager(timerService); + } } diff --git a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/operators/KeyedTwoInputBroadcastProcessOperator.java b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/operators/KeyedTwoInputBroadcastProcessOperator.java index bed6029a1a385..1ba91b86c696f 100644 --- a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/operators/KeyedTwoInputBroadcastProcessOperator.java +++ b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/operators/KeyedTwoInputBroadcastProcessOperator.java @@ -19,11 +19,18 @@ package org.apache.flink.process.impl.operators; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.process.api.context.ProcessingTimeManager; import org.apache.flink.process.api.function.TwoInputBroadcastStreamProcessFunction; import org.apache.flink.process.api.stream.KeyedPartitionStream; import org.apache.flink.process.impl.common.KeyCheckedOutputCollector; import org.apache.flink.process.impl.common.OutputCollector; import org.apache.flink.process.impl.common.TimestampCollector; +import org.apache.flink.process.impl.context.DefaultProcessingTimeManager; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.Triggerable; import javax.annotation.Nullable; @@ -31,7 +38,10 @@ /** Operator for {@link TwoInputBroadcastStreamProcessFunction} in {@link KeyedPartitionStream}. */ public class KeyedTwoInputBroadcastProcessOperator - extends TwoInputBroadcastProcessOperator { + extends TwoInputBroadcastProcessOperator + implements Triggerable { + private transient InternalTimerService timerService; + @Nullable private final KeySelector outKeySelector; public KeyedTwoInputBroadcastProcessOperator( @@ -46,6 +56,13 @@ public KeyedTwoInputBroadcastProcessOperator( this.outKeySelector = outKeySelector; } + @Override + public void open() throws Exception { + this.timerService = + getInternalTimerService("processing timer", VoidNamespaceSerializer.INSTANCE, this); + super.open(); + } + @Override protected TimestampCollector getOutputCollector() { return outKeySelector == null @@ -58,4 +75,21 @@ protected TimestampCollector getOutputCollector() { protected Optional currentKey() { return Optional.ofNullable(getCurrentKey()); } + + protected ProcessingTimeManager getProcessingTimeManager() { + return new DefaultProcessingTimeManager(timerService); + } + + @Override + public void onEventTime(InternalTimer timer) throws Exception { + // do nothing at the moment. + } + + @Override + public void onProcessingTime(InternalTimer timer) throws Exception { + // align the key context with the registered timer. + context.getStateManager().setCurrentKey(timer.getKey()); + userFunction.onProcessingTimer(timer.getTimestamp(), getOutputCollector(), context); + context.getStateManager().resetCurrentKey(); + } } diff --git a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/operators/KeyedTwoInputNonBroadcastProcessOperator.java b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/operators/KeyedTwoInputNonBroadcastProcessOperator.java index bb7acf4f02df1..9600bb0e498d6 100644 --- a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/operators/KeyedTwoInputNonBroadcastProcessOperator.java +++ b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/operators/KeyedTwoInputNonBroadcastProcessOperator.java @@ -19,11 +19,18 @@ package org.apache.flink.process.impl.operators; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.process.api.context.ProcessingTimeManager; import org.apache.flink.process.api.function.TwoInputNonBroadcastStreamProcessFunction; import org.apache.flink.process.api.stream.KeyedPartitionStream; import org.apache.flink.process.impl.common.KeyCheckedOutputCollector; import org.apache.flink.process.impl.common.OutputCollector; import org.apache.flink.process.impl.common.TimestampCollector; +import org.apache.flink.process.impl.context.DefaultProcessingTimeManager; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.Triggerable; import javax.annotation.Nullable; @@ -33,7 +40,10 @@ * Operator for {@link TwoInputNonBroadcastStreamProcessFunction} in {@link KeyedPartitionStream}. */ public class KeyedTwoInputNonBroadcastProcessOperator - extends TwoInputNonBroadcastProcessOperator { + extends TwoInputNonBroadcastProcessOperator + implements Triggerable { + private transient InternalTimerService timerService; + @Nullable private final KeySelector outKeySelector; public KeyedTwoInputNonBroadcastProcessOperator( @@ -48,6 +58,13 @@ public KeyedTwoInputNonBroadcastProcessOperator( this.outKeySelector = outKeySelector; } + @Override + public void open() throws Exception { + this.timerService = + getInternalTimerService("processing timer", VoidNamespaceSerializer.INSTANCE, this); + super.open(); + } + @Override protected TimestampCollector getOutputCollector() { return outKeySelector == null @@ -60,4 +77,21 @@ protected TimestampCollector getOutputCollector() { protected Optional currentKey() { return Optional.ofNullable(getCurrentKey()); } + + protected ProcessingTimeManager getProcessingTimeManager() { + return new DefaultProcessingTimeManager(timerService); + } + + @Override + public void onEventTime(InternalTimer timer) throws Exception { + // do nothing at the moment. + } + + @Override + public void onProcessingTime(InternalTimer timer) throws Exception { + // align the key context with the registered timer. + context.getStateManager().setCurrentKey(timer.getKey()); + userFunction.onProcessingTimer(timer.getTimestamp(), getOutputCollector(), context); + context.getStateManager().resetCurrentKey(); + } } diff --git a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/operators/KeyedTwoOutputProcessOperator.java b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/operators/KeyedTwoOutputProcessOperator.java index 702899e877446..9b2cf0c4319a5 100644 --- a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/operators/KeyedTwoOutputProcessOperator.java +++ b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/operators/KeyedTwoOutputProcessOperator.java @@ -19,9 +19,16 @@ package org.apache.flink.process.impl.operators; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.process.api.context.ProcessingTimeManager; import org.apache.flink.process.api.function.TwoOutputStreamProcessFunction; import org.apache.flink.process.impl.common.OutputCollector; import org.apache.flink.process.impl.common.TimestampCollector; +import org.apache.flink.process.impl.context.DefaultProcessingTimeManager; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.Triggerable; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.OutputTag; @@ -31,7 +38,10 @@ /** */ public class KeyedTwoOutputProcessOperator - extends TwoOutputProcessOperator { + extends TwoOutputProcessOperator + implements Triggerable { + + private transient InternalTimerService timerService; @Nullable private final KeySelector mainOutKeySelector; @@ -53,6 +63,13 @@ public KeyedTwoOutputProcessOperator( this.sideOutKeySelector = sideOutKeySelector; } + @Override + public void open() throws Exception { + this.timerService = + getInternalTimerService("processing timer", VoidNamespaceSerializer.INSTANCE, this); + super.open(); + } + @Override protected TimestampCollector getMainCollector() { return mainOutKeySelector != null && sideOutKeySelector != null @@ -73,6 +90,24 @@ protected Optional currentKey() { return Optional.ofNullable(getCurrentKey()); } + protected ProcessingTimeManager getProcessingTimeManager() { + return new DefaultProcessingTimeManager(timerService); + } + + @Override + public void onEventTime(InternalTimer timer) throws Exception { + // do nothing at the moment. + } + + @Override + public void onProcessingTime(InternalTimer timer) throws Exception { + // align the key context with the registered timer. + context.getStateManager().setCurrentKey(timer.getKey()); + userFunction.onProcessingTimer( + timer.getTimestamp(), getMainCollector(), getSideCollector(), context); + context.getStateManager().resetCurrentKey(); + } + private class KeyCheckedOutputCollector extends TimestampCollector { private final TimestampCollector outputCollector; diff --git a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/operators/ProcessOperator.java b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/operators/ProcessOperator.java index 5e5e496137e47..14ad500f93514 100644 --- a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/operators/ProcessOperator.java +++ b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/operators/ProcessOperator.java @@ -19,11 +19,13 @@ package org.apache.flink.process.impl.operators; import org.apache.flink.api.common.TaskInfo; +import org.apache.flink.process.api.context.ProcessingTimeManager; import org.apache.flink.process.api.function.OneInputStreamProcessFunction; import org.apache.flink.process.impl.common.OutputCollector; import org.apache.flink.process.impl.common.TimestampCollector; import org.apache.flink.process.impl.context.DefaultNonPartitionedContext; import org.apache.flink.process.impl.context.DefaultRuntimeContext; +import org.apache.flink.process.impl.context.UnsupportedProcessingTimeManager; import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; import org.apache.flink.streaming.api.operators.BoundedOneInput; import org.apache.flink.streaming.api.operators.ChainingStrategy; @@ -61,7 +63,8 @@ public void open() throws Exception { taskInfo.getNumberOfParallelSubtasks(), taskInfo.getMaxNumberOfParallelSubtasks(), taskInfo.getTaskName(), - this::currentKey); + this::currentKey, + getProcessingTimeManager()); nonPartitionedContext = new DefaultNonPartitionedContext<>(context); outputCollector = getOutputCollector(); } @@ -85,4 +88,8 @@ protected Optional currentKey() { // non-keyed operator always return empty key. return Optional.empty(); } + + protected ProcessingTimeManager getProcessingTimeManager() { + return UnsupportedProcessingTimeManager.INSTANCE; + } } diff --git a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/operators/TwoInputBroadcastProcessOperator.java b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/operators/TwoInputBroadcastProcessOperator.java index e2a9bf22dd9b2..da7f5dc265f3e 100644 --- a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/operators/TwoInputBroadcastProcessOperator.java +++ b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/operators/TwoInputBroadcastProcessOperator.java @@ -19,11 +19,13 @@ package org.apache.flink.process.impl.operators; import org.apache.flink.api.common.TaskInfo; +import org.apache.flink.process.api.context.ProcessingTimeManager; import org.apache.flink.process.api.function.TwoInputBroadcastStreamProcessFunction; import org.apache.flink.process.impl.common.OutputCollector; import org.apache.flink.process.impl.common.TimestampCollector; import org.apache.flink.process.impl.context.DefaultNonPartitionedContext; import org.apache.flink.process.impl.context.DefaultRuntimeContext; +import org.apache.flink.process.impl.context.UnsupportedProcessingTimeManager; import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; import org.apache.flink.streaming.api.operators.BoundedMultiInput; import org.apache.flink.streaming.api.operators.ChainingStrategy; @@ -65,7 +67,8 @@ public void open() throws Exception { taskInfo.getNumberOfParallelSubtasks(), taskInfo.getMaxNumberOfParallelSubtasks(), taskInfo.getTaskName(), - this::currentKey); + this::currentKey, + getProcessingTimeManager()); this.nonPartitionedContext = new DefaultNonPartitionedContext<>(context); } @@ -100,4 +103,8 @@ protected Optional currentKey() { // non-keyed operator always return empty key. return Optional.empty(); } + + protected ProcessingTimeManager getProcessingTimeManager() { + return UnsupportedProcessingTimeManager.INSTANCE; + } } diff --git a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/operators/TwoInputNonBroadcastProcessOperator.java b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/operators/TwoInputNonBroadcastProcessOperator.java index 9352143aa0022..11f2e32722f8a 100644 --- a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/operators/TwoInputNonBroadcastProcessOperator.java +++ b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/operators/TwoInputNonBroadcastProcessOperator.java @@ -19,11 +19,13 @@ package org.apache.flink.process.impl.operators; import org.apache.flink.api.common.TaskInfo; +import org.apache.flink.process.api.context.ProcessingTimeManager; import org.apache.flink.process.api.function.TwoInputNonBroadcastStreamProcessFunction; import org.apache.flink.process.impl.common.OutputCollector; import org.apache.flink.process.impl.common.TimestampCollector; import org.apache.flink.process.impl.context.DefaultNonPartitionedContext; import org.apache.flink.process.impl.context.DefaultRuntimeContext; +import org.apache.flink.process.impl.context.UnsupportedProcessingTimeManager; import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; import org.apache.flink.streaming.api.operators.BoundedMultiInput; import org.apache.flink.streaming.api.operators.ChainingStrategy; @@ -65,7 +67,8 @@ public void open() throws Exception { taskInfo.getNumberOfParallelSubtasks(), taskInfo.getMaxNumberOfParallelSubtasks(), taskInfo.getTaskName(), - this::currentKey); + this::currentKey, + getProcessingTimeManager()); this.nonPartitionedContext = new DefaultNonPartitionedContext<>(context); } @@ -100,4 +103,8 @@ protected Optional currentKey() { // non-keyed operator always return empty key. return Optional.empty(); } + + protected ProcessingTimeManager getProcessingTimeManager() { + return UnsupportedProcessingTimeManager.INSTANCE; + } } diff --git a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/operators/TwoOutputProcessOperator.java b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/operators/TwoOutputProcessOperator.java index a864d08a06164..82b10b7073a52 100644 --- a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/operators/TwoOutputProcessOperator.java +++ b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/operators/TwoOutputProcessOperator.java @@ -19,12 +19,14 @@ package org.apache.flink.process.impl.operators; import org.apache.flink.api.common.TaskInfo; +import org.apache.flink.process.api.context.ProcessingTimeManager; import org.apache.flink.process.api.context.TwoOutputNonPartitionedContext; import org.apache.flink.process.api.function.TwoOutputStreamProcessFunction; import org.apache.flink.process.impl.common.OutputCollector; import org.apache.flink.process.impl.common.TimestampCollector; import org.apache.flink.process.impl.context.DefaultRuntimeContext; import org.apache.flink.process.impl.context.DefaultTwoOutputNonPartitionedContext; +import org.apache.flink.process.impl.context.UnsupportedProcessingTimeManager; import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; import org.apache.flink.streaming.api.operators.BoundedOneInput; import org.apache.flink.streaming.api.operators.ChainingStrategy; @@ -76,7 +78,8 @@ public void open() throws Exception { taskInfo.getNumberOfParallelSubtasks(), taskInfo.getMaxNumberOfParallelSubtasks(), taskInfo.getTaskName(), - this::currentKey); + this::currentKey, + getProcessingTimeManager()); this.nonPartitionedContext = new DefaultTwoOutputNonPartitionedContext<>(context); } @@ -105,6 +108,10 @@ protected Optional currentKey() { return Optional.empty(); } + protected ProcessingTimeManager getProcessingTimeManager() { + return UnsupportedProcessingTimeManager.INSTANCE; + } + /** * This is a special implementation of {@link TimestampCollector} that using side-output * mechanism to emit data.