Skip to content

Commit 84f4be8

Browse files
authored
[controller] Add DegradedModeRecoveryService, metrics, and E2E test (#2760)
DegradedModeRecoveryService: a 2-phase recovery orchestrator.
- Phase 1 initiates data recovery for all PARTIALLY_ONLINE stores in parallel (prepare → poll readiness → initiate).
- Phase 2 monitors child DC completion and transitions version status from PARTIALLY_ONLINE → ONLINE.
- Orphan detection: a periodic monitor detects PARTIALLY_ONLINE versions with no active recovery (e.g., after controller leader failover) and re-triggers recovery. The recovery flow is idempotent.
- Configurable concurrency: bounded thread pools for recovery and monitoring, configurable via degraded.mode.recovery.thread.pool.size.
- Retry with exponential backoff: failed store recoveries are retried up to 3 times.
- Version supersession handling: if a newer version becomes current during recovery polling, the old version is treated as successfully healed.

DegradedModeStats (9 OTel metrics + latency histogram):
- recovery.store_success_count / recovery.store_failure_count
- recovery.version_transitioned_count
- recovery.progress (gauge, 0.0-1.0)
- push.auto_converted_count / push.blocked_incremental_count
- dc.active_count / dc.duration_minutes
- recovery.store_duration_ms (avg/max)
1 parent 2ad367e commit 84f4be8

42 files changed

Lines changed: 3459 additions & 589 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

build.gradle

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -328,15 +328,7 @@ subprojects {
328328
// Protocol changes should pin the current version via this override and remove the override in a follow-up PR
329329
// when actually using the new protocol. Example to pin KME to v12 when introducing v13:
330330
// project(':internal:venice-common').file('src/main/resources/avro/KafkaMessageEnvelope/v12', PathValidation.DIRECTORY)
331-
def versionOverrides = [
332-
// AdminOperation v100 stages degradedDatacenters on AddVersion.
333-
// Pinned to the current active version until the Java wiring lands in a follow-up PR.
334-
project(':services:venice-controller').file('src/main/resources/avro/AdminOperation/v99', PathValidation.DIRECTORY),
335-
// StoreMetaValue v45 stages isDegradedPush on StoreVersion (set by parent when a push is
336-
// auto-converted under degraded mode; recovery service uses it as the discriminator).
337-
// Pinned to the current active version until the Java wiring lands in a follow-up PR.
338-
project(':internal:venice-common').file('src/main/resources/avro/StoreMetaValue/v44', PathValidation.DIRECTORY)
339-
]
331+
def versionOverrides = []
340332

341333
def schemaDirs = [sourceDir]
342334
sourceDir.eachDir { typeDir ->

internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -591,6 +591,21 @@ private ConfigKeys() {
591591
*/
592592
public static final String DEGRADED_MODE_ENABLED = "degraded.mode.enabled";
593593

594+
/**
595+
* Whether auto-recovery is enabled when a degraded DC is unmarked.
596+
* When true, the controller will automatically trigger data recovery for stores
597+
* with PARTIALLY_ONLINE versions after a DC is unmarked as degraded.
598+
*/
599+
public static final String DEGRADED_MODE_AUTO_RECOVERY_ENABLED = "degraded.mode.auto.recovery.enabled";
600+
601+
/**
602+
* Thread pool size for the degraded mode recovery service.
603+
* Controls how many store recoveries can run concurrently. Default is 5, which
604+
* supports typical clusters with tens of stores. Increase for clusters with hundreds
605+
* of stores to reduce total recovery wall-clock time.
606+
*/
607+
public static final String DEGRADED_MODE_RECOVERY_THREAD_POOL_SIZE = "degraded.mode.recovery.thread.pool.size";
608+
594609
/**
595610
* Whether stores are allowed to be migrated from/to a specific cluster.
596611
* The value for this config is read from cluster configs in Zk.

internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerRoute.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -345,7 +345,10 @@ public enum ControllerRoute implements VeniceDimensionInterface {
345345
),
346346
UNMARK_DC_DEGRADED(
347347
"/unmark_dc_degraded", HttpMethod.POST, Arrays.asList(CLUSTER, ControllerApiConstants.DATACENTER_NAME)
348-
), GET_DEGRADED_DCS("/get_degraded_dcs", HttpMethod.GET, Collections.singletonList(CLUSTER));
348+
), GET_DEGRADED_DCS("/get_degraded_dcs", HttpMethod.GET, Collections.singletonList(CLUSTER)),
349+
GET_RECOVERY_PROGRESS(
350+
"/get_recovery_progress", HttpMethod.GET, Arrays.asList(CLUSTER, ControllerApiConstants.DATACENTER_NAME)
351+
);
349352

350353
private final String path;
351354
private final HttpMethod httpMethod;
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package com.linkedin.venice.controllerapi;
2+
3+
/**
 * Controller response describing the progress of a recovery operation for a single datacenter.
 *
 * <p>This is a plain mutable bean: every field has a getter and a setter, and the class performs
 * no validation and derives no values. Until a setter is called, each field holds its Java default
 * ({@code null}, {@code 0}, {@code false} or {@code 0.0}).
 *
 * <p>NOTE(review): presumably the payload of the {@code GET_RECOVERY_PROGRESS} controller route —
 * confirm against the route handler.
 */
public class RecoveryProgressResponse extends ControllerResponse {
4+
/**
5+
* Indicates the state of the recovery operation.
6+
*/
7+
public enum RecoveryStatus {
8+
/** No recovery has been triggered for this datacenter. */
9+
NOT_FOUND,
10+
/** Recovery is currently in progress. */
11+
IN_PROGRESS,
12+
/** Recovery has completed (check failedStores for partial success). */
13+
COMPLETED
14+
}
15+
16+
/** Name of the datacenter this progress report refers to. */
private String datacenterName;
17+
/** State of the recovery operation; see {@link RecoveryStatus}. */
private RecoveryStatus status;
18+
/** Presumably the number of stores included in the recovery; supplied by the caller, not computed here. */
private int totalStores;
19+
/** Presumably the number of stores recovered so far; supplied by the caller. */
private int recoveredStores;
20+
/** Presumably the number of stores whose recovery failed; a non-zero value alongside COMPLETED means partial success. */
private int failedStores;
21+
/** Presumably the number of versions moved from PARTIALLY_ONLINE to ONLINE; supplied by the caller — confirm. */
private int versionsTransitioned;
22+
/**
 * Completion flag.
 * NOTE(review): overlaps with {@code status == COMPLETED}; nothing in this class keeps the two in sync.
 */
private boolean complete;
23+
/** Progress as a fraction; presumably in [0.0, 1.0], but the range is not validated here. */
private double progressFraction;
24+
25+
public String getDatacenterName() {
26+
return datacenterName;
27+
}
28+
29+
public void setDatacenterName(String datacenterName) {
30+
this.datacenterName = datacenterName;
31+
}
32+
33+
public RecoveryStatus getStatus() {
34+
return status;
35+
}
36+
37+
public void setStatus(RecoveryStatus status) {
38+
this.status = status;
39+
}
40+
41+
public int getTotalStores() {
42+
return totalStores;
43+
}
44+
45+
public void setTotalStores(int totalStores) {
46+
this.totalStores = totalStores;
47+
}
48+
49+
public int getRecoveredStores() {
50+
return recoveredStores;
51+
}
52+
53+
public void setRecoveredStores(int recoveredStores) {
54+
this.recoveredStores = recoveredStores;
55+
}
56+
57+
public int getFailedStores() {
58+
return failedStores;
59+
}
60+
61+
public void setFailedStores(int failedStores) {
62+
this.failedStores = failedStores;
63+
}
64+
65+
public int getVersionsTransitioned() {
66+
return versionsTransitioned;
67+
}
68+
69+
public void setVersionsTransitioned(int versionsTransitioned) {
70+
this.versionsTransitioned = versionsTransitioned;
71+
}
72+
73+
public boolean isComplete() {
74+
return complete;
75+
}
76+
77+
public void setComplete(boolean complete) {
78+
this.complete = complete;
79+
}
80+
81+
public double getProgressFraction() {
82+
return progressFraction;
83+
}
84+
85+
public void setProgressFraction(double progressFraction) {
86+
this.progressFraction = progressFraction;
87+
}
88+
}

internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/UpdateClusterConfigQueryParams.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.linkedin.venice.controllerapi;
22

3+
import static com.linkedin.venice.ConfigKeys.DEGRADED_MODE_ENABLED;
34
import static com.linkedin.venice.controllerapi.ControllerApiConstants.ALLOW_STORE_MIGRATION;
45
import static com.linkedin.venice.controllerapi.ControllerApiConstants.CHILD_CONTROLLER_ADMIN_TOPIC_CONSUMPTION_ENABLED;
56
import static com.linkedin.venice.controllerapi.ControllerApiConstants.SERVER_KAFKA_FETCH_QUOTA_RECORDS_PER_SECOND;
@@ -56,6 +57,14 @@ public Optional<Boolean> getChildControllerAdminTopicConsumptionEnabled() {
5657
return getBoolean(CHILD_CONTROLLER_ADMIN_TOPIC_CONSUMPTION_ENABLED);
5758
}
5859

60+
/**
 * Stores the degraded-mode flag in these query params under the
 * {@code DEGRADED_MODE_ENABLED} key ({@code "degraded.mode.enabled"}).
 *
 * @param degradedModeEnabled value to store for the degraded-mode flag
 * @return the result of {@code putBoolean}; presumably this params object, for call chaining —
 *         confirm against {@code putBoolean}
 */
public UpdateClusterConfigQueryParams setDegradedModeEnabled(boolean degradedModeEnabled) {
61+
return putBoolean(DEGRADED_MODE_ENABLED, degradedModeEnabled);
62+
}
63+
64+
/**
 * Looks up the degraded-mode flag stored under the {@code DEGRADED_MODE_ENABLED} key
 * ({@code "degraded.mode.enabled"}).
 *
 * @return the result of {@code getBoolean(DEGRADED_MODE_ENABLED)}; presumably empty when the
 *         flag was never set — confirm against {@code getBoolean}
 */
public Optional<Boolean> getDegradedModeEnabled() {
65+
return getBoolean(DEGRADED_MODE_ENABLED);
66+
}
67+
5968
// ***************** above this line are getters and setters *****************
6069

6170
private UpdateClusterConfigQueryParams putBoolean(String name, boolean value) {

internal/venice-common/src/main/java/com/linkedin/venice/helix/HelixReadOnlyDegradedDcStatesRepository.java

Lines changed: 0 additions & 80 deletions
This file was deleted.

internal/venice-common/src/main/java/com/linkedin/venice/helix/HelixReadWriteDegradedDcStatesRepository.java

Lines changed: 0 additions & 33 deletions
This file was deleted.

internal/venice-common/src/main/java/com/linkedin/venice/meta/DegradedDcInfo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
/**
88
* Represents metadata about a datacenter that has been marked as degraded.
9-
* Stored in a separate ZK node per cluster, not in LiveClusterConfig.
9+
* Stored as a value in {@link LiveClusterConfig#getDegradedDatacenters()} (one entry per DC).
1010
*/
1111
public class DegradedDcInfo {
1212
@JsonProperty("timestamp")

internal/venice-common/src/main/java/com/linkedin/venice/meta/DegradedDcStates.java

Lines changed: 0 additions & 81 deletions
This file was deleted.

0 commit comments

Comments
 (0)