Skip to content

Commit 3cb57c2

Browse files
authored
SAMZA-2801: fix the case when waterrmark is not aggregated when quorum is not met (#1707)
1 parent 50280c6 commit 3cb57c2

File tree

3 files changed

+58
-14
lines changed

3 files changed

+58
-14
lines changed

samza-core/src/main/java/org/apache/samza/operators/impl/WatermarkMetrics.java

+8
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,15 @@
2222
import org.apache.samza.metrics.Gauge;
2323
import org.apache.samza.metrics.MetricsBase;
2424
import org.apache.samza.metrics.MetricsRegistry;
25+
import org.apache.samza.system.SystemStream;
2526
import org.apache.samza.system.SystemStreamPartition;
2627

2728
import java.util.Map;
2829
import java.util.concurrent.ConcurrentHashMap;
2930

3031
class WatermarkMetrics extends MetricsBase {
3132
private final Map<SystemStreamPartition, Gauge<Long>> aggregates = new ConcurrentHashMap<>();
33+
private final Map<SystemStream, Gauge<Integer>> quorumCounts = new ConcurrentHashMap<>();
3234

3335
WatermarkMetrics(MetricsRegistry registry) {
3436
super("watermark-", registry);
@@ -40,4 +42,10 @@ void setAggregateTime(SystemStreamPartition systemStreamPartition, long time) {
4042
ssp.getStream(), ssp.getPartition().getPartitionId()), 0L));
4143
aggregate.set(time);
4244
}
45+
46+
void setQuorumCount(SystemStream stream, int quorumCount) {
47+
final Gauge<Integer> gauge = quorumCounts.computeIfAbsent(stream,
48+
ssp -> newGauge(String.format("%s-quorum-count", ssp.getStream()), 0));
49+
gauge.set(quorumCount);
50+
}
4351
}

samza-core/src/main/java/org/apache/samza/operators/impl/WatermarkStates.java

+17-13
Original file line numberDiff line numberDiff line change
@@ -89,11 +89,9 @@ synchronized void update(long timestamp, String taskName) {
8989
// we get watermark either from the source or from the aggregator task
9090
watermarkTime = Math.max(watermarkTime, timestamp);
9191
} else if (canUpdateWatermark(currentTime)) {
92-
final long minWatermark;
93-
if (watermarkIdleTimeout <= 0) {
94-
// All upstream tasks are required in the computation
95-
minWatermark = timestamps.values().stream().min(Long::compare).orElse(timestamp);
96-
} else {
92+
long minWatermark = timestamps.values().stream().min(Long::compare).orElse(timestamp);
93+
94+
if (minWatermark <= watermarkTime && watermarkIdleTimeout > 0) {
9795
// Exclude the tasks that have been idle in watermark emission.
9896
long min = Long.MAX_VALUE;
9997
long watermarkIdleThreshold = currentTime - watermarkIdleTimeout;
@@ -108,14 +106,11 @@ synchronized void update(long timestamp, String taskName) {
108106

109107
// Active tasks must exceed the quorum size
110108
minWatermark = (updateCount >= quorumSize && min != Long.MAX_VALUE) ? min : WATERMARK_NOT_EXIST;
111-
112-
// Log the current quorum count
113-
if (this.quorumCount != updateCount) {
114-
this.quorumCount = updateCount;
115-
LOG.info("Current quorum count is {} for watermark aggregation, and the expected quorum size is {}",
116-
this.quorumCount, this.quorumSize);
117-
}
109+
quorumCount = updateCount;
110+
} else {
111+
quorumCount = timestamps.size();
118112
}
113+
119114
watermarkTime = Math.max(watermarkTime, minWatermark);
120115
}
121116
}
@@ -126,12 +121,16 @@ private boolean canUpdateWatermark(long currentTime) {
126121
// 2. we allow task idle in emitting watermarks and the idle time has passed.
127122
return (timestamps.size() == expectedTotal)
128123
// Handle the case we didn't receive the watermarks from some tasks since startup
129-
|| (watermarkIdleTimeout > 0 && currentTime - createTime > watermarkIdleTimeout);
124+
|| (watermarkIdleTimeout > 0 && currentTime - createTime >= watermarkIdleTimeout && timestamps.size() >= quorumSize);
130125
}
131126

132127
long getWatermarkTime() {
133128
return watermarkTime;
134129
}
130+
131+
int getQuorumCount() {
132+
return quorumCount;
133+
}
135134
}
136135

