Commit 1bb67bc

Addressed comments
1 parent f769e8f · commit 1bb67bc

21 files changed: +444 additions, −241 deletions


clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java

Lines changed: 2 additions & 8 deletions
@@ -669,14 +669,8 @@ protected void processMessageAndMaybeProduceToKafka(
       ByteBuffer oldValueBB = mergeConflictResultWrapper.getOldValueByteBufferProvider().get();
       int oldValueSchemaId =
           oldValueBB == null ? -1 : mergeConflictResultWrapper.getOldValueProvider().get().writerSchemaId();
-      Lazy<GenericRecord> newValueProvider;
-      if (mergeConflictResult.getNewValueDeserialized().isPresent()) {
-        newValueProvider = Lazy.of(() -> mergeConflictResult.getNewValueDeserialized().get());
-      } else {
-        newValueProvider = getNewValueProvider(
-            mergeConflictResultWrapper.getUpdatedValueBytes(),
-            mergeConflictResult.getValueSchemaId());
-      }
+      Lazy<GenericRecord> newValueProvider = mergeConflictResultWrapper
+          .getNewValueProvider((schemaId) -> storeDeserializerCache.getDeserializer(schemaId, schemaId));
       queueUpVersionTopicWritesWithViewWriters(
           partitionConsumptionState,
           (viewWriter) -> viewWriter.processRecord(

clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java

Lines changed: 33 additions & 20 deletions
@@ -3196,9 +3196,14 @@ private PubSubMessageProcessedResult processMessage(
     KafkaMessageEnvelope kafkaValue = consumerRecord.getValue();
     byte[] keyBytes = kafkaKey.getKey();
     MessageType msgType = MessageType.valueOf(kafkaValue.messageType);
+    Lazy<GenericRecord> valueProvider;
     switch (msgType) {
       case PUT:
         Put put = (Put) kafkaValue.payloadUnion;
+        // Value provider should use un-compressed data.
+        final ByteBuffer rawPutValue = put.putValue;
+        valueProvider =
+            Lazy.of(() -> storeDeserializerCache.getDeserializer(put.schemaId, put.schemaId).deserialize(rawPutValue));
         put.putValue = maybeCompressData(
             consumerRecord.getTopicPartition().getPartitionNumber(),
             put.putValue,
@@ -3221,7 +3226,7 @@ private PubSubMessageProcessedResult processMessage(
               null);
         }
 
-        return new PubSubMessageProcessedResult(new WriteComputeResultWrapper(put, null, false));
+        return new PubSubMessageProcessedResult(new WriteComputeResultWrapper(put, null, false, valueProvider));
 
       case UPDATE:
         /**
@@ -3266,20 +3271,21 @@ private PubSubMessageProcessedResult processMessage(
 
         final byte[] updatedValueBytes;
         final ChunkedValueManifest oldValueManifest = valueManifestContainer.getManifest();
-
+        GenericRecord updatedValue;
         try {
           long writeComputeStartTimeInNS = System.nanoTime();
+
           // Leader nodes are the only ones which process UPDATES, so it's valid to always compress and not call
           // 'maybeCompress'.
+          updatedValue = storeWriteComputeHandler.applyWriteCompute(
+              currValue,
+              update.schemaId,
+              readerValueSchemaId,
+              update.updateValue,
+              update.updateSchemaId,
+              readerUpdateProtocolVersion);
           updatedValueBytes = compressor.get()
-              .compress(
-                  storeWriteComputeHandler.applyWriteCompute(
-                      currValue,
-                      update.schemaId,
-                      readerValueSchemaId,
-                      update.updateValue,
-                      update.updateSchemaId,
-                      readerUpdateProtocolVersion));
+              .compress(storeWriteComputeHandler.serializeUpdatedValue(updatedValue, readerValueSchemaId));
           hostLevelIngestionStats
               .recordWriteComputeUpdateLatency(LatencyUtils.getElapsedTimeFromNSToMS(writeComputeStartTimeInNS));
         } catch (Exception e) {
@@ -3316,7 +3322,8 @@ private PubSubMessageProcessedResult processMessage(
         Put updatedPut = new Put();
         updatedPut.putValue = updateValueWithSchemaId;
         updatedPut.schemaId = readerValueSchemaId;
-        return new PubSubMessageProcessedResult(new WriteComputeResultWrapper(updatedPut, oldValueManifest, false));
+        return new PubSubMessageProcessedResult(
+            new WriteComputeResultWrapper(updatedPut, oldValueManifest, false, Lazy.of(() -> updatedValue)));
       }
       case DELETE:
         /**
@@ -3325,7 +3332,19 @@
         if (isWriteComputationEnabled && partitionConsumptionState.isEndOfPushReceived()) {
           partitionConsumptionState.setTransientRecord(kafkaClusterId, consumerRecord.getOffset(), keyBytes, -1, null);
         }
-        return new PubSubMessageProcessedResult(new WriteComputeResultWrapper(null, null, false));
+        // Best-effort to provide the old value for delete operation in case needed by a ComplexVeniceWriter to generate
+        // deletes for materialized view topic partition(s).
+        Lazy<GenericRecord> oldValueProvider = Lazy.of(() -> {
+          ChunkedValueManifestContainer oldValueManifestContainer = new ChunkedValueManifestContainer();
+          int oldValueReaderSchemaId = schemaRepository.getSupersetSchema(storeName).getId();
+          return readStoredValueRecord(
+              partitionConsumptionState,
+              keyBytes,
+              oldValueReaderSchemaId,
+              consumerRecord.getTopicPartition(),
+              oldValueManifestContainer);
+        });
+        return new PubSubMessageProcessedResult(new WriteComputeResultWrapper(null, null, false, oldValueProvider));
 
       default:
         throw new VeniceMessageException(
@@ -3377,7 +3396,7 @@ protected void processMessageAndMaybeProduceToKafka(
     Put newPut = writeComputeResultWrapper.getNewPut();
     // keys will be serialized with chunk suffix during pass-through mode in L/F NR if chunking is enabled
     boolean isChunkedKey = isChunked() && !partitionConsumptionState.isEndOfPushReceived();
-    Lazy<GenericRecord> newValueProvider = getNewValueProvider(newPut.putValue, newPut.schemaId);
+    Lazy<GenericRecord> newValueProvider = writeComputeResultWrapper.getValueProvider();
     queueUpVersionTopicWritesWithViewWriters(
         partitionConsumptionState,
         (viewWriter) -> viewWriter
@@ -3971,7 +3990,7 @@ protected void resubscribeAsLeader(PartitionConsumptionState partitionConsumptio
 
   protected void queueUpVersionTopicWritesWithViewWriters(
       PartitionConsumptionState partitionConsumptionState,
-      Function<VeniceViewWriter, CompletableFuture<PubSubProduceResult>> viewWriterRecordProcessor,
+      Function<VeniceViewWriter, CompletableFuture<Void>> viewWriterRecordProcessor,
       Runnable versionTopicWrite) {
     long preprocessingTime = System.currentTimeMillis();
     CompletableFuture<Void> currentVersionTopicWrite = new CompletableFuture<>();
@@ -4065,10 +4084,4 @@ <T> T databaseLookupWithConcurrencyLimit(Supplier<T> supplier) {
     return supplier.get();
   }
 
-  protected Lazy<GenericRecord> getNewValueProvider(ByteBuffer newValue, int schemaId) {
-    if (newValue == null) {
-      return Lazy.of(() -> null);
-    }
-    return Lazy.of(() -> storeDeserializerCache.getDeserializer(schemaId, schemaId).deserialize(newValue));
-  }
 }
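
Note on the DELETE branch above: the old-value read is wrapped in a Lazy supplier, so the storage lookup only runs if a downstream view writer actually dereferences the provider. A minimal self-contained sketch of that deferral pattern with the same Lazy utility; the expensiveStorageLookup helper here is hypothetical, purely for illustration:

    import com.linkedin.venice.utils.lazy.Lazy;

    public class LazyLookupSketch {
      // Hypothetical stand-in for the storage read done by readStoredValueRecord().
      private static String expensiveStorageLookup(byte[] keyBytes) {
        return "old-value-for-" + keyBytes.length + "-byte-key";
      }

      public static void main(String[] args) {
        byte[] keyBytes = new byte[] { 1, 2, 3 };
        // Construction is cheap: nothing is read from storage yet.
        Lazy<String> oldValueProvider = Lazy.of(() -> expensiveStorageLookup(keyBytes));
        // The supplier runs only when a consumer dereferences the provider,
        // e.g. a ComplexVeniceWriter that needs the old value for a delete.
        System.out.println(oldValueProvider.get());
      }
    }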

clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/MergeConflictResultWrapper.java

Lines changed: 30 additions & 0 deletions
@@ -4,8 +4,11 @@
 import com.linkedin.davinci.replication.merge.MergeConflictResult;
 import com.linkedin.davinci.storage.chunking.ChunkedValueManifestContainer;
 import com.linkedin.davinci.store.record.ByteBufferValueRecord;
+import com.linkedin.venice.serializer.RecordDeserializer;
 import com.linkedin.venice.utils.lazy.Lazy;
 import java.nio.ByteBuffer;
+import java.util.function.Function;
+import org.apache.avro.generic.GenericRecord;
 
 
 /**
@@ -64,4 +67,31 @@ public ByteBuffer getUpdatedValueBytes() {
   public ByteBuffer getUpdatedRmdBytes() {
     return updatedRmdBytes;
   }
+
+  /**
+   * Return a best-effort value provider with the following behaviors:
+   *   1. returns the new value provider for PUT and UPDATE.
+   *   2. returns the old value for DELETE (null for non-existent key).
+   *   3. returns null if the value is not available.
+   */
+  public Lazy<GenericRecord> getNewValueProvider(
+      Function<Integer, RecordDeserializer<GenericRecord>> deserializerProvider) {
+    if (updatedValueBytes == null) {
+      // this is a DELETE
+      ByteBufferValueRecord<ByteBuffer> oldValue = oldValueProvider.get();
+      if (oldValue == null || oldValue.value() == null) {
+        return Lazy.of(() -> null);
+      }
+      return Lazy.of(() -> deserializerProvider.apply(oldValue.writerSchemaId()).deserialize(oldValue.value()));
+    } else {
+      // this is a PUT or UPDATE
+      if (mergeConflictResult.getValueDeserialized().isPresent()) {
+        return Lazy.of(() -> mergeConflictResult.getValueDeserialized().get());
+      }
+      // Use mergeConflictResult.getNewValue() here and not updatedValueBytes for non-compressed value bytes.
+      return Lazy.of(
+          () -> deserializerProvider.apply(mergeConflictResult.getValueSchemaId())
+              .deserialize(mergeConflictResult.getNewValue()));
+    }
+  }
 }
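
For context, the Active/Active ingestion path in this same commit consumes the new helper by supplying a deserializer lookup. A condensed sketch of that call site, mirroring the ActiveActiveStoreIngestionTask change above, where storeDeserializerCache is an existing field of the ingestion task:

    // The same schema id is used as both writer and reader schema, and no
    // deserialization happens unless a view writer dereferences the provider.
    Lazy<GenericRecord> newValueProvider = mergeConflictResultWrapper
        .getNewValueProvider(schemaId -> storeDeserializerCache.getDeserializer(schemaId, schemaId));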

clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreWriteComputeProcessor.java

Lines changed: 5 additions & 4 deletions
@@ -93,7 +93,7 @@ public StoreWriteComputeProcessor(
    *
    * @return Bytes of partially updated original value.
    */
-  public byte[] applyWriteCompute(
+  public GenericRecord applyWriteCompute(
       GenericRecord currValue,
       int writerValueSchemaId,
       int readerValueSchemaId,
@@ -110,9 +110,10 @@ public byte[] applyWriteCompute(
         writeComputeProcessor.updateRecord(readerSchemaContainer.getValueSchema(), currValue, writeComputeRecord);
 
     // If write compute is enabled and the record is deleted, the updatedValue will be null.
-    if (updatedValue == null) {
-      return null;
-    }
+    return updatedValue;
+  }
+
+  public byte[] serializeUpdatedValue(GenericRecord updatedValue, int readerValueSchemaId) {
     return getValueSerializer(readerValueSchemaId).serialize(updatedValue);
   }
 
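
Splitting the old single-step applyWriteCompute into a merge step that returns the GenericRecord plus a separate serializeUpdatedValue call is what lets the ingestion task hand the deserialized record to view writers without a redundant deserialize. A condensed sketch of the resulting two-step flow, as used in the UPDATE branch of LeaderFollowerStoreIngestionTask above:

    // Step 1: apply the partial update in memory and keep the record around.
    GenericRecord updatedValue = storeWriteComputeHandler.applyWriteCompute(
        currValue,
        update.schemaId,
        readerValueSchemaId,
        update.updateValue,
        update.updateSchemaId,
        readerUpdateProtocolVersion);

    // Step 2: serialize (and compress) only for the version-topic write.
    byte[] updatedValueBytes = compressor.get()
        .compress(storeWriteComputeHandler.serializeUpdatedValue(updatedValue, readerValueSchemaId));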

clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/WriteComputeResultWrapper.java

Lines changed: 22 additions & 0 deletions
@@ -2,6 +2,8 @@
 
 import com.linkedin.venice.kafka.protocol.Put;
 import com.linkedin.venice.storage.protocol.ChunkedValueManifest;
+import com.linkedin.venice.utils.lazy.Lazy;
+import org.apache.avro.generic.GenericRecord;
 
 
 /**
@@ -14,11 +16,21 @@ public class WriteComputeResultWrapper {
    * This can be true when there is some delete op against a non-existing entry.
    */
   private final boolean skipProduce;
+  private final Lazy<GenericRecord> valueProvider;
 
   public WriteComputeResultWrapper(Put newPut, ChunkedValueManifest oldValueManifest, boolean skipProduce) {
+    this(newPut, oldValueManifest, skipProduce, Lazy.of(() -> null));
+  }
+
+  public WriteComputeResultWrapper(
+      Put newPut,
+      ChunkedValueManifest oldValueManifest,
+      boolean skipProduce,
+      Lazy<GenericRecord> valueProvider) {
     this.newPut = newPut;
     this.oldValueManifest = oldValueManifest;
     this.skipProduce = skipProduce;
+    this.valueProvider = valueProvider;
   }
 
   public Put getNewPut() {
@@ -32,4 +44,14 @@ public ChunkedValueManifest getOldValueManifest() {
   public boolean isSkipProduce() {
     return skipProduce;
   }
+
+  /**
+   * Return a best-effort value provider with the following behaviors:
+   *   1. returns the new value provider for PUT and UPDATE.
+   *   2. returns the old value for DELETE (null for non-existent key).
+   *   3. returns null if the value is not available.
+   */
+  public Lazy<GenericRecord> getValueProvider() {
+    return this.valueProvider;
+  }
 }
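
Existing call sites that have no record to offer can keep the three-argument constructor, which falls back to a provider yielding null; the PUT, UPDATE and DELETE paths in this commit pass an explicit provider instead. A small usage sketch under those assumptions (the surrounding variables are illustrative):

    // Producer side: wrap the already-available record lazily.
    WriteComputeResultWrapper result =
        new WriteComputeResultWrapper(newPut, null, false, Lazy.of(() -> updatedValue));

    // Consumer side (e.g. a view writer): the value is materialized on demand
    // and may be null when it is not available.
    GenericRecord valueForViews = result.getValueProvider().get();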

clients/da-vinci-client/src/main/java/com/linkedin/davinci/replication/merge/MergeConflictResult.java

Lines changed: 5 additions & 5 deletions
@@ -13,7 +13,7 @@ public class MergeConflictResult {
   private static final MergeConflictResult IGNORED_RESULT = new MergeConflictResult();
 
   private ByteBuffer newValue;
-  private Optional<GenericRecord> newValueDeserialized;
+  private Optional<GenericRecord> valueDeserialized;
   private int valueSchemaId;
   private final boolean updateIgnored; // Whether we should skip the incoming message since it could be a stale message.
   private boolean resultReusesInput;
@@ -29,13 +29,13 @@ public MergeConflictResult(
 
   public MergeConflictResult(
       ByteBuffer newValue,
-      Optional<GenericRecord> newValueDeserialized,
+      Optional<GenericRecord> valueDeserialized,
       int valueSchemaID,
       boolean resultReusesInput,
       GenericRecord rmdRecord) {
     this.updateIgnored = false;
     this.newValue = newValue;
-    this.newValueDeserialized = newValueDeserialized;
+    this.valueDeserialized = valueDeserialized;
     this.valueSchemaId = valueSchemaID;
     this.resultReusesInput = resultReusesInput;
     this.rmdRecord = rmdRecord;
@@ -75,7 +75,7 @@ public GenericRecord getRmdRecord() {
    * deserialize the value to generate the MCR.
    * @return deserialized new value if possible.
    */
-  public Optional<GenericRecord> getNewValueDeserialized() {
-    return newValueDeserialized;
+  public Optional<GenericRecord> getValueDeserialized() {
+    return valueDeserialized;
   }
 }

clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriter.java

Lines changed: 2 additions & 3 deletions
@@ -14,7 +14,6 @@
 import com.linkedin.venice.message.KafkaKey;
 import com.linkedin.venice.meta.Version;
 import com.linkedin.venice.pubsub.PubSubProducerAdapterFactory;
-import com.linkedin.venice.pubsub.api.PubSubProduceResult;
 import com.linkedin.venice.schema.rmd.RmdUtils;
 import com.linkedin.venice.utils.VeniceProperties;
 import com.linkedin.venice.utils.lazy.Lazy;
@@ -60,7 +59,7 @@ public ChangeCaptureViewWriter(
   }
 
   @Override
-  public CompletableFuture<PubSubProduceResult> processRecord(
+  public CompletableFuture<Void> processRecord(
       ByteBuffer newValue,
       ByteBuffer oldValue,
       byte[] key,
@@ -85,7 +84,7 @@ public CompletableFuture<PubSubProduceResult> processRecord(
   }
 
   @Override
-  public CompletableFuture<PubSubProduceResult> processRecord(
+  public CompletableFuture<Void> processRecord(
       ByteBuffer newValue,
       byte[] key,
      int newValueSchemaId,

clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/MaterializedViewWriter.java

Lines changed: 10 additions & 28 deletions
@@ -6,13 +6,12 @@
 import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
 import com.linkedin.venice.message.KafkaKey;
 import com.linkedin.venice.meta.Version;
-import com.linkedin.venice.partitioner.VeniceComplexPartitioner;
 import com.linkedin.venice.pubsub.PubSubProducerAdapterFactory;
-import com.linkedin.venice.pubsub.api.PubSubProduceResult;
 import com.linkedin.venice.serialization.KeyWithChunkingSuffixSerializer;
 import com.linkedin.venice.utils.ByteUtils;
 import com.linkedin.venice.utils.lazy.Lazy;
 import com.linkedin.venice.views.MaterializedView;
+import com.linkedin.venice.writer.ComplexVeniceWriter;
 import com.linkedin.venice.writer.VeniceWriter;
 import com.linkedin.venice.writer.VeniceWriterFactory;
 import com.linkedin.venice.writer.VeniceWriterOptions;
@@ -32,9 +31,8 @@ public class MaterializedViewWriter extends VeniceViewWriter {
   private final PubSubProducerAdapterFactory pubSubProducerAdapterFactory;
   private final MaterializedView internalView;
   private final String materializedViewTopicName;
-  private Lazy<VeniceWriter> veniceWriter;
+  private Lazy<ComplexVeniceWriter> veniceWriter;
   private final KeyWithChunkingSuffixSerializer keyWithChunkingSuffixSerializer = new KeyWithChunkingSuffixSerializer();
-  private final VeniceComplexPartitioner complexPartitioner;
 
   public MaterializedViewWriter(
       VeniceConfigLoader props,
@@ -49,23 +47,18 @@ public MaterializedViewWriter(
         internalView.getTopicNamesAndConfigsForVersion(version.getNumber()).keySet().stream().findAny().get();
     this.veniceWriter = Lazy.of(
         () -> new VeniceWriterFactory(props.getCombinedProperties().toProperties(), pubSubProducerAdapterFactory, null)
-            .createVeniceWriter(buildWriterOptions()));
-    if (internalView.getViewPartitioner() instanceof VeniceComplexPartitioner) {
-      complexPartitioner = (VeniceComplexPartitioner) internalView.getViewPartitioner();
-    } else {
-      complexPartitioner = null;
-    }
+            .createComplexVeniceWriter(buildWriterOptions()));
   }
 
   /**
    * package private for testing purpose
    */
-  void setVeniceWriter(VeniceWriter veniceWriter) {
+  void setVeniceWriter(ComplexVeniceWriter veniceWriter) {
     this.veniceWriter = Lazy.of(() -> veniceWriter);
   }
 
   @Override
-  public CompletableFuture<PubSubProduceResult> processRecord(
+  public CompletableFuture<Void> processRecord(
      ByteBuffer newValue,
      ByteBuffer oldValue,
      byte[] key,
@@ -85,7 +78,7 @@ public CompletableFuture<PubSubProduceResult> processRecord(
   * will assemble and re-chunk.
   */
  @Override
-  public CompletableFuture<PubSubProduceResult> processRecord(
+  public CompletableFuture<Void> processRecord(
      ByteBuffer newValue,
      byte[] key,
      int newValueSchemaId,
@@ -96,22 +89,11 @@ public CompletableFuture<PubSubProduceResult> processRecord(
      viewTopicKey = keyWithChunkingSuffixSerializer.getKeyFromChunkedKey(key);
    }
    byte[] newValueBytes = newValue == null ? null : ByteUtils.extractByteArray(newValue);
-    if (complexPartitioner != null) {
-      return veniceWriter.get()
-          .writeWithComplexPartitioner(
-              viewTopicKey,
-              newValueBytes,
-              newValueSchemaId,
-              newValueProvider,
-              complexPartitioner,
-              internalView.getViewPartitionCount());
-    } else {
-      if (newValue == null) {
-        // this is a delete operation
-        return veniceWriter.get().delete(viewTopicKey, null);
-      }
-      return veniceWriter.get().put(viewTopicKey, newValueBytes, newValueSchemaId);
+    if (newValue == null) {
+      // this is a delete operation
+      return veniceWriter.get().complexDelete(viewTopicKey, newValueProvider);
     }
+    return veniceWriter.get().complexPut(viewTopicKey, newValueBytes, newValueSchemaId, newValueProvider);
   }
 
   @Override
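
With processRecord now returning CompletableFuture<Void> across all view writers, queueUpVersionTopicWritesWithViewWriters in LeaderFollowerStoreIngestionTask (see its signature change above) can treat plain puts, complexPut and complexDelete uniformly. A rough sketch of how such futures can be composed before the version-topic write proceeds; this illustrates the pattern and is not the exact ingestion-task code, with currentVersionTopicWrite, viewWriters, viewWriterRecordProcessor and versionTopicWrite taken from the method shown earlier:

    // Fan out the record to every view writer; each returns CompletableFuture<Void>.
    CompletableFuture[] viewWriterFutures = viewWriters.stream()
        .map(viewWriterRecordProcessor)   // Function<VeniceViewWriter, CompletableFuture<Void>>
        .toArray(CompletableFuture[]::new);

    // Gate the version-topic write on all view writes completing successfully.
    CompletableFuture.allOf(viewWriterFutures).whenComplete((ignored, error) -> {
      if (error == null) {
        versionTopicWrite.run();
        currentVersionTopicWrite.complete(null);
      } else {
        currentVersionTopicWrite.completeExceptionally(error);
      }
    });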
