Commit ce8e51c
[server][common][vpj] Introduce VeniceComplexPartitioner to materialized view
Note: this change will not work if the record is actually large and chunked. Proper chunking support is needed and will be addressed in a separate PR.

1. Introduced VeniceComplexPartitioner, which extends VenicePartitioner and offers a new API to partition by value, allowing a one-to-many partition mapping.
2. Added a value provider of type Lazy<GenericRecord> to VeniceViewWriter's processRecord API so the deserialized value can be accessed when needed, e.g. when a VeniceComplexPartitioner is involved.
3. MergeConflictResult now provides the deserialized value on a best-effort basis. This is useful when we have already deserialized the value for a partial update operation, so the deserialized value can be handed directly to the materialized view writer.
4. Refactored VeniceWriter to expose an API for writing to a desired partition with new DIV. For now this is only used by the new writeWithComplexPartitioner method to handle partitioning and writing the same value to multiple partitions, but the newly exposed API should also come in handy when we build proper chunking support to forward chunks to predetermined view topic partitions.
5. writeWithComplexPartitioner in VeniceWriter re-chunks when writing to each partition. This should be optimized when we build proper chunking support.
1 parent ff63c20 commit ce8e51c
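The VeniceComplexPartitioner class itself does not appear in the diffs excerpted below, so here is only a minimal sketch of the shape the new API could take, based on item 1 of the commit message. The method name, signature, and return type are assumptions, not the committed code.

package com.linkedin.venice.partitioner;

import com.linkedin.venice.utils.lazy.Lazy;
import java.util.List;
import org.apache.avro.generic.GenericRecord;

// Hypothetical sketch of the new partitioner contract: unlike VenicePartitioner,
// which maps a key to exactly one partition, a complex partitioner inspects the
// deserialized value and may map a single record to several partitions.
public abstract class VeniceComplexPartitioner extends VenicePartitioner {
  /**
   * Partition by value and return a possibly one-to-many partition mapping.
   * (Assumed signature; the real API may differ.)
   */
  public abstract List<Integer> getPartitionIds(Lazy<GenericRecord> value, int numPartitions);
}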

File tree

24 files changed: +579 −87 lines


clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java

Lines changed: 10 additions & 1 deletion
@@ -669,6 +669,14 @@ protected void processMessageAndMaybeProduceToKafka(
       ByteBuffer oldValueBB = mergeConflictResultWrapper.getOldValueByteBufferProvider().get();
       int oldValueSchemaId =
           oldValueBB == null ? -1 : mergeConflictResultWrapper.getOldValueProvider().get().writerSchemaId();
+      Lazy<GenericRecord> newValueProvider;
+      if (mergeConflictResult.getNewValueDeserialized().isPresent()) {
+        newValueProvider = Lazy.of(() -> mergeConflictResult.getNewValueDeserialized().get());
+      } else {
+        newValueProvider = getNewValueProvider(
+            mergeConflictResultWrapper.getUpdatedValueBytes(),
+            mergeConflictResult.getValueSchemaId());
+      }
       queueUpVersionTopicWritesWithViewWriters(
           partitionConsumptionState,
           (viewWriter) -> viewWriter.processRecord(
@@ -677,7 +685,8 @@ protected void processMessageAndMaybeProduceToKafka(
               keyBytes,
               mergeConflictResult.getValueSchemaId(),
               oldValueSchemaId,
-              mergeConflictResult.getRmdRecord()),
+              mergeConflictResult.getRmdRecord(),
+              newValueProvider),
           produceToVersionTopic);
     } else {
       // This function may modify the original record in KME and it is unsafe to use the payload from KME directly
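The newValueProvider above is deliberately wrapped in Lazy so that deserialization only happens if a view writer actually needs the GenericRecord; most records pass through without the supplier ever being resolved. A small usage sketch of the semantics this code relies on (the deserializer and valueBytes names are illustrative, and the memoization behavior is assumed from how the provider is shared here):

Lazy<GenericRecord> provider = Lazy.of(() -> deserializer.deserialize(valueBytes)); // nothing runs yet
GenericRecord first = provider.get(); // supplier runs on first access
GenericRecord second = provider.get(); // assumed memoized: same instance, no second deserialization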

clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java

Lines changed: 10 additions & 2 deletions
@@ -202,7 +202,7 @@ public class LeaderFollowerStoreIngestionTask extends StoreIngestionTask {
   protected final Map<String, VeniceViewWriter> viewWriters;
   protected final boolean hasChangeCaptureView;
 
-  protected final AvroStoreDeserializerCache storeDeserializerCache;
+  protected final AvroStoreDeserializerCache<GenericRecord> storeDeserializerCache;
 
   private final AtomicLong lastSendIngestionHeartbeatTimestamp = new AtomicLong(0);
 
@@ -3383,9 +3383,11 @@ protected void processMessageAndMaybeProduceToKafka(
       Put newPut = writeComputeResultWrapper.getNewPut();
       // keys will be serialized with chunk suffix during pass-through mode in L/F NR if chunking is enabled
       boolean isChunkedKey = isChunked() && !partitionConsumptionState.isEndOfPushReceived();
+      Lazy<GenericRecord> newValueProvider = getNewValueProvider(newPut.putValue, newPut.schemaId);
       queueUpVersionTopicWritesWithViewWriters(
           partitionConsumptionState,
-          (viewWriter) -> viewWriter.processRecord(newPut.putValue, keyBytes, newPut.schemaId, isChunkedKey),
+          (viewWriter) -> viewWriter
+              .processRecord(newPut.putValue, keyBytes, newPut.schemaId, isChunkedKey, newValueProvider),
           produceToVersionTopic);
     } else {
       produceToVersionTopic.run();
@@ -4043,4 +4045,10 @@ <T> T databaseLookupWithConcurrencyLimit(Supplier<T> supplier) {
       return supplier.get();
     }
   }
+
+  protected Lazy<GenericRecord> getNewValueProvider(ByteBuffer newValue, int schemaId) {
+    if (newValue == null) {
+      return Lazy.of(() -> null);
+    }
+    return Lazy.of(() -> storeDeserializerCache.getDeserializer(schemaId, schemaId).deserialize(newValue));
+  }
 }

clients/da-vinci-client/src/main/java/com/linkedin/davinci/replication/merge/MergeConflictResolver.java

Lines changed: 7 additions & 1 deletion
@@ -32,6 +32,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -276,7 +277,12 @@ public MergeConflictResult update(
     final ByteBuffer updatedValueBytes = updatedValueAndRmd.getValue() == null
         ? null
         : serializeMergedValueRecord(oldValueSchemaID, updatedValueAndRmd.getValue());
-    return new MergeConflictResult(updatedValueBytes, oldValueSchemaID, false, updatedValueAndRmd.getRmd());
+    return new MergeConflictResult(
+        updatedValueBytes,
+        Optional.of(updatedValueAndRmd.getValue()),
+        oldValueSchemaID,
+        false,
+        updatedValueAndRmd.getRmd());
   }
 
   private MergeConflictResult mergePutWithValueLevelTimestamp(

clients/da-vinci-client/src/main/java/com/linkedin/davinci/replication/merge/MergeConflictResult.java

Lines changed: 22 additions & 0 deletions
@@ -1,6 +1,7 @@
 package com.linkedin.davinci.replication.merge;
 
 import java.nio.ByteBuffer;
+import java.util.Optional;
 import org.apache.avro.generic.GenericRecord;
 
 
@@ -12,6 +13,7 @@ public class MergeConflictResult {
   private static final MergeConflictResult IGNORED_RESULT = new MergeConflictResult();
 
   private ByteBuffer newValue;
+  private Optional<GenericRecord> newValueDeserialized;
   private int valueSchemaId;
   private final boolean updateIgnored; // Whether we should skip the incoming message since it could be a stale message.
   private boolean resultReusesInput;
@@ -22,8 +24,18 @@ public MergeConflictResult(
       int valueSchemaID,
       boolean resultReusesInput,
       GenericRecord rmdRecord) {
+    this(newValue, Optional.empty(), valueSchemaID, resultReusesInput, rmdRecord);
+  }
+
+  public MergeConflictResult(
+      ByteBuffer newValue,
+      Optional<GenericRecord> newValueDeserialized,
+      int valueSchemaID,
+      boolean resultReusesInput,
+      GenericRecord rmdRecord) {
     this.updateIgnored = false;
     this.newValue = newValue;
+    this.newValueDeserialized = newValueDeserialized;
     this.valueSchemaId = valueSchemaID;
     this.resultReusesInput = resultReusesInput;
     this.rmdRecord = rmdRecord;
@@ -56,4 +68,14 @@ public boolean doesResultReuseInput() {
   public GenericRecord getRmdRecord() {
     return rmdRecord;
   }
+
+  /**
+   * Provides the deserialized new value on a best-effort basis; it's acceptable to return an empty Optional.
+   * e.g. the MergeConflictResult of a full PUT will not contain the deserialized new value, since we don't
+   * need to deserialize the value to generate the MCR.
+   * @return the deserialized new value, if available.
+   */
+  public Optional<GenericRecord> getNewValueDeserialized() {
+    return newValueDeserialized;
+  }
 }
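Taken together, the chained constructors give callers a best-effort contract: paths that already hold the deserialized record (e.g. the partial-update path in MergeConflictResolver above) pass it along, while full PUTs keep using the original constructor and leave the Optional empty. A short illustrative sketch (variable names are placeholders):

// Partial-update path: the merged record was already deserialized, so hand it over.
MergeConflictResult withValue =
    new MergeConflictResult(updatedValueBytes, Optional.of(updatedRecord), schemaId, false, rmdRecord);

// Full-PUT path: nothing was deserialized, so the Optional stays empty.
MergeConflictResult withoutValue = new MergeConflictResult(updatedValueBytes, schemaId, false, rmdRecord);

withValue.getNewValueDeserialized().isPresent(); // true
withoutValue.getNewValueDeserialized().isPresent(); // false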

clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriter.java

Lines changed: 5 additions & 2 deletions
@@ -17,6 +17,7 @@
 import com.linkedin.venice.pubsub.api.PubSubProduceResult;
 import com.linkedin.venice.schema.rmd.RmdUtils;
 import com.linkedin.venice.utils.VeniceProperties;
+import com.linkedin.venice.utils.lazy.Lazy;
 import com.linkedin.venice.views.ChangeCaptureView;
 import com.linkedin.venice.writer.VeniceWriter;
 import com.linkedin.venice.writer.VeniceWriterFactory;
@@ -65,7 +66,8 @@ public CompletableFuture<PubSubProduceResult> processRecord(
       byte[] key,
       int newValueSchemaId,
       int oldValueSchemaId,
-      GenericRecord replicationMetadataRecord) {
+      GenericRecord replicationMetadataRecord,
+      Lazy<GenericRecord> newValueProvider) {
     // TODO: not sold about having currentValue in the interface but it VASTLY simplifies a lot of things with regards
     // to dealing with compression/chunking/etc. in the storage layer.
 
@@ -87,7 +89,8 @@ public CompletableFuture<PubSubProduceResult> processRecord(
       ByteBuffer newValue,
       byte[] key,
       int newValueSchemaId,
-      boolean isChunkedKey) {
+      boolean isChunkedKey,
+      Lazy<GenericRecord> newValueProvider) {
     // No op
     return CompletableFuture.completedFuture(null);
   }

clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/MaterializedViewWriter.java

Lines changed: 28 additions & 7 deletions
@@ -6,6 +6,7 @@
 import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
 import com.linkedin.venice.message.KafkaKey;
 import com.linkedin.venice.meta.Version;
+import com.linkedin.venice.partitioner.VeniceComplexPartitioner;
 import com.linkedin.venice.pubsub.PubSubProducerAdapterFactory;
 import com.linkedin.venice.pubsub.api.PubSubProduceResult;
 import com.linkedin.venice.serialization.KeyWithChunkingSuffixSerializer;
@@ -33,6 +34,7 @@ public class MaterializedViewWriter extends VeniceViewWriter {
   private final String materializedViewTopicName;
   private Lazy<VeniceWriter> veniceWriter;
   private final KeyWithChunkingSuffixSerializer keyWithChunkingSuffixSerializer = new KeyWithChunkingSuffixSerializer();
+  private final VeniceComplexPartitioner complexPartitioner;
 
   public MaterializedViewWriter(
       VeniceConfigLoader props,
@@ -48,6 +50,11 @@ public MaterializedViewWriter(
     this.veniceWriter = Lazy.of(
         () -> new VeniceWriterFactory(props.getCombinedProperties().toProperties(), pubSubProducerAdapterFactory, null)
             .createVeniceWriter(buildWriterOptions()));
+    if (internalView.getViewPartitioner() instanceof VeniceComplexPartitioner) {
+      complexPartitioner = (VeniceComplexPartitioner) internalView.getViewPartitioner();
+    } else {
+      complexPartitioner = null;
+    }
   }
 
   /**
@@ -64,8 +71,9 @@ public CompletableFuture<PubSubProduceResult> processRecord(
       byte[] key,
       int newValueSchemaId,
       int oldValueSchemaId,
-      GenericRecord replicationMetadataRecord) {
-    return processRecord(newValue, key, newValueSchemaId, false);
+      GenericRecord replicationMetadataRecord,
+      Lazy<GenericRecord> newValueProvider) {
+    return processRecord(newValue, key, newValueSchemaId, false, newValueProvider);
   }
 
   /**
@@ -81,16 +89,29 @@ public CompletableFuture<PubSubProduceResult> processRecord(
       ByteBuffer newValue,
       byte[] key,
       int newValueSchemaId,
-      boolean isChunkedKey) {
+      boolean isChunkedKey,
+      Lazy<GenericRecord> newValueProvider) {
     byte[] viewTopicKey = key;
     if (isChunkedKey) {
       viewTopicKey = keyWithChunkingSuffixSerializer.getKeyFromChunkedKey(key);
     }
-    if (newValue == null) {
-      // this is a delete operation
-      return veniceWriter.get().delete(viewTopicKey, null);
+    byte[] newValueBytes = newValue == null ? null : ByteUtils.extractByteArray(newValue);
+    if (complexPartitioner != null) {
+      return veniceWriter.get()
+          .writeWithComplexPartitioner(
+              viewTopicKey,
+              newValueBytes,
+              newValueSchemaId,
+              newValueProvider,
+              complexPartitioner,
+              internalView.getViewPartitionCount());
+    } else {
+      if (newValue == null) {
+        // this is a delete operation
+        return veniceWriter.get().delete(viewTopicKey, null);
+      }
+      return veniceWriter.get().put(viewTopicKey, newValueBytes, newValueSchemaId);
     }
-    return veniceWriter.get().put(viewTopicKey, ByteUtils.extractByteArray(newValue), newValueSchemaId);
   }
 
   @Override
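VeniceWriter.writeWithComplexPartitioner itself is not shown on this page. Per the commit message, it resolves the partition set from the (lazily) deserialized value and writes the same payload to every resolved partition with new DIV, re-chunking per partition for now. A hedged sketch of that control flow; every name below other than the partitioner type is a stand-in, not the committed VeniceWriter API:

// Stand-in sketch of the one-to-many fan-out. writeToPartition() is a placeholder
// for the newly exposed write-to-desired-partition API mentioned in the commit message,
// and getPartitionIds() is the assumed partitioner API sketched near the top of this page.
CompletableFuture<PubSubProduceResult> fanOutSketch(
    byte[] key,
    byte[] value,
    int schemaId,
    Lazy<GenericRecord> valueProvider,
    VeniceComplexPartitioner partitioner,
    int partitionCount) {
  // Resolve target partitions from the deserialized value.
  List<Integer> partitions = partitioner.getPartitionIds(valueProvider, partitionCount);
  CompletableFuture<PubSubProduceResult> last = CompletableFuture.completedFuture(null);
  for (int p: partitions) {
    // Same key/value written to each partition; today this re-chunks large values per partition.
    last = writeToPartition(key, value, schemaId, p);
  }
  return last;
}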

clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/VeniceViewWriter.java

Lines changed: 7 additions & 3 deletions
@@ -10,6 +10,7 @@
 import com.linkedin.venice.message.KafkaKey;
 import com.linkedin.venice.meta.Version;
 import com.linkedin.venice.pubsub.api.PubSubProduceResult;
+import com.linkedin.venice.utils.lazy.Lazy;
 import com.linkedin.venice.views.VeniceView;
 import com.linkedin.venice.writer.VeniceWriterOptions;
 import java.nio.ByteBuffer;
@@ -62,18 +63,19 @@ public VeniceViewWriter(
    * @param newValue the incoming fully specified value which hasn't yet been committed to Venice
    * @param oldValue the previous value which has already been locally committed to Venice for the given key
    * @param key the key of the record that designates newValue and oldValue
-   * @param version the version of the store taking this record
    * @param newValueSchemaId the schemaId of the incoming record
    * @param oldValueSchemaId the schemaId of the old record
    * @param replicationMetadataRecord the associated RMD for the incoming record.
+   * @param newValueProvider to provide the deserialized new value
    */
   public abstract CompletableFuture<PubSubProduceResult> processRecord(
       ByteBuffer newValue,
       ByteBuffer oldValue,
       byte[] key,
       int newValueSchemaId,
       int oldValueSchemaId,
-      GenericRecord replicationMetadataRecord);
+      GenericRecord replicationMetadataRecord,
+      Lazy<GenericRecord> newValueProvider);
 
   /**
    * To be called as a given ingestion task consumes each record. This is called prior to writing to a
@@ -83,12 +85,14 @@ public abstract CompletableFuture<PubSubProduceResult> processRecord(
    * @param key the key of the record that designates newValue and oldValue
    * @param newValueSchemaId the schemaId of the incoming record
    * @param isChunkedKey is the key already serialized with {@link com.linkedin.venice.serialization.KeyWithChunkingSuffixSerializer}
+   * @param newValueProvider to provide the deserialized new value
    */
   public abstract CompletableFuture<PubSubProduceResult> processRecord(
       ByteBuffer newValue,
       byte[] key,
       int newValueSchemaId,
-      boolean isChunkedKey);
+      boolean isChunkedKey,
+      Lazy<GenericRecord> newValueProvider);
 
   /**
    * Called when the server encounters a control message. There isn't (today) a strict ordering

clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTaskTest.java

Lines changed: 4 additions & 3 deletions
@@ -280,7 +280,8 @@ public void testQueueUpVersionTopicWritesWithViewWriters() throws InterruptedExc
     viewWriterMap.put("testView", materializedViewWriter);
     when(mockVeniceViewWriterFactory.buildStoreViewWriters(any(), anyInt(), any())).thenReturn(viewWriterMap);
     CompletableFuture<PubSubProduceResult> viewWriterFuture = new CompletableFuture<>();
-    when(materializedViewWriter.processRecord(any(), any(), anyInt(), anyBoolean())).thenReturn(viewWriterFuture);
+    when(materializedViewWriter.processRecord(any(), any(), anyInt(), anyBoolean(), any()))
+        .thenReturn(viewWriterFuture);
     setUp();
     WriteComputeResultWrapper mockResult = mock(WriteComputeResultWrapper.class);
     Put put = new Put();
@@ -291,12 +292,12 @@ public void testQueueUpVersionTopicWritesWithViewWriters() throws InterruptedExc
         .thenReturn(CompletableFuture.completedFuture(null));
     leaderFollowerStoreIngestionTask.queueUpVersionTopicWritesWithViewWriters(
         mockPartitionConsumptionState,
-        (viewWriter) -> viewWriter.processRecord(mock(ByteBuffer.class), new byte[1], 1, false),
+        (viewWriter) -> viewWriter.processRecord(mock(ByteBuffer.class), new byte[1], 1, false, Lazy.of(() -> null)),
        () -> writeToVersionTopic.set(true));
     verify(mockPartitionConsumptionState, times(1)).getLastVTProduceCallFuture();
     ArgumentCaptor<CompletableFuture> vtWriteFutureCaptor = ArgumentCaptor.forClass(CompletableFuture.class);
     verify(mockPartitionConsumptionState, times(1)).setLastVTProduceCallFuture(vtWriteFutureCaptor.capture());
-    verify(materializedViewWriter, times(1)).processRecord(any(), any(), anyInt(), anyBoolean());
+    verify(materializedViewWriter, times(1)).processRecord(any(), any(), anyInt(), anyBoolean(), any());
     verify(hostLevelIngestionStats, times(1)).recordViewProducerLatency(anyDouble());
     verify(hostLevelIngestionStats, never()).recordViewProducerAckLatency(anyDouble());
     assertFalse(writeToVersionTopic.get());

clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriterTest.java

Lines changed: 11 additions & 4 deletions
@@ -24,6 +24,7 @@
 import com.linkedin.venice.pubsub.api.PubSubProduceResult;
 import com.linkedin.venice.schema.rmd.RmdSchemaGenerator;
 import com.linkedin.venice.utils.VeniceProperties;
+import com.linkedin.venice.utils.lazy.Lazy;
 import com.linkedin.venice.views.ChangeCaptureView;
 import com.linkedin.venice.writer.VeniceWriter;
 import com.linkedin.venice.writer.VeniceWriterOptions;
@@ -245,15 +246,21 @@ public void testProcessRecord() throws ExecutionException, InterruptedException
     rmdRecordWithValueLevelTimeStamp.put(TIMESTAMP_FIELD_NAME, 20L);
     rmdRecordWithValueLevelTimeStamp.put(REPLICATION_CHECKPOINT_VECTOR_FIELD_NAME, vectors);
     changeCaptureViewWriter.setVeniceWriter(mockVeniceWriter);
-
+    Lazy<GenericRecord> dummyValueProvider = Lazy.of(() -> null);
     // Update Case
-    changeCaptureViewWriter.processRecord(NEW_VALUE, OLD_VALUE, KEY, 1, 1, rmdRecordWithValueLevelTimeStamp).get();
+    changeCaptureViewWriter
+        .processRecord(NEW_VALUE, OLD_VALUE, KEY, 1, 1, rmdRecordWithValueLevelTimeStamp, dummyValueProvider)
+        .get();
 
     // Insert Case
-    changeCaptureViewWriter.processRecord(NEW_VALUE, null, KEY, 1, 1, rmdRecordWithValueLevelTimeStamp).get();
+    changeCaptureViewWriter
+        .processRecord(NEW_VALUE, null, KEY, 1, 1, rmdRecordWithValueLevelTimeStamp, dummyValueProvider)
+        .get();
 
     // Deletion Case
-    changeCaptureViewWriter.processRecord(null, OLD_VALUE, KEY, 1, 1, rmdRecordWithValueLevelTimeStamp).get();
+    changeCaptureViewWriter
+        .processRecord(null, OLD_VALUE, KEY, 1, 1, rmdRecordWithValueLevelTimeStamp, dummyValueProvider)
+        .get();
 
     // Set up argument captors
     ArgumentCaptor<byte[]> keyCaptor = ArgumentCaptor.forClass(byte[].class);

clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/MaterializedViewWriterTest.java

Lines changed: 7 additions & 2 deletions
@@ -33,6 +33,7 @@
 import com.linkedin.venice.serialization.KeyWithChunkingSuffixSerializer;
 import com.linkedin.venice.utils.ObjectMapperFactory;
 import com.linkedin.venice.utils.VeniceProperties;
+import com.linkedin.venice.utils.lazy.Lazy;
 import com.linkedin.venice.views.MaterializedView;
 import com.linkedin.venice.views.VeniceView;
 import com.linkedin.venice.writer.VeniceWriter;
@@ -163,8 +164,12 @@ public void testViewWriterCanForwardChunkedKeysCorrectly() {
     for (int i = 0; i < 100; i++) {
       byte[] key = new byte[keySize];
       RANDOM.nextBytes(key);
-      materializedViewWriter
-          .processRecord(dummyValue, keyWithChunkingSuffixSerializer.serializeNonChunkedKey(key), 1, true);
+      materializedViewWriter.processRecord(
+          dummyValue,
+          keyWithChunkingSuffixSerializer.serializeNonChunkedKey(key),
+          1,
+          true,
+          Lazy.of(() -> null));
       verify(veniceWriter, times(1)).put(eq(key), any(), eq(1));
       Mockito.clearInvocations(veniceWriter);
     }

clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/mapreduce/datawriter/jobs/DataWriterMRJob.java

Lines changed: 2 additions & 0 deletions
@@ -46,6 +46,7 @@
 import static com.linkedin.venice.vpj.VenicePushJobConstants.VALUE_FIELD_PROP;
 import static com.linkedin.venice.vpj.VenicePushJobConstants.VALUE_SCHEMA_DIR;
 import static com.linkedin.venice.vpj.VenicePushJobConstants.VALUE_SCHEMA_ID_PROP;
+import static com.linkedin.venice.vpj.VenicePushJobConstants.VALUE_SCHEMA_STRING_PROP;
 import static com.linkedin.venice.vpj.VenicePushJobConstants.VSON_PUSH;
 import static com.linkedin.venice.vpj.VenicePushJobConstants.ZSTD_COMPRESSION_LEVEL;
 import static com.linkedin.venice.vpj.VenicePushJobConstants.ZSTD_DICTIONARY_CREATION_SUCCESS;
@@ -272,6 +273,7 @@ protected void setupInputFormatConf(JobConf jobConf, PushJobSetti
     if (pushJobSetting.isAvro) {
       jobConf.set(SCHEMA_STRING_PROP, pushJobSetting.inputDataSchemaString);
       jobConf.set(AvroJob.INPUT_SCHEMA, pushJobSetting.inputDataSchemaString);
+      jobConf.set(VALUE_SCHEMA_STRING_PROP, pushJobSetting.valueSchemaString);
       if (pushJobSetting.generatePartialUpdateRecordFromInput) {
        jobConf.setBoolean(GENERATE_PARTIAL_UPDATE_RECORD_FROM_INPUT, true);
         jobConf.set(UPDATE_SCHEMA_STRING_PROP, pushJobSetting.valueSchemaString);
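Setting VALUE_SCHEMA_STRING_PROP here presumably makes the store's value schema visible to the data writer tasks, which would need it to deserialize values when a complex partitioner partitions by value. A sketch of how a task could read it back; the consuming side is not part of this page's diffs:

// Inside a mapper/reducer configured by DataWriterMRJob (illustrative):
org.apache.avro.Schema valueSchema =
    new org.apache.avro.Schema.Parser().parse(jobConf.get(VALUE_SCHEMA_STRING_PROP));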
