
Commit ca52bc5 (parent: 1bb67bc)

Addressed comments and fixed re-push and compression support

22 files changed: +322 -130 lines changed

Diff for: clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java (+5 -4)

@@ -549,7 +549,8 @@ private PubSubMessageProcessedResult processActiveActiveMessage(
             rmdWithValueSchemaID,
             valueManifestContainer,
             null,
-            null));
+            null,
+            (schemaId) -> storeDeserializerCache.getDeserializer(schemaId, schemaId)));
     } else {
       validatePostOperationResultsAndRecord(mergeConflictResult, offsetSumPreOperation, recordTimestampsPreOperation);

@@ -589,7 +590,8 @@ private PubSubMessageProcessedResult processActiveActiveMessage(
             rmdWithValueSchemaID,
             valueManifestContainer,
             updatedValueBytes,
-            updatedRmdBytes));
+            updatedRmdBytes,
+            (schemaId) -> storeDeserializerCache.getDeserializer(schemaId, schemaId)));
     }
   }

@@ -669,8 +671,7 @@ protected void processMessageAndMaybeProduceToKafka(
     ByteBuffer oldValueBB = mergeConflictResultWrapper.getOldValueByteBufferProvider().get();
     int oldValueSchemaId =
         oldValueBB == null ? -1 : mergeConflictResultWrapper.getOldValueProvider().get().writerSchemaId();
-    Lazy<GenericRecord> newValueProvider = mergeConflictResultWrapper
-        .getNewValueProvider((schemaId) -> storeDeserializerCache.getDeserializer(schemaId, schemaId));
+    Lazy<GenericRecord> newValueProvider = mergeConflictResultWrapper.getNewValueProvider();
     queueUpVersionTopicWritesWithViewWriters(
         partitionConsumptionState,
         (viewWriter) -> viewWriter.processRecord(

Diff for: clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java (+23 -7)

@@ -71,6 +71,7 @@
 import com.linkedin.venice.schema.SchemaEntry;
 import com.linkedin.venice.schema.writecompute.DerivedSchemaEntry;
 import com.linkedin.venice.serialization.AvroStoreDeserializerCache;
+import com.linkedin.venice.serializer.RecordDeserializer;
 import com.linkedin.venice.stats.StatsErrorCode;
 import com.linkedin.venice.storage.protocol.ChunkedValueManifest;
 import com.linkedin.venice.utils.ByteUtils;

@@ -3202,8 +3203,20 @@ private PubSubMessageProcessedResult processMessage(
         Put put = (Put) kafkaValue.payloadUnion;
         // Value provider should use un-compressed data.
         final ByteBuffer rawPutValue = put.putValue;
-        valueProvider =
-            Lazy.of(() -> storeDeserializerCache.getDeserializer(put.schemaId, put.schemaId).deserialize(rawPutValue));
+        final boolean needToDecompress = !partitionConsumptionState.isEndOfPushReceived();
+        valueProvider = Lazy.of(() -> {
+          RecordDeserializer<GenericRecord> recordDeserializer =
+              storeDeserializerCache.getDeserializer(put.schemaId, put.schemaId);
+          if (needToDecompress) {
+            try {
+              return recordDeserializer.deserialize(compressor.get().decompress(rawPutValue));
+            } catch (IOException e) {
+              throw new VeniceException("Unable to provide value due to decompression failure", e);
+            }
+          } else {
+            return recordDeserializer.deserialize(rawPutValue);
+          }
+        });
         put.putValue = maybeCompressData(
             consumerRecord.getTopicPartition().getPartitionNumber(),
             put.putValue,

@@ -3271,21 +3284,20 @@ private PubSubMessageProcessedResult processMessage(

         final byte[] updatedValueBytes;
         final ChunkedValueManifest oldValueManifest = valueManifestContainer.getManifest();
-        GenericRecord updatedValue;
+        WriteComputeResult writeComputeResult;
         try {
           long writeComputeStartTimeInNS = System.nanoTime();

           // Leader nodes are the only ones which process UPDATES, so it's valid to always compress and not call
           // 'maybeCompress'.
-          updatedValue = storeWriteComputeHandler.applyWriteCompute(
+          writeComputeResult = storeWriteComputeHandler.applyWriteCompute(
               currValue,
               update.schemaId,
               readerValueSchemaId,
               update.updateValue,
               update.updateSchemaId,
               readerUpdateProtocolVersion);
-          updatedValueBytes = compressor.get()
-              .compress(storeWriteComputeHandler.serializeUpdatedValue(updatedValue, readerValueSchemaId));
+          updatedValueBytes = writeComputeResult.getUpdatedValueBytes();
           hostLevelIngestionStats
               .recordWriteComputeUpdateLatency(LatencyUtils.getElapsedTimeFromNSToMS(writeComputeStartTimeInNS));
         } catch (Exception e) {

@@ -3323,7 +3335,11 @@ private PubSubMessageProcessedResult processMessage(
         updatedPut.putValue = updateValueWithSchemaId;
         updatedPut.schemaId = readerValueSchemaId;
         return new PubSubMessageProcessedResult(
-            new WriteComputeResultWrapper(updatedPut, oldValueManifest, false, Lazy.of(() -> updatedValue)));
+            new WriteComputeResultWrapper(
+                updatedPut,
+                oldValueManifest,
+                false,
+                Lazy.of(writeComputeResult::getUpdatedValue)));
       }
     case DELETE:
       /**
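
Note: the interesting part of the second hunk is the lazy, decompression-aware value provider: the record is only deserialized when get() is called, and the bytes are decompressed first only while end-of-push has not yet been received. Below is a minimal, self-contained sketch of that pattern, assuming hypothetical Decompressor and Deserializer interfaces in place of Venice's compressor and storeDeserializerCache; it mirrors the control flow of the diff, not the actual Venice API.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.function.Supplier;

// Minimal sketch of the lazy, decompression-aware value provider used in the hunk above.
// Decompressor and Deserializer are hypothetical stand-ins for Venice's compressor and
// storeDeserializerCache; only the control flow mirrors the diff.
public class LazyValueProviderSketch {
  interface Decompressor {
    ByteBuffer decompress(ByteBuffer data) throws IOException;
  }

  interface Deserializer<T> {
    T deserialize(ByteBuffer data);
  }

  static <T> Supplier<T> valueProvider(
      ByteBuffer rawPutValue,
      boolean needToDecompress,
      Decompressor decompressor,
      Deserializer<T> deserializer) {
    // Nothing is deserialized until get() is called, mirroring Lazy.of(...) in the diff.
    return () -> {
      if (!needToDecompress) {
        return deserializer.deserialize(rawPutValue);
      }
      try {
        return deserializer.deserialize(decompressor.decompress(rawPutValue));
      } catch (IOException e) {
        throw new RuntimeException("Unable to provide value due to decompression failure", e);
      }
    };
  }
}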

Diff for: clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/MergeConflictResultWrapper.java (+32 -20)

@@ -20,24 +20,53 @@ public class MergeConflictResultWrapper {
   private final Lazy<ByteBuffer> oldValueByteBufferProvider;
   private final RmdWithValueSchemaId oldRmdWithValueSchemaId;
   private final ChunkedValueManifestContainer oldValueManifestContainer;
+
+  // Serialized and potentially compressed updated value bytes
   private final ByteBuffer updatedValueBytes;
   private final ByteBuffer updatedRmdBytes;

+  /**
+   * Best-effort deserialized value provider that provides the updated value for PUT/UPDATE and the old value for
+   * DELETE.
+   */
+  private final Lazy<GenericRecord> valueProvider;
+
   public MergeConflictResultWrapper(
       MergeConflictResult mergeConflictResult,
       Lazy<ByteBufferValueRecord<ByteBuffer>> oldValueProvider,
       Lazy<ByteBuffer> oldValueByteBufferProvider,
       RmdWithValueSchemaId oldRmdWithValueSchemaId,
       ChunkedValueManifestContainer oldValueManifestContainer,
       ByteBuffer updatedValueBytes,
-      ByteBuffer updatedRmdBytes) {
+      ByteBuffer updatedRmdBytes,
+      Function<Integer, RecordDeserializer<GenericRecord>> deserializerProvider) {
     this.mergeConflictResult = mergeConflictResult;
     this.oldValueProvider = oldValueProvider;
     this.oldValueByteBufferProvider = oldValueByteBufferProvider;
     this.oldRmdWithValueSchemaId = oldRmdWithValueSchemaId;
     this.oldValueManifestContainer = oldValueManifestContainer;
     this.updatedValueBytes = updatedValueBytes;
     this.updatedRmdBytes = updatedRmdBytes;
+    if (updatedValueBytes == null) {
+      // this is a DELETE
+      ByteBufferValueRecord<ByteBuffer> oldValue = oldValueProvider.get();
+      if (oldValue == null || oldValue.value() == null) {
+        this.valueProvider = Lazy.of(() -> null);
+      } else {
+        this.valueProvider =
+            Lazy.of(() -> deserializerProvider.apply(oldValue.writerSchemaId()).deserialize(oldValue.value()));
+      }
+    } else {
+      // this is a PUT or UPDATE
+      if (mergeConflictResult.getDeserializedValue().isPresent()) {
+        this.valueProvider = Lazy.of(() -> mergeConflictResult.getDeserializedValue().get());
+      } else {
+        // Use mergeConflictResult.getNewValue() here since updatedValueBytes could be compressed.
+        this.valueProvider = Lazy.of(
+            () -> deserializerProvider.apply(mergeConflictResult.getValueSchemaId())
                .deserialize(mergeConflictResult.getNewValue()));
+      }
+    }
   }

   public MergeConflictResult getMergeConflictResult() {

@@ -74,24 +103,7 @@ public ByteBuffer getUpdatedRmdBytes() {
    * 2. returns the old value for DELETE (null for non-existent key).
    * 3. returns null if the value is not available.
    */
-  public Lazy<GenericRecord> getNewValueProvider(
-      Function<Integer, RecordDeserializer<GenericRecord>> deserializerProvider) {
-    if (updatedValueBytes == null) {
-      // this is a DELETE
-      ByteBufferValueRecord<ByteBuffer> oldValue = oldValueProvider.get();
-      if (oldValue == null || oldValue.value() == null) {
-        return Lazy.of(() -> null);
-      }
-      return Lazy.of(() -> deserializerProvider.apply(oldValue.writerSchemaId()).deserialize(oldValue.value()));
-    } else {
-      // this is a PUT or UPDATE
-      if (mergeConflictResult.getValueDeserialized().isPresent()) {
-        return Lazy.of(() -> mergeConflictResult.getValueDeserialized().get());
-      }
-      // Use mergeConflictResult.getNewValue() here and not updatedValueBytes for non-compressed value bytes.
-      return Lazy.of(
-          () -> deserializerProvider.apply(mergeConflictResult.getValueSchemaId())
-              .deserialize(mergeConflictResult.getNewValue()));
-    }
+  public Lazy<GenericRecord> getNewValueProvider() {
+    return valueProvider;
   }
 }
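
Note: on the caller side (see the ActiveActiveStoreIngestionTask hunks above), the deserializer function is now supplied once at construction time, so the provider can later be fetched without arguments. A hedged sketch of that call pattern follows; the local variables are assumed to come from the surrounding ingestion-task method and are not reproduced here.

// Sketch only: variable names are assumed from the surrounding ingestion-task code.
MergeConflictResultWrapper mergeConflictResultWrapper = new MergeConflictResultWrapper(
    mergeConflictResult,
    oldValueProvider,
    oldValueByteBufferProvider,
    rmdWithValueSchemaID,
    valueManifestContainer,
    updatedValueBytes,
    updatedRmdBytes,
    (schemaId) -> storeDeserializerCache.getDeserializer(schemaId, schemaId));

// Resolved lazily, e.g. when a view writer actually needs the record:
Lazy<GenericRecord> newValueProvider = mergeConflictResultWrapper.getNewValueProvider();
GenericRecord value = newValueProvider.get(); // null for a DELETE of a non-existent key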

Diff for: clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreWriteComputeProcessor.java (+6 -7)

@@ -91,9 +91,9 @@ public StoreWriteComputeProcessor(
    * @param writerUpdateProtocolVersion Update protocol version used to serialize Update payload bytes.
    * @param readerUpdateProtocolVersion Update protocol version used to deserialize Update payload bytes.
    *
-   * @return Bytes of partially updated original value.
+   * @return {@link WriteComputeResult} of partially updated original value.
    */
-  public GenericRecord applyWriteCompute(
+  public WriteComputeResult applyWriteCompute(
      GenericRecord currValue,
      int writerValueSchemaId,
      int readerValueSchemaId,

@@ -110,11 +110,10 @@ public GenericRecord applyWriteCompute(
        writeComputeProcessor.updateRecord(readerSchemaContainer.getValueSchema(), currValue, writeComputeRecord);

    // If write compute is enabled and the record is deleted, the updatedValue will be null.
-    return updatedValue;
-  }
-
-  public byte[] serializeUpdatedValue(GenericRecord updatedValue, int readerValueSchemaId) {
-    return getValueSerializer(readerValueSchemaId).serialize(updatedValue);
+    if (updatedValue == null) {
+      return new WriteComputeResult(null, null);
+    }
+    return new WriteComputeResult(getValueSerializer(readerValueSchemaId).serialize(updatedValue), updatedValue);
  }

  private SchemaAndUniqueId getSchemaAndUniqueId(int valueSchemaId, int writeComputeSchemaId) {

Diff for: clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/WriteComputeResult.java (new file, +29)

@@ -0,0 +1,29 @@
+package com.linkedin.davinci.kafka.consumer;
+
+import javax.annotation.Nullable;
+import org.apache.avro.generic.GenericRecord;
+
+
+/**
+ * Write compute result wrapper class holding the deserialized updated value and the serialized and potentially
+ * compressed updated value bytes.
+ */
+public class WriteComputeResult {
+  private final byte[] updatedValueBytes;
+  private final GenericRecord updatedValue;
+
+  public WriteComputeResult(byte[] updatedValueBytes, GenericRecord updatedValue) {
+    this.updatedValueBytes = updatedValueBytes;
+    this.updatedValue = updatedValue;
+  }
+
+  @Nullable
+  public byte[] getUpdatedValueBytes() {
+    return updatedValueBytes;
+  }
+
+  @Nullable
+  public GenericRecord getUpdatedValue() {
+    return updatedValue;
+  }
+}
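
Note: with this new class, applyWriteCompute can hand back both the serialized (and, per the class javadoc, potentially compressed) bytes and the deserialized record in a single call, which is what removes the separate serializeUpdatedValue step in LeaderFollowerStoreIngestionTask. A condensed sketch of the consuming side follows; the local variables (currValue, update, readerValueSchemaId, readerUpdateProtocolVersion) are assumed from the surrounding leader-path method.

// Sketch only: mirrors the LeaderFollowerStoreIngestionTask hunks above.
WriteComputeResult writeComputeResult = storeWriteComputeHandler.applyWriteCompute(
    currValue,
    update.schemaId,
    readerValueSchemaId,
    update.updateValue,
    update.updateSchemaId,
    readerUpdateProtocolVersion);

byte[] updatedValueBytes = writeComputeResult.getUpdatedValueBytes(); // null when the update deleted the record
Lazy<GenericRecord> valueProvider = Lazy.of(writeComputeResult::getUpdatedValue);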

Diff for: clients/da-vinci-client/src/main/java/com/linkedin/davinci/replication/merge/MergeConflictResult.java (+5 -5)

@@ -13,7 +13,7 @@ public class MergeConflictResult {
   private static final MergeConflictResult IGNORED_RESULT = new MergeConflictResult();

   private ByteBuffer newValue;
-  private Optional<GenericRecord> valueDeserialized;
+  private Optional<GenericRecord> deserializedValue;
   private int valueSchemaId;
   private final boolean updateIgnored; // Whether we should skip the incoming message since it could be a stale message.
   private boolean resultReusesInput;

@@ -29,13 +29,13 @@ public MergeConflictResult(

   public MergeConflictResult(
       ByteBuffer newValue,
-      Optional<GenericRecord> valueDeserialized,
+      Optional<GenericRecord> deserializedValue,
       int valueSchemaID,
       boolean resultReusesInput,
       GenericRecord rmdRecord) {
     this.updateIgnored = false;
     this.newValue = newValue;
-    this.valueDeserialized = valueDeserialized;
+    this.deserializedValue = deserializedValue;
     this.valueSchemaId = valueSchemaID;
     this.resultReusesInput = resultReusesInput;
     this.rmdRecord = rmdRecord;

@@ -75,7 +75,7 @@ public GenericRecord getRmdRecord() {
    * deserialize the value to generate the MCR.
    * @return deserialized new value if possible.
    */
-  public Optional<GenericRecord> getValueDeserialized() {
-    return valueDeserialized;
+  public Optional<GenericRecord> getDeserializedValue() {
+    return deserializedValue;
   }
 }

Diff for: clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriter.java (+1 -1)

@@ -66,7 +66,7 @@ public CompletableFuture<Void> processRecord(
       int newValueSchemaId,
       int oldValueSchemaId,
       GenericRecord replicationMetadataRecord,
-      Lazy<GenericRecord> newValueProvider) {
+      Lazy<GenericRecord> valueProvider) {
     // TODO: not sold about having currentValue in the interface but it VASTLY simplifies a lot of things with regards
     // to dealing with compression/chunking/etc. in the storage layer.

Diff for: clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/MaterializedViewWriter.java (+2 -2)

@@ -65,8 +65,8 @@ public CompletableFuture<Void> processRecord(
       int newValueSchemaId,
       int oldValueSchemaId,
       GenericRecord replicationMetadataRecord,
-      Lazy<GenericRecord> newValueProvider) {
-    return processRecord(newValue, key, newValueSchemaId, false, newValueProvider);
+      Lazy<GenericRecord> valueProvider) {
+    return processRecord(newValue, key, newValueSchemaId, false, valueProvider);
   }

   /**

Diff for: clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/VeniceViewWriter.java (+3 -2)

@@ -65,7 +65,8 @@ public VeniceViewWriter(
    * @param newValueSchemaId the schemaId of the incoming record
    * @param oldValueSchemaId the schemaId of the old record
    * @param replicationMetadataRecord the associated RMD for the incoming record.
-   * @param newValueProvider to provide the deserialized new value
+   * @param valueProvider to provide the corresponding deserialized newValue for PUT and UPDATE or the old value for the
+   *     given key for DELETE.
    */
   public abstract CompletableFuture<Void> processRecord(
       ByteBuffer newValue,

@@ -74,7 +75,7 @@ public abstract CompletableFuture<Void> processRecord(
       int newValueSchemaId,
       int oldValueSchemaId,
       GenericRecord replicationMetadataRecord,
-      Lazy<GenericRecord> newValueProvider);
+      Lazy<GenericRecord> valueProvider);

   /**
    * To be called as a given ingestion task consumes each record. This is called prior to writing to a

Diff for: clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java (+18 -15)

@@ -769,21 +769,6 @@ public void run() {
         pushJobSetting.rewindTimeInSecondsOverride = DEFAULT_RE_PUSH_REWIND_IN_SECONDS_OVERRIDE;
         LOGGER.info("Overriding re-push rewind time in seconds to: {}", pushJobSetting.rewindTimeInSecondsOverride);
       }
-      if (pushJobSetting.repushTTLEnabled) {
-        // Build the full path for HDFSRmdSchemaSource:
-        // RMD schemas: <job_temp_dir>/rmd_schemas
-        // Value schemas: <job_temp_dir>/value_schemas
-        Path rmdSchemaDir = new Path(jobTmpDir, "rmd_schemas");
-        HadoopUtils.createDirectoryWithPermission(rmdSchemaDir, PERMISSION_700);
-        Path valueSchemaDir = new Path(jobTmpDir, "value_schemas");
-        HadoopUtils.createDirectoryWithPermission(valueSchemaDir, PERMISSION_700);
-        try (HDFSSchemaSource schemaSource =
-            new HDFSSchemaSource(valueSchemaDir, rmdSchemaDir, pushJobSetting.storeName)) {
-          schemaSource.saveSchemasOnDisk(controllerClient);
-          pushJobSetting.rmdSchemaDir = schemaSource.getRmdSchemaPath();
-          pushJobSetting.valueSchemaDir = schemaSource.getValueSchemaPath();
-        }
-      }
     }
     // Create new store version, topic and fetch Kafka url from backend
     createNewStoreVersion(

@@ -825,6 +810,9 @@ public void run() {
       } else {
         // Populate any view configs to job properties
         configureJobPropertiesWithMaterializedViewConfigs();
+        if (pushJobSetting.repushTTLEnabled || pushJobSetting.materializedViewConfigFlatMap != null) {
+          buildHDFSSchemaDir();
+        }
         if (pushJobSetting.sendControlMessagesDirectly) {
           getVeniceWriter(pushJobSetting).broadcastStartOfPush(
               SORTED,

@@ -930,6 +918,21 @@ public void run() {
     }
   }

+  private void buildHDFSSchemaDir() throws IOException {
+    // Build the full path for HDFSRmdSchemaSource:
+    // RMD schemas: <job_temp_dir>/rmd_schemas
+    // Value schemas: <job_temp_dir>/value_schemas
+    Path rmdSchemaDir = new Path(jobTmpDir, "rmd_schemas");
+    HadoopUtils.createDirectoryWithPermission(rmdSchemaDir, PERMISSION_700);
+    Path valueSchemaDir = new Path(jobTmpDir, "value_schemas");
+    HadoopUtils.createDirectoryWithPermission(valueSchemaDir, PERMISSION_700);
+    try (HDFSSchemaSource schemaSource = new HDFSSchemaSource(valueSchemaDir, rmdSchemaDir, pushJobSetting.storeName)) {
+      schemaSource.saveSchemasOnDisk(controllerClient);
+      pushJobSetting.rmdSchemaDir = schemaSource.getRmdSchemaPath();
+      pushJobSetting.valueSchemaDir = schemaSource.getValueSchemaPath();
+    }
+  }
+
   /**
    * Get the set of regions that haven't been pushed yet after targeted region push.
    * @return a set of regions that haven't been pushed yet.

Diff for: clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/mapreduce/datawriter/jobs/DataWriterMRJob.java (+2 -2)

@@ -46,7 +46,6 @@
 import static com.linkedin.venice.vpj.VenicePushJobConstants.VALUE_FIELD_PROP;
 import static com.linkedin.venice.vpj.VenicePushJobConstants.VALUE_SCHEMA_DIR;
 import static com.linkedin.venice.vpj.VenicePushJobConstants.VALUE_SCHEMA_ID_PROP;
-import static com.linkedin.venice.vpj.VenicePushJobConstants.VALUE_SCHEMA_STRING_PROP;
 import static com.linkedin.venice.vpj.VenicePushJobConstants.VSON_PUSH;
 import static com.linkedin.venice.vpj.VenicePushJobConstants.ZSTD_COMPRESSION_LEVEL;
 import static com.linkedin.venice.vpj.VenicePushJobConstants.ZSTD_DICTIONARY_CREATION_SUCCESS;

@@ -273,7 +272,6 @@ protected void setupInputFormatConf(JobConf jobConf, PushJobSetting pushJobSetti
     if (pushJobSetting.isAvro) {
       jobConf.set(SCHEMA_STRING_PROP, pushJobSetting.inputDataSchemaString);
       jobConf.set(AvroJob.INPUT_SCHEMA, pushJobSetting.inputDataSchemaString);
-      jobConf.set(VALUE_SCHEMA_STRING_PROP, pushJobSetting.valueSchemaString);
       if (pushJobSetting.generatePartialUpdateRecordFromInput) {
         jobConf.setBoolean(GENERATE_PARTIAL_UPDATE_RECORD_FROM_INPUT, true);
         jobConf.set(UPDATE_SCHEMA_STRING_PROP, pushJobSetting.valueSchemaString);

@@ -310,6 +308,8 @@ private void setupReducerConf(JobConf jobConf, PushJobSetting pushJobSetting) {
     }
     if (pushJobSetting.materializedViewConfigFlatMap != null) {
       jobConf.set(PUSH_JOB_VIEW_CONFIGS, pushJobSetting.materializedViewConfigFlatMap);
+      jobConf.set(VALUE_SCHEMA_DIR, pushJobSetting.valueSchemaDir);
+      jobConf.set(RMD_SCHEMA_DIR, pushJobSetting.rmdSchemaDir);
     }
     jobConf.setReduceSpeculativeExecution(vpjProperties.getBoolean(REDUCER_SPECULATIVE_EXECUTION_ENABLE, false));
     int partitionCount = pushJobSetting.partitionCount;
