Skip to content

Commit 0d96921

Browse files
authored
[dvc][doc] Refactor DaVinciRecordTransformerConfig to use Builder pattern and update DVRT docs (#1501)
1. Previously, the configs for DaVinciRecordTransformerConfig were set by passing all of the values into its constructor. This meant that any time we wanted to add another config, it would be a breaking change. To resolve this, I refactored DaVinciRecordTransformerConfig to use a Builder pattern. 2. Fixed a bug where if a user set storeRecordsInDaVinci to false, it would always bootstrap from the Version Topic (VT) and ignore the on-disk state. This isn't desirable behavior. To fix this, I introduced a new config called alwaysBootstrapFromVersionTopic. Although most users won't use this, it can serve use cases where the user is only storing their records in memory. It is preferable for users to at least have storeRecordsInDaVinci set to true so they can bootstrap from disk and utilize blob transfer. 3. If a user doesn't intend to modify the schema of their records, the user no longer needs to pass in outputValueSchema and outputValueClass to the DaVinciRecordTransformerConfig. This can serve use cases where the user just wants to redirect their records to an alternative storage such as DuckDB. 4. Updated the DVRT docs to include the latest changes.
1 parent 4049bb2 commit 0d96921

23 files changed

+586
-188
lines changed

Diff for: clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/AvroGenericDaVinciClient.java

+7-5
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
import java.util.LinkedHashMap;
7070
import java.util.List;
7171
import java.util.Map;
72+
import java.util.Objects;
7273
import java.util.Optional;
7374
import java.util.Set;
7475
import java.util.concurrent.CompletableFuture;
@@ -700,6 +701,11 @@ private VeniceConfigLoader buildVeniceConfig() {
700701
kafkaBootstrapServers = backendConfig.getString(KAFKA_BOOTSTRAP_SERVERS);
701702
}
702703

704+
String recordTransformerOutputValueSchema = "null";
705+
if (daVinciConfig.isRecordTransformerEnabled()) {
706+
recordTransformerOutputValueSchema = Objects.toString(recordTransformerConfig.getOutputValueSchema(), "null");
707+
}
708+
703709
VeniceProperties config = new PropertyBuilder().put(KAFKA_ADMIN_CLASS, ApacheKafkaAdminAdapter.class.getName())
704710
.put(ROCKSDB_LEVEL0_FILE_NUM_COMPACTION_TRIGGER, 4) // RocksDB default config
705711
.put(ROCKSDB_LEVEL0_SLOWDOWN_WRITES_TRIGGER, 20) // RocksDB default config
@@ -712,11 +718,7 @@ private VeniceConfigLoader buildVeniceConfig() {
712718
.put(KAFKA_BOOTSTRAP_SERVERS, kafkaBootstrapServers)
713719
.put(ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED, daVinciConfig.getStorageClass() == StorageClass.MEMORY_BACKED_BY_DISK)
714720
.put(INGESTION_USE_DA_VINCI_CLIENT, true)
715-
.put(
716-
RECORD_TRANSFORMER_VALUE_SCHEMA,
717-
daVinciConfig.isRecordTransformerEnabled()
718-
? recordTransformerConfig.getOutputValueSchema().toString()
719-
: "null")
721+
.put(RECORD_TRANSFORMER_VALUE_SCHEMA, recordTransformerOutputValueSchema)
720722
.put(INGESTION_ISOLATION_CONFIG_PREFIX + "." + INGESTION_MEMORY_LIMIT, -1) // Explicitly disable memory limiter
721723
// in Isolated Process
722724
.put(backendConfig.toProperties())

Diff for: clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/BlockingDaVinciRecordTransformer.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ public BlockingDaVinciRecordTransformer(
2929
Schema keySchema,
3030
Schema inputValueSchema,
3131
Schema outputValueSchema,
32-
boolean storeRecordsInDaVinci) {
33-
super(recordTransformer.getStoreVersion(), keySchema, inputValueSchema, outputValueSchema, storeRecordsInDaVinci);
32+
DaVinciRecordTransformerConfig recordTransformerConfig) {
33+
super(recordTransformer.getStoreVersion(), keySchema, inputValueSchema, outputValueSchema, recordTransformerConfig);
3434
this.recordTransformer = recordTransformer;
3535
}
3636

Diff for: clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/DaVinciRecordTransformer.java

+16-4
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,11 @@ public abstract class DaVinciRecordTransformer<K, V, O> implements Closeable {
4343
*/
4444
private final boolean storeRecordsInDaVinci;
4545

46+
/**
47+
* Boolean to determine if we should always bootstrap from the Version Topic.
48+
*/
49+
private final boolean alwaysBootstrapFromVersionTopic;
50+
4651
/**
4752
* The key schema, which is immutable inside DaVinciClient. Users can modify the key if they are storing records in an external storage engine, but this must be managed by the user.
4853
*/
@@ -65,17 +70,17 @@ public abstract class DaVinciRecordTransformer<K, V, O> implements Closeable {
6570
* @param keySchema the key schema, which is immutable inside DaVinciClient. Users can modify the key if they are storing records in an external storage engine, but this must be managed by the user
6671
* @param inputValueSchema the value schema before transformation
6772
* @param outputValueSchema the value schema after transformation
68-
* @param storeRecordsInDaVinci set this to false if you intend to store records in a custom storage,
69-
* and not in the Da Vinci Client
73+
* @param recordTransformerConfig the config for the record transformer
7074
*/
7175
public DaVinciRecordTransformer(
7276
int storeVersion,
7377
Schema keySchema,
7478
Schema inputValueSchema,
7579
Schema outputValueSchema,
76-
boolean storeRecordsInDaVinci) {
80+
DaVinciRecordTransformerConfig recordTransformerConfig) {
7781
this.storeVersion = storeVersion;
78-
this.storeRecordsInDaVinci = storeRecordsInDaVinci;
82+
this.storeRecordsInDaVinci = recordTransformerConfig.getStoreRecordsInDaVinci();
83+
this.alwaysBootstrapFromVersionTopic = recordTransformerConfig.getAlwaysBootstrapFromVersionTopic();
7984
this.keySchema = keySchema;
8085
// ToDo: Make use of inputValueSchema to support reader/writer schemas
8186
this.inputValueSchema = inputValueSchema;
@@ -227,6 +232,13 @@ public final boolean getStoreRecordsInDaVinci() {
227232
return storeRecordsInDaVinci;
228233
}
229234

235+
/**
236+
* @return {@link #alwaysBootstrapFromVersionTopic}
237+
*/
238+
public final boolean getAlwaysBootstrapFromVersionTopic() {
239+
return alwaysBootstrapFromVersionTopic;
240+
}
241+
230242
/**
231243
* Returns the schema for the key used in {@link DaVinciClient}'s operations.
232244
*

Diff for: clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/DaVinciRecordTransformerConfig.java

+89-12
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package com.linkedin.davinci.client;
22

3+
import com.linkedin.venice.exceptions.VeniceException;
4+
import java.util.Optional;
35
import org.apache.avro.Schema;
46

57

@@ -10,19 +12,22 @@ public class DaVinciRecordTransformerConfig {
1012
private final DaVinciRecordTransformerFunctionalInterface recordTransformerFunction;
1113
private final Class outputValueClass;
1214
private final Schema outputValueSchema;
15+
private final boolean storeRecordsInDaVinci;
16+
private final boolean alwaysBootstrapFromVersionTopic;
1317

14-
/**
15-
* @param recordTransformerFunction the functional interface for creating a {@link DaVinciRecordTransformer}
16-
* @param outputValueClass the class of the output value
17-
* @param outputValueSchema the schema of the output value
18-
*/
19-
public DaVinciRecordTransformerConfig(
20-
DaVinciRecordTransformerFunctionalInterface recordTransformerFunction,
21-
Class outputValueClass,
22-
Schema outputValueSchema) {
23-
this.recordTransformerFunction = recordTransformerFunction;
24-
this.outputValueClass = outputValueClass;
25-
this.outputValueSchema = outputValueSchema;
18+
public DaVinciRecordTransformerConfig(Builder builder) {
19+
this.recordTransformerFunction = Optional.ofNullable(builder.recordTransformerFunction)
20+
.orElseThrow(() -> new VeniceException("recordTransformerFunction cannot be null"));
21+
22+
this.outputValueClass = builder.outputValueClass;
23+
this.outputValueSchema = builder.outputValueSchema;
24+
if ((this.outputValueClass != null && this.outputValueSchema == null)
25+
|| (this.outputValueClass == null && this.outputValueSchema != null)) {
26+
throw new VeniceException("outputValueClass and outputValueSchema must be defined together");
27+
}
28+
29+
this.storeRecordsInDaVinci = Optional.ofNullable(builder.storeRecordsInDaVinci).orElse(true);
30+
this.alwaysBootstrapFromVersionTopic = Optional.ofNullable(builder.alwaysBootstrapFromVersionTopic).orElse(false);
2631
}
2732

2833
/**
@@ -45,4 +50,76 @@ public Class getOutputValueClass() {
4550
public Schema getOutputValueSchema() {
4651
return outputValueSchema;
4752
}
53+
54+
/**
55+
* @return {@link #storeRecordsInDaVinci}
56+
*/
57+
public boolean getStoreRecordsInDaVinci() {
58+
return storeRecordsInDaVinci;
59+
}
60+
61+
/**
62+
* @return {@link #alwaysBootstrapFromVersionTopic}
63+
*/
64+
public boolean getAlwaysBootstrapFromVersionTopic() {
65+
return alwaysBootstrapFromVersionTopic;
66+
}
67+
68+
public static class Builder {
69+
private DaVinciRecordTransformerFunctionalInterface recordTransformerFunction;
70+
private Class outputValueClass;
71+
private Schema outputValueSchema;
72+
private Boolean storeRecordsInDaVinci;
73+
private Boolean alwaysBootstrapFromVersionTopic;
74+
75+
/**
76+
* @param recordTransformerFunction the functional interface for creating a {@link DaVinciRecordTransformer}
77+
*/
78+
public Builder setRecordTransformerFunction(DaVinciRecordTransformerFunctionalInterface recordTransformerFunction) {
79+
this.recordTransformerFunction = recordTransformerFunction;
80+
return this;
81+
}
82+
83+
/**
84+
* Set this if you modify the schema during transformation. Must be used in conjunction with {@link #setOutputValueSchema(Schema)}
85+
* @param outputValueClass the class of the output value
86+
*/
87+
public Builder setOutputValueClass(Class outputValueClass) {
88+
this.outputValueClass = outputValueClass;
89+
return this;
90+
}
91+
92+
/**
93+
* Set this if you modify the schema during transformation. Must be used in conjunction with {@link #setOutputValueClass(Class)}
94+
* @param outputValueSchema the schema of the output value
95+
*/
96+
public Builder setOutputValueSchema(Schema outputValueSchema) {
97+
this.outputValueSchema = outputValueSchema;
98+
return this;
99+
}
100+
101+
/**
102+
* @param storeRecordsInDaVinci set this to false if you intend to store records in a custom storage,
103+
* and not in the Da Vinci Client.
104+
* Default is true.
105+
*/
106+
public Builder setStoreRecordsInDaVinci(boolean storeRecordsInDaVinci) {
107+
this.storeRecordsInDaVinci = storeRecordsInDaVinci;
108+
return this;
109+
}
110+
111+
/**
112+
* @param alwaysBootstrapFromVersionTopic set this to true if {@link #storeRecordsInDaVinci} is false, and you're
113+
* storing records in memory without being backed by disk.
114+
* Default is false.
115+
*/
116+
public Builder setAlwaysBootstrapFromVersionTopic(boolean alwaysBootstrapFromVersionTopic) {
117+
this.alwaysBootstrapFromVersionTopic = alwaysBootstrapFromVersionTopic;
118+
return this;
119+
}
120+
121+
public DaVinciRecordTransformerConfig build() {
122+
return new DaVinciRecordTransformerConfig(this);
123+
}
124+
}
48125
}

Diff for: clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/DaVinciRecordTransformerFunctionalInterface.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -13,5 +13,6 @@ DaVinciRecordTransformer apply(
1313
Integer storeVersion,
1414
Schema keySchema,
1515
Schema inputValueSchema,
16-
Schema outputValueSchema);
16+
Schema outputValueSchema,
17+
DaVinciRecordTransformerConfig recordTransformerConfig);
1718
}

Diff for: clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/DaVinciRecordTransformerUtility.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ public final void onRecovery(
110110

111111
boolean transformerLogicChanged = hasTransformerLogicChanged(classHash, offsetRecord);
112112

113-
if (!recordTransformer.getStoreRecordsInDaVinci() || transformerLogicChanged) {
113+
if (recordTransformer.getAlwaysBootstrapFromVersionTopic() || transformerLogicChanged) {
114114
LOGGER.info("Bootstrapping directly from the VersionTopic for partition {}", partitionId);
115115

116116
// Bootstrap from VT

Diff for: clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java

+12-2
Original file line numberDiff line numberDiff line change
@@ -481,15 +481,25 @@ public StoreIngestionTask(
481481
this.recordTransformerInputValueSchema = schemaRepository.getSupersetOrLatestValueSchema(storeName).getSchema();
482482
Schema outputValueSchema = recordTransformerConfig.getOutputValueSchema();
483483

484+
// User doesn't intend on transforming records. Use input value schema instead.
485+
if (outputValueSchema == null) {
486+
outputValueSchema = this.recordTransformerInputValueSchema;
487+
}
488+
484489
DaVinciRecordTransformer clientRecordTransformer = recordTransformerConfig.getRecordTransformerFunction()
485-
.apply(versionNumber, keySchema, this.recordTransformerInputValueSchema, outputValueSchema);
490+
.apply(
491+
versionNumber,
492+
keySchema,
493+
this.recordTransformerInputValueSchema,
494+
outputValueSchema,
495+
recordTransformerConfig);
486496

487497
this.recordTransformer = new BlockingDaVinciRecordTransformer(
488498
clientRecordTransformer,
489499
keySchema,
490500
this.recordTransformerInputValueSchema,
491501
outputValueSchema,
492-
clientRecordTransformer.getStoreRecordsInDaVinci());
502+
recordTransformerConfig);
493503
this.recordTransformerDeserializersByPutSchemaId = new SparseConcurrentList<>();
494504

495505
versionedIngestionStats.registerTransformerLatencySensor(storeName, versionNumber);

Diff for: clients/da-vinci-client/src/test/java/com/linkedin/davinci/client/AvroGenericDaVinciClientTest.java

+5-9
Original file line numberDiff line numberDiff line change
@@ -60,15 +60,11 @@ public AvroGenericDaVinciClient setUpClientWithRecordTransformer(
6060
daVinciConfig = new DaVinciConfig();
6161
}
6262

63-
DaVinciRecordTransformerConfig recordTransformerConfig = new DaVinciRecordTransformerConfig(
64-
(storeVersion, keySchema, inputValueSchema, outputValueSchema) -> new TestStringRecordTransformer(
65-
storeVersion,
66-
keySchema,
67-
inputValueSchema,
68-
outputValueSchema,
69-
true),
70-
String.class,
71-
Schema.create(Schema.Type.STRING));
63+
DaVinciRecordTransformerConfig recordTransformerConfig =
64+
new DaVinciRecordTransformerConfig.Builder().setRecordTransformerFunction(TestStringRecordTransformer::new)
65+
.setOutputValueClass(String.class)
66+
.setOutputValueSchema(Schema.create(Schema.Type.STRING))
67+
.build();
7268
daVinciConfig.setRecordTransformerConfig(recordTransformerConfig);
7369

7470
VeniceProperties backendConfig = mock(VeniceProperties.class);

Diff for: clients/da-vinci-client/src/test/java/com/linkedin/davinci/config/DaVinciConfigTest.java

+23-12
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import com.linkedin.davinci.client.DaVinciConfig;
99
import com.linkedin.davinci.client.DaVinciRecordTransformer;
1010
import com.linkedin.davinci.client.DaVinciRecordTransformerConfig;
11+
import com.linkedin.davinci.client.DaVinciRecordTransformerFunctionalInterface;
1112
import com.linkedin.davinci.client.DaVinciRecordTransformerResult;
1213
import com.linkedin.venice.utils.lazy.Lazy;
1314
import java.io.IOException;
@@ -22,8 +23,8 @@ public TestRecordTransformer(
2223
Schema keySchema,
2324
Schema inputValueSchema,
2425
Schema outputValueSchema,
25-
boolean storeRecordsInDaVinci) {
26-
super(storeVersion, keySchema, inputValueSchema, outputValueSchema, storeRecordsInDaVinci);
26+
DaVinciRecordTransformerConfig recordTransformerConfig) {
27+
super(storeVersion, keySchema, inputValueSchema, outputValueSchema, recordTransformerConfig);
2728
}
2829

2930
@Override
@@ -47,16 +48,21 @@ public void testRecordTransformerEnabled() {
4748
DaVinciConfig config = new DaVinciConfig();
4849
assertFalse(config.isRecordTransformerEnabled());
4950

50-
DaVinciRecordTransformerConfig recordTransformerConfig = new DaVinciRecordTransformerConfig(
51-
(storeVersion, keySchema, inputValueSchema, outputValueSchema) -> new TestRecordTransformer(
51+
DaVinciRecordTransformerConfig dummyRecordTransformerConfig =
52+
new DaVinciRecordTransformerConfig.Builder().setRecordTransformerFunction(TestRecordTransformer::new).build();
53+
54+
DaVinciRecordTransformerFunctionalInterface recordTransformerFunction =
55+
(storeVersion, keySchema, inputValueSchema, outputValueSchema, config1) -> new TestRecordTransformer(
5256
storeVersion,
5357
keySchema,
5458
inputValueSchema,
5559
outputValueSchema,
56-
true),
57-
String.class,
58-
Schema.create(Schema.Type.INT));
60+
dummyRecordTransformerConfig);
61+
62+
DaVinciRecordTransformerConfig recordTransformerConfig =
63+
new DaVinciRecordTransformerConfig.Builder().setRecordTransformerFunction(recordTransformerFunction).build();
5964
config.setRecordTransformerConfig(recordTransformerConfig);
65+
6066
assertTrue(config.isRecordTransformerEnabled());
6167
}
6268

@@ -65,16 +71,21 @@ public void testGetAndSetRecordTransformer() {
6571
DaVinciConfig config = new DaVinciConfig();
6672
assertNull(config.getRecordTransformerConfig());
6773

68-
DaVinciRecordTransformerConfig recordTransformerConfig = new DaVinciRecordTransformerConfig(
69-
(storeVersion, keySchema, inputValueSchema, outputValueSchema) -> new TestRecordTransformer(
74+
DaVinciRecordTransformerConfig dummyRecordTransformerConfig =
75+
new DaVinciRecordTransformerConfig.Builder().setRecordTransformerFunction(TestRecordTransformer::new).build();
76+
77+
DaVinciRecordTransformerFunctionalInterface recordTransformerFunction =
78+
(storeVersion, keySchema, inputValueSchema, outputValueSchema, config1) -> new TestRecordTransformer(
7079
storeVersion,
7180
keySchema,
7281
inputValueSchema,
7382
outputValueSchema,
74-
true),
75-
String.class,
76-
Schema.create(Schema.Type.INT));
83+
dummyRecordTransformerConfig);
84+
85+
DaVinciRecordTransformerConfig recordTransformerConfig =
86+
new DaVinciRecordTransformerConfig.Builder().setRecordTransformerFunction(recordTransformerFunction).build();
7787
config.setRecordTransformerConfig(recordTransformerConfig);
88+
7889
assertNotNull(config.getRecordTransformerConfig());
7990
}
8091

0 commit comments

Comments
 (0)