Skip to content

Commit

Permalink
[API] Implement TimestampManager and expose it via RuntimeContext
Browse files Browse the repository at this point in the history
  • Loading branch information
reswqa committed Feb 28, 2024
1 parent 4a3eaa8 commit 9d6751a
Show file tree
Hide file tree
Showing 11 changed files with 92 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,7 @@ public interface RuntimeContext {

/** Get the {@link ProcessingTimeManager} of this process function. */
ProcessingTimeManager getProcessingTimeManager();

/** Get the {@link TimestampManager} of this process function. */
TimestampManager getTimestampManager();
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@

package org.apache.flink.process.api.context;

import java.util.Optional;

/** This is responsibility for retrieving timestamp related things of process function. */
public interface TimestampManager {
/**
* Get the timestamp of current processing record.
*
* @return the timestamp of current processed record.
* @return the timestamp of current processed record. If it does not have timestamp, empty will
* be returned.
*/
long getCurrentRecordTimestamp();
Optional<Long> getCurrentRecordTimestamp();
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.flink.process.api.common.Collector;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import java.util.Optional;

/** The base {@link Collector} which take care of records timestamp. */
public abstract class TimestampCollector<OUT> implements Collector<OUT> {
protected final StreamRecord<OUT> reuse = new StreamRecord<>(null);
Expand All @@ -40,4 +42,13 @@ public void setAbsoluteTimestamp(long timestamp) {
public void eraseTimestamp() {
reuse.eraseTimestamp();
}

/** Get the timestamp of the latest seen record. */
public Optional<Long> getLatestTimestamp() {
if (reuse.hasTimestamp()) {
return Optional.of(reuse.getTimestamp());
} else {
return Optional.empty();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.process.api.context.ProcessingTimeManager;
import org.apache.flink.process.api.context.StateManager;
import org.apache.flink.process.api.context.TaskInfo;
import org.apache.flink.process.api.context.TimestampManager;
import org.apache.flink.process.api.function.ApplyPartitionFunction;

/** The default implementation of {@link NonPartitionedContext}. */
Expand Down Expand Up @@ -59,4 +60,9 @@ public ProcessingTimeManager getProcessingTimeManager() {
// processing timer is partition-aware, so it's not supported in non-partitioned context.
return UnsupportedProcessingTimeManager.INSTANCE;
}

@Override
public TimestampManager getTimestampManager() {
return context.getTimestampManager();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public class DefaultRuntimeContext implements RuntimeContext {

private final ProcessingTimeManager processingTimeManager;

private final DefaultTimestampManager timestampManager;

public DefaultRuntimeContext(
StreamingRuntimeContext operatorContext,
int parallelism,
Expand All @@ -48,6 +50,7 @@ public DefaultRuntimeContext(
this.taskInfo = new DefaultTaskInfo(parallelism, maxParallelism, taskName);
this.stateManager = new DefaultStateManager(currentKeySupplier);
this.processingTimeManager = processingTimeManager;
this.timestampManager = new DefaultTimestampManager();
}

@Override
Expand All @@ -68,4 +71,9 @@ public DefaultStateManager getStateManager() {
public ProcessingTimeManager getProcessingTimeManager() {
return processingTimeManager;
}

@Override
public DefaultTimestampManager getTimestampManager() {
return timestampManager;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.process.impl.context;

import org.apache.flink.process.api.context.TimestampManager;

import java.util.Optional;

/** The default implementation of {@link TimestampManager}. */
public class DefaultTimestampManager implements TimestampManager {
private Optional<Long> currentTimestamp;

@Override
public Optional<Long> getCurrentRecordTimestamp() {
return currentTimestamp;
}

public void setTimestamp(Optional<Long> timestamp) {
currentTimestamp = timestamp;
}

public void resetTimestamp() {
currentTimestamp = Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.process.api.context.ProcessingTimeManager;
import org.apache.flink.process.api.context.StateManager;
import org.apache.flink.process.api.context.TaskInfo;
import org.apache.flink.process.api.context.TimestampManager;
import org.apache.flink.process.api.context.TwoOutputNonPartitionedContext;

public class DefaultTwoOutputNonPartitionedContext<OUT1, OUT2>
Expand Down Expand Up @@ -59,4 +60,9 @@ public ProcessingTimeManager getProcessingTimeManager() {
// processing timer is partition-aware, so it's not supported in non-partitioned context.
return UnsupportedProcessingTimeManager.INSTANCE;
}

@Override
public TimestampManager getTimestampManager() {
return context.getTimestampManager();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@ public void open() throws Exception {
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
outputCollector.setTimestamp(element);
context.getTimestampManager().setTimestamp(outputCollector.getLatestTimestamp());
userFunction.processRecord(element.getValue(), outputCollector, context);
context.getTimestampManager().resetTimestamp();
}

protected TimestampCollector<OUT> getOutputCollector() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,17 @@ public void open() throws Exception {
@Override
public void processElement1(StreamRecord<IN1> element) throws Exception {
collector.setTimestamp(element);
context.getTimestampManager().setTimestamp(collector.getLatestTimestamp());
userFunction.processRecordFromNonBroadcastInput(element.getValue(), collector, context);
context.getTimestampManager().resetTimestamp();
}

@Override
public void processElement2(StreamRecord<IN2> element) throws Exception {
collector.setTimestamp(element);
context.getTimestampManager().setTimestamp(collector.getLatestTimestamp());
userFunction.processRecordFromBroadcastInput(element.getValue(), nonPartitionedContext);
context.getTimestampManager().resetTimestamp();
}

protected TimestampCollector<OUT> getOutputCollector() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,17 @@ public void open() throws Exception {
@Override
public void processElement1(StreamRecord<IN1> element) throws Exception {
collector.setTimestamp(element);
context.getTimestampManager().setTimestamp(collector.getLatestTimestamp());
userFunction.processRecordFromFirstInput(element.getValue(), collector, context);
context.getTimestampManager().resetTimestamp();
}

@Override
public void processElement2(StreamRecord<IN2> element) throws Exception {
collector.setTimestamp(element);
context.getTimestampManager().setTimestamp(collector.getLatestTimestamp());
userFunction.processRecordFromSecondInput(element.getValue(), collector, context);
context.getTimestampManager().resetTimestamp();
}

protected TimestampCollector<OUT> getOutputCollector() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@ public void open() throws Exception {
public void processElement(StreamRecord<IN> element) throws Exception {
mainCollector.setTimestamp(element);
sideCollector.setTimestamp(element);
context.getTimestampManager().setTimestamp(mainCollector.getLatestTimestamp());
userFunction.processRecord(element.getValue(), mainCollector, sideCollector, context);
context.getTimestampManager().resetTimestamp();
}

@Override
Expand Down

0 comments on commit 9d6751a

Please sign in to comment.