Skip to content
6 changes: 1 addition & 5 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -328,11 +328,7 @@ subprojects {
// Protocol changes should pin the current version via this override and remove the override in a follow-up PR
// when actually using the new protocol. Example to pin KME to v12 when introducing v13:
// project(':internal:venice-common').file('src/main/resources/avro/KafkaMessageEnvelope/v12', PathValidation.DIRECTORY)
def versionOverrides = [
// AdminOperation v100 stages degradedDatacenters on AddVersion.
// Pinned to the current active version until the Java wiring lands in a follow-up PR.
project(':services:venice-controller').file('src/main/resources/avro/AdminOperation/v99', PathValidation.DIRECTORY)
]
def versionOverrides = []

def schemaDirs = [sourceDir]
sourceDir.eachDir { typeDir ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ public class ControllerApiConstants {
public static final String TARGET_SWAP_REGION = "target_swap_region";
public static final String TARGET_SWAP_REGION_WAIT_TIME = "target_swap_region_wait_time";
public static final String IS_DAVINCI_HEARTBEAT_REPORTED = "is_davinci_heartbeat_reported";
public static final String TARGET_REGION_PROMOTED = "target_region_promoted";
public static final String GLOBAL_RT_DIV_ENABLED = "global_rt_div_enabled";
public static final String ENUM_SCHEMA_EVOLUTION_ALLOWED = "enum_schema_evolution_allowed";
public static final String STORE_LIFECYCLE_HOOKS_LIST = "store_lifecycle_hooks_list";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import static com.linkedin.venice.controllerapi.ControllerApiConstants.STORE_VIEW_CLASS;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.STORE_VIEW_NAME;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.STORE_VIEW_PARAMS;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.TARGET_REGION_PROMOTED;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.TARGET_SWAP_REGION;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.TARGET_SWAP_REGION_WAIT_TIME;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.TIME_LAG_TO_GO_ONLINE;
Expand Down Expand Up @@ -953,6 +954,14 @@ public Optional<Boolean> getIsDavinciHeartbeatReported() {
return getBoolean(IS_DAVINCI_HEARTBEAT_REPORTED);
}

public UpdateStoreQueryParams setTargetRegionPromoted(boolean targetRegionPromoted) {
return putBoolean(TARGET_REGION_PROMOTED, targetRegionPromoted);
}

public Optional<Boolean> getTargetRegionPromoted() {
return getBoolean(TARGET_REGION_PROMOTED);
}

public UpdateStoreQueryParams setGlobalRtDivEnabled(boolean globalRtDivEnabled) {
return putBoolean(GLOBAL_RT_DIV_ENABLED, globalRtDivEnabled);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,18 @@ public void updateVersionStatus(int versionNumber, VersionStatus status) {
throw new VeniceException("Version:" + versionNumber + " does not exist");
}

@Override
public void setVersionTargetRegionPromoted(int versionNumber, boolean targetRegionPromoted) {
checkVersionSupplier();
for (int i = storeVersionsSupplier.getForUpdate().size() - 1; i >= 0; i--) {
Version version = new VersionImpl(storeVersionsSupplier.getForUpdate().get(i));
if (version.getNumber() == versionNumber) {
version.setTargetRegionPromoted(targetRegionPromoted);
return;
}
}
}

@Override
public int peekNextVersionNumber() {
int nextVersionNumber = getLargestUsedVersionNumber() + 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -679,6 +679,16 @@ public void setGlobalRtDivEnabled(boolean globalRtDivEnabled) {
throw new UnsupportedOperationException();
}

@Override
public void setTargetRegionPromoted(boolean targetRegionPromoted) {
throw new UnsupportedOperationException();
}

@Override
public boolean isTargetRegionPromoted() {
return delegate.isTargetRegionPromoted();
}

@Override
public void setKeyUrnCompressionEnabled(boolean keyUrnCompressionEnabled) {
throw new UnsupportedOperationException();
Expand Down Expand Up @@ -1610,6 +1620,12 @@ public Version getVersion(int versionNumber) {
return version;
}

@Override
public void setVersionTargetRegionPromoted(int versionNumber, boolean targetRegionPromoted) {
// Delegate to the mutable store to bypass ReadOnlyVersion, which throws on setters.
this.delegate.setVersionTargetRegionPromoted(versionNumber, targetRegionPromoted);
}

@Override
@Nonnull
public Version getVersionOrThrow(int versionNumber) throws StoreVersionNotFoundException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,12 @@ default IntSet getVersionNumbers() {

void updateVersionStatus(int versionNumber, VersionStatus status);

default void setVersionTargetRegionPromoted(int versionNumber, boolean targetRegionPromoted) {
// No-op default; AbstractStore overrides this using storeVersionsSupplier.getForUpdate()
// (same pattern as updateVersionStatus) to bypass the ReadOnlyVersion wrapper returned by getVersion().
// ReadOnlyStore overrides to delegate to the mutable backing store.
}

int peekNextVersionNumber();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,10 @@ default void setLeaderFollowerModelEnabled(boolean leaderFollowerModelEnabled) {

boolean getIsDavinciHeartbeatReported();

void setTargetRegionPromoted(boolean targetRegionPromoted);

boolean isTargetRegionPromoted();

/**
* Get the replication metadata version id.
* @deprecated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,16 @@ public boolean getIsDavinciHeartbeatReported() {
return this.storeVersion.isDaVinciHeartBeatReported;
}

@Override
public void setTargetRegionPromoted(boolean targetRegionPromoted) {
this.storeVersion.targetRegionPromoted = targetRegionPromoted;
}

@Override
public boolean isTargetRegionPromoted() {
return this.storeVersion.targetRegionPromoted;
}

@Override
public boolean isGlobalRtDivEnabled() {
return this.storeVersion.globalRtDivEnabled;
Expand Down Expand Up @@ -595,6 +605,7 @@ public Version cloneVersion() {
clonedVersion.setTargetSwapRegion(getTargetSwapRegion());
clonedVersion.setTargetSwapRegionWaitTime(getTargetSwapRegionWaitTime());
clonedVersion.setIsDavinciHeartbeatReported(getIsDavinciHeartbeatReported());
clonedVersion.setTargetRegionPromoted(isTargetRegionPromoted());
clonedVersion.setGlobalRtDivEnabled(isGlobalRtDivEnabled());
clonedVersion.setKeyUrnCompressionEnabled(isKeyUrnCompressionEnabled());
clonedVersion.setKeyUrnFields(getKeyUrnFields());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import static com.linkedin.venice.utils.ConfigCommonUtils.ActivationState;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

import com.linkedin.venice.systemstore.schemas.StoreETLConfig;
import com.linkedin.venice.systemstore.schemas.StoreHybridConfig;
Expand Down Expand Up @@ -358,4 +360,33 @@ public void testReadOnlyEtlConfigRejectsSetEtlActiveFabrics() {
ReadOnlyStore readOnly = new ReadOnlyStore(store);
readOnly.getEtlStoreConfig().setEtlActiveFabrics(Arrays.asList("dc-0"));
}

@Test
public void testSetVersionTargetRegionPromoted() {
ZKStore store = new ZKStore(
"testStore",
"testOwner",
System.currentTimeMillis(),
PersistenceType.ROCKS_DB,
RoutingStrategy.CONSISTENT_HASH,
ReadStrategy.ANY_OF_ONLINE,
OfflinePushStrategy.WAIT_ALL_REPLICAS,
3);
store.addVersion(new VersionImpl("testStore", 1, "push1"));

// Default is false
assertFalse(store.getVersion(1).isTargetRegionPromoted());

// AbstractStore implementation uses getForUpdate() so setters on the mutable Avro record persist
store.setVersionTargetRegionPromoted(1, true);
assertTrue(store.getVersion(1).isTargetRegionPromoted());

// Non-existent version is a no-op (no exception)
store.setVersionTargetRegionPromoted(99, true);

// ReadOnlyStore delegates to the mutable backing store
ReadOnlyStore readOnlyStore = new ReadOnlyStore(store);
readOnlyStore.setVersionTargetRegionPromoted(1, false);
assertFalse(store.getVersion(1).isTargetRegionPromoted());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,38 @@ public void deserializeWithWrongFields() throws IOException {
// arbitrary value from constructor
}

@Test
public void testTargetRegionPromotedDefaultsFalse() {
Version version = new VersionImpl("store", 1, "pushJob");
assertFalse(version.isTargetRegionPromoted());
}

@Test
public void testTargetRegionPromotedGetterSetter() {
Version version = new VersionImpl("store", 1, "pushJob");
version.setTargetRegionPromoted(true);
assertTrue(version.isTargetRegionPromoted());

version.setTargetRegionPromoted(false);
assertFalse(version.isTargetRegionPromoted());
}

@Test
public void testCloneVersionCopiesTargetRegionPromoted() {
Version original = new VersionImpl("store", 1, "pushJob");
original.setTargetRegionPromoted(true);

Version clone = original.cloneVersion();
assertTrue(clone.isTargetRegionPromoted());
}

@Test
public void testTargetRegionPromotedDeserializesFromLegacyBlobWithoutField() throws IOException {
// Blobs without the field should deserialize to false (Avro default).
Version parsed = OBJECT_MAPPER.readValue(OLD_SERIALIZED, Version.class);
assertFalse(parsed.isTargetRegionPromoted());
}

@Test
public void testParseStoreFromRealTimeTopic() {
String validRealTimeTopic = "abc_rt";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,30 @@ public void testDvcDelayedIngestionWithTargetRegionSequentialRollout() throws Ex
});

client2.close();

// Verify targetRegionPromoted=true on parent and all child controllers.
// DeferredVersionSwapService sets the field once the target region's push completes
// and propagates it to children via the updateStore admin path.
TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> {
Optional<Version> parentVersion = parentControllerClient.getStore(storeName).getStore().getVersion(2);
Assert.assertTrue(parentVersion.isPresent(), "Version 2 must exist on parent");
Assert.assertTrue(
parentVersion.get().isTargetRegionPromoted(),
"Parent controller must set targetRegionPromoted=true after target region push completes");
});

for (VeniceMultiClusterWrapper childDatacenter: childDatacenters) {
try (ControllerClient childControllerClient =
new ControllerClient(CLUSTER_NAMES[0], childDatacenter.getControllerConnectString())) {
TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> {
Optional<Version> childVersion = childControllerClient.getStore(storeName).getStore().getVersion(2);
Assert.assertTrue(childVersion.isPresent(), "Version 2 must exist in child controller");
Assert.assertTrue(
childVersion.get().isTargetRegionPromoted(),
"Child controller must reflect targetRegionPromoted=true via admin message propagation");
});
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.linkedin.venice.controller.stats.DeferredVersionSwapStats;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.StoreResponse;
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.exceptions.VeniceNoClusterException;
import com.linkedin.venice.hooks.StoreLifecycleHooks;
Expand Down Expand Up @@ -1089,6 +1090,10 @@ private void performSequentialRollForward(
return;
}

if (!targetVersion.isTargetRegionPromoted()) {
markTargetRegionPromoted(cluster, storeName, targetVersionNum);
}

// Find next region to roll forward
String nextRegionToRollForward =
getNextRegionToRollForward(parentStore, targetVersionNum, cluster, kafkaTopicName, rolloutOrder);
Expand Down Expand Up @@ -1204,6 +1209,10 @@ private void performParallelRollForward(
return;
}

if (!targetVersion.isTargetRegionPromoted()) {
markTargetRegionPromoted(cluster, storeName, targetVersionNum);
}

// Get eligible non target regions to roll forward in
Set<String> nonTargetRegionsCompleted =
getRegionsToRollForward(remainingRegions, parentStore, targetVersionNum, cluster, kafkaTopicName);
Expand Down Expand Up @@ -1287,6 +1296,21 @@ public void updateStore(String clusterName, String storeName, VersionStatus stat
}
}

private void markTargetRegionPromoted(String clusterName, String storeName, int targetVersionNum) {
try {
LOGGER.info("Marking targetRegionPromoted=true for store: {} version: {}", storeName, targetVersionNum);
veniceParentHelixAdmin
.updateStore(clusterName, storeName, new UpdateStoreQueryParams().setTargetRegionPromoted(true));
} catch (Exception e) {
LOGGER.warn(
"Failed to mark targetRegionPromoted for store: {} version: {} in cluster: {}",
storeName,
targetVersionNum,
clusterName,
e);
}
}

// Only used for testing
Cache<String, Map<String, Long>> getStorePushCompletionTimes() {
return storePushCompletionTimeCache;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6097,6 +6097,7 @@ private void internalUpdateStore(String clusterName, String storeName, UpdateSto
Optional<String> targetSwapRegion = params.getTargetSwapRegion();
Optional<Integer> targetSwapRegionWaitTime = params.getTargetRegionSwapWaitTime();
Optional<Boolean> isDavinciHeartbeatReported = params.getIsDavinciHeartbeatReported();
Optional<Boolean> targetRegionPromoted = params.getTargetRegionPromoted();
Optional<Boolean> globalRtDivEnabled = params.isGlobalRtDivEnabled();
Optional<Boolean> ttlRepushEnabled = params.isTTLRepushEnabled();
Optional<Boolean> enumSchemaEvolutionAllowed = params.isEnumSchemaEvolutionAllowed();
Expand Down Expand Up @@ -6560,6 +6561,20 @@ private void internalUpdateStore(String clusterName, String storeName, UpdateSto
return store;
}));

if (targetRegionPromoted.orElse(false)) {
storeMetadataUpdate(clusterName, storeName, (store, resources) -> {
int futureVersionNum = store.getLargestUsedVersionNumber();
Version futureVersion = store.getVersion(futureVersionNum);
if (futureVersion != null && !futureVersion.isTargetRegionPromoted()) {
// Use store.setVersionTargetRegionPromoted rather than futureVersion.setTargetRegionPromoted
// because storeMetadataUpdate may provide a ReadOnlyStore whose getVersion() wraps versions
// in ReadOnlyVersion (setters throw). The store-level method bypasses that wrapper.
store.setVersionTargetRegionPromoted(futureVersionNum, true);
}
return store;
});
}
Comment on lines +6564 to +6576

globalRtDivEnabled.ifPresent(aBool -> storeMetadataUpdate(clusterName, storeName, (store, resources) -> {
store.setGlobalRtDivEnabled(aBool);
return store;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import static com.linkedin.venice.controllerapi.ControllerApiConstants.STORE_LIFECYCLE_HOOKS_LIST;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.STORE_MIGRATION;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.STORE_VIEW;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.TARGET_REGION_PROMOTED;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.TARGET_SWAP_REGION;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.TARGET_SWAP_REGION_WAIT_TIME;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.UNUSED_SCHEMA_DELETION_ENABLED;
Expand Down Expand Up @@ -3385,6 +3386,13 @@ public void updateStore(String clusterName, String storeName, UpdateStoreQueryPa
.map(addToUpdatedConfigList(updatedConfigsList, IS_DAVINCI_HEARTBEAT_REPORTED))
.orElseGet((currStore::getIsDavinciHeartbeatReported));

setStore.targetRegionPromoted = params.getTargetRegionPromoted()
.map(addToUpdatedConfigList(updatedConfigsList, TARGET_REGION_PROMOTED))
.orElseGet(() -> {
Version futureVersion = currStore.getVersion(currStore.getLargestUsedVersionNumber());
return futureVersion != null && futureVersion.isTargetRegionPromoted();
});

setStore.globalRtDivEnabled = params.isGlobalRtDivEnabled()
.map(addToUpdatedConfigList(updatedConfigsList, GLOBAL_RT_DIV_ENABLED))
.orElseGet((currStore::isGlobalRtDivEnabled));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,7 @@ private void handleSetStore(UpdateStore message) {
.setNearlineProducerCountPerWriter(message.nearlineProducerCountPerWriter)
.setTargetRegionSwapWaitTime(message.targetSwapRegionWaitTime)
.setIsDavinciHeartbeatReported(message.isDaVinciHeartBeatReported)
.setTargetRegionPromoted(message.targetRegionPromoted)
.setGlobalRtDivEnabled(message.globalRtDivEnabled)
.setFlinkVeniceViewsEnabled(message.flinkVeniceViewsEnabled)
.setEnumSchemaEvolutionAllowed(message.enumSchemaEvolutionAllowed)
Expand Down
Loading
Loading