Skip to content

Commit cfa6434

Browse files
committed
add requestbasedmetarepository to dvc
1 parent fe1500f commit cfa6434

File tree

9 files changed

+434
-136
lines changed

9 files changed

+434
-136
lines changed

clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/NativeMetadataRepository.java

Lines changed: 22 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package com.linkedin.davinci.repository;
22

33
import static com.linkedin.venice.ConfigKeys.CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS;
4-
import static com.linkedin.venice.system.store.MetaStoreWriter.KEY_STRING_SCHEMA_ID;
5-
import static com.linkedin.venice.system.store.MetaStoreWriter.KEY_STRING_STORE_NAME;
64
import static java.lang.Thread.currentThread;
75

86
import com.linkedin.davinci.stats.NativeMetadataRepositoryStats;
@@ -25,17 +23,11 @@
2523
import com.linkedin.venice.schema.rmd.RmdSchemaEntry;
2624
import com.linkedin.venice.schema.writecompute.DerivedSchemaEntry;
2725
import com.linkedin.venice.service.ICProvider;
28-
import com.linkedin.venice.system.store.MetaStoreDataType;
29-
import com.linkedin.venice.systemstore.schemas.StoreClusterConfig;
30-
import com.linkedin.venice.systemstore.schemas.StoreMetaKey;
31-
import com.linkedin.venice.systemstore.schemas.StoreMetaValue;
3226
import com.linkedin.venice.utils.VeniceProperties;
3327
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
3428
import java.time.Clock;
3529
import java.util.ArrayList;
3630
import java.util.Collection;
37-
import java.util.Collections;
38-
import java.util.HashMap;
3931
import java.util.List;
4032
import java.util.Map;
4133
import java.util.Set;
@@ -70,7 +62,7 @@ public abstract class NativeMetadataRepository
7062
private final Map<String, StoreConfig> storeConfigMap = new VeniceConcurrentHashMap<>();
7163
// Local cache for key/value schemas. SchemaData supports one key schema per store only, which may need to be changed
7264
// for key schema evolvability.
73-
private final Map<String, SchemaData> schemaMap = new VeniceConcurrentHashMap<>();
65+
protected final Map<String, SchemaData> schemaMap = new VeniceConcurrentHashMap<>();
7466
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
7567
private final Set<StoreDataChangedListener> listeners = new CopyOnWriteArraySet<>();
7668
private final AtomicLong totalStoreReadQuota = new AtomicLong();
@@ -128,8 +120,12 @@ public static NativeMetadataRepository getInstance(
128120
LOGGER.info(
129121
"Initializing {} with {}",
130122
NativeMetadataRepository.class.getSimpleName(),
131-
ThinClientMetaStoreBasedRepository.class.getSimpleName());
132-
return new ThinClientMetaStoreBasedRepository(clientConfig, backendConfig, icProvider);
123+
RequestBasedMetaRepository.class.getSimpleName());
124+
if (clientConfig.isUseRequestBasedMetaRepository()) {
125+
return new RequestBasedMetaRepository(clientConfig, backendConfig);
126+
} else {
127+
return new ThinClientMetaStoreBasedRepository(clientConfig, backendConfig, icProvider);
128+
}
133129
}
134130

