Skip to content

Track downloaded operations in ps_buckets #68

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ inherits = "release"
inherits = "wasm"

[workspace.package]
version = "0.3.12"
version = "0.3.13"
edition = "2021"
authors = ["JourneyApps"]
keywords = ["sqlite", "powersync"]
Expand Down
2 changes: 1 addition & 1 deletion android/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ plugins {
}

group = "co.powersync"
version = "0.3.12"
version = "0.3.13"
description = "PowerSync Core SQLite Extension"

repositories {
Expand Down
2 changes: 1 addition & 1 deletion android/src/prefab/prefab.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
"name": "powersync_sqlite_core",
"schema_version": 2,
"dependencies": [],
"version": "0.3.12"
"version": "0.3.13"
}
16 changes: 16 additions & 0 deletions crates/core/src/migrations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ use crate::bucket_priority::BucketPriority;
use crate::error::{PSResult, SQLiteError};
use crate::fix035::apply_v035_fix;

pub const LATEST_VERSION: i32 = 9;

pub fn powersync_migrate(
ctx: *mut sqlite::context,
target_version: i32,
Expand Down Expand Up @@ -354,5 +356,19 @@ json_object('sql', 'DELETE FROM ps_migration WHERE id >= 8')
local_db.exec_safe(&stmt).into_db_result(local_db)?;
}

if current_version < 9 && target_version >= 9 {
let stmt = "\
ALTER TABLE ps_buckets ADD COLUMN count_at_last INTEGER NOT NULL DEFAULT 0;
ALTER TABLE ps_buckets ADD COLUMN count_since_last INTEGER NOT NULL DEFAULT 0;
INSERT INTO ps_migration(id, down_migrations) VALUES(9, json_array(
json_object('sql', 'ALTER TABLE ps_buckets DROP COLUMN count_at_last'),
json_object('sql', 'ALTER TABLE ps_buckets DROP COLUMN count_since_last'),
json_object('sql', 'DELETE FROM ps_migration WHERE id >= 9')
));
";

local_db.exec_safe(stmt).into_db_result(local_db)?;
}

Ok(())
}
6 changes: 5 additions & 1 deletion crates/core/src/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?1, ?2)",
let mut last_op: Option<i64> = None;
let mut add_checksum: i32 = 0;
let mut op_checksum: i32 = 0;
let mut added_ops: i32 = 0;

while iterate_statement.step()? == ResultCode::ROW {
let op_id = iterate_statement.column_int64(0)?;
Expand All @@ -113,6 +114,7 @@ INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?1, ?2)",
let op_data = iterate_statement.column_text(5);

last_op = Some(op_id);
added_ops += 1;

if op == "PUT" || op == "REMOVE" {
let key: String;
Expand Down Expand Up @@ -236,13 +238,15 @@ WHERE bucket = ?1",
"UPDATE ps_buckets
SET last_op = ?2,
add_checksum = (add_checksum + ?3) & 0xffffffff,
op_checksum = (op_checksum + ?4) & 0xffffffff
op_checksum = (op_checksum + ?4) & 0xffffffff,
count_since_last = count_since_last + ?5
WHERE id = ?1",
)?;
statement.bind_int64(1, bucket_id)?;
statement.bind_int64(2, *last_op)?;
statement.bind_int(3, add_checksum)?;
statement.bind_int(4, op_checksum)?;
statement.bind_int(5, added_ops)?;

statement.exec()?;
}
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/view_admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use sqlite_nostd as sqlite;
use sqlite_nostd::{Connection, Context};

use crate::error::SQLiteError;
use crate::migrations::powersync_migrate;
use crate::migrations::{powersync_migrate, LATEST_VERSION};
use crate::util::quote_identifier;
use crate::{create_auto_tx_function, create_sqlite_text_fn};

