[server] Introduce record level delay with heartbeat delay #2399
base: main
Conversation
Why is this important? This seems like an awfully big expense for dubious observability gain.

Hi Zac - we want to measure the true end-to-end replication latency and enforce an SLA for it. Heartbeats are only emitted every 60s, so this metric may not be an accurate representation if we do not process a heartbeat for a few minutes but are actually continuing to process records between the heartbeats.
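To illustrate the motivation, here is a minimal hedged sketch (the class and method names are illustrative, not Venice's actual API): if lag is computed only from the last heartbeat, it can look 60+ seconds stale even while records are flowing; taking the most recent of the heartbeat timestamp and the last record's producer timestamp keeps the measurement current.

```java
public class ReplicationLagSketch {
    // Lag is measured against the freshest signal we have: either the last
    // heartbeat control message or the last regular data record consumed.
    static long lagMs(long nowMs, long lastHeartbeatTsMs, long lastRecordTsMs) {
        return nowMs - Math.max(lastHeartbeatTsMs, lastRecordTsMs);
    }

    public static void main(String[] args) {
        long now = 200_000L;
        // Heartbeat last seen 90s ago, but a data record arrived 2s ago:
        // heartbeat-only lag would report 90_000 ms, record-aware lag reports 2_000 ms.
        System.out.println(lagMs(now, now - 90_000L, now - 2_000L)); // prints 2000
    }
}
```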
```java
/**
 * Whether this timestamp entry was consumed from input or if the system initialized it as a default entry
 */
public final boolean consumedFromUpstream;
```
This seems unnecessary to me. I know this is existing code but why can't we use sentinel values for timestamp to achieve the same goal?
I agree, it is possible to achieve the same goal by using sentinel values for the timestamp, but we would have to update existing code to account for this sentinel value, and I feel that it affects readability in the long term. Since we also made a change to reuse the same IngestionTimestampEntry, the extra space it uses shouldn't be too bad.
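A minimal sketch of the two designs being debated above (names are illustrative, not the actual Venice classes): an explicit boolean field versus a sentinel timestamp value. The trade-off is one extra field per entry versus an implicit convention every reader of the timestamp must know.

```java
public class TimestampEntrySketch {
    // Design 1 (kept in this PR): explicit flag, readable at every call site.
    static final class EntryWithFlag {
        final long timestamp;
        final boolean consumedFromUpstream;

        EntryWithFlag(long timestamp, boolean consumedFromUpstream) {
            this.timestamp = timestamp;
            this.consumedFromUpstream = consumedFromUpstream;
        }
    }

    // Design 2 (reviewer's suggestion): a sentinel value saves a field, but every
    // consumer of the timestamp must know that -1 means "system-initialized default".
    static final long DEFAULT_SENTINEL = -1L;

    static boolean isFromUpstream(long timestamp) {
        return timestamp != DEFAULT_SENTINEL;
    }

    public static void main(String[] args) {
        EntryWithFlag entry = new EntryWithFlag(123L, true);
        System.out.println(entry.consumedFromUpstream);        // prints true
        System.out.println(isFromUpstream(DEFAULT_SENTINEL));  // prints false
    }
}
```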
Thanks a ton, @misyel for the changes. Your changes LGTM. But the current state of the HeartbeatMonitoringService class is not great. I've left some comments to improve it. Let me know what you think!
```java
    leaderProducedRecordContext,
    currentTimeMs);
// Record regular record timestamp for heartbeat monitoring if enabled
if (recordLevelTimestampEnabled) {
```
Just throwing it out: should we fold this into `if (recordLevelMetricEnabled.get()) {`?
```java
 * is acceptable. The cached keys are then reused on the per-record hot path to avoid repeated allocation
 * and hash computation.
 */
private void refreshCachedHeartbeatKeys(PartitionConsumptionState pcs) {
```
Why is this required? We are not embedding the role of the replica in the HB key, right?
Removed
```java
        result.put(replicaId + "-" + region.getKey(), replicaHeartbeatInfo);
    }
}
for (Map.Entry<HeartbeatKey, IngestionTimestampEntry> entry: heartbeatTimestampMap.entrySet()) {
```
This can be optimized IMO. We can get all the PCSs for a given version topic from the SIT and then only get the entries for keys inside those PCS entries.
```diff
  * should be able to tell us all the lag information.
  */
- for (Map.Entry<String, HeartbeatTimeStampEntry> entry: replicaTimestampMap.entrySet()) {
+ for (Map.Entry<HeartbeatKey, IngestionTimestampEntry> entry: getLeaderHeartbeatTimeStamps().entrySet()) {
```
Why do we need to iterate through all of the keys? We already have a handle to the partitionConsumptionState. We can just iterate through a subset of the keys (3).
Could you please check other places where we are iterating through the whole map? We should not do that. Almost all lookups in this class should happen with the PCS as an arg.
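The reviewer's suggestion can be sketched as follows (types and names are simplified placeholders, not the real Venice classes): derive the small, fixed set of keys from the partition consumption state and do point lookups, instead of scanning every entry in the shared timestamp map.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PcsScopedLookupSketch {
    // Simplified stand-in for the real HeartbeatKey (version topic, partition, region).
    record HeartbeatKey(String versionTopic, int partition, String region) {}

    // Only 'regions.size()' O(1) lookups, rather than iterating the whole map.
    static long maxTimestampFor(
            Map<HeartbeatKey, Long> heartbeatTimestampMap,
            String versionTopic,
            int partition,
            List<String> regions) {
        long max = Long.MIN_VALUE;
        for (String region : regions) {
            Long ts = heartbeatTimestampMap.get(new HeartbeatKey(versionTopic, partition, region));
            if (ts != null) {
                max = Math.max(max, ts);
            }
        }
        return max;
    }

    public static void main(String[] args) {
        Map<HeartbeatKey, Long> map = new HashMap<>();
        map.put(new HeartbeatKey("store_v1", 0, "dc-0"), 100L);
        map.put(new HeartbeatKey("store_v1", 0, "dc-1"), 250L);
        map.put(new HeartbeatKey("other_v1", 3, "dc-0"), 999L); // never touched
        System.out.println(maxTimestampFor(map, "store_v1", 0, List.of("dc-0", "dc-1"))); // prints 250
    }
}
```

With a handful of regions per replica, this turns a scan over every replica's entries into a constant number of hash lookups per caller.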
Problem Statement
Heartbeats are sent every 60 seconds, so they will not always represent the replication lag the system is operating at. This PR introduces record-level delay tracking alongside heartbeat delay.
Solution
This PR introduces record-level timestamp tracking for Venice's ingestion pipeline, providing more granular visibility into data freshness and ingestion delays. The implementation enhances the existing heartbeat monitoring system to track timestamps for individual data records in addition to periodic heartbeat control messages.
Key Features
SERVER_RECORD_LEVEL_TIMESTAMP_ENABLED: Enable record-level timestamp tracking
SERVER_PER_RECORD_OTEL_METRICS_ENABLED: Enable per-record OTel metrics emission
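As a hedged sketch of how these toggles might look in a server configuration file: the dotted property keys below are hypothetical, derived from the constant names above; the authoritative key strings are defined in the Venice config classes (e.g. VeniceServerConfig).

```properties
# Hypothetical property keys, inferred from the config-constant names above.
server.record.level.timestamp.enabled=true
server.per.record.otel.metrics.enabled=true
```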
Implementation Details
Suggested Review Order
Configuration and Data Model (Start here for context)
Core Implementation
Integration with Ingestion Pipeline
Tests
Configuration Integration
Code changes
Concurrency-Specific Checks
Both reviewer and PR author to verify
Proper synchronization primitives (synchronized, RWLock) are used where needed. Thread-safe collections (ConcurrentHashMap, CopyOnWriteArrayList) are used where appropriate.

How was this PR tested?
New unit tests added.
New integration tests added.
Modified or extended existing tests.
Verified backward compatibility (if applicable).
Unit tests for both heartbeat and record-level timestamp tracking
Tests for OTel metrics emission
Tests for different configuration combinations
Does this PR introduce any user-facing or breaking changes?