From 277b16ce74b8ab1a7c30423dc1d652d067c31c4e Mon Sep 17 00:00:00 2001 From: pthirun Date: Wed, 22 Jan 2025 16:43:04 -0800 Subject: [PATCH 1/3] add requestbasedmetarepository to dvc --- .../repository/NativeMetadataRepository.java | 120 +++------ .../RequestBasedMetaRepository.java | 153 ++++++++++++ .../ThinClientMetaStoreBasedRepository.java | 70 +++++- .../NativeMetadataRepositoryTest.java | 106 ++++---- .../RequestBasedMetaRepositoryTest.java | 235 ++++++++++++++++++ ...hinClientMetaStoreBasedRepositoryTest.java | 155 ++++++++++++ .../venice/client/store/ClientConfig.java | 14 +- .../linkedin/venice/meta/ReadOnlyStore.java | 2 +- .../venice/meta/ReadOnlyStoreTest.java | 58 +---- .../venice/endToEnd/MetaSystemStoreTest.java | 71 +++++- .../com/linkedin/venice/utils/TestUtils.java | 53 ++++ .../ServerReadMetadataRepository.java | 2 +- .../ServerReadMetadataRepositoryTest.java | 43 +++- 13 files changed, 855 insertions(+), 227 deletions(-) create mode 100644 clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/RequestBasedMetaRepository.java create mode 100644 clients/da-vinci-client/src/test/java/com/linkedin/davinci/repository/RequestBasedMetaRepositoryTest.java create mode 100644 clients/da-vinci-client/src/test/java/com/linkedin/davinci/repository/ThinClientMetaStoreBasedRepositoryTest.java diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/NativeMetadataRepository.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/NativeMetadataRepository.java index 7d595198864..b1cdb7782a5 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/NativeMetadataRepository.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/NativeMetadataRepository.java @@ -1,8 +1,6 @@ package com.linkedin.davinci.repository; import static com.linkedin.venice.ConfigKeys.CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS; -import static 
com.linkedin.venice.system.store.MetaStoreWriter.KEY_STRING_SCHEMA_ID; -import static com.linkedin.venice.system.store.MetaStoreWriter.KEY_STRING_STORE_NAME; import static java.lang.Thread.currentThread; import com.linkedin.davinci.stats.NativeMetadataRepositoryStats; @@ -25,17 +23,11 @@ import com.linkedin.venice.schema.rmd.RmdSchemaEntry; import com.linkedin.venice.schema.writecompute.DerivedSchemaEntry; import com.linkedin.venice.service.ICProvider; -import com.linkedin.venice.system.store.MetaStoreDataType; -import com.linkedin.venice.systemstore.schemas.StoreClusterConfig; -import com.linkedin.venice.systemstore.schemas.StoreMetaKey; -import com.linkedin.venice.systemstore.schemas.StoreMetaValue; import com.linkedin.venice.utils.VeniceProperties; import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; import java.time.Clock; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -70,7 +62,7 @@ public abstract class NativeMetadataRepository private final Map storeConfigMap = new VeniceConcurrentHashMap<>(); // Local cache for key/value schemas. SchemaData supports one key schema per store only, which may need to be changed // for key schema evolvability. 
- private final Map schemaMap = new VeniceConcurrentHashMap<>(); + protected Map schemaMap = new VeniceConcurrentHashMap<>(); private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); private final Set listeners = new CopyOnWriteArraySet<>(); private final AtomicLong totalStoreReadQuota = new AtomicLong(); @@ -125,11 +117,20 @@ public static NativeMetadataRepository getInstance( ClientConfig clientConfig, VeniceProperties backendConfig, ICProvider icProvider) { + + NativeMetadataRepository nativeMetadataRepository; + if (clientConfig.isUseRequestBasedMetaRepository()) { + nativeMetadataRepository = new RequestBasedMetaRepository(clientConfig, backendConfig); + } else { + nativeMetadataRepository = new ThinClientMetaStoreBasedRepository(clientConfig, backendConfig, icProvider); + } + LOGGER.info( "Initializing {} with {}", NativeMetadataRepository.class.getSimpleName(), - ThinClientMetaStoreBasedRepository.class.getSimpleName()); - return new ThinClientMetaStoreBasedRepository(clientConfig, backendConfig, icProvider); + nativeMetadataRepository.getClass().getSimpleName()); + + return nativeMetadataRepository; } @Override @@ -168,23 +169,19 @@ public boolean hasStore(String storeName) { return subscribedStoreMap.containsKey(storeName); } + // refreshOneStore will throw the VeniceNoStoreException when + // retrieving metadata for stores in "Deleting" state or "Missing". @Override public Store refreshOneStore(String storeName) { try { - getAndSetStoreConfigFromSystemStore(storeName); - StoreConfig storeConfig = storeConfigMap.get(storeName); + StoreConfig storeConfig = cacheStoreConfigFromRemote(storeName); if (storeConfig == null) { throw new VeniceException("StoreConfig is missing unexpectedly for store: " + storeName); } - Store newStore = getStoreFromSystemStore(storeName, storeConfig.getCluster()); - // isDeleting check to detect deleted store is only supported by meta system store based implementation. 
- if (newStore != null && !storeConfig.isDeleting()) { - putStore(newStore); - getAndCacheSchemaDataFromSystemStore(storeName); - nativeMetadataRepositoryStats.updateCacheTimestamp(storeName, clock.millis()); - } else { - removeStore(storeName); - } + Store newStore = fetchStoreFromRemote(storeName, storeConfig.getCluster()); + putStore(newStore); + getAndCacheSchemaData(storeName); + nativeMetadataRepositoryStats.updateCacheTimestamp(storeName, clock.millis()); return newStore; } catch (ServiceDiscoveryException | MissingKeyInStoreMetadataException e) { throw new VeniceNoStoreException(storeName, e); @@ -393,74 +390,17 @@ public void clear() { * Get the store cluster config from system store and update the local cache with it. Different implementation will * get the data differently but should all populate the store cluster config map. */ - protected void getAndSetStoreConfigFromSystemStore(String storeName) { - storeConfigMap.put(storeName, getStoreConfigFromSystemStore(storeName)); + protected StoreConfig cacheStoreConfigFromRemote(String storeName) { + StoreConfig storeConfig = fetchStoreConfigFromRemote(storeName); + storeConfigMap.put(storeName, storeConfig); + return storeConfig; } - protected abstract StoreConfig getStoreConfigFromSystemStore(String storeName); + protected abstract StoreConfig fetchStoreConfigFromRemote(String storeName); - protected abstract Store getStoreFromSystemStore(String storeName, String clusterName); + protected abstract Store fetchStoreFromRemote(String storeName, String clusterName); - protected abstract StoreMetaValue getStoreMetaValue(String storeName, StoreMetaKey key); - - // Helper function with common code for retrieving StoreConfig from meta system store. 
- protected StoreConfig getStoreConfigFromMetaSystemStore(String storeName) { - StoreClusterConfig clusterConfig = getStoreMetaValue( - storeName, - MetaStoreDataType.STORE_CLUSTER_CONFIG - .getStoreMetaKey(Collections.singletonMap(KEY_STRING_STORE_NAME, storeName))).storeClusterConfig; - return new StoreConfig(clusterConfig); - } - - // Helper function with common code for retrieving SchemaData from meta system store. - protected SchemaData getSchemaDataFromMetaSystemStore(String storeName) { - SchemaData schemaData = schemaMap.get(storeName); - SchemaEntry keySchema; - if (schemaData == null) { - // Retrieve the key schema and initialize SchemaData only if it's not cached yet. - StoreMetaKey keySchemaKey = MetaStoreDataType.STORE_KEY_SCHEMAS - .getStoreMetaKey(Collections.singletonMap(KEY_STRING_STORE_NAME, storeName)); - Map keySchemaMap = - getStoreMetaValue(storeName, keySchemaKey).storeKeySchemas.keySchemaMap; - if (keySchemaMap.isEmpty()) { - throw new VeniceException("No key schema found for store: " + storeName); - } - Map.Entry keySchemaEntry = keySchemaMap.entrySet().iterator().next(); - keySchema = - new SchemaEntry(Integer.parseInt(keySchemaEntry.getKey().toString()), keySchemaEntry.getValue().toString()); - schemaData = new SchemaData(storeName, keySchema); - } - StoreMetaKey valueSchemaKey = MetaStoreDataType.STORE_VALUE_SCHEMAS - .getStoreMetaKey(Collections.singletonMap(KEY_STRING_STORE_NAME, storeName)); - Map valueSchemaMap = - getStoreMetaValue(storeName, valueSchemaKey).storeValueSchemas.valueSchemaMap; - // Check the value schema string, if it's empty then try to query the other key space for individual value schema. 
- for (Map.Entry entry: valueSchemaMap.entrySet()) { - // Check if we already have the corresponding value schema - int valueSchemaId = Integer.parseInt(entry.getKey().toString()); - if (schemaData.getValueSchema(valueSchemaId) != null) { - continue; - } - if (entry.getValue().toString().isEmpty()) { - // The value schemas might be too large to be stored in a single K/V. - StoreMetaKey individualValueSchemaKey = - MetaStoreDataType.STORE_VALUE_SCHEMA.getStoreMetaKey(new HashMap() { - { - put(KEY_STRING_STORE_NAME, storeName); - put(KEY_STRING_SCHEMA_ID, entry.getKey().toString()); - } - }); - // Empty string is not a valid value schema therefore it's safe to throw exceptions if we also cannot find it in - // the individual value schema key space. - String valueSchema = - getStoreMetaValue(storeName, individualValueSchemaKey).storeValueSchema.valueSchema.toString(); - schemaData.addValueSchema(new SchemaEntry(valueSchemaId, valueSchema)); - } else { - schemaData.addValueSchema(new SchemaEntry(valueSchemaId, entry.getValue().toString())); - } - } - return schemaData; - } + protected abstract SchemaData getSchemaData(String storeName); protected Store putStore(Store newStore) { // Workaround to make old metadata compatible with new fields @@ -516,11 +456,11 @@ protected void notifyStoreChanged(Store store) { } } - protected SchemaData getAndCacheSchemaDataFromSystemStore(String storeName) { + protected SchemaData getAndCacheSchemaData(String storeName) { if (!hasStore(storeName)) { throw new VeniceNoStoreException(storeName); } - SchemaData schemaData = getSchemaDataFromSystemStore(storeName); + SchemaData schemaData = getSchemaData(storeName); schemaMap.put(storeName, schemaData); return schemaData; } @@ -532,7 +472,7 @@ protected SchemaData getAndCacheSchemaDataFromSystemStore(String storeName) { private SchemaData getSchemaDataFromReadThroughCache(String storeName) throws VeniceNoStoreException { SchemaData schemaData = schemaMap.get(storeName); if (schemaData == 
null) { - schemaData = getAndCacheSchemaDataFromSystemStore(storeName); + schemaData = getAndCacheSchemaData(storeName); } return schemaData; } @@ -545,8 +485,6 @@ protected SchemaEntry getValueSchemaInternally(String storeName, int id) { return schemaData.getValueSchema(id); } - protected abstract SchemaData getSchemaDataFromSystemStore(String storeName); - /** * This function is used to remove schema entry for the given store from local cache, * and related listeners as well. diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/RequestBasedMetaRepository.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/RequestBasedMetaRepository.java new file mode 100644 index 00000000000..6a7bb50900c --- /dev/null +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/RequestBasedMetaRepository.java @@ -0,0 +1,153 @@ +package com.linkedin.davinci.repository; + +import com.linkedin.venice.client.store.ClientConfig; +import com.linkedin.venice.client.store.D2ServiceDiscovery; +import com.linkedin.venice.client.store.transport.D2TransportClient; +import com.linkedin.venice.client.store.transport.TransportClientResponse; +import com.linkedin.venice.meta.QueryAction; +import com.linkedin.venice.meta.Store; +import com.linkedin.venice.meta.StoreConfig; +import com.linkedin.venice.meta.ZKStore; +import com.linkedin.venice.metadata.response.StorePropertiesResponseRecord; +import com.linkedin.venice.schema.SchemaData; +import com.linkedin.venice.schema.SchemaEntry; +import com.linkedin.venice.serializer.FastSerializerDeserializerFactory; +import com.linkedin.venice.serializer.RecordDeserializer; +import com.linkedin.venice.systemstore.schemas.StoreClusterConfig; +import com.linkedin.venice.systemstore.schemas.StoreProperties; +import com.linkedin.venice.utils.VeniceProperties; +import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; +import java.util.Map; +import org.apache.avro.Schema; + + 
+public class RequestBasedMetaRepository extends NativeMetadataRepository { + + // cluster -> client + private final Map d2TransportClientMap = new VeniceConcurrentHashMap<>(); + + // storeName -> T + protected Map storeSchemaMap = new VeniceConcurrentHashMap<>(); + + private final D2TransportClient d2DiscoveryTransportClient; + private D2ServiceDiscovery d2ServiceDiscovery; + + public RequestBasedMetaRepository(ClientConfig clientConfig, VeniceProperties backendConfig) { + super(clientConfig, backendConfig); + this.d2ServiceDiscovery = new D2ServiceDiscovery(); + this.d2DiscoveryTransportClient = + new D2TransportClient(clientConfig.getD2ServiceName(), clientConfig.getD2Client()); + } + + @Override + public void clear() { + super.clear(); + + // Clear cache + d2TransportClientMap.clear(); + storeSchemaMap.clear(); + } + + @Override + protected StoreConfig fetchStoreConfigFromRemote(String storeName) { + // Create StoreConfig from D2 + D2TransportClient d2TransportClient = getD2TransportClient(storeName); + + StoreClusterConfig storeClusterConfig = new StoreClusterConfig(); + storeClusterConfig.setStoreName(storeName); + storeClusterConfig.setCluster(d2TransportClient.getServiceName()); + + return new StoreConfig(storeClusterConfig); + } + + @Override + protected Store fetchStoreFromRemote(String storeName, String clusterName) { + // Fetch store, bypass cache + StorePropertiesResponseRecord record = fetchAndCacheStorePropertiesResponseRecord(storeName); + StoreProperties storeProperties = record.storeMetaValue.storeProperties; + return new ZKStore(storeProperties); + } + + @Override + protected SchemaData getSchemaData(String storeName) { + if (!storeSchemaMap.containsKey(storeName)) { + // Cache miss + fetchAndCacheStorePropertiesResponseRecord(storeName); + } + return storeSchemaMap.get(storeName); + } + + protected StorePropertiesResponseRecord fetchAndCacheStorePropertiesResponseRecord(String storeName) { + + // Request + int maxValueSchemaId = 
getMaxValueSchemaId(storeName); + D2TransportClient d2TransportClient = getD2TransportClient(storeName); + String requestBasedStorePropertiesURL = QueryAction.STORE_PROPERTIES.toString().toLowerCase() + "/" + storeName; + if (maxValueSchemaId > SchemaData.UNKNOWN_SCHEMA_ID) { + requestBasedStorePropertiesURL += "/" + maxValueSchemaId; + } + + TransportClientResponse response; + try { + response = d2TransportClient.get(requestBasedStorePropertiesURL).get(); + } catch (Exception e) { + throw new RuntimeException( + "Encountered exception while trying to send store properties request to " + requestBasedStorePropertiesURL + + ": " + e); + } + + // Deserialize + Schema writerSchema = StorePropertiesResponseRecord.SCHEMA$; + RecordDeserializer recordDeserializer = FastSerializerDeserializerFactory + .getFastAvroSpecificDeserializer(writerSchema, StorePropertiesResponseRecord.class); + StorePropertiesResponseRecord record = recordDeserializer.deserialize(response.getBody()); + + // Cache + cacheStoreSchema(storeName, record); + + return record; + } + + D2TransportClient getD2TransportClient(String storeName) { + synchronized (this) { + // Get cluster for store + String serverD2ServiceName = + d2ServiceDiscovery.find(d2DiscoveryTransportClient, storeName, true).getServerD2Service(); + if (d2TransportClientMap.containsKey(serverD2ServiceName)) { + return d2TransportClientMap.get(serverD2ServiceName); + } + D2TransportClient d2TransportClient = new D2TransportClient(serverD2ServiceName, clientConfig.getD2Client()); + d2TransportClientMap.put(serverD2ServiceName, d2TransportClient); + return d2TransportClient; + } + } + + protected int getMaxValueSchemaId(String storeName) { + if (!schemaMap.containsKey(storeName)) { + return SchemaData.UNKNOWN_SCHEMA_ID; + } + return schemaMap.get(storeName).getMaxValueSchemaId(); + } + + protected void cacheStoreSchema(String storeName, StorePropertiesResponseRecord record) { + + if (!storeSchemaMap.containsKey(storeName)) { + // New schema 
data + Map.Entry keySchemaEntry = + record.getStoreMetaValue().getStoreKeySchemas().getKeySchemaMap().entrySet().iterator().next(); + SchemaData schemaData = new SchemaData( + storeName, + new SchemaEntry(Integer.parseInt(keySchemaEntry.getKey().toString()), keySchemaEntry.getValue().toString())); + storeSchemaMap.put(storeName, schemaData); + } + + // Store Value Schemas + for (Map.Entry entry: record.getStoreMetaValue() + .getStoreValueSchemas() + .getValueSchemaMap() + .entrySet()) { + storeSchemaMap.get(storeName) + .addValueSchema(new SchemaEntry(Integer.parseInt(entry.getKey().toString()), entry.getValue().toString())); + } + } +} diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/ThinClientMetaStoreBasedRepository.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/ThinClientMetaStoreBasedRepository.java index 637ba1682ed..fa8ba11357c 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/ThinClientMetaStoreBasedRepository.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/ThinClientMetaStoreBasedRepository.java @@ -1,6 +1,7 @@ package com.linkedin.davinci.repository; import static com.linkedin.venice.system.store.MetaStoreWriter.KEY_STRING_CLUSTER_NAME; +import static com.linkedin.venice.system.store.MetaStoreWriter.KEY_STRING_SCHEMA_ID; import static com.linkedin.venice.system.store.MetaStoreWriter.KEY_STRING_STORE_NAME; import com.linkedin.venice.client.exceptions.ServiceDiscoveryException; @@ -9,13 +10,16 @@ import com.linkedin.venice.client.store.ClientFactory; import com.linkedin.venice.common.VeniceSystemStoreType; import com.linkedin.venice.exceptions.MissingKeyInStoreMetadataException; +import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.exceptions.VeniceRetriableException; import com.linkedin.venice.meta.Store; import com.linkedin.venice.meta.StoreConfig; import com.linkedin.venice.meta.ZKStore; import 
com.linkedin.venice.schema.SchemaData; +import com.linkedin.venice.schema.SchemaEntry; import com.linkedin.venice.service.ICProvider; import com.linkedin.venice.system.store.MetaStoreDataType; +import com.linkedin.venice.systemstore.schemas.StoreClusterConfig; import com.linkedin.venice.systemstore.schemas.StoreMetaKey; import com.linkedin.venice.systemstore.schemas.StoreMetaValue; import com.linkedin.venice.systemstore.schemas.StoreProperties; @@ -63,12 +67,16 @@ public void subscribe(String storeName) throws InterruptedException { } @Override - protected StoreConfig getStoreConfigFromSystemStore(String storeName) { - return getStoreConfigFromMetaSystemStore(storeName); + protected StoreConfig fetchStoreConfigFromRemote(String storeName) { + StoreClusterConfig clusterConfig = getStoreMetaValue( + storeName, + MetaStoreDataType.STORE_CLUSTER_CONFIG + .getStoreMetaKey(Collections.singletonMap(KEY_STRING_STORE_NAME, storeName))).storeClusterConfig; + return new StoreConfig(clusterConfig); } @Override - protected Store getStoreFromSystemStore(String storeName, String clusterName) { + protected Store fetchStoreFromRemote(String storeName, String clusterName) { StoreProperties storeProperties = getStoreMetaValue(storeName, MetaStoreDataType.STORE_PROPERTIES.getStoreMetaKey(new HashMap() { { @@ -79,12 +87,6 @@ protected Store getStoreFromSystemStore(String storeName, String clusterName) { return new ZKStore(storeProperties); } - @Override - protected SchemaData getSchemaDataFromSystemStore(String storeName) { - return getSchemaDataFromMetaSystemStore(storeName); - } - - @Override protected StoreMetaValue getStoreMetaValue(String storeName, StoreMetaKey key) { final Callable> supplier = () -> getAvroClientForMetaStore(storeName).get(key); Callable> wrappedSupplier = @@ -107,6 +109,56 @@ protected StoreMetaValue getStoreMetaValue(String storeName, StoreMetaKey key) { return value; } + @Override + protected SchemaData getSchemaData(String storeName) { + SchemaData 
schemaData = schemaMap.get(storeName); + SchemaEntry keySchema; + if (schemaData == null) { + // Retrieve the key schema and initialize SchemaData only if it's not cached yet. + StoreMetaKey keySchemaKey = MetaStoreDataType.STORE_KEY_SCHEMAS + .getStoreMetaKey(Collections.singletonMap(KEY_STRING_STORE_NAME, storeName)); + Map keySchemaMap = + getStoreMetaValue(storeName, keySchemaKey).storeKeySchemas.keySchemaMap; + if (keySchemaMap.isEmpty()) { + throw new VeniceException("No key schema found for store: " + storeName); + } + Map.Entry keySchemaEntry = keySchemaMap.entrySet().iterator().next(); + keySchema = + new SchemaEntry(Integer.parseInt(keySchemaEntry.getKey().toString()), keySchemaEntry.getValue().toString()); + schemaData = new SchemaData(storeName, keySchema); + } + StoreMetaKey valueSchemaKey = MetaStoreDataType.STORE_VALUE_SCHEMAS + .getStoreMetaKey(Collections.singletonMap(KEY_STRING_STORE_NAME, storeName)); + Map valueSchemaMap = + getStoreMetaValue(storeName, valueSchemaKey).storeValueSchemas.valueSchemaMap; + // Check the value schema string, if it's empty then try to query the other key space for individual value schema. + for (Map.Entry entry: valueSchemaMap.entrySet()) { + // Check if we already have the corresponding value schema + int valueSchemaId = Integer.parseInt(entry.getKey().toString()); + if (schemaData.getValueSchema(valueSchemaId) != null) { + continue; + } + if (entry.getValue().toString().isEmpty()) { + // The value schemas might be too large to be stored in a single K/V. + StoreMetaKey individualValueSchemaKey = + MetaStoreDataType.STORE_VALUE_SCHEMA.getStoreMetaKey(new HashMap() { + { + put(KEY_STRING_STORE_NAME, storeName); + put(KEY_STRING_SCHEMA_ID, entry.getKey().toString()); + } + }); + // Empty string is not a valid value schema therefore it's safe to throw exceptions if we also cannot find it in + // the individual value schema key space. 
+ String valueSchema = + getStoreMetaValue(storeName, individualValueSchemaKey).storeValueSchema.valueSchema.toString(); + schemaData.addValueSchema(new SchemaEntry(valueSchemaId, valueSchema)); + } else { + schemaData.addValueSchema(new SchemaEntry(valueSchemaId, entry.getValue().toString())); + } + } + return schemaData; + } + private AvroSpecificStoreClient getAvroClientForMetaStore(String storeName) { return storeClientMap.computeIfAbsent(storeName, k -> { ClientConfig clonedClientConfig = ClientConfig.cloneConfig(clientConfig) diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/repository/NativeMetadataRepositoryTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/repository/NativeMetadataRepositoryTest.java index 8477945c160..333c778fdfe 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/repository/NativeMetadataRepositoryTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/repository/NativeMetadataRepositoryTest.java @@ -8,25 +8,16 @@ import static org.mockito.Mockito.when; import com.linkedin.venice.client.store.ClientConfig; -import com.linkedin.venice.client.store.schemas.TestKeyRecord; -import com.linkedin.venice.client.store.schemas.TestValueRecord; import com.linkedin.venice.exceptions.VeniceNoStoreException; import com.linkedin.venice.meta.Store; import com.linkedin.venice.meta.StoreConfig; import com.linkedin.venice.schema.SchemaData; -import com.linkedin.venice.system.store.MetaStoreDataType; -import com.linkedin.venice.systemstore.schemas.StoreKeySchemas; -import com.linkedin.venice.systemstore.schemas.StoreMetaKey; -import com.linkedin.venice.systemstore.schemas.StoreMetaValue; -import com.linkedin.venice.systemstore.schemas.StoreValueSchema; -import com.linkedin.venice.systemstore.schemas.StoreValueSchemas; +import com.linkedin.venice.schema.SchemaEntry; import com.linkedin.venice.utils.TestUtils; import com.linkedin.venice.utils.VeniceProperties; import 
io.tehuti.Metric; import io.tehuti.metrics.MetricsRepository; import java.time.Clock; -import java.util.HashMap; -import java.util.Map; import java.util.concurrent.TimeUnit; import org.testng.Assert; import org.testng.annotations.BeforeMethod; @@ -34,7 +25,8 @@ public class NativeMetadataRepositoryTest { - private ClientConfig clientConfig; + private ClientConfig clientConfigThinClient; + private ClientConfig clientConfigRequestBased; private VeniceProperties backendConfig; private MetricsRepository metricsRepository; private Clock clock; @@ -42,19 +34,22 @@ public class NativeMetadataRepositoryTest { @BeforeMethod public void setUpMocks() { - clientConfig = mock(ClientConfig.class); + clientConfigThinClient = mock(ClientConfig.class); + clientConfigRequestBased = mock(ClientConfig.class); backendConfig = mock(VeniceProperties.class); doReturn(1L).when(backendConfig).getLong(eq(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS), anyLong()); metricsRepository = new MetricsRepository(); - doReturn(metricsRepository).when(clientConfig).getMetricsRepository(); + doReturn(metricsRepository).when(clientConfigThinClient).getMetricsRepository(); + doReturn(metricsRepository).when(clientConfigRequestBased).getMetricsRepository(); + doReturn(true).when(clientConfigRequestBased).isUseRequestBasedMetaRepository(); clock = mock(Clock.class); doReturn(0L).when(clock).millis(); } @Test - public void testGetInstance() { + public void testGetThinClientInstance() { NativeMetadataRepository nativeMetadataRepository = - NativeMetadataRepository.getInstance(clientConfig, backendConfig); + NativeMetadataRepository.getInstance(clientConfigThinClient, backendConfig); Assert.assertTrue(nativeMetadataRepository instanceof ThinClientMetaStoreBasedRepository); Assert.assertThrows(() -> nativeMetadataRepository.subscribe(STORE_NAME)); @@ -64,9 +59,22 @@ public void testGetInstance() { Assert.assertThrows(() -> nativeMetadataRepository.start()); } + @Test + public void 
testGetRequestBasedInstance() { + NativeMetadataRepository nativeMetadataRepository = + NativeMetadataRepository.getInstance(clientConfigRequestBased, backendConfig); + Assert.assertTrue(nativeMetadataRepository instanceof RequestBasedMetaRepository); + + Assert.assertThrows(() -> nativeMetadataRepository.subscribe(STORE_NAME)); + nativeMetadataRepository.start(); + nativeMetadataRepository.clear(); + Assert.assertThrows(() -> nativeMetadataRepository.subscribe(STORE_NAME)); + Assert.assertThrows(() -> nativeMetadataRepository.start()); + } + @Test public void testGetSchemaDataFromReadThroughCache() throws InterruptedException { - TestNMR nmr = new TestNMR(clientConfig, backendConfig, clock); + TestNMR nmr = new TestNMR(clientConfigThinClient, backendConfig, clock); nmr.start(); Assert.assertThrows(VeniceNoStoreException.class, () -> nmr.getKeySchema(STORE_NAME)); nmr.subscribe(STORE_NAME); @@ -77,7 +85,7 @@ public void testGetSchemaDataFromReadThroughCache() throws InterruptedException public void testGetSchemaDataEfficiently() throws InterruptedException { doReturn(Long.MAX_VALUE).when(backendConfig) .getLong(eq(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS), anyLong()); - TestNMR nmr = new TestNMR(clientConfig, backendConfig, clock); + TestNMR nmr = new TestNMR(clientConfigThinClient, backendConfig, clock); nmr.start(); Assert.assertEquals(nmr.keySchemaRequestCount, 0); Assert.assertEquals(nmr.valueSchemasRequestCount, 0); @@ -95,13 +103,12 @@ public void testGetSchemaDataEfficiently() throws InterruptedException { Assert.assertEquals(nmr.specificValueSchemaRequestCount, 1); Assert.assertNotNull(nmr.getKeySchema(STORE_NAME)); Assert.assertNotNull(nmr.getValueSchema(STORE_NAME, 1)); - // Refresh the store a few more times to retrieve value schema v2 for (int i = 0; i < 10; i++) { nmr.refreshOneStore(STORE_NAME); } Assert.assertEquals(nmr.keySchemaRequestCount, 1); Assert.assertEquals(nmr.valueSchemasRequestCount, 12); - 
Assert.assertEquals(nmr.specificValueSchemaRequestCount, 2); + Assert.assertEquals(nmr.specificValueSchemaRequestCount, 1); Assert.assertNotNull(nmr.getKeySchema(STORE_NAME)); Assert.assertNotNull(nmr.getValueSchema(STORE_NAME, 1)); Assert.assertNotNull(nmr.getValueSchema(STORE_NAME, 2)); @@ -115,7 +122,7 @@ public void testGetSchemaDataEfficiently() throws InterruptedException { public void testNativeMetadataRepositoryStats() throws InterruptedException { doReturn(Long.MAX_VALUE).when(backendConfig) .getLong(eq(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS), anyLong()); - TestNMR nmr = new TestNMR(clientConfig, backendConfig, clock); + TestNMR nmr = new TestNMR(clientConfigThinClient, backendConfig, clock); nmr.start(); nmr.subscribe(STORE_NAME); doReturn(1000L).when(clock).millis(); @@ -154,19 +161,27 @@ static class TestNMR extends NativeMetadataRepository { int valueSchemasRequestCount = 0; int specificValueSchemaRequestCount = 0; + private static final String INT_KEY_SCHEMA = "\"int\""; + + private static final String VALUE_SCHEMA_1 = "{\n" + " \"type\": \"record\",\n" + " \"name\": \"TestValue\",\n" + + " \"fields\": [\n" + " {\"name\": \"test_field1\", \"type\": \"string\"}\n" + " ]\n" + "}"; + private static final String VALUE_SCHEMA_2 = "{\n" + " \"type\": \"record\",\n" + " \"name\": \"TestValue\",\n" + + " \"fields\": [\n" + " {\"name\": \"test_field1\", \"type\": \"string\"},\n" + + " {\"name\": \"test_field2\", \"type\": \"int\", \"default\": 0}\n" + " ]\n" + "}"; + protected TestNMR(ClientConfig clientConfig, VeniceProperties backendConfig, Clock clock) { super(clientConfig, backendConfig, clock); } @Override - protected StoreConfig getStoreConfigFromSystemStore(String storeName) { + protected StoreConfig fetchStoreConfigFromRemote(String storeName) { StoreConfig storeConfig = mock(StoreConfig.class); when(storeConfig.isDeleting()).thenReturn(false); return storeConfig; } @Override - protected Store getStoreFromSystemStore(String storeName, 
String clusterName) { + protected Store fetchStoreFromRemote(String storeName, String clusterName) { Store store = mock(Store.class); when(store.getName()).thenReturn(storeName); when(store.getReadQuotaInCU()).thenReturn(1L); @@ -174,38 +189,25 @@ protected Store getStoreFromSystemStore(String storeName, String clusterName) { } @Override - protected StoreMetaValue getStoreMetaValue(String storeName, StoreMetaKey key) { - StoreMetaValue storeMetaValue = new StoreMetaValue(); - MetaStoreDataType metaStoreDataType = MetaStoreDataType.valueOf(key.metadataType); - switch (metaStoreDataType) { - case STORE_KEY_SCHEMAS: - Map keySchemaMap = new HashMap<>(); - keySchemaMap.put(String.valueOf(1), TestKeyRecord.SCHEMA$.toString()); - storeMetaValue.storeKeySchemas = new StoreKeySchemas(keySchemaMap); - keySchemaRequestCount++; - break; - case STORE_VALUE_SCHEMAS: - Map valueSchemaMap = new HashMap<>(); - valueSchemaMap.put(String.valueOf(1), ""); - if (valueSchemasRequestCount > 1) { - valueSchemaMap.put(String.valueOf(2), ""); - } - storeMetaValue.storeValueSchemas = new StoreValueSchemas(valueSchemaMap); - valueSchemasRequestCount++; - break; - case STORE_VALUE_SCHEMA: - storeMetaValue.storeValueSchema = new StoreValueSchema(TestValueRecord.SCHEMA$.toString()); - specificValueSchemaRequestCount++; - break; - default: - // do nothing + protected SchemaData getSchemaData(String storeName) { + if (schemaMap.containsKey(storeName)) { + valueSchemasRequestCount++; + return schemaMap.get(storeName); } - return storeMetaValue; - } - @Override - protected SchemaData getSchemaDataFromSystemStore(String storeName) { - return getSchemaDataFromMetaSystemStore(storeName); + // Mock schemas for testing + SchemaEntry schemaEntry = new SchemaEntry(0, INT_KEY_SCHEMA); + SchemaData schemaData = new SchemaData(storeName, schemaEntry); + schemaData.addValueSchema(new SchemaEntry(1, VALUE_SCHEMA_1)); + schemaData.addValueSchema(new SchemaEntry(2, VALUE_SCHEMA_2)); + + // Mock metrics + 
keySchemaRequestCount++; + valueSchemasRequestCount++; + specificValueSchemaRequestCount++; + + schemaMap.put(storeName, schemaData); + return schemaData; } } } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/repository/RequestBasedMetaRepositoryTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/repository/RequestBasedMetaRepositoryTest.java new file mode 100644 index 00000000000..cff5a02d54f --- /dev/null +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/repository/RequestBasedMetaRepositoryTest.java @@ -0,0 +1,235 @@ +package com.linkedin.davinci.repository; + +import static org.mockito.Mockito.doCallRealMethod; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.linkedin.venice.client.store.transport.D2TransportClient; +import com.linkedin.venice.client.store.transport.TransportClientResponse; +import com.linkedin.venice.meta.QueryAction; +import com.linkedin.venice.meta.ReadOnlyStore; +import com.linkedin.venice.meta.Store; +import com.linkedin.venice.meta.StoreConfig; +import com.linkedin.venice.meta.ZKStore; +import com.linkedin.venice.metadata.response.StorePropertiesResponseRecord; +import com.linkedin.venice.schema.SchemaData; +import com.linkedin.venice.schema.SchemaEntry; +import com.linkedin.venice.serializer.FastAvroSerializer; +import com.linkedin.venice.serializer.RecordSerializer; +import com.linkedin.venice.systemstore.schemas.StoreKeySchemas; +import com.linkedin.venice.systemstore.schemas.StoreMetaValue; +import com.linkedin.venice.systemstore.schemas.StoreValueSchemas; +import com.linkedin.venice.utils.TestUtils; +import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.CompletableFuture; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import 
org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + + +public class RequestBasedMetaRepositoryTest { + private static final Logger LOGGER = LogManager.getLogger(RequestBasedMetaRepositoryTest.class); + private Random RANDOM; + + private Store store; + private static final String D2_SERVICE_NAME = "D2_SERVICE_NAME"; + private StorePropertiesResponseRecord MOCK_STORE_PROPERTIES_RESPONSE_RECORD; + + // Mock schemas + private static final String INT_KEY_SCHEMA = "\"int\""; + private static final String VALUE_SCHEMA_1 = "{\n" + " \"type\": \"record\",\n" + " \"name\": \"TestValue\",\n" + + " \"fields\": [\n" + " {\"name\": \"test_field1\", \"type\": \"string\"}\n" + " ]\n" + "}"; + private static final String VALUE_SCHEMA_2 = "{\n" + " \"type\": \"record\",\n" + " \"name\": \"TestValue\",\n" + + " \"fields\": [\n" + " {\"name\": \"test_field1\", \"type\": \"string\"},\n" + + " {\"name\": \"test_field2\", \"type\": \"int\", \"default\": 0}\n" + " ]\n" + "}"; + + @BeforeClass + public void beforeClassSetup() { + long seed = System.nanoTime(); + RANDOM = new Random(seed); + LOGGER.info("Random seed set: {}", seed); + } + + @BeforeMethod + public void beforeMethodSetup() { + + // Store + setupTestStore(); + + // StorePropertiesResponseRecord + setupTestStorePropertiesResponse(); + } + + @Test + public void testRequestBasedMetaRepositoryFetchStoreConfigFromRemote() { + + // Mock RequestBasedMetaRepository + RequestBasedMetaRepository requestBasedMetaRepository = getMockRequestBasedMetaRepository(); + when(requestBasedMetaRepository.fetchStoreConfigFromRemote(store.getName())).thenCallRealMethod(); + + // Test FetchStoreConfigFromRemote + StoreConfig storeConfig = requestBasedMetaRepository.fetchStoreConfigFromRemote(store.getName()); + Assert.assertNotNull(storeConfig); + Assert.assertEquals(storeConfig.getCluster(), D2_SERVICE_NAME); + Assert.assertEquals(storeConfig.getStoreName(), store.getName()); + 
Assert.assertNull(storeConfig.getMigrationDestCluster()); + Assert.assertNull(storeConfig.getMigrationSrcCluster()); + } + + @Test + public void testRequestBasedMetaRepositoryFetchStoreFromRemote() { + + // Mock RequestBasedMetaRepository + RequestBasedMetaRepository requestBasedMetaRepository = getMockRequestBasedMetaRepository(); + when(requestBasedMetaRepository.fetchStoreConfigFromRemote(store.getName())).thenCallRealMethod(); + + // Test FetchStoreConfigFromRemote + StoreConfig storeConfig = requestBasedMetaRepository.fetchStoreConfigFromRemote(store.getName()); + Assert.assertNotNull(storeConfig); + Assert.assertEquals(storeConfig.getCluster(), D2_SERVICE_NAME); + Assert.assertEquals(storeConfig.getStoreName(), store.getName()); + Assert.assertEquals(storeConfig.getMigrationDestCluster(), null); + Assert.assertEquals(storeConfig.getMigrationSrcCluster(), null); + } + + @Test + public void testRequestBasedMetaRepositoryFetchAndCacheStorePropertiesResponseRecord() { + + // Mock RequestBasedMetaRepository + RequestBasedMetaRepository requestBasedMetaRepository = getMockRequestBasedMetaRepository(); + when(requestBasedMetaRepository.fetchAndCacheStorePropertiesResponseRecord(store.getName())).thenCallRealMethod(); + + // Test FetchAndCacheStorePropertiesResponseRecord + StorePropertiesResponseRecord record = + requestBasedMetaRepository.fetchAndCacheStorePropertiesResponseRecord(store.getName()); + Assert.assertNotNull(record); + Assert.assertNotNull(record.getStoreMetaValue()); + Assert.assertNotNull(record.getStoreMetaValue().getStoreProperties()); + Assert.assertEquals(record.getStoreMetaValue().getStoreProperties().getName().toString(), store.getName()); + Assert.assertEquals(record.getStoreMetaValue().getStoreProperties().getOwner().toString(), store.getOwner()); + Assert + .assertEquals(record.getStoreMetaValue().getStoreProperties().getVersions().size(), store.getVersions().size()); + 
Assert.assertEquals(record.getStoreMetaValue().getStoreKeySchemas().getKeySchemaMap().size(), 1); + Assert.assertEquals(record.getStoreMetaValue().getStoreValueSchemas().getValueSchemaMap().size(), 2); + } + + @Test + public void testRequestBasedMetaRepositoryGetMaxValueSchemaId() { + + // Mock RequestBasedMetaRepository + RequestBasedMetaRepository requestBasedMetaRepository = getMockRequestBasedMetaRepository(); + when(requestBasedMetaRepository.getMaxValueSchemaId(store.getName())).thenCallRealMethod(); + + // Test GetMaxValueSchemaId + int schemaId = requestBasedMetaRepository.getMaxValueSchemaId(store.getName()); + Assert.assertEquals(schemaId, SchemaData.UNKNOWN_SCHEMA_ID); + + // Put schema + SchemaEntry schemaEntryKey = new SchemaEntry(0, INT_KEY_SCHEMA); + SchemaEntry schemaEntryValue1 = new SchemaEntry(1, VALUE_SCHEMA_1); + SchemaEntry schemaEntryValue2 = new SchemaEntry(2, VALUE_SCHEMA_2); + SchemaData schemaData = new SchemaData(store.getName(), schemaEntryKey); + schemaData.addValueSchema(schemaEntryValue1); + schemaData.addValueSchema(schemaEntryValue2); + requestBasedMetaRepository.schemaMap.put(store.getName(), schemaData); + + // Test GetMaxValueSchemaId + schemaId = requestBasedMetaRepository.getMaxValueSchemaId(store.getName()); + Assert.assertEquals(schemaId, 2); + } + + @Test + public void testRequestBasedMetaRepositoryCacheStoreSchema() { + + // Mock RequestBasedMetaRepository + RequestBasedMetaRepository requestBasedMetaRepository = getMockRequestBasedMetaRepository(); + doCallRealMethod().when(requestBasedMetaRepository) + .cacheStoreSchema(store.getName(), MOCK_STORE_PROPERTIES_RESPONSE_RECORD); + + requestBasedMetaRepository.cacheStoreSchema(store.getName(), MOCK_STORE_PROPERTIES_RESPONSE_RECORD); + SchemaData schemaData = requestBasedMetaRepository.storeSchemaMap.get(store.getName()); + Assert.assertNotNull(schemaData); + Assert.assertEquals(schemaData.getKeySchema().getSchemaStr(), INT_KEY_SCHEMA); + 
Assert.assertEquals(schemaData.getValueSchema(1).getSchemaStr(), VALUE_SCHEMA_1); + Assert.assertEquals(schemaData.getValueSchema(2).getSchemaStr(), VALUE_SCHEMA_2); + } + + private RequestBasedMetaRepository getMockRequestBasedMetaRepository() { + RequestBasedMetaRepository requestBasedMetaRepository = mock(RequestBasedMetaRepository.class); + + // Schema Map + requestBasedMetaRepository.storeSchemaMap = new VeniceConcurrentHashMap<>(); + requestBasedMetaRepository.schemaMap = new VeniceConcurrentHashMap<>(); + + // Mock D2TransportClient + try { + D2TransportClient d2TransportClient = getMockD2TransportClient(); + when(requestBasedMetaRepository.getD2TransportClient(store.getName())).thenReturn(d2TransportClient); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + + // Mock max value schema id + when(requestBasedMetaRepository.getMaxValueSchemaId(store.getName())).thenReturn(SchemaData.UNKNOWN_SCHEMA_ID); + + return requestBasedMetaRepository; + } + + private D2TransportClient getMockD2TransportClient() + throws InterruptedException, java.util.concurrent.ExecutionException { + + // Mock D2 + D2TransportClient d2TransportClient = mock(D2TransportClient.class); + when(d2TransportClient.getServiceName()).thenReturn(D2_SERVICE_NAME); + + // Mock request + String mockURL = QueryAction.STORE_PROPERTIES.toString().toLowerCase() + "/" + store.getName(); + TransportClientResponse mockResponse = mock(TransportClientResponse.class); + CompletableFuture completableFuture = mock(CompletableFuture.class); + RecordSerializer recordSerializer = + new FastAvroSerializer<>(StorePropertiesResponseRecord.SCHEMA$, null); + when(completableFuture.get()).thenReturn(mockResponse); + when(mockResponse.getBody()).thenReturn(recordSerializer.serialize(MOCK_STORE_PROPERTIES_RESPONSE_RECORD)); + when(d2TransportClient.get(mockURL)).thenReturn(completableFuture); + + return d2TransportClient; + } + + private void setupTestStore() { + store = TestUtils.populateZKStore( + (ZKStore) 
TestUtils.createTestStore( + Long.toString(RANDOM.nextLong()), + Long.toString(RANDOM.nextLong()), + System.currentTimeMillis()), + RANDOM); + } + + private void setupTestStorePropertiesResponse() { + StorePropertiesResponseRecord record = new StorePropertiesResponseRecord(); + + // StoreMetaValue + StoreMetaValue storeMetaValue = new StoreMetaValue(); + storeMetaValue.setStoreProperties(new ReadOnlyStore(store).cloneStoreProperties()); + + // Key Schema + Map storeKeySchemas = new VeniceConcurrentHashMap<>(); + storeKeySchemas.put("0", INT_KEY_SCHEMA); + storeMetaValue.setStoreKeySchemas(new StoreKeySchemas(storeKeySchemas)); + + // Value Schemas + Map storeValueSchemas = new VeniceConcurrentHashMap<>(); + storeValueSchemas.put("1", VALUE_SCHEMA_1); + storeValueSchemas.put("2", VALUE_SCHEMA_2); + + storeMetaValue.setStoreValueSchemas(new StoreValueSchemas(storeValueSchemas)); + + record.setStoreMetaValue(storeMetaValue); + + MOCK_STORE_PROPERTIES_RESPONSE_RECORD = record; + } +} diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/repository/ThinClientMetaStoreBasedRepositoryTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/repository/ThinClientMetaStoreBasedRepositoryTest.java new file mode 100644 index 00000000000..caccf895f54 --- /dev/null +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/repository/ThinClientMetaStoreBasedRepositoryTest.java @@ -0,0 +1,155 @@ +package com.linkedin.davinci.repository; + +import static com.linkedin.venice.system.store.MetaStoreWriter.KEY_STRING_CLUSTER_NAME; +import static com.linkedin.venice.system.store.MetaStoreWriter.KEY_STRING_STORE_NAME; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.linkedin.venice.meta.ReadOnlyStore; +import com.linkedin.venice.meta.Store; +import com.linkedin.venice.meta.StoreConfig; +import com.linkedin.venice.meta.ZKStore; +import com.linkedin.venice.schema.SchemaData; +import 
com.linkedin.venice.system.store.MetaStoreDataType; +import com.linkedin.venice.systemstore.schemas.StoreClusterConfig; +import com.linkedin.venice.systemstore.schemas.StoreKeySchemas; +import com.linkedin.venice.systemstore.schemas.StoreMetaKey; +import com.linkedin.venice.systemstore.schemas.StoreMetaValue; +import com.linkedin.venice.systemstore.schemas.StoreValueSchemas; +import com.linkedin.venice.utils.TestUtils; +import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + + +public class ThinClientMetaStoreBasedRepositoryTest { + private static final Logger LOGGER = LogManager.getLogger(ThinClientMetaStoreBasedRepositoryTest.class); + private Random RANDOM; + + private Store store; + private final String CLUSTER_NAME = "CLUSTER_NAME"; + + // Mock schemas + private static final String INT_KEY_SCHEMA = "\"int\""; + private static final String VALUE_SCHEMA_1 = "{\n" + " \"type\": \"record\",\n" + " \"name\": \"TestValue\",\n" + + " \"fields\": [\n" + " {\"name\": \"test_field1\", \"type\": \"string\"}\n" + " ]\n" + "}"; + private static final String VALUE_SCHEMA_2 = "{\n" + " \"type\": \"record\",\n" + " \"name\": \"TestValue\",\n" + + " \"fields\": [\n" + " {\"name\": \"test_field1\", \"type\": \"string\"},\n" + + " {\"name\": \"test_field2\", \"type\": \"int\", \"default\": 0}\n" + " ]\n" + "}"; + + @BeforeClass + public void setupRandom() { + long seed = System.nanoTime(); + RANDOM = new Random(seed); + LOGGER.info("Random seed set: {}", seed); + } + + @BeforeMethod + public void beforeMethodSetup() { + + // Store + setUpTestStore(); + } + + @Test + public void testFetchStoreConfigFromRemote() { + + // Mock 
ThinClientMetaStoreBasedRepository + ThinClientMetaStoreBasedRepository thinClientMetaStoreBasedRepository = getMockThinClientMetaStoreBasedRepository(); + + // Test FetchStoreConfigFromRemote + when(thinClientMetaStoreBasedRepository.fetchStoreConfigFromRemote(store.getName())).thenCallRealMethod(); + StoreConfig storeConfig = thinClientMetaStoreBasedRepository.fetchStoreConfigFromRemote(store.getName()); + Assert.assertNotNull(storeConfig); + Assert.assertEquals(storeConfig.getStoreName(), store.getName()); + Assert.assertEquals(storeConfig.getCluster(), CLUSTER_NAME); + } + + @Test + public void testGetSchemaData() { + + // Mock ThinClientMetaStoreBasedRepository + ThinClientMetaStoreBasedRepository thinClientMetaStoreBasedRepository = getMockThinClientMetaStoreBasedRepository(); + + // Test GetSchemaData + when(thinClientMetaStoreBasedRepository.getSchemaData(store.getName())).thenCallRealMethod(); + SchemaData schemaData = thinClientMetaStoreBasedRepository.getSchemaData(store.getName()); + Assert.assertNotNull(schemaData); + Assert.assertEquals(schemaData.getStoreName(), store.getName()); + Assert.assertEquals(schemaData.getKeySchema().getSchemaStr(), INT_KEY_SCHEMA); + Assert.assertEquals(schemaData.getValueSchema(1).getSchemaStr(), VALUE_SCHEMA_1); + Assert.assertEquals(schemaData.getValueSchema(2).getSchemaStr(), VALUE_SCHEMA_2); + } + + private ThinClientMetaStoreBasedRepository getMockThinClientMetaStoreBasedRepository() { + ThinClientMetaStoreBasedRepository thinClientMetaStoreBasedRepository = + mock(ThinClientMetaStoreBasedRepository.class); + + // schemaMap + thinClientMetaStoreBasedRepository.schemaMap = new VeniceConcurrentHashMap<>(); + + // StoreMetaValue + StoreMetaValue storeMetaValue = new StoreMetaValue(); + storeMetaValue.setStoreProperties(new ReadOnlyStore(store).cloneStoreProperties()); + + // Key Schema + Map storeKeySchemas = new VeniceConcurrentHashMap<>(); + storeKeySchemas.put("0", INT_KEY_SCHEMA); + 
storeMetaValue.setStoreKeySchemas(new StoreKeySchemas(storeKeySchemas)); + + // Value Schemas + Map storeValueSchemas = new VeniceConcurrentHashMap<>(); + storeValueSchemas.put("1", VALUE_SCHEMA_1); + storeValueSchemas.put("2", VALUE_SCHEMA_2); + + storeMetaValue.setStoreValueSchemas(new StoreValueSchemas(storeValueSchemas)); + + // Store Cluster Config + StoreClusterConfig storeClusterConfig = new StoreClusterConfig(); + storeClusterConfig.setStoreName(store.getName()); + storeClusterConfig.setCluster(CLUSTER_NAME); + storeMetaValue.setStoreClusterConfig(storeClusterConfig); + + // Mock getStoreMetaValue + StoreMetaKey storeMetaKey; + storeMetaKey = MetaStoreDataType.STORE_KEY_SCHEMAS + .getStoreMetaKey(Collections.singletonMap(KEY_STRING_STORE_NAME, store.getName())); + when(thinClientMetaStoreBasedRepository.getStoreMetaValue(store.getName(), storeMetaKey)) + .thenReturn(storeMetaValue); + storeMetaKey = MetaStoreDataType.STORE_VALUE_SCHEMAS + .getStoreMetaKey(Collections.singletonMap(KEY_STRING_STORE_NAME, store.getName())); + when(thinClientMetaStoreBasedRepository.getStoreMetaValue(store.getName(), storeMetaKey)) + .thenReturn(storeMetaValue); + storeMetaKey = MetaStoreDataType.STORE_PROPERTIES.getStoreMetaKey(new HashMap() { + { + put(KEY_STRING_STORE_NAME, store.getName()); + put(KEY_STRING_CLUSTER_NAME, CLUSTER_NAME); + } + }); + when(thinClientMetaStoreBasedRepository.getStoreMetaValue(store.getName(), storeMetaKey)) + .thenReturn(storeMetaValue); + storeMetaKey = MetaStoreDataType.STORE_CLUSTER_CONFIG + .getStoreMetaKey(Collections.singletonMap(KEY_STRING_STORE_NAME, store.getName())); + when(thinClientMetaStoreBasedRepository.getStoreMetaValue(store.getName(), storeMetaKey)) + .thenReturn(storeMetaValue); + + return thinClientMetaStoreBasedRepository; + } + + private void setUpTestStore() { + store = TestUtils.populateZKStore( + (ZKStore) TestUtils.createTestStore( + Long.toString(RANDOM.nextLong()), + Long.toString(RANDOM.nextLong()), + 
System.currentTimeMillis()), + RANDOM); + } +} diff --git a/clients/venice-thin-client/src/main/java/com/linkedin/venice/client/store/ClientConfig.java b/clients/venice-thin-client/src/main/java/com/linkedin/venice/client/store/ClientConfig.java index fe562fbc547..383e53e8fe8 100644 --- a/clients/venice-thin-client/src/main/java/com/linkedin/venice/client/store/ClientConfig.java +++ b/clients/venice-thin-client/src/main/java/com/linkedin/venice/client/store/ClientConfig.java @@ -65,6 +65,9 @@ public class ClientConfig { // HttpTransport settings private int maxConnectionsPerRoute; // only for HTTP1 + // NativeMetadataRepository settings + private boolean useRequestBasedMetaRepository = false; + private int maxConnectionsTotal; // only for HTTP1 private boolean httpClient5Http2Enabled; @@ -102,7 +105,7 @@ public static ClientConfig cloneConfig(ClientConfi .setD2ZkTimeout(config.getD2ZkTimeout()) .setD2Client(config.getD2Client()) .setD2Routing(config.isD2Routing()) // This should be the last of the D2 configs since it is an inferred config - // and we want the cloned config to match the source config + // and we want the cloned config to match the source config // Performance-related settings .setMetricsRepository(config.getMetricsRepository()) @@ -279,6 +282,15 @@ public ClientConfig setMaxConnectionsPerRoute(int maxConnectionsPerRoute) { return this; } + public boolean isUseRequestBasedMetaRepository() { + return useRequestBasedMetaRepository; + } + + public ClientConfig setUseRequestBasedMetaRepository(boolean useRequestBasedMetaRepository) { + this.useRequestBasedMetaRepository = useRequestBasedMetaRepository; + return this; + } + public int getMaxConnectionsTotal() { return maxConnectionsTotal; } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/meta/ReadOnlyStore.java b/internal/venice-common/src/main/java/com/linkedin/venice/meta/ReadOnlyStore.java index 304eeaabae4..7d2fc390685 100644 --- 
a/internal/venice-common/src/main/java/com/linkedin/venice/meta/ReadOnlyStore.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/meta/ReadOnlyStore.java @@ -924,7 +924,7 @@ public StoreProperties cloneStoreProperties() { storeProperties.setBootstrapToOnlineTimeoutInHours(getBootstrapToOnlineTimeoutInHours()); // storeProperties.setLeaderFollowerModelEnabled(isLeaderFollowerModelEnabled()); storeProperties.setNativeReplicationEnabled(isNativeReplicationEnabled()); - // storeProperties.setReplicationMetadataVersionID(getReplicationMetadataVersionID()); + storeProperties.setReplicationMetadataVersionID(getRmdVersion()); storeProperties.setPushStreamSourceAddress(getPushStreamSourceAddress()); storeProperties.setBackupStrategy(getBackupStrategy().getValue()); storeProperties.setSchemaAutoRegisteFromPushJobEnabled(isSchemaAutoRegisterFromPushJobEnabled()); diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/meta/ReadOnlyStoreTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/meta/ReadOnlyStoreTest.java index 8ca379a50e8..34719c11448 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/meta/ReadOnlyStoreTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/meta/ReadOnlyStoreTest.java @@ -2,7 +2,6 @@ import static org.testng.Assert.*; -import com.linkedin.venice.compression.CompressionStrategy; import com.linkedin.venice.systemstore.schemas.StoreETLConfig; import com.linkedin.venice.systemstore.schemas.StoreHybridConfig; import com.linkedin.venice.systemstore.schemas.StorePartitionerConfig; @@ -33,11 +32,12 @@ public void setupReadOnlyStore() { @Test public void testCloneStoreProperties() { - ZKStore store = populateZKStore( + ZKStore store = TestUtils.populateZKStore( (ZKStore) TestUtils.createTestStore( Long.toString(RANDOM.nextLong()), Long.toString(RANDOM.nextLong()), - System.currentTimeMillis())); + System.currentTimeMillis()), + RANDOM); ReadOnlyStore readOnlyStore = new 
ReadOnlyStore(store); StoreProperties storeProperties = readOnlyStore.cloneStoreProperties(); @@ -106,58 +106,6 @@ public void testCloneStoreProperties() { assertEquals(storeProperties.getNearlineProducerCountPerWriter(), store.getNearlineProducerCountPerWriter()); } - private ZKStore populateZKStore(ZKStore store) { - store.setCurrentVersion(RANDOM.nextInt()); - store.setPartitionCount(RANDOM.nextInt()); - store.setLowWatermark(RANDOM.nextLong()); - store.setEnableWrites(false); - store.setEnableReads(true); - store.setStorageQuotaInByte(RANDOM.nextLong()); - store.setReadQuotaInCU(RANDOM.nextLong()); - store.setHybridStoreConfig(TestUtils.createTestHybridStoreConfig(RANDOM)); - store.setViewConfigs(TestUtils.createTestViewConfigs(RANDOM)); - store.setCompressionStrategy(CompressionStrategy.GZIP); - store.setClientDecompressionEnabled(true); - store.setChunkingEnabled(true); - store.setRmdChunkingEnabled(true); - store.setBatchGetLimit(RANDOM.nextInt()); - store.setNumVersionsToPreserve(RANDOM.nextInt()); - store.setIncrementalPushEnabled(true); - store.setSeparateRealTimeTopicEnabled(true); - store.setMigrating(true); - store.setWriteComputationEnabled(true); - store.setReadComputationEnabled(true); - store.setBootstrapToOnlineTimeoutInHours(RANDOM.nextInt()); - store.setNativeReplicationEnabled(true); - store.setPushStreamSourceAddress("push_stream_source"); - store.setBackupStrategy(BackupStrategy.DELETE_ON_NEW_PUSH_START); - store.setSchemaAutoRegisterFromPushJobEnabled(true); - store.setLatestSuperSetValueSchemaId(RANDOM.nextInt()); - store.setHybridStoreDiskQuotaEnabled(true); - store.setStoreMetaSystemStoreEnabled(true); - store.setEtlStoreConfig(TestUtils.createTestETLStoreConfig()); - store.setPartitionerConfig(TestUtils.createTestPartitionerConfig(RANDOM)); - store.setLatestVersionPromoteToCurrentTimestamp(RANDOM.nextLong()); - store.setBackupVersionRetentionMs(RANDOM.nextLong()); - store.setMigrationDuplicateStore(true); - 
store.setNativeReplicationSourceFabric("native_replication_source_fabric"); - store.setDaVinciPushStatusStoreEnabled(true); - store.setStoreMetadataSystemStoreEnabled(true); - store.setActiveActiveReplicationEnabled(true); - store.setMinCompactionLagSeconds(RANDOM.nextLong()); - store.setMaxCompactionLagSeconds(RANDOM.nextLong()); - store.setMaxRecordSizeBytes(RANDOM.nextInt()); - store.setMaxNearlineRecordSizeBytes(RANDOM.nextInt()); - store.setUnusedSchemaDeletionEnabled(true); - store.setVersions(TestUtils.createTestVersions(store.getName(), RANDOM)); - store.setSystemStores(TestUtils.createTestSystemStores(store.getName(), RANDOM)); - store.setStorageNodeReadQuotaEnabled(true); - store.setBlobTransferEnabled(true); - store.setNearlineProducerCompressionEnabled(true); - store.setNearlineProducerCountPerWriter(RANDOM.nextInt()); - return store; - } - private void assertEqualHybridConfig(StoreHybridConfig actual, HybridStoreConfig expected) { assertEquals(actual.getRewindTimeInSeconds(), expected.getRewindTimeInSeconds()); assertEquals(actual.getOffsetLagThresholdToGoOnline(), expected.getOffsetLagThresholdToGoOnline()); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/MetaSystemStoreTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/MetaSystemStoreTest.java index 57c667329c4..7dc55dc7b34 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/MetaSystemStoreTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/MetaSystemStoreTest.java @@ -15,6 +15,7 @@ import com.linkedin.d2.balancer.D2Client; import com.linkedin.davinci.repository.NativeMetadataRepository; +import com.linkedin.davinci.repository.RequestBasedMetaRepository; import com.linkedin.davinci.repository.ThinClientMetaStoreBasedRepository; import com.linkedin.venice.D2.D2ClientUtils; import 
com.linkedin.venice.client.store.AvroSpecificStoreClient; @@ -38,6 +39,7 @@ import com.linkedin.venice.meta.Store; import com.linkedin.venice.meta.Version; import com.linkedin.venice.meta.VersionStatus; +import com.linkedin.venice.meta.ZKStore; import com.linkedin.venice.schema.SchemaEntry; import com.linkedin.venice.serialization.avro.AvroProtocolDefinition; import com.linkedin.venice.system.store.MetaStoreDataType; @@ -287,6 +289,42 @@ public void testThinClientMetaStoreBasedRepository() throws InterruptedException } } + @Test(timeOut = 120 * Time.MS_PER_SECOND) + public void testRequestBasedMetaStoreBasedRepository() throws InterruptedException { + String regularVeniceStoreName = Utils.getUniqueString("venice_store"); + createStoreAndMaterializeMetaSystemStore(regularVeniceStoreName); + D2Client d2Client = null; + NativeMetadataRepository nativeMetadataRepository = null; + try { + d2Client = D2TestUtils.getAndStartD2Client(veniceLocalCluster.getZk().getAddress()); + ClientConfig clientConfig = + getClientConfig(regularVeniceStoreName, d2Client).setUseRequestBasedMetaRepository(true); + // Not providing a CLIENT_META_SYSTEM_STORE_VERSION_MAP, should use the default value of 1 for system store + // current version. + VeniceProperties backendConfig = new PropertyBuilder().put(CLIENT_USE_SYSTEM_STORE_REPOSITORY, true) + .put(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS, 1) + .build(); + nativeMetadataRepository = NativeMetadataRepository.getInstance(clientConfig, backendConfig); + nativeMetadataRepository.start(); + // RequestBasedMetaRepository implementation should be used since useRequestBasedMetaRepository + // is set to true on the ClientConfig.
+ Assert.assertTrue(nativeMetadataRepository instanceof RequestBasedMetaRepository); + verifyRepository(nativeMetadataRepository, regularVeniceStoreName); + } finally { + if (d2Client != null) { + D2ClientUtils.shutdownClient(d2Client); + } + if (nativeMetadataRepository != null) { + // Calling clear explicitly here because if the NativeMetadataRepository implementation used happens to + // initialize + // a new DaVinciBackend then calling clear will trigger the cleanup logic to ensure the DaVinciBackend is not + // leaked + // into other tests. + nativeMetadataRepository.clear(); + } + } + } + @Test(timeOut = 60 * Time.MS_PER_SECOND) public void testThinClientMetaStoreBasedRepositoryWithLargeValueSchemas() throws InterruptedException { String regularVeniceStoreName = Utils.getUniqueString("venice_store"); @@ -393,12 +431,14 @@ private void verifyRepository(NativeMetadataRepository nativeMetadataRepository, assertNull(nativeMetadataRepository.getStore("Non-existing-store")); expectThrows(VeniceNoStoreException.class, () -> nativeMetadataRepository.getStoreOrThrow("Non-existing-store")); expectThrows(VeniceNoStoreException.class, () -> nativeMetadataRepository.subscribe("Non-existing-store")); - nativeMetadataRepository.subscribe(regularVeniceStoreName); - Store store = nativeMetadataRepository.getStore(regularVeniceStoreName); - Store controllerStore = new ReadOnlyStore( - veniceLocalCluster.getLeaderVeniceController().getVeniceAdmin().getStore(clusterName, regularVeniceStoreName)); - assertEquals(store, controllerStore); + Store store = normalizeStore(new ReadOnlyStore(nativeMetadataRepository.getStore(regularVeniceStoreName))); + Store controllerStore = normalizeStore( + new ReadOnlyStore( + veniceLocalCluster.getLeaderVeniceController() + .getVeniceAdmin() + .getStore(clusterName, regularVeniceStoreName))); + assertEquals(store.toString(), controllerStore.toString()); SchemaEntry keySchema = nativeMetadataRepository.getKeySchema(regularVeniceStoreName); 
SchemaEntry controllerKeySchema = veniceLocalCluster.getLeaderVeniceController() .getVeniceAdmin() @@ -423,9+463,9 @@ private void verifyRepository(NativeMetadataRepository nativeMetadataRepository, assertEquals(nativeRepoStore.getStorageQuotaInByte(), storageQuota); }); assertFalse(controllerClient.addValueSchema(regularVeniceStoreName, VALUE_SCHEMA_2).isError()); - TestUtils.waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, () -> { + TestUtils.waitForNonDeterministicAssertion(15, TimeUnit.SECONDS, () -> { assertEquals( - nativeMetadataRepository.getValueSchemas(regularVeniceStoreName), + nativeMetadataRepository.getValueSchemas(regularVeniceStoreName), // this call does not retry; it is only executed once veniceLocalCluster.getLeaderVeniceController() .getVeniceAdmin() .getValueSchemas(clusterName, regularVeniceStoreName)); @@ -448,6 +488,10 @@ private void verifyRepository(NativeMetadataRepository nativeMetadataRepository, }); } + private Store normalizeStore(ReadOnlyStore store) { + return new ReadOnlyStore(new ZKStore(store.cloneStoreProperties())); + } + private void createStoreAndMaterializeMetaSystemStore(String storeName) { createStoreAndMaterializeMetaSystemStore(storeName, VALUE_SCHEMA_1); } @@ -455,8 +499,17 @@ private void createStoreAndMaterializeMetaSystemStore(String storeName) { private void createStoreAndMaterializeMetaSystemStore(String storeName, String valueSchema) { // Verify and create Venice regular store if it doesn't exist.
if (parentControllerClient.getStore(storeName).getStore() == null) { - assertFalse( - parentControllerClient.createNewStore(storeName, "test_owner", INT_KEY_SCHEMA, valueSchema).isError()); + NewStoreResponse resp = + parentControllerClient.createNewStore(storeName, "test_owner", INT_KEY_SCHEMA, valueSchema); + if (resp.isError()) { + System.out.println("Create new store failed: " + resp.getError()); + } + assertFalse(resp.isError()); + assertFalse(parentControllerClient.emptyPush(storeName, "test-push-job", 100).isError()); + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + } } String metaSystemStoreName = VeniceSystemStoreType.META_STORE.getSystemStoreName(storeName); TestUtils.waitForNonDeterministicPushCompletion( diff --git a/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/TestUtils.java b/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/TestUtils.java index 2ee342811d6..7f47088a3f1 100644 --- a/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/TestUtils.java +++ b/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/TestUtils.java @@ -48,6 +48,7 @@ import com.linkedin.venice.helix.SafeHelixManager; import com.linkedin.venice.helix.VeniceOfflinePushMonitorAccessor; import com.linkedin.venice.kafka.protocol.state.PartitionState; +import com.linkedin.venice.meta.BackupStrategy; import com.linkedin.venice.meta.BufferReplayPolicy; import com.linkedin.venice.meta.DataReplicationPolicy; import com.linkedin.venice.meta.ETLStoreConfig; @@ -517,6 +518,58 @@ public static Store createTestStore(String name, String owner, long createdTime) return store; } + public static ZKStore populateZKStore(ZKStore store, Random random) { + store.setCurrentVersion(random.nextInt()); + store.setPartitionCount(random.nextInt()); + store.setLowWatermark(random.nextLong()); + store.setEnableWrites(false); + store.setEnableReads(true); + store.setStorageQuotaInByte(random.nextLong()); + 
store.setReadQuotaInCU(random.nextLong()); + store.setHybridStoreConfig(TestUtils.createTestHybridStoreConfig(random)); + store.setViewConfigs(TestUtils.createTestViewConfigs(random)); + store.setCompressionStrategy(CompressionStrategy.GZIP); + store.setClientDecompressionEnabled(true); + store.setChunkingEnabled(true); + store.setRmdChunkingEnabled(true); + store.setBatchGetLimit(random.nextInt()); + store.setNumVersionsToPreserve(random.nextInt()); + store.setIncrementalPushEnabled(true); + store.setSeparateRealTimeTopicEnabled(true); + store.setMigrating(true); + store.setWriteComputationEnabled(true); + store.setReadComputationEnabled(true); + store.setBootstrapToOnlineTimeoutInHours(random.nextInt()); + store.setNativeReplicationEnabled(true); + store.setPushStreamSourceAddress("push_stream_source"); + store.setBackupStrategy(BackupStrategy.DELETE_ON_NEW_PUSH_START); + store.setSchemaAutoRegisterFromPushJobEnabled(true); + store.setLatestSuperSetValueSchemaId(random.nextInt()); + store.setHybridStoreDiskQuotaEnabled(true); + store.setStoreMetaSystemStoreEnabled(true); + store.setEtlStoreConfig(TestUtils.createTestETLStoreConfig()); + store.setPartitionerConfig(TestUtils.createTestPartitionerConfig(random)); + store.setLatestVersionPromoteToCurrentTimestamp(random.nextLong()); + store.setBackupVersionRetentionMs(random.nextLong()); + store.setMigrationDuplicateStore(true); + store.setNativeReplicationSourceFabric("native_replication_source_fabric"); + store.setDaVinciPushStatusStoreEnabled(true); + store.setStoreMetadataSystemStoreEnabled(true); + store.setActiveActiveReplicationEnabled(true); + store.setMinCompactionLagSeconds(random.nextLong()); + store.setMaxCompactionLagSeconds(random.nextLong()); + store.setMaxRecordSizeBytes(random.nextInt()); + store.setMaxNearlineRecordSizeBytes(random.nextInt()); + store.setUnusedSchemaDeletionEnabled(true); + store.setVersions(TestUtils.createTestVersions(store.getName(), random)); + 
store.setSystemStores(TestUtils.createTestSystemStores(store.getName(), random)); + store.setStorageNodeReadQuotaEnabled(true); + store.setBlobTransferEnabled(true); + store.setNearlineProducerCompressionEnabled(true); + store.setNearlineProducerCountPerWriter(random.nextInt()); + return store; + } + public static HybridStoreConfig createTestHybridStoreConfig(Random random) { HybridStoreConfig hybridStoreConfig = new HybridStoreConfigImpl( random.nextLong(), diff --git a/services/venice-server/src/main/java/com/linkedin/venice/listener/ServerReadMetadataRepository.java b/services/venice-server/src/main/java/com/linkedin/venice/listener/ServerReadMetadataRepository.java index 878e231566e..39fdaacce0d 100644 --- a/services/venice-server/src/main/java/com/linkedin/venice/listener/ServerReadMetadataRepository.java +++ b/services/venice-server/src/main/java/com/linkedin/venice/listener/ServerReadMetadataRepository.java @@ -180,7 +180,7 @@ public StorePropertiesResponse getStoreProperties(String storeName, Optional entry: valueSchemas.entrySet()) { int schemaId = Integer.parseInt(entry.getKey().toString()); if (schemaId > largestKnownSchemaId.get()) { - storeValueSchemas.put(schemaId, entry.getValue()); + storeValueSchemas.valueSchemaMap.put(Integer.toString(schemaId), entry.getValue()); } } } else { diff --git a/services/venice-server/src/test/java/com/linkedin/venice/listener/ServerReadMetadataRepositoryTest.java b/services/venice-server/src/test/java/com/linkedin/venice/listener/ServerReadMetadataRepositoryTest.java index 3cd9de7ea97..4bd0ea0f926 100644 --- a/services/venice-server/src/test/java/com/linkedin/venice/listener/ServerReadMetadataRepositoryTest.java +++ b/services/venice-server/src/test/java/com/linkedin/venice/listener/ServerReadMetadataRepositoryTest.java @@ -158,38 +158,65 @@ public void testGetStoreProperties() { doReturn(readyToServeInstances).when(partition).getReadyToServeInstances(); partitionAssignment.addPartition(partition); String schema = 
"\"string\""; + ArrayList valueSchemas = new ArrayList<>(); + final int schemaCount = 3; + for (int i = 1; i <= schemaCount; i++) { + valueSchemas.add(new SchemaEntry(i, schema)); + } doReturn(mockStore).when(mockMetadataRepo).getStoreOrThrow(storeName); Mockito.when(mockSchemaRepo.getKeySchema(storeName)).thenReturn(new SchemaEntry(0, schema)); - Mockito.when(mockSchemaRepo.getValueSchemas(storeName)) - .thenReturn(Collections.singletonList(new SchemaEntry(0, schema))); + Mockito.when(mockSchemaRepo.getValueSchemas(storeName)).thenReturn(valueSchemas); Mockito.when(mockCustomizedViewRepository.getPartitionAssignments(topicName)).thenReturn(partitionAssignment); Mockito.when(mockHelixInstanceConfigRepository.getInstanceGroupIdMapping()).thenReturn(Collections.emptyMap()); - mockStore.setStorageNodeReadQuotaEnabled(true); + + // Request StorePropertiesResponse storePropertiesResponse = serverReadMetadataRepository.getStoreProperties(storeName, Optional.empty()); + + // Assert response Assert.assertNotNull(storePropertiesResponse); Assert.assertNotNull(storePropertiesResponse.getResponseRecord()); Assert.assertNotNull(storePropertiesResponse.getResponseRecord().getStoreMetaValue()); Assert.assertEquals( storePropertiesResponse.getResponseRecord().getStoreMetaValue().getStoreKeySchemas().getKeySchemaMap().get("0"), "\"string\""); - // Verify the metadata Assert.assertEquals( storePropertiesResponse.getResponseRecord().getStoreMetaValue().getStoreProperties().getVersions().size(), 2); Assert.assertEquals(storePropertiesResponse.getResponseRecord().getRoutingInfo().get("0").size(), 1); + ServerCurrentVersionResponse currentVersionResponse = + serverReadMetadataRepository.getCurrentVersionResponse(storeName); + Assert.assertNotNull(currentVersionResponse); + Assert.assertEquals(currentVersionResponse.getCurrentVersion(), 2); + // Assert metrics repo String metadataInvokeMetricName = ".ServerMetadataStats--request_based_metadata_invoke_count.Rate"; String 
metadataFailureMetricName = ".ServerMetadataStats--request_based_metadata_failure_count.Rate"; Assert.assertTrue(metricsRepository.getMetric(metadataInvokeMetricName).value() > 0); Assert.assertEquals(metricsRepository.getMetric(metadataFailureMetricName).value(), 0d); - ServerCurrentVersionResponse currentVersionResponse = - serverReadMetadataRepository.getCurrentVersionResponse(storeName); - Assert.assertNotNull(currentVersionResponse); - Assert.assertEquals(currentVersionResponse.getCurrentVersion(), 2); + // Test largestKnownSchemaID param + for (int i = 0; i <= schemaCount; i++) { + storePropertiesResponse = serverReadMetadataRepository.getStoreProperties(storeName, Optional.of(i)); + Assert.assertEquals( + storePropertiesResponse.getResponseRecord() + .getStoreMetaValue() + .getStoreValueSchemas() + .getValueSchemaMap() + .size(), + schemaCount - i); + } + storePropertiesResponse = serverReadMetadataRepository.getStoreProperties(storeName, Optional.empty()); + Assert.assertEquals( + storePropertiesResponse.getResponseRecord() + .getStoreMetaValue() + .getStoreValueSchemas() + .getValueSchemaMap() + .size(), + schemaCount); + // Value update test mockStore.setBatchGetLimit(300); storePropertiesResponse = serverReadMetadataRepository.getStoreProperties(storeName, Optional.empty()); Assert.assertEquals( From 70236be970e1e62396a31582785fd472da8e2dd9 Mon Sep 17 00:00:00 2001 From: pthirun Date: Thu, 6 Feb 2025 16:14:02 -0800 Subject: [PATCH 2/3] add requestbasedmetarepository integration testing --- .../venice/endToEnd/MetaSystemStoreTest.java | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/MetaSystemStoreTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/MetaSystemStoreTest.java index 7dc55dc7b34..89f2e4f1095 100644 --- 
a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/MetaSystemStoreTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/MetaSystemStoreTest.java @@ -289,6 +289,15 @@ public void testThinClientMetaStoreBasedRepository() throws InterruptedException } } + // TODO PRANAV move this test and use the full DVC + // Can we add a new test file where we run a more + // comprehensive integration test with a DVC? i.e + // push some new versions or make some store config + // changes and make sure the DVC pick up those changes. + // You can see examples like the recently added + // testBatchOnlyMaterializedViewDVCConsumer. + // You probably don't need a VeniceTwoLayerMultiRegionMultiClusterWrapper, + // a single region will be sufficient. @Test(timeOut = 120 * Time.MS_PER_SECOND) public void testRequestBasedMetaStoreBasedRepository() throws InterruptedException { String regularVeniceStoreName = Utils.getUniqueString("venice_store"); @@ -306,8 +315,7 @@ public void testRequestBasedMetaStoreBasedRepository() throws InterruptedExcepti .build(); nativeMetadataRepository = NativeMetadataRepository.getInstance(clientConfig, backendConfig); nativeMetadataRepository.start(); - // ThinClientMetaStoreBasedRepository implementation should be used since CLIENT_USE_META_SYSTEM_STORE_REPOSITORY - // is set to true without enabling other feature flags. 
+ // RequestBasedMetaRepository implementation should be used since Assert.assertTrue(nativeMetadataRepository instanceof RequestBasedMetaRepository); verifyRepository(nativeMetadataRepository, regularVeniceStoreName); } finally { From ea4f353c9665f0df21cbfb6cc12769d068d06df0 Mon Sep 17 00:00:00 2001 From: pthirun Date: Tue, 11 Feb 2025 17:41:04 -0800 Subject: [PATCH 3/3] add requestbasedmetarepository integration testing --- .../davinci/client/DaVinciConfig.java | 15 + .../factory/CachingDaVinciClientFactory.java | 3 +- .../repository/NativeMetadataRepository.java | 1 - .../RequestBasedMetaRepository.java | 4 +- .../venice/endToEnd/MetaSystemStoreTest.java | 45 --- ...TestDaVinciRequestBasedMetaRepository.java | 269 ++++++++++++++++++ .../linkedin/venice/utils/TestWriteUtils.java | 105 +++++-- 7 files changed, 375 insertions(+), 67 deletions(-) create mode 100644 internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestDaVinciRequestBasedMetaRepository.java diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/DaVinciConfig.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/DaVinciConfig.java index 013c5f3c0db..7c063c385b6 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/DaVinciConfig.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/DaVinciConfig.java @@ -47,6 +47,12 @@ public class DaVinciConfig { */ private int largeBatchRequestSplitThreshold = AvroGenericDaVinciClient.DEFAULT_CHUNK_SPLIT_THRESHOLD; + /** + * Determines whether to enable request-based metadata retrieval directly from the Venice Server. + * By default, metadata is retrieved from a system store via a thin client. 
+ */ + private boolean useRequestBasedMetaRepository = false; + public DaVinciConfig() { } @@ -147,4 +153,13 @@ public DaVinciConfig setLargeBatchRequestSplitThreshold(int largeBatchRequestSpl this.largeBatchRequestSplitThreshold = largeBatchRequestSplitThreshold; return this; } + + public boolean isUseRequestBasedMetaRepository() { + return useRequestBasedMetaRepository; + } + + public DaVinciConfig setUseRequestBasedMetaRepository(boolean useRequestBasedMetaRepository) { + this.useRequestBasedMetaRepository = useRequestBasedMetaRepository; + return this; + } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/factory/CachingDaVinciClientFactory.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/factory/CachingDaVinciClientFactory.java index f57bfe7c565..065c23f97b5 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/factory/CachingDaVinciClientFactory.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/factory/CachingDaVinciClientFactory.java @@ -383,7 +383,8 @@ protected synchronized DaVinciClient getClient( ClientConfig clientConfig = new ClientConfig(internalStoreName).setD2Client(d2Client) .setD2ServiceName(clusterDiscoveryD2ServiceName) .setMetricsRepository(metricsRepository) - .setSpecificValueClass(valueClass); + .setSpecificValueClass(valueClass) + .setUseRequestBasedMetaRepository(config.isUseRequestBasedMetaRepository()); DaVinciClient client; if (config.isIsolated()) { diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/NativeMetadataRepository.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/NativeMetadataRepository.java index b1cdb7782a5..d7da1e16e12 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/NativeMetadataRepository.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/NativeMetadataRepository.java @@ -117,7 +117,6 @@ 
public static NativeMetadataRepository getInstance( ClientConfig clientConfig, VeniceProperties backendConfig, ICProvider icProvider) { - NativeMetadataRepository nativeMetadataRepository; if (clientConfig.isUseRequestBasedMetaRepository()) { nativeMetadataRepository = new RequestBasedMetaRepository(clientConfig, backendConfig); diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/RequestBasedMetaRepository.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/RequestBasedMetaRepository.java index 6a7bb50900c..a5af7aa195d 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/RequestBasedMetaRepository.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/RequestBasedMetaRepository.java @@ -130,9 +130,8 @@ protected int getMaxValueSchemaId(String storeName) { } protected void cacheStoreSchema(String storeName, StorePropertiesResponseRecord record) { - if (!storeSchemaMap.containsKey(storeName)) { - // New schema data + // New store Map.Entry keySchemaEntry = record.getStoreMetaValue().getStoreKeySchemas().getKeySchemaMap().entrySet().iterator().next(); SchemaData schemaData = new SchemaData( @@ -140,7 +139,6 @@ protected void cacheStoreSchema(String storeName, StorePropertiesResponseRecord new SchemaEntry(Integer.parseInt(keySchemaEntry.getKey().toString()), keySchemaEntry.getValue().toString())); storeSchemaMap.put(storeName, schemaData); } - // Store Value Schemas for (Map.Entry entry: record.getStoreMetaValue() .getStoreValueSchemas() diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/MetaSystemStoreTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/MetaSystemStoreTest.java index 89f2e4f1095..8a8c213f191 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/MetaSystemStoreTest.java +++ 
b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/MetaSystemStoreTest.java @@ -15,7 +15,6 @@ import com.linkedin.d2.balancer.D2Client; import com.linkedin.davinci.repository.NativeMetadataRepository; -import com.linkedin.davinci.repository.RequestBasedMetaRepository; import com.linkedin.davinci.repository.ThinClientMetaStoreBasedRepository; import com.linkedin.venice.D2.D2ClientUtils; import com.linkedin.venice.client.store.AvroSpecificStoreClient; @@ -289,50 +288,6 @@ public void testThinClientMetaStoreBasedRepository() throws InterruptedException } } - // TODO PRANAV move this test and use the full DVC - // Can we add a new test file where we run a more - // comprehensive integration test with a DVC? i.e - // push some new versions or make some store config - // changes and make sure the DVC pick up those changes. - // You can see examples like the recently added - // testBatchOnlyMaterializedViewDVCConsumer. - // You probably don't need a VeniceTwoLayerMultiRegionMultiClusterWrapper, - // a single region will be sufficient. - @Test(timeOut = 120 * Time.MS_PER_SECOND) - public void testRequestBasedMetaStoreBasedRepository() throws InterruptedException { - String regularVeniceStoreName = Utils.getUniqueString("venice_store"); - createStoreAndMaterializeMetaSystemStore(regularVeniceStoreName); - D2Client d2Client = null; - NativeMetadataRepository nativeMetadataRepository = null; - try { - d2Client = D2TestUtils.getAndStartD2Client(veniceLocalCluster.getZk().getAddress()); - ClientConfig clientConfig = - getClientConfig(regularVeniceStoreName, d2Client).setUseRequestBasedMetaRepository(true); - // Not providing a CLIENT_META_SYSTEM_STORE_VERSION_MAP, should use the default value of 1 for system store - // current version. 
- VeniceProperties backendConfig = new PropertyBuilder().put(CLIENT_USE_SYSTEM_STORE_REPOSITORY, true) - .put(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS, 1) - .build(); - nativeMetadataRepository = NativeMetadataRepository.getInstance(clientConfig, backendConfig); - nativeMetadataRepository.start(); - // RequestBasedMetaRepository implementation should be used since - Assert.assertTrue(nativeMetadataRepository instanceof RequestBasedMetaRepository); - verifyRepository(nativeMetadataRepository, regularVeniceStoreName); - } finally { - if (d2Client != null) { - D2ClientUtils.shutdownClient(d2Client); - } - if (nativeMetadataRepository != null) { - // Calling clear explicitly here because if the NativeMetadataRepository implementation used happens to - // initialize - // a new DaVinciBackend then calling clear will trigger the cleanup logic to ensure the DaVinciBackend is not - // leaked - // into other tests. - nativeMetadataRepository.clear(); - } - } - } - @Test(timeOut = 60 * Time.MS_PER_SECOND) public void testThinClientMetaStoreBasedRepositoryWithLargeValueSchemas() throws InterruptedException { String regularVeniceStoreName = Utils.getUniqueString("venice_store"); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestDaVinciRequestBasedMetaRepository.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestDaVinciRequestBasedMetaRepository.java new file mode 100644 index 00000000000..d5d783da240 --- /dev/null +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestDaVinciRequestBasedMetaRepository.java @@ -0,0 +1,269 @@ +package com.linkedin.venice.endToEnd; + +import static com.linkedin.venice.ConfigKeys.CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS; +import static com.linkedin.venice.ConfigKeys.CLIENT_USE_SYSTEM_STORE_REPOSITORY; +import static com.linkedin.venice.ConfigKeys.DATA_BASE_PATH; +import static 
com.linkedin.venice.ConfigKeys.PERSISTENCE_TYPE; +import static com.linkedin.venice.utils.TestWriteUtils.getTempDataDirectory; +import static com.linkedin.venice.vpj.VenicePushJobConstants.DEFAULT_VALUE_FIELD_PROP; +import static org.testng.Assert.assertNotNull; + +import com.linkedin.d2.balancer.D2Client; +import com.linkedin.davinci.client.DaVinciClient; +import com.linkedin.davinci.client.DaVinciConfig; +import com.linkedin.davinci.client.factory.CachingDaVinciClientFactory; +import com.linkedin.venice.D2.D2ClientUtils; +import com.linkedin.venice.controllerapi.ControllerClient; +import com.linkedin.venice.controllerapi.SchemaResponse; +import com.linkedin.venice.integration.utils.D2TestUtils; +import com.linkedin.venice.integration.utils.ServiceFactory; +import com.linkedin.venice.integration.utils.VeniceClusterCreateOptions; +import com.linkedin.venice.integration.utils.VeniceClusterWrapper; +import com.linkedin.venice.integration.utils.VeniceRouterWrapper; +import com.linkedin.venice.meta.PersistenceType; +import com.linkedin.venice.utils.IntegrationTestPushUtils; +import com.linkedin.venice.utils.PropertyBuilder; +import com.linkedin.venice.utils.TestUtils; +import com.linkedin.venice.utils.TestWriteUtils; +import com.linkedin.venice.utils.Time; +import com.linkedin.venice.utils.Utils; +import com.linkedin.venice.utils.VeniceProperties; +import io.tehuti.Metric; +import io.tehuti.metrics.MetricsRepository; +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import org.apache.avro.Schema; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +public class TestDaVinciRequestBasedMetaRepository { + private static final int TEST_TIMEOUT = 2 * Time.MS_PER_MINUTE; + + private static final String 
CLUSTER_NAME = "venice-cluster"; + private VeniceClusterWrapper clusterWrapper; + + private static final String storeNameStringToString = "store-name-string-to-string"; + private static final String storeNameStringToNameRecord = "store-name-string-to-name-record"; + + // StoreName -> ControllerClient + // Using map to check which stores are created + private final Map controllerClients = new HashMap<>(); + // StoreName -> Directory + private final Map pushJobAvroDataDirs = new HashMap<>(); + + private DaVinciConfig daVinciConfig; + private MetricsRepository dvcMetricsRepo; + private D2Client daVinciD2RemoteFabric; + private CachingDaVinciClientFactory daVinciClientFactory; + + @BeforeClass(alwaysRun = true) + public void setUp() throws IOException { + + VeniceClusterCreateOptions.Builder options = new VeniceClusterCreateOptions.Builder().clusterName(CLUSTER_NAME) + .numberOfRouters(1) + .numberOfServers(2) + .numberOfControllers(2) + .replicationFactor(2) + .forkServer(false); + clusterWrapper = ServiceFactory.getVeniceCluster(options.build()); + + // Create stores + runPushJob( // String to String + storeNameStringToString, + TestWriteUtils + .writeSimpleAvroFileWithStringToStringSchema(getPushJobAvroFileDirectory(storeNameStringToString))); + runPushJob( // String to Name Record + storeNameStringToNameRecord, + TestWriteUtils.writeSimpleAvroFileWithStringToNameRecordV1Schema( + getPushJobAvroFileDirectory(storeNameStringToNameRecord))); + + // Set up DVC Client Factory + VeniceProperties backendConfig = + new PropertyBuilder().put(DATA_BASE_PATH, Utils.getTempDataDirectory().getAbsolutePath()) + .put(PERSISTENCE_TYPE, PersistenceType.ROCKS_DB) + .put(CLIENT_USE_SYSTEM_STORE_REPOSITORY, true) + .put(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS, 1) + .build(); + daVinciConfig = new DaVinciConfig(); + daVinciConfig.setUseRequestBasedMetaRepository(true); + daVinciD2RemoteFabric = D2TestUtils.getAndStartD2Client(clusterWrapper.getZk().getAddress()); + 
dvcMetricsRepo = new MetricsRepository(); + daVinciClientFactory = new CachingDaVinciClientFactory( + daVinciD2RemoteFabric, + VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME, + dvcMetricsRepo, + backendConfig); + } + + @AfterClass(alwaysRun = true) + public void cleanUp() { + + // Shutdown remote fabric + D2ClientUtils.shutdownClient(daVinciD2RemoteFabric); + + // Close client factory + daVinciClientFactory.close(); + + // Close controller clients + for (Map.Entry entry: controllerClients.entrySet()) { + entry.getValue().close(); + } + + // Close cluster wrapper + clusterWrapper.close(); + } + + @Test(timeOut = TEST_TIMEOUT) + public void testDVCRequestBasedMetaRepositoryStringToString() + throws IOException, ExecutionException, InterruptedException { + + try (DaVinciClient storeClient = + daVinciClientFactory.getAndStartGenericAvroClient(storeNameStringToString, daVinciConfig)) { + storeClient.subscribeAll().get(); + + int recordCount = TestWriteUtils.DEFAULT_USER_DATA_RECORD_COUNT; + for (int i = 1; i <= recordCount; i++) { + Assert.assertEquals( + storeClient.get(Integer.toString(i)).get().toString(), + TestWriteUtils.DEFAULT_USER_DATA_VALUE_PREFIX + i); + } + Assert + .assertEquals(getMetric(dvcMetricsRepo, "current_version_number.Gauge", storeNameStringToString), (double) 1); + + // Run new push job + recordCount = 200; + runPushJob( + storeNameStringToString, + TestWriteUtils.writeSimpleAvroFileWithStringToStringSchema( + getPushJobAvroFileDirectory(storeNameStringToString), + recordCount)); + + // Verify version swap + TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, false, () -> { + Assert.assertEquals( + getMetric(dvcMetricsRepo, "current_version_number.Gauge", storeNameStringToString), + (double) 2); + }); + + for (int i = 1; i <= recordCount; i++) { + Assert.assertEquals( + storeClient.get(Integer.toString(i)).get().toString(), + TestWriteUtils.DEFAULT_USER_DATA_VALUE_PREFIX + i); + } + } + } + + @Test(timeOut = TEST_TIMEOUT) + 
public void testDVCRequestBasedMetaRepositoryStringToNameRecord() throws ExecutionException, InterruptedException { + + try (DaVinciClient storeClient = + daVinciClientFactory.getAndStartGenericAvroClient(storeNameStringToNameRecord, daVinciConfig)) { + storeClient.subscribeAll().get(); + + int recordCount = TestWriteUtils.DEFAULT_USER_DATA_RECORD_COUNT; + for (int i = 1; i <= recordCount; i++) { + // Verify storeClient can read + Assert.assertEquals( + storeClient.get(Integer.toString(i)).get().toString(), + TestWriteUtils.renderNameRecord(TestWriteUtils.STRING_TO_NAME_RECORD_V1_SCHEMA, i) + .get(DEFAULT_VALUE_FIELD_PROP) + .toString()); + } + + // Verify version + Assert.assertEquals( + getMetric(dvcMetricsRepo, "current_version_number.Gauge", storeNameStringToNameRecord), + (double) 1); + } + } + + @Test(timeOut = TEST_TIMEOUT) + public void testDVCRequestBasedMetaRepositoryStringToNameRecordVersions() + throws IOException, ExecutionException, InterruptedException { + + try (DaVinciClient storeClient = + daVinciClientFactory.getAndStartGenericAvroClient(storeNameStringToNameRecord, daVinciConfig)) { + storeClient.subscribeAll().get(); + + for (int i = 0; i < TestWriteUtils.countStringToNameRecordSchemas(); i++) { + Schema schema = TestWriteUtils.getStringToNameRecordSchema(i); + int currentValueVersion = i + 2; + + // Run new push job with new version + int recordCount = currentValueVersion * TestWriteUtils.DEFAULT_USER_DATA_RECORD_COUNT; + runPushJob( + storeNameStringToNameRecord, + TestWriteUtils.writeSimpleAvroFileWithStringToNameRecordSchema( + getPushJobAvroFileDirectory(storeNameStringToNameRecord), + schema, + recordCount)); + + // Verify version swap + TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, false, () -> { + Assert.assertEquals( + getMetric(dvcMetricsRepo, "current_version_number.Gauge", storeNameStringToNameRecord), + (double) (currentValueVersion)); + }); + + // Verify storeClient can read all + for (int j = 1; j <= 
recordCount; j++) { + Assert.assertEquals( + storeClient.get(Integer.toString(j)).get().toString(), + TestWriteUtils.renderNameRecord(schema, j).get(DEFAULT_VALUE_FIELD_PROP).toString()); + } + } + } + } + + private double getMetric(MetricsRepository metricsRepository, String metricName, String storeName) { + Metric metric = metricsRepository.getMetric("." + storeName + "--" + metricName); + assertNotNull(metric, "Expected metric " + metricName + " not found."); + return metric.value(); + } + + private File getPushJobAvroFileDirectory(String storeName) { + if (!pushJobAvroDataDirs.containsKey(storeName)) { + pushJobAvroDataDirs.put(storeName, getTempDataDirectory()); + } + + return pushJobAvroDataDirs.get(storeName); + } + + private void runPushJob(String storeName, Schema schema) { + + ControllerClient controllerClient; + File dataDir = getPushJobAvroFileDirectory(storeName); + String dataDirPath = "file:" + dataDir.getAbsolutePath(); + + if (!controllerClients.containsKey(storeName)) { + // Init store + controllerClient = IntegrationTestPushUtils.createStoreForJob( + CLUSTER_NAME, + schema, + TestWriteUtils.defaultVPJProps( + clusterWrapper.getVeniceControllers().get(0).getControllerUrl(), + dataDirPath, + storeName)); + controllerClients.put(storeName, controllerClient); + } else { + controllerClient = controllerClients.get(storeName); + + // Add new schema + Schema valueSchema = schema.getField(DEFAULT_VALUE_FIELD_PROP).schema(); + SchemaResponse schemaResponse = controllerClient.addValueSchema(storeName, valueSchema.toString()); + Assert.assertFalse(schemaResponse.isError(), schemaResponse.getError()); + } + + Properties props = + TestWriteUtils.defaultVPJProps(controllerClient.getLeaderControllerUrl(), dataDirPath, storeName); + TestWriteUtils.runPushJob(storeName + "_" + Utils.getUniqueString("push_job"), props); + } +} diff --git a/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/TestWriteUtils.java 
b/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/TestWriteUtils.java index 95be1347541..4e4f30d2f2e 100644 --- a/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/TestWriteUtils.java +++ b/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/TestWriteUtils.java @@ -19,6 +19,7 @@ import static com.linkedin.venice.vpj.VenicePushJobConstants.VENICE_DISCOVER_URL_PROP; import static com.linkedin.venice.vpj.VenicePushJobConstants.VENICE_STORE_NAME_PROP; +import com.google.common.base.CaseFormat; import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper; import com.linkedin.avroutil1.compatibility.RandomRecordGenerator; import com.linkedin.avroutil1.compatibility.RecordGenerationConfig; @@ -107,7 +108,6 @@ public class TestWriteUtils { AvroCompatibilityHelper.parse(loadSchemaFileFromResource("valueSchema/NameV5.avsc")); public static final Schema NAME_RECORD_V6_SCHEMA = AvroCompatibilityHelper.parse(loadSchemaFileFromResource("valueSchema/NameV6.avsc")); - public static final Schema NAME_RECORD_V7_SCHEMA = AvroCompatibilityHelper.parse(loadSchemaFileFromResource("valueSchema/NameV7.avsc")); public static final Schema NAME_RECORD_V8_SCHEMA = @@ -116,7 +116,6 @@ public class TestWriteUtils { AvroCompatibilityHelper.parse(loadSchemaFileFromResource("valueSchema/NameV9.avsc")); public static final Schema NAME_RECORD_V10_SCHEMA = AvroCompatibilityHelper.parse(loadSchemaFileFromResource("valueSchema/NameV10.avsc")); - public static final Schema NAME_RECORD_V11_SCHEMA = AvroCompatibilityHelper.parse(loadSchemaFileFromResource("valueSchema/NameV11.avsc")); @@ -147,13 +146,32 @@ public class TestWriteUtils { new PushInputSchemaBuilder().setKeySchema(INT_SCHEMA).setValueSchema(INT_SCHEMA).build(); public static final Schema STRING_TO_STRING_SCHEMA = new PushInputSchemaBuilder().setKeySchema(STRING_SCHEMA).setValueSchema(STRING_SCHEMA).build(); + public static final Schema STRING_TO_NAME_RECORD_V1_SCHEMA = new 
PushInputSchemaBuilder().setKeySchema(STRING_SCHEMA).setValueSchema(NAME_RECORD_V1_SCHEMA).build(); public static final Schema STRING_TO_NAME_RECORD_V2_SCHEMA = new PushInputSchemaBuilder().setKeySchema(STRING_SCHEMA).setValueSchema(NAME_RECORD_V2_SCHEMA).build(); - public static final Schema STRING_TO_NAME_RECORD_V3_SCHEMA = new PushInputSchemaBuilder().setKeySchema(STRING_SCHEMA).setValueSchema(NAME_RECORD_V3_SCHEMA).build(); + public static final Schema STRING_TO_NAME_RECORD_V5_SCHEMA = + new PushInputSchemaBuilder().setKeySchema(STRING_SCHEMA).setValueSchema(NAME_RECORD_V5_SCHEMA).build(); + public static final Schema STRING_TO_NAME_RECORD_V6_SCHEMA = + new PushInputSchemaBuilder().setKeySchema(STRING_SCHEMA).setValueSchema(NAME_RECORD_V6_SCHEMA).build(); + public static final Schema STRING_TO_NAME_RECORD_V7_SCHEMA = + new PushInputSchemaBuilder().setKeySchema(STRING_SCHEMA).setValueSchema(NAME_RECORD_V7_SCHEMA).build(); + public static final Schema STRING_TO_NAME_RECORD_V8_SCHEMA = + new PushInputSchemaBuilder().setKeySchema(STRING_SCHEMA).setValueSchema(NAME_RECORD_V8_SCHEMA).build(); + public static final Schema STRING_TO_NAME_RECORD_V9_SCHEMA = + new PushInputSchemaBuilder().setKeySchema(STRING_SCHEMA).setValueSchema(NAME_RECORD_V9_SCHEMA).build(); + public static final Schema STRING_TO_NAME_RECORD_V10_SCHEMA = + new PushInputSchemaBuilder().setKeySchema(STRING_SCHEMA).setValueSchema(NAME_RECORD_V10_SCHEMA).build(); + public static final Schema STRING_TO_NAME_RECORD_V11_SCHEMA = + new PushInputSchemaBuilder().setKeySchema(STRING_SCHEMA).setValueSchema(NAME_RECORD_V11_SCHEMA).build(); + private static final Schema[] STRING_TO_NAME_RECORD_SCHEMAS = + new Schema[] { STRING_TO_NAME_RECORD_V1_SCHEMA, STRING_TO_NAME_RECORD_V2_SCHEMA, STRING_TO_NAME_RECORD_V3_SCHEMA, + STRING_TO_NAME_RECORD_V5_SCHEMA, STRING_TO_NAME_RECORD_V6_SCHEMA, STRING_TO_NAME_RECORD_V7_SCHEMA, + STRING_TO_NAME_RECORD_V8_SCHEMA, STRING_TO_NAME_RECORD_V9_SCHEMA, 
STRING_TO_NAME_RECORD_V10_SCHEMA }; + public static final Schema STRING_TO_NAME_RECORD_V1_UPDATE_SCHEMA = new PushInputSchemaBuilder().setKeySchema(STRING_SCHEMA).setValueSchema(NAME_RECORD_V1_UPDATE_SCHEMA).build(); public static final Schema STRING_TO_STRING_WITH_EXTRA_FIELD_SCHEMA = @@ -168,6 +186,52 @@ public static File getTempDataDirectory() { return Utils.getTempDataDirectory(); } + public static GenericRecord renderNameRecord(Schema schema, int i) { + + // Key + GenericRecord keyValueRecord = new GenericData.Record(schema); + keyValueRecord.put(DEFAULT_KEY_FIELD_PROP, String.valueOf(i)); + + // Value + Schema valueSchema = schema.getField(DEFAULT_VALUE_FIELD_PROP).schema(); + valueSchema.getFields().get(0).name(); + GenericRecord valueRecord = new GenericData.Record(schema.getField(DEFAULT_VALUE_FIELD_PROP).schema()); + for (Schema.Field field: valueSchema.getFields()) { + Object value = null; + switch (field.schema().getType()) { + case STRING: + // Camel case field name to snake case value + value = CaseFormat.UPPER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, field.name()) + "_" + i; + break; + case INT: + case LONG: + value = i; + break; + case FLOAT: + case DOUBLE: + value = (double) i; + break; + case BOOLEAN: + value = true; + break; + default: + break; + } + valueRecord.put(field.name(), value); + } + keyValueRecord.put(DEFAULT_VALUE_FIELD_PROP, valueRecord); + + return keyValueRecord; + } + + public static int countStringToNameRecordSchemas() { + return STRING_TO_NAME_RECORD_SCHEMAS.length; + } + + public static Schema getStringToNameRecordSchema(int version) { + return STRING_TO_NAME_RECORD_SCHEMAS[version]; + } + public static Schema writeSimpleAvroFileWithStringToStringSchema(File parentDir) throws IOException { return writeSimpleAvroFileWithStringToStringSchema(parentDir, DEFAULT_USER_DATA_RECORD_COUNT); } @@ -361,31 +425,38 @@ public static Schema writeEmptyAvroFile(File parentDir, String fileName, Schema } public static Schema 
writeSimpleAvroFileWithStringToNameRecordV1Schema(File parentDir) throws IOException { - String firstName = "first_name_"; - String lastName = "last_name_"; - - return writeSimpleAvroFile(parentDir, STRING_TO_NAME_RECORD_V1_SCHEMA, i -> { - GenericRecord keyValueRecord = new GenericData.Record(STRING_TO_NAME_RECORD_V1_SCHEMA); - keyValueRecord.put(DEFAULT_KEY_FIELD_PROP, String.valueOf(i)); // Key - GenericRecord valueRecord = new GenericData.Record(NAME_RECORD_V1_SCHEMA); - valueRecord.put("firstName", firstName + i); - valueRecord.put("lastName", lastName + i); - keyValueRecord.put(DEFAULT_VALUE_FIELD_PROP, valueRecord); // Value - return keyValueRecord; - }); + return writeSimpleAvroFileWithStringToNameRecordV1Schema(parentDir, DEFAULT_USER_DATA_RECORD_COUNT); + } + + public static Schema writeSimpleAvroFileWithStringToNameRecordV1Schema(File parentDir, int recordCount) + throws IOException { + return writeSimpleAvroFileWithStringToNameRecordSchema(parentDir, STRING_TO_NAME_RECORD_V1_SCHEMA, recordCount); + } + + public static Schema writeSimpleAvroFileWithStringToNameRecordSchema(File parentDir, Schema schema, int recordCount) + throws IOException { + return writeSimpleAvroFile(parentDir, schema, i -> renderNameRecord(schema, i), recordCount); } public static Schema writeSimpleAvroFile( File parentDir, Schema schema, - Function recordProvider) throws IOException { + Function recordProvider, + int recordCount) throws IOException { return writeAvroFile(parentDir, "string2record.avro", schema, (recordSchema, writer) -> { - for (int i = 1; i <= DEFAULT_USER_DATA_RECORD_COUNT; ++i) { + for (int i = 1; i <= recordCount; ++i) { writer.append(recordProvider.apply(i)); } }); } + public static Schema writeSimpleAvroFile( + File parentDir, + Schema schema, + Function recordProvider) throws IOException { + return writeSimpleAvroFile(parentDir, schema, recordProvider, DEFAULT_USER_DATA_RECORD_COUNT); + } + public static Schema 
writeSimpleAvroFileWithStringToUserWithStringMapSchema(File parentDir, int itemsPerRecord) throws IOException { String valuePayloadBase = "1234567890";