Expand Down Expand Up @@ -120,7 +120,7 @@ fn powersync_init_impl(

setup_internal_views(local_db)?;

powersync_migrate(ctx, 8)?;
powersync_migrate(ctx, LATEST_VERSION)?;

Ok(String::from(""))
}
Expand Down
64 changes: 64 additions & 0 deletions dart/test/sync_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,70 @@ void main() {
{'r': isNull});
expect(db.select('SELECT * FROM ps_sync_state'), hasLength(0));
});

test('tracks download progress', () {
  // Exercises the `count_at_last` / `count_since_last` columns added to
  // ps_buckets by schema migration 9: the core extension increments
  // `count_since_last` for every synced operation it inserts into a bucket,
  // while `count_at_last` is only ever written externally (by a client SDK).
  const bucket = 'bkt';

  // Asserts the progress counters currently stored for [bucket].
  void expectProgress(int atLast, int sinceLast) {
    final [row] = db.select(
      'SELECT count_at_last, count_since_last FROM ps_buckets WHERE name = ?',
      [bucket],
    );
    final [actualAtLast, actualSinceLast] = row.values;

    expect(actualAtLast, atLast, reason: 'count_at_last mismatch');
    expect(actualSinceLast, sinceLast, reason: 'count_since_last mismatch');
  }

  // Each synced PUT should bump count_since_last by one.
  pushSyncData(bucket, '1', 'row-0', 'PUT', {'col': 'hi'});
  expectProgress(0, 1);

  pushSyncData(bucket, '2', 'row-1', 'PUT', {'col': 'hi'});
  expectProgress(0, 2);

  // Complete a partial (priority 1) checkpoint.
  expect(
    pushCheckpointComplete(
      '2',
      null,
      [_bucketChecksum(bucket, 1, checksum: 0)],
      priority: 1,
    ),
    isTrue,
  );

  // Completing a checkpoint (partial or full) must not reset the counters;
  // resetting them is the responsibility of the client SDKs.
  expectProgress(0, 2);
  expect(db.select('SELECT * FROM items'), isNotEmpty);

  // Now complete the full checkpoint — counters still untouched.
  expect(
    pushCheckpointComplete(
      '2',
      null,
      [_bucketChecksum(bucket, 1, checksum: 0)],
    ),
    isTrue,
  );
  expectProgress(0, 2);

  // Simulate the SDK-side reset: move the accumulated count into
  // count_at_last and clear count_since_last, keyed by bucket name through
  // the JSON `->` operator on the bound parameter.
  db.execute('''
  UPDATE ps_buckets SET count_since_last = 0, count_at_last = ?1->name
    WHERE ?1->name IS NOT NULL
''', [
    json.encode({bucket: 2}),
  ]);
  expectProgress(2, 0);

  // Run another iteration of this
  pushSyncData(bucket, '3', 'row-3', 'PUT', {'col': 'hi'});
  expectProgress(2, 1);
  db.execute('''
  UPDATE ps_buckets SET count_since_last = 0, count_at_last = ?1->name
    WHERE ?1->name IS NOT NULL
''', [
    json.encode({bucket: 3}),
  ]);
  expectProgress(3, 0);
});
});
}

