Skip to content

Commit 7b6450b

Browse files
committed
[vpj] Support running VTConsistencyCheckerJob via VenicePushJob
1 parent 6eda40a commit 7b6450b

6 files changed

Lines changed: 142 additions & 42 deletions

File tree

clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@
8787
import static com.linkedin.venice.vpj.VenicePushJobConstants.VENICE_DISCOVER_URL_PROP;
8888
import static com.linkedin.venice.vpj.VenicePushJobConstants.VENICE_REPUSH_SOURCE_PUBSUB_BROKER;
8989
import static com.linkedin.venice.vpj.VenicePushJobConstants.VENICE_STORE_NAME_PROP;
90+
import static com.linkedin.venice.vpj.VenicePushJobConstants.VT_CONSISTENCY_CHECK_ONLY;
9091

9192
import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
9293
import com.linkedin.d2.balancer.D2Client;
@@ -148,6 +149,7 @@
148149
import com.linkedin.venice.security.SSLFactory;
149150
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
150151
import com.linkedin.venice.serialization.avro.InternalAvroSpecificSerializer;
152+
import com.linkedin.venice.spark.consistency.VTConsistencyCheckerJob;
151153
import com.linkedin.venice.spark.utils.RmdPushUtils;
152154
import com.linkedin.venice.status.PushJobDetailsStatus;
153155
import com.linkedin.venice.status.protocol.PushJobDetails;
@@ -536,7 +538,10 @@ private PushJobSetting getPushJobSetting(VeniceProperties props) {
536538
}
537539
}
538540

539-
pushJobSettingToReturn.inputURI = pushJobSettingToReturn.isSourceKafka ? "" : getInputURI(props);
541+
pushJobSettingToReturn.inputURI =
542+
(pushJobSettingToReturn.isSourceKafka || props.getBoolean(VT_CONSISTENCY_CHECK_ONLY, false))
543+
? ""
544+
: getInputURI(props);
540545
pushJobSettingToReturn.storeName = props.getString(VENICE_STORE_NAME_PROP);
541546
pushJobSettingToReturn.rewindTimeInSecondsOverride = props.getLong(REWIND_TIME_IN_SECONDS_OVERRIDE, NOT_SET);
542547

