Skip to content

Commit 1d3e7dd

Browse files
committed
Use int bucket ids.
1 parent 819db43 commit 1d3e7dd

File tree

3 files changed

+49
-45
lines changed

3 files changed

+49
-45
lines changed

crates/core/src/operations.rs

Lines changed: 46 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,27 @@ FROM json_each(?) e",
5757
)?;
5858
iterate_statement.bind_text(1, data, sqlite::Destructor::STATIC)?;
5959

60+
// We do an ON CONFLICT UPDATE simply so that the RETURNING bit works for existing rows.
61+
// We can consider splitting this into separate SELECT and INSERT statements.
62+
// language=SQLite
63+
let bucket_statement = db.prepare_v2(
64+
"INSERT INTO ps_buckets(name)
65+
VALUES(?)
66+
ON CONFLICT DO UPDATE
67+
SET last_applied_op = last_applied_op
68+
RETURNING id, last_applied_op",
69+
)?;
70+
bucket_statement.bind_text(1, bucket, sqlite::Destructor::STATIC)?;
71+
bucket_statement.step()?;
72+
73+
let bucket_id = bucket_statement.column_int64(0)?;
74+
75+
// This is an optimization for initial sync - we can avoid persisting individual REMOVE
76+
// operations when last_applied_op = 0.
77+
// We do still need to do the "supersede_statement" step for this case, since a REMOVE
78+
// operation can supersede another PUT operation we're syncing at the same time.
79+
let mut last_applied_op = bucket_statement.column_int64(1)?;
80+
6081
// Statement to supersede (replace) operations with the same key.
6182
// language=SQLite
6283
let supersede_statement = db.prepare_v2(
@@ -66,37 +87,18 @@ DELETE FROM ps_oplog
6687
AND ps_oplog.key = ?2
6788
RETURNING op_id, hash",
6889
)?;
69-
supersede_statement.bind_text(1, bucket, sqlite::Destructor::STATIC)?;
90+
supersede_statement.bind_int64(1, bucket_id)?;
7091

7192
// language=SQLite
7293
let insert_statement = db.prepare_v2("\
7394
INSERT INTO ps_oplog(bucket, op_id, key, row_type, row_id, data, hash) VALUES (?, ?, ?, ?, ?, ?, ?)")?;
74-
insert_statement.bind_text(1, bucket, sqlite::Destructor::STATIC)?;
95+
insert_statement.bind_int64(1, bucket_id)?;
7596

7697
let updated_row_statement = db.prepare_v2(
7798
"\
7899
INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?1, ?2)",
79100
)?;
80101

81-
// We do an ON CONFLICT UPDATE simply so that the RETURNING bit works for existing rows.
82-
// We can consider splitting this into separate SELECT and INSERT statements.
83-
// language=SQLite
84-
let bucket_statement = db.prepare_v2(
85-
"INSERT INTO ps_buckets(name)
86-
VALUES(?)
87-
ON CONFLICT DO UPDATE
88-
SET last_applied_op = last_applied_op
89-
RETURNING last_applied_op",
90-
)?;
91-
bucket_statement.bind_text(1, bucket, sqlite::Destructor::STATIC)?;
92-
bucket_statement.step()?;
93-
94-
// This is an optimization for initial sync - we can avoid persisting individual REMOVE
95-
// operations when last_applied_op = 0.
96-
// We do still need to do the "supersede_statement" step for this case, since a REMOVE
97-
// operation can supersede another PUT operation we're syncing at the same time.
98-
let mut last_applied_op = bucket_statement.column_int64(0)?;
99-
100102
bucket_statement.reset()?;
101103

102104
let mut last_op: Option<i64> = None;
@@ -204,22 +206,22 @@ FROM ps_oplog
204206
WHERE bucket = ?1",
205207
)
206208
.into_db_result(db)?;
207-
clear_statement1.bind_text(1, bucket, sqlite::Destructor::STATIC)?;
209+
clear_statement1.bind_int64(1, bucket_id)?;
208210
clear_statement1.exec()?;
209211

210212
let clear_statement2 = db
211213
.prepare_v2("DELETE FROM ps_oplog WHERE bucket = ?1")
212214
.into_db_result(db)?;
213-
clear_statement2.bind_text(1, bucket, sqlite::Destructor::STATIC)?;
215+
clear_statement2.bind_int64(1, bucket_id)?;
214216
clear_statement2.exec()?;
215217