Expand Down
12 changes: 6 additions & 6 deletions dart/test/utils/fix_035_fixtures.dart
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ const dataBroken = '''

/// Data after applying the migration fix, but before sync_local
const dataMigrated = '''
;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete) VALUES
(1, 'b1', 0, 0, 0, 0, 120, 0),
(2, 'b2', 0, 0, 0, 0, 3, 0)
;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete, count_at_last, count_since_last) VALUES
(1, 'b1', 0, 0, 0, 0, 120, 0, 0, 0),
(2, 'b2', 0, 0, 0, 0, 3, 0, 0, 0)
;INSERT INTO ps_oplog(bucket, op_id, row_type, row_id, key, data, hash) VALUES
(1, 1, 'todos', 't1', '', '{}', 100),
(1, 2, 'todos', 't2', '', '{}', 20),
Expand All @@ -39,9 +39,9 @@ const dataMigrated = '''

/// Data after applying the migration fix and sync_local
const dataFixed = '''
;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete) VALUES
(1, 'b1', 0, 0, 0, 0, 120, 0),
(2, 'b2', 0, 0, 0, 0, 3, 0)
;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete, count_at_last, count_since_last) VALUES
(1, 'b1', 0, 0, 0, 0, 120, 0, 0, 0),
(2, 'b2', 0, 0, 0, 0, 3, 0, 0, 0)
;INSERT INTO ps_oplog(bucket, op_id, row_type, row_id, key, data, hash) VALUES
(1, 1, 'todos', 't1', '', '{}', 100),
(1, 2, 'todos', 't2', '', '{}', 20),
Expand Down
60 changes: 59 additions & 1 deletion dart/test/utils/migration_fixtures.dart
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/// The current database version
const databaseVersion = 8;
const databaseVersion = 9;

