Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@
import io.tehuti.metrics.MetricsRepository;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
Expand Down Expand Up @@ -422,37 +420,26 @@ private void asyncStart() {
});
}

/**
* Resets all instance CV states using Helix's bulk delete API.
* The bulk delete approach ensures all CV states are cleaned up regardless of local disk state.
*
* @param accessor The Helix partition status accessor
* @param storageService The storage service (kept for backward compatibility, may be removed in future)
* @param currentLogger Logger for diagnostic output
*/
static void resetAllInstanceCVStates(
HelixPartitionStatusAccessor accessor,
StorageService storageService,
Logger currentLogger) {
// Get all hosted stores
currentLogger.info("Started resetting all instance CV states");
Map<String, Set<Integer>> storePartitionMapping = storageService.getStoreAndUserPartitionsMapping();

// TODO: remove the log
currentLogger.info("Reset total {} of stores' CV states", storePartitionMapping.size());
storePartitionMapping.entrySet().stream().limit(10).forEach(entry -> {
String storeName = entry.getKey();
Set<Integer> partitionIds = entry.getValue();
currentLogger.info(
"Resetting store: {}, Total partitions: {}, First 10 partitions: {}",
storeName,
partitionIds.size(),
partitionIds.stream().limit(10).collect(java.util.stream.Collectors.toList()));
});

storePartitionMapping.forEach((storeName, partitionIds) -> {
partitionIds.forEach(partitionId -> {
try {
accessor.deleteReplicaStatus(storeName, partitionId);
} catch (Exception e) {
currentLogger
.error("Failed to delete CV state for resource: {} and partition id: {}", storeName, partitionId, e);
}
});
});
currentLogger.info("Finished resetting all instance CV states");
currentLogger.info("Started resetting all instance CV states via bulk delete");
try {
accessor.deleteAllCustomizedStates();
currentLogger.info("Finished resetting all instance CV states");
} catch (Exception e) {
currentLogger.error("Failed to reset all instance CV states", e);
throw e;
}
}

// test only
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
package com.linkedin.davinci.helix;

import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.expectThrows;

