[server][common][vpj] Introduce ComplexVenicePartitioner to materialized view #1509

Merged (5 commits) on Feb 22, 2025
Changes from 1 commit
Addressed comments
xunyin8 committed Feb 21, 2025
commit 5cf9039b599f6cd7d47df7d1eb5740fedac54467
@@ -669,14 +669,8 @@ protected void processMessageAndMaybeProduceToKafka(
ByteBuffer oldValueBB = mergeConflictResultWrapper.getOldValueByteBufferProvider().get();
int oldValueSchemaId =
oldValueBB == null ? -1 : mergeConflictResultWrapper.getOldValueProvider().get().writerSchemaId();
Lazy<GenericRecord> newValueProvider;
if (mergeConflictResult.getNewValueDeserialized().isPresent()) {
newValueProvider = Lazy.of(() -> mergeConflictResult.getNewValueDeserialized().get());
} else {
newValueProvider = getNewValueProvider(
mergeConflictResultWrapper.getUpdatedValueBytes(),
mergeConflictResult.getValueSchemaId());
}
Lazy<GenericRecord> newValueProvider = mergeConflictResultWrapper
.getNewValueProvider((schemaId) -> storeDeserializerCache.getDeserializer(schemaId, schemaId));
queueUpVersionTopicWritesWithViewWriters(
partitionConsumptionState,
(viewWriter) -> viewWriter.processRecord(
@@ -3196,9 +3196,14 @@ private PubSubMessageProcessedResult processMessage(
KafkaMessageEnvelope kafkaValue = consumerRecord.getValue();
byte[] keyBytes = kafkaKey.getKey();
MessageType msgType = MessageType.valueOf(kafkaValue.messageType);
Lazy<GenericRecord> valueProvider;
switch (msgType) {
case PUT:
Put put = (Put) kafkaValue.payloadUnion;
// Value provider should use un-compressed data.
final ByteBuffer rawPutValue = put.putValue;
valueProvider =
Lazy.of(() -> storeDeserializerCache.getDeserializer(put.schemaId, put.schemaId).deserialize(rawPutValue));
put.putValue = maybeCompressData(
consumerRecord.getTopicPartition().getPartitionNumber(),
put.putValue,
@@ -3221,7 +3226,7 @@ private PubSubMessageProcessedResult processMessage(
null);
}

return new PubSubMessageProcessedResult(new WriteComputeResultWrapper(put, null, false));
return new PubSubMessageProcessedResult(new WriteComputeResultWrapper(put, null, false, valueProvider));

case UPDATE:
/**
@@ -3266,20 +3271,21 @@ private PubSubMessageProcessedResult processMessage(

final byte[] updatedValueBytes;
final ChunkedValueManifest oldValueManifest = valueManifestContainer.getManifest();

GenericRecord updatedValue;
try {
long writeComputeStartTimeInNS = System.nanoTime();

// Leader nodes are the only ones which process UPDATES, so it's valid to always compress and not call
// 'maybeCompress'.
updatedValue = storeWriteComputeHandler.applyWriteCompute(
currValue,
update.schemaId,
readerValueSchemaId,
update.updateValue,
update.updateSchemaId,
readerUpdateProtocolVersion);
updatedValueBytes = compressor.get()
.compress(
storeWriteComputeHandler.applyWriteCompute(
currValue,
update.schemaId,
readerValueSchemaId,
update.updateValue,
update.updateSchemaId,
readerUpdateProtocolVersion));
.compress(storeWriteComputeHandler.serializeUpdatedValue(updatedValue, readerValueSchemaId));
hostLevelIngestionStats
.recordWriteComputeUpdateLatency(LatencyUtils.getElapsedTimeFromNSToMS(writeComputeStartTimeInNS));
} catch (Exception e) {
@@ -3316,7 +3322,8 @@ private PubSubMessageProcessedResult processMessage(
Put updatedPut = new Put();
updatedPut.putValue = updateValueWithSchemaId;
updatedPut.schemaId = readerValueSchemaId;
return new PubSubMessageProcessedResult(new WriteComputeResultWrapper(updatedPut, oldValueManifest, false));
return new PubSubMessageProcessedResult(
new WriteComputeResultWrapper(updatedPut, oldValueManifest, false, Lazy.of(() -> updatedValue)));
}
case DELETE:
/**
@@ -3325,7 +3332,19 @@ private PubSubMessageProcessedResult processMessage(
if (isWriteComputationEnabled && partitionConsumptionState.isEndOfPushReceived()) {
partitionConsumptionState.setTransientRecord(kafkaClusterId, consumerRecord.getOffset(), keyBytes, -1, null);
}
return new PubSubMessageProcessedResult(new WriteComputeResultWrapper(null, null, false));
// Best-effort to provide the old value for delete operation in case needed by a ComplexVeniceWriter to generate
// deletes for materialized view topic partition(s).
Lazy<GenericRecord> oldValueProvider = Lazy.of(() -> {
ChunkedValueManifestContainer oldValueManifestContainer = new ChunkedValueManifestContainer();
int oldValueReaderSchemaId = schemaRepository.getSupersetSchema(storeName).getId();
return readStoredValueRecord(
partitionConsumptionState,
keyBytes,
oldValueReaderSchemaId,
consumerRecord.getTopicPartition(),
oldValueManifestContainer);
});
return new PubSubMessageProcessedResult(new WriteComputeResultWrapper(null, null, false, oldValueProvider));

default:
throw new VeniceMessageException(
@@ -3377,7 +3396,7 @@ protected void processMessageAndMaybeProduceToKafka(
Put newPut = writeComputeResultWrapper.getNewPut();
// keys will be serialized with chunk suffix during pass-through mode in L/F NR if chunking is enabled
boolean isChunkedKey = isChunked() && !partitionConsumptionState.isEndOfPushReceived();
Lazy<GenericRecord> newValueProvider = getNewValueProvider(newPut.putValue, newPut.schemaId);
Lazy<GenericRecord> newValueProvider = writeComputeResultWrapper.getValueProvider();
queueUpVersionTopicWritesWithViewWriters(
partitionConsumptionState,
(viewWriter) -> viewWriter
@@ -3971,7 +3990,7 @@ protected void resubscribeAsLeader(PartitionConsumptionState partitionConsumptio

protected void queueUpVersionTopicWritesWithViewWriters(
PartitionConsumptionState partitionConsumptionState,
Function<VeniceViewWriter, CompletableFuture<PubSubProduceResult>> viewWriterRecordProcessor,
Function<VeniceViewWriter, CompletableFuture<Void>> viewWriterRecordProcessor,
Runnable versionTopicWrite) {
long preprocessingTime = System.currentTimeMillis();
CompletableFuture<Void> currentVersionTopicWrite = new CompletableFuture<>();
@@ -4065,10 +4084,4 @@ <T> T databaseLookupWithConcurrencyLimit(Supplier<T> supplier) {
return supplier.get();
}
}
protected Lazy<GenericRecord> getNewValueProvider(ByteBuffer newValue, int schemaId) {
if (newValue == null) {
return Lazy.of(() -> null);
}
return Lazy.of(() -> storeDeserializerCache.getDeserializer(schemaId, schemaId).deserialize(newValue));
}
}
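
The DELETE branch above wraps the store lookup in a Lazy so the read only runs if a view writer actually needs the old record, for example to let a ComplexVeniceWriter pick the materialized-view partition(s) for the delete. A minimal sketch of that pattern follows; lookUpOldValue stands in for readStoredValueRecord and the guard flag is purely hypothetical, not part of this commit:

import com.linkedin.venice.utils.lazy.Lazy;
import org.apache.avro.generic.GenericRecord;

public class LazyOldValueSketch {
  // Hypothetical stand-in for readStoredValueRecord(...): an expensive storage-engine read.
  static GenericRecord lookUpOldValue(byte[] key) {
    System.out.println("storage engine read for key of length " + key.length);
    return null; // non-existent key in this sketch
  }

  public static void main(String[] args) {
    byte[] key = { 1, 2, 3 };
    // The provider is built unconditionally, but nothing is read yet.
    Lazy<GenericRecord> oldValueProvider = Lazy.of(() -> lookUpOldValue(key));

    boolean viewWriterNeedsOldValue = false; // hypothetical: true only when a complex partitioner is in play
    if (viewWriterNeedsOldValue) {
      GenericRecord oldValue = oldValueProvider.get(); // the lookup happens only here
    }
  }
}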
@@ -4,8 +4,11 @@
import com.linkedin.davinci.replication.merge.MergeConflictResult;
import com.linkedin.davinci.storage.chunking.ChunkedValueManifestContainer;
import com.linkedin.davinci.store.record.ByteBufferValueRecord;
import com.linkedin.venice.serializer.RecordDeserializer;
import com.linkedin.venice.utils.lazy.Lazy;
import java.nio.ByteBuffer;
import java.util.function.Function;
import org.apache.avro.generic.GenericRecord;


/**
@@ -64,4 +67,31 @@ public ByteBuffer getUpdatedValueBytes() {
public ByteBuffer getUpdatedRmdBytes() {
return updatedRmdBytes;
}

/**
* Return a best-effort value provider with the following behaviors:
* 1. returns the new value provider for PUT and UPDATE.
* 2. returns the old value for DELETE (null for non-existent key).
* 3. returns null if the value is not available.
*/
public Lazy<GenericRecord> getNewValueProvider(
Function<Integer, RecordDeserializer<GenericRecord>> deserializerProvider) {
if (updatedValueBytes == null) {
// this is a DELETE
ByteBufferValueRecord<ByteBuffer> oldValue = oldValueProvider.get();
if (oldValue == null || oldValue.value() == null) {
return Lazy.of(() -> null);
}
return Lazy.of(() -> deserializerProvider.apply(oldValue.writerSchemaId()).deserialize(oldValue.value()));
} else {
// this is a PUT or UPDATE
if (mergeConflictResult.getValueDeserialized().isPresent()) {
return Lazy.of(() -> mergeConflictResult.getValueDeserialized().get());
}
// Use mergeConflictResult.getNewValue() here and not updatedValueBytes for non-compressed value bytes.
return Lazy.of(
() -> deserializerProvider.apply(mergeConflictResult.getValueSchemaId())
.deserialize(mergeConflictResult.getNewValue()));
}
}
}
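
For reference, the call site in the ingestion hunk at the top of this diff, spelled out with the deserializer function as an explicit variable; storeDeserializerCache and mergeConflictResultWrapper come from that surrounding method, so this is an illustrative fragment rather than code in the commit:

Function<Integer, RecordDeserializer<GenericRecord>> deserializerProvider =
    schemaId -> storeDeserializerCache.getDeserializer(schemaId, schemaId);
Lazy<GenericRecord> newValueProvider = mergeConflictResultWrapper.getNewValueProvider(deserializerProvider);
// PUT/UPDATE: get() lazily deserializes the merged value (uncompressed bytes from MergeConflictResult).
// DELETE: get() lazily deserializes the stored old value, or returns null for a non-existent key.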
@@ -93,7 +93,7 @@ public StoreWriteComputeProcessor(
*
* @return Bytes of partially updated original value.
*/
public byte[] applyWriteCompute(
public GenericRecord applyWriteCompute(
GenericRecord currValue,
int writerValueSchemaId,
int readerValueSchemaId,
@@ -110,9 +110,10 @@ public byte[] applyWriteCompute(
writeComputeProcessor.updateRecord(readerSchemaContainer.getValueSchema(), currValue, writeComputeRecord);

// If write compute is enabled and the record is deleted, the updatedValue will be null.
if (updatedValue == null) {
return null;
}
return updatedValue;
}

public byte[] serializeUpdatedValue(GenericRecord updatedValue, int readerValueSchemaId) {
return getValueSerializer(readerValueSchemaId).serialize(updatedValue);
}

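Putting this split together with the UPDATE hunk earlier in the diff, the leader-side flow now reads roughly as below; this is a condensed restatement using names from the hunks, not additional code in the commit:

GenericRecord updatedValue = storeWriteComputeHandler.applyWriteCompute(
    currValue,
    update.schemaId,
    readerValueSchemaId,
    update.updateValue,
    update.updateSchemaId,
    readerUpdateProtocolVersion);
// Serialization is now a separate step, so the deserialized record stays available...
byte[] updatedValueBytes = compressor.get()
    .compress(storeWriteComputeHandler.serializeUpdatedValue(updatedValue, readerValueSchemaId));
// ... (chunk handling and schema-id prefixing elided) ...
// ...and the same record backs the value provider handed to view writers:
return new PubSubMessageProcessedResult(
    new WriteComputeResultWrapper(updatedPut, oldValueManifest, false, Lazy.of(() -> updatedValue)));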
@@ -2,6 +2,8 @@

import com.linkedin.venice.kafka.protocol.Put;
import com.linkedin.venice.storage.protocol.ChunkedValueManifest;
import com.linkedin.venice.utils.lazy.Lazy;
import org.apache.avro.generic.GenericRecord;


/**
@@ -14,11 +16,21 @@ public class WriteComputeResultWrapper {
* This can be true when there is some delete op against a non-existing entry.
*/
private final boolean skipProduce;
private final Lazy<GenericRecord> valueProvider;

public WriteComputeResultWrapper(Put newPut, ChunkedValueManifest oldValueManifest, boolean skipProduce) {
this(newPut, oldValueManifest, skipProduce, Lazy.of(() -> null));
}

public WriteComputeResultWrapper(
Put newPut,
ChunkedValueManifest oldValueManifest,
boolean skipProduce,
Lazy<GenericRecord> valueProvider) {
this.newPut = newPut;
this.oldValueManifest = oldValueManifest;
this.skipProduce = skipProduce;
this.valueProvider = valueProvider;
}

public Put getNewPut() {
@@ -32,4 +44,14 @@ public ChunkedValueManifest getOldValueManifest() {
public boolean isSkipProduce() {
return skipProduce;
}

/**
* Return a best-effort value provider with the following behaviors:
* 1. returns the new value provider for PUT and UPDATE.
* 2. returns the old value for DELETE (null for non-existent key).
* 3. returns null if the value is not available.
*/
public Lazy<GenericRecord> getValueProvider() {
return this.valueProvider;
}
}
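
The two constructors above in use; the 3-arg form keeps existing call sites compiling by defaulting the provider to a Lazy that yields null (variable names below are illustrative):

// With a deserialized value available (PUT/UPDATE path):
WriteComputeResultWrapper withValue =
    new WriteComputeResultWrapper(updatedPut, oldValueManifest, false, Lazy.of(() -> updatedValue));

// Without one (legacy 3-arg form): getValueProvider().get() simply returns null.
WriteComputeResultWrapper withoutValue =
    new WriteComputeResultWrapper(updatedPut, oldValueManifest, false);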
@@ -13,7 +13,7 @@ public class MergeConflictResult {
private static final MergeConflictResult IGNORED_RESULT = new MergeConflictResult();

private ByteBuffer newValue;
private Optional<GenericRecord> newValueDeserialized;
private Optional<GenericRecord> valueDeserialized;
private int valueSchemaId;
private final boolean updateIgnored; // Whether we should skip the incoming message since it could be a stale message.
private boolean resultReusesInput;
@@ -29,13 +29,13 @@ public MergeConflictResult(

public MergeConflictResult(
ByteBuffer newValue,
Optional<GenericRecord> newValueDeserialized,
Optional<GenericRecord> valueDeserialized,
int valueSchemaID,
boolean resultReusesInput,
GenericRecord rmdRecord) {
this.updateIgnored = false;
this.newValue = newValue;
this.newValueDeserialized = newValueDeserialized;
this.valueDeserialized = valueDeserialized;
this.valueSchemaId = valueSchemaID;
this.resultReusesInput = resultReusesInput;
this.rmdRecord = rmdRecord;
@@ -75,7 +75,7 @@ public GenericRecord getRmdRecord() {
* deserialize the value to generate the MCR.
* @return deserialized new value if possible.
*/
public Optional<GenericRecord> getNewValueDeserialized() {
return newValueDeserialized;
public Optional<GenericRecord> getValueDeserialized() {
return valueDeserialized;
}
}
@@ -14,7 +14,6 @@
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pubsub.PubSubProducerAdapterFactory;
import com.linkedin.venice.pubsub.api.PubSubProduceResult;
import com.linkedin.venice.schema.rmd.RmdUtils;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.utils.lazy.Lazy;
@@ -60,7 +59,7 @@ public ChangeCaptureViewWriter(
}

@Override
public CompletableFuture<PubSubProduceResult> processRecord(
public CompletableFuture<Void> processRecord(
ByteBuffer newValue,
ByteBuffer oldValue,
byte[] key,
@@ -85,7 +84,7 @@ public CompletableFuture<PubSubProduceResult> processRecord(
}

@Override
public CompletableFuture<PubSubProduceResult> processRecord(
public CompletableFuture<Void> processRecord(
ByteBuffer newValue,
byte[] key,
int newValueSchemaId,
@@ -6,13 +6,12 @@
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.partitioner.VeniceComplexPartitioner;
import com.linkedin.venice.pubsub.PubSubProducerAdapterFactory;
import com.linkedin.venice.pubsub.api.PubSubProduceResult;
import com.linkedin.venice.serialization.KeyWithChunkingSuffixSerializer;
import com.linkedin.venice.utils.ByteUtils;
import com.linkedin.venice.utils.lazy.Lazy;
import com.linkedin.venice.views.MaterializedView;
import com.linkedin.venice.writer.ComplexVeniceWriter;
import com.linkedin.venice.writer.VeniceWriter;
import com.linkedin.venice.writer.VeniceWriterFactory;
import com.linkedin.venice.writer.VeniceWriterOptions;
@@ -32,9 +31,8 @@ public class MaterializedViewWriter extends VeniceViewWriter {
private final PubSubProducerAdapterFactory pubSubProducerAdapterFactory;
private final MaterializedView internalView;
private final String materializedViewTopicName;
private Lazy<VeniceWriter> veniceWriter;
private Lazy<ComplexVeniceWriter> veniceWriter;
private final KeyWithChunkingSuffixSerializer keyWithChunkingSuffixSerializer = new KeyWithChunkingSuffixSerializer();
private final VeniceComplexPartitioner complexPartitioner;

public MaterializedViewWriter(
VeniceConfigLoader props,
@@ -49,23 +47,18 @@ public MaterializedViewWriter(
internalView.getTopicNamesAndConfigsForVersion(version.getNumber()).keySet().stream().findAny().get();
this.veniceWriter = Lazy.of(
() -> new VeniceWriterFactory(props.getCombinedProperties().toProperties(), pubSubProducerAdapterFactory, null)
.createVeniceWriter(buildWriterOptions()));
if (internalView.getViewPartitioner() instanceof VeniceComplexPartitioner) {
complexPartitioner = (VeniceComplexPartitioner) internalView.getViewPartitioner();
} else {
complexPartitioner = null;
}
.createComplexVeniceWriter(buildWriterOptions()));
}

/**
* package private for testing purpose
*/
void setVeniceWriter(VeniceWriter veniceWriter) {
void setVeniceWriter(ComplexVeniceWriter veniceWriter) {
this.veniceWriter = Lazy.of(() -> veniceWriter);
}

@Override
public CompletableFuture<PubSubProduceResult> processRecord(
public CompletableFuture<Void> processRecord(
ByteBuffer newValue,
ByteBuffer oldValue,
byte[] key,
@@ -85,7 +78,7 @@ public CompletableFuture<PubSubProduceResult> processRecord(
* will assemble and re-chunk.
*/
@Override
public CompletableFuture<PubSubProduceResult> processRecord(
public CompletableFuture<Void> processRecord(
ByteBuffer newValue,
byte[] key,
int newValueSchemaId,
@@ -96,22 +89,11 @@ public CompletableFuture<PubSubProduceResult> processRecord(
viewTopicKey = keyWithChunkingSuffixSerializer.getKeyFromChunkedKey(key);
}
byte[] newValueBytes = newValue == null ? null : ByteUtils.extractByteArray(newValue);
if (complexPartitioner != null) {
return veniceWriter.get()
.writeWithComplexPartitioner(
viewTopicKey,
newValueBytes,
newValueSchemaId,
newValueProvider,
complexPartitioner,
internalView.getViewPartitionCount());
} else {
if (newValue == null) {
// this is a delete operation
return veniceWriter.get().delete(viewTopicKey, null);
}
return veniceWriter.get().put(viewTopicKey, newValueBytes, newValueSchemaId);
if (newValue == null) {
// this is a delete operation
return veniceWriter.get().complexDelete(viewTopicKey, newValueProvider);
}
return veniceWriter.get().complexPut(viewTopicKey, newValueBytes, newValueSchemaId, newValueProvider);
}

@Override
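
Condensed, the tail of the new processRecord reads as below. Partition selection for complex partitioners now happens inside ComplexVeniceWriter, so the view writer no longer branches on VeniceComplexPartitioner; the isChunkedKey guard name is assumed, since that context line falls outside the hunk:

byte[] viewTopicKey = key;
if (isChunkedKey) {
  // strip the chunking suffix added during L/F pass-through
  viewTopicKey = keyWithChunkingSuffixSerializer.getKeyFromChunkedKey(key);
}
byte[] newValueBytes = newValue == null ? null : ByteUtils.extractByteArray(newValue);
if (newValue == null) {
  // delete: ComplexVeniceWriter may consult newValueProvider (the old value) to pick the target partition(s)
  return veniceWriter.get().complexDelete(viewTopicKey, newValueProvider);
}
return veniceWriter.get().complexPut(viewTopicKey, newValueBytes, newValueSchemaId, newValueProvider);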