/// This is the base database state that we expect at various schema versions.
/// Generated by loading the specific library version, and exporting the schema.
Expand Down Expand Up @@ -261,6 +261,52 @@ const expectedState = <int, String>{
;INSERT INTO ps_migration(id, down_migrations) VALUES(6, '[{"sql":"DELETE FROM ps_migration WHERE id >= 6"}]')
;INSERT INTO ps_migration(id, down_migrations) VALUES(7, '[{"sql":"INSERT OR REPLACE INTO ps_kv(key, value) SELECT ''last_synced_at'', last_synced_at FROM ps_sync_state WHERE priority = 2147483647"},{"sql":"DROP TABLE ps_sync_state"},{"sql":"DELETE FROM ps_migration WHERE id >= 7"}]')
;INSERT INTO ps_migration(id, down_migrations) VALUES(8, '[{"sql":"ALTER TABLE ps_sync_state RENAME TO ps_sync_state_new"},{"sql":"CREATE TABLE ps_sync_state (\n priority INTEGER NOT NULL,\n last_synced_at TEXT NOT NULL\n) STRICT"},{"sql":"INSERT INTO ps_sync_state SELECT * FROM ps_sync_state_new"},{"sql":"DROP TABLE ps_sync_state_new"},{"sql":"DELETE FROM ps_migration WHERE id >= 8"}]')
''',
9: r'''
;CREATE TABLE ps_buckets(
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
last_applied_op INTEGER NOT NULL DEFAULT 0,
last_op INTEGER NOT NULL DEFAULT 0,
target_op INTEGER NOT NULL DEFAULT 0,
add_checksum INTEGER NOT NULL DEFAULT 0,
op_checksum INTEGER NOT NULL DEFAULT 0,
pending_delete INTEGER NOT NULL DEFAULT 0
, count_at_last INTEGER NOT NULL DEFAULT 0, count_since_last INTEGER NOT NULL DEFAULT 0) STRICT
;CREATE TABLE ps_crud (id INTEGER PRIMARY KEY AUTOINCREMENT, data TEXT, tx_id INTEGER)
;CREATE TABLE ps_kv(key TEXT PRIMARY KEY NOT NULL, value BLOB)
;CREATE TABLE ps_migration(id INTEGER PRIMARY KEY, down_migrations TEXT)
;CREATE TABLE ps_oplog(
bucket INTEGER NOT NULL,
op_id INTEGER NOT NULL,
row_type TEXT,
row_id TEXT,
key TEXT,
data TEXT,
hash INTEGER NOT NULL) STRICT
;CREATE TABLE ps_sync_state (
priority INTEGER NOT NULL PRIMARY KEY,
last_synced_at TEXT NOT NULL
) STRICT
;CREATE TABLE ps_tx(id INTEGER PRIMARY KEY NOT NULL, current_tx INTEGER, next_tx INTEGER)
;CREATE TABLE ps_untyped(type TEXT NOT NULL, id TEXT NOT NULL, data TEXT, PRIMARY KEY (type, id))
;CREATE TABLE ps_updated_rows(
row_type TEXT,
row_id TEXT,
PRIMARY KEY(row_type, row_id)) STRICT, WITHOUT ROWID
;CREATE UNIQUE INDEX ps_buckets_name ON ps_buckets (name)
;CREATE INDEX ps_oplog_key ON ps_oplog (bucket, key)
;CREATE INDEX ps_oplog_opid ON ps_oplog (bucket, op_id)
;CREATE INDEX ps_oplog_row ON ps_oplog (row_type, row_id)
;INSERT INTO ps_migration(id, down_migrations) VALUES(1, null)
;INSERT INTO ps_migration(id, down_migrations) VALUES(2, '[{"sql":"DELETE FROM ps_migration WHERE id >= 2","params":[]},{"sql":"DROP TABLE ps_tx","params":[]},{"sql":"ALTER TABLE ps_crud DROP COLUMN tx_id","params":[]}]')
;INSERT INTO ps_migration(id, down_migrations) VALUES(3, '[{"sql":"DELETE FROM ps_migration WHERE id >= 3"},{"sql":"DROP TABLE ps_kv"}]')
;INSERT INTO ps_migration(id, down_migrations) VALUES(4, '[{"sql":"DELETE FROM ps_migration WHERE id >= 4"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN op_checksum"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN remove_operations"}]')
;INSERT INTO ps_migration(id, down_migrations) VALUES(5, '[{"sql":"SELECT powersync_drop_view(view.name)\n FROM sqlite_master view\n WHERE view.type = ''view''\n AND view.sql GLOB ''*-- powersync-auto-generated''"},{"sql":"ALTER TABLE ps_buckets RENAME TO ps_buckets_5"},{"sql":"ALTER TABLE ps_oplog RENAME TO ps_oplog_5"},{"sql":"CREATE TABLE ps_buckets(\n name TEXT PRIMARY KEY,\n last_applied_op INTEGER NOT NULL DEFAULT 0,\n last_op INTEGER NOT NULL DEFAULT 0,\n target_op INTEGER NOT NULL DEFAULT 0,\n add_checksum INTEGER NOT NULL DEFAULT 0,\n pending_delete INTEGER NOT NULL DEFAULT 0\n, op_checksum INTEGER NOT NULL DEFAULT 0, remove_operations INTEGER NOT NULL DEFAULT 0)"},{"sql":"INSERT INTO ps_buckets(name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete)\n SELECT name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete FROM ps_buckets_5"},{"sql":"CREATE TABLE ps_oplog(\n bucket TEXT NOT NULL,\n op_id INTEGER NOT NULL,\n op INTEGER NOT NULL,\n row_type TEXT,\n row_id TEXT,\n key TEXT,\n data TEXT,\n hash INTEGER NOT NULL,\n superseded INTEGER NOT NULL)"},{"sql":"CREATE INDEX ps_oplog_by_row ON ps_oplog (row_type, row_id) WHERE superseded = 0"},{"sql":"CREATE INDEX ps_oplog_by_opid ON ps_oplog (bucket, op_id)"},{"sql":"CREATE INDEX ps_oplog_by_key ON ps_oplog (bucket, key) WHERE superseded = 0"},{"sql":"INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, key, data, hash, superseded)\n SELECT ps_buckets_5.name, oplog.op_id, 3, oplog.row_type, oplog.row_id, oplog.key, oplog.data, oplog.hash, 0\n FROM ps_oplog_5 oplog\n JOIN ps_buckets_5\n ON ps_buckets_5.id = oplog.bucket"},{"sql":"DROP TABLE ps_oplog_5"},{"sql":"DROP TABLE ps_buckets_5"},{"sql":"INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, hash, superseded)\n SELECT ''$local'', 1, 4, r.row_type, r.row_id, 0, 0\n FROM ps_updated_rows r"},{"sql":"INSERT OR REPLACE INTO ps_buckets(name, pending_delete, last_op, target_op) VALUES(''$local'', 
1, 0, 9223372036854775807)"},{"sql":"DROP TABLE ps_updated_rows"},{"sql":"DELETE FROM ps_migration WHERE id >= 5"}]')
;INSERT INTO ps_migration(id, down_migrations) VALUES(6, '[{"sql":"DELETE FROM ps_migration WHERE id >= 6"}]')
;INSERT INTO ps_migration(id, down_migrations) VALUES(7, '[{"sql":"INSERT OR REPLACE INTO ps_kv(key, value) SELECT ''last_synced_at'', last_synced_at FROM ps_sync_state WHERE priority = 2147483647"},{"sql":"DROP TABLE ps_sync_state"},{"sql":"DELETE FROM ps_migration WHERE id >= 7"}]')
;INSERT INTO ps_migration(id, down_migrations) VALUES(8, '[{"sql":"ALTER TABLE ps_sync_state RENAME TO ps_sync_state_new"},{"sql":"CREATE TABLE ps_sync_state (\n priority INTEGER NOT NULL,\n last_synced_at TEXT NOT NULL\n) STRICT"},{"sql":"INSERT INTO ps_sync_state SELECT * FROM ps_sync_state_new"},{"sql":"DROP TABLE ps_sync_state_new"},{"sql":"DELETE FROM ps_migration WHERE id >= 8"}]')
;INSERT INTO ps_migration(id, down_migrations) VALUES(9, '[{"sql":"ALTER TABLE ps_buckets DROP COLUMN count_at_last"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN count_since_last"},{"sql":"DELETE FROM ps_migration WHERE id >= 9"}]')
''',
};

Expand Down Expand Up @@ -341,6 +387,17 @@ const data1 = <int, String>{
(2, 3, 'lists', 'l1', '', '{}', 3)
;INSERT INTO ps_updated_rows(row_type, row_id) VALUES
('lists', 'l2')
''',
9: r'''
;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete, count_at_last, count_since_last) VALUES
(1, 'b1', 0, 0, 0, 0, 120, 0, 0, 0),
(2, 'b2', 0, 0, 0, 1005, 3, 0, 0, 0)
;INSERT INTO ps_oplog(bucket, op_id, row_type, row_id, key, data, hash) VALUES
(1, 1, 'todos', 't1', '', '{}', 100),
(1, 2, 'todos', 't2', '', '{}', 20),
(2, 3, 'lists', 'l1', '', '{}', 3)
;INSERT INTO ps_updated_rows(row_type, row_id) VALUES
('lists', 'l2')
'''
};

Expand Down Expand Up @@ -384,6 +441,7 @@ final dataDown1 = <int, String>{
5: data1[5]!,
6: data1[5]!,
7: data1[5]!,
8: data1[5]!,
};

final finalData1 = data1[databaseVersion]!;
Expand Down
7 changes: 7 additions & 0 deletions dart/test/utils/native_test_utils.dart
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import 'dart:ffi';
import 'dart:io';

import 'package:sqlite3/common.dart';
import 'package:sqlite3/open.dart' as sqlite_open;
Expand All @@ -15,6 +16,12 @@ void applyOpenOverride() {
return DynamicLibrary.open('libsqlite3.so.0');
});
sqlite_open.open.overrideFor(sqlite_open.OperatingSystem.macOS, () {
// Prefer using Homebrew's SQLite which allows loading extensions.
const fromHomebrew = '/opt/homebrew/opt/sqlite/lib/libsqlite3.dylib';
if (File(fromHomebrew).existsSync()) {
return DynamicLibrary.open(fromHomebrew);
}

return DynamicLibrary.open('libsqlite3.dylib');
});
}
Expand Down
Loading