[vpj][controller][common][test] Resolve external-storage dual-write per region #2838
sixpluszero wants to merge 1 commit into
Pull request overview
Adds per-region resolution of StorageMode for VPJ external-storage dual-write, so only regions whose store-level mode is DUAL_WRITE receive external-sink writes (instead of the previous global all-or-nothing behavior).
Changes:
- Introduces a new controller API/route to fetch per-region store-level `StorageMode` (`GET_PER_REGION_STORAGE_MODE`).
- Updates VPJ driver + executor plumbing to forward `push.job.dual.write.target.regions` and to fan out writes to one `ExternalStorageWriter` instance per target region.
- Expands unit/integration test coverage to validate multi-region behavior and region-aware sink population.
Reviewed changes
Copilot reviewed 21 out of 21 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java | Parent implementation fans out to child regions to resolve store-level StorageMode per region. |
| services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java | Child implementation returns local region’s store-level StorageMode. |
| services/venice-controller/src/main/java/com/linkedin/venice/controller/server/StoresRoutes.java | Adds the HTTP route handler that serializes per-region modes into the response payload. |
| services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminSparkServer.java | Registers the new controller GET endpoint. |
| services/venice-controller/src/main/java/com/linkedin/venice/controller/Admin.java | Adds the getStorageModePerRegion API to the Admin interface. |
| internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestVPJDualWriteExternalStorageMultiRegion.java | New multi-region E2E test validating only DUAL_WRITE regions populate the external sink. |
| internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/InMemoryExternalStorageWriter.java | Makes the integration-test external sink region-aware via (region, topic) keying and new accessors. |
| internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/MultiRegionStorageModeResponse.java | New controller API response type carrying region -> StorageModeName. |
| internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerRoute.java | Adds GET_PER_REGION_STORAGE_MODE route constant. |
| internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerClient.java | Adds getPerRegionStorageMode() client method. |
| clients/venice-push-job/src/test/java/com/linkedin/venice/vpj/ExternalStorageWriteUtilsTest.java | Updates gating tests and adds parsing tests for the region list config. |
| clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/task/datawriter/DualWriteVeniceWriterTest.java | Adds tests for multi-writer fan-out and failure propagation behavior. |
| clients/venice-push-job/src/main/java/com/linkedin/venice/vpj/VenicePushJobConstants.java | Replaces the single-mode key with push.job.dual.write.target.regions. |
| clients/venice-push-job/src/main/java/com/linkedin/venice/vpj/ExternalStorageWriteUtils.java | Updates dual-write gating to depend on non-empty target region list; adds parsing + region-aware configure helper. |
| clients/venice-push-job/src/main/java/com/linkedin/venice/spark/datawriter/jobs/AbstractDataWriterSparkJob.java | Forwards the resolved target-region list into Spark executor task config. |
| clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java | Resolves per-region modes via controller API and stores DUAL_WRITE region list in job settings. |
| clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/ExternalStorageWriter.java | Adds region-aware configure overload with defaults to preserve source compatibility. |
| clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/DualWriteVeniceWriter.java | Supports multiple external writers and fans out each batch to all regional sinks before Kafka produce. |
| clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/AbstractPartitionWriter.java | Loads/configures one external writer per target region and wraps with the multi-writer dual-write decorator. |
| clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/PushJobSetting.java | Replaces single target storage mode with dualWriteTargetRegions list in push settings. |
| clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/mapreduce/datawriter/jobs/DataWriterMRJob.java | Forwards the resolved target-region list into MR executor task config. |
     * surfaces the original failure with the {@link InterruptedException} suppressed.
     */
    -private void batchPutWithRetry(List<ExternalStorageRecord> records) {
    +private void batchPutWithRetry(ExternalStorageWriter externalWriter, List<ExternalStorageRecord> records) {
3 — Metric for retry success (suggestion)
The LOGGER.info("externalWriter.batchPut succeeded on attempt {}/{}") a few lines below is good for incident triage, but it's a one-off log line. Worth emitting a counter too (external_storage.batch_put.retry_succeeded or similar) so operators see retry pressure as a trend on the dashboard rather than having to grep logs across executors. The total-failure path (the final throw lastError at the end of the method) could emit a sibling counter (external_storage.batch_put.exhausted) for the same reason.
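A minimal sketch of the shape this could take, assuming plain in-process counters rather than VPJ's `DataWriterTaskTracker`. The class `CountingBatchPutRetrier` and its fields are hypothetical and are not part of the PR; the sketch also omits any backoff between attempts.

```java
import java.util.List;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Consumer;

/** Hypothetical helper: bounded retry around a batch write, with two trend counters. */
final class CountingBatchPutRetrier<R> {
  // Counter names follow the suggestion above; the fields themselves are illustrative only.
  final LongAdder retrySucceeded = new LongAdder(); // external_storage.batch_put.retry_succeeded
  final LongAdder exhausted = new LongAdder();      // external_storage.batch_put.exhausted

  private final int maxAttempts;

  CountingBatchPutRetrier(int maxAttempts) {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be >= 1");
    }
    this.maxAttempts = maxAttempts;
  }

  /** Calls batchPut up to maxAttempts times; rethrows the last failure once attempts run out. */
  void batchPutWithRetry(Consumer<List<R>> batchPut, List<R> records) {
    RuntimeException lastError = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        batchPut.accept(records);
        if (attempt > 1) {
          // Success that needed at least one retry: this is the "retry pressure" signal.
          retrySucceeded.increment();
        }
        return;
      } catch (RuntimeException e) {
        lastError = e;
      }
    }
    // Every attempt failed: count it, then surface the last error to fail the caller.
    exhausted.increment();
    throw lastError;
  }
}
```

A first-attempt success touches neither counter, so both values stay at zero on a healthy sink and only move when the sink is under pressure.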
Thanks — good call on surfacing retry pressure as a trend rather than a log grep.
I looked at wiring this into VPJ's counter system. The partition writer emits operational counters through DataWriterTaskTracker (Spark-accumulator and MR-Reporter backends), but it's a closed/enumerated set — adding external_storage.batch_put.retry_succeeded / ...exhausted the idiomatic way means extending DataWriterTaskTracker, DataWriterAccumulators, SparkDataWriterTaskTracker, and MRJobCounterHelper + ReporterBackedMapReduceDataWriterTaskTracker, plus threading a tracker into DualWriteVeniceWriter (which currently takes none). That's a ~5–6 file change spanning both engines.
Given this PR is already sizable, I'd prefer to land it as a focused follow-up rather than expand scope here, and keep the current LOGGER lines in the meantime. Happy to do it in this PR instead if you'd prefer — let me know.
…er region

Problem: the VPJ external-storage dual-write path (added in linkedin#2821) made a single global decision -- it read one storageMode and dual-wrote (or not) identically for every region. With per-region external-storage sinks this cannot express "region A is DUAL_WRITE, region B is INTERNAL".

This change resolves each region's store-level storageMode and fans out external writes only to the DUAL_WRITE regions:
- New controller API getStorageModePerRegion (GET_PER_REGION_STORAGE_MODE): the parent fans out to each child region's store-level storageMode and returns Map<region, StorageMode>; a child controller returns its single region. Store-level (not the new version's value) is read so the result is available before the new version has propagated to child regions.
- VPJ resolves the DUAL_WRITE region list and forwards it to executors via push.job.dual.write.target.regions, replacing the single push.job.target.storage.mode.
- ExternalStorageWriter gains a region-aware configure(jobProps, topic, partition, region) overload; the 3-arg overload is deprecated and kept as a delegating default, so existing impls keep working (non-breaking).
- DualWriteVeniceWriter holds one ExternalStorageWriter per DUAL_WRITE region and fans out batchPut/flush/close; a failure on any regional sink (after retries) fails the push so no external store goes partially populated.

Testing: new multi-region e2e (TestVPJDualWriteExternalStorageMultiRegion) sets dc0 DUAL_WRITE and dc1 INTERNAL via regionsFilter and asserts only dc0's region-keyed sink is populated while dc1 stays empty and Venice still serves every record. The existing single-region e2e and the VPJ unit tests (including new fan-out tests) pass.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Problem Statement
The VPJ external-storage dual-write path (added in #2821) makes a single global decision: VPJ reads one `storageMode` from one controller and broadcasts one value to every executor, so all regions dual-write (or don't) identically. With per-region external-storage sinks, this cannot express "region A is `DUAL_WRITE` (populate its sink), region B is `INTERNAL` (Kafka-only)", even though per-region `storageMode` is already representable via the `regionsFilter` on `UpdateStore`.

Solution
Resolve each region's store-level `storageMode` and fan out external writes only to the `DUAL_WRITE` regions.
- New controller API `getStorageModePerRegion` (`GET_PER_REGION_STORAGE_MODE`): the parent controller fans out to each child region's store-level `storageMode` and returns `Map<region, StorageMode>`; a child controller returns its single region. It reads the store-level value (not the new version's) so the result is available even before the just-created version has propagated to child regions via the admin channel (which is asynchronous). Modeled on the existing `listStorePushInfo` fan-out.
- VPJ resolves the `DUAL_WRITE` region list and forwards it to executors via the new OSS config `push.job.dual.write.target.regions`, replacing the single-mode `push.job.target.storage.mode`.
- `ExternalStorageWriter` gains a region-aware `configure(jobProps, topic, partition, region)` overload; the 3-arg overload is deprecated and kept as a delegating `default`, so existing single-endpoint impls keep compiling and working.
- `DualWriteVeniceWriter` holds one `ExternalStorageWriter` per `DUAL_WRITE` region and fans out `batchPut`/`flush`/`close`. Any regional sink failure (after the existing bounded retry) fails the push, so no external store is left partially populated relative to Venice or to the other regions.

This is the VPJ-side fan-out approach: the push job writes each region's dataset to that region's sink. Cross-region reachability from executors to each region's endpoint is an infra prerequisite, not part of this change.
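The fan-out behavior described above can be sketched roughly as follows. This is an illustration under stated assumptions, not the PR's `DualWriteVeniceWriter`: `RegionalSink` and `FanOutWriter` are hypothetical names, records are plain strings, retries are omitted, and the Kafka produce step is modeled as a `Consumer`.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical stand-in for a per-region external sink; not the PR's ExternalStorageWriter. */
interface RegionalSink {
  void batchPut(List<String> records);
}

/** Writes each batch to every regional sink first, then hands it to the downstream producer. */
final class FanOutWriter {
  private final Map<String, RegionalSink> sinksByRegion;
  // Stands in for the Kafka produce step, which runs only after all sinks accepted the batch.
  private final Consumer<List<String>> downstreamProducer;

  FanOutWriter(Map<String, RegionalSink> sinksByRegion, Consumer<List<String>> downstreamProducer) {
    this.sinksByRegion = new LinkedHashMap<>(sinksByRegion); // defensive copy, keeps region order
    this.downstreamProducer = downstreamProducer;
  }

  void write(List<String> batch) {
    for (Map.Entry<String, RegionalSink> entry : sinksByRegion.entrySet()) {
      try {
        entry.getValue().batchPut(batch);
      } catch (RuntimeException e) {
        // Any regional failure aborts the whole write, so the producer below is never reached.
        throw new IllegalStateException("External write failed for region " + entry.getKey(), e);
      }
    }
    downstreamProducer.accept(batch);
  }
}
```

In this sketch a sink earlier in the iteration order may already hold the batch when a later sink fails; the exception is what lets the caller fail the job as a whole instead of continuing with a partially written batch.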
Code changes
- `push.job.dual.write.target.regions` (comma-separated region names; default empty = dual-write off). It is populated by the VPJ driver, not set by users directly. The companion `push.job.external.storage.writer.class` gate is unchanged.
- … (`Resolved DUAL_WRITE target regions ...`) and one per partition writer (`Dual-write to external storage enabled ... for regions ...`).

Concurrency-Specific Checks
- … `controllerClientMap`.
- … `ConcurrentHashMap`).

How was this PR tested?
- `DualWriteVeniceWriterTest` fan-out cases (every regional writer receives each record; a failing regional writer fails the push and blocks the Kafka produce); `ExternalStorageWriteUtilsTest` region-list parsing + gating.
- `TestVPJDualWriteExternalStorageMultiRegion`: a two-region push with dc0 `DUAL_WRITE` and dc1 `INTERNAL` (via `regionsFilter`) asserts only dc0's region-keyed sink is populated, dc1's stays empty, and Venice serves every record. `InMemoryExternalStorageWriter` is now region-aware (sink keyed by `(region, topic)`, topic-aggregate accessors retained); existing single-region `TestVPJDualWriteExternalStorage` still passes.
- … `DUAL_WRITE`.

Does this PR introduce any user-facing or breaking changes?
- … `ExternalStorageWriter.configure` change is source-compatible via a deprecated delegating `default`, and `push.job.target.storage.mode` (introduced in #2821, "[vpj][common][test] Add VPJ dual-write external-storage path with batching and integration test") is superseded internally by `push.job.dual.write.target.regions`.
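To make the `push.job.dual.write.target.regions` format listed under Code changes concrete, here is a minimal sketch of how a driver might build the comma-separated value from a per-region mode map and how an executor might parse it back. `DualWriteRegions` and its methods are hypothetical, not the PR's `ExternalStorageWriteUtils`; modes are modeled as plain strings, the sorted region order is an assumption of the sketch, and the companion `push.job.external.storage.writer.class` gate is ignored.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper for building and parsing the dual-write target-region list. */
final class DualWriteRegions {
  static final String DUAL_WRITE = "DUAL_WRITE";

  private DualWriteRegions() {
  }

  /** Driver side: keep only regions whose mode is DUAL_WRITE and join them with commas. */
  static String toConfigValue(Map<String, String> modeByRegion) {
    List<String> targets = new ArrayList<>();
    // TreeMap gives a stable, sorted region order (an assumption of this sketch).
    for (Map.Entry<String, String> entry : new TreeMap<>(modeByRegion).entrySet()) {
      if (DUAL_WRITE.equals(entry.getValue())) {
        targets.add(entry.getKey());
      }
    }
    return String.join(",", targets);
  }

  /** Executor side: split on commas, trim, and drop blanks; null or empty yields an empty list. */
  static List<String> parse(String configValue) {
    if (configValue == null || configValue.trim().isEmpty()) {
      return Collections.emptyList();
    }
    List<String> regions = new ArrayList<>();
    for (String token : configValue.split(",")) {
      String region = token.trim();
      if (!region.isEmpty()) {
        regions.add(region);
      }
    }
    return regions;
  }

  /** An empty region list means dual-write is off. */
  static boolean isDualWriteEnabled(String configValue) {
    return !parse(configValue).isEmpty();
  }
}
```

With this shape, a store where no region is `DUAL_WRITE` produces an empty string, which parses to an empty list and leaves dual-write off, matching the documented default.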