
Rewrite the "sync_local" query #78

Merged 9 commits on May 28, 2025
88 changes: 49 additions & 39 deletions crates/core/src/sync_local.rs
@@ -107,15 +107,16 @@ impl<'a> SyncOperation<'a> {
while statement.step().into_db_result(self.db)? == ResultCode::ROW {
let type_name = statement.column_text(0)?;
let id = statement.column_text(1)?;
let buckets = statement.column_int(3);
let data = statement.column_text(2);

let table_name = internal_table_name(type_name);

if self.data_tables.contains(&table_name) {
let quoted = quote_internal_name(type_name, false);

if buckets == 0 {
// is_err() is essentially a NULL check here.
// NULL data means no PUT operations found, so we delete the row.
if data.is_err() {
// DELETE
let delete_statement = self
.db
@@ -134,7 +135,7 @@ impl<'a> SyncOperation<'a> {
insert_statement.exec()?;
}
} else {
if buckets == 0 {
if data.is_err() {
// DELETE
// language=SQLite
let delete_statement = self
@@ -189,28 +190,32 @@ impl<'a> SyncOperation<'a> {
.prepare_v2(
"\
-- 1. Filter oplog by the ops added but not applied yet (oplog b).
-- SELECT DISTINCT / UNION is important for cases with many duplicate ids.
-- We do not do any DISTINCT operation here, since that introduces a temp b-tree.
-- We filter out duplicates using the GROUP BY below.
WITH updated_rows AS (
SELECT DISTINCT b.row_type, b.row_id FROM ps_buckets AS buckets
CROSS JOIN ps_oplog AS b ON b.bucket = buckets.id
AND (b.op_id > buckets.last_applied_op)
UNION SELECT row_type, row_id FROM ps_updated_rows
SELECT b.row_type, b.row_id FROM ps_buckets AS buckets
CROSS JOIN ps_oplog AS b ON b.bucket = buckets.id
AND (b.op_id > buckets.last_applied_op)
UNION ALL SELECT row_type, row_id FROM ps_updated_rows
)

-- 3. Group the objects from different buckets together into a single one (ops).
SELECT b.row_type as type,
b.row_id as id,
r.data as data,
count(r.bucket) as buckets,
/* max() affects which row is used for 'data' */
max(r.op_id) as op_id
-- 2. Find *all* current ops over different buckets for those objects (oplog r).
FROM updated_rows b
LEFT OUTER JOIN ps_oplog AS r
ON r.row_type = b.row_type
AND r.row_id = b.row_id
-- Group for (3)
GROUP BY b.row_type, b.row_id",
SELECT
b.row_type,
b.row_id,
(
-- 3. For each unique row, select the data from the latest oplog entry.
-- The max(r.op_id) clause is used to select the latest oplog entry.
-- The iif is to avoid the max(r.op_id) column ending up in the results.
SELECT iif(max(r.op_id), r.data, null)
FROM ps_oplog r
WHERE r.row_type = b.row_type
AND r.row_id = b.row_id

) as data
FROM updated_rows b
-- Group for (2)
GROUP BY b.row_type, b.row_id;",
)
.into_db_result(self.db)?
}
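The rewritten query relies on two SQLite behaviors worth spelling out: a scalar subquery may return only one column, and when min()/max() is the sole aggregate, bare columns are taken from the row that produced the extreme value. Wrapping the aggregate in iif() lets the subquery reference max(r.op_id) while returning only r.data. A minimal sketch of the pattern (the table and values are hypothetical, standing in for ps_oplog):

```sql
CREATE TABLE oplog(row_id TEXT, op_id INTEGER, data TEXT);
INSERT INTO oplog VALUES ('a', 1, 'old'), ('a', 2, 'new');

-- Returns 'new': the bare "data" column comes from the row that
-- produced max(op_id), per SQLite's bare-column rule for min()/max().
SELECT iif(max(op_id), data, NULL) FROM oplog WHERE row_id = 'a';

-- Returns NULL: max() over zero rows is NULL, so iif() falls through.
-- sync_local treats NULL data as "no PUT operations found", i.e. a DELETE.
SELECT iif(max(op_id), data, NULL) FROM oplog WHERE row_id = 'b';
```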
@@ -220,33 +225,38 @@ GROUP BY b.row_type, b.row_id",
.prepare_v2(
"\
-- 1. Filter oplog by the ops added but not applied yet (oplog b).
-- SELECT DISTINCT / UNION is important for cases with many duplicate ids.
-- We do not do any DISTINCT operation here, since that introduces a temp b-tree.
-- We filter out duplicates using the GROUP BY below.
WITH
involved_buckets (id) AS MATERIALIZED (
SELECT id FROM ps_buckets WHERE ?1 IS NULL
OR name IN (SELECT value FROM json_each(json_extract(?1, '$.buckets')))
),
updated_rows AS (
SELECT DISTINCT FALSE as local, b.row_type, b.row_id FROM ps_buckets AS buckets
CROSS JOIN ps_oplog AS b ON b.bucket = buckets.id AND (b.op_id > buckets.last_applied_op)
WHERE buckets.id IN (SELECT id FROM involved_buckets)
SELECT b.row_type, b.row_id FROM ps_buckets AS buckets
CROSS JOIN ps_oplog AS b ON b.bucket = buckets.id
AND (b.op_id > buckets.last_applied_op)
WHERE buckets.id IN (SELECT id FROM involved_buckets)
)

-- 3. Group the objects from different buckets together into a single one (ops).
SELECT b.row_type as type,
b.row_id as id,
r.data as data,
count(r.bucket) as buckets,
/* max() affects which row is used for 'data' */
max(r.op_id) as op_id
-- 2. Find *all* current ops over different buckets for those objects (oplog r).
FROM updated_rows b
LEFT OUTER JOIN ps_oplog AS r
ON r.row_type = b.row_type
AND r.row_id = b.row_id
AND r.bucket IN (SELECT id FROM involved_buckets)
-- Group for (3)
GROUP BY b.row_type, b.row_id",
SELECT
b.row_type,
b.row_id,
(
-- 3. For each unique row, select the data from the latest oplog entry.
-- The max(r.op_id) clause is used to select the latest oplog entry.
-- The iif is to avoid the max(r.op_id) column ending up in the results.
SELECT iif(max(r.op_id), r.data, null)
FROM ps_oplog r
WHERE r.row_type = b.row_type
AND r.row_id = b.row_id
AND r.bucket IN (SELECT id FROM involved_buckets)

) as data
FROM updated_rows b
-- Group for (2)
GROUP BY b.row_type, b.row_id;",
)
.into_db_result(self.db)?;
stmt.bind_text(1, partial.args, Destructor::STATIC)?;
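On the partial-sync path, ?1 is bound to the JSON arguments passed to powersync_operations, and json_each() expands the requested bucket names for the IN filter; the MATERIALIZED hint forces the bucket list to be computed once rather than re-evaluated per row. A small illustration of the extraction step, using the same argument shape as the tests further down:

```sql
-- json_extract pulls the array out of the bound arguments, and
-- json_each turns it into rows usable with "name IN (...)":
SELECT value FROM json_each(json_extract(
  '{"buckets": ["bucket0", "bucket3"], "priority": 2}', '$.buckets'));
-- bucket0
-- bucket3
```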
4 changes: 2 additions & 2 deletions dart/pubspec.lock
@@ -293,10 +293,10 @@ packages:
dependency: "direct main"
description:
name: sqlite3
sha256: "310af39c40dd0bb2058538333c9d9840a2725ae0b9f77e4fd09ad6696aa8f66e"
sha256: c0503c69b44d5714e6abbf4c1f51a3c3cc42b75ce785f44404765e4635481d38
url: "https://pub.dev"
source: hosted
version: "2.7.5"
version: "2.7.6"
sqlite3_test:
dependency: "direct dev"
description:
4 changes: 2 additions & 2 deletions dart/pubspec.yaml
@@ -5,9 +5,9 @@ description: Tests for powersync-sqlite-core
environment:
sdk: ^3.4.0
dependencies:
sqlite3: ^2.4.5
sqlite3: ^2.7.6
dev_dependencies:
test: ^1.25.0
file: ^7.0.1
sqlite3_test: ^0.1.1
fake_async: ^1.3.3
fake_async: ^1.3.3
6 changes: 4 additions & 2 deletions dart/test/js_key_encoding_test.dart
@@ -9,7 +9,9 @@ import 'package:test/test.dart';
import 'utils/native_test_utils.dart';

void main() {
final vfs = TestSqliteFileSystem(fs: const LocalFileSystem());
// Needs a unique name per test file to avoid concurrency issues
final vfs = TestSqliteFileSystem(
fs: const LocalFileSystem(), name: 'js-key-encoding-test-vfs');
late CommonDatabase db;

setUpAll(() {
@@ -19,7 +21,7 @@ void main() {
tearDownAll(() => sqlite3.unregisterVirtualFileSystem(vfs));

setUp(() async {
db = openTestDatabase(vfs)
db = openTestDatabase(vfs: vfs)
..select('select powersync_init();')
..select('select powersync_replace_schema(?)', [json.encode(_schema)]);
});
134 changes: 134 additions & 0 deletions dart/test/perf_test.dart
@@ -0,0 +1,134 @@
import 'dart:convert';

import 'package:sqlite3/common.dart';
import 'package:sqlite3/sqlite3.dart';
import 'package:test/test.dart';

import 'utils/native_test_utils.dart';
import 'utils/tracking_vfs.dart';
import './schema_test.dart' show schema;

// These tests check how many filesystem reads and writes are performed during sync_local.
// The real-world performance of filesystem operations depends a lot on the specific system.
// For example, on native desktop systems, the performance of temporary filesystem storage can
// be close to memory performance. However, on web and mobile, (temporary) filesystem operations
// can drastically slow down performance. So rather than only testing the wall-clock time of these
// queries, we count the number of filesystem operations.
void testFilesystemOperations(
{bool unique = true,
int count = 200000,
int alreadyApplied = 10000,
int buckets = 10}) {
late TrackingFileSystem vfs;
late CommonDatabase db;

setUpAll(() {
loadExtension();
});

setUp(() async {
// Needs a unique name per test file to avoid concurrency issues
vfs = new TrackingFileSystem(
parent: new InMemoryFileSystem(), name: 'perf-test-vfs');
sqlite3.registerVirtualFileSystem(vfs, makeDefault: false);
db = openTestDatabase(vfs: vfs, fileName: 'test.db');
});

tearDown(() {
db.dispose();
sqlite3.unregisterVirtualFileSystem(vfs);
});

setUp(() {
db.execute('SELECT powersync_replace_schema(?)', [json.encode(schema)]);
// Generate dummy data
// We can replace this with actual simulated download operations later
db.execute('''
BEGIN TRANSACTION;

WITH RECURSIVE generate_rows(n) AS (
SELECT 1
UNION ALL
SELECT n + 1 FROM generate_rows WHERE n < $count
)
INSERT INTO ps_oplog (bucket, op_id, row_type, row_id, key, data, hash)
SELECT
(n % $buckets), -- Generate n different buckets
n,
'assets',
${unique ? 'uuid()' : "'duplicated_id'"},
uuid(),
'{"description": "' || n || '", "make": "test", "model": "this is just filler data. this is just filler data. this is just filler data. this is just filler data. this is just filler data. this is just filler data. this is just filler data. "}',
(n * 17) % 1000000000 -- Some pseudo-random hash

FROM generate_rows;

WITH RECURSIVE generate_bucket_rows(n) AS (
SELECT 1
UNION ALL
SELECT n + 1 FROM generate_bucket_rows WHERE n < $buckets
)
INSERT INTO ps_buckets (id, name, last_applied_op)
SELECT
(n % $buckets),
'bucket' || n,
$alreadyApplied -- simulate a percentage of operations previously applied

FROM generate_bucket_rows;

COMMIT;
''');
print('init stats: ${vfs.stats()}');

vfs.clearStats();
});

test('sync_local (full)', () {
var timer = Stopwatch()..start();
db.select('insert into powersync_operations(op, data) values(?, ?)',
['sync_local', '']);
print('${timer.elapsed.inMilliseconds}ms ${vfs.stats()}');

// These are fairly generous limits, to catch significant regressions only.
expect(vfs.tempWrites, lessThan(count / 50));
expect(timer.elapsed,
lessThan(Duration(milliseconds: 100 + (count / 50).round())));
});

test('sync_local (partial)', () {
var timer = Stopwatch()..start();
db.select('insert into powersync_operations(op, data) values(?, ?)', [
'sync_local',
jsonEncode({
'buckets': ['bucket0', 'bucket3', 'bucket4', 'bucket5', 'bucket6'],
'priority': 2
})
]);
print('${timer.elapsed.inMilliseconds}ms ${vfs.stats()}');
expect(vfs.tempWrites, lessThan(count / 50));
expect(timer.elapsed,
lessThan(Duration(milliseconds: 100 + (count / 50).round())));
});
}

main() {
group('test filesystem operations with unique ids', () {
testFilesystemOperations(
unique: true, count: 500000, alreadyApplied: 10000, buckets: 10);
});
group('test filesystem operations with duplicate ids', () {
// If this takes more than a couple of milliseconds to complete, there is a performance bug
testFilesystemOperations(
unique: false, count: 5000, alreadyApplied: 1000, buckets: 10);
});

group('test filesystem operations with a small number of changes', () {
testFilesystemOperations(
unique: true, count: 100000, alreadyApplied: 95000, buckets: 10);
});

group('test filesystem operations with a large number of buckets', () {
testFilesystemOperations(
unique: true, count: 100000, alreadyApplied: 10000, buckets: 1000);
});
}
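The limits on vfs.tempWrites connect back to the query rewrite: UNION and SELECT DISTINCT typically force a temp b-tree, which spills to temporary storage at large row counts, while UNION ALL does not. A quick way to observe the difference, sketched against throwaway tables (names are illustrative):

```sql
CREATE TABLE a(x); CREATE TABLE b(x);

-- UNION must deduplicate, so the plan typically reports
-- "UNION USING TEMP B-TREE":
EXPLAIN QUERY PLAN SELECT x FROM a UNION SELECT x FROM b;

-- UNION ALL just concatenates the scans; duplicates are instead
-- handled by the GROUP BY in the rewritten sync_local query:
EXPLAIN QUERY PLAN SELECT x FROM a UNION ALL SELECT x FROM b;
```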
6 changes: 4 additions & 2 deletions dart/test/sync_test.dart
@@ -10,7 +10,9 @@ import 'package:test/test.dart';
import 'utils/native_test_utils.dart';

void main() {
final vfs = TestSqliteFileSystem(fs: const LocalFileSystem());
// Needs a unique name per test file to avoid concurrency issues
final vfs =
TestSqliteFileSystem(fs: const LocalFileSystem(), name: 'sync-test-vfs');

setUpAll(() {
loadExtension();
@@ -22,7 +24,7 @@ void main() {
late CommonDatabase db;

setUp(() async {
db = openTestDatabase(vfs)
db = openTestDatabase(vfs: vfs)
..select('select powersync_init();')
..select('select powersync_replace_schema(?)', [json.encode(_schema)]);
});
5 changes: 3 additions & 2 deletions dart/test/utils/native_test_utils.dart
@@ -26,13 +26,14 @@ void applyOpenOverride() {
});
}

CommonDatabase openTestDatabase([VirtualFileSystem? vfs]) {
CommonDatabase openTestDatabase(
{VirtualFileSystem? vfs, String fileName = ':memory:'}) {
applyOpenOverride();
if (!didLoadExtension) {
loadExtension();
}

return sqlite3.open(':memory:', vfs: vfs?.name);
return sqlite3.open(fileName, vfs: vfs?.name);
}

void loadExtension() {