@@ -724,6 +729,11 @@ DataWriterComputeJob getDataWriterComputeJob() {
724729
* @throws VeniceException
725730
*/
726731
public void run() {
732+
if (props.getBoolean(VT_CONSISTENCY_CHECK_ONLY, false)) {
733+
LOGGER.info("VT consistency check-only mode: skipping push, running VTConsistencyCheckerJob.");
734+
runVTConsistencyCheck();
735+
return;
736+
}
727737
try {
728738
initControllerClient(pushJobSetting.storeName);
729739
pushJobSetting.clusterName = controllerClient.getClusterName();
@@ -1034,6 +1044,13 @@ public void run() {
10341044
}
10351045
}
10361046

1047+
/**
1048+
* Delegates to {@link VTConsistencyCheckerJob#run(Properties)}.
1049+
*/
1050+
void runVTConsistencyCheck() {
1051+
VTConsistencyCheckerJob.run(props.toProperties());
1052+
}
1053+
10371054
/**
10381055
* Sets up a timeout monitor that will cancel and fail the push job if it runs longer than the allowed time.
10391056
* The timeout duration is determined by one of the following:

clients/venice-push-job/src/main/java/com/linkedin/venice/spark/consistency/VTConsistencyCheckerJob.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,7 @@ public static void run(Properties jobProps) {
230230

231231
LongAccumulator partitionsProcessed = spark.sparkContext().longAccumulator("partitionsProcessed");
232232
LongAccumulator partitionsWithErrors = spark.sparkContext().longAccumulator("partitionsWithErrors");
233+
LongAccumulator inconsistenciesFound = spark.sparkContext().longAccumulator("inconsistenciesFound");
233234

234235
Dataset<Row> inconsistencies = spark.createDataset(partitions, Encoders.INT())
235236
.flatMap(
@@ -239,24 +240,30 @@ public static void run(Properties jobProps) {
239240
jobProps,
240241
numberOfRegions,
241242
partitionsProcessed,
242-
partitionsWithErrors),
243+
partitionsWithErrors,
244+
inconsistenciesFound),
243245
RowEncoder.apply(OUTPUT_SCHEMA));
244246

245247
inconsistencies.write().mode(SaveMode.ErrorIfExists).parquet(outputPath);
246248

247249
LOGGER.info(
248-
"VT consistency check complete. topic={} partitions={} processed={} errors={} output={}",
250+
"VT consistency check complete. topic={} partitions={} processed={} errors={} inconsistencies={} output={}",
249251
versionTopic,
250252
partitionCount,
251253
partitionsProcessed.value(),
252254
partitionsWithErrors.value(),
255+
inconsistenciesFound.value(),
253256
outputPath);
254257

255258
if (partitionsWithErrors.value() > 0) {
256-
throw new RuntimeException(
259+
throw new VeniceException(
257260
partitionsWithErrors.value() + " partition(s) failed during scan of topic " + versionTopic
258261
+ ". Check executor logs for details.");
259262
}
263+
if (inconsistenciesFound.value() > 0) {
264+
throw new VeniceException(
265+
inconsistenciesFound.value() + " inconsistencies found for " + versionTopic + "; see " + outputPath);
266+
}
260267
} finally {
261268
if (spark != null) {
262269
spark.stop();
@@ -274,7 +281,8 @@ static Iterator<Row> findInconsistenciesForPartition(
274281
Properties jobProps,
275282
int numberOfRegions,
276283
LongAccumulator partitionsProcessed,
277-
LongAccumulator partitionsWithErrors) {
284+
LongAccumulator partitionsWithErrors,
285+
LongAccumulator inconsistenciesFound) {
278286
int partition = dc0Split.getPartitionNumber();
279287
String versionTopic = dc0Split.getTopicName();
280288
try {
@@ -341,6 +349,7 @@ static Iterator<Row> findInconsistenciesForPartition(
341349
found.size());
342350

343351
partitionsProcessed.add(1);
352+
inconsistenciesFound.add(found.size());
344353
return found.stream().map(inc -> toRow(inc, versionTopic, partition)).iterator();
345354
} finally {
346355
Utils.closeQuietlyWithErrorLogged(dc0Consumer);

clients/venice-push-job/src/main/java/com/linkedin/venice/vpj/VenicePushJobConstants.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,14 @@ private VenicePushJobConstants() {
323323
public static final String ENABLE_WRITE_COMPUTE = "venice.write.compute.enable";
324324
public static final String ENABLE_SSL = "venice.ssl.enable";
325325
public static final String VENICE_STORE_NAME_PROP = "venice.store.name";
326+
327+
/**
328+
* When set to {@code true}, {@link com.linkedin.venice.hadoop.VenicePushJob#run()} skips the
329+
* push and instead invokes {@link com.linkedin.venice.spark.consistency.VTConsistencyCheckerJob}
330+
* against the store's current version topic.
331+
*/
332+
public static final String VT_CONSISTENCY_CHECK_ONLY = "vt.consistency.check.only";
333+
326334
public static final String INPUT_PATH_PROP = "input.path";
327335
public static final String INPUT_PATH_LAST_MODIFIED_TIME = "input.path.last.modified.time";
328336
public static final String BATCH_NUM_BYTES_PROP = "batch.num.bytes";

clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import static com.linkedin.venice.vpj.VenicePushJobConstants.VENICE_DISCOVER_URL_PROP;
3838
import static com.linkedin.venice.vpj.VenicePushJobConstants.VENICE_REPUSH_SOURCE_PUBSUB_BROKER;
3939
import static com.linkedin.venice.vpj.VenicePushJobConstants.VENICE_STORE_NAME_PROP;
40+
import static com.linkedin.venice.vpj.VenicePushJobConstants.VT_CONSISTENCY_CHECK_ONLY;
4041
import static org.mockito.ArgumentMatchers.any;
4142
import static org.mockito.ArgumentMatchers.anyBoolean;
4243
import static org.mockito.ArgumentMatchers.anyInt;
@@ -204,6 +205,34 @@ public void testCheckLastModifiedTimestamp() throws Exception {
204205
}
205206
}
206207

208+
@Test
209+
public void testRunShortCircuitsToVTConsistencyCheckWhenFlagSet() {
210+
ControllerClient mockClient = mock(ControllerClient.class);
211+
Properties props = new Properties();
212+
props.setProperty(VT_CONSISTENCY_CHECK_ONLY, "true");
213+
try (VenicePushJob spyJob = getSpyVenicePushJob(props, mockClient)) {
214+
doNothing().when(spyJob).runVTConsistencyCheck();
215+
spyJob.run();
216+
verify(spyJob, times(1)).runVTConsistencyCheck();
217+
verifyNoInteractions(mockClient);
218+
}
219+
}
220+
221+
@Test
222+
public void testRunDoesNotShortCircuitWhenFlagAbsent() {
223+
Properties props = new Properties();
224+
try (VenicePushJob spyJob = getSpyVenicePushJob(props, null)) {
225+
doNothing().when(spyJob).runVTConsistencyCheck();
226+
try {
227+
spyJob.run();
228+
} catch (Exception ignored) {
229+
// The push path will throw because the test stubs aren't wired for a full push.
230+
// We only care that the checker gate wasn't taken.
231+
}
232+
verify(spyJob, never()).runVTConsistencyCheck();
233+
}
234+
}
235+
207236
@Test
208237
public void testHandleVersionCreationACLError() {
209238
VenicePushJob mockJob = getSpyVenicePushJob(new Properties(), null);

clients/venice-push-job/src/test/java/com/linkedin/venice/spark/consistency/VTConsistencyCheckerJobTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,7 @@ public void testOutputSchemaContractIsStable() {
291291
public void testFindInconsistenciesForPartitionReturnsErrorRowOnException() {
292292
LongAccumulator partitionsProcessed = mock(LongAccumulator.class);
293293
LongAccumulator partitionsWithErrors = mock(LongAccumulator.class);
294+
LongAccumulator inconsistenciesFound = mock(LongAccumulator.class);
294295

295296
PubSubTopicRepository repo = new PubSubTopicRepository();
296297
PubSubTopicPartition tp = new PubSubTopicPartitionImpl(repo.getTopic("store_v1"), 3);
@@ -305,7 +306,8 @@ public void testFindInconsistenciesForPartitionReturnsErrorRowOnException() {
305306
new Properties(),
306307
3,
307308
partitionsProcessed,
308-
partitionsWithErrors);
309+
partitionsWithErrors,
310+
inconsistenciesFound);
309311

310312
List<Row> rows = collectRows(result);
311313
assertEquals(rows.size(), 1, "exactly one sentinel ERROR row");

internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestVTConsistencyCheckerJob.java

Lines changed: 71 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,21 @@
1010
import static com.linkedin.venice.vpj.VenicePushJobConstants.DEFAULT_KEY_FIELD_PROP;
1111
import static com.linkedin.venice.vpj.VenicePushJobConstants.DEFAULT_VALUE_FIELD_PROP;
1212
import static com.linkedin.venice.vpj.VenicePushJobConstants.VENICE_DISCOVER_URL_PROP;
13+
import static com.linkedin.venice.vpj.VenicePushJobConstants.VENICE_STORE_NAME_PROP;
14+
import static com.linkedin.venice.vpj.VenicePushJobConstants.VT_CONSISTENCY_CHECK_ONLY;
1315
import static org.testng.Assert.assertEquals;
1416
import static org.testng.Assert.assertFalse;
1517
import static org.testng.Assert.assertNotNull;
18+
import static org.testng.Assert.assertThrows;
1619

1720
import com.linkedin.venice.annotation.PubSubAgnosticTest;
1821
import com.linkedin.venice.client.store.AvroGenericStoreClient;
1922
import com.linkedin.venice.client.store.ClientConfig;
2023
import com.linkedin.venice.client.store.ClientFactory;
2124
import com.linkedin.venice.controllerapi.ControllerClient;
2225
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
26+
import com.linkedin.venice.exceptions.VeniceException;
27+
import com.linkedin.venice.hadoop.VenicePushJob;
2328
import com.linkedin.venice.integration.utils.PubSubBrokerWrapper;
2429
import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
2530
import com.linkedin.venice.meta.Version;
@@ -106,6 +111,39 @@ protected Properties getExtraControllerProperties() {
106111
*/
107112
@Test(timeOut = TEST_TIMEOUT)
108113
public void testFullPipelineWithBatchPushRTWritesAndInjectedInconsistency() throws Exception {
114+
String storeName = setupAACorruptedStore();
115+
String versionTopic = Version.composeKafkaTopic(storeName, 1);
116+
File tempRoot = Files.createTempDirectory("vt-consistency-full-pipeline").toFile();
117+
File outputDir = new File(tempRoot, "output");
118+
try {
119+
Properties jobProps = buildCheckerJobProps(storeName, outputDir);
120+
assertThrows(VeniceException.class, () -> VTConsistencyCheckerJob.run(jobProps));
121+
verifyMismatchInParquet(outputDir, versionTopic);
122+
} finally {
123+
org.apache.commons.io.FileUtils.deleteDirectory(tempRoot);
124+
}
125+
}
126+
127+
@Test(timeOut = TEST_TIMEOUT)
128+
public void testFullPipelineWithVPJDrivenCheckerThrowsOnInconsistency() throws Exception {
129+
String storeName = setupAACorruptedStore();
130+
String versionTopic = Version.composeKafkaTopic(storeName, 1);
131+
File tempRoot = Files.createTempDirectory("vt-consistency-vpj-driven").toFile();
132+
File outputDir = new File(tempRoot, "output");
133+
try {
134+
Properties jobProps = buildCheckerJobProps(storeName, outputDir);
135+
jobProps.setProperty(VT_CONSISTENCY_CHECK_ONLY, "true");
136+
jobProps.setProperty(VENICE_STORE_NAME_PROP, storeName);
137+
try (VenicePushJob vpj = new VenicePushJob(Utils.getUniqueString("vpj-vt-check"), jobProps)) {
138+
assertThrows(VeniceException.class, vpj::run);
139+
}
140+
verifyMismatchInParquet(outputDir, versionTopic);
141+
} finally {
142+
org.apache.commons.io.FileUtils.deleteDirectory(tempRoot);
143+
}
144+
}
145+
146+
private String setupAACorruptedStore() throws Exception {
109147
// 1. Batch push with 5 records via VPJ
110148
File inputDir = getTempDataDirectory();
111149
Schema recordSchema = TestWriteUtils.writeSimpleAvroFileWithStringToStringSchema(inputDir, 5);
@@ -227,46 +265,43 @@ public void testFullPipelineWithBatchPushRTWritesAndInjectedInconsistency() thro
227265
}
228266
});
229267

230-
// 6. Run VT consistency checker
231-
File tempRoot = Files.createTempDirectory("vt-consistency-full-pipeline").toFile();
232-
File outputDir = new File(tempRoot, "output");
233-
try {
234-
Properties jobProps = new Properties();
235-
jobProps.setProperty(
236-
VTConsistencyCheckerJob.DC0_BROKER_URL,
237-
childDatacenters.get(0).getPubSubBrokerWrapper().getAddress());
238-
jobProps.setProperty(
239-
VTConsistencyCheckerJob.DC1_BROKER_URL,
240-
childDatacenters.get(1).getPubSubBrokerWrapper().getAddress());
241-
jobProps.setProperty(VTConsistencyCheckerJob.STORE_NAME, storeName);
242-
jobProps.setProperty(VENICE_DISCOVER_URL_PROP, childDatacenters.get(0).getControllerConnectString());
243-
jobProps.setProperty(VTConsistencyCheckerJob.OUTPUT_PATH, outputDir.getAbsolutePath());
244-
jobProps.setProperty(VTConsistencyCheckerJob.NUMBER_OF_REGIONS, "2");
268+
return storeName;
269+
}
270+
}
245271

246-
VTConsistencyCheckerJob.run(jobProps);
272+
private Properties buildCheckerJobProps(String storeName, File outputDir) {
273+
Properties jobProps = new Properties();
274+
jobProps.setProperty(
275+
VTConsistencyCheckerJob.DC0_BROKER_URL,
276+
childDatacenters.get(0).getPubSubBrokerWrapper().getAddress());
277+
jobProps.setProperty(
278+
VTConsistencyCheckerJob.DC1_BROKER_URL,
279+
childDatacenters.get(1).getPubSubBrokerWrapper().getAddress());
280+
jobProps.setProperty(VTConsistencyCheckerJob.STORE_NAME, storeName);
281+
jobProps.setProperty(VENICE_DISCOVER_URL_PROP, childDatacenters.get(0).getControllerConnectString());
282+
jobProps.setProperty(VTConsistencyCheckerJob.OUTPUT_PATH, outputDir.getAbsolutePath());
283+
jobProps.setProperty(VTConsistencyCheckerJob.NUMBER_OF_REGIONS, "2");
284+
return jobProps;
285+
}
247286

248-
SparkSession reader =
249-
SparkSession.builder().master("local[*]").appName("TestVTConsistencyCheckerJob-reader").getOrCreate();
250-
try {
251-
Dataset<Row> result = reader.read().parquet(outputDir.getAbsolutePath());
252-
List<Row> rows = result.collectAsList();
287+
private void verifyMismatchInParquet(File outputDir, String versionTopic) {
288+
SparkSession reader =
289+
SparkSession.builder().master("local[*]").appName("TestVTConsistencyCheckerJob-reader").getOrCreate();
290+
try {
291+
Dataset<Row> result = reader.read().parquet(outputDir.getAbsolutePath());
292+
List<Row> rows = result.collectAsList();
253293

254-
// Expect at least one VALUE_MISMATCH for the overwritten key "buggy-key"
255-
List<Row> mismatches =
256-
rows.stream().filter(r -> "VALUE_MISMATCH".equals(r.getAs("type"))).collect(Collectors.toList());
257-
assertFalse(mismatches.isEmpty(), "Expected at least one VALUE_MISMATCH for the corrupted key");
294+
List<Row> mismatches =
295+
rows.stream().filter(r -> "VALUE_MISMATCH".equals(r.getAs("type"))).collect(Collectors.toList());
296+
assertFalse(mismatches.isEmpty(), "Expected at least one VALUE_MISMATCH for the corrupted key");
258297

259-
Row corruptRow = mismatches.get(0);
260-
assertEquals(corruptRow.getAs("version_topic"), versionTopic);
261-
assertFalse(
262-
corruptRow.getAs("dc0_value_hash").equals(corruptRow.getAs("dc1_value_hash")),
263-
"DC value hashes must differ for corrupted key");
264-
} finally {
265-
reader.stop();
266-
}
267-
} finally {
268-
org.apache.commons.io.FileUtils.deleteDirectory(tempRoot);
269-
}
298+
Row corruptRow = mismatches.get(0);
299+
assertEquals(corruptRow.getAs("version_topic"), versionTopic);
300+
assertFalse(
301+
corruptRow.getAs("dc0_value_hash").equals(corruptRow.getAs("dc1_value_hash")),
302+
"DC value hashes must differ for corrupted key");
303+
} finally {
304+
reader.stop();
270305
}
271306
}
272307

0 commit comments

Comments
 (0)