
Commit d17e50b

Address comments and add unit test

1 parent 8022d6c commit d17e50b

File tree: 10 files changed (+341 −21 lines changed)

clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java (+2 −2)

@@ -671,7 +671,7 @@ protected void processMessageAndMaybeProduceToKafka(
       ByteBuffer oldValueBB = mergeConflictResultWrapper.getOldValueByteBufferProvider().get();
       int oldValueSchemaId =
           oldValueBB == null ? -1 : mergeConflictResultWrapper.getOldValueProvider().get().writerSchemaId();
-      Lazy<GenericRecord> newValueProvider = mergeConflictResultWrapper.getNewValueProvider();
+      Lazy<GenericRecord> valueProvider = mergeConflictResultWrapper.getValueProvider();
       queueUpVersionTopicWritesWithViewWriters(
           partitionConsumptionState,
           (viewWriter) -> viewWriter.processRecord(
@@ -681,7 +681,7 @@ protected void processMessageAndMaybeProduceToKafka(
               mergeConflictResult.getValueSchemaId(),
               oldValueSchemaId,
               mergeConflictResult.getRmdRecord(),
-              newValueProvider),
+              valueProvider),
           produceToVersionTopic);
     } else {
       // This function may modify the original record in KME and it is unsafe to use the payload from KME directly
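The rename reflects that the provider is no longer strictly a "new value" provider: per the MergeConflictResultWrapper javadoc below, it resolves to the merged new value for PUT/UPDATE but to the old value for DELETE. The Lazy wrapper keeps that resolution deferred until a view writer actually needs the record. As a rough illustration of the memoizing-supplier contract assumed throughout this change (a sketch, not Venice's actual Lazy implementation):

    import java.util.function.Supplier;

    // Sketch: the supplier body runs at most once, on the first get(), so view
    // writers that never need the deserialized record never pay for deserialization.
    final class LazySketch<T> {
      private Supplier<T> supplier;
      private T value;
      private boolean resolved;

      LazySketch(Supplier<T> supplier) {
        this.supplier = supplier;
      }

      synchronized T get() {
        if (!resolved) {
          value = supplier.get();
          resolved = true;
          supplier = null; // release captured state once memoized
        }
        return value;
      }
    }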

clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java (+27 −14)

@@ -31,6 +31,7 @@
 import com.linkedin.davinci.store.cache.backend.ObjectCacheBackend;
 import com.linkedin.davinci.store.record.ValueRecord;
 import com.linkedin.davinci.store.view.ChangeCaptureViewWriter;
+import com.linkedin.davinci.store.view.MaterializedViewWriter;
 import com.linkedin.davinci.store.view.VeniceViewWriter;
 import com.linkedin.davinci.validation.KafkaDataIntegrityValidator;
 import com.linkedin.davinci.validation.PartitionTracker;
@@ -203,6 +204,7 @@ public class LeaderFollowerStoreIngestionTask extends StoreIngestionTask {
 
   protected final Map<String, VeniceViewWriter> viewWriters;
   protected final boolean hasChangeCaptureView;
+  protected final boolean hasComplexVenicePartitionerMaterializedView;
 
   protected final AvroStoreDeserializerCache<GenericRecord> storeDeserializerCache;
 
@@ -337,16 +339,22 @@ public LeaderFollowerStoreIngestionTask(
           version.getNumber(),
           schemaRepository.getKeySchema(store.getName()).getSchema());
       boolean tmpValueForHasChangeCaptureViewWriter = false;
+      boolean tmpValueForHasComplexVenicePartitioner = false;
       for (Map.Entry<String, VeniceViewWriter> viewWriter: viewWriters.entrySet()) {
         if (viewWriter.getValue() instanceof ChangeCaptureViewWriter) {
           tmpValueForHasChangeCaptureViewWriter = true;
-          break;
+        } else if (viewWriter.getValue() instanceof MaterializedViewWriter) {
+          if (((MaterializedViewWriter) viewWriter.getValue()).isComplexVenicePartitioner()) {
+            tmpValueForHasComplexVenicePartitioner = true;
+          }
         }
       }
       hasChangeCaptureView = tmpValueForHasChangeCaptureViewWriter;
+      hasComplexVenicePartitionerMaterializedView = tmpValueForHasComplexVenicePartitioner;
     } else {
       viewWriters = Collections.emptyMap();
       hasChangeCaptureView = false;
+      hasComplexVenicePartitionerMaterializedView = false;
     }
     this.storeDeserializerCache = new AvroStoreDeserializerCache(
         builder.getSchemaRepo(),
@@ -3297,7 +3305,7 @@ private PubSubMessageProcessedResult processMessage(
                 update.updateValue,
                 update.updateSchemaId,
                 readerUpdateProtocolVersion);
-        updatedValueBytes = writeComputeResult.getUpdatedValueBytes();
+        updatedValueBytes = compressor.get().compress(writeComputeResult.getUpdatedValueBytes());
         hostLevelIngestionStats
             .recordWriteComputeUpdateLatency(LatencyUtils.getElapsedTimeFromNSToMS(writeComputeStartTimeInNS));
       } catch (Exception e) {
@@ -3342,24 +3350,29 @@ private PubSubMessageProcessedResult processMessage(
                 Lazy.of(writeComputeResult::getUpdatedValue)));
       }
       case DELETE:
+        Lazy<GenericRecord> oldValueProvider;
+        if (hasComplexVenicePartitionerMaterializedView) {
+          // Best-effort to provide the old value for delete operation in case needed by a ComplexVeniceWriter to
+          // generate deletes for materialized view topic partition(s). We need to do a non-lazy lookup before, so we
+          // have a chance of getting the old value before the transient record cache is updated to null as part of
+          // processing the DELETE.
+          int oldValueReaderSchemaId = schemaRepository.getSupersetSchema(storeName).getId();
+          GenericRecord oldValue = readStoredValueRecord(
+              partitionConsumptionState,
+              keyBytes,
+              oldValueReaderSchemaId,
+              consumerRecord.getTopicPartition(),
+              new ChunkedValueManifestContainer());
+          oldValueProvider = Lazy.of(() -> oldValue);
+        } else {
+          oldValueProvider = Lazy.of(() -> null);
+        }
         /**
          * For WC enabled stores update the transient record map with the latest {key,null} for similar reason as mentioned in PUT above.
          */
         if (isWriteComputationEnabled && partitionConsumptionState.isEndOfPushReceived()) {
           partitionConsumptionState.setTransientRecord(kafkaClusterId, consumerRecord.getOffset(), keyBytes, -1, null);
         }
-        // Best-effort to provide the old value for delete operation in case needed by a ComplexVeniceWriter to generate
-        // deletes for materialized view topic partition(s).
-        Lazy<GenericRecord> oldValueProvider = Lazy.of(() -> {
-          ChunkedValueManifestContainer oldValueManifestContainer = new ChunkedValueManifestContainer();
-          int oldValueReaderSchemaId = schemaRepository.getSupersetSchema(storeName).getId();
-          return readStoredValueRecord(
-              partitionConsumptionState,
-              keyBytes,
-              oldValueReaderSchemaId,
-              consumerRecord.getTopicPartition(),
-              oldValueManifestContainer);
-        });
         return new PubSubMessageProcessedResult(new WriteComputeResultWrapper(null, null, false, oldValueProvider));
 
       default:
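Two behavioral fixes ride along with the view-writer detection: the write-compute result is now compressed before being cached as updatedValueBytes, and the old-value lookup for DELETE is now eager and gated on the new hasComplexVenicePartitionerMaterializedView flag. The hunk comment explains the eagerness: processing the DELETE nulls the transient record, so a deferred supplier would run its lookup only after the value is gone. A self-contained toy demonstration of that ordering hazard (the HashMap stands in for the transient record cache; all names here are hypothetical):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.Supplier;

    public class EagerVsLazyLookupDemo {
      public static void main(String[] args) {
        Map<String, String> transientCache = new HashMap<>();
        transientCache.put("k1", "old-value");

        // Eager: resolve before DELETE processing mutates the cache, then wrap.
        String captured = transientCache.get("k1");
        Supplier<String> eager = () -> captured;

        // Lazy: the lookup happens only at get(), after the mutation below.
        Supplier<String> lazy = () -> transientCache.get("k1");

        transientCache.put("k1", null); // DELETE processing nulls the transient record

        System.out.println(eager.get()); // old-value
        System.out.println(lazy.get());  // null -- the old value is no longer reachable
      }
    }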

clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/MergeConflictResultWrapper.java (+1 −1)

@@ -103,7 +103,7 @@ public ByteBuffer getUpdatedRmdBytes() {
    * 2. returns the old value for DELETE (null for non-existent key).
    * 3. returns null if the value is not available.
    */
-  public Lazy<GenericRecord> getNewValueProvider() {
+  public Lazy<GenericRecord> getValueProvider() {
     return valueProvider;
   }
 }

clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/MaterializedViewWriter.java (+5)

@@ -6,6 +6,7 @@
 import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
 import com.linkedin.venice.message.KafkaKey;
 import com.linkedin.venice.meta.Version;
+import com.linkedin.venice.partitioner.ComplexVenicePartitioner;
 import com.linkedin.venice.pubsub.PubSubProducerAdapterFactory;
 import com.linkedin.venice.serialization.KeyWithChunkingSuffixSerializer;
 import com.linkedin.venice.utils.ByteUtils;
@@ -116,4 +117,8 @@ public String getWriterClassName() {
   VeniceWriterOptions buildWriterOptions() {
     return setProducerOptimizations(internalView.getWriterOptionsBuilder(materializedViewTopicName, version)).build();
   }
+
+  public boolean isComplexVenicePartitioner() {
+    return internalView.getViewPartitioner() instanceof ComplexVenicePartitioner;
+  }
 }
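The new method is a capability probe: a ComplexVenicePartitioner can route a record to zero or more partitions based on the value rather than the key alone, which is why the ingestion task needs to know up front whether an old-value provider must be materialized. Judging from the overrides in the UnitTestComplexPartitioner added later in this commit, the distinguishing capability is roughly the following (signatures taken from that test partitioner; the class framing is illustrative, not the Venice base class):

    import org.apache.avro.generic.GenericRecord;

    // Sketch contrasting the two partitioner contracts assumed by this commit.
    abstract class PartitionerContractSketch {
      // Plain partitioner: exactly one partition, derived from the key alone.
      abstract int getPartitionId(byte[] keyBytes, int numPartitions);

      // Complex partitioner: zero or more partitions, derived from key and value.
      abstract int[] getPartitionId(byte[] keyBytes, GenericRecord value, int numPartitions);
    }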
clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/MergeConflictResultWrapperTest.java (+84, new file)

@@ -0,0 +1,84 @@
+package com.linkedin.davinci.kafka.consumer;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+import com.linkedin.davinci.replication.RmdWithValueSchemaId;
+import com.linkedin.davinci.replication.merge.MergeConflictResult;
+import com.linkedin.davinci.storage.chunking.ChunkedValueManifestContainer;
+import com.linkedin.davinci.store.record.ByteBufferValueRecord;
+import com.linkedin.venice.serializer.RecordDeserializer;
+import com.linkedin.venice.utils.lazy.Lazy;
+import java.nio.ByteBuffer;
+import java.util.Optional;
+import java.util.function.Function;
+import org.apache.avro.generic.GenericRecord;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class MergeConflictResultWrapperTest {
+  @Test
+  public void testValueProvider() {
+    MergeConflictResult mergeConflictResult = mock(MergeConflictResult.class);
+    RecordDeserializer<GenericRecord> deserializer = mock(RecordDeserializer.class);
+    GenericRecord mockDeleteOldRecord = mock(GenericRecord.class);
+    GenericRecord mockNewValueRecord = mock(GenericRecord.class);
+    ByteBuffer mockDeleteOldValueBytes = mock(ByteBuffer.class);
+    ByteBuffer mockNewValueBytes = mock(ByteBuffer.class);
+    doReturn(mockDeleteOldRecord).when(deserializer).deserialize(mockDeleteOldValueBytes);
+    doReturn(mockNewValueRecord).when(deserializer).deserialize(mockNewValueBytes);
+    RmdWithValueSchemaId rmdWithValueSchemaId = mock(RmdWithValueSchemaId.class);
+    ChunkedValueManifestContainer chunkedValueManifestContainer = mock(ChunkedValueManifestContainer.class);
+    ByteBuffer mockUpdatedRmdBytes = mock(ByteBuffer.class);
+    Function<Integer, RecordDeserializer<GenericRecord>> deserProvider = (schemaId) -> deserializer;
+    // DELETE
+    MergeConflictResultWrapper nonExistingKeyDeleteWrapper = new MergeConflictResultWrapper(
+        mergeConflictResult,
+        Lazy.of(() -> null),
+        Lazy.of(() -> null),
+        rmdWithValueSchemaId,
+        chunkedValueManifestContainer,
+        null,
+        mockUpdatedRmdBytes,
+        deserProvider);
+    Assert.assertNull(nonExistingKeyDeleteWrapper.getValueProvider().get());
+    ByteBufferValueRecord<ByteBuffer> mockDeleteByteBufferValueRecord = mock(ByteBufferValueRecord.class);
+    doReturn(mockDeleteOldValueBytes).when(mockDeleteByteBufferValueRecord).value();
+    MergeConflictResultWrapper existingKeyDeleteWrapper = new MergeConflictResultWrapper(
+        mergeConflictResult,
+        Lazy.of(() -> mockDeleteByteBufferValueRecord),
+        Lazy.of(() -> mockDeleteOldValueBytes),
+        rmdWithValueSchemaId,
+        chunkedValueManifestContainer,
+        null,
+        mockUpdatedRmdBytes,
+        deserProvider);
+    Assert.assertEquals(existingKeyDeleteWrapper.getValueProvider().get(), mockDeleteOldRecord);
+    // PUT/UPDATE
+    ByteBuffer mockUpdatedValueBytes = mock(ByteBuffer.class);
+    doReturn(Optional.of(mockNewValueRecord)).when(mergeConflictResult).getDeserializedValue();
+    MergeConflictResultWrapper updateCachedWrapper = new MergeConflictResultWrapper(
+        mergeConflictResult,
+        Lazy.of(() -> null),
+        Lazy.of(() -> null),
+        rmdWithValueSchemaId,
+        chunkedValueManifestContainer,
+        mockUpdatedValueBytes,
+        mockUpdatedRmdBytes,
+        deserProvider);
+    Assert.assertEquals(updateCachedWrapper.getValueProvider().get(), mockNewValueRecord);
+    doReturn(Optional.empty()).when(mergeConflictResult).getDeserializedValue();
+    doReturn(mockNewValueBytes).when(mergeConflictResult).getNewValue();
+    MergeConflictResultWrapper updateNotCachedWrapper = new MergeConflictResultWrapper(
+        mergeConflictResult,
+        Lazy.of(() -> null),
+        Lazy.of(() -> null),
+        rmdWithValueSchemaId,
+        chunkedValueManifestContainer,
+        mockUpdatedValueBytes,
+        mockUpdatedRmdBytes,
+        deserProvider);
+    Assert.assertEquals(updateNotCachedWrapper.getValueProvider().get(), mockNewValueRecord);
+  }
+}
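Read together, the four assertions pin down the getValueProvider() contract: a DELETE of a non-existent key resolves to null, a DELETE of an existing key deserializes the stored old value, and a PUT/UPDATE resolves to the new record whether MergeConflictResult already holds a cached deserialized copy or the new value bytes must be deserialized on demand.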

clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/MaterializedViewWriterTest.java (+26)

@@ -18,6 +18,7 @@
 import com.linkedin.davinci.config.VeniceConfigLoader;
 import com.linkedin.davinci.config.VeniceServerConfig;
 import com.linkedin.davinci.kafka.consumer.PartitionConsumptionState;
+import com.linkedin.davinci.utils.UnitTestComplexPartitioner;
 import com.linkedin.venice.kafka.protocol.ControlMessage;
 import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
 import com.linkedin.venice.kafka.protocol.enums.ControlMessageType;
@@ -175,6 +176,31 @@ public void testViewWriterCanForwardChunkedKeysCorrectly() {
     }
   }
 
+  @Test
+  public void testIsComplexVenicePartitioner() {
+    String storeName = "testStore";
+    String viewName = "simplePartitionerView";
+    Version version = mock(Version.class);
+    doReturn(true).when(version).isChunkingEnabled();
+    doReturn(true).when(version).isRmdChunkingEnabled();
+    getMockStore(storeName, 1, version);
+    MaterializedViewParameters.Builder viewParamsBuilder = new MaterializedViewParameters.Builder(viewName);
+    viewParamsBuilder.setPartitionCount(6);
+    viewParamsBuilder.setPartitioner(DefaultVenicePartitioner.class.getCanonicalName());
+    VeniceConfigLoader props = getMockProps();
+    MaterializedViewWriter materializedViewWriter =
+        new MaterializedViewWriter(props, version, SCHEMA, viewParamsBuilder.build());
+    Assert.assertFalse(materializedViewWriter.isComplexVenicePartitioner());
+    String complexViewName = "complexPartitionerView";
+    MaterializedViewParameters.Builder complexViewParamsBuilder =
+        new MaterializedViewParameters.Builder(complexViewName);
+    complexViewParamsBuilder.setPartitionCount(6);
+    complexViewParamsBuilder.setPartitioner(UnitTestComplexPartitioner.class.getCanonicalName());
+    MaterializedViewWriter complexMaterializedViewWriter =
+        new MaterializedViewWriter(props, version, SCHEMA, complexViewParamsBuilder.build());
+    Assert.assertTrue(complexMaterializedViewWriter.isComplexVenicePartitioner());
+  }
+
   private VeniceConfigLoader getMockProps() {
     VeniceConfigLoader props = mock(VeniceConfigLoader.class);
     VeniceServerConfig serverConfig = mock(VeniceServerConfig.class);
clients/da-vinci-client/src/test/java/com/linkedin/davinci/utils/UnitTestComplexPartitioner.java (+43, new file)

@@ -0,0 +1,43 @@
+package com.linkedin.davinci.utils;
+
+import com.linkedin.venice.partitioner.ComplexVenicePartitioner;
+import com.linkedin.venice.utils.PartitionUtils;
+import com.linkedin.venice.utils.ReflectUtils;
+import com.linkedin.venice.utils.VeniceProperties;
+import java.nio.ByteBuffer;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+
+
+/**
+ * Dummy complex venice partitioner used for unit tests. Cannot use private static class because {@link PartitionUtils}
+ * uses {@link ReflectUtils#loadClass(String)}.
+ */
+public class UnitTestComplexPartitioner extends ComplexVenicePartitioner {
+  public UnitTestComplexPartitioner() {
+    super();
+  }
+
+  public UnitTestComplexPartitioner(VeniceProperties props) {
+    super(props);
+  }
+
+  public UnitTestComplexPartitioner(VeniceProperties props, Schema schema) {
+    super(props, schema);
+  }
+
+  @Override
+  public int[] getPartitionId(byte[] keyBytes, GenericRecord value, int numPartitions) {
+    return new int[0];
+  }
+
+  @Override
+  public int getPartitionId(byte[] keyBytes, int numPartitions) {
+    return 0;
+  }
+
+  @Override
+  public int getPartitionId(ByteBuffer keyByteBuffer, int numPartitions) {
+    return 0;
+  }
+}
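The javadoc constraint exists because the partitioner is instantiated reflectively from its class name, so it must be a public, top-level class. A hedged sketch of why a private nested class would fail (an assumed shape; the real PartitionUtils/ReflectUtils code may differ):

    import com.linkedin.venice.partitioner.ComplexVenicePartitioner;

    public class ReflectiveLoadSketch {
      public static void main(String[] args) throws Exception {
        // Load by fully-qualified name, as a reflection-based loader would.
        Class<?> clazz = Class.forName("com.linkedin.davinci.utils.UnitTestComplexPartitioner");
        // A private nested class would fail here with an access error.
        ComplexVenicePartitioner partitioner =
            (ComplexVenicePartitioner) clazz.getDeclaredConstructor().newInstance();
        System.out.println(partitioner.getPartitionId(new byte[] { 1, 2, 3 }, 6));
      }
    }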

clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/ComplexVeniceWriterAdapter.java (+3 −3)

@@ -17,7 +17,7 @@
 
 /**
  * Adapter class for {@link ComplexVeniceWriter} to support public APIs defined in {@link AbstractVeniceWriter} in the
- * context of being called in a {@link com.linkedin.venice.writer.CompositeVeniceWriter} from VPJ. This class will
+ * context of being called in a {@link CompositeVeniceWriter} from VPJ. This class will
  * provide capabilities to deserialize the value in order to provide {@link ComplexVeniceWriter} a value provider, and
  * decompression capabilities in case of a re-push (Kafka input).
  */
@@ -55,8 +55,8 @@ public CompletableFuture<PubSubProduceResult> put(
    * The {@link PubSubProduceResult} will always be null and should not be used. This is acceptable because:
    * 1. {@link ComplexVeniceWriter#complexPut(Object, Object, int, Lazy)} returns a CompletableFuture with Void
    *    since it could potentially write to multiple partitions resulting in multiple PubSubProduceResult.
-   * 2. Only the PubSubProduceResult of the main writer in {@link com.linkedin.venice.writer.CompositeVeniceWriter} is
-   *    used for reporting purpose in VPJ.
+   * 2. Only the PubSubProduceResult of the main writer in {@link CompositeVeniceWriter} is used for reporting
+   *    purpose in VPJ.
    */
   @Override
   public CompletableFuture<PubSubProduceResult> put(

internal/venice-common/src/main/java/com/linkedin/venice/writer/ComplexVeniceWriter.java (+14 −1)

@@ -1,5 +1,6 @@
 package com.linkedin.venice.writer;
 
+import com.linkedin.venice.exceptions.VeniceException;
 import com.linkedin.venice.partitioner.ComplexVenicePartitioner;
 import com.linkedin.venice.pubsub.api.PubSubProduceResult;
 import com.linkedin.venice.pubsub.api.PubSubProducerAdapter;
@@ -9,6 +10,7 @@
 import com.linkedin.venice.utils.VeniceProperties;
 import com.linkedin.venice.utils.lazy.Lazy;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 import org.apache.avro.generic.GenericRecord;
@@ -46,7 +48,7 @@ public CompletableFuture<Void> complexPut(K key, V value, int valueSchemaId, Laz
     CompletableFuture<Void> finalCompletableFuture = new CompletableFuture<>();
     if (value == null) {
       // Ignore null value
-      finalCompletableFuture.complete(null);
+      throw new VeniceException("Put value should not be null");
     } else {
       // Write updated/put record to materialized view topic partition(s)
       if (complexPartitioner == null) {
@@ -159,6 +161,17 @@ public CompletableFuture<PubSubProduceResult> delete(
     throw new UnsupportedOperationException("ComplexVeniceWriter should use complexDelete instead of delete");
   }
 
+  @Override
+  public Future<PubSubProduceResult> update(
+      K key,
+      U update,
+      int valueSchemaId,
+      int derivedSchemaId,
+      PubSubProducerCallback callback,
+      long logicalTs) {
+    throw new UnsupportedOperationException("ComplexVeniceWriter does not support update");
+  }
+
   /**
    * Execute a "delete" on the key for a predetermined partition.
    */
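For context on the Void-typed future this class exposes: with a ComplexVenicePartitioner, a single logical put can fan out to several view partitions (getPartitionId(byte[], GenericRecord, int) returns an int[]), so there is no single PubSubProduceResult to return, which is also why the plain update() path is rejected outright. A self-contained sketch of that fan-out pattern — writeToPartition and every other name here is hypothetical, not the Venice implementation:

    import java.util.concurrent.CompletableFuture;

    public class ComplexPutFanOutSketch {
      // Stand-in for an async per-partition produce; completes on broker ack.
      static CompletableFuture<Void> writeToPartition(byte[] key, byte[] value, int partition) {
        return CompletableFuture.completedFuture(null);
      }

      // One logical put may target zero or more partitions chosen from the value.
      static CompletableFuture<Void> complexPutSketch(byte[] key, byte[] value, int[] partitions) {
        if (partitions.length == 0) {
          return CompletableFuture.completedFuture(null); // value maps to no view partition
        }
        CompletableFuture<?>[] writes = new CompletableFuture<?>[partitions.length];
        for (int i = 0; i < partitions.length; i++) {
          writes[i] = writeToPartition(key, value, partitions[i]);
        }
        // Completes only when every per-partition write does; a failure in any
        // one of them fails the whole put, hence CompletableFuture<Void>.
        return CompletableFuture.allOf(writes);
      }
    }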
