Skip to content

Commit d300c52

Browse files
authored
[duckdb] Fixed multi-store bug (#1459)
The view created by DuckDBDVRT was called current_version, so it could not work with multiple stores. Changed it so that the view inherits the store name instead. Miscellaneous build changes: - Fixed the duckdb exclusion rule in all-modules. - Bumped up the duckdb snapshot dep to the current latest one.
1 parent 1b1a5fc commit d300c52

File tree

6 files changed

+116
-12
lines changed

6 files changed

+116
-12
lines changed

all-modules/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
dependencies {
44
rootProject.subprojects.each { subproject ->
55
// Excluding venice-duckdb as it's experimental and causes build issues
6-
if (subproject.path != project.path && subproject.subprojects.isEmpty() && subproject.path != ':venice-duckdb') {
6+
if (subproject.path != project.path && subproject.subprojects.isEmpty() && subproject.path != ':integrations:venice-duckdb') {
77
implementation (project(subproject.path)) {
88
exclude group: 'org.duckdb'
99
exclude module: 'venice-duckdb'

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ ext.libraries = [
8282
commonsLang: 'commons-lang:commons-lang:2.6',
8383
conscrypt: 'org.conscrypt:conscrypt-openjdk-uber:2.5.2',
8484
d2: "com.linkedin.pegasus:d2:${pegasusVersion}",
85-
duckdbJdbc: "org.duckdb:duckdb_jdbc:1.2.0-20250117.012118-121", // TODO: Remove SNAPSHOT when the real release is published!
85+
duckdbJdbc: "org.duckdb:duckdb_jdbc:1.2.0-20250121.012246-127", // TODO: Remove SNAPSHOT when the real release is published!
8686
failsafe: 'net.jodah:failsafe:2.4.0',
8787
fastUtil: 'it.unimi.dsi:fastutil:8.3.0',
8888
grpcNettyShaded: "io.grpc:grpc-netty-shaded:${grpcVersion}",

gradle/spotbugs/exclude.xml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -504,6 +504,8 @@
504504
<Class name="com.linkedin.venice.duckdb.DuckDBHelloWorldTest"/>
505505
<Class name="com.linkedin.venice.duckdb.DuckDBAvroToSQLTest"/>
506506
<Class name="com.linkedin.venice.duckdb.DuckDBDaVinciRecordTransformer"/>
507+
<Class name="com.linkedin.venice.duckdb.DuckDBDaVinciRecordTransformerTest"/>
508+
<Class name="com.linkedin.venice.endToEnd.DuckDBDaVinciRecordTransformerIntegrationTest"/>
507509
<Class name="com.linkedin.venice.sql.SQLHelloWorldTest"/>
508510
<Class name="com.linkedin.venice.sql.SQLUtilsTest"/>
509511
<Class name="com.linkedin.venice.sql.SQLUtils"/>

integrations/venice-duckdb/src/integrationTest/java/com/linkedin/venice/endToEnd/DuckDBDaVinciRecordTransformerIntegrationTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -158,18 +158,18 @@ public void testRecordTransformer() throws Exception {
158158

159159
clientWithRecordTransformer.subscribeAll().get();
160160

161-
assertRowCount(duckDBUrl, "subscribeAll() finishes!");
161+
assertRowCount(duckDBUrl, storeName, "subscribeAll() finishes!");
162162

163163
clientWithRecordTransformer.unsubscribeAll();
164164
}
165165

166-
assertRowCount(duckDBUrl, "DVC gets closed!");
166+
assertRowCount(duckDBUrl, storeName, "DVC gets closed!");
167167
}
168168

169-
private void assertRowCount(String duckDBUrl, String assertionErrorMsg) throws SQLException {
169+
private void assertRowCount(String duckDBUrl, String storeName, String assertionErrorMsg) throws SQLException {
170170
try (Connection connection = DriverManager.getConnection(duckDBUrl);
171171
Statement statement = connection.createStatement();
172-
ResultSet rs = statement.executeQuery("SELECT count(*) FROM current_version")) {
172+
ResultSet rs = statement.executeQuery("SELECT count(*) FROM " + storeName)) {
173173
assertTrue(rs.next());
174174
int rowCount = rs.getInt(1);
175175
assertEquals(

integrations/venice-duckdb/src/main/java/com/linkedin/venice/duckdb/DuckDBDaVinciRecordTransformer.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,7 @@ public class DuckDBDaVinciRecordTransformer
2929
private static final Logger LOGGER = LogManager.getLogger(DuckDBDaVinciRecordTransformer.class);
3030
private static final String duckDBFilePath = "my_database.duckdb";
3131
private static final String deleteStatementTemplate = "DELETE FROM %s WHERE %s = ?;";
32-
private static final String createViewStatementTemplate =
33-
"CREATE OR REPLACE VIEW current_version AS SELECT * FROM %s;";
32+
private static final String createViewStatementTemplate = "CREATE OR REPLACE VIEW %s AS SELECT * FROM %s;";
3433
private static final String dropTableStatementTemplate = "DROP TABLE %s;";
3534
private final String storeNameWithoutVersionInfo;
3635
private final String versionTableName;
@@ -132,7 +131,8 @@ public void onStartVersionIngestion(boolean isCurrentVersion) {
132131

133132
if (isCurrentVersion) {
134133
// Unable to convert to prepared statement as table and column names can't be parameterized
135-
String createViewStatement = String.format(createViewStatementTemplate, versionTableName);
134+
String createViewStatement =
135+
String.format(createViewStatementTemplate, storeNameWithoutVersionInfo, versionTableName);
136136
stmt.execute(createViewStatement);
137137
}
138138
} catch (SQLException e) {
@@ -146,7 +146,8 @@ public void onEndVersionIngestion(int currentVersion) {
146146
Statement stmt = connection.createStatement()) {
147147
// Swap to current version
148148
String currentVersionTableName = buildStoreNameWithVersion(currentVersion);
149-
String createViewStatement = String.format(createViewStatementTemplate, currentVersionTableName);
149+
String createViewStatement =
150+
String.format(createViewStatementTemplate, storeNameWithoutVersionInfo, currentVersionTableName);
150151
stmt.execute(createViewStatement);
151152

152153
if (currentVersion != getStoreVersion()) {

integrations/venice-duckdb/src/test/java/com/linkedin/venice/duckdb/DuckDBDaVinciRecordTransformerTest.java

Lines changed: 103 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.linkedin.venice.duckdb;
22

33
import static com.linkedin.venice.utils.TestWriteUtils.NAME_RECORD_V1_SCHEMA;
4+
import static com.linkedin.venice.utils.TestWriteUtils.SIMPLE_USER_WITH_DEFAULT_SCHEMA;
45
import static com.linkedin.venice.utils.TestWriteUtils.SINGLE_FIELD_RECORD_SCHEMA;
56
import static org.testng.Assert.assertEquals;
67
import static org.testng.Assert.assertFalse;
@@ -140,7 +141,7 @@ public void testVersionSwap() throws SQLException {
140141

141142
try (Connection connection = DriverManager.getConnection(duckDBUrl);
142143
Statement stmt = connection.createStatement()) {
143-
try (ResultSet rs = stmt.executeQuery("SELECT * FROM current_version")) {
144+
try (ResultSet rs = stmt.executeQuery("SELECT * FROM " + storeName)) {
144145
assertTrue(rs.next(), "There should be a first row!");
145146
assertEquals(rs.getString("firstName"), "Duck");
146147
assertEquals(rs.getString("lastName"), "Goose");
@@ -150,12 +151,112 @@ public void testVersionSwap() throws SQLException {
150151
// Swap here
151152
recordTransformer_v1.onEndVersionIngestion(2);
152153

153-
try (ResultSet rs = stmt.executeQuery("SELECT * FROM current_version")) {
154+
try (ResultSet rs = stmt.executeQuery("SELECT * FROM " + storeName)) {
154155
assertTrue(rs.next(), "There should be a first row!");
155156
assertEquals(rs.getString("firstName"), "Goose");
156157
assertEquals(rs.getString("lastName"), "Duck");
157158
assertFalse(rs.next(), "There should be only one row!");
158159
}
159160
}
160161
}
162+
163+
@Test
164+
public void testTwoTablesConcurrently() throws SQLException {
165+
String tempDir = Utils.getTempDataDirectory().getAbsolutePath();
166+
167+
String store1 = "store1";
168+
String store2 = "store2";
169+
170+
DuckDBDaVinciRecordTransformer recordTransformerForStore1 = new DuckDBDaVinciRecordTransformer(
171+
1,
172+
SINGLE_FIELD_RECORD_SCHEMA,
173+
NAME_RECORD_V1_SCHEMA,
174+
NAME_RECORD_V1_SCHEMA,
175+
false,
176+
tempDir,
177+
store1,
178+
columnsToProject);
179+
DuckDBDaVinciRecordTransformer recordTransformerForStore2 = new DuckDBDaVinciRecordTransformer(
180+
1,
181+
SIMPLE_USER_WITH_DEFAULT_SCHEMA,
182+
NAME_RECORD_V1_SCHEMA,
183+
NAME_RECORD_V1_SCHEMA,
184+
false,
185+
tempDir,
186+
store2,
187+
columnsToProject);
188+
189+
String duckDBUrl = recordTransformerForStore1.getDuckDBUrl();
190+
191+
recordTransformerForStore1.onStartVersionIngestion(true);
192+
193+
GenericRecord keyRecord = new GenericData.Record(SINGLE_FIELD_RECORD_SCHEMA);
194+
keyRecord.put("key", "key");
195+
Lazy<GenericRecord> lazyKey = Lazy.of(() -> keyRecord);
196+
197+
GenericRecord valueRecordForStore1 = new GenericData.Record(NAME_RECORD_V1_SCHEMA);
198+
valueRecordForStore1.put("firstName", "Duck");
199+
valueRecordForStore1.put("lastName", "Goose");
200+
Lazy<GenericRecord> lazyValue = Lazy.of(() -> valueRecordForStore1);
201+
recordTransformerForStore1.processPut(lazyKey, lazyValue);
202+
203+
try (Connection connection = DriverManager.getConnection(duckDBUrl);
204+
Statement stmt = connection.createStatement()) {
205+
try (ResultSet rs = stmt.executeQuery("SELECT * FROM " + store1)) {
206+
assertTrue(rs.next(), "There should be a first row!");
207+
assertEquals(rs.getString("key"), "key");
208+
assertEquals(rs.getString("firstName"), "Duck");
209+
assertEquals(rs.getString("lastName"), "Goose");
210+
assertFalse(rs.next(), "There should be only one row!");
211+
}
212+
}
213+
214+
recordTransformerForStore2.onStartVersionIngestion(true);
215+
216+
GenericRecord keyRecordForStore2 = new GenericData.Record(SIMPLE_USER_WITH_DEFAULT_SCHEMA);
217+
keyRecordForStore2.put("key", "key");
218+
keyRecordForStore2.put("value", "value");
219+
Lazy<GenericRecord> lazyKeyForStore2 = Lazy.of(() -> keyRecordForStore2);
220+
221+
GenericRecord valueRecordForStore2 = new GenericData.Record(NAME_RECORD_V1_SCHEMA);
222+
valueRecordForStore2.put("firstName", "Duck2");
223+
valueRecordForStore2.put("lastName", "Goose2");
224+
Lazy<GenericRecord> lazyValueForStore2 = Lazy.of(() -> valueRecordForStore2);
225+
recordTransformerForStore2.processPut(lazyKeyForStore2, lazyValueForStore2);
226+
227+
try (Connection connection = DriverManager.getConnection(duckDBUrl);
228+
Statement stmt = connection.createStatement()) {
229+
try (ResultSet rs = stmt.executeQuery("SELECT * FROM " + store1)) {
230+
assertTrue(rs.next(), "There should be a first row!");
231+
assertEquals(rs.getString("key"), "key");
232+
assertEquals(rs.getString("firstName"), "Duck");
233+
assertEquals(rs.getString("lastName"), "Goose");
234+
assertFalse(rs.next(), "There should be only one row!");
235+
}
236+
237+
try (ResultSet rs = stmt.executeQuery("SELECT * FROM " + store2)) {
238+
assertTrue(rs.next(), "There should be a first row!");
239+
assertEquals(rs.getString("key"), "key");
240+
assertEquals(rs.getString("value"), "value");
241+
assertEquals(rs.getString("firstName"), "Duck2");
242+
assertEquals(rs.getString("lastName"), "Goose2");
243+
assertFalse(rs.next(), "There should be only one row!");
244+
}
245+
246+
try (ResultSet rs = stmt.executeQuery(
247+
"SELECT s1.key AS s1key, s1.firstName AS s1FirstName, s1.lastName AS s1LastName, "
248+
+ "s2.key AS s2key, s2.value AS s2value, s2.firstName AS s2FirstName, s2.lastName AS s2LastName "
249+
+ "FROM " + store1 + " s1 JOIN " + store2 + " s2 ON s1.key = s2.key")) {
250+
assertTrue(rs.next(), "There should be a first row!");
251+
assertEquals(rs.getString("s1key"), "key");
252+
assertEquals(rs.getString("s1FirstName"), "Duck");
253+
assertEquals(rs.getString("s1LastName"), "Goose");
254+
assertEquals(rs.getString("s2key"), "key");
255+
assertEquals(rs.getString("s2value"), "value");
256+
assertEquals(rs.getString("s2FirstName"), "Duck2");
257+
assertEquals(rs.getString("s2LastName"), "Goose2");
258+
assertFalse(rs.next(), "There should be only one row!");
259+
}
260+
}
261+
}
161262
}

0 commit comments

Comments
 (0)