Skip to content

Commit df28d1e

Browse files
authored
[da-vinci][common] DVC consumer for materialized view (batch only) (#1466)
* [da-vinci][common] DVC consumer for materialized view (batch only) Limiting the change to support batch only materialized view to keep the PR small. Hybrid DVC consumer support and related features such as heartbeat will be added in a separate PR New DaVinciClientFactory APIs for creating DVC for a given view. Defined a new "storeName" rule for views to be used for metrics reporting and DVC client. See VeniceView.getViewStoreName for details. Introduced NativeMetadataRepositoryViewAdapter and HelixReadOnlyStoreViewConfigRepositoryAdapter to provide read-only interface to access various store metadata for both regular Venice stores and store views with the VeniceView.getViewStoreName. There is some issue with chunking support on the read path. When chunking is enabled the view topic keys are doubly wrapped by serializeNonChunkedKey. This is because during NR pass-through mode the view writer is essentially trying to chunk the chunk. The tactical fix now is to unwrap the key with chunked suffix bytes appended and pass it to the view writer to be wrapped again and sent to the correct partition. This only works with non-large messages. I.e. chunking is enabled but nothing is actually getting chunked. Large messages will require a proper fix.
1 parent 21327c7 commit df28d1e

32 files changed

+1363
-75
lines changed

Diff for: clients/da-vinci-client/src/main/java/com/linkedin/davinci/StoreBackend.java

+14-6
Original file line numberDiff line numberDiff line change
@@ -128,14 +128,22 @@ public CompletableFuture<Void> subscribe(ComplementSet<Integer> partitions) {
128128
return subscribe(partitions, Optional.empty());
129129
}
130130

131+
private Version getCurrentVersion() {
132+
return backend.getVeniceCurrentVersion(storeName);
133+
}
134+
135+
private Version getLatestNonFaultyVersion() {
136+
return backend.getVeniceLatestNonFaultyVersion(storeName, faultyVersionSet);
137+
}
138+
131139
synchronized CompletableFuture<Void> subscribe(
132140
ComplementSet<Integer> partitions,
133141
Optional<Version> bootstrapVersion) {
134142
if (daVinciCurrentVersion == null) {
135143
setDaVinciCurrentVersion(new VersionBackend(backend, bootstrapVersion.orElseGet(() -> {
136-
Version version = backend.getVeniceCurrentVersion(storeName);
144+
Version version = getCurrentVersion();
137145
if (version == null) {
138-
version = backend.getVeniceLatestNonFaultyVersion(storeName, faultyVersionSet);
146+
version = getLatestNonFaultyVersion();
139147
}
140148
if (version == null) {
141149
throw new VeniceException("Cannot subscribe to an empty store, storeName=" + storeName);
@@ -218,9 +226,9 @@ synchronized void trySubscribeDaVinciFutureVersion() {
218226
return;
219227
}
220228

221-
Version veniceCurrentVersion = backend.getVeniceCurrentVersion(storeName);
229+
Version veniceCurrentVersion = getCurrentVersion();
222230
// Latest non-faulty store version in Venice store.
223-
Version veniceLatestVersion = backend.getVeniceLatestNonFaultyVersion(storeName, faultyVersionSet);
231+
Version veniceLatestVersion = getLatestNonFaultyVersion();
224232
Version targetVersion;
225233
// Make sure current version in the store config has highest priority.
226234
if (veniceCurrentVersion != null
@@ -246,7 +254,7 @@ synchronized void trySubscribeDaVinciFutureVersion() {
246254
* failure.
247255
*/
248256
synchronized void validateDaVinciAndVeniceCurrentVersion() {
249-
Version veniceCurrentVersion = backend.getVeniceCurrentVersion(storeName);
257+
Version veniceCurrentVersion = getCurrentVersion();
250258
if (veniceCurrentVersion != null && daVinciCurrentVersion != null) {
251259
if (veniceCurrentVersion.getNumber() > daVinciCurrentVersion.getVersion().getNumber()
252260
&& faultyVersionSet.contains(veniceCurrentVersion.getNumber())) {
@@ -294,7 +302,7 @@ synchronized void tryDeleteInvalidDaVinciFutureVersion() {
294302
synchronized void trySwapDaVinciCurrentVersion(Throwable failure) {
295303
if (daVinciFutureVersion != null) {
296304
// Fetch current version from store config.
297-
Version veniceCurrentVersion = backend.getVeniceCurrentVersion(storeName);
305+
Version veniceCurrentVersion = getCurrentVersion();
298306
if (veniceCurrentVersion == null) {
299307
LOGGER.warn("Failed to retrieve current version of store: " + storeName);
300308
return;

Diff for: clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/AvroGenericDaVinciClient.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -779,7 +779,7 @@ public synchronized void start() {
779779
if (isReady()) {
780780
return;
781781
}
782-
logger.info("Starting client, storeName=" + getStoreName());
782+
logger.info("Starting client, storeName={}", getStoreName());
783783
VeniceConfigLoader configLoader = buildVeniceConfig();
784784
Optional<ObjectCacheConfig> cacheConfig = Optional.ofNullable(daVinciConfig.getCacheConfig());
785785
initBackend(clientConfig, configLoader, managedClients, icProvider, cacheConfig, recordTransformerConfig);
@@ -790,7 +790,6 @@ public synchronized void start() {
790790
if (daVinciConfig.isCacheEnabled()) {
791791
cacheBackend = getBackend().getObjectCache();
792792
}
793-
794793
storeBackend = getBackend().getStoreOrThrow(getStoreName());
795794
if (managedClients.isPresent()) {
796795
storeBackend.setManaged(daVinciConfig.isManaged());

Diff for: clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/factory/CachingDaVinciClientFactory.java

+76-8
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import com.linkedin.venice.exceptions.VeniceException;
1212
import com.linkedin.venice.service.ICProvider;
1313
import com.linkedin.venice.utils.VeniceProperties;
14+
import com.linkedin.venice.views.VeniceView;
1415
import io.tehuti.metrics.MetricsRepository;
1516
import java.io.Closeable;
1617
import java.util.ArrayList;
@@ -162,6 +163,7 @@ private Class getClientClass(DaVinciConfig daVinciConfig, boolean isSpecific) {
162163
public <K, V> DaVinciClient<K, V> getGenericAvroClient(String storeName, DaVinciConfig config) {
163164
return getClient(
164165
storeName,
166+
null,
165167
config,
166168
null,
167169
new GenericDaVinciClientConstructor<>(),
@@ -172,6 +174,7 @@ public <K, V> DaVinciClient<K, V> getGenericAvroClient(String storeName, DaVinci
172174
public <K, V> DaVinciClient<K, V> getGenericAvroClient(String storeName, DaVinciConfig config, Class<V> valueClass) {
173175
return getClient(
174176
storeName,
177+
null,
175178
config,
176179
valueClass,
177180
new GenericDaVinciClientConstructor<>(),
@@ -183,6 +186,7 @@ public <K, V> DaVinciClient<K, V> getGenericAvroClient(String storeName, DaVinci
183186
public <K, V> DaVinciClient<K, V> getAndStartGenericAvroClient(String storeName, DaVinciConfig config) {
184187
return getClient(
185188
storeName,
189+
null,
186190
config,
187191
null,
188192
new GenericDaVinciClientConstructor<>(),
@@ -196,8 +200,66 @@ public <K, V> DaVinciClient<K, V> getAndStartGenericAvroClient(
196200
Class<V> valueClass) {
197201
return getClient(
198202
storeName,
203+
null,
204+
config,
205+
valueClass,
206+
new GenericDaVinciClientConstructor<>(),
207+
getClientClass(config, false),
208+
true);
209+
}
210+
211+
@Override
212+
public <K, V extends SpecificRecord> DaVinciClient<K, V> getSpecificAvroClient(
213+
String storeName,
214+
DaVinciConfig config,
215+
Class<V> valueClass) {
216+
return getClient(
217+
storeName,
218+
null,
219+
config,
220+
valueClass,
221+
new SpecificDaVinciClientConstructor<>(),
222+
getClientClass(config, true),
223+
false);
224+
}
225+
226+
@Override
227+
public <K, V extends SpecificRecord> DaVinciClient<K, V> getAndStartSpecificAvroClient(
228+
String storeName,
229+
DaVinciConfig config,
230+
Class<V> valueClass) {
231+
return getClient(
232+
storeName,
233+
null,
199234
config,
200235
valueClass,
236+
new SpecificDaVinciClientConstructor<>(),
237+
getClientClass(config, true),
238+
true);
239+
}
240+
241+
@Override
242+
public <K, V> DaVinciClient<K, V> getGenericAvroClient(String storeName, String viewName, DaVinciConfig config) {
243+
return getClient(
244+
storeName,
245+
viewName,
246+
config,
247+
null,
248+
new GenericDaVinciClientConstructor<>(),
249+
getClientClass(config, false),
250+
false);
251+
}
252+
253+
@Override
254+
public <K, V> DaVinciClient<K, V> getAndStartGenericAvroClient(
255+
String storeName,
256+
String viewName,
257+
DaVinciConfig config) {
258+
return getClient(
259+
storeName,
260+
viewName,
261+
config,
262+
null,
201263
new GenericDaVinciClientConstructor<>(),
202264
getClientClass(config, false),
203265
true);
@@ -206,10 +268,12 @@ public <K, V> DaVinciClient<K, V> getAndStartGenericAvroClient(
206268
@Override
207269
public <K, V extends SpecificRecord> DaVinciClient<K, V> getSpecificAvroClient(
208270
String storeName,
271+
String viewName,
209272
DaVinciConfig config,
210273
Class<V> valueClass) {
211274
return getClient(
212275
storeName,
276+
viewName,
213277
config,
214278
valueClass,
215279
new SpecificDaVinciClientConstructor<>(),
@@ -220,10 +284,12 @@ public <K, V extends SpecificRecord> DaVinciClient<K, V> getSpecificAvroClient(
220284
@Override
221285
public <K, V extends SpecificRecord> DaVinciClient<K, V> getAndStartSpecificAvroClient(
222286
String storeName,
287+
String viewName,
223288
DaVinciConfig config,
224289
Class<V> valueClass) {
225290
return getClient(
226291
storeName,
292+
viewName,
227293
config,
228294
valueClass,
229295
new SpecificDaVinciClientConstructor<>(),
@@ -290,29 +356,31 @@ public DaVinciClient<K, V> apply(
290356

291357
protected synchronized DaVinciClient getClient(
292358
String storeName,
359+
String viewName,
293360
DaVinciConfig config,
294361
Class valueClass,
295362
DaVinciClientConstructor clientConstructor,
296363
Class clientClass,
297364
boolean startClient) {
365+
String internalStoreName = viewName == null ? storeName : VeniceView.getViewStoreName(storeName, viewName);
298366
if (closed) {
299-
throw new VeniceException("Unable to get a client from a closed factory, storeName=" + storeName);
367+
throw new VeniceException("Unable to get a client from a closed factory, storeName=" + internalStoreName);
300368
}
301369

302-
DaVinciConfig originalConfig = configs.computeIfAbsent(storeName, k -> config);
370+
DaVinciConfig originalConfig = configs.computeIfAbsent(internalStoreName, k -> config);
303371
if (originalConfig.isManaged() != config.isManaged()) {
304372
throw new VeniceException(
305-
"Managed flag conflict" + ", storeName=" + storeName + ", original=" + originalConfig.isManaged()
373+
"Managed flag conflict" + ", storeName=" + internalStoreName + ", original=" + originalConfig.isManaged()
306374
+ ", requested=" + config.isManaged());
307375
}
308376

309377
if (originalConfig.getStorageClass() != config.getStorageClass()) {
310378
throw new VeniceException(
311-
"Storage class conflict" + ", storeName=" + storeName + ", original=" + originalConfig.getStorageClass()
312-
+ ", requested=" + config.getStorageClass());
379+
"Storage class conflict" + ", storeName=" + internalStoreName + ", original="
380+
+ originalConfig.getStorageClass() + ", requested=" + config.getStorageClass());
313381
}
314382

315-
ClientConfig clientConfig = new ClientConfig(storeName).setD2Client(d2Client)
383+
ClientConfig clientConfig = new ClientConfig(internalStoreName).setD2Client(d2Client)
316384
.setD2ServiceName(clusterDiscoveryD2ServiceName)
317385
.setMetricsRepository(metricsRepository)
318386
.setSpecificValueClass(valueClass);
@@ -325,12 +393,12 @@ protected synchronized DaVinciClient getClient(
325393
isolatedClients.add(client);
326394
} else {
327395
client = sharedClients.computeIfAbsent(
328-
storeName,
396+
internalStoreName,
329397
k -> clientConstructor.apply(config, clientConfig, backendConfig, managedClients, icProvider));
330398

331399
if (!clientClass.isInstance(client)) {
332400
throw new VeniceException(
333-
"Client type conflict" + ", storeName=" + storeName + ", originalClientClass=" + client.getClass()
401+
"Client type conflict" + ", storeName=" + internalStoreName + ", originalClientClass=" + client.getClass()
334402
+ ", requestedClientClass=" + clientClass);
335403
}
336404
}

Diff for: clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/factory/DaVinciClientFactory.java

+16
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,20 @@ <K, V extends SpecificRecord> DaVinciClient<K, V> getAndStartSpecificAvroClient(
1919
String storeName,
2020
DaVinciConfig config,
2121
Class<V> valueClass);
22+
23+
<K, V> DaVinciClient<K, V> getGenericAvroClient(String storeName, String viewName, DaVinciConfig config);
24+
25+
<K, V> DaVinciClient<K, V> getAndStartGenericAvroClient(String storeName, String viewName, DaVinciConfig config);
26+
27+
<K, V extends SpecificRecord> DaVinciClient<K, V> getSpecificAvroClient(
28+
String storeName,
29+
String viewName,
30+
DaVinciConfig config,
31+
Class<V> valueClass);
32+
33+
<K, V extends SpecificRecord> DaVinciClient<K, V> getAndStartSpecificAvroClient(
34+
String storeName,
35+
String viewName,
36+
DaVinciConfig config,
37+
Class<V> valueClass);
2238
}

Diff for: clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -3377,9 +3377,11 @@ protected void processMessageAndMaybeProduceToKafka(
33773377
// Write to views
33783378
if (hasViewWriters()) {
33793379
Put newPut = writeComputeResultWrapper.getNewPut();
3380+
// keys will be serialized with chunk suffix during pass-through mode in L/F NR if chunking is enabled
3381+
boolean isChunkedKey = isChunked() && !partitionConsumptionState.isEndOfPushReceived();
33803382
queueUpVersionTopicWritesWithViewWriters(
33813383
partitionConsumptionState,
3382-
(viewWriter) -> viewWriter.processRecord(newPut.putValue, keyBytes, newPut.schemaId),
3384+
(viewWriter) -> viewWriter.processRecord(newPut.putValue, keyBytes, newPut.schemaId, isChunkedKey),
33833385
produceToVersionTopic);
33843386
} else {
33853387
produceToVersionTopic.run();

0 commit comments

Comments
 (0)