[controller] Cleanup stale replica statuses in Offlinepush znodes #2445
base: main
Conversation
sushantmane left a comment
Thanks. Left a few comments.
// Get current instances assigned to this partition
Partition partition = partitionAssignment.getPartition(partitionId);
if (partition == null) {
  LOGGER.warn("Partition {} not found in partition assignment for topic {}", partitionId, kafkaTopic);
Can we use Utils.getReplicaId to log the topic name and partition id together?
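A minimal sketch of that suggestion, assuming a `Utils.getReplicaId(kafkaTopic, partitionId)` helper that concatenates the topic name and partition id into one identifier (the exact signature is assumed, not taken from this diff):

```java
// Sketch only: log one combined replica identifier instead of two separate arguments.
// Utils.getReplicaId(kafkaTopic, partitionId) is assumed here.
LOGGER.warn(
    "Replica: {} not found in partition assignment",
    Utils.getReplicaId(kafkaTopic, partitionId));
```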
// Get set of currently assigned instance IDs
Set<String> currentInstanceIds =
    partition.getAllInstancesSet().stream().map(Instance::getNodeId).collect(Collectors.toSet());
Can we avoid the Streams API? This is the control path, but it's still good to avoid it as a general practice.
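A minimal sketch of the loop-based alternative the comment is asking for, assuming the `Partition#getAllInstancesSet()`, `Instance#getNodeId()`, and `ReplicaStatus#getInstanceId()` accessors shown in the surrounding diff:

```java
// Plain loops instead of stream().map().collect() for both instance-id sets.
Set<String> currentInstanceIds = new HashSet<>();
for (Instance instance: partition.getAllInstancesSet()) {
  currentInstanceIds.add(instance.getNodeId());
}

Set<String> existingInstanceIds = new HashSet<>();
for (ReplicaStatus replicaStatus: replicaStatuses) {
  existingInstanceIds.add(replicaStatus.getInstanceId());
}
```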
    replicaStatuses.stream().map(ReplicaStatus::getInstanceId).collect(Collectors.toSet());
// Find stale replicas (in push status but not in current assignment)
Set<String> staleInstanceIds = new HashSet<>(existingInstanceIds);
Should we augment this with timestamp-based checking as well, to guard against possible race conditions?
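One way to read this suggestion, as a sketch: only treat a replica as stale if it is both unassigned and has not been updated within some grace period. `getLastUpdateTimeInMs()` is a hypothetical accessor and the grace period is an assumed, configurable value; neither is taken from this diff.

```java
// Sketch only: add a grace period so a replica whose status was written very
// recently (e.g. mid-rebalance) is not removed. getLastUpdateTimeInMs() is a
// hypothetical accessor; the threshold would come from configuration.
long now = System.currentTimeMillis();
long gracePeriodMs = TimeUnit.HOURS.toMillis(1);

Set<String> staleInstanceIds = new HashSet<>();
for (ReplicaStatus replicaStatus: replicaStatuses) {
  boolean unassigned = !currentInstanceIds.contains(replicaStatus.getInstanceId());
  boolean oldEnough = (now - replicaStatus.getLastUpdateTimeInMs()) > gracePeriodMs;
  if (unassigned && oldEnough) {
    staleInstanceIds.add(replicaStatus.getInstanceId());
  }
}
```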
@Override
public void updatePartitionStatus(String kafkaTopic, PartitionStatus partitionStatus) {
  if (!pushStatusExists(kafkaTopic)) {
    LOGGER.warn("Push status does not exist for topic {}, skipping partition status update", kafkaTopic);
Given this is a method in the accessor, I'd throw a new exception here, e.g. VeniceNoPartitionStatusException, and let the caller decide how to handle it. The cleanup task can choose to log a warning, but other potential future callers may want different behavior.
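A sketch of that split, with the accessor throwing and the cleanup task tolerating the condition. VeniceNoPartitionStatusException is the name proposed in the comment and is assumed here, not an existing class:

```java
// In the accessor: fail loudly instead of silently skipping the update.
@Override
public void updatePartitionStatus(String kafkaTopic, PartitionStatus partitionStatus) {
  if (!pushStatusExists(kafkaTopic)) {
    throw new VeniceNoPartitionStatusException(
        "Push status does not exist for topic " + kafkaTopic);
  }
  // ... perform the ZK update as before ...
}

// In the cleanup task: this caller chooses to log and move on.
try {
  offlinePushAccessor.updatePartitionStatus(kafkaTopic, updatedPartitionStatus);
} catch (VeniceNoPartitionStatusException e) {
  LOGGER.warn("Push status does not exist for topic {}, skipping cleanup", kafkaTopic);
}
```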
        partitionId,
        clusterName);
    HelixUtils.update(partitionStatusAccessor, partitionStatusPath, partitionStatus);
    LOGGER.debug(
I'm assuming we want to log this to provide an audit history. In that case, wouldn't it be more useful to log the confirmed update at info and the attempt at debug? WDYT?
}
// Update the partition status in ZK with cleaned up replica statuses
offlinePushAccessor.updatePartitionStatus(kafkaTopic, updatedPartitionStatus);
Isn't this vulnerable to race conditions? What happens in the following scenario?

Scenario 1:
- Get the partition status from `offlinePushAccessor.getOfflinePushStatusAndItsPartitionStatuses(kafkaTopic)`. Let's say partition 0 is [A, B, C, D] and D is stale.
- The stale replica data won't change in the underlying partition status, but new instances could have joined and made their updates to ZK from the server side, e.g. [A, B, C, D, E].
- Cleanup is performed and we attempt to update via `offlinePushAccessor.updatePartitionStatus(kafkaTopic, updatedPartitionStatus)`, which will override the partition status to [A, B, C]. We just lose E?
I think we could explore whether ZK has something like compareAndSet, or some sort of generation id, so that we check and update only if the data hasn't changed since this writer last read it.
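For reference, ZooKeeper does expose a compare-and-set style primitive: every znode carries a version in its `Stat`, and a conditional `setData(path, data, expectedVersion)` fails with `BadVersionException` if the znode changed since the read. Below is a minimal sketch against the raw ZooKeeper client; the PR itself goes through Helix accessors, so an actual fix would use the equivalent versioned update there, and `removeStaleReplicaStatuses` is a hypothetical placeholder.

```java
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

class VersionedPartitionStatusUpdater {
  void cleanupWithVersionCheck(ZooKeeper zk, String partitionStatusPath)
      throws KeeperException, InterruptedException {
    Stat stat = new Stat();
    byte[] current = zk.getData(partitionStatusPath, false, stat); // read data + version

    byte[] cleaned = removeStaleReplicaStatuses(current);

    try {
      // The write succeeds only if the znode is still at the version we read above.
      zk.setData(partitionStatusPath, cleaned, stat.getVersion());
    } catch (KeeperException.BadVersionException e) {
      // A concurrent writer (e.g. a server reporting a new replica status) got in
      // between; re-read and retry, or leave it for the next cleanup cycle.
    }
  }

  private byte[] removeStaleReplicaStatuses(byte[] serializedPartitionStatus) {
    // Placeholder: deserialize, drop stale replica statuses, re-serialize.
    return serializedPartitionStatus;
  }
}
```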
LeakedPushStatusCleanUpService

Enhanced LeakedPushStatusCleanUpService to clean up stale replica statuses from PartitionStatus znodes during rebalancing or node failures. This prevents unbounded growth of replica statuses in ZK that can lead to large znode sizes.

- Added cleanupStaleReplicaStatuses() method to identify and remove replicas that are no longer assigned to partitions according to Helix
- Added updatePartitionStatus() method to the OfflinePushAccessor interface
- Implemented updatePartitionStatus() in VeniceOfflinePushMonitorAccessor
- Updated the LeakedPushStatusCleanUpService constructor to accept RoutingDataRepository

The cleanup runs as part of the existing background task and only removes replica statuses for instances not currently assigned to the partition.

Co-Authored-By: Claude Sonnet 4.5 <[email protected]>
83127ac to 003c3fa
Problem Statement
OfflinePushes ZNodes in ZooKeeper accumulate stale replica statuses over time, leading to unbounded growth of `PartitionStatus` znodes. This occurs due to rebalancing and node failures that leave behind statuses from instances that are no longer part of the partition assignment.
Root Cause
The `PartitionStatus` class maintains a `Map<String, ReplicaStatus>` (see `PartitionStatus.java:21`). Entries are added via `updateReplicaStatus()` when instances report their status, but nothing removes entries for instances that have since been unassigned. This can lead to unbounded growth of the replica status map and large `PartitionStatus` znodes.
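A toy reduction of this growth pattern (not the real Venice class; only the map shape and the `updateReplicaStatus()` name follow the description above, everything else is illustrative):

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative reduction of the root cause: the status map is keyed by instance id
// and this path only ever adds or overwrites entries, so statuses from instances
// that were unassigned by a rebalance or node failure linger in the znode.
class PartitionStatusSketch {
  private final Map<String, String> replicaStatusMap = new HashMap<>();

  void updateReplicaStatus(String instanceId, String status) {
    replicaStatusMap.put(instanceId, status); // add/overwrite, never remove
  }

  int replicaStatusCount() {
    return replicaStatusMap.size();
  }
}
```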
Example Scenario
Initial State (3 replicas):
Partition 0: [instance1, instance2, instance3]
After Rebalance:
Partition 0: [instance1, instance4, instance5] ← Current Helix assignment
But PartitionStatus still contains: [instance1, instance2, instance3, instance4, instance5]
^^^^^^^^^^^^^^^^^^^^^^^^^ STALE
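A minimal sketch of the cleanup described in the Solution section below, assuming the accessors shown in the review fragments above (`getAllInstancesSet()`, `getNodeId()`, `getInstanceId()`) plus a `getReplicaStatuses()` accessor and a hypothetical `buildPrunedPartitionStatus` helper:

```java
// Collect the instance ids currently assigned to the partition according to Helix.
Set<String> currentInstanceIds = new HashSet<>();
for (Instance instance: partition.getAllInstancesSet()) {
  currentInstanceIds.add(instance.getNodeId());
}

// Stale = present in the push status but absent from the current assignment.
Set<String> staleInstanceIds = new HashSet<>();
for (ReplicaStatus replicaStatus: partitionStatus.getReplicaStatuses()) {
  if (!currentInstanceIds.contains(replicaStatus.getInstanceId())) {
    staleInstanceIds.add(replicaStatus.getInstanceId());
  }
}

if (!staleInstanceIds.isEmpty()) {
  // Build a pruned PartitionStatus with only the active replicas and write it back.
  // buildPrunedPartitionStatus is a hypothetical helper for this sketch.
  PartitionStatus updatedPartitionStatus =
      buildPrunedPartitionStatus(partitionStatus, currentInstanceIds);
  offlinePushAccessor.updatePartitionStatus(kafkaTopic, updatedPartitionStatus);
}
```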
Solution
Enhanced the existing `LeakedPushStatusCleanUpService` to periodically clean up stale replica statuses by comparing each partition's replica statuses against the current Helix partition assignment and removing statuses for instances that are no longer assigned.

Implementation Details
New Method: `cleanupStaleReplicaStatuses(String kafkaTopic)`
- Gets the current partition assignment from `RoutingDataRepository`
- Reads the `OfflinePushStatus` with all partition statuses
- Builds a `PartitionStatus` containing only active replicas
- Writes it back via the new `updatePartitionStatus()` method

New Interface Method:
`OfflinePushAccessor.updatePartitionStatus()`
- Implemented in `VeniceOfflinePushMonitorAccessor`
- Uses `HelixUtils.update()` for atomic ZK updates

Integration with Existing Service
- Runs as part of the existing `LeakedPushStatusCleanUpService` background task (interval controlled by `LEAKED_PUSH_STATUS_CLEAN_UP_SERVICE_SLEEP_INTERVAL_MS`)
- Only runs when `RoutingDataRepository` is available (gracefully handles null)

Code changes
Concurrency-Specific Checks

Both reviewer and PR author to verify:
- Proper synchronization mechanisms (e.g. `synchronized`, `RWLock`) are used where needed.
- Thread-safe data structures are used where needed (e.g. `ConcurrentHashMap`, `CopyOnWriteArrayList`).

How was this PR tested?
Does this PR introduce any user-facing or breaking changes?