Skip to content

Rewrite the "sync_local" query #78

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
May 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 44 additions & 41 deletions crates/core/src/sync_local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,16 @@ impl<'a> SyncOperation<'a> {
while statement.step().into_db_result(self.db)? == ResultCode::ROW {
let type_name = statement.column_text(0)?;
let id = statement.column_text(1)?;
let buckets = statement.column_int(3);
let data = statement.column_text(2);

let table_name = internal_table_name(type_name);

if self.data_tables.contains(&table_name) {
let quoted = quote_internal_name(type_name, false);

if buckets == 0 {
// is_err() is essentially a NULL check here.
// NULL data means no PUT operations found, so we delete the row.
if data.is_err() {
// DELETE
let delete_statement = self
.db
Expand All @@ -134,7 +135,7 @@ impl<'a> SyncOperation<'a> {
insert_statement.exec()?;
}
} else {
if buckets == 0 {
if data.is_err() {
// DELETE
// language=SQLite
let delete_statement = self
Expand Down Expand Up @@ -185,32 +186,29 @@ impl<'a> SyncOperation<'a> {
Ok(match &self.partial {
None => {
// Complete sync
// See dart/test/sync_local_performance_test.dart for an annotated version of this query.
self.db
.prepare_v2(
"\
-- 1. Filter oplog by the ops added but not applied yet (oplog b).
-- SELECT DISTINCT / UNION is important for cases with many duplicate ids.
WITH updated_rows AS (
SELECT DISTINCT b.row_type, b.row_id FROM ps_buckets AS buckets
CROSS JOIN ps_oplog AS b ON b.bucket = buckets.id
AND (b.op_id > buckets.last_applied_op)
UNION SELECT row_type, row_id FROM ps_updated_rows
SELECT b.row_type, b.row_id FROM ps_buckets AS buckets
CROSS JOIN ps_oplog AS b ON b.bucket = buckets.id
AND (b.op_id > buckets.last_applied_op)
UNION ALL SELECT row_type, row_id FROM ps_updated_rows
)

-- 3. Group the objects from different buckets together into a single one (ops).
SELECT b.row_type as type,
b.row_id as id,
r.data as data,
count(r.bucket) as buckets,
/* max() affects which row is used for 'data' */
max(r.op_id) as op_id
-- 2. Find *all* current ops over different buckets for those objects (oplog r).
FROM updated_rows b
LEFT OUTER JOIN ps_oplog AS r
ON r.row_type = b.row_type
AND r.row_id = b.row_id
-- Group for (3)
GROUP BY b.row_type, b.row_id",
SELECT
b.row_type,
b.row_id,
(
SELECT iif(max(r.op_id), r.data, null)
FROM ps_oplog r
WHERE r.row_type = b.row_type
AND r.row_id = b.row_id

) as data
FROM updated_rows b
GROUP BY b.row_type, b.row_id;",
)
.into_db_result(self.db)?
}
Expand All @@ -220,33 +218,38 @@ GROUP BY b.row_type, b.row_id",
.prepare_v2(
"\
-- 1. Filter oplog by the ops added but not applied yet (oplog b).
-- SELECT DISTINCT / UNION is important for cases with many duplicate ids.
-- We do not do any DISTINCT operation here, since that introduces a temp b-tree.
-- We filter out duplicates using the GROUP BY below.
WITH
involved_buckets (id) AS MATERIALIZED (
SELECT id FROM ps_buckets WHERE ?1 IS NULL
OR name IN (SELECT value FROM json_each(json_extract(?1, '$.buckets')))
),
updated_rows AS (
SELECT DISTINCT FALSE as local, b.row_type, b.row_id FROM ps_buckets AS buckets
CROSS JOIN ps_oplog AS b ON b.bucket = buckets.id AND (b.op_id > buckets.last_applied_op)
WHERE buckets.id IN (SELECT id FROM involved_buckets)
SELECT b.row_type, b.row_id FROM ps_buckets AS buckets
CROSS JOIN ps_oplog AS b ON b.bucket = buckets.id
AND (b.op_id > buckets.last_applied_op)
WHERE buckets.id IN (SELECT id FROM involved_buckets)
)

-- 3. Group the objects from different buckets together into a single one (ops).
SELECT b.row_type as type,
b.row_id as id,
r.data as data,
count(r.bucket) as buckets,
/* max() affects which row is used for 'data' */
max(r.op_id) as op_id
-- 2. Find *all* current ops over different buckets for those objects (oplog r).
FROM updated_rows b
LEFT OUTER JOIN ps_oplog AS r
ON r.row_type = b.row_type
AND r.row_id = b.row_id
AND r.bucket IN (SELECT id FROM involved_buckets)
-- Group for (3)
GROUP BY b.row_type, b.row_id",
SELECT
b.row_type,
b.row_id,
(
-- 3. For each unique row, select the data from the latest oplog entry.
-- The max(r.op_id) clause is used to select the latest oplog entry.
-- The iif is to avoid the max(r.op_id) column ending up in the results.
SELECT iif(max(r.op_id), r.data, null)
FROM ps_oplog r
WHERE r.row_type = b.row_type
AND r.row_id = b.row_id
AND r.bucket IN (SELECT id FROM involved_buckets)

) as data
FROM updated_rows b
-- Group for (2)
GROUP BY b.row_type, b.row_id;",
)
.into_db_result(self.db)?;
stmt.bind_text(1, partial.args, Destructor::STATIC)?;
Expand Down
4 changes: 2 additions & 2 deletions dart/pubspec.lock
Original file line number Diff line number Diff line change
Expand Up @@ -293,10 +293,10 @@ packages:
dependency: "direct main"
description:
name: sqlite3
sha256: "310af39c40dd0bb2058538333c9d9840a2725ae0b9f77e4fd09ad6696aa8f66e"
sha256: c0503c69b44d5714e6abbf4c1f51a3c3cc42b75ce785f44404765e4635481d38
url: "https://pub.dev"
source: hosted
version: "2.7.5"
version: "2.7.6"
sqlite3_test:
dependency: "direct dev"
description:
Expand Down
4 changes: 2 additions & 2 deletions dart/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ description: Tests for powersync-sqlite-core
environment:
sdk: ^3.4.0
dependencies:
sqlite3: ^2.4.5
sqlite3: ^2.7.6
dev_dependencies:
test: ^1.25.0
file: ^7.0.1
sqlite3_test: ^0.1.1
fake_async: ^1.3.3
fake_async: ^1.3.3
6 changes: 4 additions & 2 deletions dart/test/js_key_encoding_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import 'package:test/test.dart';
import 'utils/native_test_utils.dart';

void main() {
final vfs = TestSqliteFileSystem(fs: const LocalFileSystem());
// Needs an unique name per test file to avoid concurrency issues
final vfs = TestSqliteFileSystem(
fs: const LocalFileSystem(), name: 'js-key-encoding-test-vfs');
late CommonDatabase db;

setUpAll(() {
Expand All @@ -19,7 +21,7 @@ void main() {
tearDownAll(() => sqlite3.unregisterVirtualFileSystem(vfs));

setUp(() async {
db = openTestDatabase(vfs)
db = openTestDatabase(vfs: vfs)
..select('select powersync_init();')
..select('select powersync_replace_schema(?)', [json.encode(_schema)]);
});
Expand Down
Loading