import com.linkedin.davinci.config.VeniceServerConfig;
import com.linkedin.davinci.storage.StorageService;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.helix.HelixPartitionStatusAccessor;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.testng.annotations.Test;
Expand All @@ -23,25 +20,33 @@ public class HelixParticipationServiceTest {
private static final Logger LOGGER = LogManager.getLogger(HelixParticipationServiceTest.class);

@Test
public void testRestAllInstanceCVStates() {
public void testResetAllInstanceCVStates() {
HelixPartitionStatusAccessor mockAccessor = mock(HelixPartitionStatusAccessor.class);
StorageService mockStorageService = mock(StorageService.class);
String resourceV1 = "test_resource_v1";
String resourceV2 = "test_resource_v2";
Set<Integer> partitionSet = new HashSet<>(Arrays.asList(1, 2, 3));
Map<String, Set<Integer>> storePartitionMapping = new HashMap<>();
storePartitionMapping.put(resourceV1, partitionSet);
storePartitionMapping.put(resourceV2, partitionSet);
doReturn(storePartitionMapping).when(mockStorageService).getStoreAndUserPartitionsMapping();

// Call the reset method
HelixParticipationService.resetAllInstanceCVStates(mockAccessor, mockStorageService, LOGGER);

verify(mockAccessor).deleteReplicaStatus(resourceV1, 1);
verify(mockAccessor).deleteReplicaStatus(resourceV1, 2);
verify(mockAccessor).deleteReplicaStatus(resourceV1, 3);
verify(mockAccessor).deleteReplicaStatus(resourceV2, 1);
verify(mockAccessor).deleteReplicaStatus(resourceV2, 2);
verify(mockAccessor).deleteReplicaStatus(resourceV2, 3);
// Verify that the bulk delete API is called instead of per-partition deletion
verify(mockAccessor).deleteAllCustomizedStates();
}

@Test
public void testResetAllInstanceCVStatesWithException() {
HelixPartitionStatusAccessor mockAccessor = mock(HelixPartitionStatusAccessor.class);
StorageService mockStorageService = mock(StorageService.class);

// Make the delete operation throw an exception
VeniceException testException = new VeniceException("Test exception during CV cleanup");
doThrow(testException).when(mockAccessor).deleteAllCustomizedStates();

// Call the reset method and expect it to throw the exception
expectThrows(
VeniceException.class,
() -> HelixParticipationService.resetAllInstanceCVStates(mockAccessor, mockStorageService, LOGGER));

// Verify that the bulk delete API was attempted
verify(mockAccessor).deleteAllCustomizedStates();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ public void deleteReplicaStatus(HelixPartitionState stateType, String topic, Str
customizedStateProvider.deletePerPartitionCustomizedState(stateType.name(), topic, partitionName);
}

protected void deleteAllReplicaStatus(HelixPartitionState stateType) {
customizedStateProvider.deleteAllResourcesCustomizedStates(stateType.name());
}

public String getReplicaStatus(HelixPartitionState stateType, String topic, String partitionName) {
try {
return customizedStateProvider.getPerPartitionCustomizedState(stateType.name(), topic, partitionName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,38 @@ public void deleteReplicaStatus(String topic, int partitionId) {
}
}

/**
* Deletes all customized states for this instance across all resources.
* This method follows the same logic as {@link #deleteReplicaStatus(String, int)}:
* - Always deletes OFFLINE_PUSH state
* - Only deletes HYBRID_STORE_QUOTA state if helixHybridStoreQuotaEnabled is true
*
* This uses the Helix {@link org.apache.helix.customizedstate.CustomizedStateProvider#deleteAllResourcesCustomizedStates(String)}
* API to remove all CV states.
*/
public void deleteAllCustomizedStates() {
/**
* We don't want to do the two delete operations atomically; Even if one delete fails,
* the other delete operation should still continue.
*/
try {
super.deleteAllReplicaStatus(HelixPartitionState.OFFLINE_PUSH);
} catch (NullPointerException e) {
LOGGER.warn("Failed to delete all resources with state type {}.", HelixPartitionState.OFFLINE_PUSH.name(), e);
}

if (helixHybridStoreQuotaEnabled) {
try {
super.deleteAllReplicaStatus(HelixPartitionState.HYBRID_STORE_QUOTA);
} catch (NullPointerException e) {
LOGGER.warn(
"Failed to delete all resources with state type {}.",
HelixPartitionState.HYBRID_STORE_QUOTA.name(),
e);
}
}
}

public ExecutionStatus getReplicaStatus(String topic, int partitionId) {
return ExecutionStatus.valueOf(
super.getReplicaStatus(HelixPartitionState.OFFLINE_PUSH, topic, getPartitionNameFromId(topic, partitionId)));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package com.linkedin.venice.helix;

import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;

import org.apache.helix.HelixManager;
import org.apache.helix.customizedstate.CustomizedStateProvider;
import org.testng.annotations.Test;


public class HelixPartitionStatusAccessorTest {
@Test
public void testDeleteAllCustomizedStates() {
HelixManager mockHelixManager = mock(HelixManager.class);
CustomizedStateProvider mockProvider = mock(CustomizedStateProvider.class);

// Create accessor with hybrid quota disabled
HelixPartitionStatusAccessor accessor = new HelixPartitionStatusAccessor(mockHelixManager, "testInstance", false);

// Set the mocked provider
accessor.setCustomizedStateProvider(mockProvider);

// Call the method under test
accessor.deleteAllCustomizedStates();

// Verify that deleteAllResourcesCustomizedStates is called for OFFLINE_PUSH
verify(mockProvider).deleteAllResourcesCustomizedStates(HelixPartitionState.OFFLINE_PUSH.name());

// Verify that it's NOT called for HYBRID_STORE_QUOTA since helixHybridStoreQuotaEnabled is false
verify(mockProvider, never()).deleteAllResourcesCustomizedStates(HelixPartitionState.HYBRID_STORE_QUOTA.name());
}

@Test
public void testDeleteAllCustomizedStatesWithHybridQuotaEnabled() {
HelixManager mockHelixManager = mock(HelixManager.class);
CustomizedStateProvider mockProvider = mock(CustomizedStateProvider.class);

// Create accessor with hybrid quota enabled
HelixPartitionStatusAccessor accessor = new HelixPartitionStatusAccessor(mockHelixManager, "testInstance", true);

// Set the mocked provider
accessor.setCustomizedStateProvider(mockProvider);

// Call the method under test
accessor.deleteAllCustomizedStates();

// Verify that deleteAllResourcesCustomizedStates is called for both state types
verify(mockProvider).deleteAllResourcesCustomizedStates(HelixPartitionState.OFFLINE_PUSH.name());
verify(mockProvider).deleteAllResourcesCustomizedStates(HelixPartitionState.HYBRID_STORE_QUOTA.name());
}

@Test
public void testDeleteAllCustomizedStatesWithExceptionOnOfflinePush() {
HelixManager mockHelixManager = mock(HelixManager.class);
CustomizedStateProvider mockProvider = mock(CustomizedStateProvider.class);

// Create accessor with hybrid quota enabled
HelixPartitionStatusAccessor accessor = new HelixPartitionStatusAccessor(mockHelixManager, "testInstance", true);

// Set the mocked provider
accessor.setCustomizedStateProvider(mockProvider);

// Make the OFFLINE_PUSH delete throw an exception
doThrow(new NullPointerException("Test exception")).when(mockProvider)
.deleteAllResourcesCustomizedStates(HelixPartitionState.OFFLINE_PUSH.name());

// Call the method under test - should not throw, exception should be caught and logged
accessor.deleteAllCustomizedStates();

// Verify that both deletes were attempted despite the exception
verify(mockProvider).deleteAllResourcesCustomizedStates(HelixPartitionState.OFFLINE_PUSH.name());
verify(mockProvider).deleteAllResourcesCustomizedStates(HelixPartitionState.HYBRID_STORE_QUOTA.name());
}

@Test
public void testDeleteAllCustomizedStatesWithExceptionOnHybridQuota() {
HelixManager mockHelixManager = mock(HelixManager.class);
CustomizedStateProvider mockProvider = mock(CustomizedStateProvider.class);

// Create accessor with hybrid quota enabled
HelixPartitionStatusAccessor accessor = new HelixPartitionStatusAccessor(mockHelixManager, "testInstance", true);

// Set the mocked provider
accessor.setCustomizedStateProvider(mockProvider);

// Make the HYBRID_STORE_QUOTA delete throw an exception
doThrow(new NullPointerException("Test exception")).when(mockProvider)
.deleteAllResourcesCustomizedStates(HelixPartitionState.HYBRID_STORE_QUOTA.name());

// Call the method under test - should not throw, exception should be caught and logged
accessor.deleteAllCustomizedStates();

// Verify that both deletes were attempted despite the exception
verify(mockProvider).deleteAllResourcesCustomizedStates(HelixPartitionState.OFFLINE_PUSH.name());
verify(mockProvider).deleteAllResourcesCustomizedStates(HelixPartitionState.HYBRID_STORE_QUOTA.name());
}
}
Loading