[test] Fix race condition in DaVinciClientTest (#1361)
There is a race condition in DaVinciClientTest: tests that use different schemas and run concurrently can fail with VeniceInconsistentSchemaException. This happens because all of the tests in this file share the same input directory, and the validation logic reads whatever schema file appears first in that directory.

The fix is to give each test its own temp directory so the schema files cannot collide.
kvargha authored Dec 3, 2024
1 parent 1b40264 commit 3a99bd9
Showing 2 changed files with 16 additions and 9 deletions.
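
To make the hazard concrete, here is a minimal sketch of the pattern (illustrative only: PerTestDirSketch and newPrivateInputDir are hypothetical names; getTempDataDirectory in the diff below is the test suite's actual helper):

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

class PerTestDirSketch {
  // Hazard: concurrent tests write different schema files into one shared
  // directory, so a validator that reads the "first" file in that directory
  // can observe another test's schema.
  static final File SHARED_INPUT_DIR = new File(System.getProperty("java.io.tmpdir"), "shared-input");

  // Fix: hand each test a unique directory of its own, so a directory
  // listing can only ever contain that test's schema file.
  static File newPrivateInputDir() throws IOException {
    File dir = Files.createTempDirectory("davinci-client-test-").toFile();
    dir.deleteOnExit();
    return dir;
  }
}

The diff below applies the same idea by moving the getTempDataDirectory() call out of the shared @BeforeClass setup and into each setUpStore overload.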
@@ -3562,7 +3562,6 @@ private int processKafkaDataMessage(
         keyLen = keyBytes.length;
         // update checksum for this PUT message if needed.
         partitionConsumptionState.maybeUpdateExpectedChecksum(keyBytes, put);
-
         if (metricsEnabled && recordLevelMetricEnabled.get() && put.getSchemaId() == CHUNK_MANIFEST_SCHEMA_ID) {
           // This must be done before the recordTransformer modifies the putValue, otherwise the size will be incorrect.
           recordAssembledRecordSize(keyLen, put.getPutValue(), put.getReplicationMetadataPayload(), currentTimeMs);
@@ -144,14 +144,10 @@ public class DaVinciClientTest {
   private VeniceClusterWrapper cluster;
   private D2Client d2Client;
   private PubSubProducerAdapterFactory pubSubProducerAdapterFactory;
-  private File inputDir;
-  private String inputDirPath;
 
   @BeforeClass
   public void setUp() {
     Utils.thisIsLocalhost();
-    inputDir = getTempDataDirectory();
-    inputDirPath = "file://" + inputDir.getAbsolutePath();
     Properties clusterConfig = new Properties();
     clusterConfig.put(SERVER_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS, 1L);
     clusterConfig.put(PUSH_STATUS_STORE_ENABLED, true);
@@ -1616,6 +1612,8 @@ private void setUpStore(
     boolean chunkingEnabled = false;
     CompressionStrategy compressionStrategy = CompressionStrategy.NO_OP;
 
+    File inputDir = getTempDataDirectory();
+
     Runnable writeAvroFileRunnable = () -> {
       try {
         writeSimpleAvroFileWithIntToStringSchema(inputDir);
@@ -1632,7 +1630,8 @@ private void setUpStore(
         chunkingEnabled,
         compressionStrategy,
         writeAvroFileRunnable,
-        valueSchema);
+        valueSchema,
+        inputDir);
   }
 
   /*
@@ -1649,6 +1648,9 @@ private void setUpStore(
       int numKeys) {
     Consumer<UpdateStoreQueryParams> paramsConsumer = params -> {};
     Consumer<Properties> propertiesConsumer = properties -> {};
+
+    File inputDir = getTempDataDirectory();
+
     Runnable writeAvroFileRunnable = () -> {
       try {
         writeSimpleAvroFileWithIntToStringSchema(inputDir, customValue, numKeys);
@@ -1665,7 +1667,8 @@ private void setUpStore(
         chunkingEnabled,
         compressionStrategy,
         writeAvroFileRunnable,
-        valueSchema);
+        valueSchema,
+        inputDir);
   }
 
   /*
@@ -1681,6 +1684,8 @@ private void setUpStore(
       int numKeys) {
     Consumer<UpdateStoreQueryParams> paramsConsumer = params -> {};
     Consumer<Properties> propertiesConsumer = properties -> {};
+
+    File inputDir = getTempDataDirectory();
     Runnable writeAvroFileRunnable = () -> {
       try {
         writeSimpleAvroFileWithIntToIntSchema(inputDir, numKeys);
@@ -1697,7 +1702,8 @@ private void setUpStore(
         chunkingEnabled,
         compressionStrategy,
         writeAvroFileRunnable,
-        valueSchema);
+        valueSchema,
+        inputDir);
   }
 
   private void setUpStore(
@@ -1708,11 +1714,13 @@ private void setUpStore(
       boolean chunkingEnabled,
       CompressionStrategy compressionStrategy,
       Runnable writeAvroFileRunnable,
-      String valueSchema) {
+      String valueSchema,
+      File inputDir) {
     // Produce input data.
     writeAvroFileRunnable.run();
 
     // Setup VPJ job properties.
+    String inputDirPath = "file://" + inputDir.getAbsolutePath();
     Properties vpjProperties = defaultVPJProps(cluster, inputDirPath, storeName);
     propertiesConsumer.accept(vpjProperties);
     // Create & update store for test.
