Skip to content

Commit c5dfb57

Browse files
committed
fixup! [FLINK-34252][table] Optimize WatermarkAssignerOperator to avoid calling System.currentTimeMillis per every record
1 parent 5225e81 commit c5dfb57

File tree

2 files changed

+132
-34
lines changed

2 files changed

+132
-34
lines changed

flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperator.java

Lines changed: 76 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.flink.table.runtime.operators.wmassigners;
2020

21+
import org.apache.flink.annotation.VisibleForTesting;
2122
import org.apache.flink.api.common.functions.DefaultOpenContext;
2223
import org.apache.flink.api.common.functions.util.FunctionUtils;
2324
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
@@ -30,6 +31,7 @@
3031
import org.apache.flink.table.data.RowData;
3132
import org.apache.flink.table.runtime.generated.WatermarkGenerator;
3233

34+
import static org.apache.flink.util.Preconditions.checkArgument;
3335
import static org.apache.flink.util.Preconditions.checkNotNull;
3436

3537
/**
@@ -52,12 +54,22 @@ public class WatermarkAssignerOperator extends AbstractStreamOperator<RowData>
5254

5355
private transient long watermarkInterval;
5456

57+
private transient long timerInterval;
58+
5559
private transient long currentWatermark;
5660

57-
private transient long lastRecordTime;
61+
// Last time watermark have been (periodically) emitted
62+
private transient long lastWatermarkPeriodicEmitTime;
63+
64+
// Last time idleness status has been checked
65+
private transient long timeSinceLastIdleCheck;
5866

5967
private transient WatermarkStatus currentStatus = WatermarkStatus.ACTIVE;
6068

69+
private transient long processedElements;
70+
71+
private transient long lastIdleCheckProcessedElements = -1;
72+
6173
/**
6274
* Create a watermark assigner operator.
6375
*
@@ -87,11 +99,14 @@ public void open() throws Exception {
8799
// watermark and timestamp should start from 0
88100
this.currentWatermark = 0;
89101
this.watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();
90-
this.lastRecordTime = getProcessingTimeService().getCurrentProcessingTime();
102+
long now = getProcessingTimeService().getCurrentProcessingTime();
103+
this.lastWatermarkPeriodicEmitTime = now;
104+
this.timeSinceLastIdleCheck = now;
91105

92-
if (watermarkInterval > 0) {
93-
long now = getProcessingTimeService().getCurrentProcessingTime();
94-
getProcessingTimeService().registerTimer(now + watermarkInterval, this);
106+
if (watermarkInterval > 0 || idleTimeout > 0) {
107+
this.timerInterval =
108+
calculateProcessingTimeTimerInterval(watermarkInterval, idleTimeout);
109+
getProcessingTimeService().registerTimer(now + timerInterval, this);
95110
}
96111

97112
FunctionUtils.setFunctionRuntimeContext(watermarkGenerator, getRuntimeContext());
@@ -100,12 +115,11 @@ public void open() throws Exception {
100115

101116
@Override
102117
public void processElement(StreamRecord<RowData> element) throws Exception {
103-
if (idleTimeout > 0) {
104-
if (currentStatus.equals(WatermarkStatus.IDLE)) {
105-
// mark the channel active
106-
emitWatermarkStatus(WatermarkStatus.ACTIVE);
107-
}
108-
lastRecordTime = getProcessingTimeService().getCurrentProcessingTime();
118+
processedElements++;
119+
120+
if (isIdlenessEnabled() && currentStatus.equals(WatermarkStatus.IDLE)) {
121+
// mark the channel active
122+
emitWatermarkStatus(WatermarkStatus.ACTIVE);
109123
}
110124

111125
RowData row = element.getValue();
@@ -139,19 +153,28 @@ private void advanceWatermark() {
139153

140154
@Override
141155
public void onProcessingTime(long timestamp) throws Exception {
142-
advanceWatermark();
156+
// timestamp and now can be off in case TM is heavily overloaded.
157+
long now = getProcessingTimeService().getCurrentProcessingTime();
143158

144-
if (idleTimeout > 0 && currentStatus.equals(WatermarkStatus.ACTIVE)) {
145-
final long currentTime = getProcessingTimeService().getCurrentProcessingTime();
146-
if (currentTime - lastRecordTime > idleTimeout) {
147-
// mark the channel as idle to ignore watermarks from this channel
148-
emitWatermarkStatus(WatermarkStatus.IDLE);
149-
}
159+
if (watermarkInterval > 0 && lastWatermarkPeriodicEmitTime + watermarkInterval <= now) {
160+
lastWatermarkPeriodicEmitTime = now;
161+
advanceWatermark();
162+
}
163+
164+
if (processedElements != lastIdleCheckProcessedElements) {
165+
timeSinceLastIdleCheck = now;
166+
lastIdleCheckProcessedElements = processedElements;
167+
}
168+
169+
if (isIdlenessEnabled()
170+
&& currentStatus.equals(WatermarkStatus.ACTIVE)
171+
&& timeSinceLastIdleCheck + idleTimeout <= now) {
172+
// mark the channel as idle to ignore watermarks from this channel
173+
emitWatermarkStatus(WatermarkStatus.IDLE);
150174
}
151175

152176
// register next timer
153-
long now = getProcessingTimeService().getCurrentProcessingTime();
154-
getProcessingTimeService().registerTimer(now + watermarkInterval, this);
177+
getProcessingTimeService().registerTimer(now + timerInterval, this);
155178
}
156179

157180
/**
@@ -163,7 +186,7 @@ public void processWatermark(Watermark mark) throws Exception {
163186
// if we receive a Long.MAX_VALUE watermark we forward it since it is used
164187
// to signal the end of input and to not block watermark progress downstream
165188
if (mark.getTimestamp() == Long.MAX_VALUE && currentWatermark != Long.MAX_VALUE) {
166-
if (idleTimeout > 0 && currentStatus.equals(WatermarkStatus.IDLE)) {
189+
if (isIdlenessEnabled() && currentStatus.equals(WatermarkStatus.IDLE)) {
167190
// mark the channel active
168191
emitWatermarkStatus(WatermarkStatus.ACTIVE);
169192
}
@@ -193,4 +216,36 @@ public void close() throws Exception {
193216
FunctionUtils.closeFunction(watermarkGenerator);
194217
super.close();
195218
}
219+
220+
private boolean isIdlenessEnabled() {
221+
return idleTimeout > 0;
222+
}
223+
224+
@VisibleForTesting
225+
static long calculateProcessingTimeTimerInterval(long watermarkInterval, long idleTimeout) {
226+
checkArgument(watermarkInterval > 0 || idleTimeout > 0);
227+
if (watermarkInterval <= 0) {
228+
return idleTimeout;
229+
}
230+
if (idleTimeout <= 0) {
231+
return watermarkInterval;
232+
}
233+
234+
long smallerInterval = Math.min(watermarkInterval, idleTimeout);
235+
long largerInterval = Math.max(watermarkInterval, idleTimeout);
236+
237+
// If one of the intervals is 5x smaller, just pick the smaller one. The firing interval
238+
// for the smaller one this way will be perfectly accurate, while for the larger one it will
239+
// be good enough™. For example one timer is every 2s the other every 11s, the 2nd timer
240+
// will be effectively checked every 12s, which is an acceptable accuracy.
241+
long timerInterval;
242+
if (smallerInterval * 5 < largerInterval) {
243+
timerInterval = smallerInterval;
244+
} else {
245+
// Otherwise, just pick an interval 5x smaller than the smaller interval. Again accuracy
246+
// will be good enough™.
247+
timerInterval = smallerInterval / 5;
248+
}
249+
return Math.max(timerInterval, 1);
250+
}
196251
}

flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperatorTest.java

Lines changed: 56 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.util.List;
3838
import java.util.concurrent.ConcurrentLinkedQueue;
3939

40+
import static org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperator.calculateProcessingTimeTimerInterval;
4041
import static org.assertj.core.api.Assertions.assertThat;
4142
import static org.assertj.core.api.Assertions.fail;
4243

@@ -46,6 +47,24 @@ public class WatermarkAssignerOperatorTest extends WatermarkAssignerOperatorTest
4647
private static final WatermarkGenerator WATERMARK_GENERATOR =
4748
new BoundedOutOfOrderWatermarkGenerator(0, 1);
4849

50+
@Test
51+
public void testCalculateProcessingTimeTimerInterval() {
52+
assertThat(calculateProcessingTimeTimerInterval(5, 0)).isEqualTo(5);
53+
assertThat(calculateProcessingTimeTimerInterval(5, -1)).isEqualTo(5);
54+
55+
assertThat(calculateProcessingTimeTimerInterval(0, 5)).isEqualTo(5);
56+
assertThat(calculateProcessingTimeTimerInterval(-1, 5)).isEqualTo(5);
57+
58+
assertThat(calculateProcessingTimeTimerInterval(5, 42)).isEqualTo(5);
59+
assertThat(calculateProcessingTimeTimerInterval(42, 5)).isEqualTo(5);
60+
61+
assertThat(calculateProcessingTimeTimerInterval(2, 4)).isEqualTo(1);
62+
assertThat(calculateProcessingTimeTimerInterval(4, 2)).isEqualTo(1);
63+
64+
assertThat(calculateProcessingTimeTimerInterval(100, 110)).isEqualTo(20);
65+
assertThat(calculateProcessingTimeTimerInterval(110, 100)).isEqualTo(20);
66+
}
67+
4968
@Test
5069
public void testWatermarkAssignerWithIdleSource() throws Exception {
5170
// with timeout 1000 ms
@@ -68,7 +87,10 @@ public void testWatermarkAssignerWithIdleSource() throws Exception {
6887
expectedOutput.add(new Watermark(3));
6988
assertThat(filterOutRecords(output)).isEqualTo(expectedOutput);
7089

71-
testHarness.setProcessingTime(1001);
90+
stepProcessingTime(testHarness, 52, 1050, 50);
91+
assertThat(filterOutRecords(output)).isEqualTo(expectedOutput);
92+
93+
stepProcessingTime(testHarness, 1051, 1100, 50);
7294
expectedOutput.add(WatermarkStatus.IDLE);
7395
assertThat(filterOutRecords(output)).isEqualTo(expectedOutput);
7496

@@ -79,34 +101,55 @@ public void testWatermarkAssignerWithIdleSource() throws Exception {
79101
testHarness.processElement(new StreamRecord<>(GenericRowData.of(7L)));
80102
testHarness.processElement(new StreamRecord<>(GenericRowData.of(8L)));
81103

82-
testHarness.setProcessingTime(1060);
104+
assertThat(filterOutRecords(output)).isEqualTo(expectedOutput);
105+
106+
stepProcessingTime(testHarness, 1101, 1200, 50);
83107
expectedOutput.add(new Watermark(7));
84108
assertThat(filterOutRecords(output)).isEqualTo(expectedOutput);
85109
}
86110

87111
@Test
88-
public void testIdleStateAvoidanceWithConsistentDataFlow() throws Exception {
112+
public void testWatermarkIntervalSmallerThanIdleTimeout() throws Exception {
113+
testIdleTimeout(1000, 50);
114+
}
115+
116+
@Test
117+
public void testIdleTimeoutSmallerThanWatermarkInterval() throws Exception {
118+
testIdleTimeout(50, 1000);
119+
}
120+
121+
private void testIdleTimeout(long idleTimeout, long watermarkInterval) throws Exception {
122+
long step = Math.min(idleTimeout, watermarkInterval);
89123
OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
90-
createTestHarness(0, WATERMARK_GENERATOR, 1000);
91-
testHarness.getExecutionConfig().setAutoWatermarkInterval(50);
124+
createTestHarness(0, WATERMARK_GENERATOR, idleTimeout);
125+
testHarness.getExecutionConfig().setAutoWatermarkInterval(watermarkInterval);
92126
testHarness.open();
93127

94128
ConcurrentLinkedQueue<Object> output = testHarness.getOutput();
95-
List<Object> expectedOutput = new ArrayList<>();
96129

130+
long timeBetweenRecords = (long) (idleTimeout * 0.9);
97131
// Process elements at intervals less than idleTimeout (1000ms)
98132
for (long i = 1; i <= 10; i++) {
99-
long timestamp = i * 900;
133+
long timestamp = i * timeBetweenRecords;
100134
testHarness.processElement(new StreamRecord<>(GenericRowData.of(timestamp), timestamp));
101-
testHarness.setProcessingTime(timestamp);
102-
103-
// Expect a watermark for each element, lagging by 1
104-
expectedOutput.add(new Watermark(timestamp - 1));
135+
stepProcessingTime(testHarness, timestamp, timestamp + timeBetweenRecords - 1, step);
105136
}
106137

107138
// Check if the status ever becomes IDLE (it shouldn't)
108139
assertThat(extractWatermarkStatuses(output)).doesNotContain(WatermarkStatus.IDLE);
109-
assertThat(filterOutRecords(output)).isEqualTo(expectedOutput);
140+
}
141+
142+
private void stepProcessingTime(
143+
OneInputStreamOperatorTestHarness<?, ?> testHarness,
144+
long fromInclusive,
145+
long toInclusive,
146+
long step)
147+
throws Exception {
148+
for (long time = fromInclusive; time < toInclusive; time += step) {
149+
// incrementally fire processing time timers
150+
testHarness.setProcessingTime(time);
151+
}
152+
testHarness.setProcessingTime(toInclusive);
110153
}
111154

112155
@Test
@@ -238,11 +281,11 @@ public void testCustomizedWatermarkGenerator() throws Exception {
238281
expected.add(Watermark.MAX_WATERMARK);
239282

240283
// num_watermark + num_records
241-
assertThat(testHarness.getOutput()).hasSize(expected.size() + 11);
242284
List<Watermark> results = extractWatermarks(testHarness.getOutput());
243285
assertThat(results).isEqualTo(expected);
244286
assertThat(MyWatermarkGenerator.openCalled).isTrue();
245287
assertThat(MyWatermarkGenerator.closeCalled).isTrue();
288+
assertThat(testHarness.getOutput()).hasSize(expected.size() + 11);
246289
}
247290

248291
private static OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness(

0 commit comments

Comments
 (0)