import com.linkedin.davinci.repository.ThinClientMetaStoreBasedRepository;
import com.linkedin.davinci.storage.chunking.AbstractAvroChunkingAdapter;
import com.linkedin.davinci.storage.chunking.GenericChunkingAdapter;
- import com.linkedin.davinci.storage.chunking.RawBytesChunkingAdapter;
import com.linkedin.davinci.storage.chunking.SpecificRecordChunkingAdapter;
- import com.linkedin.davinci.store.memory.InMemoryStorageEngine;
- import com.linkedin.davinci.store.record.ValueRecord;
+ import com.linkedin.davinci.utils.ChunkAssembler;
import com.linkedin.venice.client.change.capture.protocol.RecordChangeEvent;
import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.compression.CompressorFactory;
import com.linkedin.venice.kafka.protocol.enums.ControlMessageType;
import com.linkedin.venice.kafka.protocol.enums.MessageType;
import com.linkedin.venice.message.KafkaKey;
- import com.linkedin.venice.meta.ReadOnlySchemaRepository;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.StoreInfo;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.schema.SchemaReader;
import com.linkedin.venice.schema.rmd.RmdUtils;
import com.linkedin.venice.serialization.AvroStoreDeserializerCache;
- import com.linkedin.venice.serialization.RawBytesStoreDeserializerCache;
import com.linkedin.venice.serialization.StoreDeserializerCache;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.serialization.avro.AvroSpecificStoreDeserializerCache;
@@ -84,7 +80,6 @@ public class VeniceChangelogConsumerImpl<K, V> implements VeniceChangelogConsume

  protected final HashMap<Integer, VeniceCompressor> compressorMap = new HashMap<>();
  protected StoreDeserializerCache<V> storeDeserializerCache;
-   private final AvroStoreDeserializerCache<RecordChangeEvent> recordChangeEventDeserializerCache;

  protected ThinClientMetaStoreBasedRepository storeRepository;

@@ -107,21 +102,14 @@ public class VeniceChangelogConsumerImpl<K, V> implements VeniceChangelogConsume

  protected final String storeName;

-   // This storage engine serves as a buffer for records which are chunked and have to be buffered before they can
-   // be returned to the client. We leverage the storageEngine interface here in order to take better advantage
-   // of the chunking and decompressing adapters that we've already built (which today are built around this interface)
-   // as chunked records are assembled we will eagerly evict all keys from the storage engine in order to keep the memory
-   // footprint as small as we can. We could use the object cache storage engine here in order to get LRU behavior
-   // but then that runs the risk of a parallel subscription having record chunks getting evicted before we have a chance
-   // to assemble them. So we rely on the simpler and concrete implementation as opposed to the abstraction in order
-   // to control and guarantee the behavior we're expecting.
-   protected final InMemoryStorageEngine inMemoryStorageEngine;
  protected final PubSubConsumerAdapter pubSubConsumer;
  protected final Map<Integer, List<Long>> currentVersionHighWatermarks = new HashMap<>();
  protected final int[] currentValuePayloadSize;

  protected final ChangelogClientConfig changelogClientConfig;

+   protected final ChunkAssembler chunkAssembler;
+
  public VeniceChangelogConsumerImpl(
      ChangelogClientConfig changelogClientConfig,
      PubSubConsumerAdapter pubSubConsumer) {
@@ -142,19 +130,13 @@ public VeniceChangelogConsumerImpl(
    this.schemaReader = changelogClientConfig.getSchemaReader();
    Schema keySchema = schemaReader.getKeySchema();
    this.keyDeserializer = FastSerializerDeserializerFactory.getFastAvroGenericDeserializer(keySchema, keySchema);
-     // The in memory storage engine only relies on the name of store and nothing else. We use an unversioned store name
-     // here in order to reduce confusion (as this storage engine can be used across version topics).
-     this.inMemoryStorageEngine = new InMemoryStorageEngine(storeName);
-     // disable noisy logs
-     this.inMemoryStorageEngine.suppressLogs(true);
+     this.chunkAssembler = new ChunkAssembler(storeName);
+
    this.storeRepository = new ThinClientMetaStoreBasedRepository(
        changelogClientConfig.getInnerClientConfig(),
        VeniceProperties.empty(),
        null);
-     this.recordChangeEventDeserializerCache = new AvroStoreDeserializerCache<>(
-         new RecordChangeEventReadOnlySchemaRepository(this.storeRepository),
-         storeName,
-         true);
+
    if (changelogClientConfig.getInnerClientConfig().isSpecificClient()) {
      // If a value class is supplied, we'll use a Specific record adapter
      Class valueClass = changelogClientConfig.getInnerClientConfig().getSpecificValueClass();
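The buffering and eager-eviction behavior described by the removed comment block now lives behind the new ChunkAssembler helper instead of a raw InMemoryStorageEngine owned by the consumer. A rough sketch of how the consumer drives it, with the argument order taken from the call site added later in this diff (the local variable name and the inline comments are illustrative, not part of the commit):

// Illustrative only: driving the new helper the way the updated consumer does.
Object assembledObject = chunkAssembler.bufferAndAssembleRecord(
    pubSubTopicPartition,   // partition whose chunks are being buffered
    put.getSchemaId(),      // CHUNK, CHUNKED_VALUE_MANIFEST, or a regular value schema id
    keyBytes,
    put.getPutValue(),
    message.getOffset(),
    deserializerProvider,   // Lazy<RecordDeserializer<T>> chosen per topic type
    readerSchemaId,
    compressor);            // NO_OP_COMPRESSOR for change-capture topics
if (assembledObject == null) {
  // A chunk was buffered; the fully assembled value is not available yet.
}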
@@ -553,109 +535,20 @@ protected boolean handleControlMessage(
    return false;
  }

-   protected <T> T bufferAndAssembleRecordChangeEvent(
-       PubSubTopicPartition pubSubTopicPartition,
-       int schemaId,
-       byte[] keyBytes,
-       ByteBuffer valueBytes,
-       long recordOffset,
-       AbstractAvroChunkingAdapter<T> chunkingAdapter,
-       Lazy<RecordDeserializer<T>> recordDeserializer,
-       StoreDeserializerCache<T> deserializerCache,
-       int readerSchemaId) {
-     T assembledRecord = null;
-     // Select compressor. We'll only construct compressors for version topics so this will return null for
-     // events from change capture. This is fine as today they are not compressed.
-     VeniceCompressor compressor;
-     if (pubSubTopicPartition.getPubSubTopic().isVersionTopic()) {
-       compressor = compressorMap.get(pubSubTopicPartition.getPartitionNumber());
-     } else {
-       compressor = NO_OP_COMPRESSOR;
-     }
-
-     if (!inMemoryStorageEngine.containsPartition(pubSubTopicPartition.getPartitionNumber())) {
-       inMemoryStorageEngine.addStoragePartition(pubSubTopicPartition.getPartitionNumber());
-     }
-     // If this is a record chunk, store the chunk and return null for processing this record
-     if (schemaId == AvroProtocolDefinition.CHUNK.getCurrentProtocolVersion()) {
-       inMemoryStorageEngine.put(
-           pubSubTopicPartition.getPartitionNumber(),
-           keyBytes,
-           ValueRecord.create(schemaId, valueBytes.array()).serialize());
-       return null;
-     } else if (schemaId == AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion()) {
-       // This is the last value. Store it, and now read it from the in memory store as a fully assembled value
-       inMemoryStorageEngine.put(
-           pubSubTopicPartition.getPartitionNumber(),
-           keyBytes,
-           ValueRecord.create(schemaId, valueBytes.array()).serialize());
-       try {
-         assembledRecord = processRecordBytes(
-             recordDeserializer.get(),
-             compressor,
-             keyBytes,
-             RawBytesChunkingAdapter.INSTANCE.get(
-                 inMemoryStorageEngine,
-                 pubSubTopicPartition.getPartitionNumber(),
-                 ByteBuffer.wrap(keyBytes),
-                 false,
-                 null,
-                 null,
-                 null,
-                 readerSchemaId,
-                 RawBytesStoreDeserializerCache.getInstance(),
-                 compressor,
-                 null),
-             pubSubTopicPartition,
-             readerSchemaId,
-             recordOffset);
-       } catch (Exception ex) {
-         // We might get an exception if we haven't persisted all the chunks for a given key. This
-         // can actually happen if the client seeks to the middle of a chunked record either by
-         // only tailing the records or through direct offset management. This is ok, we just won't
-         // return this record since this is a course grained approach we can drop it.
-         LOGGER.warn(
-             "Encountered error assembling chunked record, this can happen when seeking between chunked records. Skipping offset {} on topic {}",
-             recordOffset,
-             pubSubTopicPartition.getPubSubTopic().getName());
-       }
-     } else {
-       // this is a fully specified record, no need to buffer and assemble it, just decompress and deserialize it
-       try {
-         assembledRecord = processRecordBytes(
-             recordDeserializer.get(),
-             compressor,
-             keyBytes,
-             valueBytes,
-             pubSubTopicPartition,
-             readerSchemaId,
-             recordOffset);
-       } catch (Exception e) {
-         throw new RuntimeException(e);
-       }
-     }
-     // We only buffer one record at a time for a given partition. If we've made it this far
-     // we either just finished assembling a large record, or, didn't specify anything. So we'll clear
-     // the cache. Kafka might give duplicate delivery, but it won't give out of order delivery, so
-     // this is safe to do in all such contexts.
-     inMemoryStorageEngine.dropPartition(pubSubTopicPartition.getPartitionNumber());
-     return assembledRecord;
-   }
-
  // This function exists for wrappers of this class to be able to do any kind of preprocessing on the raw bytes of the
  // data consumed
  // in the change stream so as to avoid having to do any duplicate deserialization/serialization. Wrappers which depend
  // on solely
  // on the data post deserialization
  protected <T> T processRecordBytes(
-       RecordDeserializer<T> deserializer,
-       VeniceCompressor compressor,
+       ByteBuffer decompressedBytes,
+       T deserializedValue,
      byte[] key,
      ByteBuffer value,
      PubSubTopicPartition partition,
      int valueSchemaId,
      long recordOffset) throws IOException {
-     return deserializer.deserialize(compressor.decompress(value));
+     return deserializedValue;
  }

  protected Optional<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> convertPubSubMessageToPubSubChangeEventMessage(
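The contract of the processRecordBytes hook changes here: the consumer now performs decompression and deserialization itself and hands subclasses both the decompressed bytes and the already-deserialized value, instead of passing in the deserializer and compressor. A hypothetical override under the new signature (the rawPayloadCache field is invented purely for illustration) might look like:

@Override
protected <T> T processRecordBytes(
    ByteBuffer decompressedBytes,
    T deserializedValue,
    byte[] key,
    ByteBuffer value,
    PubSubTopicPartition partition,
    int valueSchemaId,
    long recordOffset) throws IOException {
  // Hypothetical: stash the decompressed payload for later re-serialization,
  // then return the already-deserialized value untouched.
  rawPayloadCache.put(ByteBuffer.wrap(key), decompressedBytes);
  return deserializedValue;
}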
@@ -690,39 +583,54 @@ protected Optional<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> con
      Put put = (Put) message.getValue().payloadUnion;
      // Select appropriate deserializers
      Lazy deserializerProvider;
-       AbstractAvroChunkingAdapter chunkingAdapter;
      int readerSchemaId;
-       ReadOnlySchemaRepository schemaRepo;
-       StoreDeserializerCache deserializerCache;
      if (pubSubTopicPartition.getPubSubTopic().isVersionTopic()) {
        Schema valueSchema = schemaReader.getValueSchema(put.schemaId);
        deserializerProvider =
            Lazy.of(() -> FastSerializerDeserializerFactory.getFastAvroGenericDeserializer(valueSchema, valueSchema));
-         chunkingAdapter = userEventChunkingAdapter;
        readerSchemaId = AvroProtocolDefinition.RECORD_CHANGE_EVENT.getCurrentProtocolVersion();
-         deserializerCache = this.storeDeserializerCache;
      } else {
        deserializerProvider = Lazy.of(() -> recordChangeDeserializer);
-         chunkingAdapter = recordChangeEventChunkingAdapter;
        readerSchemaId = this.schemaReader.getLatestValueSchemaId();
-         deserializerCache = recordChangeEventDeserializerCache;
      }
-       assembledObject = bufferAndAssembleRecordChangeEvent(
+
+       // Select compressor. We'll only construct compressors for version topics so this will return null for
+       // events from change capture. This is fine as today they are not compressed.
+       VeniceCompressor compressor;
+       if (pubSubTopicPartition.getPubSubTopic().isVersionTopic()) {
+         compressor = compressorMap.get(pubSubTopicPartition.getPartitionNumber());
+       } else {
+         compressor = NO_OP_COMPRESSOR;
+       }
+
+       assembledObject = chunkAssembler.bufferAndAssembleRecord(
          pubSubTopicPartition,
          put.getSchemaId(),
          keyBytes,
          put.getPutValue(),
          message.getOffset(),
-           chunkingAdapter,
          deserializerProvider,
-           deserializerCache,
-           readerSchemaId);
+           readerSchemaId,
+           compressor);
      if (assembledObject == null) {
-         // bufferAndAssembleRecordChangeEvent may have only buffered records and not returned anything yet because
+         // bufferAndAssembleRecord may have only buffered records and not returned anything yet because
        // it's waiting for more input. In this case, just return an empty optional for now.
        return Optional.empty();
      }

+       try {
+         assembledObject = processRecordBytes(
+             compressor.decompress(put.getPutValue()),
+             assembledObject,
+             keyBytes,
+             put.getPutValue(),
+             pubSubTopicPartition,
+             readerSchemaId,
+             message.getOffset());
+       } catch (Exception ex) {
+         throw new VeniceException(ex);
+       }
+
      if (assembledObject instanceof RecordChangeEvent) {
        recordChangeEvent = (RecordChangeEvent) assembledObject;
        replicationCheckpoint = recordChangeEvent.replicationCheckpointVector;
@@ -814,7 +722,7 @@ protected boolean handleVersionSwapControlMessage(
          .put(pubSubTopicPartition.getPartitionNumber(), versionSwap.getLocalHighWatermarks());
    }
    switchToNewTopic(newServingVersionTopic, topicSuffix, pubSubTopicPartition.getPartitionNumber());
-     inMemoryStorageEngine.drop();
+     chunkAssembler.clearInMemoryDB();
    return true;
  }
  return false;
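On a version swap the consumer switches topics, so any partially buffered chunks belong to the old version topic and can never be completed; clearing the assembler's in-memory buffer avoids mixing them with chunks from the new topic. The helper's internals are not part of this commit; a plausible shape, inferred only from the InMemoryStorageEngine usage this diff removes, would be roughly:

// Assumed/illustrative: ChunkAssembler presumably wraps the in-memory storage engine removed above.
public void clearInMemoryDB() {
  // Drop every buffered chunk so no stale, half-assembled value survives the topic switch.
  inMemoryStorageEngine.drop();
}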