137136
private final Map<SystemStreamPartition, WatermarkState> watermarkStates;
@@ -205,6 +204,11 @@ void updateAggregateMetric(SystemStreamPartition ssp, long time) {
205204
// Only report the aggregates watermarks for intermediate streams
206205
// to reduce the amount of metrics
207206
watermarkMetrics.setAggregateTime(ssp, time);
207+
208+
final WatermarkState state = watermarkStates.get(ssp);
209+
if (state != null && state.getQuorumCount() != 0) {
210+
watermarkMetrics.setQuorumCount(ssp.getSystemStream(), state.getQuorumCount());
211+
}
208212
}
209213
}
210214
}

samza-core/src/test/java/org/apache/samza/operators/impl/TestWatermarkStates.java

+33-1
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,12 @@ public void testIdle() {
134134
// Advance currentTime to pass the idle timeout
135135
systemTime.advance(TEST_TASK_WATERMARK_IDLE_TIMEOUT_MS);
136136

137+
// Watermark is computed based on both task 0 and task 1
138+
watermarkMessage = new WatermarkMessage(4L, "task 1");
139+
watermarkStates.update(watermarkMessage, intPartition0);
140+
assertEquals(watermarkStates.getWatermarkPerSSP(intPartition0), 1L);
141+
assertEquals(watermarkStates.getWatermark(intermediate), WATERMARK_NOT_EXIST);
142+
137143
// Watermark is computed based on "task 1" alone since "task 0" passes the idle timeout
138144
watermarkMessage = new WatermarkMessage(5L, "task 1");
139145
watermarkStates.update(watermarkMessage, intPartition0);
@@ -196,14 +202,20 @@ public void testQuorum() {
196202
assertEquals(watermarkStates.getWatermarkPerSSP(intPartition0), WATERMARK_NOT_EXIST);
197203
assertEquals(watermarkStates.getWatermark(intermediate), WATERMARK_NOT_EXIST);
198204

205+
// Watermark is computed based on both task 0 and task 1
206+
watermarkMessage = new WatermarkMessage(3L, "task 1");
207+
watermarkStates.update(watermarkMessage, intPartition0);
208+
assertEquals(watermarkStates.getWatermarkPerSSP(intPartition0), 1L);
209+
assertEquals(watermarkStates.getWatermark(intermediate), WATERMARK_NOT_EXIST);
210+
199211
// Advance currentTime to pass the idle timeout
200212
systemTime.advance(TEST_TASK_WATERMARK_IDLE_TIMEOUT_MS);
201213

202214
// Watermark is computed based on "task 1" alone since "task 0" passes the idle timeout
203215
// Not meeting quorum
204216
watermarkMessage = new WatermarkMessage(5L, "task 1");
205217
watermarkStates.update(watermarkMessage, intPartition0);
206-
assertEquals(watermarkStates.getWatermarkPerSSP(intPartition0), WATERMARK_NOT_EXIST);
218+
assertEquals(watermarkStates.getWatermarkPerSSP(intPartition0), 1L);
207219
assertEquals(watermarkStates.getWatermark(intermediate), WATERMARK_NOT_EXIST);
208220

209221
systemTime.advance(1);
@@ -215,6 +227,26 @@ public void testQuorum() {
215227
assertEquals(watermarkStates.getWatermark(intermediate), WATERMARK_NOT_EXIST);
216228
}
217229

230+
@Test
231+
public void testStartup() {
232+
MockSystemTime systemTime = new MockSystemTime();
233+
WatermarkStates watermarkStates = new WatermarkStates(ssps, producerCounts, new MetricsRegistryMap(),
234+
TEST_TASK_WATERMARK_IDLE_TIMEOUT_MS, TaskConfig.DEFAULT_WATERMARK_QUORUM_SIZE_PERCENTAGE, systemTime);
235+
236+
// Only one active task in the startup
237+
WatermarkMessage watermarkMessage = new WatermarkMessage(1L, "task 0");
238+
watermarkStates.update(watermarkMessage, intPartition0);
239+
assertEquals(watermarkStates.getWatermarkPerSSP(intPartition0), WATERMARK_NOT_EXIST);
240+
241+
// Advance currentTime to pass the idle timeout
242+
systemTime.advance(TEST_TASK_WATERMARK_IDLE_TIMEOUT_MS);
243+
244+
// Watermark will be soly computed based on task 0
245+
watermarkMessage = new WatermarkMessage(5L, "task 0");
246+
watermarkStates.update(watermarkMessage, intPartition0);
247+
assertEquals(watermarkStates.getWatermarkPerSSP(intPartition0), 5L);
248+
}
249+
218250
static class MockSystemTime implements LongSupplier {
219251
long currentTime = System.currentTimeMillis();
220252

0 commit comments

Comments
 (0)