44import com .linkedin .venice .meta .Store ;
55import com .linkedin .venice .meta .Version ;
66import com .linkedin .venice .meta .VersionStatus ;
7- import com .linkedin .venice .utils .Pair ;
87import com .linkedin .venice .utils .concurrent .VeniceConcurrentHashMap ;
98import java .io .Closeable ;
109import java .util .ArrayList ;
2827public class DegradedModeRecoveryService implements Closeable {
2928 private static final Logger LOGGER = LogManager .getLogger (DegradedModeRecoveryService .class );
3029
31- static final int MAX_RETRIES = 3 ;
32- static final long READINESS_POLL_INTERVAL_MS = 5000 ;
33- static final int READINESS_POLL_MAX_ATTEMPTS = 60 ; // 5 min max
34- static final long DEFAULT_RECOVERY_COMPLETION_POLL_INTERVAL_MS = 30_000 ; // 30 seconds
35- static final int DEFAULT_RECOVERY_COMPLETION_POLL_MAX_ATTEMPTS = 720 ; // 6 hours max
3630 static final int DEFAULT_RECOVERY_THREAD_POOL_SIZE = 5 ;
3731
3832 private final Admin admin ;
@@ -42,9 +36,7 @@ public class DegradedModeRecoveryService implements Closeable {
4236 private final ExecutorService monitorExecutor ;
4337 private final ScheduledExecutorService degradedDcMonitor ;
4438 private final DegradedDcMonitor dcMonitor ;
45- private long recoveryCompletionPollIntervalMs = DEFAULT_RECOVERY_COMPLETION_POLL_INTERVAL_MS ;
46- private int recoveryCompletionPollMaxAttempts = DEFAULT_RECOVERY_COMPLETION_POLL_MAX_ATTEMPTS ;
47- private long retryBackoffBaseMs = READINESS_POLL_INTERVAL_MS ;
39+ private final StoreRecoveryExecutor storeRecoveryExecutor ;
4840
4941 public DegradedModeRecoveryService (Admin admin , DegradedModeStats stats ) {
5042 this (admin , stats , DEFAULT_RECOVERY_THREAD_POOL_SIZE , null );
@@ -57,6 +49,7 @@ public DegradedModeRecoveryService(
5749 VeniceControllerMultiClusterConfig multiClusterConfigs ) {
5850 this .admin = admin ;
5951 this .stats = stats ;
52+ this .storeRecoveryExecutor = new StoreRecoveryExecutor (admin , stats );
6053 int effectivePoolSize = Math .max (1 , threadPoolSize );
6154 this .recoveryExecutor = Executors .newFixedThreadPool (effectivePoolSize , runnable -> {
6255 Thread t = new Thread (runnable );
@@ -129,7 +122,9 @@ public void triggerRecovery(String clusterName, String datacenterName) {
129122 clusterName );
130123 List <Future <?>> futures = new ArrayList <>();
131124 for (RecoveryProgress .StoreVersionPair sv : affected ) {
132- futures .add (recoveryExecutor .submit (() -> recoverSingleStore (clusterName , datacenterName , sv , progress )));
125+ futures .add (
126+ recoveryExecutor
127+ .submit (() -> storeRecoveryExecutor .recoverSingleStore (clusterName , datacenterName , sv , progress )));
133128 }
134129
135130 // Submit monitor task to bounded pool instead of spawning raw threads
@@ -177,7 +172,8 @@ void confirmRecoveryAndTransitionVersions(String clusterName, String datacenterN
177172 for (RecoveryProgress .StoreVersionPair sv : initiatedStores ) {
178173 confirmFutures .add (recoveryExecutor .submit (() -> {
179174 try {
180- VersionPollResult result = pollUntilVersionCurrent (clusterName , sv , datacenterName );
175+ StoreRecoveryExecutor .VersionPollResult result =
176+ storeRecoveryExecutor .pollUntilVersionCurrent (clusterName , sv , datacenterName );
181177 switch (result ) {
182178 case CURRENT :
183179 admin .updateStoreVersionStatus (clusterName , sv .storeName , sv .version , VersionStatus .ONLINE );
@@ -241,64 +237,9 @@ void confirmRecoveryAndTransitionVersions(String clusterName, String datacenterN
241237 progress .getInitiatedStores ().clear ();
242238 }
243239
244- /** Polls until recovered version is current, superseded by a newer version, or timed out. */
245- VersionPollResult pollUntilVersionCurrent (
246- String clusterName ,
247- RecoveryProgress .StoreVersionPair storeVersion ,
248- String datacenterName ) throws InterruptedException {
249- long startMs = System .currentTimeMillis ();
250- long lastLogMs = startMs ;
251- long slowRecoveryThresholdMs = TimeUnit .MINUTES .toMillis (30 );
252- for (int i = 0 ; i < recoveryCompletionPollMaxAttempts ; i ++) {
253- int currentVersionInRegion = admin .getCurrentVersionInRegion (clusterName , storeVersion .storeName , datacenterName );
254- if (currentVersionInRegion == storeVersion .version ) {
255- return VersionPollResult .CURRENT ;
256- }
257- if (currentVersionInRegion > storeVersion .version ) {
258- LOGGER .info (
259- "Store {} v{} in datacenter {} superseded by newer version v{}. Recovery is moot." ,
260- storeVersion .storeName ,
261- storeVersion .version ,
262- datacenterName ,
263- currentVersionInRegion );
264- return VersionPollResult .SUPERSEDED ;
265- }
266- long nowMs = System .currentTimeMillis ();
267- long elapsedMs = nowMs - startMs ;
268- // Log progress every 5 minutes (time-based, not poll-count based)
269- if (nowMs - lastLogMs >= TimeUnit .MINUTES .toMillis (5 )) {
270- lastLogMs = nowMs ;
271- LOGGER .info (
272- "Waiting for store {} v{} to become current in datacenter: {} (elapsed: {} min)" ,
273- storeVersion .storeName ,
274- storeVersion .version ,
275- datacenterName ,
276- TimeUnit .MILLISECONDS .toMinutes (elapsedMs ));
277- }
278- // Warn once when recovery exceeds 30 minutes
279- if (elapsedMs > slowRecoveryThresholdMs
280- && elapsedMs - recoveryCompletionPollIntervalMs <= slowRecoveryThresholdMs ) {
281- LOGGER .warn (
282- "SLOW RECOVERY: Store {} v{} in datacenter {} has been polling for {} min." ,
283- storeVersion .storeName ,
284- storeVersion .version ,
285- datacenterName ,
286- TimeUnit .MILLISECONDS .toMinutes (elapsedMs ));
287- }
288- Thread .sleep (recoveryCompletionPollIntervalMs );
289- }
290- return VersionPollResult .TIMED_OUT ;
291- }
292-
293- enum VersionPollResult {
294- CURRENT , SUPERSEDED , TIMED_OUT
295- }
296-
297- // Visible for testing
240+ // Visible for testing — forwards to the executor.
298241 void setRecoveryCompletionPollParameters (long intervalMs , int maxAttempts ) {
299- this .recoveryCompletionPollIntervalMs = intervalMs ;
300- this .recoveryCompletionPollMaxAttempts = maxAttempts ;
301- this .retryBackoffBaseMs = intervalMs ;
242+ storeRecoveryExecutor .setRecoveryCompletionPollParameters (intervalMs , maxAttempts );
302243 }
303244
304245 List <RecoveryProgress .StoreVersionPair > findPartiallyOnlineStores (String clusterName ) {
@@ -321,151 +262,6 @@ List<RecoveryProgress.StoreVersionPair> findPartiallyOnlineStores(String cluster
321262 return result ;
322263 }
323264
324- void recoverSingleStore (
325- String clusterName ,
326- String datacenterName ,
327- RecoveryProgress .StoreVersionPair storeVersion ,
328- RecoveryProgress progress ) {
329- // Pre-check that the version still exists and is still PARTIALLY_ONLINE.
330- // PARTIALLY_ONLINE is also set by DeferredVersionSwapService and rollbacks, not just
331- // degraded-mode pushes. This check ensures we only recover versions that are genuinely
332- // stuck — DeferredVersionSwapService will have already transitioned its versions to ONLINE.
333- Store currentStore = admin .getStore (clusterName , storeVersion .storeName );
334- if (currentStore == null ) {
335- LOGGER .warn ("Store {} no longer exists. Skipping recovery." , storeVersion .storeName );
336- progress .incrementFailed ();
337- return ;
338- }
339- Version currentVersion = currentStore .getVersion (storeVersion .version );
340- if (currentVersion == null || currentVersion .getStatus () != VersionStatus .PARTIALLY_ONLINE ) {
341- LOGGER .info (
342- "Store {} v{} is no longer PARTIALLY_ONLINE (current status: {}). Skipping recovery." ,
343- storeVersion .storeName ,
344- storeVersion .version ,
345- currentVersion == null ? "deleted" : currentVersion .getStatus ());
346- // Count as recovered so progressFraction reaches 1.0 — the version no longer needs recovery
347- progress .incrementRecovered ();
348- return ;
349- }
350-
351- long recoveryStartMs = System .currentTimeMillis ();
352- for (int attempt = 0 ; attempt < MAX_RETRIES ; attempt ++) {
353- try {
354- String sourceFabric = resolveSourceFabric (clusterName , storeVersion .storeName );
355- LOGGER .debug (
356- "Recovering store {} v{} in datacenter {} from source fabric {} (attempt {}/{})" ,
357- storeVersion .storeName ,
358- storeVersion .version ,
359- datacenterName ,
360- sourceFabric ,
361- attempt + 1 ,
362- MAX_RETRIES );
363-
364- admin .prepareDataRecovery (
365- clusterName ,
366- storeVersion .storeName ,
367- storeVersion .version ,
368- sourceFabric ,
369- datacenterName ,
370- Optional .empty ());
371-
372- pollUntilReady (clusterName , sourceFabric , datacenterName , storeVersion );
373-
374- admin .initiateDataRecovery (
375- clusterName ,
376- storeVersion .storeName ,
377- storeVersion .version ,
378- sourceFabric ,
379- datacenterName ,
380- false ,
381- Optional .empty ());
382-
383- progress .incrementRecovered ();
384- progress .addInitiatedStore (storeVersion );
385- if (stats != null ) {
386- stats .recordRecoveryStoreSuccess (clusterName , storeVersion .storeName );
387- stats .recordRecoveryStoreDurationMs (
388- clusterName ,
389- storeVersion .storeName ,
390- System .currentTimeMillis () - recoveryStartMs );
391- }
392- LOGGER .info (
393- "Successfully initiated recovery for store {} v{} in datacenter {}" ,
394- storeVersion .storeName ,
395- storeVersion .version ,
396- datacenterName );
397- return ;
398- } catch (Exception e ) {
399- LOGGER .warn (
400- "Attempt {}/{} failed for store {} v{} in datacenter {}: {}" ,
401- attempt + 1 ,
402- MAX_RETRIES ,
403- storeVersion .storeName ,
404- storeVersion .version ,
405- datacenterName ,
406- e .getMessage ());
407- if (attempt == MAX_RETRIES - 1 ) {
408- LOGGER .error (
409- "All {} retries exhausted for store {} v{} in datacenter {}" ,
410- MAX_RETRIES ,
411- storeVersion .storeName ,
412- storeVersion .version ,
413- datacenterName ,
414- e );
415- } else {
416- // Linear backoff between retries
417- try {
418- Thread .sleep (retryBackoffBaseMs * (attempt + 1 ));
419- } catch (InterruptedException ie ) {
420- Thread .currentThread ().interrupt ();
421- break ;
422- }
423- }
424- }
425- }
426- progress .incrementFailed ();
427- if (stats != null ) {
428- stats .recordRecoveryStoreFailure (clusterName , storeVersion .storeName );
429- }
430- }
431-
432- private void pollUntilReady (
433- String clusterName ,
434- String sourceFabric ,
435- String datacenterName ,
436- RecoveryProgress .StoreVersionPair storeVersion ) throws InterruptedException {
437- for (int i = 0 ; i < READINESS_POLL_MAX_ATTEMPTS ; i ++) {
438- Pair <Boolean , String > readiness = admin .isStoreVersionReadyForDataRecovery (
439- clusterName ,
440- storeVersion .storeName ,
441- storeVersion .version ,
442- sourceFabric ,
443- datacenterName ,
444- Optional .empty ());
445- if (readiness .getFirst ()) {
446- return ;
447- }
448- LOGGER .debug (
449- "Store {} v{} not ready for recovery in datacenter {} (attempt {}/{}): {}" ,
450- storeVersion .storeName ,
451- storeVersion .version ,
452- datacenterName ,
453- i + 1 ,
454- READINESS_POLL_MAX_ATTEMPTS ,
455- readiness .getSecond ());
456- Thread .sleep (READINESS_POLL_INTERVAL_MS );
457- }
458- throw new RuntimeException (
459- "Timed out waiting for store " + storeVersion .storeName + " v" + storeVersion .version
460- + " to be ready for data recovery in datacenter " + datacenterName );
461- }
462-
463- String resolveSourceFabric (String clusterName , String storeName ) {
464- Store store = admin .getStore (clusterName , storeName );
465- Optional <String > emergencySourceRegion = admin .getEmergencySourceRegion (clusterName );
466- return admin .getNativeReplicationSourceFabric (clusterName , store , Optional .empty (), emergencySourceRegion , null );
467- }
468-
469265 private void logPostRecoveryActions (String clusterName , String datacenterName , RecoveryProgress progress ) {
470266 LOGGER .info (
471267 "Recovery summary for datacenter: {} in cluster: {} — Total: {}, Initiated: {}, Failed: {}, "
0 commit comments