
Commit 7075d67

[duckdb] Create first implementation of DuckDB x DVRT (#1435)
1 parent a138612 commit 7075d67

File tree: 13 files changed (+499, -15 lines)

Diff for: .gitignore (+1)

@@ -24,3 +24,4 @@ Gemfile.lock
 .bundles_cache
 docs/vendor/
 clients/da-vinci-client/classHash*.txt
+integrations/venice-duckdb/classHash*.txt

Diff for: clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/BlockingDaVinciRecordTransformer.java (+4, -4)

@@ -51,12 +51,12 @@ public void processDelete(Lazy<K> key) {
     this.recordTransformer.processDelete(key);
   }
 
-  public void onStartVersionIngestion() {
-    this.recordTransformer.onStartVersionIngestion();
+  public void onStartVersionIngestion(boolean isCurrentVersion) {
+    this.recordTransformer.onStartVersionIngestion(isCurrentVersion);
     startLatch.countDown();
   }
 
-  public void onEndVersionIngestion() {
-    this.recordTransformer.onEndVersionIngestion();
+  public void onEndVersionIngestion(int currentVersion) {
+    this.recordTransformer.onEndVersionIngestion(currentVersion);
   }
 }

Diff for: clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/DaVinciRecordTransformer.java (+4, -2)

@@ -103,7 +103,7 @@ public void processDelete(Lazy<K> key) {
    *
    * By default, it performs no operation.
    */
-  public void onStartVersionIngestion() {
+  public void onStartVersionIngestion(boolean isCurrentVersion) {
     return;
   }
 
@@ -113,10 +113,12 @@ public void onStartVersionIngestion() {
    *
    * By default, it performs no operation.
    */
-  public void onEndVersionIngestion() {
+  public void onEndVersionIngestion(int currentVersion) {
     return;
   }
 
+  // Final methods below
+
   /**
    * Transforms and processes the given record.
   *
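
The two lifecycle hooks above now receive version context: onStartVersionIngestion is told whether the version being bootstrapped is already the serving (current) version, and onEndVersionIngestion reports which version is current once this one is retired. Below is a minimal, hypothetical subclass sketching how a transformer might use those parameters; the class name, schemas, and log lines are illustrative and not part of this commit, and only APIs visible in this diff (constructor, transform, DaVinciRecordTransformerResult) are assumed.

package com.example;

import com.linkedin.davinci.client.DaVinciRecordTransformer;
import com.linkedin.davinci.client.DaVinciRecordTransformerResult;
import com.linkedin.venice.utils.lazy.Lazy;
import org.apache.avro.Schema;

// Hypothetical transformer illustrating the new lifecycle parameters; not part of this commit.
public class LoggingRecordTransformer extends DaVinciRecordTransformer<String, String, String> {
  private final int storeVersion;

  public LoggingRecordTransformer(int storeVersion, boolean storeRecordsInDaVinci) {
    super(storeVersion, storeRecordsInDaVinci);
    this.storeVersion = storeVersion;
  }

  @Override
  public Schema getKeySchema() {
    return Schema.create(Schema.Type.STRING);
  }

  @Override
  public Schema getOutputValueSchema() {
    return Schema.create(Schema.Type.STRING);
  }

  @Override
  public DaVinciRecordTransformerResult<String> transform(Lazy<String> key, Lazy<String> value) {
    // Pass records through unchanged; this sketch only demonstrates the lifecycle hooks.
    return new DaVinciRecordTransformerResult<>(DaVinciRecordTransformerResult.Result.UNCHANGED);
  }

  @Override
  public void onStartVersionIngestion(boolean isCurrentVersion) {
    // A future-version push starts with isCurrentVersion == false; the serving version gets true.
    System.out.println("Ingestion starting for version " + storeVersion + ", current=" + isCurrentVersion);
  }

  @Override
  public void onEndVersionIngestion(int currentVersion) {
    // Called when this version is retired; currentVersion identifies the version now serving traffic.
    System.out.println("Version " + storeVersion + " retired; current version is " + currentVersion);
  }
}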

Diff for: clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java (+3, -2)

@@ -486,7 +486,7 @@ public StoreIngestionTask(
       // onStartVersionIngestion called here instead of run() because this needs to finish running
       // before bootstrapping starts
       long startTime = System.currentTimeMillis();
-      recordTransformer.onStartVersionIngestion();
+      recordTransformer.onStartVersionIngestion(isCurrentVersion.getAsBoolean());
       long endTime = System.currentTimeMillis();
       versionedIngestionStats.recordTransformerLifecycleStartLatency(
           storeName,
@@ -4074,7 +4074,8 @@ public synchronized void close() {
 
     if (recordTransformer != null) {
      long startTime = System.currentTimeMillis();
-      recordTransformer.onEndVersionIngestion();
+      Store store = storeRepository.getStoreOrThrow(storeName);
+      recordTransformer.onEndVersionIngestion(store.getCurrentVersion());
      long endTime = System.currentTimeMillis();
      versionedIngestionStats.recordTransformerLifecycleEndLatency(
          storeName,

Diff for: clients/da-vinci-client/src/test/java/com/linkedin/davinci/transformer/RecordTransformerTest.java (+2, -2)

@@ -102,7 +102,7 @@ public void testBlockingRecordTransformer() {
     DaVinciRecordTransformer<Integer, String, String> recordTransformer = new TestStringRecordTransformer(0, true);
     recordTransformer =
         new BlockingDaVinciRecordTransformer<>(recordTransformer, recordTransformer.getStoreRecordsInDaVinci());
-    recordTransformer.onStartVersionIngestion();
+    recordTransformer.onStartVersionIngestion(true);
 
     assertTrue(recordTransformer.getStoreRecordsInDaVinci());
 
@@ -120,7 +120,7 @@ public void testBlockingRecordTransformer() {
 
     recordTransformer.processDelete(lazyKey);
 
-    recordTransformer.onEndVersionIngestion();
+    recordTransformer.onEndVersionIngestion(2);
   }
 
 }

Diff for: gradle/spotbugs/exclude.xml (+2)

@@ -499,12 +499,14 @@
     <Or>
       <Class name="com.linkedin.venice.duckdb.HelloWorldTest"/>
       <Class name="com.linkedin.venice.duckdb.DuckDBAvroToSQLTest"/>
+      <Class name="com.linkedin.venice.sql.DuckDBDaVinciRecordTransformer"/>
     </Or>
   </Match>
   <Match>
     <Bug pattern="SQL_PREPARED_STATEMENT_GENERATED_FROM_NONCONSTANT_STRING"/>
     <Or>
       <Class name="com.linkedin.venice.duckdb.DuckDBAvroToSQLTest"/>
+      <Class name="com.linkedin.venice.sql.DuckDBDaVinciRecordTransformer"/>
     </Or>
   </Match>
 </FindBugsFilter>

Diff for: integrations/venice-duckdb/build.gradle (+3, -2)

@@ -3,14 +3,15 @@ dependencies {
   implementation libraries.avroUtilCompatHelper
   implementation libraries.duckdbJdbc
 
+  implementation project(':clients:da-vinci-client')
   implementation project(':internal:venice-client-common')
 
-  testImplementation project(':internal:venice-common')
+  implementation project(':internal:venice-common')
 }
 
 checkerFramework {
   extraJavacArgs = ['-Xmaxerrs', '256']
   checkers = ['org.checkerframework.checker.nullness.NullnessChecker']
   skipCheckerFramework = true
   excludeTests = true
-}
+}

Diff for: integrations/venice-duckdb/src/main/java/com/linkedin/venice/sql/AvroToSQL.java (+3, -2)

@@ -77,7 +77,7 @@ public static String createTableStatement(
       throw new IllegalArgumentException("Only Avro records can have a corresponding CREATE TABLE statement.");
     }
     StringBuffer stringBuffer = new StringBuffer();
-    stringBuffer.append("CREATE TABLE " + cleanTableName(tableName) + "(");
+    stringBuffer.append("CREATE TABLE IF NOT EXISTS " + cleanTableName(tableName) + "(");
     boolean firstColumn = true;
 
     for (Schema.Field field: avroSchema.getFields()) {
@@ -201,6 +201,7 @@ public static BiConsumer<GenericRecord, PreparedStatement> upsertProcessor(@Nonn
           // Unions are handled via unpacking
           fieldType = field.schema().getTypes().get(avroFieldIndexToUnionBranchIndex[field.pos()]).getType();
         }
+
         processField(jdbcIndex, fieldType, fieldValue, preparedStatement, field.name());
       }
       preparedStatement.execute();
@@ -222,7 +223,7 @@ private static void processField(
         preparedStatement.setBytes(jdbcIndex, ByteUtils.extractByteArray((ByteBuffer) fieldValue));
         break;
       case STRING:
-        preparedStatement.setString(jdbcIndex, (String) fieldValue);
+        preparedStatement.setString(jdbcIndex, fieldValue.toString());
         break;
      case INT:
        preparedStatement.setInt(jdbcIndex, (int) fieldValue);
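
The STRING branch change matters because Avro deserializers typically hand back org.apache.avro.util.Utf8 rather than java.lang.String, so the old (String) cast could throw a ClassCastException; toString() handles both. Below is a rough usage sketch of the helpers touched above against an in-memory DuckDB connection. The schema and values are illustrative; the sketch only assumes the signatures visible in this commit (createTableStatement, upsertStatement, upsertProcessor) and that the DuckDB JDBC driver is on the classpath.

package com.example;

import static com.linkedin.venice.sql.AvroToSQL.UnsupportedTypeHandling.FAIL;

import com.linkedin.venice.sql.AvroToSQL;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Collections;
import java.util.function.BiConsumer;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;

// Illustrative driver, not part of this commit.
public class AvroToSQLSketch {
  public static void main(String[] args) throws Exception {
    // Mirrors the nameRecord schema used elsewhere in this commit.
    Schema schema = SchemaBuilder.record("nameRecord")
        .namespace("example.avro")
        .fields()
        .name("firstName").type().stringType().stringDefault("")
        .name("lastName").type().stringType().stringDefault("")
        .endRecord();

    GenericRecord record = new GenericData.Record(schema);
    // Utf8 values exercise the new fieldValue.toString() path in processField().
    record.put("firstName", new Utf8("Ada"));
    record.put("lastName", new Utf8("Lovelace"));

    // In-memory DuckDB database.
    try (Connection connection = DriverManager.getConnection("jdbc:duckdb:");
        Statement stmt = connection.createStatement()) {
      // Idempotent thanks to the new IF NOT EXISTS clause.
      stmt.execute(AvroToSQL.createTableStatement("my_table_v1", schema, Collections.singleton("firstName"), FAIL));

      String upsert = AvroToSQL.upsertStatement("my_table_v1", schema);
      BiConsumer<GenericRecord, PreparedStatement> processor = AvroToSQL.upsertProcessor(schema);
      try (PreparedStatement ps = connection.prepareStatement(upsert)) {
        processor.accept(record, ps); // binds the fields and executes the upsert
      }

      try (ResultSet rs = stmt.executeQuery("SELECT firstName, lastName FROM my_table_v1")) {
        while (rs.next()) {
          System.out.println(rs.getString(1) + " " + rs.getString(2));
        }
      }
    }
  }
}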

Diff for: integrations/venice-duckdb/src/main/java/com/linkedin/venice/sql/DuckDBDaVinciRecordTransformer.java (new file, +139)

package com.linkedin.venice.sql;

import static com.linkedin.venice.sql.AvroToSQL.UnsupportedTypeHandling.FAIL;

import com.linkedin.davinci.client.DaVinciRecordTransformer;
import com.linkedin.davinci.client.DaVinciRecordTransformerResult;
import com.linkedin.venice.utils.lazy.Lazy;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collections;
import java.util.Set;
import java.util.function.BiConsumer;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericRecord;


public class DuckDBDaVinciRecordTransformer extends DaVinciRecordTransformer<String, GenericRecord, GenericRecord> {
  private static final String duckDBFilePath = "my_database.duckdb";
  // ToDo: Don't hardcode the table name. Get it from the storeName
  private static final String baseVersionTableName = "my_table_v";
  private static final Set<String> primaryKeys = Collections.singleton("firstName");
  private static final String deleteStatementTemplate = "DELETE FROM %s WHERE %s = ?;";
  private static final String createViewStatementTemplate =
      "CREATE OR REPLACE VIEW current_version AS SELECT * FROM %s;";
  private static final String dropTableStatementTemplate = "DROP TABLE %s;";
  private final String duckDBUrl;
  private final String versionTableName;

  public DuckDBDaVinciRecordTransformer(int storeVersion, boolean storeRecordsInDaVinci, String baseDir) {
    super(storeVersion, storeRecordsInDaVinci);
    versionTableName = baseVersionTableName + storeVersion;
    duckDBUrl = "jdbc:duckdb:" + baseDir + "/" + duckDBFilePath;
  }

  @Override
  public Schema getKeySchema() {
    return Schema.create(Schema.Type.STRING);
  }

  @Override
  public Schema getOutputValueSchema() {
    return SchemaBuilder.record("nameRecord")
        .namespace("example.avro")
        .fields()
        .name("firstName")
        .type()
        .stringType()
        .stringDefault("")
        .name("lastName")
        .type()
        .stringType()
        .stringDefault("")
        .endRecord();
  }

  @Override
  public DaVinciRecordTransformerResult<GenericRecord> transform(Lazy<String> key, Lazy<GenericRecord> value) {
    // Record transformation happens inside processPut as we need access to the connection object to create the
    // prepared statement
    return new DaVinciRecordTransformerResult<>(DaVinciRecordTransformerResult.Result.UNCHANGED);
  }

  @Override
  public void processPut(Lazy<String> key, Lazy<GenericRecord> value) {
    Schema valueSchema = value.get().getSchema();
    String upsertStatement = AvroToSQL.upsertStatement(versionTableName, valueSchema);

    // ToDo: Instead of creating a connection on every call, have a long-term connection. Maybe a connection pool?
    try (Connection connection = DriverManager.getConnection(duckDBUrl)) {
      BiConsumer<GenericRecord, PreparedStatement> upsertProcessor = AvroToSQL.upsertProcessor(valueSchema);

      try (PreparedStatement preparedStatement = connection.prepareStatement(upsertStatement)) {
        upsertProcessor.accept(value.get(), preparedStatement);
      }
    } catch (SQLException e) {
      throw new RuntimeException(e);
    }
  }

  @Override
  public void processDelete(Lazy<String> key) {
    // Unable to convert to prepared statement as table and column names can't be parameterized
    // ToDo make delete non-hardcoded on primaryKey
    String deleteStatement = String.format(deleteStatementTemplate, versionTableName, "firstName");

    // ToDo: Instead of creating a connection on every call, have a long-term connection. Maybe a connection pool?
    try (Connection connection = DriverManager.getConnection(duckDBUrl);
        PreparedStatement stmt = connection.prepareStatement(deleteStatement)) {
      stmt.setString(1, key.get());
      stmt.execute();
    } catch (SQLException e) {
      throw new RuntimeException(e);
    }
  }

  @Override
  public void onStartVersionIngestion(boolean isCurrentVersion) {
    try (Connection connection = DriverManager.getConnection(duckDBUrl);
        Statement stmt = connection.createStatement()) {
      String createTableStatement =
          AvroToSQL.createTableStatement(versionTableName, getOutputValueSchema(), primaryKeys, FAIL);
      stmt.execute(createTableStatement);

      if (isCurrentVersion) {
        // Unable to convert to prepared statement as table and column names can't be parameterized
        String createViewStatement = String.format(createViewStatementTemplate, versionTableName);
        stmt.execute(createViewStatement);
      }
    } catch (SQLException e) {
      throw new RuntimeException(e);
    }
  }

  @Override
  public void onEndVersionIngestion(int currentVersion) {
    try (Connection connection = DriverManager.getConnection(duckDBUrl);
        Statement stmt = connection.createStatement()) {
      // Swap to current version
      String currentVersionTableName = baseVersionTableName + currentVersion;
      String createViewStatement = String.format(createViewStatementTemplate, currentVersionTableName);
      stmt.execute(createViewStatement);

      // Unable to convert to prepared statement as table and column names can't be parameterized
      // Drop DuckDB table for storeVersion as it's retired
      String dropTableStatement = String.format(dropTableStatementTemplate, versionTableName);
      stmt.execute(dropTableStatement);
    } catch (SQLException e) {
      throw new RuntimeException(e);
    }
  }

  public String getDuckDBUrl() {
    return duckDBUrl;
  }
}
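
For context, here is a rough standalone sketch of what the new transformer does over a version's lifecycle: onStartVersionIngestion creates the per-version table (and the current_version view when the version is already serving), processPut upserts records, and onEndVersionIngestion re-points the view and drops the retired table. The driver below is hypothetical: it invokes the hooks directly rather than through Da Vinci ingestion, and it assumes Lazy.of(...) is available as the supplier-based factory on com.linkedin.venice.utils.lazy.Lazy.

package com.example;

import com.linkedin.venice.sql.DuckDBDaVinciRecordTransformer;
import com.linkedin.venice.utils.lazy.Lazy;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

// Hypothetical driver, not part of this commit.
public class DuckDBTransformerSketch {
  public static void main(String[] args) throws Exception {
    String baseDir = System.getProperty("java.io.tmpdir");
    // Store version 1; storeRecordsInDaVinci=false since rows are materialized in DuckDB.
    DuckDBDaVinciRecordTransformer transformer = new DuckDBDaVinciRecordTransformer(1, false, baseDir);

    // Normally driven by StoreIngestionTask; invoked directly here for illustration.
    transformer.onStartVersionIngestion(true); // creates my_table_v1 and the current_version view

    Schema valueSchema = transformer.getOutputValueSchema();
    GenericRecord value = new GenericData.Record(valueSchema);
    value.put("firstName", "Ada");
    value.put("lastName", "Lovelace");
    transformer.processPut(Lazy.of(() -> "key1"), Lazy.of(() -> value)); // upserts into my_table_v1

    // Any JDBC client can query the view backed by the current version's table.
    try (Connection connection = DriverManager.getConnection(transformer.getDuckDBUrl());
        Statement stmt = connection.createStatement();
        ResultSet rs = stmt.executeQuery("SELECT firstName, lastName FROM current_version")) {
      while (rs.next()) {
        System.out.println(rs.getString(1) + " " + rs.getString(2));
      }
    }

    // When version 2 becomes current, the view is re-pointed to my_table_v2 and my_table_v1 is dropped:
    // transformer.onEndVersionIngestion(2);
  }
}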

Diff for: integrations/venice-duckdb/src/test/java/com/linkedin/venice/sql/AvroToSQLTest.java (+1, -1)

@@ -16,7 +16,7 @@
 
 
 public class AvroToSQLTest {
-  private static final String EXPECTED_CREATE_TABLE_STATEMENT_WITH_ALL_TYPES = "CREATE TABLE MyRecord(" //
+  private static final String EXPECTED_CREATE_TABLE_STATEMENT_WITH_ALL_TYPES = "CREATE TABLE IF NOT EXISTS MyRecord(" //
       + "fixedField BINARY, " //
       + "stringField VARCHAR, " //
       + "bytesField VARBINARY, " //
