From 50280c60e3835c922e139c2853a227d1cce1af8f Mon Sep 17 00:00:00 2001 From: Xinyu Liu Date: Fri, 16 Aug 2024 09:48:07 -0700 Subject: [PATCH] SAMZA-2801: Support excluding tasks from watermark computation when exceeding idle time (#1705) --- .../org/apache/samza/config/TaskConfig.java | 16 ++ .../operators/impl/OperatorImplGraph.java | 9 +- .../samza/operators/impl/WatermarkStates.java | 85 ++++++++-- .../operators/impl/TestWatermarkStates.java | 147 ++++++++++++++++-- 4 files changed, 236 insertions(+), 21 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java b/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java index b1a51e5220..0f168be18d 100644 --- a/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java @@ -147,6 +147,14 @@ public class TaskConfig extends MapConfig { "task.transactional.state.retain.existing.state"; private static final boolean DEFAULT_TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE = true; + // This config allows excluding the tasks that have been "idle" in generating watermark for the configured time, + // so that the watermarks will still be generated from other active tasks. + public static final String WATERMARK_IDLE_TIMEOUT_MS = "task.watermark.idle.timeout.ms"; + public static final long DEFAULT_TASK_WATERMARK_IDLE_TIMEOUT_MS = -1L; + // The quorum size required to generate watermarks when there are idle tasks. 
+ public static final String WATERMARK_QUORUM_SIZE_PERCENTAGE = "task.watermark.quorum.size.percentage"; + public static final double DEFAULT_WATERMARK_QUORUM_SIZE_PERCENTAGE = 0.5; + public TaskConfig(Config config) { super(config); } @@ -402,4 +410,12 @@ public boolean getTransactionalStateRestoreEnabled() { public boolean getTransactionalStateRetainExistingState() { return getBoolean(TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, DEFAULT_TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE); } + + public long getWatermarkIdleTimeoutMs() { + return getLong(WATERMARK_IDLE_TIMEOUT_MS, DEFAULT_TASK_WATERMARK_IDLE_TIMEOUT_MS); + } + + public double getWatermarkQuorumSizePercentage() { + return getDouble(WATERMARK_QUORUM_SIZE_PERCENTAGE, DEFAULT_WATERMARK_QUORUM_SIZE_PERCENTAGE); + } } diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java index c62b0b2ec9..761fd30465 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java @@ -23,6 +23,7 @@ import com.google.common.collect.Multimap; import org.apache.samza.config.Config; import org.apache.samza.config.StreamConfig; +import org.apache.samza.config.TaskConfig; import org.apache.samza.context.Context; import org.apache.samza.context.InternalTaskContext; import org.apache.samza.job.model.JobModel; @@ -100,7 +101,8 @@ public class OperatorImplGraph { */ public OperatorImplGraph(OperatorSpecGraph specGraph, Context context, Clock clock) { this.clock = clock; - StreamConfig streamConfig = new StreamConfig(context.getJobContext().getConfig()); + Config config = context.getJobContext().getConfig(); + StreamConfig streamConfig = new StreamConfig(config); this.internalTaskContext = new InternalTaskContext(context); Map<SystemStream, Integer> producerTaskCounts = hasIntermediateStreams(specGraph) @@ -117,9 +119,12 @@ public 
OperatorImplGraph(OperatorSpecGraph specGraph, Context context, Clock clo new EndOfStreamStates(internalTaskContext.getSspsExcludingSideInputs(), producerTaskCounts)); // set states for watermark; don't include side inputs (see SAMZA-2303) + TaskConfig taskConfig = new TaskConfig(config); internalTaskContext.registerObject(WatermarkStates.class.getName(), new WatermarkStates(internalTaskContext.getSspsExcludingSideInputs(), producerTaskCounts, - context.getContainerContext().getContainerMetricsRegistry())); + context.getContainerContext().getContainerMetricsRegistry(), + taskConfig.getWatermarkIdleTimeoutMs(), + taskConfig.getWatermarkQuorumSizePercentage())); // set states for drain; don't include side inputs (see SAMZA-2303) internalTaskContext.registerObject(DrainStates.class.getName(), diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/WatermarkStates.java b/samza-core/src/main/java/org/apache/samza/operators/impl/WatermarkStates.java index 84e0687d6d..11957711dc 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/WatermarkStates.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/WatermarkStates.java @@ -24,8 +24,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; +import java.util.function.LongSupplier; import org.apache.samza.metrics.MetricsRegistry; import org.apache.samza.system.SystemStream; @@ -37,6 +37,9 @@ /** * This class manages the watermarks coming from input/intermediate streams in a task. Internally it keeps track * of the latest watermark timestamp from each upstream task, and use the min as the consolidated watermark time. + * Lagging tasks can be excluded from the calculation by configuring "task.watermark.idle.timeout.ms", and watermarks + * will be computed from the other tasks. 
This will help unblock downstream aggregations, but at the risk of advancing + * event-time clock faster and events coming later from lagging tasks will become late arrivals. * * This class is thread-safe. However, having parallelism within a task may result in out-of-order processing * and inaccurate watermarks. In this scenario, watermarks might be emitted before the previous messages fully processed. @@ -49,13 +52,28 @@ class WatermarkStates { private final static class WatermarkState { private final int expectedTotal; private final Map<String, Long> timestamps = new HashMap<>(); + private final Map<String, Long> lastUpdateTime = new HashMap<>(); + private final long watermarkIdleTimeout; + private final int quorumSize; + private final long createTime; + private final LongSupplier systemTimeFunc; private volatile long watermarkTime = WATERMARK_NOT_EXIST; + private volatile int quorumCount = 0; - WatermarkState(int expectedTotal) { + WatermarkState( + int expectedTotal, + long watermarkIdleTimeout, + double watermarkQuorumSizePercentage, + LongSupplier systemTimeFunc) { this.expectedTotal = expectedTotal; + this.watermarkIdleTimeout = watermarkIdleTimeout; + this.quorumSize = (int) (expectedTotal * watermarkQuorumSizePercentage); + this.systemTimeFunc = systemTimeFunc; + this.createTime = systemTimeFunc.getAsLong(); } synchronized void update(long timestamp, String taskName) { + long currentTime = systemTimeFunc.getAsLong(); if (taskName != null) { Long ts = timestamps.get(taskName); if (ts != null && ts > timestamp) { @@ -63,20 +81,54 @@ synchronized void update(long timestamp, String taskName) { timestamp, ts, taskName)); } else { timestamps.put(taskName, timestamp); + lastUpdateTime.put(taskName, currentTime); } } if (taskName == null) { // we get watermark either from the source or from the aggregator task watermarkTime = Math.max(watermarkTime, timestamp); - } else if (timestamps.size() == expectedTotal) { - // For any intermediate streams, the expectedTotal is the upstream task count. 
- // Check whether we got all the watermarks, and set the watermark to be the min. - Optional<Long> min = timestamps.values().stream().min(Long::compare); - watermarkTime = min.orElse(timestamp); + } else if (canUpdateWatermark(currentTime)) { + final long minWatermark; + if (watermarkIdleTimeout <= 0) { + // All upstream tasks are required in the computation + minWatermark = timestamps.values().stream().min(Long::compare).orElse(timestamp); + } else { + // Exclude the tasks that have been idle in watermark emission. + long min = Long.MAX_VALUE; + long watermarkIdleThreshold = currentTime - watermarkIdleTimeout; + int updateCount = 0; + for (Map.Entry<String, Long> entry : timestamps.entrySet()) { + // Check the update happens before the idle timeout + if (lastUpdateTime.get(entry.getKey()) > watermarkIdleThreshold) { + min = Math.min(min, entry.getValue()); + updateCount++; + } + } + + // Active tasks must meet or exceed the quorum size + minWatermark = (updateCount >= quorumSize && min != Long.MAX_VALUE) ? min : WATERMARK_NOT_EXIST; + + // Log the current quorum count + if (this.quorumCount != updateCount) { + this.quorumCount = updateCount; + LOG.info("Current quorum count is {} for watermark aggregation, and the expected quorum size is {}", + this.quorumCount, this.quorumSize); + } + } + watermarkTime = Math.max(watermarkTime, minWatermark); } } + private boolean canUpdateWatermark(long currentTime) { + // The watermark can be updated if + // 1. we received watermarks from all upstream tasks, or + // 2. we allow task idle in emitting watermarks and the idle time has passed. 
+ return (timestamps.size() == expectedTotal) + // Handle the case we didn't receive the watermarks from some tasks since startup + || (watermarkIdleTimeout > 0 && currentTime - createTime > watermarkIdleTimeout); + } + long getWatermarkTime() { return watermarkTime; } @@ -86,16 +138,31 @@ long getWatermarkTime() { private final List<SystemStreamPartition> intermediateSsps; private final WatermarkMetrics watermarkMetrics; + WatermarkStates( + Set<SystemStreamPartition> ssps, + Map<SystemStream, Integer> producerTaskCounts, + MetricsRegistry metricsRegistry, + long watermarkIdleTimeout, + double watermarkQuorumSizePercentage) { + this(ssps, producerTaskCounts, metricsRegistry, watermarkIdleTimeout, + watermarkQuorumSizePercentage, System::currentTimeMillis); + } + + //Internal: test-only WatermarkStates( Set<SystemStreamPartition> ssps, Map<SystemStream, Integer> producerTaskCounts, - MetricsRegistry metricsRegistry) { + MetricsRegistry metricsRegistry, + long watermarkIdleTimeout, + double watermarkQuorumSizePercentage, + LongSupplier systemTimeFunc) { final Map<SystemStreamPartition, WatermarkState> states = new HashMap<>(); final List<SystemStreamPartition> intSsps = new ArrayList<>(); ssps.forEach(ssp -> { final int producerCount = producerTaskCounts.getOrDefault(ssp.getSystemStream(), 0); - states.put(ssp, new WatermarkState(producerCount)); + states.put(ssp, + new WatermarkState(producerCount, watermarkIdleTimeout, watermarkQuorumSizePercentage, systemTimeFunc)); if (producerCount != 0) { intSsps.add(ssp); } diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestWatermarkStates.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestWatermarkStates.java index 2b41ed2bac..abc4a52df3 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestWatermarkStates.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestWatermarkStates.java @@ -23,12 +23,16 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.function.LongSupplier; + import org.apache.samza.Partition; +import org.apache.samza.config.TaskConfig; import 
org.apache.samza.metrics.MetricsRegistryMap; import org.apache.samza.system.IncomingMessageEnvelope; import org.apache.samza.system.SystemStream; import org.apache.samza.system.SystemStreamPartition; import org.apache.samza.system.WatermarkMessage; +import org.junit.Before; import org.junit.Test; import static org.junit.Assert.assertEquals; @@ -36,25 +40,39 @@ import static org.apache.samza.operators.impl.WatermarkStates.WATERMARK_NOT_EXIST; public class TestWatermarkStates { + static final long TEST_TASK_WATERMARK_IDLE_TIMEOUT_MS = 600000; - @Test - public void testUpdate() { - SystemStream input = new SystemStream("system", "input"); - SystemStream intermediate = new SystemStream("system", "intermediate"); + private SystemStream input; + private SystemStream intermediate; + private Set<SystemStreamPartition> ssps; + private SystemStreamPartition inputPartition0; + private SystemStreamPartition intPartition0; + private SystemStreamPartition intPartition1; + private Map<SystemStream, Integer> producerCounts; + + @Before + public void setup() { + input = new SystemStream("system", "input"); + intermediate = new SystemStream("system", "intermediate"); - Set<SystemStreamPartition> ssps = new HashSet<>(); - SystemStreamPartition inputPartition0 = new SystemStreamPartition(input, new Partition(0)); - SystemStreamPartition intPartition0 = new SystemStreamPartition(intermediate, new Partition(0)); - SystemStreamPartition intPartition1 = new SystemStreamPartition(intermediate, new Partition(1)); + ssps = new HashSet<>(); + inputPartition0 = new SystemStreamPartition(input, new Partition(0)); + intPartition0 = new SystemStreamPartition(intermediate, new Partition(0)); + intPartition1 = new SystemStreamPartition(intermediate, new Partition(1)); ssps.add(inputPartition0); ssps.add(intPartition0); ssps.add(intPartition1); - Map<SystemStream, Integer> producerCounts = new HashMap<>(); + producerCounts = new HashMap<>(); producerCounts.put(intermediate, 2); + } + @Test + public void testUpdate() { // advance watermark on input to 5 - WatermarkStates watermarkStates = 
new WatermarkStates(ssps, producerCounts, new MetricsRegistryMap()); + WatermarkStates watermarkStates = new WatermarkStates(ssps, producerCounts, new MetricsRegistryMap(), + TaskConfig.DEFAULT_TASK_WATERMARK_IDLE_TIMEOUT_MS, + TaskConfig.DEFAULT_WATERMARK_QUORUM_SIZE_PERCENTAGE); IncomingMessageEnvelope envelope = IncomingMessageEnvelope.buildWatermarkEnvelope(inputPartition0, 5L); watermarkStates.update((WatermarkMessage) envelope.getMessage(), envelope.getSystemStreamPartition()); @@ -100,4 +118,113 @@ public void testUpdate() { // verify we got a watermark 6 (min) for int stream assertEquals(watermarkStates.getWatermark(intermediate), 6L); } + + @Test + public void testIdle() { + MockSystemTime systemTime = new MockSystemTime(); + WatermarkStates watermarkStates = new WatermarkStates(ssps, producerCounts, new MetricsRegistryMap(), + TEST_TASK_WATERMARK_IDLE_TIMEOUT_MS, TaskConfig.DEFAULT_WATERMARK_QUORUM_SIZE_PERCENTAGE, systemTime); + + // First watermark + WatermarkMessage watermarkMessage = new WatermarkMessage(1L, "task 0"); + watermarkStates.update(watermarkMessage, intPartition0); + assertEquals(watermarkStates.getWatermarkPerSSP(intPartition0), WATERMARK_NOT_EXIST); + assertEquals(watermarkStates.getWatermark(intermediate), WATERMARK_NOT_EXIST); + + // Advance currentTime to pass the idle timeout + systemTime.advance(TEST_TASK_WATERMARK_IDLE_TIMEOUT_MS); + + // Watermark is computed based on "task 1" alone since "task 0" passes the idle timeout + watermarkMessage = new WatermarkMessage(5L, "task 1"); + watermarkStates.update(watermarkMessage, intPartition0); + assertEquals(watermarkStates.getWatermarkPerSSP(intPartition0), 5L); + assertEquals(watermarkStates.getWatermark(intermediate), WATERMARK_NOT_EXIST); + + // Advance currentTime without exceeding the timeout + systemTime.advance(1); + + // Watermark is computed based on "task 0" since the currentTime already passes the idle threshold + watermarkMessage = new WatermarkMessage(6L, "task 0"); + 
watermarkStates.update(watermarkMessage, intPartition1); + assertEquals(watermarkStates.getWatermarkPerSSP(intPartition1), 6L); + assertEquals(watermarkStates.getWatermark(intermediate), 5L); + + // Watermark from "task 1" is less than current watermark, ignore + watermarkMessage = new WatermarkMessage(2L, "task 1"); + watermarkStates.update(watermarkMessage, intPartition1); + assertEquals(watermarkStates.getWatermarkPerSSP(intPartition1), 6L); + // verify we got a watermark (min) for int stream + assertEquals(watermarkStates.getWatermark(intermediate), 5L); + + // Advance currentTime without exceeding the timeout + systemTime.advance(1); + + // Watermark from "task 0" is updated, but less than current watermark + watermarkMessage = new WatermarkMessage(3L, "task 0"); + watermarkStates.update(watermarkMessage, intPartition0); + assertEquals(watermarkStates.getWatermarkPerSSP(intPartition0), 5L); + assertEquals(watermarkStates.getWatermark(intermediate), 5L); + + // Advance currentTime without exceeding the timeout + systemTime.advance(1); + + // Watermark is computed this currentTime due to advance in "task 0" + watermarkMessage = new WatermarkMessage(7L, "task 0"); + watermarkStates.update(watermarkMessage, intPartition0); + assertEquals(watermarkStates.getWatermarkPerSSP(intPartition0), 5L); + assertEquals(watermarkStates.getWatermark(intermediate), 5L); + + // Advance currentTime without exceeding the timeout + systemTime.advance(1); + + // Watermark is computed this currentTime due to advance in "task 1" + watermarkMessage = new WatermarkMessage(10L, "task 1"); + watermarkStates.update(watermarkMessage, intPartition0); + assertEquals(watermarkStates.getWatermarkPerSSP(intPartition0), 7L); + assertEquals(watermarkStates.getWatermark(intermediate), 6L); + } + + @Test + public void testQuorum() { + MockSystemTime systemTime = new MockSystemTime(); + WatermarkStates watermarkStates = new WatermarkStates(ssps, producerCounts, new MetricsRegistryMap(), + 
TEST_TASK_WATERMARK_IDLE_TIMEOUT_MS, 1.0, systemTime); + + // First watermark + WatermarkMessage watermarkMessage = new WatermarkMessage(1L, "task 0"); + watermarkStates.update(watermarkMessage, intPartition0); + assertEquals(watermarkStates.getWatermarkPerSSP(intPartition0), WATERMARK_NOT_EXIST); + assertEquals(watermarkStates.getWatermark(intermediate), WATERMARK_NOT_EXIST); + + // Advance currentTime to pass the idle timeout + systemTime.advance(TEST_TASK_WATERMARK_IDLE_TIMEOUT_MS); + + // Watermark is computed based on "task 1" alone since "task 0" passes the idle timeout + // Not meeting quorum + watermarkMessage = new WatermarkMessage(5L, "task 1"); + watermarkStates.update(watermarkMessage, intPartition0); + assertEquals(watermarkStates.getWatermarkPerSSP(intPartition0), WATERMARK_NOT_EXIST); + assertEquals(watermarkStates.getWatermark(intermediate), WATERMARK_NOT_EXIST); + + systemTime.advance(1); + + // Watermark from task 0, now quorum is met. + watermarkMessage = new WatermarkMessage(3L, "task 0"); + watermarkStates.update(watermarkMessage, intPartition0); + assertEquals(watermarkStates.getWatermarkPerSSP(intPartition0), 3L); + assertEquals(watermarkStates.getWatermark(intermediate), WATERMARK_NOT_EXIST); + } + + static class MockSystemTime implements LongSupplier { + long currentTime = System.currentTimeMillis(); + + void advance(long delta) { + this.currentTime += delta; + } + + @Override + public long getAsLong() { + return currentTime; + } + } }