@@ -3202,9 +3202,14 @@ private PubSubMessageProcessedResult processMessage(
3202
3202
KafkaMessageEnvelope kafkaValue = consumerRecord .getValue ();
3203
3203
byte [] keyBytes = kafkaKey .getKey ();
3204
3204
MessageType msgType = MessageType .valueOf (kafkaValue .messageType );
3205
+ Lazy <GenericRecord > valueProvider ;
3205
3206
switch (msgType ) {
3206
3207
case PUT :
3207
3208
Put put = (Put ) kafkaValue .payloadUnion ;
3209
+ // Value provider should use un-compressed data.
3210
+ final ByteBuffer rawPutValue = put .putValue ;
3211
+ valueProvider =
3212
+ Lazy .of (() -> storeDeserializerCache .getDeserializer (put .schemaId , put .schemaId ).deserialize (rawPutValue ));
3208
3213
put .putValue = maybeCompressData (
3209
3214
consumerRecord .getTopicPartition ().getPartitionNumber (),
3210
3215
put .putValue ,
@@ -3227,7 +3232,7 @@ private PubSubMessageProcessedResult processMessage(
3227
3232
null );
3228
3233
}
3229
3234
3230
- return new PubSubMessageProcessedResult (new WriteComputeResultWrapper (put , null , false ));
3235
+ return new PubSubMessageProcessedResult (new WriteComputeResultWrapper (put , null , false , valueProvider ));
3231
3236
3232
3237
case UPDATE :
3233
3238
/**
@@ -3272,20 +3277,21 @@ private PubSubMessageProcessedResult processMessage(
3272
3277
3273
3278
final byte [] updatedValueBytes ;
3274
3279
final ChunkedValueManifest oldValueManifest = valueManifestContainer .getManifest ();
3275
-
3280
+ GenericRecord updatedValue ;
3276
3281
try {
3277
3282
long writeComputeStartTimeInNS = System .nanoTime ();
3283
+
3278
3284
// Leader nodes are the only ones which process UPDATES, so it's valid to always compress and not call
3279
3285
// 'maybeCompress'.
3286
+ updatedValue = storeWriteComputeHandler .applyWriteCompute (
3287
+ currValue ,
3288
+ update .schemaId ,
3289
+ readerValueSchemaId ,
3290
+ update .updateValue ,
3291
+ update .updateSchemaId ,
3292
+ readerUpdateProtocolVersion );
3280
3293
updatedValueBytes = compressor .get ()
3281
- .compress (
3282
- storeWriteComputeHandler .applyWriteCompute (
3283
- currValue ,
3284
- update .schemaId ,
3285
- readerValueSchemaId ,
3286
- update .updateValue ,
3287
- update .updateSchemaId ,
3288
- readerUpdateProtocolVersion ));
3294
+ .compress (storeWriteComputeHandler .serializeUpdatedValue (updatedValue , readerValueSchemaId ));
3289
3295
hostLevelIngestionStats
3290
3296
.recordWriteComputeUpdateLatency (LatencyUtils .getElapsedTimeFromNSToMS (writeComputeStartTimeInNS ));
3291
3297
} catch (Exception e ) {
@@ -3322,7 +3328,8 @@ private PubSubMessageProcessedResult processMessage(
3322
3328
Put updatedPut = new Put ();
3323
3329
updatedPut .putValue = updateValueWithSchemaId ;
3324
3330
updatedPut .schemaId = readerValueSchemaId ;
3325
- return new PubSubMessageProcessedResult (new WriteComputeResultWrapper (updatedPut , oldValueManifest , false ));
3331
+ return new PubSubMessageProcessedResult (
3332
+ new WriteComputeResultWrapper (updatedPut , oldValueManifest , false , Lazy .of (() -> updatedValue )));
3326
3333
}
3327
3334
case DELETE :
3328
3335
/**
@@ -3331,7 +3338,19 @@ private PubSubMessageProcessedResult processMessage(
3331
3338
if (isWriteComputationEnabled && partitionConsumptionState .isEndOfPushReceived ()) {
3332
3339
partitionConsumptionState .setTransientRecord (kafkaClusterId , consumerRecord .getOffset (), keyBytes , -1 , null );
3333
3340
}
3334
- return new PubSubMessageProcessedResult (new WriteComputeResultWrapper (null , null , false ));
3341
+ // Best-effort to provide the old value for delete operation in case needed by a ComplexVeniceWriter to generate
3342
+ // deletes for materialized view topic partition(s).
3343
+ Lazy <GenericRecord > oldValueProvider = Lazy .of (() -> {
3344
+ ChunkedValueManifestContainer oldValueManifestContainer = new ChunkedValueManifestContainer ();
3345
+ int oldValueReaderSchemaId = schemaRepository .getSupersetSchema (storeName ).getId ();
3346
+ return readStoredValueRecord (
3347
+ partitionConsumptionState ,
3348
+ keyBytes ,
3349
+ oldValueReaderSchemaId ,
3350
+ consumerRecord .getTopicPartition (),
3351
+ oldValueManifestContainer );
3352
+ });
3353
+ return new PubSubMessageProcessedResult (new WriteComputeResultWrapper (null , null , false , oldValueProvider ));
3335
3354
3336
3355
default :
3337
3356
throw new VeniceMessageException (
@@ -3383,7 +3402,7 @@ protected void processMessageAndMaybeProduceToKafka(
3383
3402
Put newPut = writeComputeResultWrapper .getNewPut ();
3384
3403
// keys will be serialized with chunk suffix during pass-through mode in L/F NR if chunking is enabled
3385
3404
boolean isChunkedKey = isChunked () && !partitionConsumptionState .isEndOfPushReceived ();
3386
- Lazy <GenericRecord > newValueProvider = getNewValueProvider ( newPut . putValue , newPut . schemaId );
3405
+ Lazy <GenericRecord > newValueProvider = writeComputeResultWrapper . getValueProvider ( );
3387
3406
queueUpVersionTopicWritesWithViewWriters (
3388
3407
partitionConsumptionState ,
3389
3408
(viewWriter ) -> viewWriter
@@ -3951,7 +3970,7 @@ protected void resubscribeAsLeader(PartitionConsumptionState partitionConsumptio
3951
3970
3952
3971
protected void queueUpVersionTopicWritesWithViewWriters (
3953
3972
PartitionConsumptionState partitionConsumptionState ,
3954
- Function <VeniceViewWriter , CompletableFuture <PubSubProduceResult >> viewWriterRecordProcessor ,
3973
+ Function <VeniceViewWriter , CompletableFuture <Void >> viewWriterRecordProcessor ,
3955
3974
Runnable versionTopicWrite ) {
3956
3975
long preprocessingTime = System .currentTimeMillis ();
3957
3976
CompletableFuture <Void > currentVersionTopicWrite = new CompletableFuture <>();
@@ -4045,10 +4064,4 @@ <T> T databaseLookupWithConcurrencyLimit(Supplier<T> supplier) {
4045
4064
return supplier .get ();
4046
4065
}
4047
4066
}
4048
- protected Lazy <GenericRecord > getNewValueProvider (ByteBuffer newValue , int schemaId ) {
4049
- if (newValue == null ) {
4050
- return Lazy .of (() -> null );
4051
- }
4052
- return Lazy .of (() -> storeDeserializerCache .getDeserializer (schemaId , schemaId ).deserialize (newValue ));
4053
- }
4054
4067
}
0 commit comments