Conversation
Signed-off-by: Yashash Lokesh <yashashhl25@gmail.com>
Signed-off-by: Yashash Lokesh <yashashhl25@gmail.com>
Signed-off-by: Yashash Lokesh <yashashhl25@gmail.com>
Signed-off-by: Yashash Lokesh <yashashhl25@gmail.com>
Signed-off-by: Yashash Lokesh <yashashhl25@gmail.com>
Signed-off-by: Yashash Lokesh <yashashhl25@gmail.com>
Signed-off-by: Yashash Lokesh <yashashhl25@gmail.com>
Signed-off-by: Yashash Lokesh <yashashhl25@gmail.com>
Signed-off-by: Yashash Lokesh <yashashhl25@gmail.com>
Signed-off-by: Yashash Lokesh <yashashhl25@gmail.com>
Signed-off-by: Yashash Lokesh <yashashhl25@gmail.com>
Signed-off-by: Yashash Lokesh <yashashhl25@gmail.com>
Signed-off-by: Yashash Lokesh <yashashhl25@gmail.com>
Signed-off-by: Yashash Lokesh <yashashhl25@gmail.com>
Signed-off-by: Yashash Lokesh <yashashhl25@gmail.com>
Signed-off-by: Yashash Lokesh <yashashhl25@gmail.com>
Signed-off-by: Yashash Lokesh <yashashhl25@gmail.com>
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #3287 +/- ##
==========================================
- Coverage 81.10% 80.91% -0.19%
==========================================
Files 316 316
Lines 72304 72770 +466
==========================================
+ Hits 58639 58884 +245
- Misses 13107 13331 +224
+ Partials 558 555 -3 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
Signed-off-by: Yashash Lokesh <yashashhl25@gmail.com>
Signed-off-by: Yashash Lokesh <yashashhl25@gmail.com>
Signed-off-by: Yashash Lokesh <yashashhl25@gmail.com>
Signed-off-by: Yashash Lokesh <yashashhl25@gmail.com>
Signed-off-by: Yashash Lokesh <yashashhl25@gmail.com>
Signed-off-by: Vigith Maurice <vigith@gmail.com>
Signed-off-by: Vigith Maurice <vigith@gmail.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Problem Statement
The existing watermark system used two separate KV buckets for tracking watermark progression:
Offset-Timeline (OT) Bucket: Stored watermark-offset mappings to track message progression through the pipeline.
Heartbeat (HB) Bucket: A dedicated bucket solely for tracking processor liveness, processors would periodically write to this bucket to signal they're alive.
This dual-bucket architecture introduced several challenges:
Synchronization Complexity: Having two sources of truth for processor state created race conditions and synchronization issues. A processor could appear "alive" in the heartbeat bucket while its OT entries were stale, or vice versa. The system had to reconcile state from both buckets, adding complexity to processor lifecycle management.
No Single Source of Truth: When determining if a processor is alive and what its current watermark is, the system had to consult both buckets. This made reasoning about the system harder and introduced edge cases where the two buckets could be out of sync.
Infrastructure Overhead: Each watermark-enabled edge required creating, managing, and monitoring an extra KV bucket. This doubled the number of buckets in the system for watermark tracking.
Additionally, for source watermarks, there was no mechanism to know how many processors (partitions) should be reporting before the watermark could be considered valid. During startup or rebalancing, fetching watermarks before all processors were up would result in incorrect watermark values.
Solution
Unified Bucket with KV Creation Timestamps: Instead of maintaining a separate heartbeat bucket, we now use the KV store's native entry creation timestamp (
KVEntry.created) as the liveness signal. When a processor publishes aWMBto the OT bucket, the KV store records when that entry was created. TheProcessorManageruses this timestamp to determine if a processor is still alive, if the entry is too old, the processor is marked inactive or deleted.Single Source of Truth: All processor state (watermark, offset, and liveness) now comes from a single OT bucket. The ProcessorManager watches this bucket for updates and uses the entry timestamps to track processor health. No reconciliation between buckets is needed.
Processor Count in WMB for Source Watermarks: Sources now include partition information in the WMB itself:
processor_countindicating the total number of partitions.SourceWatermarkFetcherreads the maximumprocessor_countfrom all active processors' head WMBs.Periodic Publishing for Liveness : The ISBWatermarkPublisher now publishes WMBs periodically (based on a configurable delay) even when the watermark hasn't changed. This ensures the KV entry timestamps stay fresh, allowing downstream vertices to detect processor liveness without a separate heartbeat mechanism.
Key Changes
ProcessorManagerdetermines liveness using KVEntry.created timestamp instead of watching a separate HB bucket.WMBstruct now includes an optionalprocessor_countfield for source watermarks.SourceWatermarkFetcherwaits for all expected processors (based onprocessor_countin WMBs) before computing a valid watermark.active_partitionsandtotal_partitions, enabling proper watermark coordination.SourceIdleDetectorhandles both heartbeat publishing (via periodic WMB updates) and idle detection.Testing