Skip to content

Commit 5dfe33b

Browse files
authored
[duckdb] Fixed delete bug in DuckDB DVRT (#1474)
Delete used to work only for key schemas that had a single string field named "key". Now it works for any schema.

Miscellaneous:
- Refactored the InsertProcessor code to be more generic.
- Bumped the DuckDB snapshot dependency to the latest.
1 parent 47cb541 commit 5dfe33b

File tree

13 files changed

+268
-131
lines changed

13 files changed

+268
-131
lines changed

build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ ext.libraries = [
8282
commonsLang: 'commons-lang:commons-lang:2.6',
8383
conscrypt: 'org.conscrypt:conscrypt-openjdk-uber:2.5.2',
8484
d2: "com.linkedin.pegasus:d2:${pegasusVersion}",
85-
duckdbJdbc: "org.duckdb:duckdb_jdbc:1.2.0-20250121.012246-127", // TODO: Remove SNAPSHOT when the real release is published!
85+
duckdbJdbc: "org.duckdb:duckdb_jdbc:1.2.0-20250124.011319-133", // TODO: Remove SNAPSHOT when the real release is published!
8686
failsafe: 'net.jodah:failsafe:2.4.0',
8787
fastUtil: 'it.unimi.dsi:fastutil:8.3.0',
8888
grpcNettyShaded: "io.grpc:grpc-netty-shaded:${grpcVersion}",

integrations/venice-duckdb/build.gradle

+1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ dependencies {
1616
testImplementation project(':internal:venice-common')
1717

1818
integrationTestImplementation project(path: ':internal:venice-test-common', configuration: 'integrationTestUtils')
19+
integrationTestImplementation project(':clients:venice-producer')
1920
integrationTestImplementation project(':clients:venice-push-job')
2021
integrationTestImplementation project(':clients:venice-thin-client')
2122
integrationTestImplementation project(':integrations:venice-duckdb')

integrations/venice-duckdb/src/integrationTest/java/com/linkedin/venice/endToEnd/DuckDBDaVinciRecordTransformerIntegrationTest.java

+46-15
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,18 @@
3131
import com.linkedin.davinci.client.DaVinciRecordTransformerConfig;
3232
import com.linkedin.davinci.client.factory.CachingDaVinciClientFactory;
3333
import com.linkedin.venice.D2.D2ClientUtils;
34+
import com.linkedin.venice.client.store.ClientConfig;
3435
import com.linkedin.venice.compression.CompressionStrategy;
3536
import com.linkedin.venice.controllerapi.ControllerClient;
3637
import com.linkedin.venice.controllerapi.SchemaResponse;
3738
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
3839
import com.linkedin.venice.duckdb.DuckDBDaVinciRecordTransformer;
3940
import com.linkedin.venice.integration.utils.ServiceFactory;
41+
import com.linkedin.venice.integration.utils.VeniceClusterCreateOptions;
4042
import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
4143
import com.linkedin.venice.integration.utils.VeniceRouterWrapper;
44+
import com.linkedin.venice.producer.online.OnlineProducerFactory;
45+
import com.linkedin.venice.producer.online.OnlineVeniceProducer;
4246
import com.linkedin.venice.utils.PropertyBuilder;
4347
import com.linkedin.venice.utils.PushInputSchemaBuilder;
4448
import com.linkedin.venice.utils.TestUtils;
@@ -91,20 +95,24 @@ public void setUp() {
9195
clusterConfig.put(SERVER_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS, 1L);
9296
clusterConfig.put(PUSH_STATUS_STORE_ENABLED, true);
9397
clusterConfig.put(DAVINCI_PUSH_STATUS_SCAN_INTERVAL_IN_SECONDS, 3);
94-
cluster = ServiceFactory.getVeniceCluster(1, 2, 1, 2, 100, false, false, clusterConfig);
95-
d2Client = new D2ClientBuilder().setZkHosts(cluster.getZk().getAddress())
98+
this.cluster = ServiceFactory.getVeniceCluster(
99+
new VeniceClusterCreateOptions.Builder().numberOfControllers(1)
100+
.numberOfServers(2)
101+
.numberOfRouters(1)
102+
.replicationFactor(2)
103+
.extraProperties(clusterConfig)
104+
.build());
105+
this.d2Client = new D2ClientBuilder().setZkHosts(this.cluster.getZk().getAddress())
96106
.setZkSessionTimeout(3, TimeUnit.SECONDS)
97107
.setZkStartupTimeout(3, TimeUnit.SECONDS)
98108
.build();
99-
D2ClientUtils.startClient(d2Client);
109+
D2ClientUtils.startClient(this.d2Client);
100110
}
101111

102112
@AfterClass
103113
public void cleanUp() {
104-
if (d2Client != null) {
105-
D2ClientUtils.shutdownClient(d2Client);
106-
}
107-
Utils.closeQuietlyWithErrorLogged(cluster);
114+
D2ClientUtils.shutdownClient(this.d2Client);
115+
Utils.closeQuietlyWithErrorLogged(this.cluster);
108116
}
109117

110118
@BeforeMethod
@@ -158,24 +166,40 @@ public void testRecordTransformer() throws Exception {
158166

159167
clientWithRecordTransformer.subscribeAll().get();
160168

161-
assertRowCount(duckDBUrl, storeName, "subscribeAll() finishes!");
169+
assertRowCount(duckDBUrl, storeName, DEFAULT_USER_DATA_RECORD_COUNT, "subscribeAll() finishes!");
170+
171+
try (OnlineVeniceProducer producer = OnlineProducerFactory.createProducer(
172+
ClientConfig.defaultGenericClientConfig(storeName)
173+
.setD2Client(d2Client)
174+
.setD2ServiceName(VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME),
175+
VeniceProperties.empty(),
176+
null)) {
177+
producer.asyncDelete(getKey(1)).get();
178+
}
179+
180+
TestUtils.waitForNonDeterministicAssertion(
181+
10,
182+
TimeUnit.SECONDS,
183+
true,
184+
() -> assertRowCount(duckDBUrl, storeName, DEFAULT_USER_DATA_RECORD_COUNT - 1, "deleting 1 row!"));
162185

163186
clientWithRecordTransformer.unsubscribeAll();
164187
}
165188

166-
assertRowCount(duckDBUrl, storeName, "DVC gets closed!");
189+
assertRowCount(duckDBUrl, storeName, DEFAULT_USER_DATA_RECORD_COUNT - 1, "DVC gets closed!");
167190
}
168191

169-
private void assertRowCount(String duckDBUrl, String storeName, String assertionErrorMsg) throws SQLException {
192+
private void assertRowCount(String duckDBUrl, String storeName, int expectedCount, String assertionErrorMsg)
193+
throws SQLException {
170194
try (Connection connection = DriverManager.getConnection(duckDBUrl);
171195
Statement statement = connection.createStatement();
172196
ResultSet rs = statement.executeQuery("SELECT count(*) FROM " + storeName)) {
173197
assertTrue(rs.next());
174198
int rowCount = rs.getInt(1);
175199
assertEquals(
176200
rowCount,
177-
DEFAULT_USER_DATA_RECORD_COUNT,
178-
"The DB should contain " + DEFAULT_USER_DATA_RECORD_COUNT + " right after " + assertionErrorMsg);
201+
expectedCount,
202+
"The DB should contain " + expectedCount + " rows right after " + assertionErrorMsg);
179203
}
180204
}
181205

@@ -195,8 +219,7 @@ protected void setUpStore(
195219
String lastName = "last_name_";
196220
Schema valueSchema = writeSimpleAvroFile(inputDir, pushRecordSchema, i -> {
197221
GenericRecord keyValueRecord = new GenericData.Record(pushRecordSchema);
198-
GenericRecord key = new GenericData.Record(SINGLE_FIELD_RECORD_SCHEMA);
199-
key.put("key", i.toString());
222+
GenericRecord key = getKey(i);
200223
keyValueRecord.put(DEFAULT_KEY_FIELD_PROP, key);
201224
GenericRecord valueRecord = new GenericData.Record(NAME_RECORD_V1_SCHEMA);
202225
valueRecord.put("firstName", firstName + i);
@@ -214,7 +237,9 @@ protected void setUpStore(
214237
final int numPartitions = 3;
215238
UpdateStoreQueryParams params = new UpdateStoreQueryParams().setPartitionCount(numPartitions)
216239
.setChunkingEnabled(chunkingEnabled)
217-
.setCompressionStrategy(compressionStrategy);
240+
.setCompressionStrategy(compressionStrategy)
241+
.setHybridOffsetLagThreshold(10)
242+
.setHybridRewindSeconds(1);
218243

219244
paramsConsumer.accept(params);
220245

@@ -231,6 +256,12 @@ protected void setUpStore(
231256
}
232257
}
233258

259+
private GenericRecord getKey(Integer i) {
260+
GenericRecord key = new GenericData.Record(SINGLE_FIELD_RECORD_SCHEMA);
261+
key.put("key", i.toString());
262+
return key;
263+
}
264+
234265
private static void runVPJ(Properties vpjProperties, int expectedVersionNumber, VeniceClusterWrapper cluster) {
235266
long vpjStart = System.currentTimeMillis();
236267
String jobName = Utils.getUniqueString("batch-job-" + expectedVersionNumber);

integrations/venice-duckdb/src/main/java/com/linkedin/venice/duckdb/DuckDBDaVinciRecordTransformer.java

+9-17
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,11 @@
66
import com.linkedin.davinci.client.DaVinciRecordTransformerResult;
77
import com.linkedin.venice.exceptions.VeniceException;
88
import com.linkedin.venice.sql.AvroToSQL;
9-
import com.linkedin.venice.sql.InsertProcessor;
9+
import com.linkedin.venice.sql.PreparedStatementProcessor;
1010
import com.linkedin.venice.sql.SQLUtils;
1111
import com.linkedin.venice.sql.TableDefinition;
1212
import com.linkedin.venice.utils.concurrent.CloseableThreadLocal;
1313
import com.linkedin.venice.utils.lazy.Lazy;
14-
import java.io.IOException;
1514
import java.sql.Connection;
1615
import java.sql.DriverManager;
1716
import java.sql.PreparedStatement;
@@ -28,7 +27,6 @@ public class DuckDBDaVinciRecordTransformer
2827
extends DaVinciRecordTransformer<GenericRecord, GenericRecord, GenericRecord> {
2928
private static final Logger LOGGER = LogManager.getLogger(DuckDBDaVinciRecordTransformer.class);
3029
private static final String duckDBFilePath = "my_database.duckdb";
31-
private static final String deleteStatementTemplate = "DELETE FROM %s WHERE %s = ?;";
3230
private static final String createViewStatementTemplate = "CREATE OR REPLACE VIEW %s AS SELECT * FROM %s;";
3331
private static final String dropTableStatementTemplate = "DROP TABLE %s;";
3432
private final String storeNameWithoutVersionInfo;
@@ -38,7 +36,8 @@ public class DuckDBDaVinciRecordTransformer
3836
private final CloseableThreadLocal<Connection> connection;
3937
private final CloseableThreadLocal<PreparedStatement> deletePreparedStatement;
4038
private final CloseableThreadLocal<PreparedStatement> upsertPreparedStatement;
41-
private final InsertProcessor upsertProcessor;
39+
private final PreparedStatementProcessor upsertProcessor;
40+
private final PreparedStatementProcessor deleteProcessor;
4241

4342
public DuckDBDaVinciRecordTransformer(
4443
int storeVersion,
@@ -54,8 +53,7 @@ public DuckDBDaVinciRecordTransformer(
5453
this.versionTableName = buildStoreNameWithVersion(storeVersion);
5554
this.duckDBUrl = "jdbc:duckdb:" + baseDir + "/" + duckDBFilePath;
5655
this.columnsToProject = columnsToProject;
57-
String deleteStatement = String.format(deleteStatementTemplate, versionTableName, "key"); // TODO: Fix this, it is
58-
// broken
56+
String deleteStatement = AvroToSQL.deleteStatement(versionTableName, keySchema);
5957
String upsertStatement = AvroToSQL.upsertStatement(versionTableName, keySchema, inputValueSchema, columnsToProject);
6058
this.connection = CloseableThreadLocal.withInitial(() -> {
6159
try {
@@ -68,17 +66,18 @@ public DuckDBDaVinciRecordTransformer(
6866
try {
6967
return this.connection.get().prepareStatement(deleteStatement);
7068
} catch (SQLException e) {
71-
throw new VeniceException("Failed to create PreparedStatement!", e);
69+
throw new VeniceException("Failed to create PreparedStatement for: " + deleteStatement, e);
7270
}
7371
});
7472
this.upsertPreparedStatement = CloseableThreadLocal.withInitial(() -> {
7573
try {
7674
return this.connection.get().prepareStatement(upsertStatement);
7775
} catch (SQLException e) {
78-
throw new VeniceException("Failed to create PreparedStatement!", e);
76+
throw new VeniceException("Failed to create PreparedStatement for: " + upsertStatement, e);
7977
}
8078
});
8179
this.upsertProcessor = AvroToSQL.upsertProcessor(keySchema, inputValueSchema, columnsToProject);
80+
this.deleteProcessor = AvroToSQL.deleteProcessor(keySchema);
8281
}
8382

8483
@Override
@@ -95,14 +94,7 @@ public void processPut(Lazy<GenericRecord> key, Lazy<GenericRecord> value) {
9594

9695
@Override
9796
public void processDelete(Lazy<GenericRecord> key) {
98-
try {
99-
PreparedStatement stmt = this.deletePreparedStatement.get();
100-
// TODO: Fix this, it is broken.
101-
stmt.setString(1, key.get().get("key").toString());
102-
stmt.execute();
103-
} catch (SQLException e) {
104-
throw new VeniceException("Failed to execute delete!");
105-
}
97+
this.deleteProcessor.process(key.get(), null, this.deletePreparedStatement.get());
10698
}
10799

108100
@Override
@@ -176,7 +168,7 @@ public String buildStoreNameWithVersion(int version) {
176168
}
177169

178170
@Override
179-
public void close() throws IOException {
171+
public void close() {
180172
this.deletePreparedStatement.close();
181173
this.upsertPreparedStatement.close();
182174
this.connection.close();

integrations/venice-duckdb/src/main/java/com/linkedin/venice/sql/AvroToSQL.java

+35-2
Original file line numberDiff line numberDiff line change
@@ -133,11 +133,44 @@ public static String upsertStatement(
133133
}
134134

135135
@Nonnull
136-
public static InsertProcessor upsertProcessor(
136+
public static PreparedStatementProcessor upsertProcessor(
137137
@Nonnull Schema keySchema,
138138
@Nonnull Schema valueSchema,
139139
@Nonnull Set<String> columnsToProject) {
140-
return new InsertProcessor(keySchema, valueSchema, columnsToProject);
140+
return new KeyValuePreparedStatementProcessor(keySchema, valueSchema, columnsToProject);
141+
}
142+
143+
@Nonnull
144+
public static String deleteStatement(@Nonnull String tableName, @Nonnull Schema keySchema) {
145+
StringBuffer stringBuffer = new StringBuffer();
146+
stringBuffer.append("DELETE FROM " + SQLUtils.cleanTableName(tableName) + " WHERE ");
147+
boolean firstColumn = true;
148+
149+
for (Schema.Field field: keySchema.getFields()) {
150+
JDBCType correspondingType = getCorrespondingType(field);
151+
if (correspondingType == null) {
152+
// Skipped field.
153+
throw new IllegalArgumentException(
154+
"All types from the key schema must be supported, but field '" + field.name() + "' is of type: "
155+
+ field.schema().getType());
156+
}
157+
158+
if (firstColumn) {
159+
firstColumn = false;
160+
} else {
161+
stringBuffer.append(" AND ");
162+
}
163+
stringBuffer.append(SQLUtils.cleanColumnName(field.name()));
164+
stringBuffer.append(" = ?");
165+
}
166+
stringBuffer.append(";");
167+
168+
return stringBuffer.toString();
169+
}
170+
171+
@Nonnull
172+
public static PreparedStatementProcessor deleteProcessor(@Nonnull Schema keySchema) {
173+
return new KeyOnlyPreparedStatementProcessor(keySchema);
141174
}
142175

143176
@Nullable

integrations/venice-duckdb/src/main/java/com/linkedin/venice/sql/InsertProcessor.java integrations/venice-duckdb/src/main/java/com/linkedin/venice/sql/KeyOnlyPreparedStatementProcessor.java

+25-44
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.linkedin.venice.sql;
22

3+
import com.linkedin.venice.exceptions.VeniceException;
34
import com.linkedin.venice.utils.ByteUtils;
45
import java.nio.ByteBuffer;
56
import java.sql.JDBCType;
@@ -14,70 +15,47 @@
1415
import org.apache.avro.generic.GenericRecord;
1516

1617

17-
public class InsertProcessor {
18+
/** This class provides plumbing to plug the fields of Avro records into a {@link PreparedStatement}. */
19+
public class KeyOnlyPreparedStatementProcessor implements PreparedStatementProcessor {
1820
private final int[] keyFieldIndexToJdbcIndexMapping;
19-
private final int[] valueFieldIndexToJdbcIndexMapping;
2021
private final int[] keyFieldIndexToUnionBranchIndex;
21-
private final int[] valueFieldIndexToUnionBranchIndex;
2222
private final JDBCType[] keyFieldIndexToCorrespondingType;
23-
private final JDBCType[] valueFieldIndexToCorrespondingType;
2423

25-
InsertProcessor(@Nonnull Schema keySchema, @Nonnull Schema valueSchema, @Nonnull Set<String> columnsToProject) {
26-
Objects.requireNonNull(keySchema);
27-
Objects.requireNonNull(valueSchema);
28-
Objects.requireNonNull(columnsToProject);
29-
30-
int keyFieldCount = keySchema.getFields().size();
24+
KeyOnlyPreparedStatementProcessor(@Nonnull Schema keySchema) {
25+
int keyFieldCount = Objects.requireNonNull(keySchema).getFields().size();
3126
this.keyFieldIndexToJdbcIndexMapping = new int[keyFieldCount];
3227
this.keyFieldIndexToUnionBranchIndex = new int[keyFieldCount];
3328
this.keyFieldIndexToCorrespondingType = new JDBCType[keyFieldCount];
3429

35-
int valueFieldCount = valueSchema.getFields().size();
36-
this.valueFieldIndexToJdbcIndexMapping = new int[valueFieldCount];
37-
this.valueFieldIndexToUnionBranchIndex = new int[valueFieldCount];
38-
this.valueFieldIndexToCorrespondingType = new JDBCType[valueFieldCount];
39-
40-
// N.B.: JDBC indices start at 1, not at 0.
41-
int index = 1;
42-
index = populateArrays(
43-
index,
30+
populateArrays(
31+
1, // N.B.: JDBC indices start at 1, not at 0.
4432
keySchema,
4533
this.keyFieldIndexToJdbcIndexMapping,
4634
this.keyFieldIndexToUnionBranchIndex,
4735
this.keyFieldIndexToCorrespondingType,
4836
Collections.emptySet()); // N.B.: All key columns must be projected.
49-
populateArrays(
50-
index, // N.B.: The same index value needs to carry over from key to value columns.
51-
valueSchema,
52-
this.valueFieldIndexToJdbcIndexMapping,
53-
this.valueFieldIndexToUnionBranchIndex,
54-
this.valueFieldIndexToCorrespondingType,
55-
columnsToProject);
5637
}
5738

39+
@Override
5840
public void process(GenericRecord key, GenericRecord value, PreparedStatement preparedStatement) {
5941
try {
60-
processRecord(
61-
key,
62-
preparedStatement,
63-
this.keyFieldIndexToJdbcIndexMapping,
64-
this.keyFieldIndexToUnionBranchIndex,
65-
this.keyFieldIndexToCorrespondingType);
66-
67-
processRecord(
68-
value,
69-
preparedStatement,
70-
this.valueFieldIndexToJdbcIndexMapping,
71-
this.valueFieldIndexToUnionBranchIndex,
72-
this.valueFieldIndexToCorrespondingType);
73-
42+
processKey(key, preparedStatement);
7443
preparedStatement.execute();
7544
} catch (SQLException e) {
76-
throw new RuntimeException(e);
45+
throw new VeniceException("Failed to execute prepared statement!", e);
7746
}
7847
}
7948

80-
private int populateArrays(
49+
protected void processKey(GenericRecord key, PreparedStatement preparedStatement) throws SQLException {
50+
processRecord(
51+
key,
52+
preparedStatement,
53+
this.keyFieldIndexToJdbcIndexMapping,
54+
this.keyFieldIndexToUnionBranchIndex,
55+
this.keyFieldIndexToCorrespondingType);
56+
}
57+
58+
protected void populateArrays(
8159
int index,
8260
@Nonnull Schema schema,
8361
@Nonnull int[] avroFieldIndexToJdbcIndexMapping,
@@ -108,10 +86,13 @@ private int populateArrays(
10886
}
10987
}
11088
}
111-
return index;
11289
}
11390

114-
private void processRecord(
91+
protected final int getLastKeyJdbcIndex() {
92+
return this.keyFieldIndexToJdbcIndexMapping[this.keyFieldIndexToJdbcIndexMapping.length - 1];
93+
}
94+
95+
protected void processRecord(
11596
GenericRecord record,
11697
PreparedStatement preparedStatement,
11798
int[] avroFieldIndexToJdbcIndexMapping,

0 commit comments

Comments
 (0)