Skip to content

Commit 224500f

Browse files
sixpluszeroclaude
andcommitted
[vpj][controller][common][test] Resolve external-storage dual-write per region
Problem: the VPJ external-storage dual-write path (added in #2821) made a single global decision -- it read one storageMode and dual-wrote (or not) identically for every region. With per-region external-storage sinks this cannot express "region A is DUAL_WRITE, region B is INTERNAL". This change resolves each region's store-level storageMode and fans out external writes only to the DUAL_WRITE regions: - New controller API getStorageModePerRegion (GET_PER_REGION_STORAGE_MODE): the parent fans out to each child region's store-level storageMode and returns Map<region, StorageMode>; a child controller returns its single region. Store-level (not the new version's value) is read so the result is available before the new version has propagated to child regions. - VPJ resolves the DUAL_WRITE region list and forwards it to executors via push.job.dual.write.target.regions, replacing the single push.job.target.storage.mode. - ExternalStorageWriter gains a region-aware configure(jobProps, topic, partition, region) overload; the 3-arg overload is deprecated and kept as a delegating default, so existing impls keep working (non-breaking). - DualWriteVeniceWriter holds one ExternalStorageWriter per DUAL_WRITE region and fans out batchPut/flush/close; a failure on any regional sink (after retries) fails the push so no external store goes partially populated. Testing: new multi-region e2e (TestVPJDualWriteExternalStorageMultiRegion) sets dc0 DUAL_WRITE and dc1 INTERNAL via regionsFilter and asserts only dc0's region-keyed sink is populated while dc1 stays empty and Venice still serves every record. The existing single-region e2e and the VPJ unit tests (including new fan-out tests) pass. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 7b9f890 commit 224500f

21 files changed

Lines changed: 697 additions & 129 deletions

File tree

clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/PushJobSetting.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,12 @@
77
import com.linkedin.venice.jobs.DataWriterComputeJob;
88
import com.linkedin.venice.meta.BufferReplayPolicy;
99
import com.linkedin.venice.meta.HybridStoreConfig;
10-
import com.linkedin.venice.meta.StorageMode;
1110
import com.linkedin.venice.meta.Version;
1211
import com.linkedin.venice.schema.vson.VsonSchema;
1312
import com.linkedin.venice.vpj.VenicePushJobConstants;
1413
import java.io.Serializable;
14+
import java.util.Collections;
15+
import java.util.List;
1516
import java.util.Map;
1617
import java.util.Set;
1718
import org.apache.avro.Schema;
@@ -116,14 +117,15 @@ public class PushJobSetting implements Serializable {
116117
public boolean isRmdChunkingEnabled;
117118
public long storeStorageQuota;
118119
/**
119-
* Storage mode of the new version being pushed to, read from {@code Version.getStorageMode()} after the
120-
* controller creates the new version. The version's storageMode is fixed at creation time (the controller
121-
* copies the store-level value onto the version per linkedin/venice#2823), so reading the version's value
122-
* rather than the store-level value avoids a race where a concurrent UpdateStore mutates the store-level
123-
* value between job setup and version creation. Only populated when the VPJ-side dual-write writer-class
124-
* is configured; otherwise stays {@link StorageMode#INTERNAL} (the default).
120+
* Names of the regions whose store-level storage mode is {@code DUAL_WRITE} for this push, resolved at job
121+
* setup by querying each region's store-level storage mode through the (parent) controller. The partition
122+
* writer loads one {@code ExternalStorageWriter} per region in this list and writes the dataset to that
123+
* region's external-storage endpoint; regions absent from the list stay Kafka-only. Reading the store-level
124+
* value per region (rather than the new version's value) avoids a race against the new version not yet
125+
* being materialized in child regions when VPJ resolves it. Only populated when the VPJ-side dual-write
126+
* writer-class is configured; otherwise stays empty (dual-write off).
125127
*/
126-
public StorageMode targetStorageMode = StorageMode.INTERNAL;
128+
public List<String> dualWriteTargetRegions = Collections.emptyList();
127129
public boolean isSchemaAutoRegisterFromPushJobEnabled;
128130
public CompressionStrategy storeCompressionStrategy;
129131
public boolean isStoreWriteComputeEnabled;

clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@
100100
import com.linkedin.venice.controllerapi.ControllerResponse;
101101
import com.linkedin.venice.controllerapi.D2ControllerClientFactory;
102102
import com.linkedin.venice.controllerapi.JobStatusQueryResponse;
103+
import com.linkedin.venice.controllerapi.MultiRegionStorageModeResponse;
103104
import com.linkedin.venice.controllerapi.MultiSchemaResponse;
104105
import com.linkedin.venice.controllerapi.RepushInfo;
105106
import com.linkedin.venice.controllerapi.RepushInfoResponse;
@@ -136,6 +137,7 @@
136137
import com.linkedin.venice.message.KafkaKey;
137138
import com.linkedin.venice.meta.BufferReplayPolicy;
138139
import com.linkedin.venice.meta.HybridStoreConfig;
140+
import com.linkedin.venice.meta.StorageMode;
139141
import com.linkedin.venice.meta.Store;
140142
import com.linkedin.venice.meta.StoreInfo;
141143
import com.linkedin.venice.meta.Version;
@@ -2639,15 +2641,14 @@ void createNewStoreVersion(
26392641
setting.rmdChunkingEnabled = setting.chunkingEnabled && setting.isRmdChunkingEnabled;
26402642
setting.kafkaSourceRegion = versionCreationResponse.getKafkaSourceRegion();
26412643

2642-
// Resolve the target storage mode from the *new version* (set by the controller at version-creation
2643-
// time per linkedin/venice#2823), not from the store-level value. The store-level value is mutable
2644-
// via the UpdateStore admin op and can drift between when VPJ first reads it and when the new
2645-
// version is created; the version's storageMode is fixed for the life of this push. Skip the extra
2646-
// round-trip when dual-write isn't even configured on the VPJ side — the gating predicate will
2647-
// already return false.
2644+
// Resolve which regions should dual-write to external storage by reading each region's store-level
2645+
// storage mode through the (parent) controller, keeping only the DUAL_WRITE regions. The store-level
2646+
// value is read per region (rather than the new version's value) because a just-created version may not
2647+
// yet have propagated to child regions when VPJ resolves this — the store-level value is already settled
2648+
// and is exactly what each child copies onto its version at creation. Skip the round-trip when
2649+
// dual-write isn't even configured on the VPJ side — the gating predicate will already return false.
26482650
if (!props.getString(PUSH_JOB_EXTERNAL_STORAGE_WRITER_CLASS, "").isEmpty()) {
2649-
Version newVersion = getStoreVersion(setting.storeName, setting.version);
2650-
setting.targetStorageMode = newVersion.getStorageMode();
2651+
setting.dualWriteTargetRegions = resolveDualWriteTargetRegions(setting.storeName);
26512652
}
26522653

26532654
// Detect degraded-mode push from controller response
@@ -3068,6 +3069,31 @@ private Version getStoreVersion(String storeName, int version) {
30683069
return newVersion.get();
30693070
}
30703071

3072+
/**
3073+
* Resolve the regions whose store-level storage mode is {@link StorageMode#DUAL_WRITE} for {@code
3074+
* storeName}. The (parent) controller fans out to each region; only {@code DUAL_WRITE} regions are kept,
3075+
* and the partition writer loads one external-storage writer per returned region. A controller error fails
3076+
* the push rather than silently skipping dual-write.
3077+
*/
3078+
private List<String> resolveDualWriteTargetRegions(String storeName) {
3079+
MultiRegionStorageModeResponse response = ControllerClient.retryableRequest(
3080+
controllerClient,
3081+
pushJobSetting.controllerRetries,
3082+
c -> c.getPerRegionStorageMode(storeName));
3083+
if (response.isError()) {
3084+
throw new VeniceException(
3085+
"Failed to resolve per-region storage mode for store: " + storeName + ", error: " + response.getError());
3086+
}
3087+
List<String> dualWriteRegions = new ArrayList<>();
3088+
for (Map.Entry<String, String> entry: response.getRegionToStorageMode().entrySet()) {
3089+
if (StorageMode.DUAL_WRITE.name().equals(entry.getValue())) {
3090+
dualWriteRegions.add(entry.getKey());
3091+
}
3092+
}
3093+
LOGGER.info("Resolved DUAL_WRITE target regions {} for store: {}", dualWriteRegions, storeName);
3094+
return dualWriteRegions;
3095+
}
3096+
30713097
private StoreResponse getStoreResponse(String storeName) {
30723098
return getStoreResponse(storeName, false);
30733099
}

clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/mapreduce/datawriter/jobs/DataWriterMRJob.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@
3131
import static com.linkedin.venice.vpj.VenicePushJobConstants.KEY_FIELD_PROP;
3232
import static com.linkedin.venice.vpj.VenicePushJobConstants.MAP_REDUCE_PARTITIONER_CLASS_CONFIG;
3333
import static com.linkedin.venice.vpj.VenicePushJobConstants.PARTITION_COUNT;
34+
import static com.linkedin.venice.vpj.VenicePushJobConstants.PUSH_JOB_DUAL_WRITE_TARGET_REGIONS;
3435
import static com.linkedin.venice.vpj.VenicePushJobConstants.PUSH_JOB_EXTERNAL_STORAGE_PROP_PREFIX;
35-
import static com.linkedin.venice.vpj.VenicePushJobConstants.PUSH_JOB_TARGET_STORAGE_MODE;
3636
import static com.linkedin.venice.vpj.VenicePushJobConstants.PUSH_TO_SEPARATE_REALTIME_TOPIC;
3737
import static com.linkedin.venice.vpj.VenicePushJobConstants.REDUCER_SPECULATIVE_EXECUTION_ENABLE;
3838
import static com.linkedin.venice.vpj.VenicePushJobConstants.REPUSH_TTL_ENABLE;
@@ -163,7 +163,7 @@ private void setupDefaultJobConf(JobConf conf, PushJobSetting pushJobSetting, Ve
163163
conf.set(key, props.getString(key));
164164
}
165165
}
166-
conf.setInt(PUSH_JOB_TARGET_STORAGE_MODE, pushJobSetting.targetStorageMode.getValue());
166+
conf.set(PUSH_JOB_DUAL_WRITE_TARGET_REGIONS, String.join(",", pushJobSetting.dualWriteTargetRegions));
167167
conf.setBoolean(ALLOW_DUPLICATE_KEY, pushJobSetting.isDuplicateKeyAllowed);
168168
conf.setBoolean(VeniceWriter.ENABLE_CHUNKING, pushJobSetting.chunkingEnabled);
169169
conf.setBoolean(VeniceWriter.ENABLE_RMD_CHUNKING, pushJobSetting.rmdChunkingEnabled);

clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/AbstractPartitionWriter.java

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import static com.linkedin.venice.vpj.VenicePushJobConstants.PUSH_JOB_EXTERNAL_STORAGE_BATCHPUT_RETRY_BACKOFF_MS;
2424
import static com.linkedin.venice.vpj.VenicePushJobConstants.PUSH_JOB_EXTERNAL_STORAGE_BATCH_SIZE;
2525
import static com.linkedin.venice.vpj.VenicePushJobConstants.PUSH_JOB_EXTERNAL_STORAGE_WRITER_CLASS;
26-
import static com.linkedin.venice.vpj.VenicePushJobConstants.PUSH_JOB_TARGET_STORAGE_MODE;
2726
import static com.linkedin.venice.vpj.VenicePushJobConstants.PUSH_TO_SEPARATE_REALTIME_TOPIC;
2827
import static com.linkedin.venice.vpj.VenicePushJobConstants.RMD_SCHEMA_DIR;
2928
import static com.linkedin.venice.vpj.VenicePushJobConstants.RMD_SCHEMA_ID_PROP;
@@ -53,7 +52,6 @@
5352
import com.linkedin.venice.hadoop.input.kafka.KafkaInputUtils;
5453
import com.linkedin.venice.hadoop.schema.HDFSSchemaSource;
5554
import com.linkedin.venice.hadoop.task.TaskTracker;
56-
import com.linkedin.venice.meta.StorageMode;
5755
import com.linkedin.venice.meta.Store;
5856
import com.linkedin.venice.meta.Version;
5957
import com.linkedin.venice.meta.VersionImpl;
@@ -92,8 +90,10 @@
9290
import java.io.IOException;
9391
import java.nio.ByteBuffer;
9492
import java.time.Clock;
93+
import java.util.ArrayList;
9594
import java.util.Arrays;
9695
import java.util.Iterator;
96+
import java.util.List;
9797
import java.util.Map;
9898
import java.util.Properties;
9999
import java.util.concurrent.Executors;
@@ -602,16 +602,17 @@ protected AbstractVeniceWriter<byte[], byte[], byte[]> createBasicVeniceWriter()
602602
private AbstractVeniceWriter<byte[], byte[], byte[]> maybeDecorateForDualWriteToExternalStorage(
603603
AbstractVeniceWriter<byte[], byte[], byte[]> baseWriter,
604604
String topicName) {
605-
StorageMode targetStorageMode =
606-
StorageMode.valueOf(props.getInt(PUSH_JOB_TARGET_STORAGE_MODE, StorageMode.INTERNAL.getValue()));
607-
if (!ExternalStorageWriteUtils.isDualWriteToExternalStorageFromVpjEnabled(props, targetStorageMode)) {
605+
if (!ExternalStorageWriteUtils.isDualWriteToExternalStorageFromVpjEnabled(props)) {
608606
return baseWriter;
609607
}
608+
List<String> dualWriteRegions = ExternalStorageWriteUtils.getDualWriteTargetRegions(props);
610609
// From here on, anything that throws must release the already-constructed Kafka-side writers
611610
// ({@code baseWriter} for the no-views case; {@code mainWriter} + {@code childWriters} for the
612-
// composite-view case). The reflective loader closes the external writer itself on configure failure,
613-
// but the Kafka side isn't protected by it. Wrap the whole decoration so a retry-prone Spark task
614-
// doesn't pile up Kafka producer leaks across attempts.
611+
// composite-view case) AND any per-region external writers already loaded in this method. The reflective
612+
// loader closes the external writer it is constructing on configure failure, but earlier successfully
613+
// loaded regional writers and the Kafka side aren't protected by it. Wrap the whole decoration so a
614+
// retry-prone Spark task doesn't pile up producer/connection leaks across attempts.
615+
List<ExternalStorageWriter> externalWriters = new ArrayList<>(dualWriteRegions.size());
615616
try {
616617
// Validate the buffer threshold and retry policy before touching the impl so a bad config can't
617618
// allocate (and then leak) external resources.
@@ -633,27 +634,35 @@ private AbstractVeniceWriter<byte[], byte[], byte[]> maybeDecorateForDualWriteTo
633634
}
634635
String writerClassName = props.getString(PUSH_JOB_EXTERNAL_STORAGE_WRITER_CLASS);
635636
int partitionId = getEngineTaskConfigProvider().getTaskId();
636-
// loadAndConfigure validates the class implements ExternalStorageWriter, instantiates it, and closes
637-
// the impl on configure() failure so partially-initialized writers don't leak.
638-
ExternalStorageWriter externalWriter =
639-
ExternalStorageWriteUtils.loadAndConfigure(writerClassName, props, topicName, partitionId);
637+
// One external writer per DUAL_WRITE region; each is configured with its region name so the impl
638+
// routes to that region's endpoint. loadAndConfigure validates the class implements
639+
// ExternalStorageWriter, instantiates it, and closes the impl on configure() failure so
640+
// partially-initialized writers don't leak.
641+
for (String region: dualWriteRegions) {
642+
externalWriters
643+
.add(ExternalStorageWriteUtils.loadAndConfigure(writerClassName, props, topicName, partitionId, region));
644+
}
640645
LOGGER.info(
641-
"Dual-write to external storage enabled for topic {} partition {} via impl {} "
646+
"Dual-write to external storage enabled for topic {} partition {} via impl {} for regions {} "
642647
+ "(batchSize={}, batchPutRetries={}, batchPutRetryBackoffMs={})",
643648
topicName,
644649
partitionId,
645650
writerClassName,
651+
dualWriteRegions,
646652
batchSize,
647653
batchPutRetries,
648654
batchPutRetryBackoffMs);
649655
return new DualWriteVeniceWriter(
650656
topicName,
651657
baseWriter,
652-
externalWriter,
658+
externalWriters,
653659
batchSize,
654660
batchPutRetries,
655661
batchPutRetryBackoffMs);
656662
} catch (RuntimeException t) {
663+
for (ExternalStorageWriter externalWriter: externalWriters) {
664+
Utils.closeQuietlyWithErrorLogged(externalWriter);
665+
}
657666
Utils.closeQuietlyWithErrorLogged(baseWriter);
658667
if (mainWriter != null) {
659668
Utils.closeQuietlyWithErrorLogged(mainWriter);

0 commit comments

Comments
 (0)