216218
// And we need to re-apply all of those.
217219
// We also replace the checksum with the checksum of the CLEAR op.
218220
// language=SQLite
219221
let clear_statement2 = db.prepare_v2(
220-
"UPDATE ps_buckets SET last_applied_op = 0, add_checksum = ?1, op_checksum = 0 WHERE name = ?2",
222+
"UPDATE ps_buckets SET last_applied_op = 0, add_checksum = ?1, op_checksum = 0 WHERE id = ?2",
221223
)?;
222-
clear_statement2.bind_text(2, bucket, sqlite::Destructor::STATIC)?;
224+
clear_statement2.bind_int64(2, bucket_id)?;
223225
clear_statement2.bind_int(1, checksum)?;
224226
clear_statement2.exec()?;
225227

@@ -236,9 +238,9 @@ WHERE bucket = ?1",
236238
SET last_op = ?2,
237239
add_checksum = (add_checksum + ?3) & 0xffffffff,
238240
op_checksum = (op_checksum + ?4) & 0xffffffff
239-
WHERE name = ?1",
241+
WHERE id = ?1",
240242
)?;
241-
statement.bind_text(1, bucket, sqlite::Destructor::STATIC)?;
243+
statement.bind_int64(1, bucket_id)?;
242244
statement.bind_int64(2, *last_op)?;
243245
statement.bind_int(3, add_checksum)?;
244246
statement.bind_int(4, op_checksum)?;
@@ -263,26 +265,28 @@ pub fn delete_pending_buckets(db: *mut sqlite::sqlite3, _data: &str) -> Result<(
263265

264266
pub fn delete_bucket(db: *mut sqlite::sqlite3, name: &str) -> Result<(), SQLiteError> {
265267
// language=SQLite
266-
let statement = db.prepare_v2(
267-
"\
268+
let statement = db.prepare_v2("DELETE FROM ps_buckets WHERE name = ?1 RETURNING id")?;
269+
statement.bind_text(1, name, sqlite::Destructor::STATIC)?;
270+
271+
if statement.step()? == ResultCode::ROW {
272+
let bucket_id = statement.column_int64(0)?;
273+
274+
// language=SQLite
275+
let updated_statement = db.prepare_v2(
276+
"\
268277
INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id)
269278
SELECT row_type, row_id
270279
FROM ps_oplog
271280
WHERE bucket = ?1",
272-
)?;
273-
statement.bind_text(1, &name, sqlite::Destructor::STATIC)?;
274-
statement.exec()?;
275-
276-
// Rename bucket
277-
// language=SQLite
278-
let statement = db.prepare_v2("DELETE FROM ps_oplog WHERE bucket=?1")?;
279-
statement.bind_text(1, name, sqlite::Destructor::STATIC)?;
280-
statement.exec()?;
281+
)?;
282+
updated_statement.bind_int64(1, bucket_id)?;
283+
updated_statement.exec()?;
281284

282-
// language=SQLite
283-
let statement = db.prepare_v2("DELETE FROM ps_buckets WHERE name = ?1")?;
284-
statement.bind_text(1, name, sqlite::Destructor::STATIC)?;
285-
statement.exec()?;
285+
// language=SQLite
286+
let delete_statement = db.prepare_v2("DELETE FROM ps_oplog WHERE bucket=?1")?;
287+
delete_statement.bind_int64(1, bucket_id)?;
288+
delete_statement.exec()?;
289+
}
286290

287291
Ok(())
288292
}

crates/core/src/sync_local.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ pub fn sync_local(db: *mut sqlite::sqlite3, _data: &str) -> Result<i64, SQLiteEr
6767
-- 1. Filter oplog by the ops added but not applied yet (oplog b).
6868
WITH updated_rows AS (
6969
SELECT b.row_type, b.row_id FROM ps_buckets AS buckets
70-
CROSS JOIN ps_oplog AS b ON b.bucket = buckets.name
70+
CROSS JOIN ps_oplog AS b ON b.bucket = buckets.id
7171
AND (b.op_id > buckets.last_applied_op)
7272
UNION SELECT row_type, row_id FROM ps_updated_rows
7373
)

crates/core/src/view_admin.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,7 @@ CREATE TABLE ps_buckets(
286286
add_checksum INTEGER NOT NULL DEFAULT 0,
287287
op_checksum INTEGER NOT NULL DEFAULT 0,
288288
pending_delete INTEGER NOT NULL DEFAULT 0
289-
);
289+
) STRICT;
290290
291291
CREATE UNIQUE INDEX ps_buckets_name ON ps_buckets (name);
292292
@@ -297,7 +297,7 @@ CREATE TABLE ps_oplog(
297297
row_id TEXT,
298298
key TEXT,
299299
data TEXT,
300-
hash INTEGER NOT NULL);
300+
hash INTEGER NOT NULL) STRICT;
301301
302302
CREATE INDEX ps_oplog_by_row ON ps_oplog (row_type, row_id);
303303
CREATE INDEX ps_oplog_by_opid ON ps_oplog (bucket, op_id);

0 commit comments

Comments
 (0)