Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SAMZA-2801: fix the case when watermark is not aggregated when quorum is not met #1707

Merged
merged 9 commits into from
Aug 21, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@
import org.apache.samza.metrics.Gauge;
import org.apache.samza.metrics.MetricsBase;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamPartition;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class WatermarkMetrics extends MetricsBase {
private final Map<SystemStreamPartition, Gauge<Long>> aggregates = new ConcurrentHashMap<>();
private final Map<SystemStream, Gauge<Integer>> quorumCounts = new ConcurrentHashMap<>();

WatermarkMetrics(MetricsRegistry registry) {
super("watermark-", registry);
Expand All @@ -40,4 +42,10 @@ void setAggregateTime(SystemStreamPartition systemStreamPartition, long time) {
ssp.getStream(), ssp.getPartition().getPartitionId()), 0L));
aggregate.set(time);
}

/**
 * Publishes the given quorum count on the gauge associated with {@code stream}.
 *
 * <p>The gauge is created on first use, named {@code "<stream name>-quorum-count"} with an
 * initial value of 0, and cached in {@code quorumCounts} so later calls reuse it.
 *
 * @param stream the stream whose quorum count is being reported
 * @param quorumCount the value to store in the gauge
 */
void setQuorumCount(SystemStream stream, int quorumCount) {
  quorumCounts
      .computeIfAbsent(stream,
          systemStream -> newGauge(String.format("%s-quorum-count", systemStream.getStream()), 0))
      .set(quorumCount);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,9 @@ synchronized void update(long timestamp, String taskName) {
// we get watermark either from the source or from the aggregator task
watermarkTime = Math.max(watermarkTime, timestamp);
} else if (canUpdateWatermark(currentTime)) {
final long minWatermark;
if (watermarkIdleTimeout <= 0) {
// All upstream tasks are required in the computation
minWatermark = timestamps.values().stream().min(Long::compare).orElse(timestamp);
} else {
long minWatermark = timestamps.values().stream().min(Long::compare).orElse(timestamp);

if (minWatermark <= watermarkTime && watermarkIdleTimeout > 0) {
// Exclude the tasks that have been idle in watermark emission.
long min = Long.MAX_VALUE;
long watermarkIdleThreshold = currentTime - watermarkIdleTimeout;
Expand All @@ -108,14 +106,11 @@ synchronized void update(long timestamp, String taskName) {

// Active tasks must exceed the quorum size
minWatermark = (updateCount >= quorumSize && min != Long.MAX_VALUE) ? min : WATERMARK_NOT_EXIST;

// Log the current quorum count
if (this.quorumCount != updateCount) {
this.quorumCount = updateCount;
LOG.info("Current quorum count is {} for watermark aggregation, and the expected quorum size is {}",
this.quorumCount, this.quorumSize);
}
quorumCount = updateCount;
} else {
quorumCount = timestamps.size();
}

watermarkTime = Math.max(watermarkTime, minWatermark);
}
}
Expand All @@ -126,12 +121,16 @@ private boolean canUpdateWatermark(long currentTime) {
// 2. we allow task idle in emitting watermarks and the idle time has passed.
return (timestamps.size() == expectedTotal)
// Handle the case we didn't receive the watermarks from some tasks since startup
|| (watermarkIdleTimeout > 0 && currentTime - createTime > watermarkIdleTimeout);
|| (watermarkIdleTimeout > 0 && currentTime - createTime >= watermarkIdleTimeout && timestamps.size() >= quorumSize);
}

/**
 * Returns the current value of {@code watermarkTime} for this state.
 *
 * <p>In the visible parts of {@code update}, the field is only ever assigned through
 * {@code Math.max(watermarkTime, ...)}, so the value does not move backwards there.
 *
 * @return the watermark time currently held by this state
 */
long getWatermarkTime() {
return watermarkTime;
}

/**
 * Returns the quorum count recorded by the most recent watermark aggregation.
 *
 * <p>In the visible parts of {@code update}, the field is set to the number of tasks counted
 * while idle tasks are excluded ({@code updateCount}), or to {@code timestamps.size()} otherwise.
 * NOTE(review): the initial value before any aggregation is not visible here; the caller in
 * {@code updateAggregateMetric} skips reporting when this returns 0.
 *
 * @return the current value of {@code quorumCount}
 */
int getQuorumCount() {
return quorumCount;
}
}

private final Map<SystemStreamPartition, WatermarkState> watermarkStates;
Expand Down Expand Up @@ -205,6 +204,11 @@ void updateAggregateMetric(SystemStreamPartition ssp, long time) {
// Only report the aggregates watermarks for intermediate streams
// to reduce the amount of metrics
watermarkMetrics.setAggregateTime(ssp, time);

final WatermarkState state = watermarkStates.get(ssp);
if (state != null && state.getQuorumCount() != 0) {
watermarkMetrics.setQuorumCount(ssp.getSystemStream(), state.getQuorumCount());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,12 @@ public void testIdle() {
// Advance currentTime to pass the idle timeout
systemTime.advance(TEST_TASK_WATERMARK_IDLE_TIMEOUT_MS);

// Watermark is computed based on both task 0 and task 1
watermarkMessage = new WatermarkMessage(4L, "task 1");
watermarkStates.update(watermarkMessage, intPartition0);
assertEquals(watermarkStates.getWatermarkPerSSP(intPartition0), 1L);
assertEquals(watermarkStates.getWatermark(intermediate), WATERMARK_NOT_EXIST);

// Watermark is computed based on "task 1" alone since "task 0" passes the idle timeout
watermarkMessage = new WatermarkMessage(5L, "task 1");
watermarkStates.update(watermarkMessage, intPartition0);
Expand Down Expand Up @@ -196,14 +202,20 @@ public void testQuorum() {
assertEquals(watermarkStates.getWatermarkPerSSP(intPartition0), WATERMARK_NOT_EXIST);
assertEquals(watermarkStates.getWatermark(intermediate), WATERMARK_NOT_EXIST);

// Watermark is computed based on both task 0 and task 1
watermarkMessage = new WatermarkMessage(3L, "task 1");
watermarkStates.update(watermarkMessage, intPartition0);
assertEquals(watermarkStates.getWatermarkPerSSP(intPartition0), 1L);
assertEquals(watermarkStates.getWatermark(intermediate), WATERMARK_NOT_EXIST);

// Advance currentTime to pass the idle timeout
systemTime.advance(TEST_TASK_WATERMARK_IDLE_TIMEOUT_MS);

// Watermark is computed based on "task 1" alone since "task 0" passes the idle timeout
// Not meeting quorum
watermarkMessage = new WatermarkMessage(5L, "task 1");
watermarkStates.update(watermarkMessage, intPartition0);
assertEquals(watermarkStates.getWatermarkPerSSP(intPartition0), WATERMARK_NOT_EXIST);
assertEquals(watermarkStates.getWatermarkPerSSP(intPartition0), 1L);
assertEquals(watermarkStates.getWatermark(intermediate), WATERMARK_NOT_EXIST);

systemTime.advance(1);
Expand All @@ -215,6 +227,26 @@ public void testQuorum() {
assertEquals(watermarkStates.getWatermark(intermediate), WATERMARK_NOT_EXIST);
}

/**
 * Checks watermark aggregation at startup when only a single task ("task 0") ever reports:
 * no watermark exists before the idle timeout has elapsed, and afterwards the watermark
 * follows task 0's latest timestamp.
 */
@Test
public void testStartup() {
  final MockSystemTime clock = new MockSystemTime();
  final WatermarkStates states = new WatermarkStates(ssps, producerCounts, new MetricsRegistryMap(),
      TEST_TASK_WATERMARK_IDLE_TIMEOUT_MS, TaskConfig.DEFAULT_WATERMARK_QUORUM_SIZE_PERCENTAGE, clock);

  // Only one task is active at startup, so no watermark is produced yet
  states.update(new WatermarkMessage(1L, "task 0"), intPartition0);
  assertEquals(states.getWatermarkPerSSP(intPartition0), WATERMARK_NOT_EXIST);

  // Move the clock forward by the idle timeout
  clock.advance(TEST_TASK_WATERMARK_IDLE_TIMEOUT_MS);

  // The watermark is now computed solely from task 0
  states.update(new WatermarkMessage(5L, "task 0"), intPartition0);
  assertEquals(states.getWatermarkPerSSP(intPartition0), 5L);
}

static class MockSystemTime implements LongSupplier {
long currentTime = System.currentTimeMillis();

Expand Down
Loading