Skip to content

Commit

Permalink
[API] Implement ProcessingTimeManager and expose it via RuntimeContext
Browse files Browse the repository at this point in the history
  • Loading branch information
reswqa committed Feb 28, 2024
1 parent 2b4b7e9 commit 4a3eaa8
Show file tree
Hide file tree
Showing 18 changed files with 335 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,7 @@ public interface RuntimeContext {

/** Get the {@link StateManager} of this process function. */
StateManager getStateManager();

/** Get the {@link ProcessingTimeManager} of this process function. */
ProcessingTimeManager getProcessingTimeManager();
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,13 @@ public interface OneInputStreamProcessFunction<IN, OUT> extends ProcessFunction
* @param ctx, the context in which this function is executed.
*/
default void endInput(NonPartitionedContext<OUT> ctx) {}

/**
* Callback for processing timer.
*
* @param timestamp when this callback is triggered.
* @param output to emit record.
* @param ctx, runtime context in which this function is executed.
*/
default void onProcessingTimer(long timestamp, Collector<OUT> output, RuntimeContext ctx) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,13 @@ default void endNonBroadcastInput(NonPartitionedContext<OUT> ctx) {}
* @param ctx, the context in which this function is executed.
*/
default void endBroadcastInput(NonPartitionedContext<OUT> ctx) {}

/**
* Callback for processing timer.
*
* @param timestamp when this callback is triggered.
* @param output to emit record.
* @param ctx, runtime context in which this function is executed.
*/
default void onProcessingTimer(long timestamp, Collector<OUT> output, RuntimeContext ctx) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,13 @@ default void endFirstInput(NonPartitionedContext<OUT> ctx) {}
* @param ctx, the context in which this function is executed.
*/
default void endSecondInput(NonPartitionedContext<OUT> ctx) {}

/**
* Callback for processing timer.
*
* @param timestamp when this callback is triggered.
* @param output to emit record.
* @param ctx, runtime context in which this function is executed.
*/
default void onProcessingTimer(long timestamp, Collector<OUT> output, RuntimeContext ctx) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,15 @@ void processRecord(
* @param ctx, the context in which this function is executed.
*/
default void endInput(TwoOutputNonPartitionedContext<OUT1, OUT2> ctx) {}

/**
* Callback for processing timer.
*
* @param timestamp when this callback is triggered.
* @param output1 to emit record.
* @param output2 to emit record.
* @param ctx, runtime context in which this function is executed.
*/
default void onProcessingTimer(
long timestamp, Collector<OUT1> output1, Collector<OUT2> output2, RuntimeContext ctx) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.process.api.context.JobInfo;
import org.apache.flink.process.api.context.NonPartitionedContext;
import org.apache.flink.process.api.context.ProcessingTimeManager;
import org.apache.flink.process.api.context.StateManager;
import org.apache.flink.process.api.context.TaskInfo;
import org.apache.flink.process.api.function.ApplyPartitionFunction;
Expand Down Expand Up @@ -52,4 +53,10 @@ public StateManager getStateManager() {
// state is partition-aware, so it's always empty in non-partitioned context.
return EmptyStateManager.INSTANCE;
}

@Override
public ProcessingTimeManager getProcessingTimeManager() {
// processing timer is partition-aware, so it's not supported in non-partitioned context.
return UnsupportedProcessingTimeManager.INSTANCE;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.process.impl.context;

import org.apache.flink.process.api.context.ProcessingTimeManager;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.streaming.api.operators.InternalTimerService;

/** The default implementation of {@link ProcessingTimeManager}. */
public class DefaultProcessingTimeManager implements ProcessingTimeManager {
private final InternalTimerService<VoidNamespace> timerService;

public DefaultProcessingTimeManager(InternalTimerService<VoidNamespace> timerService) {
this.timerService = timerService;
}

@Override
public void registerProcessingTimer(long timestamp) {
timerService.registerProcessingTimeTimer(VoidNamespace.INSTANCE, timestamp);
}

@Override
public void deleteProcessingTimeTimer(long timestamp) {
timerService.deleteProcessingTimeTimer(VoidNamespace.INSTANCE, timestamp);
}

@Override
public long currentProcessingTime() {
return timerService.currentProcessingTime();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.process.impl.context;

import org.apache.flink.process.api.context.JobInfo;
import org.apache.flink.process.api.context.ProcessingTimeManager;
import org.apache.flink.process.api.context.RuntimeContext;
import org.apache.flink.process.api.context.TaskInfo;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
Expand All @@ -34,15 +35,19 @@ public class DefaultRuntimeContext implements RuntimeContext {

private final DefaultStateManager stateManager;

private final ProcessingTimeManager processingTimeManager;

public DefaultRuntimeContext(
StreamingRuntimeContext operatorContext,
int parallelism,
int maxParallelism,
String taskName,
Supplier<Optional<Object>> currentKeySupplier) {
Supplier<Optional<Object>> currentKeySupplier,
ProcessingTimeManager processingTimeManager) {
this.jobInfo = new DefaultJobInfo(operatorContext);
this.taskInfo = new DefaultTaskInfo(parallelism, maxParallelism, taskName);
this.stateManager = new DefaultStateManager(currentKeySupplier);
this.processingTimeManager = processingTimeManager;
}

@Override
Expand All @@ -59,4 +64,8 @@ public TaskInfo getTaskInfo() {
public DefaultStateManager getStateManager() {
return stateManager;
}

public ProcessingTimeManager getProcessingTimeManager() {
return processingTimeManager;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.process.impl.context;

import org.apache.flink.process.api.context.JobInfo;
import org.apache.flink.process.api.context.ProcessingTimeManager;
import org.apache.flink.process.api.context.StateManager;
import org.apache.flink.process.api.context.TaskInfo;
import org.apache.flink.process.api.context.TwoOutputNonPartitionedContext;
Expand Down Expand Up @@ -52,4 +53,10 @@ public StateManager getStateManager() {
// state is partition-aware, so it's always empty in non-partitioned context.
return EmptyStateManager.INSTANCE;
}

@Override
public ProcessingTimeManager getProcessingTimeManager() {
// processing timer is partition-aware, so it's not supported in non-partitioned context.
return UnsupportedProcessingTimeManager.INSTANCE;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.process.impl.context;

import org.apache.flink.process.api.context.ProcessingTimeManager;

/**
* The special implementation of {@link ProcessingTimeManager}, all its methods are not supported.
* This is used for context that can not define the key.
*/
public class UnsupportedProcessingTimeManager implements ProcessingTimeManager {
public static UnsupportedProcessingTimeManager INSTANCE =
new UnsupportedProcessingTimeManager();

private UnsupportedProcessingTimeManager() {}

@Override
public void registerProcessingTimer(long timestamp) {
throw new UnsupportedOperationException(
"Register processing timer is unsupported in current context.");
}

@Override
public void deleteProcessingTimeTimer(long timestamp) {
throw new UnsupportedOperationException(
"Delete processing timer is unsupported in current context.");
}

@Override
public long currentProcessingTime() {
throw new UnsupportedOperationException(
"Get current processing time is unsupported in current context.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,27 @@
package org.apache.flink.process.impl.operators;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.process.api.context.ProcessingTimeManager;
import org.apache.flink.process.api.function.OneInputStreamProcessFunction;
import org.apache.flink.process.api.stream.KeyedPartitionStream;
import org.apache.flink.process.impl.common.KeyCheckedOutputCollector;
import org.apache.flink.process.impl.common.OutputCollector;
import org.apache.flink.process.impl.common.TimestampCollector;
import org.apache.flink.process.impl.context.DefaultProcessingTimeManager;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.Triggerable;

import javax.annotation.Nullable;

import java.util.Optional;

/** Operator for {@link OneInputStreamProcessFunction} in {@link KeyedPartitionStream}. */
public class KeyedProcessOperator<KEY, IN, OUT> extends ProcessOperator<IN, OUT> {
public class KeyedProcessOperator<KEY, IN, OUT> extends ProcessOperator<IN, OUT>
implements Triggerable<KEY, VoidNamespace> {
private transient InternalTimerService<VoidNamespace> timerService;

@Nullable private final KeySelector<OUT, KEY> outKeySelector;

Expand All @@ -45,6 +54,13 @@ public KeyedProcessOperator(
this.outKeySelector = outKeySelector;
}

@Override
public void open() throws Exception {
this.timerService =
getInternalTimerService("processing timer", VoidNamespaceSerializer.INSTANCE, this);
super.open();
}

@Override
protected TimestampCollector<OUT> getOutputCollector() {
return outKeySelector != null
Expand All @@ -57,4 +73,22 @@ protected TimestampCollector<OUT> getOutputCollector() {
protected Optional<Object> currentKey() {
return Optional.ofNullable(getCurrentKey());
}

@Override
public void onEventTime(InternalTimer<KEY, VoidNamespace> timer) throws Exception {
// do nothing at the moment.
}

@Override
public void onProcessingTime(InternalTimer<KEY, VoidNamespace> timer) throws Exception {
// align the key context with the registered timer.
context.getStateManager().setCurrentKey(timer.getKey());
userFunction.onProcessingTimer(timer.getTimestamp(), getOutputCollector(), context);
context.getStateManager().resetCurrentKey();
}

@Override
protected ProcessingTimeManager getProcessingTimeManager() {
return new DefaultProcessingTimeManager(timerService);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,29 @@
package org.apache.flink.process.impl.operators;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.process.api.context.ProcessingTimeManager;
import org.apache.flink.process.api.function.TwoInputBroadcastStreamProcessFunction;
import org.apache.flink.process.api.stream.KeyedPartitionStream;
import org.apache.flink.process.impl.common.KeyCheckedOutputCollector;
import org.apache.flink.process.impl.common.OutputCollector;
import org.apache.flink.process.impl.common.TimestampCollector;
import org.apache.flink.process.impl.context.DefaultProcessingTimeManager;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.Triggerable;

import javax.annotation.Nullable;

import java.util.Optional;

/** Operator for {@link TwoInputBroadcastStreamProcessFunction} in {@link KeyedPartitionStream}. */
public class KeyedTwoInputBroadcastProcessOperator<KEY, IN1, IN2, OUT>
extends TwoInputBroadcastProcessOperator<IN1, IN2, OUT> {
extends TwoInputBroadcastProcessOperator<IN1, IN2, OUT>
implements Triggerable<KEY, VoidNamespace> {
private transient InternalTimerService<VoidNamespace> timerService;

@Nullable private final KeySelector<OUT, KEY> outKeySelector;

public KeyedTwoInputBroadcastProcessOperator(
Expand All @@ -46,6 +56,13 @@ public KeyedTwoInputBroadcastProcessOperator(
this.outKeySelector = outKeySelector;
}

@Override
public void open() throws Exception {
this.timerService =
getInternalTimerService("processing timer", VoidNamespaceSerializer.INSTANCE, this);
super.open();
}

@Override
protected TimestampCollector<OUT> getOutputCollector() {
return outKeySelector == null
Expand All @@ -58,4 +75,21 @@ protected TimestampCollector<OUT> getOutputCollector() {
protected Optional<Object> currentKey() {
return Optional.ofNullable(getCurrentKey());
}

protected ProcessingTimeManager getProcessingTimeManager() {
return new DefaultProcessingTimeManager(timerService);
}

@Override
public void onEventTime(InternalTimer<KEY, VoidNamespace> timer) throws Exception {
// do nothing at the moment.
}

@Override
public void onProcessingTime(InternalTimer<KEY, VoidNamespace> timer) throws Exception {
// align the key context with the registered timer.
context.getStateManager().setCurrentKey(timer.getKey());
userFunction.onProcessingTimer(timer.getTimestamp(), getOutputCollector(), context);
context.getStateManager().resetCurrentKey();
}
}
Loading

0 comments on commit 4a3eaa8

Please sign in to comment.