135131
@Override
@@ -171,20 +167,14 @@ public boolean hasStore(String storeName) {
171167
@Override
172168
public Store refreshOneStore(String storeName) {
173169
try {
174-
getAndSetStoreConfigFromSystemStore(storeName);
175-
StoreConfig storeConfig = storeConfigMap.get(storeName);
170+
StoreConfig storeConfig = cacheStoreConfigFromRemote(storeName);
176171
if (storeConfig == null) {
177172
throw new VeniceException("StoreConfig is missing unexpectedly for store: " + storeName);
178173
}
179-
Store newStore = getStoreFromSystemStore(storeName, storeConfig.getCluster());
180-
// isDeleting check to detect deleted store is only supported by meta system store based implementation.
181-
if (newStore != null && !storeConfig.isDeleting()) {
182-
putStore(newStore);
183-
getAndCacheSchemaDataFromSystemStore(storeName);
184-
nativeMetadataRepositoryStats.updateCacheTimestamp(storeName, clock.millis());
185-
} else {
186-
removeStore(storeName);
187-
}
174+
Store newStore = fetchStoreFromRemote(storeName, storeConfig.getCluster());
175+
putStore(newStore);
176+
getAndCacheSchemaData(storeName);
177+
nativeMetadataRepositoryStats.updateCacheTimestamp(storeName, clock.millis());
188178
return newStore;
189179
} catch (ServiceDiscoveryException | MissingKeyInStoreMetadataException e) {
190180
throw new VeniceNoStoreException(storeName, e);
@@ -393,74 +383,17 @@ public void clear() {
393383
* Get the store cluster config from system store and update the local cache with it. Different implementation will
394384
* get the data differently but should all populate the store cluster config map.
395385
*/
396-
protected void getAndSetStoreConfigFromSystemStore(String storeName) {
397-
storeConfigMap.put(storeName, getStoreConfigFromSystemStore(storeName));
386+
protected StoreConfig cacheStoreConfigFromRemote(String storeName) {
387+
StoreConfig storeConfig = fetchStoreConfigFromRemote(storeName);
388+
storeConfigMap.put(storeName, storeConfig);
389+
return storeConfig;
398390
}
399391

400-
protected abstract StoreConfig getStoreConfigFromSystemStore(String storeName);
392+
protected abstract StoreConfig fetchStoreConfigFromRemote(String storeName);
401393

402-
protected abstract Store getStoreFromSystemStore(String storeName, String clusterName);
394+
protected abstract Store fetchStoreFromRemote(String storeName, String clusterName);
403395

404-
protected abstract StoreMetaValue getStoreMetaValue(String storeName, StoreMetaKey key);
405-
406-
// Helper function with common code for retrieving StoreConfig from meta system store.
407-
protected StoreConfig getStoreConfigFromMetaSystemStore(String storeName) {
408-
StoreClusterConfig clusterConfig = getStoreMetaValue(
409-
storeName,
410-
MetaStoreDataType.STORE_CLUSTER_CONFIG
411-
.getStoreMetaKey(Collections.singletonMap(KEY_STRING_STORE_NAME, storeName))).storeClusterConfig;
412-
return new StoreConfig(clusterConfig);
413-
}
414-
415-
// Helper function with common code for retrieving SchemaData from meta system store.
416-
protected SchemaData getSchemaDataFromMetaSystemStore(String storeName) {
417-
SchemaData schemaData = schemaMap.get(storeName);
418-
SchemaEntry keySchema;
419-
if (schemaData == null) {
420-
// Retrieve the key schema and initialize SchemaData only if it's not cached yet.
421-
StoreMetaKey keySchemaKey = MetaStoreDataType.STORE_KEY_SCHEMAS
422-
.getStoreMetaKey(Collections.singletonMap(KEY_STRING_STORE_NAME, storeName));
423-
Map<CharSequence, CharSequence> keySchemaMap =
424-
getStoreMetaValue(storeName, keySchemaKey).storeKeySchemas.keySchemaMap;
425-
if (keySchemaMap.isEmpty()) {
426-
throw new VeniceException("No key schema found for store: " + storeName);
427-
}
428-
Map.Entry<CharSequence, CharSequence> keySchemaEntry = keySchemaMap.entrySet().iterator().next();
429-
keySchema =
430-
new SchemaEntry(Integer.parseInt(keySchemaEntry.getKey().toString()), keySchemaEntry.getValue().toString());
431-
schemaData = new SchemaData(storeName, keySchema);
432-
}
433-
StoreMetaKey valueSchemaKey = MetaStoreDataType.STORE_VALUE_SCHEMAS
434-
.getStoreMetaKey(Collections.singletonMap(KEY_STRING_STORE_NAME, storeName));
435-
Map<CharSequence, CharSequence> valueSchemaMap =
436-
getStoreMetaValue(storeName, valueSchemaKey).storeValueSchemas.valueSchemaMap;
437-
// Check the value schema string, if it's empty then try to query the other key space for individual value schema.
438-
for (Map.Entry<CharSequence, CharSequence> entry: valueSchemaMap.entrySet()) {
439-
// Check if we already have the corresponding value schema
440-
int valueSchemaId = Integer.parseInt(entry.getKey().toString());
441-
if (schemaData.getValueSchema(valueSchemaId) != null) {
442-
continue;
443-
}
444-
if (entry.getValue().toString().isEmpty()) {
445-
// The value schemas might be too large to be stored in a single K/V.
446-
StoreMetaKey individualValueSchemaKey =
447-
MetaStoreDataType.STORE_VALUE_SCHEMA.getStoreMetaKey(new HashMap<String, String>() {
448-
{
449-
put(KEY_STRING_STORE_NAME, storeName);
450-
put(KEY_STRING_SCHEMA_ID, entry.getKey().toString());
451-
}
452-
});
453-
// Empty string is not a valid value schema therefore it's safe to throw exceptions if we also cannot find it in
454-
// the individual value schema key space.
455-
String valueSchema =
456-
getStoreMetaValue(storeName, individualValueSchemaKey).storeValueSchema.valueSchema.toString();
457-
schemaData.addValueSchema(new SchemaEntry(valueSchemaId, valueSchema));
458-
} else {
459-
schemaData.addValueSchema(new SchemaEntry(valueSchemaId, entry.getValue().toString()));
460-
}
461-
}
462-
return schemaData;
463-
}
396+
protected abstract SchemaData getSchemaData(String storeName);
464397

465398
protected Store putStore(Store newStore) {
466399
// Workaround to make old metadata compatible with new fields
@@ -516,11 +449,11 @@ protected void notifyStoreChanged(Store store) {
516449
}
517450
}
518451

519-
protected SchemaData getAndCacheSchemaDataFromSystemStore(String storeName) {
452+
protected SchemaData getAndCacheSchemaData(String storeName) {
520453
if (!hasStore(storeName)) {
521454
throw new VeniceNoStoreException(storeName);
522455
}
523-
SchemaData schemaData = getSchemaDataFromSystemStore(storeName);
456+
SchemaData schemaData = getSchemaData(storeName);
524457
schemaMap.put(storeName, schemaData);
525458
return schemaData;
526459
}
@@ -532,7 +465,7 @@ protected SchemaData getAndCacheSchemaDataFromSystemStore(String storeName) {
532465
private SchemaData getSchemaDataFromReadThroughCache(String storeName) throws VeniceNoStoreException {
533466
SchemaData schemaData = schemaMap.get(storeName);
534467
if (schemaData == null) {
535-
schemaData = getAndCacheSchemaDataFromSystemStore(storeName);
468+
schemaData = getAndCacheSchemaData(storeName);
536469
}
537470
return schemaData;
538471
}
@@ -545,8 +478,6 @@ protected SchemaEntry getValueSchemaInternally(String storeName, int id) {
545478
return schemaData.getValueSchema(id);
546479
}
547480

548-
protected abstract SchemaData getSchemaDataFromSystemStore(String storeName);
549-
550481
/**
551482
* This function is used to remove schema entry for the given store from local cache,
552483
* and related listeners as well.
Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
package com.linkedin.davinci.repository;
2+
3+
import com.linkedin.venice.client.store.ClientConfig;
4+
import com.linkedin.venice.client.store.D2ServiceDiscovery;
5+
import com.linkedin.venice.client.store.transport.D2TransportClient;
6+
import com.linkedin.venice.client.store.transport.TransportClientResponse;
7+
import com.linkedin.venice.meta.QueryAction;
8+
import com.linkedin.venice.meta.Store;
9+
import com.linkedin.venice.meta.StoreConfig;
10+
import com.linkedin.venice.meta.ZKStore;
11+
import com.linkedin.venice.metadata.response.StorePropertiesResponseRecord;
12+
import com.linkedin.venice.schema.SchemaData;
13+
import com.linkedin.venice.schema.SchemaEntry;
14+
import com.linkedin.venice.serializer.FastSerializerDeserializerFactory;
15+
import com.linkedin.venice.serializer.RecordDeserializer;
16+
import com.linkedin.venice.systemstore.schemas.StoreClusterConfig;
17+
import com.linkedin.venice.systemstore.schemas.StoreProperties;
18+
import com.linkedin.venice.utils.VeniceProperties;
19+
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
20+
import java.util.Map;
21+
import org.apache.avro.Schema;
22+
23+
24+
public class RequestBasedMetaRepository extends NativeMetadataRepository {
25+
26+
// cluster -> client
27+
private final Map<String, D2TransportClient> d2TransportClientMap = new VeniceConcurrentHashMap<>();
28+
29+
// storeName -> T
30+
private final Map<String, SchemaData> storeSchemaMap = new VeniceConcurrentHashMap<>();
31+
32+
private final D2TransportClient d2DiscoveryTransportClient;
33+
private D2ServiceDiscovery d2ServiceDiscovery;
34+
35+
public RequestBasedMetaRepository(ClientConfig clientConfig, VeniceProperties backendConfig) {
36+
super(clientConfig, backendConfig);
37+
this.d2ServiceDiscovery = new D2ServiceDiscovery();
38+
this.d2DiscoveryTransportClient =
39+
new D2TransportClient(clientConfig.getD2ServiceName(), clientConfig.getD2Client());
40+
}
41+
42+
@Override
43+
public void clear() {
44+
super.clear();
45+
46+
// Clear cache
47+
d2TransportClientMap.clear();
48+
storeSchemaMap.clear();
49+
}
50+
51+
@Override
52+
protected StoreConfig fetchStoreConfigFromRemote(String storeName) {
53+
// Create StoreConfig from D2
54+
D2TransportClient d2TransportClient = getD2TransportClient(storeName);
55+
56+
StoreClusterConfig storeClusterConfig = new StoreClusterConfig();
57+
storeClusterConfig.setStoreName(storeName);
58+
storeClusterConfig.setCluster(d2TransportClient.getServiceName());
59+
60+
return new StoreConfig(storeClusterConfig);
61+
}
62+
63+
@Override
64+
protected Store fetchStoreFromRemote(String storeName, String clusterName) {
65+
// Fetch store, bypass cache
66+
StorePropertiesResponseRecord record = fetchAndCacheStorePropertiesResponseRecord(storeName);
67+
StoreProperties storeProperties = record.storeMetaValue.storeProperties;
68+
return new ZKStore(storeProperties);
69+
}
70+
71+
@Override
72+
protected SchemaData getSchemaData(String storeName) {
73+
if (!storeSchemaMap.containsKey(storeName)) {
74+
// Cache miss
75+
fetchAndCacheStorePropertiesResponseRecord(storeName);
76+
}
77+
78+
return storeSchemaMap.get(storeName);
79+
}
80+
81+
private StorePropertiesResponseRecord fetchAndCacheStorePropertiesResponseRecord(String storeName) {
82+
83+
// Request
84+
int maxValueSchemaId = getMaxValueSchemaId(storeName);
85+
D2TransportClient d2TransportClient = getD2TransportClient(storeName);
86+
String requestBasedStorePropertiesURL = QueryAction.STORE_PROPERTIES.toString().toLowerCase() + "/" + storeName;
87+
if (maxValueSchemaId > SchemaData.UNKNOWN_SCHEMA_ID) {
88+
requestBasedStorePropertiesURL += "/" + maxValueSchemaId;
89+
}
90+
91+
TransportClientResponse response;
92+
try {
93+
response = d2TransportClient.get(requestBasedStorePropertiesURL).get();
94+
} catch (Exception e) {
95+
throw new RuntimeException(
96+
"Encountered exception while trying to send store properties request to " + requestBasedStorePropertiesURL
97+
+ ": " + e);
98+
}
99+
100+
// Deserialize
101+
Schema writerSchema = StorePropertiesResponseRecord.SCHEMA$;
102+
RecordDeserializer<StorePropertiesResponseRecord> recordDeserializer = FastSerializerDeserializerFactory
103+
.getFastAvroSpecificDeserializer(writerSchema, StorePropertiesResponseRecord.class);
104+
StorePropertiesResponseRecord record = recordDeserializer.deserialize(response.getBody());
105+
106+
// Cache
107+
cacheStoreSchema(storeName, record);
108+
109+
return record;
110+
}
111+
112+
D2TransportClient getD2TransportClient(String storeName) {
113+
synchronized (this) {
114+
// Get cluster for store
115+
String serverD2ServiceName =
116+
d2ServiceDiscovery.find(d2DiscoveryTransportClient, storeName, true).getServerD2Service();
117+
if (d2TransportClientMap.containsKey(serverD2ServiceName)) {
118+
return d2TransportClientMap.get(serverD2ServiceName);
119+
}
120+
D2TransportClient d2TransportClient = new D2TransportClient(serverD2ServiceName, clientConfig.getD2Client());
121+
d2TransportClientMap.put(serverD2ServiceName, d2TransportClient);
122+
return d2TransportClient;
123+
}
124+
}
125+
126+
private int getMaxValueSchemaId(String storeName) {
127+
if (!schemaMap.containsKey(storeName)) {
128+
return SchemaData.UNKNOWN_SCHEMA_ID;
129+
}
130+
return schemaMap.get(storeName).getMaxValueSchemaId();
131+
}
132+
133+
private void cacheStoreSchema(String storeName, StorePropertiesResponseRecord record) {
134+
135+
if (!storeSchemaMap.containsKey(storeName)) {
136+
// New schema data
137+
Map.Entry<CharSequence, CharSequence> keySchemaEntry =
138+
record.getStoreMetaValue().getStoreKeySchemas().getKeySchemaMap().entrySet().iterator().next();
139+
SchemaData schemaData = new SchemaData(
140+
storeName,
141+
new SchemaEntry(Integer.parseInt(keySchemaEntry.getKey().toString()), keySchemaEntry.getValue().toString()));
142+
storeSchemaMap.put(storeName, schemaData);
143+
}
144+
145+
// Store Value Schemas
146+
for (Map.Entry<CharSequence, CharSequence> entry: record.getStoreMetaValue()
147+
.getStoreValueSchemas()
148+
.getValueSchemaMap()
149+
.entrySet()) {
150+
storeSchemaMap.get(storeName)
151+
.addValueSchema(new SchemaEntry(Integer.parseInt(entry.getKey().toString()), entry.getValue().toString()));
152+
}
153+
}
154+
}

0 commit comments

Comments
 (0)