Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DVC] Add RequestBasedMetaRepository to enable metadata retrieval directly from server #1467

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ public class DaVinciConfig {
*/
private int largeBatchRequestSplitThreshold = AvroGenericDaVinciClient.DEFAULT_CHUNK_SPLIT_THRESHOLD;

/**
* Determines whether to enable request-based metadata retrieval directly from the Venice Server.
* By default, metadata is retrieved from a system store via a thin client.
*/
private boolean useRequestBasedMetaRepository = false;

/** Creates a DaVinci config with all settings at their defaults. */
public DaVinciConfig() {
}

Expand Down Expand Up @@ -147,4 +153,13 @@ public DaVinciConfig setLargeBatchRequestSplitThreshold(int largeBatchRequestSpl
this.largeBatchRequestSplitThreshold = largeBatchRequestSplitThreshold;
return this;
}

/**
 * @return whether store metadata is retrieved directly from the Venice Server
 *         instead of from a system store via a thin client; defaults to {@code false}
 */
public boolean isUseRequestBasedMetaRepository() {
return useRequestBasedMetaRepository;
}

/**
 * Enables or disables request-based metadata retrieval directly from the Venice Server.
 *
 * @param useRequestBasedMetaRepository {@code true} to fetch metadata directly from servers,
 *                                      {@code false} to use the system-store thin client
 * @return this config instance, for call chaining
 */
public DaVinciConfig setUseRequestBasedMetaRepository(boolean useRequestBasedMetaRepository) {
this.useRequestBasedMetaRepository = useRequestBasedMetaRepository;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,8 @@ protected synchronized DaVinciClient getClient(
ClientConfig clientConfig = new ClientConfig(internalStoreName).setD2Client(d2Client)
.setD2ServiceName(clusterDiscoveryD2ServiceName)
.setMetricsRepository(metricsRepository)
.setSpecificValueClass(valueClass);
.setSpecificValueClass(valueClass)
.setUseRequestBasedMetaRepository(config.isUseRequestBasedMetaRepository());

DaVinciClient client;
if (config.isIsolated()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package com.linkedin.davinci.repository;

import static com.linkedin.venice.ConfigKeys.CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS;
import static com.linkedin.venice.system.store.MetaStoreWriter.KEY_STRING_SCHEMA_ID;
import static com.linkedin.venice.system.store.MetaStoreWriter.KEY_STRING_STORE_NAME;
import static java.lang.Thread.currentThread;

import com.linkedin.davinci.stats.NativeMetadataRepositoryStats;
Expand All @@ -25,17 +23,11 @@
import com.linkedin.venice.schema.rmd.RmdSchemaEntry;
import com.linkedin.venice.schema.writecompute.DerivedSchemaEntry;
import com.linkedin.venice.service.ICProvider;
import com.linkedin.venice.system.store.MetaStoreDataType;
import com.linkedin.venice.systemstore.schemas.StoreClusterConfig;
import com.linkedin.venice.systemstore.schemas.StoreMetaKey;
import com.linkedin.venice.systemstore.schemas.StoreMetaValue;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -70,7 +62,7 @@ public abstract class NativeMetadataRepository
private final Map<String, StoreConfig> storeConfigMap = new VeniceConcurrentHashMap<>();
// Local cache for key/value schemas. SchemaData supports one key schema per store only, which may need to be changed
// for key schema evolvability.
private final Map<String, SchemaData> schemaMap = new VeniceConcurrentHashMap<>();
protected Map<String, SchemaData> schemaMap = new VeniceConcurrentHashMap<>();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing the final modifier here so that we can initialize this map when mocking it in unit tests.

private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
private final Set<StoreDataChangedListener> listeners = new CopyOnWriteArraySet<>();
private final AtomicLong totalStoreReadQuota = new AtomicLong();
Expand Down Expand Up @@ -125,11 +117,19 @@ public static NativeMetadataRepository getInstance(
ClientConfig clientConfig,
VeniceProperties backendConfig,
ICProvider icProvider) {
NativeMetadataRepository nativeMetadataRepository;
if (clientConfig.isUseRequestBasedMetaRepository()) {
nativeMetadataRepository = new RequestBasedMetaRepository(clientConfig, backendConfig);
} else {
nativeMetadataRepository = new ThinClientMetaStoreBasedRepository(clientConfig, backendConfig, icProvider);
}

LOGGER.info(
"Initializing {} with {}",
NativeMetadataRepository.class.getSimpleName(),
ThinClientMetaStoreBasedRepository.class.getSimpleName());
return new ThinClientMetaStoreBasedRepository(clientConfig, backendConfig, icProvider);
nativeMetadataRepository.getClass().getSimpleName());

return nativeMetadataRepository;
}

@Override
Expand Down Expand Up @@ -168,23 +168,19 @@ public boolean hasStore(String storeName) {
return subscribedStoreMap.containsKey(storeName);
}

// refreshOneStore will throw the VeniceNoStoreException when
// retrieving metadata for stores in "Deleting" state or "Missing".
@Override
public Store refreshOneStore(String storeName) {
try {
getAndSetStoreConfigFromSystemStore(storeName);
StoreConfig storeConfig = storeConfigMap.get(storeName);
StoreConfig storeConfig = cacheStoreConfigFromRemote(storeName);
if (storeConfig == null) {
throw new VeniceException("StoreConfig is missing unexpectedly for store: " + storeName);
}
Store newStore = getStoreFromSystemStore(storeName, storeConfig.getCluster());
// isDeleting check to detect deleted store is only supported by meta system store based implementation.
if (newStore != null && !storeConfig.isDeleting()) {
putStore(newStore);
getAndCacheSchemaDataFromSystemStore(storeName);
nativeMetadataRepositoryStats.updateCacheTimestamp(storeName, clock.millis());
} else {
removeStore(storeName);
xunyin8 marked this conversation as resolved.
Show resolved Hide resolved
}
Store newStore = fetchStoreFromRemote(storeName, storeConfig.getCluster());
putStore(newStore);
getAndCacheSchemaData(storeName);
nativeMetadataRepositoryStats.updateCacheTimestamp(storeName, clock.millis());
return newStore;
} catch (ServiceDiscoveryException | MissingKeyInStoreMetadataException e) {
throw new VeniceNoStoreException(storeName, e);
Expand Down Expand Up @@ -393,74 +389,17 @@ public void clear() {
* Get the store cluster config from system store and update the local cache with it. Different implementation will
* get the data differently but should all populate the store cluster config map.
*/
protected void getAndSetStoreConfigFromSystemStore(String storeName) {
storeConfigMap.put(storeName, getStoreConfigFromSystemStore(storeName));
/**
 * Fetches the {@link StoreConfig} for the given store from the remote source and
 * caches it in {@code storeConfigMap} before returning it.
 *
 * @param storeName name of the store whose cluster config is needed
 * @return the freshly fetched store config (also now present in the local cache)
 */
protected StoreConfig cacheStoreConfigFromRemote(String storeName) {
StoreConfig storeConfig = fetchStoreConfigFromRemote(storeName);
storeConfigMap.put(storeName, storeConfig);
return storeConfig;
}

protected abstract StoreConfig getStoreConfigFromSystemStore(String storeName);
protected abstract StoreConfig fetchStoreConfigFromRemote(String storeName);

protected abstract Store getStoreFromSystemStore(String storeName, String clusterName);
protected abstract Store fetchStoreFromRemote(String storeName, String clusterName);

protected abstract StoreMetaValue getStoreMetaValue(String storeName, StoreMetaKey key);

// Helper function with common code for retrieving StoreConfig from meta system store.
protected StoreConfig getStoreConfigFromMetaSystemStore(String storeName) {
StoreClusterConfig clusterConfig = getStoreMetaValue(
storeName,
MetaStoreDataType.STORE_CLUSTER_CONFIG
.getStoreMetaKey(Collections.singletonMap(KEY_STRING_STORE_NAME, storeName))).storeClusterConfig;
return new StoreConfig(clusterConfig);
}

// Helper function with common code for retrieving SchemaData from meta system store.
protected SchemaData getSchemaDataFromMetaSystemStore(String storeName) {
SchemaData schemaData = schemaMap.get(storeName);
SchemaEntry keySchema;
if (schemaData == null) {
// Retrieve the key schema and initialize SchemaData only if it's not cached yet.
StoreMetaKey keySchemaKey = MetaStoreDataType.STORE_KEY_SCHEMAS
.getStoreMetaKey(Collections.singletonMap(KEY_STRING_STORE_NAME, storeName));
Map<CharSequence, CharSequence> keySchemaMap =
getStoreMetaValue(storeName, keySchemaKey).storeKeySchemas.keySchemaMap;
if (keySchemaMap.isEmpty()) {
throw new VeniceException("No key schema found for store: " + storeName);
}
Map.Entry<CharSequence, CharSequence> keySchemaEntry = keySchemaMap.entrySet().iterator().next();
keySchema =
new SchemaEntry(Integer.parseInt(keySchemaEntry.getKey().toString()), keySchemaEntry.getValue().toString());
schemaData = new SchemaData(storeName, keySchema);
}
StoreMetaKey valueSchemaKey = MetaStoreDataType.STORE_VALUE_SCHEMAS
.getStoreMetaKey(Collections.singletonMap(KEY_STRING_STORE_NAME, storeName));
Map<CharSequence, CharSequence> valueSchemaMap =
getStoreMetaValue(storeName, valueSchemaKey).storeValueSchemas.valueSchemaMap;
// Check the value schema string, if it's empty then try to query the other key space for individual value schema.
for (Map.Entry<CharSequence, CharSequence> entry: valueSchemaMap.entrySet()) {
// Check if we already have the corresponding value schema
int valueSchemaId = Integer.parseInt(entry.getKey().toString());
if (schemaData.getValueSchema(valueSchemaId) != null) {
continue;
}
if (entry.getValue().toString().isEmpty()) {
// The value schemas might be too large to be stored in a single K/V.
StoreMetaKey individualValueSchemaKey =
MetaStoreDataType.STORE_VALUE_SCHEMA.getStoreMetaKey(new HashMap<String, String>() {
{
put(KEY_STRING_STORE_NAME, storeName);
put(KEY_STRING_SCHEMA_ID, entry.getKey().toString());
}
});
// Empty string is not a valid value schema therefore it's safe to throw exceptions if we also cannot find it in
// the individual value schema key space.
String valueSchema =
getStoreMetaValue(storeName, individualValueSchemaKey).storeValueSchema.valueSchema.toString();
schemaData.addValueSchema(new SchemaEntry(valueSchemaId, valueSchema));
} else {
schemaData.addValueSchema(new SchemaEntry(valueSchemaId, entry.getValue().toString()));
}
}
return schemaData;
}
protected abstract SchemaData getSchemaData(String storeName);

protected Store putStore(Store newStore) {
// Workaround to make old metadata compatible with new fields
Expand Down Expand Up @@ -516,11 +455,11 @@ protected void notifyStoreChanged(Store store) {
}
}

protected SchemaData getAndCacheSchemaDataFromSystemStore(String storeName) {
protected SchemaData getAndCacheSchemaData(String storeName) {
if (!hasStore(storeName)) {
throw new VeniceNoStoreException(storeName);
}
SchemaData schemaData = getSchemaDataFromSystemStore(storeName);
SchemaData schemaData = getSchemaData(storeName);
schemaMap.put(storeName, schemaData);
return schemaData;
}
Expand All @@ -532,7 +471,7 @@ protected SchemaData getAndCacheSchemaDataFromSystemStore(String storeName) {
private SchemaData getSchemaDataFromReadThroughCache(String storeName) throws VeniceNoStoreException {
SchemaData schemaData = schemaMap.get(storeName);
if (schemaData == null) {
schemaData = getAndCacheSchemaDataFromSystemStore(storeName);
schemaData = getAndCacheSchemaData(storeName);
}
return schemaData;
}
Expand All @@ -545,8 +484,6 @@ protected SchemaEntry getValueSchemaInternally(String storeName, int id) {
return schemaData.getValueSchema(id);
}

protected abstract SchemaData getSchemaDataFromSystemStore(String storeName);

/**
* This function is used to remove schema entry for the given store from local cache,
* and related listeners as well.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package com.linkedin.davinci.repository;

import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.client.store.D2ServiceDiscovery;
import com.linkedin.venice.client.store.transport.D2TransportClient;
import com.linkedin.venice.client.store.transport.TransportClientResponse;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.meta.QueryAction;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.StoreConfig;
import com.linkedin.venice.meta.ZKStore;
import com.linkedin.venice.metadata.response.StorePropertiesResponseRecord;
import com.linkedin.venice.schema.SchemaData;
import com.linkedin.venice.schema.SchemaEntry;
import com.linkedin.venice.serializer.FastSerializerDeserializerFactory;
import com.linkedin.venice.serializer.RecordDeserializer;
import com.linkedin.venice.systemstore.schemas.StoreClusterConfig;
import com.linkedin.venice.systemstore.schemas.StoreProperties;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import java.util.Locale;
import java.util.Map;
import org.apache.avro.Schema;


/**
 * A {@link NativeMetadataRepository} implementation that retrieves store metadata
 * (store properties plus key/value schemas) directly from Venice servers via the
 * request-based {@code STORE_PROPERTIES} endpoint, instead of reading the meta
 * system store through a thin client.
 */
public class RequestBasedMetaRepository extends NativeMetadataRepository {
  // cluster -> client
  private final Map<String, D2TransportClient> d2TransportClientMap = new VeniceConcurrentHashMap<>();

  // storeName -> schema data cached from the most recent store properties response
  protected Map<String, SchemaData> storeSchemaMap = new VeniceConcurrentHashMap<>();

  private final D2TransportClient d2DiscoveryTransportClient;
  private D2ServiceDiscovery d2ServiceDiscovery;

  public RequestBasedMetaRepository(ClientConfig clientConfig, VeniceProperties backendConfig) {
    super(clientConfig, backendConfig);
    this.d2ServiceDiscovery = new D2ServiceDiscovery();
    this.d2DiscoveryTransportClient =
        new D2TransportClient(clientConfig.getD2ServiceName(), clientConfig.getD2Client());
  }

  @Override
  public void clear() {
    super.clear();

    // Drop per-cluster transport clients and locally cached schemas.
    d2TransportClientMap.clear();
    storeSchemaMap.clear();
  }

  @Override
  protected StoreConfig fetchStoreConfigFromRemote(String storeName) {
    // There is no dedicated store-config endpoint; derive the config from D2 service
    // discovery, using the discovered server D2 service name as the cluster.
    D2TransportClient d2TransportClient = getD2TransportClient(storeName);

    StoreClusterConfig storeClusterConfig = new StoreClusterConfig();
    storeClusterConfig.setStoreName(storeName);
    storeClusterConfig.setCluster(d2TransportClient.getServiceName());

    return new StoreConfig(storeClusterConfig);
  }

  @Override
  protected Store fetchStoreFromRemote(String storeName, String clusterName) {
    // Always go to the server, bypassing the local cache. clusterName is unused
    // because the endpoint is addressed per store via D2 discovery.
    StorePropertiesResponseRecord record = fetchAndCacheStorePropertiesResponseRecord(storeName);
    StoreProperties storeProperties = record.storeMetaValue.storeProperties;
    return new ZKStore(storeProperties);
  }

  @Override
  protected SchemaData getSchemaData(String storeName) {
    SchemaData schemaData = storeSchemaMap.get(storeName);
    if (schemaData == null) {
      // Cache miss: fetching the store properties populates storeSchemaMap as a side effect.
      fetchAndCacheStorePropertiesResponseRecord(storeName);
      schemaData = storeSchemaMap.get(storeName);
    }
    return schemaData;
  }

  /**
   * Fetches the store properties record from the server, caches the contained schemas
   * via {@link #cacheStoreSchema(String, StorePropertiesResponseRecord)}, and returns
   * the deserialized record.
   */
  protected StorePropertiesResponseRecord fetchAndCacheStorePropertiesResponseRecord(String storeName) {
    // Build the request path. Include the largest value schema id we already hold so
    // the server can omit value schemas we have cached.
    int maxValueSchemaId = getMaxValueSchemaId(storeName);
    D2TransportClient d2TransportClient = getD2TransportClient(storeName);
    // Locale.ROOT keeps the URL path stable regardless of the JVM default locale.
    String requestBasedStorePropertiesURL =
        QueryAction.STORE_PROPERTIES.toString().toLowerCase(Locale.ROOT) + "/" + storeName;
    if (maxValueSchemaId > SchemaData.UNKNOWN_SCHEMA_ID) {
      requestBasedStorePropertiesURL += "/" + maxValueSchemaId;
    }

    TransportClientResponse response;
    try {
      response = d2TransportClient.get(requestBasedStorePropertiesURL).get();
    } catch (InterruptedException e) {
      // Preserve the interrupt status for callers further up the stack.
      Thread.currentThread().interrupt();
      throw new RuntimeException(
          "Interrupted while trying to send store properties request to " + requestBasedStorePropertiesURL,
          e);
    } catch (Exception e) {
      // Chain the original exception as the cause instead of flattening it into the message.
      throw new RuntimeException(
          "Encountered exception while trying to send store properties request to " + requestBasedStorePropertiesURL,
          e);
    }

    // Deserialize
    Schema writerSchema = StorePropertiesResponseRecord.SCHEMA$;
    RecordDeserializer<StorePropertiesResponseRecord> recordDeserializer = FastSerializerDeserializerFactory
        .getFastAvroSpecificDeserializer(writerSchema, StorePropertiesResponseRecord.class);
    StorePropertiesResponseRecord record = recordDeserializer.deserialize(response.getBody());

    // Cache
    cacheStoreSchema(storeName, record);

    return record;
  }

  /**
   * Returns the transport client for the server cluster that serves {@code storeName},
   * creating and caching one per cluster on first use. Synchronized because
   * D2ServiceDiscovery lookups are serialized through the shared discovery client.
   */
  D2TransportClient getD2TransportClient(String storeName) {
    synchronized (this) {
      // Discover which server cluster serves this store, then reuse a cached client per cluster.
      String serverD2ServiceName =
          d2ServiceDiscovery.find(d2DiscoveryTransportClient, storeName, true).getServerD2Service();
      return d2TransportClientMap.computeIfAbsent(
          serverD2ServiceName,
          d2ServiceName -> new D2TransportClient(d2ServiceName, clientConfig.getD2Client()));
    }
  }

  /**
   * @return the largest value schema id currently cached for the store, or
   *         {@link SchemaData#UNKNOWN_SCHEMA_ID} if nothing is cached yet
   */
  protected int getMaxValueSchemaId(String storeName) {
    SchemaData schemaData = schemaMap.get(storeName);
    return schemaData == null ? SchemaData.UNKNOWN_SCHEMA_ID : schemaData.getMaxValueSchemaId();
  }

  /**
   * Merges the key/value schemas carried by a store properties response into the local
   * schema cache, initializing the {@link SchemaData} entry on first sight of the store.
   */
  protected void cacheStoreSchema(String storeName, StorePropertiesResponseRecord record) {
    SchemaData schemaData = storeSchemaMap.get(storeName);
    if (schemaData == null) {
      // New store: initialize SchemaData with the key schema from the response.
      Map<CharSequence, CharSequence> keySchemaMap =
          record.getStoreMetaValue().getStoreKeySchemas().getKeySchemaMap();
      if (keySchemaMap.isEmpty()) {
        // An empty key schema map would otherwise surface as an opaque NoSuchElementException.
        throw new VeniceException("No key schema found for store: " + storeName);
      }
      Map.Entry<CharSequence, CharSequence> keySchemaEntry = keySchemaMap.entrySet().iterator().next();
      schemaData = new SchemaData(
          storeName,
          new SchemaEntry(Integer.parseInt(keySchemaEntry.getKey().toString()), keySchemaEntry.getValue().toString()));
      storeSchemaMap.put(storeName, schemaData);
    }
    // Merge all value schemas from the response into the cached SchemaData.
    for (Map.Entry<CharSequence, CharSequence> entry: record.getStoreMetaValue()
        .getStoreValueSchemas()
        .getValueSchemaMap()
        .entrySet()) {
      schemaData.addValueSchema(new SchemaEntry(Integer.parseInt(entry.getKey().toString()), entry.getValue().toString()));
    }
  }
}
Loading
Loading