Skip to content

Commit

Permalink
add largestUsedRTVersionNumber in store config
Browse files Browse the repository at this point in the history
  • Loading branch information
arjun4084346 committed Feb 11, 2025
1 parent 2a34ca2 commit d34bb0c
Show file tree
Hide file tree
Showing 15 changed files with 838 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public class ControllerApiConstants {
public static final String BATCH_GET_ROUTER_CACHE_ENABLED = "batch_get_router_cache_enabled";
public static final String BATCH_GET_LIMIT = "batch_get_limit";
public static final String LARGEST_USED_VERSION_NUMBER = "largest_used_version_number";
public static final String LARGEST_USED_RT_VERSION_NUMBER = "largest_used_rt_version_number";
public static final String NUM_VERSIONS_TO_PRESERVE = "num_versions_to_preserve";
public static final String DISABLE_META_STORE = "disable_meta_store";
public static final String DISABLE_DAVINCI_PUSH_STATUS_STORE = "disable_davinci_push_status_store";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1190,6 +1190,11 @@ public VersionResponse getStoreLargestUsedVersion(String clusterName, String sto
return request(ControllerRoute.GET_STORE_LARGEST_USED_VERSION, params, VersionResponse.class);
}

public VersionResponse getStoreLargestUsedRTVersion(String clusterName, String storeName) {
QueryParams params = newParams().add(CLUSTER, clusterName).add(NAME, storeName);
return request(ControllerRoute.GET_STORE_LARGEST_USED_VERSION, params, VersionResponse.class);
}

public RegionPushDetailsResponse getRegionPushDetails(String storeName, boolean isPartitionDetailEnabled) {
QueryParams params = newParams().add(NAME, storeName).add(PARTITION_DETAIL_ENABLED, isPartitionDetailEnabled);
return request(ControllerRoute.GET_REGION_PUSH_DETAILS, params, RegionPushDetailsResponse.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,35 @@ public int getLargestUsedVersionNumber(String storeName) {
return largestUsedVersionNumber;
}

@Override
public int getLargestUsedRTVersionNumber(String storeName) {
if (VeniceSystemStoreUtils.isSystemStore(storeName)) {
VeniceSystemStoreType systemStoreType = VeniceSystemStoreType.getSystemStoreType(storeName);
if (systemStoreType != null && systemStoreType.isStoreZkShared()) {
String userStoreName = systemStoreType.extractRegularStoreName(storeName);
return getPerUserStoreSystemStoreLargestUsedRTVersionNumber(userStoreName, systemStoreType);
}
}

List<Store> stores = getStoreFromAllClusters(storeName);
if (stores.isEmpty()) {
// If store does NOT existing in graveyard, it means store has never been deleted, return 0 which is the default
// value of largestUsedRTVersionNumber for a new store.
return Store.NON_EXISTING_VERSION;
}
int largestUsedRTVersionNumber = Store.NON_EXISTING_VERSION;
for (Store deletedStore: stores) {
if (deletedStore.getLargestUsedRTVersionNumber() > largestUsedRTVersionNumber) {
largestUsedRTVersionNumber = deletedStore.getLargestUsedRTVersionNumber();
}
}
return largestUsedRTVersionNumber;
}

@Override
public void putStoreIntoGraveyard(String clusterName, Store store) {
int largestUsedVersionNumber = getLargestUsedVersionNumber(store.getName());
int largestUsedRTVersionNumber = getLargestUsedRTVersionNumber(store.getName());

if (store.isMigrating()) {
/**
Expand All @@ -88,7 +114,7 @@ public void putStoreIntoGraveyard(String clusterName, Store store) {
* P: C1:v3*, C2:null
* C: C1:v3*, C2:null
*
* After migration, both clusters shoud have the same store with same largest version and cluster discovery points to C2
* After migration, both clusters should have the same store with same largest version and cluster discovery points to C2
* P: C1:v3, C2:v3*
* C: C1:v3, C2:v3*
*
Expand All @@ -101,7 +127,7 @@ public void putStoreIntoGraveyard(String clusterName, Store store) {
* C: C1:v4, C2:null*
*
* Then I realized the error and want to delete the other store as well, but now I can't delete it because the largest
* version number (3) doesn't match with the one retrived from graveyard (4).
* version number (3) doesn't match with the one retrieved from graveyard (4).
* This check will address to this situation, and keep the largest version number in both graveyards the same.
*/
if (largestUsedVersionNumber > store.getLargestUsedVersionNumber()) {
Expand All @@ -112,23 +138,45 @@ public void putStoreIntoGraveyard(String clusterName, Store store) {
largestUsedVersionNumber);
store.setLargestUsedVersionNumber(largestUsedVersionNumber);
}
} else if (store.getLargestUsedVersionNumber() < largestUsedVersionNumber) {
// largestUsedVersion number in re-created store is smaller than the deleted store. It's should be a issue.
String errorMsg = "Invalid largestUsedVersionNumber: " + store.getLargestUsedVersionNumber() + " in Store: "
+ store.getName() + ", it's smaller than one found in graveyard: " + largestUsedVersionNumber;
LOGGER.error(errorMsg);
throw new VeniceException(errorMsg);
if (largestUsedRTVersionNumber > store.getLargestUsedRTVersionNumber()) {
LOGGER.info(
"Increased largestUsedRTVersionNumber for migrating store {} from {} to {}.",
store.getName(),
store.getLargestUsedVersionNumber(),
largestUsedVersionNumber);
store.setLargestUsedRTVersionNumber(largestUsedRTVersionNumber);
}
} else {
if (store.getLargestUsedVersionNumber() < largestUsedVersionNumber) {
// largestUsedVersion number in re-created store is smaller than the deleted store. It's should be a issue.
String errorMsg = "Invalid largestUsedVersionNumber: " + store.getLargestUsedVersionNumber() + " in Store: "
+ store.getName() + ", it's smaller than one found in graveyard: " + largestUsedVersionNumber;
LOGGER.error(errorMsg);
throw new VeniceException(errorMsg);
}
if (store.getLargestUsedRTVersionNumber() < largestUsedRTVersionNumber) {
// largestUsedRTVersion number in re-created store is smaller than the deleted store. It's should be a issue.
String errorMsg = "Invalid largestUsedRTVersionNumber: " + store.getLargestUsedRTVersionNumber() + " in Store: "
+ store.getName() + ", it's smaller than one found in graveyard: " + largestUsedRTVersionNumber;
LOGGER.error(errorMsg);
throw new VeniceException(errorMsg);
}
}

// Store does not exist in graveyard OR store already exists but the re-created store is deleted again so we need to
// update the ZNode.
HelixUtils.update(dataAccessor, getStoreGraveyardPath(clusterName, store.getName()), store);
updateZNode(clusterName, store);

LOGGER.info(
"Put store: {} into graveyard with largestUsedVersionNumber {}.",
store.getName(),
largestUsedVersionNumber);
}

void updateZNode(String clusterName, Store store) {
HelixUtils.update(dataAccessor, getStoreGraveyardPath(clusterName, store.getName()), store);
}

@Override
public Store getStoreFromGraveyard(String clusterName, String storeName, Stat stat) {
String path = getStoreGraveyardPath(clusterName, storeName);
Expand Down Expand Up @@ -156,7 +204,7 @@ public List<String> listStoreNamesFromGraveyard(String clusterName) {
* @return Matching store from each venice. Normally contains one element.
* If the store existed in some other cluster before, there will be more than one element in the return value.
*/
private List<Store> getStoreFromAllClusters(String storeName) {
List<Store> getStoreFromAllClusters(String storeName) {
List<Store> stores = new ArrayList<>();
for (String clusterName: clusterNames) {
Store store = dataAccessor.get(getStoreGraveyardPath(clusterName, storeName), null, AccessOption.PERSISTENT);
Expand Down Expand Up @@ -196,11 +244,40 @@ private int getPerUserStoreSystemStoreLargestUsedVersionNumber(
return largestUsedVersionNumber;
}

int getPerUserStoreSystemStoreLargestUsedRTVersionNumber(
String userStoreName,
VeniceSystemStoreType systemStoreType) {
String systemStoreName = systemStoreType.getSystemStoreName(userStoreName);
List<Store> deletedStores = getStoreFromAllClusters(userStoreName);
if (deletedStores.isEmpty()) {
LOGGER.info(
"User store: {} does NOT exist in the store graveyard. Hence, no largest used RT version for its system store: {}",
userStoreName,
systemStoreName);
return Store.NON_EXISTING_VERSION;
}
int largestUsedRTVersionNumber = Store.NON_EXISTING_VERSION;
for (Store deletedStore: deletedStores) {
Map<String, SystemStoreAttributes> systemStoreNamesToAttributes = deletedStore.getSystemStores();
SystemStoreAttributes systemStoreAttributes =
systemStoreNamesToAttributes.get(VeniceSystemStoreType.getSystemStoreType(systemStoreName).getPrefix());
if (systemStoreAttributes != null) {
largestUsedRTVersionNumber =
Math.max(largestUsedRTVersionNumber, systemStoreAttributes.getLargestUsedRTVersionNumber());
}
}

if (largestUsedRTVersionNumber == Store.NON_EXISTING_VERSION) {
LOGGER.info("Can not find largest used RT version number for {}.", systemStoreName);
}
return largestUsedRTVersionNumber;
}

private String getGeneralStoreGraveyardPath() {
return getStoreGraveyardPath(PathResourceRegistry.WILDCARD_MATCH_ANY, PathResourceRegistry.WILDCARD_MATCH_ANY);
}

private String getStoreGraveyardPath(String clusterName, String storeName) {
String getStoreGraveyardPath(String clusterName, String storeName) {
return getStoreGraveyardParentPath(clusterName) + "/" + storeName;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,16 @@ public void setLargestUsedVersionNumber(int largestUsedVersionNumber) {
throw new UnsupportedOperationException();
}

@Override
public int getLargestUsedRTVersionNumber() {
return this.delegate.getLargestUsedRTVersionNumber();
}

@Override
public void setLargestUsedRTVersionNumber(int largestUsedRTVersionNumber) {
throw new UnsupportedOperationException();
}

@Override
public int getCurrentVersion() {
return this.delegate.getCurrentVersion();
Expand Down Expand Up @@ -855,6 +865,16 @@ public void setLargestUsedVersionNumber(int largestUsedVersionNumber) {
throw new UnsupportedOperationException();
}

@Override
public int getLargestUsedRTVersionNumber() {
return this.delegate.getLargestUsedRTVersionNumber();
}

@Override
public void setLargestUsedRTVersionNumber(int largestUsedRTVersionNumber) {
throw new UnsupportedOperationException();
}

@Override
public long getStorageQuotaInByte() {
return this.delegate.getStorageQuotaInByte();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ static boolean isSystemStore(String storeName) {

void setLargestUsedVersionNumber(int largestUsedVersionNumber);

int getLargestUsedRTVersionNumber();

void setLargestUsedRTVersionNumber(int largestUsedRTVersionNumber);

long getStorageQuotaInByte();

void setStorageQuotaInByte(long storageQuotaInByte);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,17 @@
*/
public interface StoreGraveyard {
/**
* Retrieve the largest used version number by the given store name from graveyard. Return 0 if the store dose not
* Retrieve the largest used version number by the given store name from graveyard. Return 0 if the store does not
* exist in the graveyard, which is the default value we used for the new store.
*/
int getLargestUsedVersionNumber(String storeName);

/**
* Retrieve the largest used version number for the real time topic by the given store name from graveyard.
* Return 0 if the store does not exist in the graveyard, which is the default value we used for the new store.
*/
int getLargestUsedRTVersionNumber(String storeName);

/**
* Put the given store into graveyard. If the store has already existed in the graveyard, update it by this given
* store.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,18 @@ public void setLargestUsedVersionNumber(int largestUsedVersionNumber) {
systemStoreAttributes.setLargestUsedVersionNumber(largestUsedVersionNumber);
}

@Override
public int getLargestUsedRTVersionNumber() {
SystemStoreAttributes systemStoreAttributes = fetchAndBackfillSystemStoreAttributes(true);
return systemStoreAttributes.getLargestUsedRTVersionNumber();
}

@Override
public void setLargestUsedRTVersionNumber(int largestUsedRTVersionNumber) {
SystemStoreAttributes systemStoreAttributes = fetchAndBackfillSystemStoreAttributes(false);
systemStoreAttributes.setLargestUsedRTVersionNumber(largestUsedRTVersionNumber);
}

@Override
public long getStorageQuotaInByte() {
return zkSharedStore.getStorageQuotaInByte();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ public interface SystemStoreAttributes extends DataModelBackedStructure<SystemSt

void setLargestUsedVersionNumber(int largestUsedVersionNumber);

int getLargestUsedRTVersionNumber();

void setLargestUsedRTVersionNumber(int largestUsedRTVersionNumber);

int getCurrentVersion();

void setCurrentVersion(int currentVersion);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,16 @@ public void setLargestUsedVersionNumber(int largestUsedVersionNumber) {
this.dataModel.largestUsedVersionNumber = largestUsedVersionNumber;
}

@Override
public int getLargestUsedRTVersionNumber() {
return this.dataModel.largestUsedRTVersionNumber;
}

@Override
public void setLargestUsedRTVersionNumber(int largestUsedRTVersionNumber) {
this.dataModel.largestUsedRTVersionNumber = largestUsedRTVersionNumber;
}

@Override
public int getCurrentVersion() {
return this.dataModel.currentVersion;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,18 @@ public void setLargestUsedVersionNumber(int largestUsedVersionNumber) {
this.storeProperties.largestUsedVersionNumber = largestUsedVersionNumber;
}

@SuppressWarnings("unused") // Used by Serializer/De-serializer for storing to Zoo Keeper
@Override
public int getLargestUsedRTVersionNumber() {
return this.storeProperties.largestUsedRTVersionNumber;
}

@SuppressWarnings("unused") // Used by Serializer/De-serializer for storing to Zoo Keeper
@Override
public void setLargestUsedRTVersionNumber(int largestUsedRTVersionNumber) {
this.storeProperties.largestUsedRTVersionNumber = largestUsedRTVersionNumber;
}

@SuppressWarnings("unused") // Used by Serializer/De-serializer for storing to Zoo Keeper
@Override
public long getStorageQuotaInByte() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public enum AvroProtocolDefinition {
/**
* Value schema for metadata system store.
*/
METADATA_SYSTEM_SCHEMA_STORE(27, StoreMetaValue.class),
METADATA_SYSTEM_SCHEMA_STORE(28, StoreMetaValue.class),

/**
* Key schema for push status system store.
Expand Down
Loading

0 comments on commit d34bb0c

Please sign in to comment.