Commit d89a24e

add requestbasedmetarepository integration testing

pthirun committed Feb 12, 2025
1 parent 70236be · commit d89a24e

Showing 14 changed files with 538 additions and 107 deletions.
@@ -47,6 +47,13 @@ public class DaVinciConfig {
*/
private int largeBatchRequestSplitThreshold = AvroGenericDaVinciClient.DEFAULT_CHUNK_SPLIT_THRESHOLD;

/**
* TODO (PRANAV): is this the correct place to inject this config?
* Determines whether to enable request-based metadata retrieval directly from the Venice Server.
* By default, metadata is retrieved from a system store via a thin client.
*/
private boolean useRequestBasedMetaRepository = false;

public DaVinciConfig() {
}

@@ -147,4 +154,13 @@ public DaVinciConfig setLargeBatchRequestSplitThreshold(int largeBatchRequestSplitThreshold) {
this.largeBatchRequestSplitThreshold = largeBatchRequestSplitThreshold;
return this;
}

public boolean isUseRequestBasedMetaRepository() {
return useRequestBasedMetaRepository;
}

public DaVinciConfig setUseRequestBasedMetaRepository(boolean useRequestBasedMetaRepository) {
this.useRequestBasedMetaRepository = useRequestBasedMetaRepository;
return this;
}
}
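For reference, a minimal usage sketch of the new flag (hypothetical daVinciClientFactory and storeName; the wiring mirrors the integration test added later in this commit):

DaVinciConfig config = new DaVinciConfig().setUseRequestBasedMetaRepository(true);
// CachingDaVinciClientFactory copies the flag into ClientConfig (next hunk), so the
// DVC backend builds a RequestBasedMetaRepository instead of the default
// thin-client, system-store-based repository.
try (DaVinciClient<String, Object> client =
    daVinciClientFactory.getAndStartGenericAvroClient(storeName, config)) {
  client.subscribeAll().get(); // block until all partitions are ingested locally
}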
@@ -383,7 +383,8 @@ protected synchronized DaVinciClient getClient(
ClientConfig clientConfig = new ClientConfig(internalStoreName).setD2Client(d2Client)
.setD2ServiceName(clusterDiscoveryD2ServiceName)
.setMetricsRepository(metricsRepository)
.setSpecificValueClass(valueClass);
.setSpecificValueClass(valueClass)
.setUseRequestBasedMetaRepository(config.isUseRequestBasedMetaRepository());

DaVinciClient client;
if (config.isIsolated()) {
@@ -117,7 +117,6 @@ public static NativeMetadataRepository getInstance(
ClientConfig clientConfig,
VeniceProperties backendConfig,
ICProvider icProvider) {

NativeMetadataRepository nativeMetadataRepository;
if (clientConfig.isUseRequestBasedMetaRepository()) {
nativeMetadataRepository = new RequestBasedMetaRepository(clientConfig, backendConfig);
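// Collapsed in this hunk: the default (thin-client) path. A hedged sketch,
// inferred from testThinClientMetaStoreBasedRepository later in this commit
// (the exact constructor arguments are an assumption):
// } else {
//   nativeMetadataRepository =
//       new ThinClientMetaStoreBasedRepository(clientConfig, backendConfig, icProvider);
// }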
@@ -130,17 +130,15 @@ protected int getMaxValueSchemaId(String storeName) {
}

protected void cacheStoreSchema(String storeName, StorePropertiesResponseRecord record) {

if (!storeSchemaMap.containsKey(storeName)) {
// New schema data
// New store
Map.Entry<CharSequence, CharSequence> keySchemaEntry =
record.getStoreMetaValue().getStoreKeySchemas().getKeySchemaMap().entrySet().iterator().next();
SchemaData schemaData = new SchemaData(
storeName,
new SchemaEntry(Integer.parseInt(keySchemaEntry.getKey().toString()), keySchemaEntry.getValue().toString()));
storeSchemaMap.put(storeName, schemaData);
}

// Store Value Schemas
for (Map.Entry<CharSequence, CharSequence> entry: record.getStoreMetaValue()
.getStoreValueSchemas()
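// Collapsed in this hunk: the remainder of the value-schema loop. A hedged
// sketch mirroring the key-schema path above (SchemaData.addValueSchema is
// an assumption):
//     .getValueSchemaMap().entrySet()) {
//   storeSchemaMap.get(storeName)
//       .addValueSchema(new SchemaEntry(
//           Integer.parseInt(entry.getKey().toString()), entry.getValue().toString()));
// }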
@@ -6,6 +6,7 @@
import static com.linkedin.venice.utils.TestWriteUtils.NAME_RECORD_V4_SCHEMA;
import static com.linkedin.venice.utils.TestWriteUtils.NAME_RECORD_V5_SCHEMA;
import static com.linkedin.venice.utils.TestWriteUtils.NAME_RECORD_V6_SCHEMA;
import static com.linkedin.venice.utils.TestWriteUtils.NAME_RECORD_V7_SCHEMA;
import static com.linkedin.venice.utils.TestWriteUtils.loadFileAsString;

import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
@@ -502,19 +503,29 @@ public void testGetLatestUpdateSchemaFromSchemaResponse() {
public void testValidateSubsetSchema() {
Assert.assertTrue(
AvroSupersetSchemaUtils.validateSubsetValueSchema(NAME_RECORD_V1_SCHEMA, NAME_RECORD_V2_SCHEMA.toString()));
Assert.assertFalse(
Assert.assertTrue(
AvroSupersetSchemaUtils.validateSubsetValueSchema(NAME_RECORD_V2_SCHEMA, NAME_RECORD_V3_SCHEMA.toString()));
Assert.assertFalse(
Assert.assertTrue(
AvroSupersetSchemaUtils.validateSubsetValueSchema(NAME_RECORD_V3_SCHEMA, NAME_RECORD_V4_SCHEMA.toString()));
Assert.assertTrue(
AvroSupersetSchemaUtils.validateSubsetValueSchema(NAME_RECORD_V4_SCHEMA, NAME_RECORD_V5_SCHEMA.toString()));
Assert.assertTrue(
AvroSupersetSchemaUtils.validateSubsetValueSchema(NAME_RECORD_V5_SCHEMA, NAME_RECORD_V6_SCHEMA.toString()));
Assert.assertFalse(
AvroSupersetSchemaUtils.validateSubsetValueSchema(NAME_RECORD_V6_SCHEMA, NAME_RECORD_V7_SCHEMA.toString()));

// NAME_RECORD_V5_SCHEMA and NAME_RECORD_V6_SCHEMA differ only in field-level props.
Assert.assertNotEquals(NAME_RECORD_V5_SCHEMA, NAME_RECORD_V6_SCHEMA);

// Verify that subset-schema validation skips comparing props.
Schema supersetSchemaForV5AndV4 =
AvroSupersetSchemaUtils.generateSupersetSchema(NAME_RECORD_V5_SCHEMA, NAME_RECORD_V4_SCHEMA);
Assert.assertTrue(
AvroSupersetSchemaUtils.validateSubsetValueSchema(NAME_RECORD_V5_SCHEMA, supersetSchemaForV5AndV4.toString()));
AvroSupersetSchemaUtils.validateSubsetValueSchema(NAME_RECORD_V4_SCHEMA, supersetSchemaForV5AndV4.toString()));
Assert.assertTrue(
AvroSupersetSchemaUtils.validateSubsetValueSchema(NAME_RECORD_V5_SCHEMA, supersetSchemaForV5AndV4.toString()));
// The V4 + V5 superset does not cover V6's additions
Assert.assertFalse(
AvroSupersetSchemaUtils.validateSubsetValueSchema(NAME_RECORD_V6_SCHEMA, supersetSchemaForV5AndV4.toString()));
}
}
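The props-skipping behavior exercised above can also be seen in isolation. A self-contained sketch (the schema literals here are hypothetical, not test fixtures):

Schema withProp = new Schema.Parser().parse(
    "{\"type\":\"record\",\"name\":\"NameRecord\",\"fields\":"
        + "[{\"name\":\"firstName\",\"type\":\"string\",\"custom_prop\":\"x\"}]}");
Schema withoutProp = new Schema.Parser().parse(
    "{\"type\":\"record\",\"name\":\"NameRecord\",\"fields\":"
        + "[{\"name\":\"firstName\",\"type\":\"string\"}]}");
// The records differ only in a field-level prop; per the assertions above,
// validateSubsetValueSchema ignores props, so the check still passes.
Assert.assertTrue(
    AvroSupersetSchemaUtils.validateSubsetValueSchema(withProp, withoutProp.toString()));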
@@ -290,14 +290,6 @@ public void testThinClientMetaStoreBasedRepository() throws InterruptedException
}

// TODO (PRANAV): move this test and use the full DVC.
// Can we add a new test file that runs a more comprehensive integration test
// with a DVC? I.e., push some new versions or make some store config changes
// and make sure the DVC picks up those changes. See examples like the recently
// added testBatchOnlyMaterializedViewDVCConsumer. You probably don't need a
// VeniceTwoLayerMultiRegionMultiClusterWrapper; a single region will be sufficient.
@Test(timeOut = 120 * Time.MS_PER_SECOND)
public void testRequestBasedMetaStoreBasedRepository() throws InterruptedException {
String regularVeniceStoreName = Utils.getUniqueString("venice_store");
@@ -0,0 +1,264 @@
package com.linkedin.venice.endToEnd;

import static com.linkedin.venice.ConfigKeys.CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS;
import static com.linkedin.venice.ConfigKeys.CLIENT_USE_SYSTEM_STORE_REPOSITORY;
import static com.linkedin.venice.ConfigKeys.DATA_BASE_PATH;
import static com.linkedin.venice.ConfigKeys.PERSISTENCE_TYPE;
import static com.linkedin.venice.utils.TestWriteUtils.getTempDataDirectory;
import static com.linkedin.venice.vpj.VenicePushJobConstants.DEFAULT_VALUE_FIELD_PROP;
import static org.testng.Assert.assertNotNull;

import com.linkedin.d2.balancer.D2Client;
import com.linkedin.davinci.client.DaVinciClient;
import com.linkedin.davinci.client.DaVinciConfig;
import com.linkedin.davinci.client.factory.CachingDaVinciClientFactory;
import com.linkedin.venice.D2.D2ClientUtils;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.SchemaResponse;
import com.linkedin.venice.integration.utils.D2TestUtils;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceClusterCreateOptions;
import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
import com.linkedin.venice.integration.utils.VeniceRouterWrapper;
import com.linkedin.venice.meta.PersistenceType;
import com.linkedin.venice.utils.IntegrationTestPushUtils;
import com.linkedin.venice.utils.PropertyBuilder;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.TestWriteUtils;
import com.linkedin.venice.utils.Time;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import io.tehuti.Metric;
import io.tehuti.metrics.MetricsRepository;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.avro.Schema;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;


public class TestDaVinciRequestBasedMetaRepository {
private static final int TEST_TIMEOUT = 2 * Time.MS_PER_MINUTE;

private static final String CLUSTER_NAME = "venice-cluster";
private VeniceClusterWrapper clusterWrapper;

private static final String storeNameStringToString = "store-name-string-to-string";
private static final String storeNameStringToNameRecord = "store-name-string-to-name-record";

// StoreName -> ControllerClient
// Using a map to track which stores have been created
private final Map<String, ControllerClient> controllerClients = new HashMap<>();
// StoreName -> Directory
private final Map<String, File> pushJobAvroDataDirs = new HashMap<>();

private DaVinciConfig daVinciConfig;
private MetricsRepository dvcMetricsRepo;
private D2Client daVinciD2RemoteFabric;
private CachingDaVinciClientFactory daVinciClientFactory;

@BeforeClass(alwaysRun = true)
public void setUp() throws IOException {

VeniceClusterCreateOptions.Builder options = new VeniceClusterCreateOptions.Builder().clusterName(CLUSTER_NAME)
.numberOfRouters(1)
.numberOfServers(2)
.numberOfControllers(2)
.replicationFactor(2)
.forkServer(false);
clusterWrapper = ServiceFactory.getVeniceCluster(options.build());

// Create stores
runPushJob( // String to String
storeNameStringToString,
TestWriteUtils
.writeSimpleAvroFileWithStringToStringSchema(getPushJobAvroFileDirectory(storeNameStringToString)));
runPushJob( // String to Name Record
storeNameStringToNameRecord,
TestWriteUtils.writeSimpleAvroFileWithStringToNameRecordV1Schema(
getPushJobAvroFileDirectory(storeNameStringToNameRecord)));

// Set up DVC Client Factory
VeniceProperties backendConfig =
new PropertyBuilder().put(DATA_BASE_PATH, Utils.getTempDataDirectory().getAbsolutePath())
.put(PERSISTENCE_TYPE, PersistenceType.ROCKS_DB)
.put(CLIENT_USE_SYSTEM_STORE_REPOSITORY, true)
.put(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS, 1)
.build();
daVinciConfig = new DaVinciConfig();
daVinciConfig.setUseRequestBasedMetaRepository(true);
daVinciD2RemoteFabric = D2TestUtils.getAndStartD2Client(clusterWrapper.getZk().getAddress());
dvcMetricsRepo = new MetricsRepository();
daVinciClientFactory = new CachingDaVinciClientFactory(
daVinciD2RemoteFabric,
VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME,
dvcMetricsRepo,
backendConfig);
}

@AfterClass(alwaysRun = true)
public void cleanUp() {

// Shutdown remote fabric
D2ClientUtils.shutdownClient(daVinciD2RemoteFabric);

// Close client factory
daVinciClientFactory.close();

// Close controller clients
for (Map.Entry<String, ControllerClient> entry: controllerClients.entrySet()) {
entry.getValue().close();
}

// Close cluster wrapper
clusterWrapper.close();
}

@Test(timeOut = TEST_TIMEOUT)
public void testDVCRequestBasedMetaRepositoryStringToString()
throws IOException, ExecutionException, InterruptedException {

try (DaVinciClient<String, Object> storeClient =
daVinciClientFactory.getAndStartGenericAvroClient(storeNameStringToString, daVinciConfig)) {
storeClient.subscribeAll().get();

int recordCount = TestWriteUtils.DEFAULT_USER_DATA_RECORD_COUNT;
for (int i = 1; i <= recordCount; i++) {
Assert.assertEquals(
storeClient.get(Integer.toString(i)).get().toString(),
TestWriteUtils.DEFAULT_USER_DATA_VALUE_PREFIX + i);
}
Assert
.assertEquals(getMetric(dvcMetricsRepo, "current_version_number.Gauge", storeNameStringToString), (double) 1);

// Perform another push with 200 keys to verify future version ingestion
recordCount = 200;
runPushJob(
storeNameStringToString,
TestWriteUtils.writeSimpleAvroFileWithStringToStringSchema(
getPushJobAvroFileDirectory(storeNameStringToString),
recordCount));

// Wait for the new version to be ingested and swapped in
TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, false, () -> {
Assert.assertEquals(
getMetric(dvcMetricsRepo, "current_version_number.Gauge", storeNameStringToString),
(double) 2);
});

for (int i = 1; i <= recordCount; i++) {
Assert.assertEquals(
storeClient.get(Integer.toString(i)).get().toString(),
TestWriteUtils.DEFAULT_USER_DATA_VALUE_PREFIX + i);
}
}
}

@Test(timeOut = TEST_TIMEOUT)
public void testDVCRequestBasedMetaRepositoryStringToNameRecord() throws ExecutionException, InterruptedException {

try (DaVinciClient<String, Object> storeClient =
daVinciClientFactory.getAndStartGenericAvroClient(storeNameStringToNameRecord, daVinciConfig)) {
storeClient.subscribeAll().get();

int recordCount = TestWriteUtils.DEFAULT_USER_DATA_RECORD_COUNT;
for (int i = 1; i <= recordCount; i++) {
Assert.assertEquals(
storeClient.get(Integer.toString(i)).get().toString(),
TestWriteUtils.renderNameRecord(TestWriteUtils.STRING_TO_NAME_RECORD_V1_SCHEMA, i)
.get(DEFAULT_VALUE_FIELD_PROP)
.toString());
}
Assert.assertEquals(
getMetric(dvcMetricsRepo, "current_version_number.Gauge", storeNameStringToNameRecord),
(double) 1);
}
}

@Test(timeOut = TEST_TIMEOUT)
public void testDVCRequestBasedMetaRepositoryStringToNameRecordVersions()
throws IOException, ExecutionException, InterruptedException {

try (DaVinciClient<String, Object> storeClient =
daVinciClientFactory.getAndStartGenericAvroClient(storeNameStringToNameRecord, daVinciConfig)) {
storeClient.subscribeAll().get();

for (int i = 0; i < TestWriteUtils.countStringToNameRecordSchemas(); i++) {
Schema schema = TestWriteUtils.getStringToNameRecordSchema(i);
// Version 1 was pushed in setUp, so the i-th push here creates version i + 2
int currentValueVersion = i + 2;

int recordCount = currentValueVersion * TestWriteUtils.DEFAULT_USER_DATA_RECORD_COUNT;
runPushJob(
storeNameStringToNameRecord,
TestWriteUtils.writeSimpleAvroFileWithStringToNameRecordSchema(
getPushJobAvroFileDirectory(storeNameStringToNameRecord),
schema,
recordCount));

// Wait for the new version to be ingested and swapped in
TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, false, () -> {
Assert.assertEquals(
getMetric(dvcMetricsRepo, "current_version_number.Gauge", storeNameStringToNameRecord),
(double) (currentValueVersion));
});

for (int j = 1; j <= recordCount; j++) {
Assert.assertEquals(
storeClient.get(Integer.toString(j)).get().toString(),
TestWriteUtils.renderNameRecord(schema, j).get(DEFAULT_VALUE_FIELD_PROP).toString());
}
}
}
}

private double getMetric(MetricsRepository metricsRepository, String metricName, String storeName) {
// Tehuti metric names follow the ".<storeName>--<metricName>" convention
Metric metric = metricsRepository.getMetric("." + storeName + "--" + metricName);
assertNotNull(metric, "Expected metric " + metricName + " not found.");
return metric.value();
}

private File getPushJobAvroFileDirectory(String storeName) {
if (!pushJobAvroDataDirs.containsKey(storeName)) {
pushJobAvroDataDirs.put(storeName, getTempDataDirectory());
}

return pushJobAvroDataDirs.get(storeName);
}

private void runPushJob(String storeName, Schema schema) {

ControllerClient controllerClient;
File dataDir = getPushJobAvroFileDirectory(storeName);
String dataDirPath = "file:" + dataDir.getAbsolutePath();

if (!controllerClients.containsKey(storeName)) {
// Init store
controllerClient = IntegrationTestPushUtils.createStoreForJob(
CLUSTER_NAME,
schema,
TestWriteUtils.defaultVPJProps(
clusterWrapper.getVeniceControllers().get(0).getControllerUrl(),
dataDirPath,
storeName));
controllerClients.put(storeName, controllerClient);
} else {
controllerClient = controllerClients.get(storeName);

// Add new schema
Schema valueSchema = schema.getField(DEFAULT_VALUE_FIELD_PROP).schema();
SchemaResponse schemaResponse = controllerClient.addValueSchema(storeName, valueSchema.toString());
Assert.assertFalse(schemaResponse.isError(), schemaResponse.getError());
}

Properties props =
TestWriteUtils.defaultVPJProps(controllerClient.getLeaderControllerUrl(), dataDirPath, storeName);
TestWriteUtils.runPushJob(storeName + "_" + Utils.getUniqueString("push_job"), props);
}
}