diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/NativeMetadataRepository.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/NativeMetadataRepository.java index 7d595198864..b1cdb7782a5 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/NativeMetadataRepository.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/NativeMetadataRepository.java @@ -1,8 +1,6 @@ package com.linkedin.davinci.repository; import static com.linkedin.venice.ConfigKeys.CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS; -import static com.linkedin.venice.system.store.MetaStoreWriter.KEY_STRING_SCHEMA_ID; -import static com.linkedin.venice.system.store.MetaStoreWriter.KEY_STRING_STORE_NAME; import static java.lang.Thread.currentThread; import com.linkedin.davinci.stats.NativeMetadataRepositoryStats; @@ -25,17 +23,11 @@ import com.linkedin.venice.schema.rmd.RmdSchemaEntry; import com.linkedin.venice.schema.writecompute.DerivedSchemaEntry; import com.linkedin.venice.service.ICProvider; -import com.linkedin.venice.system.store.MetaStoreDataType; -import com.linkedin.venice.systemstore.schemas.StoreClusterConfig; -import com.linkedin.venice.systemstore.schemas.StoreMetaKey; -import com.linkedin.venice.systemstore.schemas.StoreMetaValue; import com.linkedin.venice.utils.VeniceProperties; import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; import java.time.Clock; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -70,7 +62,7 @@ public abstract class NativeMetadataRepository private final Map storeConfigMap = new VeniceConcurrentHashMap<>(); // Local cache for key/value schemas. SchemaData supports one key schema per store only, which may need to be changed // for key schema evolvability. - private final Map schemaMap = new VeniceConcurrentHashMap<>(); + protected Map schemaMap = new VeniceConcurrentHashMap<>(); private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); private final Set listeners = new CopyOnWriteArraySet<>(); private final AtomicLong totalStoreReadQuota = new AtomicLong(); @@ -125,11 +117,20 @@ public static NativeMetadataRepository getInstance( ClientConfig clientConfig, VeniceProperties backendConfig, ICProvider icProvider) { + + NativeMetadataRepository nativeMetadataRepository; + if (clientConfig.isUseRequestBasedMetaRepository()) { + nativeMetadataRepository = new RequestBasedMetaRepository(clientConfig, backendConfig); + } else { + nativeMetadataRepository = new ThinClientMetaStoreBasedRepository(clientConfig, backendConfig, icProvider); + } + LOGGER.info( "Initializing {} with {}", NativeMetadataRepository.class.getSimpleName(), - ThinClientMetaStoreBasedRepository.class.getSimpleName()); - return new ThinClientMetaStoreBasedRepository(clientConfig, backendConfig, icProvider); + nativeMetadataRepository.getClass().getSimpleName()); + + return nativeMetadataRepository; } @Override @@ -168,23 +169,19 @@ public boolean hasStore(String storeName) { return subscribedStoreMap.containsKey(storeName); } + // refreshOneStore will throw the VeniceNoStoreException when + // retrieving metadata for stores in "Deleting" state or "Missing". @Override public Store refreshOneStore(String storeName) { try { - getAndSetStoreConfigFromSystemStore(storeName); - StoreConfig storeConfig = storeConfigMap.get(storeName); + StoreConfig storeConfig = cacheStoreConfigFromRemote(storeName); if (storeConfig == null) { throw new VeniceException("StoreConfig is missing unexpectedly for store: " + storeName); } - Store newStore = getStoreFromSystemStore(storeName, storeConfig.getCluster()); - // isDeleting check to detect deleted store is only supported by meta system store based implementation. - if (newStore != null && !storeConfig.isDeleting()) { - putStore(newStore); - getAndCacheSchemaDataFromSystemStore(storeName); - nativeMetadataRepositoryStats.updateCacheTimestamp(storeName, clock.millis()); - } else { - removeStore(storeName); - } + Store newStore = fetchStoreFromRemote(storeName, storeConfig.getCluster()); + putStore(newStore); + getAndCacheSchemaData(storeName); + nativeMetadataRepositoryStats.updateCacheTimestamp(storeName, clock.millis()); return newStore; } catch (ServiceDiscoveryException | MissingKeyInStoreMetadataException e) { throw new VeniceNoStoreException(storeName, e); @@ -393,74 +390,17 @@ public void clear() { * Get the store cluster config from system store and update the local cache with it. Different implementation will * get the data differently but should all populate the store cluster config map. */ - protected void getAndSetStoreConfigFromSystemStore(String storeName) { - storeConfigMap.put(storeName, getStoreConfigFromSystemStore(storeName)); + protected StoreConfig cacheStoreConfigFromRemote(String storeName) { + StoreConfig storeConfig = fetchStoreConfigFromRemote(storeName); + storeConfigMap.put(storeName, storeConfig); + return storeConfig; } - protected abstract StoreConfig getStoreConfigFromSystemStore(String storeName); + protected abstract StoreConfig fetchStoreConfigFromRemote(String storeName); - protected abstract Store getStoreFromSystemStore(String storeName, String clusterName); + protected abstract Store fetchStoreFromRemote(String storeName, String clusterName); - protected abstract StoreMetaValue getStoreMetaValue(String storeName, StoreMetaKey key); - - // Helper function with common code for retrieving StoreConfig from meta system store. - protected StoreConfig getStoreConfigFromMetaSystemStore(String storeName) { - StoreClusterConfig clusterConfig = getStoreMetaValue( - storeName, - MetaStoreDataType.STORE_CLUSTER_CONFIG - .getStoreMetaKey(Collections.singletonMap(KEY_STRING_STORE_NAME, storeName))).storeClusterConfig; - return new StoreConfig(clusterConfig); - } - - // Helper function with common code for retrieving SchemaData from meta system store. - protected SchemaData getSchemaDataFromMetaSystemStore(String storeName) { - SchemaData schemaData = schemaMap.get(storeName); - SchemaEntry keySchema; - if (schemaData == null) { - // Retrieve the key schema and initialize SchemaData only if it's not cached yet. - StoreMetaKey keySchemaKey = MetaStoreDataType.STORE_KEY_SCHEMAS - .getStoreMetaKey(Collections.singletonMap(KEY_STRING_STORE_NAME, storeName)); - Map keySchemaMap = - getStoreMetaValue(storeName, keySchemaKey).storeKeySchemas.keySchemaMap; - if (keySchemaMap.isEmpty()) { - throw new VeniceException("No key schema found for store: " + storeName); - } - Map.Entry keySchemaEntry = keySchemaMap.entrySet().iterator().next(); - keySchema = - new SchemaEntry(Integer.parseInt(keySchemaEntry.getKey().toString()), keySchemaEntry.getValue().toString()); - schemaData = new SchemaData(storeName, keySchema); - } - StoreMetaKey valueSchemaKey = MetaStoreDataType.STORE_VALUE_SCHEMAS - .getStoreMetaKey(Collections.singletonMap(KEY_STRING_STORE_NAME, storeName)); - Map valueSchemaMap = - getStoreMetaValue(storeName, valueSchemaKey).storeValueSchemas.valueSchemaMap; - // Check the value schema string, if it's empty then try to query the other key space for individual value schema. - for (Map.Entry entry: valueSchemaMap.entrySet()) { - // Check if we already have the corresponding value schema - int valueSchemaId = Integer.parseInt(entry.getKey().toString()); - if (schemaData.getValueSchema(valueSchemaId) != null) { - continue; - } - if (entry.getValue().toString().isEmpty()) { - // The value schemas might be too large to be stored in a single K/V. - StoreMetaKey individualValueSchemaKey = - MetaStoreDataType.STORE_VALUE_SCHEMA.getStoreMetaKey(new HashMap() { - { - put(KEY_STRING_STORE_NAME, storeName); - put(KEY_STRING_SCHEMA_ID, entry.getKey().toString()); - } - }); - // Empty string is not a valid value schema therefore it's safe to throw exceptions if we also cannot find it in - // the individual value schema key space. - String valueSchema = - getStoreMetaValue(storeName, individualValueSchemaKey).storeValueSchema.valueSchema.toString(); - schemaData.addValueSchema(new SchemaEntry(valueSchemaId, valueSchema)); - } else { - schemaData.addValueSchema(new SchemaEntry(valueSchemaId, entry.getValue().toString())); - } - } - return schemaData; - } + protected abstract SchemaData getSchemaData(String storeName); protected Store putStore(Store newStore) { // Workaround to make old metadata compatible with new fields @@ -516,11 +456,11 @@ protected void notifyStoreChanged(Store store) { } } - protected SchemaData getAndCacheSchemaDataFromSystemStore(String storeName) { + protected SchemaData getAndCacheSchemaData(String storeName) { if (!hasStore(storeName)) { throw new VeniceNoStoreException(storeName); } - SchemaData schemaData = getSchemaDataFromSystemStore(storeName); + SchemaData schemaData = getSchemaData(storeName); schemaMap.put(storeName, schemaData); return schemaData; } @@ -532,7 +472,7 @@ protected SchemaData getAndCacheSchemaDataFromSystemStore(String storeName) { private SchemaData getSchemaDataFromReadThroughCache(String storeName) throws VeniceNoStoreException { SchemaData schemaData = schemaMap.get(storeName); if (schemaData == null) { - schemaData = getAndCacheSchemaDataFromSystemStore(storeName); + schemaData = getAndCacheSchemaData(storeName); } return schemaData; } @@ -545,8 +485,6 @@ protected SchemaEntry getValueSchemaInternally(String storeName, int id) { return schemaData.getValueSchema(id); } - protected abstract SchemaData getSchemaDataFromSystemStore(String storeName); - /** * This function is used to remove schema entry for the given store from local cache, * and related listeners as well. diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/RequestBasedMetaRepository.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/RequestBasedMetaRepository.java new file mode 100644 index 00000000000..6a7bb50900c --- /dev/null +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/RequestBasedMetaRepository.java @@ -0,0 +1,153 @@ +package com.linkedin.davinci.repository; + +import com.linkedin.venice.client.store.ClientConfig; +import com.linkedin.venice.client.store.D2ServiceDiscovery; +import com.linkedin.venice.client.store.transport.D2TransportClient; +import com.linkedin.venice.client.store.transport.TransportClientResponse; +import com.linkedin.venice.meta.QueryAction; +import com.linkedin.venice.meta.Store; +import com.linkedin.venice.meta.StoreConfig; +import com.linkedin.venice.meta.ZKStore; +import com.linkedin.venice.metadata.response.StorePropertiesResponseRecord; +import com.linkedin.venice.schema.SchemaData; +import com.linkedin.venice.schema.SchemaEntry; +import com.linkedin.venice.serializer.FastSerializerDeserializerFactory; +import com.linkedin.venice.serializer.RecordDeserializer; +import com.linkedin.venice.systemstore.schemas.StoreClusterConfig; +import com.linkedin.venice.systemstore.schemas.StoreProperties; +import com.linkedin.venice.utils.VeniceProperties; +import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; +import java.util.Map; +import org.apache.avro.Schema; + + +public class RequestBasedMetaRepository extends NativeMetadataRepository { + + // cluster -> client + private final Map d2TransportClientMap = new VeniceConcurrentHashMap<>(); + + // storeName -> T + protected Map storeSchemaMap = new VeniceConcurrentHashMap<>(); + + private final D2TransportClient d2DiscoveryTransportClient; + private D2ServiceDiscovery d2ServiceDiscovery; + + public RequestBasedMetaRepository(ClientConfig clientConfig, VeniceProperties backendConfig) { + super(clientConfig, backendConfig); + this.d2ServiceDiscovery = new D2ServiceDiscovery(); + this.d2DiscoveryTransportClient = + new D2TransportClient(clientConfig.getD2ServiceName(), clientConfig.getD2Client()); + } + + @Override + public void clear() { + super.clear(); + + // Clear cache + d2TransportClientMap.clear(); + storeSchemaMap.clear(); + } + + @Override + protected StoreConfig fetchStoreConfigFromRemote(String storeName) { + // Create StoreConfig from D2 + D2TransportClient d2TransportClient = getD2TransportClient(storeName); + + StoreClusterConfig storeClusterConfig = new StoreClusterConfig(); + storeClusterConfig.setStoreName(storeName); + storeClusterConfig.setCluster(d2TransportClient.getServiceName()); + + return new StoreConfig(storeClusterConfig); + } + + @Override + protected Store fetchStoreFromRemote(String storeName, String clusterName) { + // Fetch store, bypass cache + StorePropertiesResponseRecord record = fetchAndCacheStorePropertiesResponseRecord(storeName); + StoreProperties storeProperties = record.storeMetaValue.storeProperties; + return new ZKStore(storeProperties); + } + + @Override + protected SchemaData getSchemaData(String storeName) { + if (!storeSchemaMap.containsKey(storeName)) { + // Cache miss + fetchAndCacheStorePropertiesResponseRecord(storeName); + } + return storeSchemaMap.get(storeName); + } + + protected StorePropertiesResponseRecord fetchAndCacheStorePropertiesResponseRecord(String storeName) { + + // Request + int maxValueSchemaId = getMaxValueSchemaId(storeName); + D2TransportClient d2TransportClient = getD2TransportClient(storeName); + String requestBasedStorePropertiesURL = QueryAction.STORE_PROPERTIES.toString().toLowerCase() + "/" + storeName; + if (maxValueSchemaId > SchemaData.UNKNOWN_SCHEMA_ID) { + requestBasedStorePropertiesURL += "/" + maxValueSchemaId; + } + + TransportClientResponse response; + try { + response = d2TransportClient.get(requestBasedStorePropertiesURL).get(); + } catch (Exception e) { + throw new RuntimeException( + "Encountered exception while trying to send store properties request to " + requestBasedStorePropertiesURL + + ": " + e); + } + + // Deserialize + Schema writerSchema = StorePropertiesResponseRecord.SCHEMA$; + RecordDeserializer recordDeserializer = FastSerializerDeserializerFactory + .getFastAvroSpecificDeserializer(writerSchema, StorePropertiesResponseRecord.class); + StorePropertiesResponseRecord record = recordDeserializer.deserialize(response.getBody()); + + // Cache + cacheStoreSchema(storeName, record); + + return record; + } + + D2TransportClient getD2TransportClient(String storeName) { + synchronized (this) { + // Get cluster for store + String serverD2ServiceName = + d2ServiceDiscovery.find(d2DiscoveryTransportClient, storeName, true).getServerD2Service(); + if (d2TransportClientMap.containsKey(serverD2ServiceName)) { + return d2TransportClientMap.get(serverD2ServiceName); + } + D2TransportClient d2TransportClient = new D2TransportClient(serverD2ServiceName, clientConfig.getD2Client()); + d2TransportClientMap.put(serverD2ServiceName, d2TransportClient); + return d2TransportClient; + } + } + + protected int getMaxValueSchemaId(String storeName) { + if (!schemaMap.containsKey(storeName)) { + return SchemaData.UNKNOWN_SCHEMA_ID; + } + return schemaMap.get(storeName).getMaxValueSchemaId(); + } + + protected void cacheStoreSchema(String storeName, StorePropertiesResponseRecord record) { + + if (!storeSchemaMap.containsKey(storeName)) { + // New schema data + Map.Entry keySchemaEntry = + record.getStoreMetaValue().getStoreKeySchemas().getKeySchemaMap().entrySet().iterator().next(); + SchemaData schemaData = new SchemaData( + storeName, + new SchemaEntry(Integer.parseInt(keySchemaEntry.getKey().toString()), keySchemaEntry.getValue().toString())); + storeSchemaMap.put(storeName, schemaData); + } + + // Store Value Schemas + for (Map.Entry entry: record.getStoreMetaValue() + .getStoreValueSchemas() + .getValueSchemaMap() + .entrySet()) { + storeSchemaMap.get(storeName) + .addValueSchema(new SchemaEntry(Integer.parseInt(entry.getKey().toString()), entry.getValue().toString())); + } + } +} diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/ThinClientMetaStoreBasedRepository.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/ThinClientMetaStoreBasedRepository.java index 637ba1682ed..fa8ba11357c 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/ThinClientMetaStoreBasedRepository.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/ThinClientMetaStoreBasedRepository.java @@ -1,6 +1,7 @@ package com.linkedin.davinci.repository; import static com.linkedin.venice.system.store.MetaStoreWriter.KEY_STRING_CLUSTER_NAME; +import static com.linkedin.venice.system.store.MetaStoreWriter.KEY_STRING_SCHEMA_ID; import static com.linkedin.venice.system.store.MetaStoreWriter.KEY_STRING_STORE_NAME; import com.linkedin.venice.client.exceptions.ServiceDiscoveryException; @@ -9,13 +10,16 @@ import com.linkedin.venice.client.store.ClientFactory; import com.linkedin.venice.common.VeniceSystemStoreType; import com.linkedin.venice.exceptions.MissingKeyInStoreMetadataException; +import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.exceptions.VeniceRetriableException; import com.linkedin.venice.meta.Store; import com.linkedin.venice.meta.StoreConfig; import com.linkedin.venice.meta.ZKStore; import com.linkedin.venice.schema.SchemaData; +import com.linkedin.venice.schema.SchemaEntry; import com.linkedin.venice.service.ICProvider; import com.linkedin.venice.system.store.MetaStoreDataType; +import com.linkedin.venice.systemstore.schemas.StoreClusterConfig; import com.linkedin.venice.systemstore.schemas.StoreMetaKey; import com.linkedin.venice.systemstore.schemas.StoreMetaValue; import com.linkedin.venice.systemstore.schemas.StoreProperties; @@ -63,12 +67,16 @@ public void subscribe(String storeName) throws InterruptedException { } @Override - protected StoreConfig getStoreConfigFromSystemStore(String storeName) { - return getStoreConfigFromMetaSystemStore(storeName); + protected StoreConfig fetchStoreConfigFromRemote(String storeName) { + StoreClusterConfig clusterConfig = getStoreMetaValue( + storeName, + MetaStoreDataType.STORE_CLUSTER_CONFIG + .getStoreMetaKey(Collections.singletonMap(KEY_STRING_STORE_NAME, storeName))).storeClusterConfig; + return new StoreConfig(clusterConfig); } @Override - protected Store getStoreFromSystemStore(String storeName, String clusterName) { + protected Store fetchStoreFromRemote(String storeName, String clusterName) { StoreProperties storeProperties = getStoreMetaValue(storeName, MetaStoreDataType.STORE_PROPERTIES.getStoreMetaKey(new HashMap() { { @@ -79,12 +87,6 @@ protected Store getStoreFromSystemStore(String storeName, String clusterName) { return new ZKStore(storeProperties); } - @Override - protected SchemaData getSchemaDataFromSystemStore(String storeName) { - return getSchemaDataFromMetaSystemStore(storeName); - } - - @Override protected StoreMetaValue getStoreMetaValue(String storeName, StoreMetaKey key) { final Callable> supplier = () -> getAvroClientForMetaStore(storeName).get(key); Callable> wrappedSupplier = @@ -107,6 +109,56 @@ protected StoreMetaValue getStoreMetaValue(String storeName, StoreMetaKey key) { return value; } + @Override + protected SchemaData getSchemaData(String storeName) { + SchemaData schemaData = schemaMap.get(storeName); + SchemaEntry keySchema; + if (schemaData == null) { + // Retrieve the key schema and initialize SchemaData only if it's not cached yet. + StoreMetaKey keySchemaKey = MetaStoreDataType.STORE_KEY_SCHEMAS + .getStoreMetaKey(Collections.singletonMap(KEY_STRING_STORE_NAME, storeName)); + Map keySchemaMap = + getStoreMetaValue(storeName, keySchemaKey).storeKeySchemas.keySchemaMap; + if (keySchemaMap.isEmpty()) { + throw new VeniceException("No key schema found for store: " + storeName); + } + Map.Entry keySchemaEntry = keySchemaMap.entrySet().iterator().next(); + keySchema = + new SchemaEntry(Integer.parseInt(keySchemaEntry.getKey().toString()), keySchemaEntry.getValue().toString()); + schemaData = new SchemaData(storeName, keySchema); + } + StoreMetaKey valueSchemaKey = MetaStoreDataType.STORE_VALUE_SCHEMAS + .getStoreMetaKey(Collections.singletonMap(KEY_STRING_STORE_NAME, storeName)); + Map valueSchemaMap = + getStoreMetaValue(storeName, valueSchemaKey).storeValueSchemas.valueSchemaMap; + // Check the value schema string, if it's empty then try to query the other key space for individual value schema. + for (Map.Entry entry: valueSchemaMap.entrySet()) { + // Check if we already have the corresponding value schema + int valueSchemaId = Integer.parseInt(entry.getKey().toString()); + if (schemaData.getValueSchema(valueSchemaId) != null) { + continue; + } + if (entry.getValue().toString().isEmpty()) { + // The value schemas might be too large to be stored in a single K/V. + StoreMetaKey individualValueSchemaKey = + MetaStoreDataType.STORE_VALUE_SCHEMA.getStoreMetaKey(new HashMap() { + { + put(KEY_STRING_STORE_NAME, storeName); + put(KEY_STRING_SCHEMA_ID, entry.getKey().toString()); + } + }); + // Empty string is not a valid value schema therefore it's safe to throw exceptions if we also cannot find it in + // the individual value schema key space. + String valueSchema = + getStoreMetaValue(storeName, individualValueSchemaKey).storeValueSchema.valueSchema.toString(); + schemaData.addValueSchema(new SchemaEntry(valueSchemaId, valueSchema)); + } else { + schemaData.addValueSchema(new SchemaEntry(valueSchemaId, entry.getValue().toString())); + } + } + return schemaData; + } + private AvroSpecificStoreClient getAvroClientForMetaStore(String storeName) { return storeClientMap.computeIfAbsent(storeName, k -> { ClientConfig clonedClientConfig = ClientConfig.cloneConfig(clientConfig) diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/repository/NativeMetadataRepositoryTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/repository/NativeMetadataRepositoryTest.java index 8477945c160..333c778fdfe 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/repository/NativeMetadataRepositoryTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/repository/NativeMetadataRepositoryTest.java @@ -8,25 +8,16 @@ import static org.mockito.Mockito.when; import com.linkedin.venice.client.store.ClientConfig; -import com.linkedin.venice.client.store.schemas.TestKeyRecord; -import com.linkedin.venice.client.store.schemas.TestValueRecord; import com.linkedin.venice.exceptions.VeniceNoStoreException; import com.linkedin.venice.meta.Store; import com.linkedin.venice.meta.StoreConfig; import com.linkedin.venice.schema.SchemaData; -import com.linkedin.venice.system.store.MetaStoreDataType; -import com.linkedin.venice.systemstore.schemas.StoreKeySchemas; -import com.linkedin.venice.systemstore.schemas.StoreMetaKey; -import com.linkedin.venice.systemstore.schemas.StoreMetaValue; -import com.linkedin.venice.systemstore.schemas.StoreValueSchema; -import com.linkedin.venice.systemstore.schemas.StoreValueSchemas; +import com.linkedin.venice.schema.SchemaEntry; import com.linkedin.venice.utils.TestUtils; import com.linkedin.venice.utils.VeniceProperties; import io.tehuti.Metric; import io.tehuti.metrics.MetricsRepository; import java.time.Clock; -import java.util.HashMap; -import java.util.Map; import java.util.concurrent.TimeUnit; import org.testng.Assert; import org.testng.annotations.BeforeMethod; @@ -34,7 +25,8 @@ public class NativeMetadataRepositoryTest { - private ClientConfig clientConfig; + private ClientConfig clientConfigThinClient; + private ClientConfig clientConfigRequestBased; private VeniceProperties backendConfig; private MetricsRepository metricsRepository; private Clock clock; @@ -42,19 +34,22 @@ public class NativeMetadataRepositoryTest { @BeforeMethod public void setUpMocks() { - clientConfig = mock(ClientConfig.class); + clientConfigThinClient = mock(ClientConfig.class); + clientConfigRequestBased = mock(ClientConfig.class); backendConfig = mock(VeniceProperties.class); doReturn(1L).when(backendConfig).getLong(eq(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS), anyLong()); metricsRepository = new MetricsRepository(); - doReturn(metricsRepository).when(clientConfig).getMetricsRepository(); + doReturn(metricsRepository).when(clientConfigThinClient).getMetricsRepository(); + doReturn(metricsRepository).when(clientConfigRequestBased).getMetricsRepository(); + doReturn(true).when(clientConfigRequestBased).isUseRequestBasedMetaRepository(); clock = mock(Clock.class); doReturn(0L).when(clock).millis(); } @Test - public void testGetInstance() { + public void testGetThinClientInstance() { NativeMetadataRepository nativeMetadataRepository = - NativeMetadataRepository.getInstance(clientConfig, backendConfig); + NativeMetadataRepository.getInstance(clientConfigThinClient, backendConfig); Assert.assertTrue(nativeMetadataRepository instanceof ThinClientMetaStoreBasedRepository); Assert.assertThrows(() -> nativeMetadataRepository.subscribe(STORE_NAME)); @@ -64,9 +59,22 @@ public void testGetInstance() { Assert.assertThrows(() -> nativeMetadataRepository.start()); } + @Test + public void testGetRequestBasedInstance() { + NativeMetadataRepository nativeMetadataRepository = + NativeMetadataRepository.getInstance(clientConfigRequestBased, backendConfig); + Assert.assertTrue(nativeMetadataRepository instanceof RequestBasedMetaRepository); + + Assert.assertThrows(() -> nativeMetadataRepository.subscribe(STORE_NAME)); + nativeMetadataRepository.start(); + nativeMetadataRepository.clear(); + Assert.assertThrows(() -> nativeMetadataRepository.subscribe(STORE_NAME)); + Assert.assertThrows(() -> nativeMetadataRepository.start()); + } + @Test public void testGetSchemaDataFromReadThroughCache() throws InterruptedException { - TestNMR nmr = new TestNMR(clientConfig, backendConfig, clock); + TestNMR nmr = new TestNMR(clientConfigThinClient, backendConfig, clock); nmr.start(); Assert.assertThrows(VeniceNoStoreException.class, () -> nmr.getKeySchema(STORE_NAME)); nmr.subscribe(STORE_NAME); @@ -77,7 +85,7 @@ public void testGetSchemaDataFromReadThroughCache() throws InterruptedException public void testGetSchemaDataEfficiently() throws InterruptedException { doReturn(Long.MAX_VALUE).when(backendConfig) .getLong(eq(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS), anyLong()); - TestNMR nmr = new TestNMR(clientConfig, backendConfig, clock); + TestNMR nmr = new TestNMR(clientConfigThinClient, backendConfig, clock); nmr.start(); Assert.assertEquals(nmr.keySchemaRequestCount, 0); Assert.assertEquals(nmr.valueSchemasRequestCount, 0); @@ -95,13 +103,12 @@ public void testGetSchemaDataEfficiently() throws InterruptedException { Assert.assertEquals(nmr.specificValueSchemaRequestCount, 1); Assert.assertNotNull(nmr.getKeySchema(STORE_NAME)); Assert.assertNotNull(nmr.getValueSchema(STORE_NAME, 1)); - // Refresh the store a few more times to retrieve value schema v2 for (int i = 0; i < 10; i++) { nmr.refreshOneStore(STORE_NAME); } Assert.assertEquals(nmr.keySchemaRequestCount, 1); Assert.assertEquals(nmr.valueSchemasRequestCount, 12); - Assert.assertEquals(nmr.specificValueSchemaRequestCount, 2); + Assert.assertEquals(nmr.specificValueSchemaRequestCount, 1); Assert.assertNotNull(nmr.getKeySchema(STORE_NAME)); Assert.assertNotNull(nmr.getValueSchema(STORE_NAME, 1)); Assert.assertNotNull(nmr.getValueSchema(STORE_NAME, 2)); @@ -115,7 +122,7 @@ public void testGetSchemaDataEfficiently() throws InterruptedException { public void testNativeMetadataRepositoryStats() throws InterruptedException { doReturn(Long.MAX_VALUE).when(backendConfig) .getLong(eq(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS), anyLong()); - TestNMR nmr = new TestNMR(clientConfig, backendConfig, clock); + TestNMR nmr = new TestNMR(clientConfigThinClient, backendConfig, clock); nmr.start(); nmr.subscribe(STORE_NAME); doReturn(1000L).when(clock).millis(); @@ -154,19 +161,27 @@ static class TestNMR extends NativeMetadataRepository { int valueSchemasRequestCount = 0; int specificValueSchemaRequestCount = 0; + private static final String INT_KEY_SCHEMA = "\"int\""; + + private static final String VALUE_SCHEMA_1 = "{\n" + " \"type\": \"record\",\n" + " \"name\": \"TestValue\",\n" + + " \"fields\": [\n" + " {\"name\": \"test_field1\", \"type\": \"string\"}\n" + " ]\n" + "}"; + private static final String VALUE_SCHEMA_2 = "{\n" + " \"type\": \"record\",\n" + " \"name\": \"TestValue\",\n" + + " \"fields\": [\n" + " {\"name\": \"test_field1\", \"type\": \"string\"},\n" + + " {\"name\": \"test_field2\", \"type\": \"int\", \"default\": 0}\n" + " ]\n" + "}"; + protected TestNMR(ClientConfig clientConfig, VeniceProperties backendConfig, Clock clock) { super(clientConfig, backendConfig, clock); } @Override - protected StoreConfig getStoreConfigFromSystemStore(String storeName) { + protected StoreConfig fetchStoreConfigFromRemote(String storeName) { StoreConfig storeConfig = mock(StoreConfig.class); when(storeConfig.isDeleting()).thenReturn(false); return storeConfig; } @Override - protected Store getStoreFromSystemStore(String storeName, String clusterName) { + protected Store fetchStoreFromRemote(String storeName, String clusterName) { Store store = mock(Store.class); when(store.getName()).thenReturn(storeName); when(store.getReadQuotaInCU()).thenReturn(1L); @@ -174,38 +189,25 @@ protected Store getStoreFromSystemStore(String storeName, String clusterName) { } @Override - protected StoreMetaValue getStoreMetaValue(String storeName, StoreMetaKey key) { - StoreMetaValue storeMetaValue = new StoreMetaValue(); - MetaStoreDataType metaStoreDataType = MetaStoreDataType.valueOf(key.metadataType); - switch (metaStoreDataType) { - case STORE_KEY_SCHEMAS: - Map keySchemaMap = new HashMap<>(); - keySchemaMap.put(String.valueOf(1), TestKeyRecord.SCHEMA$.toString()); - storeMetaValue.storeKeySchemas = new StoreKeySchemas(keySchemaMap); - keySchemaRequestCount++; - break; - case STORE_VALUE_SCHEMAS: - Map valueSchemaMap = new HashMap<>(); - valueSchemaMap.put(String.valueOf(1), ""); - if (valueSchemasRequestCount > 1) { - valueSchemaMap.put(String.valueOf(2), ""); - } - storeMetaValue.storeValueSchemas = new StoreValueSchemas(valueSchemaMap); - valueSchemasRequestCount++; - break; - case STORE_VALUE_SCHEMA: - storeMetaValue.storeValueSchema = new StoreValueSchema(TestValueRecord.SCHEMA$.toString()); - specificValueSchemaRequestCount++; - break; - default: - // do nothing + protected SchemaData getSchemaData(String storeName) { + if (schemaMap.containsKey(storeName)) { + valueSchemasRequestCount++; + return schemaMap.get(storeName); } - return storeMetaValue; - } - @Override - protected SchemaData getSchemaDataFromSystemStore(String storeName) { - return getSchemaDataFromMetaSystemStore(storeName); + // Mock schemas for testing + SchemaEntry schemaEntry = new SchemaEntry(0, INT_KEY_SCHEMA); + SchemaData schemaData = new SchemaData(storeName, schemaEntry); + schemaData.addValueSchema(new SchemaEntry(1, VALUE_SCHEMA_1)); + schemaData.addValueSchema(new SchemaEntry(2, VALUE_SCHEMA_2)); + + // Mock metrics + keySchemaRequestCount++; + valueSchemasRequestCount++; + specificValueSchemaRequestCount++; + + schemaMap.put(storeName, schemaData); + return schemaData; } } } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/repository/RequestBasedMetaRepositoryTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/repository/RequestBasedMetaRepositoryTest.java new file mode 100644 index 00000000000..cff5a02d54f --- /dev/null +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/repository/RequestBasedMetaRepositoryTest.java @@ -0,0 +1,235 @@ +package com.linkedin.davinci.repository; + +import static org.mockito.Mockito.doCallRealMethod; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.linkedin.venice.client.store.transport.D2TransportClient; +import com.linkedin.venice.client.store.transport.TransportClientResponse; +import com.linkedin.venice.meta.QueryAction; +import com.linkedin.venice.meta.ReadOnlyStore; +import com.linkedin.venice.meta.Store; +import com.linkedin.venice.meta.StoreConfig; +import com.linkedin.venice.meta.ZKStore; +import com.linkedin.venice.metadata.response.StorePropertiesResponseRecord; +import com.linkedin.venice.schema.SchemaData; +import com.linkedin.venice.schema.SchemaEntry; +import com.linkedin.venice.serializer.FastAvroSerializer; +import com.linkedin.venice.serializer.RecordSerializer; +import com.linkedin.venice.systemstore.schemas.StoreKeySchemas; +import com.linkedin.venice.systemstore.schemas.StoreMetaValue; +import com.linkedin.venice.systemstore.schemas.StoreValueSchemas; +import com.linkedin.venice.utils.TestUtils; +import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.CompletableFuture; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + + +public class RequestBasedMetaRepositoryTest { + private static final Logger LOGGER = LogManager.getLogger(RequestBasedMetaRepositoryTest.class); + private Random RANDOM; + + private Store store; + private static final String D2_SERVICE_NAME = "D2_SERVICE_NAME"; + private StorePropertiesResponseRecord MOCK_STORE_PROPERTIES_RESPONSE_RECORD; + + // Mock schemas + private static final String INT_KEY_SCHEMA = "\"int\""; + private static final String VALUE_SCHEMA_1 = "{\n" + " \"type\": \"record\",\n" + " \"name\": \"TestValue\",\n" + + " \"fields\": [\n" + " {\"name\": \"test_field1\", \"type\": \"string\"}\n" + " ]\n" + "}"; + private static final String VALUE_SCHEMA_2 = "{\n" + " \"type\": \"record\",\n" + " \"name\": \"TestValue\",\n" + + " \"fields\": [\n" + " {\"name\": \"test_field1\", \"type\": \"string\"},\n" + + " {\"name\": \"test_field2\", \"type\": \"int\", \"default\": 0}\n" + " ]\n" + "}"; + + @BeforeClass + public void beforeClassSetup() { + long seed = System.nanoTime(); + RANDOM = new Random(seed); + LOGGER.info("Random seed set: {}", seed); + } + + @BeforeMethod + public void beforeMethodSetup() { + + // Store + setupTestStore(); + + // StorePropertiesResponseRecord + setupTestStorePropertiesResponse(); + } + + @Test + public void testRequestBasedMetaRepositoryFetchStoreConfigFromRemote() { + + // Mock RequestBasedMetaRepository + RequestBasedMetaRepository requestBasedMetaRepository = getMockRequestBasedMetaRepository(); + when(requestBasedMetaRepository.fetchStoreConfigFromRemote(store.getName())).thenCallRealMethod(); + + // Test FetchStoreConfigFromRemote + StoreConfig storeConfig = requestBasedMetaRepository.fetchStoreConfigFromRemote(store.getName()); + Assert.assertNotNull(storeConfig); + Assert.assertEquals(storeConfig.getCluster(), D2_SERVICE_NAME); + Assert.assertEquals(storeConfig.getStoreName(), store.getName()); + Assert.assertNull(storeConfig.getMigrationDestCluster()); + Assert.assertNull(storeConfig.getMigrationSrcCluster()); + } + + @Test + public void testRequestBasedMetaRepositoryFetchStoreFromRemote() { + + // Mock RequestBasedMetaRepository + RequestBasedMetaRepository requestBasedMetaRepository = getMockRequestBasedMetaRepository(); + when(requestBasedMetaRepository.fetchStoreConfigFromRemote(store.getName())).thenCallRealMethod(); + + // Test FetchStoreConfigFromRemote + StoreConfig storeConfig = requestBasedMetaRepository.fetchStoreConfigFromRemote(store.getName()); + Assert.assertNotNull(storeConfig); + Assert.assertEquals(storeConfig.getCluster(), D2_SERVICE_NAME); + Assert.assertEquals(storeConfig.getStoreName(), store.getName()); + Assert.assertEquals(storeConfig.getMigrationDestCluster(), null); + Assert.assertEquals(storeConfig.getMigrationSrcCluster(), null); + } + + @Test + public void testRequestBasedMetaRepositoryFetchAndCacheStorePropertiesResponseRecord() { + + // Mock RequestBasedMetaRepository + RequestBasedMetaRepository requestBasedMetaRepository = getMockRequestBasedMetaRepository(); + when(requestBasedMetaRepository.fetchAndCacheStorePropertiesResponseRecord(store.getName())).thenCallRealMethod(); + + // Test FetchAndCacheStorePropertiesResponseRecord + StorePropertiesResponseRecord record = + requestBasedMetaRepository.fetchAndCacheStorePropertiesResponseRecord(store.getName()); + Assert.assertNotNull(record); + Assert.assertNotNull(record.getStoreMetaValue()); + Assert.assertNotNull(record.getStoreMetaValue().getStoreProperties()); + Assert.assertEquals(record.getStoreMetaValue().getStoreProperties().getName().toString(), store.getName()); + Assert.assertEquals(record.getStoreMetaValue().getStoreProperties().getOwner().toString(), store.getOwner()); + Assert + .assertEquals(record.getStoreMetaValue().getStoreProperties().getVersions().size(), store.getVersions().size()); + Assert.assertEquals(record.getStoreMetaValue().getStoreKeySchemas().getKeySchemaMap().size(), 1); + Assert.assertEquals(record.getStoreMetaValue().getStoreValueSchemas().getValueSchemaMap().size(), 2); + } + + @Test + public void testRequestBasedMetaRepositoryGetMaxValueSchemaId() { + + // Mock RequestBasedMetaRepository + RequestBasedMetaRepository requestBasedMetaRepository = getMockRequestBasedMetaRepository(); + when(requestBasedMetaRepository.getMaxValueSchemaId(store.getName())).thenCallRealMethod(); + + // Test GetMaxValueSchemaId + int schemaId = requestBasedMetaRepository.getMaxValueSchemaId(store.getName()); + Assert.assertEquals(schemaId, SchemaData.UNKNOWN_SCHEMA_ID); + + // Put schema + SchemaEntry schemaEntryKey = new SchemaEntry(0, INT_KEY_SCHEMA); + SchemaEntry schemaEntryValue1 = new SchemaEntry(1, VALUE_SCHEMA_1); + SchemaEntry schemaEntryValue2 = new SchemaEntry(2, VALUE_SCHEMA_2); + SchemaData schemaData = new SchemaData(store.getName(), schemaEntryKey); + schemaData.addValueSchema(schemaEntryValue1); + schemaData.addValueSchema(schemaEntryValue2); + requestBasedMetaRepository.schemaMap.put(store.getName(), schemaData); + + // Test GetMaxValueSchemaId + schemaId = requestBasedMetaRepository.getMaxValueSchemaId(store.getName()); + Assert.assertEquals(schemaId, 2); + } + + @Test + public void testRequestBasedMetaRepositoryCacheStoreSchema() { + + // Mock RequestBasedMetaRepository + RequestBasedMetaRepository requestBasedMetaRepository = getMockRequestBasedMetaRepository(); + doCallRealMethod().when(requestBasedMetaRepository) + .cacheStoreSchema(store.getName(), MOCK_STORE_PROPERTIES_RESPONSE_RECORD); + + requestBasedMetaRepository.cacheStoreSchema(store.getName(), MOCK_STORE_PROPERTIES_RESPONSE_RECORD); + SchemaData schemaData = requestBasedMetaRepository.storeSchemaMap.get(store.getName()); + Assert.assertNotNull(schemaData); + Assert.assertEquals(schemaData.getKeySchema().getSchemaStr(), INT_KEY_SCHEMA); + Assert.assertEquals(schemaData.getValueSchema(1).getSchemaStr(), VALUE_SCHEMA_1); + Assert.assertEquals(schemaData.getValueSchema(2).getSchemaStr(), VALUE_SCHEMA_2); + } + + private RequestBasedMetaRepository getMockRequestBasedMetaRepository() { + RequestBasedMetaRepository requestBasedMetaRepository = mock(RequestBasedMetaRepository.class); + + // Schema Map + requestBasedMetaRepository.storeSchemaMap = new VeniceConcurrentHashMap<>(); + requestBasedMetaRepository.schemaMap = new VeniceConcurrentHashMap<>(); + + // Mock D2TransportClient + try { + D2TransportClient d2TransportClient = getMockD2TransportClient(); + when(requestBasedMetaRepository.getD2TransportClient(store.getName())).thenReturn(d2TransportClient); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + + // Mock max value schema id + when(requestBasedMetaRepository.getMaxValueSchemaId(store.getName())).thenReturn(SchemaData.UNKNOWN_SCHEMA_ID); + + return requestBasedMetaRepository; + } + + private D2TransportClient getMockD2TransportClient() + throws InterruptedException, java.util.concurrent.ExecutionException { + + // Mock D2 + D2TransportClient d2TransportClient = mock(D2TransportClient.class); + when(d2TransportClient.getServiceName()).thenReturn(D2_SERVICE_NAME); + + // Mock request + String mockURL = QueryAction.STORE_PROPERTIES.toString().toLowerCase() + "/" + store.getName(); + TransportClientResponse mockResponse = mock(TransportClientResponse.class); + CompletableFuture completableFuture = mock(CompletableFuture.class); + RecordSerializer recordSerializer = + new FastAvroSerializer<>(StorePropertiesResponseRecord.SCHEMA$, null); + when(completableFuture.get()).thenReturn(mockResponse); + when(mockResponse.getBody()).thenReturn(recordSerializer.serialize(MOCK_STORE_PROPERTIES_RESPONSE_RECORD)); + when(d2TransportClient.get(mockURL)).thenReturn(completableFuture); + + return d2TransportClient; + } + + private void setupTestStore() { + store = TestUtils.populateZKStore( + (ZKStore) TestUtils.createTestStore( + Long.toString(RANDOM.nextLong()), + Long.toString(RANDOM.nextLong()), + System.currentTimeMillis()), + RANDOM); + } + + private void setupTestStorePropertiesResponse() { + StorePropertiesResponseRecord record = new StorePropertiesResponseRecord(); + + // StoreMetaValue + StoreMetaValue storeMetaValue = new StoreMetaValue(); + storeMetaValue.setStoreProperties(new ReadOnlyStore(store).cloneStoreProperties()); + + // Key Schema + Map storeKeySchemas = new VeniceConcurrentHashMap<>(); + storeKeySchemas.put("0", INT_KEY_SCHEMA); + storeMetaValue.setStoreKeySchemas(new StoreKeySchemas(storeKeySchemas)); + + // Value Schemas + Map storeValueSchemas = new VeniceConcurrentHashMap<>(); + storeValueSchemas.put("1", VALUE_SCHEMA_1); + storeValueSchemas.put("2", VALUE_SCHEMA_2); + + storeMetaValue.setStoreValueSchemas(new StoreValueSchemas(storeValueSchemas)); + + record.setStoreMetaValue(storeMetaValue); + + MOCK_STORE_PROPERTIES_RESPONSE_RECORD = record; + } +} diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/repository/ThinClientMetaStoreBasedRepositoryTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/repository/ThinClientMetaStoreBasedRepositoryTest.java new file mode 100644 index 00000000000..caccf895f54 --- /dev/null +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/repository/ThinClientMetaStoreBasedRepositoryTest.java @@ -0,0 +1,155 @@ +package com.linkedin.davinci.repository; + +import static com.linkedin.venice.system.store.MetaStoreWriter.KEY_STRING_CLUSTER_NAME; +import static com.linkedin.venice.system.store.MetaStoreWriter.KEY_STRING_STORE_NAME; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.linkedin.venice.meta.ReadOnlyStore; +import com.linkedin.venice.meta.Store; +import com.linkedin.venice.meta.StoreConfig; +import com.linkedin.venice.meta.ZKStore; +import com.linkedin.venice.schema.SchemaData; +import com.linkedin.venice.system.store.MetaStoreDataType; +import com.linkedin.venice.systemstore.schemas.StoreClusterConfig; +import com.linkedin.venice.systemstore.schemas.StoreKeySchemas; +import com.linkedin.venice.systemstore.schemas.StoreMetaKey; +import com.linkedin.venice.systemstore.schemas.StoreMetaValue; +import com.linkedin.venice.systemstore.schemas.StoreValueSchemas; +import com.linkedin.venice.utils.TestUtils; +import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + + +public class ThinClientMetaStoreBasedRepositoryTest { + private static final Logger LOGGER = LogManager.getLogger(ThinClientMetaStoreBasedRepositoryTest.class); + private Random RANDOM; + + private Store store; + private final String CLUSTER_NAME = "CLUSTER_NAME"; + + // Mock schemas + private static final String INT_KEY_SCHEMA = "\"int\""; + private static final String VALUE_SCHEMA_1 = "{\n" + " \"type\": \"record\",\n" + " \"name\": \"TestValue\",\n" + + " \"fields\": [\n" + " {\"name\": \"test_field1\", \"type\": \"string\"}\n" + " ]\n" + "}"; + private static final String VALUE_SCHEMA_2 = "{\n" + " \"type\": \"record\",\n" + " \"name\": \"TestValue\",\n" + + " \"fields\": [\n" + " {\"name\": \"test_field1\", \"type\": \"string\"},\n" + + " {\"name\": \"test_field2\", \"type\": \"int\", \"default\": 0}\n" + " ]\n" + "}"; + + @BeforeClass + public void setupRandom() { + long seed = System.nanoTime(); + RANDOM = new Random(seed); + LOGGER.info("Random seed set: {}", seed); + } + + @BeforeMethod + public void beforeMethodSetup() { + + // Store + setUpTestStore(); + } + + @Test + public void testFetchStoreConfigFromRemote() { + + // Mock ThinClientMetaStoreBasedRepository + ThinClientMetaStoreBasedRepository thinClientMetaStoreBasedRepository = getMockThinClientMetaStoreBasedRepository(); + + // Test FetchStoreConfigFromRemote + when(thinClientMetaStoreBasedRepository.fetchStoreConfigFromRemote(store.getName())).thenCallRealMethod(); + StoreConfig storeConfig = thinClientMetaStoreBasedRepository.fetchStoreConfigFromRemote(store.getName()); + Assert.assertNotNull(storeConfig); + Assert.assertEquals(storeConfig.getStoreName(), store.getName()); + Assert.assertEquals(storeConfig.getCluster(), CLUSTER_NAME); + } + + @Test + public void testGetSchemaData() { + + // Mock ThinClientMetaStoreBasedRepository + ThinClientMetaStoreBasedRepository thinClientMetaStoreBasedRepository = getMockThinClientMetaStoreBasedRepository(); + + // Test GetSchemaData + when(thinClientMetaStoreBasedRepository.getSchemaData(store.getName())).thenCallRealMethod(); + SchemaData schemaData = thinClientMetaStoreBasedRepository.getSchemaData(store.getName()); + Assert.assertNotNull(schemaData); + Assert.assertEquals(schemaData.getStoreName(), store.getName()); + Assert.assertEquals(schemaData.getKeySchema().getSchemaStr(), INT_KEY_SCHEMA); + Assert.assertEquals(schemaData.getValueSchema(1).getSchemaStr(), VALUE_SCHEMA_1); + Assert.assertEquals(schemaData.getValueSchema(2).getSchemaStr(), VALUE_SCHEMA_2); + } + + private ThinClientMetaStoreBasedRepository getMockThinClientMetaStoreBasedRepository() { + ThinClientMetaStoreBasedRepository thinClientMetaStoreBasedRepository = + mock(ThinClientMetaStoreBasedRepository.class); + + // schemaMap + thinClientMetaStoreBasedRepository.schemaMap = new VeniceConcurrentHashMap<>(); + + // StoreMetaValue + StoreMetaValue storeMetaValue = new StoreMetaValue(); + storeMetaValue.setStoreProperties(new ReadOnlyStore(store).cloneStoreProperties()); + + // Key Schema + Map storeKeySchemas = new VeniceConcurrentHashMap<>(); + storeKeySchemas.put("0", INT_KEY_SCHEMA); + storeMetaValue.setStoreKeySchemas(new StoreKeySchemas(storeKeySchemas)); + + // Value Schemas + Map storeValueSchemas = new VeniceConcurrentHashMap<>(); + storeValueSchemas.put("1", VALUE_SCHEMA_1); + storeValueSchemas.put("2", VALUE_SCHEMA_2); + + storeMetaValue.setStoreValueSchemas(new StoreValueSchemas(storeValueSchemas)); + + // Store Cluster Config + StoreClusterConfig storeClusterConfig = new StoreClusterConfig(); + storeClusterConfig.setStoreName(store.getName()); + storeClusterConfig.setCluster(CLUSTER_NAME); + storeMetaValue.setStoreClusterConfig(storeClusterConfig); + + // Mock getStoreMetaValue + StoreMetaKey storeMetaKey; + storeMetaKey = MetaStoreDataType.STORE_KEY_SCHEMAS + .getStoreMetaKey(Collections.singletonMap(KEY_STRING_STORE_NAME, store.getName())); + when(thinClientMetaStoreBasedRepository.getStoreMetaValue(store.getName(), storeMetaKey)) + .thenReturn(storeMetaValue); + storeMetaKey = MetaStoreDataType.STORE_VALUE_SCHEMAS + .getStoreMetaKey(Collections.singletonMap(KEY_STRING_STORE_NAME, store.getName())); + when(thinClientMetaStoreBasedRepository.getStoreMetaValue(store.getName(), storeMetaKey)) + .thenReturn(storeMetaValue); + storeMetaKey = MetaStoreDataType.STORE_PROPERTIES.getStoreMetaKey(new HashMap() { + { + put(KEY_STRING_STORE_NAME, store.getName()); + put(KEY_STRING_CLUSTER_NAME, CLUSTER_NAME); + } + }); + when(thinClientMetaStoreBasedRepository.getStoreMetaValue(store.getName(), storeMetaKey)) + .thenReturn(storeMetaValue); + storeMetaKey = MetaStoreDataType.STORE_CLUSTER_CONFIG + .getStoreMetaKey(Collections.singletonMap(KEY_STRING_STORE_NAME, store.getName())); + when(thinClientMetaStoreBasedRepository.getStoreMetaValue(store.getName(), storeMetaKey)) + .thenReturn(storeMetaValue); + + return thinClientMetaStoreBasedRepository; + } + + private void setUpTestStore() { + store = TestUtils.populateZKStore( + (ZKStore) TestUtils.createTestStore( + Long.toString(RANDOM.nextLong()), + Long.toString(RANDOM.nextLong()), + System.currentTimeMillis()), + RANDOM); + } +} diff --git a/clients/venice-thin-client/src/main/java/com/linkedin/venice/client/store/ClientConfig.java b/clients/venice-thin-client/src/main/java/com/linkedin/venice/client/store/ClientConfig.java index fe562fbc547..383e53e8fe8 100644 --- a/clients/venice-thin-client/src/main/java/com/linkedin/venice/client/store/ClientConfig.java +++ b/clients/venice-thin-client/src/main/java/com/linkedin/venice/client/store/ClientConfig.java @@ -65,6 +65,9 @@ public class ClientConfig { // HttpTransport settings private int maxConnectionsPerRoute; // only for HTTP1 + // NativeMetadataRepository settings + private boolean useRequestBasedMetaRepository = false; + private int maxConnectionsTotal; // only for HTTP1 private boolean httpClient5Http2Enabled; @@ -102,7 +105,7 @@ public static ClientConfig cloneConfig(ClientConfi .setD2ZkTimeout(config.getD2ZkTimeout()) .setD2Client(config.getD2Client()) .setD2Routing(config.isD2Routing()) // This should be the last of the D2 configs since it is an inferred config - // and we want the cloned config to match the source config + // and we want the cloned config to match the source config // Performance-related settings .setMetricsRepository(config.getMetricsRepository()) @@ -279,6 +282,15 @@ public ClientConfig setMaxConnectionsPerRoute(int maxConnectionsPerRoute) { return this; } + public boolean isUseRequestBasedMetaRepository() { + return useRequestBasedMetaRepository; + } + + public ClientConfig setUseRequestBasedMetaRepository(boolean useRequestBasedMetaRepository) { + this.useRequestBasedMetaRepository = useRequestBasedMetaRepository; + return this; + } + public int getMaxConnectionsTotal() { return maxConnectionsTotal; } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/meta/ReadOnlyStore.java b/internal/venice-common/src/main/java/com/linkedin/venice/meta/ReadOnlyStore.java index 304eeaabae4..7d2fc390685 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/meta/ReadOnlyStore.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/meta/ReadOnlyStore.java @@ -924,7 +924,7 @@ public StoreProperties cloneStoreProperties() { storeProperties.setBootstrapToOnlineTimeoutInHours(getBootstrapToOnlineTimeoutInHours()); // storeProperties.setLeaderFollowerModelEnabled(isLeaderFollowerModelEnabled()); storeProperties.setNativeReplicationEnabled(isNativeReplicationEnabled()); - // storeProperties.setReplicationMetadataVersionID(getReplicationMetadataVersionID()); + storeProperties.setReplicationMetadataVersionID(getRmdVersion()); storeProperties.setPushStreamSourceAddress(getPushStreamSourceAddress()); storeProperties.setBackupStrategy(getBackupStrategy().getValue()); storeProperties.setSchemaAutoRegisteFromPushJobEnabled(isSchemaAutoRegisterFromPushJobEnabled()); diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/meta/ReadOnlyStoreTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/meta/ReadOnlyStoreTest.java index 8ca379a50e8..34719c11448 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/meta/ReadOnlyStoreTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/meta/ReadOnlyStoreTest.java @@ -2,7 +2,6 @@ import static org.testng.Assert.*; -import com.linkedin.venice.compression.CompressionStrategy; import com.linkedin.venice.systemstore.schemas.StoreETLConfig; import com.linkedin.venice.systemstore.schemas.StoreHybridConfig; import com.linkedin.venice.systemstore.schemas.StorePartitionerConfig; @@ -33,11 +32,12 @@ public void setupReadOnlyStore() { @Test public void testCloneStoreProperties() { - ZKStore store = populateZKStore( + ZKStore store = TestUtils.populateZKStore( (ZKStore) TestUtils.createTestStore( Long.toString(RANDOM.nextLong()), Long.toString(RANDOM.nextLong()), - System.currentTimeMillis())); + System.currentTimeMillis()), + RANDOM); ReadOnlyStore readOnlyStore = new ReadOnlyStore(store); StoreProperties storeProperties = readOnlyStore.cloneStoreProperties(); @@ -106,58 +106,6 @@ public void testCloneStoreProperties() { assertEquals(storeProperties.getNearlineProducerCountPerWriter(), store.getNearlineProducerCountPerWriter()); } - private ZKStore populateZKStore(ZKStore store) { - store.setCurrentVersion(RANDOM.nextInt()); - store.setPartitionCount(RANDOM.nextInt()); - store.setLowWatermark(RANDOM.nextLong()); - store.setEnableWrites(false); - store.setEnableReads(true); - store.setStorageQuotaInByte(RANDOM.nextLong()); - store.setReadQuotaInCU(RANDOM.nextLong()); - store.setHybridStoreConfig(TestUtils.createTestHybridStoreConfig(RANDOM)); - store.setViewConfigs(TestUtils.createTestViewConfigs(RANDOM)); - store.setCompressionStrategy(CompressionStrategy.GZIP); - store.setClientDecompressionEnabled(true); - store.setChunkingEnabled(true); - store.setRmdChunkingEnabled(true); - store.setBatchGetLimit(RANDOM.nextInt()); - store.setNumVersionsToPreserve(RANDOM.nextInt()); - store.setIncrementalPushEnabled(true); - store.setSeparateRealTimeTopicEnabled(true); - store.setMigrating(true); - store.setWriteComputationEnabled(true); - store.setReadComputationEnabled(true); - store.setBootstrapToOnlineTimeoutInHours(RANDOM.nextInt()); - store.setNativeReplicationEnabled(true); - store.setPushStreamSourceAddress("push_stream_source"); - store.setBackupStrategy(BackupStrategy.DELETE_ON_NEW_PUSH_START); - store.setSchemaAutoRegisterFromPushJobEnabled(true); - store.setLatestSuperSetValueSchemaId(RANDOM.nextInt()); - store.setHybridStoreDiskQuotaEnabled(true); - store.setStoreMetaSystemStoreEnabled(true); - store.setEtlStoreConfig(TestUtils.createTestETLStoreConfig()); - store.setPartitionerConfig(TestUtils.createTestPartitionerConfig(RANDOM)); - store.setLatestVersionPromoteToCurrentTimestamp(RANDOM.nextLong()); - store.setBackupVersionRetentionMs(RANDOM.nextLong()); - store.setMigrationDuplicateStore(true); - store.setNativeReplicationSourceFabric("native_replication_source_fabric"); - store.setDaVinciPushStatusStoreEnabled(true); - store.setStoreMetadataSystemStoreEnabled(true); - store.setActiveActiveReplicationEnabled(true); - store.setMinCompactionLagSeconds(RANDOM.nextLong()); - store.setMaxCompactionLagSeconds(RANDOM.nextLong()); - store.setMaxRecordSizeBytes(RANDOM.nextInt()); - store.setMaxNearlineRecordSizeBytes(RANDOM.nextInt()); - store.setUnusedSchemaDeletionEnabled(true); - store.setVersions(TestUtils.createTestVersions(store.getName(), RANDOM)); - store.setSystemStores(TestUtils.createTestSystemStores(store.getName(), RANDOM)); - store.setStorageNodeReadQuotaEnabled(true); - store.setBlobTransferEnabled(true); - store.setNearlineProducerCompressionEnabled(true); - store.setNearlineProducerCountPerWriter(RANDOM.nextInt()); - return store; - } - private void assertEqualHybridConfig(StoreHybridConfig actual, HybridStoreConfig expected) { assertEquals(actual.getRewindTimeInSeconds(), expected.getRewindTimeInSeconds()); assertEquals(actual.getOffsetLagThresholdToGoOnline(), expected.getOffsetLagThresholdToGoOnline()); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/MetaSystemStoreTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/MetaSystemStoreTest.java index 57c667329c4..7dc55dc7b34 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/MetaSystemStoreTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/MetaSystemStoreTest.java @@ -15,6 +15,7 @@ import com.linkedin.d2.balancer.D2Client; import com.linkedin.davinci.repository.NativeMetadataRepository; +import com.linkedin.davinci.repository.RequestBasedMetaRepository; import com.linkedin.davinci.repository.ThinClientMetaStoreBasedRepository; import com.linkedin.venice.D2.D2ClientUtils; import com.linkedin.venice.client.store.AvroSpecificStoreClient; @@ -38,6 +39,7 @@ import com.linkedin.venice.meta.Store; import com.linkedin.venice.meta.Version; import com.linkedin.venice.meta.VersionStatus; +import com.linkedin.venice.meta.ZKStore; import com.linkedin.venice.schema.SchemaEntry; import com.linkedin.venice.serialization.avro.AvroProtocolDefinition; import com.linkedin.venice.system.store.MetaStoreDataType; @@ -287,6 +289,42 @@ public void testThinClientMetaStoreBasedRepository() throws InterruptedException } } + @Test(timeOut = 120 * Time.MS_PER_SECOND) + public void testRequestBasedMetaStoreBasedRepository() throws InterruptedException { + String regularVeniceStoreName = Utils.getUniqueString("venice_store"); + createStoreAndMaterializeMetaSystemStore(regularVeniceStoreName); + D2Client d2Client = null; + NativeMetadataRepository nativeMetadataRepository = null; + try { + d2Client = D2TestUtils.getAndStartD2Client(veniceLocalCluster.getZk().getAddress()); + ClientConfig clientConfig = + getClientConfig(regularVeniceStoreName, d2Client).setUseRequestBasedMetaRepository(true); + // Not providing a CLIENT_META_SYSTEM_STORE_VERSION_MAP, should use the default value of 1 for system store + // current version. + VeniceProperties backendConfig = new PropertyBuilder().put(CLIENT_USE_SYSTEM_STORE_REPOSITORY, true) + .put(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS, 1) + .build(); + nativeMetadataRepository = NativeMetadataRepository.getInstance(clientConfig, backendConfig); + nativeMetadataRepository.start(); + // ThinClientMetaStoreBasedRepository implementation should be used since CLIENT_USE_META_SYSTEM_STORE_REPOSITORY + // is set to true without enabling other feature flags. + Assert.assertTrue(nativeMetadataRepository instanceof RequestBasedMetaRepository); + verifyRepository(nativeMetadataRepository, regularVeniceStoreName); + } finally { + if (d2Client != null) { + D2ClientUtils.shutdownClient(d2Client); + } + if (nativeMetadataRepository != null) { + // Calling clear explicitly here because if the NativeMetadataRepository implementation used happens to + // initialize + // a new DaVinciBackend then calling clear will trigger the cleanup logic to ensure the DaVinciBackend is not + // leaked + // into other tests. + nativeMetadataRepository.clear(); + } + } + } + @Test(timeOut = 60 * Time.MS_PER_SECOND) public void testThinClientMetaStoreBasedRepositoryWithLargeValueSchemas() throws InterruptedException { String regularVeniceStoreName = Utils.getUniqueString("venice_store"); @@ -393,12 +431,14 @@ private void verifyRepository(NativeMetadataRepository nativeMetadataRepository, assertNull(nativeMetadataRepository.getStore("Non-existing-store")); expectThrows(VeniceNoStoreException.class, () -> nativeMetadataRepository.getStoreOrThrow("Non-existing-store")); expectThrows(VeniceNoStoreException.class, () -> nativeMetadataRepository.subscribe("Non-existing-store")); - nativeMetadataRepository.subscribe(regularVeniceStoreName); - Store store = nativeMetadataRepository.getStore(regularVeniceStoreName); - Store controllerStore = new ReadOnlyStore( - veniceLocalCluster.getLeaderVeniceController().getVeniceAdmin().getStore(clusterName, regularVeniceStoreName)); - assertEquals(store, controllerStore); + Store store = normalizeStore(new ReadOnlyStore(nativeMetadataRepository.getStore(regularVeniceStoreName))); + Store controllerStore = normalizeStore( + new ReadOnlyStore( + veniceLocalCluster.getLeaderVeniceController() + .getVeniceAdmin() + .getStore(clusterName, regularVeniceStoreName))); + assertEquals(store.toString(), controllerStore.toString()); SchemaEntry keySchema = nativeMetadataRepository.getKeySchema(regularVeniceStoreName); SchemaEntry controllerKeySchema = veniceLocalCluster.getLeaderVeniceController() .getVeniceAdmin() @@ -423,9 +463,9 @@ private void verifyRepository(NativeMetadataRepository nativeMetadataRepository, assertEquals(nativeRepoStore.getStorageQuotaInByte(), storageQuota); }); assertFalse(controllerClient.addValueSchema(regularVeniceStoreName, VALUE_SCHEMA_2).isError()); - TestUtils.waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, () -> { + TestUtils.waitForNonDeterministicAssertion(15, TimeUnit.SECONDS, () -> { assertEquals( - nativeMetadataRepository.getValueSchemas(regularVeniceStoreName), + nativeMetadataRepository.getValueSchemas(regularVeniceStoreName), // this does not retry, only executed onces veniceLocalCluster.getLeaderVeniceController() .getVeniceAdmin() .getValueSchemas(clusterName, regularVeniceStoreName)); @@ -448,6 +488,10 @@ private void verifyRepository(NativeMetadataRepository nativeMetadataRepository, }); } + private Store normalizeStore(ReadOnlyStore store) { + return new ReadOnlyStore(new ZKStore(store.cloneStoreProperties())); + } + private void createStoreAndMaterializeMetaSystemStore(String storeName) { createStoreAndMaterializeMetaSystemStore(storeName, VALUE_SCHEMA_1); } @@ -455,8 +499,17 @@ private void createStoreAndMaterializeMetaSystemStore(String storeName) { private void createStoreAndMaterializeMetaSystemStore(String storeName, String valueSchema) { // Verify and create Venice regular store if it doesn't exist. if (parentControllerClient.getStore(storeName).getStore() == null) { - assertFalse( - parentControllerClient.createNewStore(storeName, "test_owner", INT_KEY_SCHEMA, valueSchema).isError()); + NewStoreResponse resp = + parentControllerClient.createNewStore(storeName, "test_owner", INT_KEY_SCHEMA, valueSchema); + if (resp.isError()) { + System.out.println("Create new store failed: " + resp.getError()); + } + assertFalse(resp.isError()); + assertFalse(parentControllerClient.emptyPush(storeName, "test-push-job", 100).isError()); + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + } } String metaSystemStoreName = VeniceSystemStoreType.META_STORE.getSystemStoreName(storeName); TestUtils.waitForNonDeterministicPushCompletion( diff --git a/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/TestUtils.java b/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/TestUtils.java index 2ee342811d6..7f47088a3f1 100644 --- a/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/TestUtils.java +++ b/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/TestUtils.java @@ -48,6 +48,7 @@ import com.linkedin.venice.helix.SafeHelixManager; import com.linkedin.venice.helix.VeniceOfflinePushMonitorAccessor; import com.linkedin.venice.kafka.protocol.state.PartitionState; +import com.linkedin.venice.meta.BackupStrategy; import com.linkedin.venice.meta.BufferReplayPolicy; import com.linkedin.venice.meta.DataReplicationPolicy; import com.linkedin.venice.meta.ETLStoreConfig; @@ -517,6 +518,58 @@ public static Store createTestStore(String name, String owner, long createdTime) return store; } + public static ZKStore populateZKStore(ZKStore store, Random random) { + store.setCurrentVersion(random.nextInt()); + store.setPartitionCount(random.nextInt()); + store.setLowWatermark(random.nextLong()); + store.setEnableWrites(false); + store.setEnableReads(true); + store.setStorageQuotaInByte(random.nextLong()); + store.setReadQuotaInCU(random.nextLong()); + store.setHybridStoreConfig(TestUtils.createTestHybridStoreConfig(random)); + store.setViewConfigs(TestUtils.createTestViewConfigs(random)); + store.setCompressionStrategy(CompressionStrategy.GZIP); + store.setClientDecompressionEnabled(true); + store.setChunkingEnabled(true); + store.setRmdChunkingEnabled(true); + store.setBatchGetLimit(random.nextInt()); + store.setNumVersionsToPreserve(random.nextInt()); + store.setIncrementalPushEnabled(true); + store.setSeparateRealTimeTopicEnabled(true); + store.setMigrating(true); + store.setWriteComputationEnabled(true); + store.setReadComputationEnabled(true); + store.setBootstrapToOnlineTimeoutInHours(random.nextInt()); + store.setNativeReplicationEnabled(true); + store.setPushStreamSourceAddress("push_stream_source"); + store.setBackupStrategy(BackupStrategy.DELETE_ON_NEW_PUSH_START); + store.setSchemaAutoRegisterFromPushJobEnabled(true); + store.setLatestSuperSetValueSchemaId(random.nextInt()); + store.setHybridStoreDiskQuotaEnabled(true); + store.setStoreMetaSystemStoreEnabled(true); + store.setEtlStoreConfig(TestUtils.createTestETLStoreConfig()); + store.setPartitionerConfig(TestUtils.createTestPartitionerConfig(random)); + store.setLatestVersionPromoteToCurrentTimestamp(random.nextLong()); + store.setBackupVersionRetentionMs(random.nextLong()); + store.setMigrationDuplicateStore(true); + store.setNativeReplicationSourceFabric("native_replication_source_fabric"); + store.setDaVinciPushStatusStoreEnabled(true); + store.setStoreMetadataSystemStoreEnabled(true); + store.setActiveActiveReplicationEnabled(true); + store.setMinCompactionLagSeconds(random.nextLong()); + store.setMaxCompactionLagSeconds(random.nextLong()); + store.setMaxRecordSizeBytes(random.nextInt()); + store.setMaxNearlineRecordSizeBytes(random.nextInt()); + store.setUnusedSchemaDeletionEnabled(true); + store.setVersions(TestUtils.createTestVersions(store.getName(), random)); + store.setSystemStores(TestUtils.createTestSystemStores(store.getName(), random)); + store.setStorageNodeReadQuotaEnabled(true); + store.setBlobTransferEnabled(true); + store.setNearlineProducerCompressionEnabled(true); + store.setNearlineProducerCountPerWriter(random.nextInt()); + return store; + } + public static HybridStoreConfig createTestHybridStoreConfig(Random random) { HybridStoreConfig hybridStoreConfig = new HybridStoreConfigImpl( random.nextLong(), diff --git a/services/venice-server/src/main/java/com/linkedin/venice/listener/ServerReadMetadataRepository.java b/services/venice-server/src/main/java/com/linkedin/venice/listener/ServerReadMetadataRepository.java index 878e231566e..39fdaacce0d 100644 --- a/services/venice-server/src/main/java/com/linkedin/venice/listener/ServerReadMetadataRepository.java +++ b/services/venice-server/src/main/java/com/linkedin/venice/listener/ServerReadMetadataRepository.java @@ -180,7 +180,7 @@ public StorePropertiesResponse getStoreProperties(String storeName, Optional entry: valueSchemas.entrySet()) { int schemaId = Integer.parseInt(entry.getKey().toString()); if (schemaId > largestKnownSchemaId.get()) { - storeValueSchemas.put(schemaId, entry.getValue()); + storeValueSchemas.valueSchemaMap.put(Integer.toString(schemaId), entry.getValue()); } } } else { diff --git a/services/venice-server/src/test/java/com/linkedin/venice/listener/ServerReadMetadataRepositoryTest.java b/services/venice-server/src/test/java/com/linkedin/venice/listener/ServerReadMetadataRepositoryTest.java index 3cd9de7ea97..4bd0ea0f926 100644 --- a/services/venice-server/src/test/java/com/linkedin/venice/listener/ServerReadMetadataRepositoryTest.java +++ b/services/venice-server/src/test/java/com/linkedin/venice/listener/ServerReadMetadataRepositoryTest.java @@ -158,38 +158,65 @@ public void testGetStoreProperties() { doReturn(readyToServeInstances).when(partition).getReadyToServeInstances(); partitionAssignment.addPartition(partition); String schema = "\"string\""; + ArrayList valueSchemas = new ArrayList<>(); + final int schemaCount = 3; + for (int i = 1; i <= schemaCount; i++) { + valueSchemas.add(new SchemaEntry(i, schema)); + } doReturn(mockStore).when(mockMetadataRepo).getStoreOrThrow(storeName); Mockito.when(mockSchemaRepo.getKeySchema(storeName)).thenReturn(new SchemaEntry(0, schema)); - Mockito.when(mockSchemaRepo.getValueSchemas(storeName)) - .thenReturn(Collections.singletonList(new SchemaEntry(0, schema))); + Mockito.when(mockSchemaRepo.getValueSchemas(storeName)).thenReturn(valueSchemas); Mockito.when(mockCustomizedViewRepository.getPartitionAssignments(topicName)).thenReturn(partitionAssignment); Mockito.when(mockHelixInstanceConfigRepository.getInstanceGroupIdMapping()).thenReturn(Collections.emptyMap()); - mockStore.setStorageNodeReadQuotaEnabled(true); + + // Request StorePropertiesResponse storePropertiesResponse = serverReadMetadataRepository.getStoreProperties(storeName, Optional.empty()); + + // Assert response Assert.assertNotNull(storePropertiesResponse); Assert.assertNotNull(storePropertiesResponse.getResponseRecord()); Assert.assertNotNull(storePropertiesResponse.getResponseRecord().getStoreMetaValue()); Assert.assertEquals( storePropertiesResponse.getResponseRecord().getStoreMetaValue().getStoreKeySchemas().getKeySchemaMap().get("0"), "\"string\""); - // Verify the metadata Assert.assertEquals( storePropertiesResponse.getResponseRecord().getStoreMetaValue().getStoreProperties().getVersions().size(), 2); Assert.assertEquals(storePropertiesResponse.getResponseRecord().getRoutingInfo().get("0").size(), 1); + ServerCurrentVersionResponse currentVersionResponse = + serverReadMetadataRepository.getCurrentVersionResponse(storeName); + Assert.assertNotNull(currentVersionResponse); + Assert.assertEquals(currentVersionResponse.getCurrentVersion(), 2); + // Assert metrics repo String metadataInvokeMetricName = ".ServerMetadataStats--request_based_metadata_invoke_count.Rate"; String metadataFailureMetricName = ".ServerMetadataStats--request_based_metadata_failure_count.Rate"; Assert.assertTrue(metricsRepository.getMetric(metadataInvokeMetricName).value() > 0); Assert.assertEquals(metricsRepository.getMetric(metadataFailureMetricName).value(), 0d); - ServerCurrentVersionResponse currentVersionResponse = - serverReadMetadataRepository.getCurrentVersionResponse(storeName); - Assert.assertNotNull(currentVersionResponse); - Assert.assertEquals(currentVersionResponse.getCurrentVersion(), 2); + // Test largestKnownSchemaID param + for (int i = 0; i <= schemaCount; i++) { + storePropertiesResponse = serverReadMetadataRepository.getStoreProperties(storeName, Optional.of(i)); + Assert.assertEquals( + storePropertiesResponse.getResponseRecord() + .getStoreMetaValue() + .getStoreValueSchemas() + .getValueSchemaMap() + .size(), + schemaCount - i); + } + storePropertiesResponse = serverReadMetadataRepository.getStoreProperties(storeName, Optional.empty()); + Assert.assertEquals( + storePropertiesResponse.getResponseRecord() + .getStoreMetaValue() + .getStoreValueSchemas() + .getValueSchemaMap() + .size(), + schemaCount); + // Value update test mockStore.setBatchGetLimit(300); storePropertiesResponse = serverReadMetadataRepository.getStoreProperties(storeName, Optional.empty()); Assert.assertEquals(