Skip to content

Commit 17b7f46

Browse files
authored
Merge pull request #68 from powersync-ja/track-opcounts
Track downloaded operations in `ps_buckets`
2 parents d810646 + 3fe09ed commit 17b7f46

14 files changed

+229
-20
lines changed

Cargo.lock

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ inherits = "release"
2929
inherits = "wasm"
3030

3131
[workspace.package]
32-
version = "0.3.12"
32+
version = "0.3.13"
3333
edition = "2021"
3434
authors = ["JourneyApps"]
3535
keywords = ["sqlite", "powersync"]

android/build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ plugins {
66
}
77

88
group = "co.powersync"
9-
version = "0.3.12"
9+
version = "0.3.13"
1010
description = "PowerSync Core SQLite Extension"
1111

1212
repositories {

android/src/prefab/prefab.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,5 @@
22
"name": "powersync_sqlite_core",
33
"schema_version": 2,
44
"dependencies": [],
5-
"version": "0.3.12"
5+
"version": "0.3.13"
66
}

crates/core/src/migrations.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ use crate::bucket_priority::BucketPriority;
1212
use crate::error::{PSResult, SQLiteError};
1313
use crate::fix035::apply_v035_fix;
1414

15+
pub const LATEST_VERSION: i32 = 9;
16+
1517
pub fn powersync_migrate(
1618
ctx: *mut sqlite::context,
1719
target_version: i32,
@@ -354,5 +356,19 @@ json_object('sql', 'DELETE FROM ps_migration WHERE id >= 8')
354356
local_db.exec_safe(&stmt).into_db_result(local_db)?;
355357
}
356358

359+
if current_version < 9 && target_version >= 9 {
360+
let stmt = "\
361+
ALTER TABLE ps_buckets ADD COLUMN count_at_last INTEGER NOT NULL DEFAULT 0;
362+
ALTER TABLE ps_buckets ADD COLUMN count_since_last INTEGER NOT NULL DEFAULT 0;
363+
INSERT INTO ps_migration(id, down_migrations) VALUES(9, json_array(
364+
json_object('sql', 'ALTER TABLE ps_buckets DROP COLUMN count_at_last'),
365+
json_object('sql', 'ALTER TABLE ps_buckets DROP COLUMN count_since_last'),
366+
json_object('sql', 'DELETE FROM ps_migration WHERE id >= 9')
367+
));
368+
";
369+
370+
local_db.exec_safe(stmt).into_db_result(local_db)?;
371+
}
372+
357373
Ok(())
358374
}

crates/core/src/operations.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?1, ?2)",
103103
let mut last_op: Option<i64> = None;
104104
let mut add_checksum: i32 = 0;
105105
let mut op_checksum: i32 = 0;
106+
let mut added_ops: i32 = 0;
106107

107108
while iterate_statement.step()? == ResultCode::ROW {
108109
let op_id = iterate_statement.column_int64(0)?;
@@ -113,6 +114,7 @@ INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?1, ?2)",
113114
let op_data = iterate_statement.column_text(5);
114115

115116
last_op = Some(op_id);
117+
added_ops += 1;
116118

117119
if op == "PUT" || op == "REMOVE" {
118120
let key: String;
@@ -236,13 +238,15 @@ WHERE bucket = ?1",
236238
"UPDATE ps_buckets
237239
SET last_op = ?2,
238240
add_checksum = (add_checksum + ?3) & 0xffffffff,
239-
op_checksum = (op_checksum + ?4) & 0xffffffff
241+
op_checksum = (op_checksum + ?4) & 0xffffffff,
242+
count_since_last = count_since_last + ?5
240243
WHERE id = ?1",
241244
)?;
242245
statement.bind_int64(1, bucket_id)?;
243246
statement.bind_int64(2, *last_op)?;
244247
statement.bind_int(3, add_checksum)?;
245248
statement.bind_int(4, op_checksum)?;
249+
statement.bind_int(5, added_ops)?;
246250

247251
statement.exec()?;
248252
}

crates/core/src/view_admin.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use sqlite_nostd as sqlite;
1111
use sqlite_nostd::{Connection, Context};
1212

1313
use crate::error::SQLiteError;
14-
use crate::migrations::powersync_migrate;
14+
use crate::migrations::{powersync_migrate, LATEST_VERSION};
1515
use crate::util::quote_identifier;
1616
use crate::{create_auto_tx_function, create_sqlite_text_fn};
1717

@@ -120,7 +120,7 @@ fn powersync_init_impl(
120120

121121
setup_internal_views(local_db)?;
122122

123-
powersync_migrate(ctx, 8)?;
123+
powersync_migrate(ctx, LATEST_VERSION)?;
124124

125125
Ok(String::from(""))
126126
}

dart/test/sync_test.dart

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,70 @@ void main() {
248248
{'r': isNull});
249249
expect(db.select('SELECT * FROM ps_sync_state'), hasLength(0));
250250
});
251+
252+
test('tracks download progress', () {
253+
const bucket = 'bkt';
254+
void expectProgress(int atLast, int sinceLast) {
255+
final [row] = db.select(
256+
'SELECT count_at_last, count_since_last FROM ps_buckets WHERE name = ?',
257+
[bucket],
258+
);
259+
final [actualAtLast, actualSinceLast] = row.values;
260+
261+
expect(actualAtLast, atLast, reason: 'count_at_last mismatch');
262+
expect(actualSinceLast, sinceLast, reason: 'count_since_last mismatch');
263+
}
264+
265+
pushSyncData(bucket, '1', 'row-0', 'PUT', {'col': 'hi'});
266+
expectProgress(0, 1);
267+
268+
pushSyncData(bucket, '2', 'row-1', 'PUT', {'col': 'hi'});
269+
expectProgress(0, 2);
270+
271+
expect(
272+
pushCheckpointComplete(
273+
'2',
274+
null,
275+
[_bucketChecksum(bucket, 1, checksum: 0)],
276+
priority: 1,
277+
),
278+
isTrue,
279+
);
280+
281+
// Running partial or complete checkpoints should not reset stats, client
282+
// SDKs are responsible for that.
283+
expectProgress(0, 2);
284+
expect(db.select('SELECT * FROM items'), isNotEmpty);
285+
286+
expect(
287+
pushCheckpointComplete(
288+
'2',
289+
null,
290+
[_bucketChecksum(bucket, 1, checksum: 0)],
291+
),
292+
isTrue,
293+
);
294+
expectProgress(0, 2);
295+
296+
db.execute('''
297+
UPDATE ps_buckets SET count_since_last = 0, count_at_last = ?1->name
298+
WHERE ?1->name IS NOT NULL
299+
''', [
300+
json.encode({bucket: 2}),
301+
]);
302+
expectProgress(2, 0);
303+
304+
// Run another iteration of this
305+
pushSyncData(bucket, '3', 'row-3', 'PUT', {'col': 'hi'});
306+
expectProgress(2, 1);
307+
db.execute('''
308+
UPDATE ps_buckets SET count_since_last = 0, count_at_last = ?1->name
309+
WHERE ?1->name IS NOT NULL
310+
''', [
311+
json.encode({bucket: 3}),
312+
]);
313+
expectProgress(3, 0);
314+
});
251315
});
252316
}
253317

dart/test/utils/fix_035_fixtures.dart

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@ const dataBroken = '''
1818

1919
/// Data after applying the migration fix, but before sync_local
2020
const dataMigrated = '''
21-
;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete) VALUES
22-
(1, 'b1', 0, 0, 0, 0, 120, 0),
23-
(2, 'b2', 0, 0, 0, 0, 3, 0)
21+
;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete, count_at_last, count_since_last) VALUES
22+
(1, 'b1', 0, 0, 0, 0, 120, 0, 0, 0),
23+
(2, 'b2', 0, 0, 0, 0, 3, 0, 0, 0)
2424
;INSERT INTO ps_oplog(bucket, op_id, row_type, row_id, key, data, hash) VALUES
2525
(1, 1, 'todos', 't1', '', '{}', 100),
2626
(1, 2, 'todos', 't2', '', '{}', 20),
@@ -39,9 +39,9 @@ const dataMigrated = '''
3939

4040
/// Data after applying the migration fix and sync_local
4141
const dataFixed = '''
42-
;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete) VALUES
43-
(1, 'b1', 0, 0, 0, 0, 120, 0),
44-
(2, 'b2', 0, 0, 0, 0, 3, 0)
42+
;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete, count_at_last, count_since_last) VALUES
43+
(1, 'b1', 0, 0, 0, 0, 120, 0, 0, 0),
44+
(2, 'b2', 0, 0, 0, 0, 3, 0, 0, 0)
4545
;INSERT INTO ps_oplog(bucket, op_id, row_type, row_id, key, data, hash) VALUES
4646
(1, 1, 'todos', 't1', '', '{}', 100),
4747
(1, 2, 'todos', 't2', '', '{}', 20),

dart/test/utils/migration_fixtures.dart

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/// The current database version
2-
const databaseVersion = 8;
2+
const databaseVersion = 9;
33

44
/// This is the base database state that we expect at various schema versions.
55
/// Generated by loading the specific library version, and exporting the schema.
@@ -261,6 +261,52 @@ const expectedState = <int, String>{
261261
;INSERT INTO ps_migration(id, down_migrations) VALUES(6, '[{"sql":"DELETE FROM ps_migration WHERE id >= 6"}]')
262262
;INSERT INTO ps_migration(id, down_migrations) VALUES(7, '[{"sql":"INSERT OR REPLACE INTO ps_kv(key, value) SELECT ''last_synced_at'', last_synced_at FROM ps_sync_state WHERE priority = 2147483647"},{"sql":"DROP TABLE ps_sync_state"},{"sql":"DELETE FROM ps_migration WHERE id >= 7"}]')
263263
;INSERT INTO ps_migration(id, down_migrations) VALUES(8, '[{"sql":"ALTER TABLE ps_sync_state RENAME TO ps_sync_state_new"},{"sql":"CREATE TABLE ps_sync_state (\n priority INTEGER NOT NULL,\n last_synced_at TEXT NOT NULL\n) STRICT"},{"sql":"INSERT INTO ps_sync_state SELECT * FROM ps_sync_state_new"},{"sql":"DROP TABLE ps_sync_state_new"},{"sql":"DELETE FROM ps_migration WHERE id >= 8"}]')
264+
''',
265+
9: r'''
266+
;CREATE TABLE ps_buckets(
267+
id INTEGER PRIMARY KEY,
268+
name TEXT NOT NULL,
269+
last_applied_op INTEGER NOT NULL DEFAULT 0,
270+
last_op INTEGER NOT NULL DEFAULT 0,
271+
target_op INTEGER NOT NULL DEFAULT 0,
272+
add_checksum INTEGER NOT NULL DEFAULT 0,
273+
op_checksum INTEGER NOT NULL DEFAULT 0,
274+
pending_delete INTEGER NOT NULL DEFAULT 0
275+
, count_at_last INTEGER NOT NULL DEFAULT 0, count_since_last INTEGER NOT NULL DEFAULT 0) STRICT
276+
;CREATE TABLE ps_crud (id INTEGER PRIMARY KEY AUTOINCREMENT, data TEXT, tx_id INTEGER)
277+
;CREATE TABLE ps_kv(key TEXT PRIMARY KEY NOT NULL, value BLOB)
278+
;CREATE TABLE ps_migration(id INTEGER PRIMARY KEY, down_migrations TEXT)
279+
;CREATE TABLE ps_oplog(
280+
bucket INTEGER NOT NULL,
281+
op_id INTEGER NOT NULL,
282+
row_type TEXT,
283+
row_id TEXT,
284+
key TEXT,
285+
data TEXT,
286+
hash INTEGER NOT NULL) STRICT
287+
;CREATE TABLE ps_sync_state (
288+
priority INTEGER NOT NULL PRIMARY KEY,
289+
last_synced_at TEXT NOT NULL
290+
) STRICT
291+
;CREATE TABLE ps_tx(id INTEGER PRIMARY KEY NOT NULL, current_tx INTEGER, next_tx INTEGER)
292+
;CREATE TABLE ps_untyped(type TEXT NOT NULL, id TEXT NOT NULL, data TEXT, PRIMARY KEY (type, id))
293+
;CREATE TABLE ps_updated_rows(
294+
row_type TEXT,
295+
row_id TEXT,
296+
PRIMARY KEY(row_type, row_id)) STRICT, WITHOUT ROWID
297+
;CREATE UNIQUE INDEX ps_buckets_name ON ps_buckets (name)
298+
;CREATE INDEX ps_oplog_key ON ps_oplog (bucket, key)
299+
;CREATE INDEX ps_oplog_opid ON ps_oplog (bucket, op_id)
300+
;CREATE INDEX ps_oplog_row ON ps_oplog (row_type, row_id)
301+
;INSERT INTO ps_migration(id, down_migrations) VALUES(1, null)
302+
;INSERT INTO ps_migration(id, down_migrations) VALUES(2, '[{"sql":"DELETE FROM ps_migration WHERE id >= 2","params":[]},{"sql":"DROP TABLE ps_tx","params":[]},{"sql":"ALTER TABLE ps_crud DROP COLUMN tx_id","params":[]}]')
303+
;INSERT INTO ps_migration(id, down_migrations) VALUES(3, '[{"sql":"DELETE FROM ps_migration WHERE id >= 3"},{"sql":"DROP TABLE ps_kv"}]')
304+
;INSERT INTO ps_migration(id, down_migrations) VALUES(4, '[{"sql":"DELETE FROM ps_migration WHERE id >= 4"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN op_checksum"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN remove_operations"}]')
305+
;INSERT INTO ps_migration(id, down_migrations) VALUES(5, '[{"sql":"SELECT powersync_drop_view(view.name)\n FROM sqlite_master view\n WHERE view.type = ''view''\n AND view.sql GLOB ''*-- powersync-auto-generated''"},{"sql":"ALTER TABLE ps_buckets RENAME TO ps_buckets_5"},{"sql":"ALTER TABLE ps_oplog RENAME TO ps_oplog_5"},{"sql":"CREATE TABLE ps_buckets(\n name TEXT PRIMARY KEY,\n last_applied_op INTEGER NOT NULL DEFAULT 0,\n last_op INTEGER NOT NULL DEFAULT 0,\n target_op INTEGER NOT NULL DEFAULT 0,\n add_checksum INTEGER NOT NULL DEFAULT 0,\n pending_delete INTEGER NOT NULL DEFAULT 0\n, op_checksum INTEGER NOT NULL DEFAULT 0, remove_operations INTEGER NOT NULL DEFAULT 0)"},{"sql":"INSERT INTO ps_buckets(name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete)\n SELECT name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete FROM ps_buckets_5"},{"sql":"CREATE TABLE ps_oplog(\n bucket TEXT NOT NULL,\n op_id INTEGER NOT NULL,\n op INTEGER NOT NULL,\n row_type TEXT,\n row_id TEXT,\n key TEXT,\n data TEXT,\n hash INTEGER NOT NULL,\n superseded INTEGER NOT NULL)"},{"sql":"CREATE INDEX ps_oplog_by_row ON ps_oplog (row_type, row_id) WHERE superseded = 0"},{"sql":"CREATE INDEX ps_oplog_by_opid ON ps_oplog (bucket, op_id)"},{"sql":"CREATE INDEX ps_oplog_by_key ON ps_oplog (bucket, key) WHERE superseded = 0"},{"sql":"INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, key, data, hash, superseded)\n SELECT ps_buckets_5.name, oplog.op_id, 3, oplog.row_type, oplog.row_id, oplog.key, oplog.data, oplog.hash, 0\n FROM ps_oplog_5 oplog\n JOIN ps_buckets_5\n ON ps_buckets_5.id = oplog.bucket"},{"sql":"DROP TABLE ps_oplog_5"},{"sql":"DROP TABLE ps_buckets_5"},{"sql":"INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, hash, superseded)\n SELECT ''$local'', 1, 4, r.row_type, r.row_id, 0, 0\n FROM ps_updated_rows r"},{"sql":"INSERT OR REPLACE INTO ps_buckets(name, pending_delete, last_op, target_op) VALUES(''$local'', 1, 0, 9223372036854775807)"},{"sql":"DROP TABLE ps_updated_rows"},{"sql":"DELETE FROM ps_migration WHERE id >= 5"}]')
306+
;INSERT INTO ps_migration(id, down_migrations) VALUES(6, '[{"sql":"DELETE FROM ps_migration WHERE id >= 6"}]')
307+
;INSERT INTO ps_migration(id, down_migrations) VALUES(7, '[{"sql":"INSERT OR REPLACE INTO ps_kv(key, value) SELECT ''last_synced_at'', last_synced_at FROM ps_sync_state WHERE priority = 2147483647"},{"sql":"DROP TABLE ps_sync_state"},{"sql":"DELETE FROM ps_migration WHERE id >= 7"}]')
308+
;INSERT INTO ps_migration(id, down_migrations) VALUES(8, '[{"sql":"ALTER TABLE ps_sync_state RENAME TO ps_sync_state_new"},{"sql":"CREATE TABLE ps_sync_state (\n priority INTEGER NOT NULL,\n last_synced_at TEXT NOT NULL\n) STRICT"},{"sql":"INSERT INTO ps_sync_state SELECT * FROM ps_sync_state_new"},{"sql":"DROP TABLE ps_sync_state_new"},{"sql":"DELETE FROM ps_migration WHERE id >= 8"}]')
309+
;INSERT INTO ps_migration(id, down_migrations) VALUES(9, '[{"sql":"ALTER TABLE ps_buckets DROP COLUMN count_at_last"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN count_since_last"},{"sql":"DELETE FROM ps_migration WHERE id >= 9"}]')
264310
''',
265311
};
266312

@@ -341,6 +387,17 @@ const data1 = <int, String>{
341387
(2, 3, 'lists', 'l1', '', '{}', 3)
342388
;INSERT INTO ps_updated_rows(row_type, row_id) VALUES
343389
('lists', 'l2')
390+
''',
391+
9: r'''
392+
;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete, count_at_last, count_since_last) VALUES
393+
(1, 'b1', 0, 0, 0, 0, 120, 0, 0, 0),
394+
(2, 'b2', 0, 0, 0, 1005, 3, 0, 0, 0)
395+
;INSERT INTO ps_oplog(bucket, op_id, row_type, row_id, key, data, hash) VALUES
396+
(1, 1, 'todos', 't1', '', '{}', 100),
397+
(1, 2, 'todos', 't2', '', '{}', 20),
398+
(2, 3, 'lists', 'l1', '', '{}', 3)
399+
;INSERT INTO ps_updated_rows(row_type, row_id) VALUES
400+
('lists', 'l2')
344401
'''
345402
};
346403

@@ -384,6 +441,7 @@ final dataDown1 = <int, String>{
384441
5: data1[5]!,
385442
6: data1[5]!,
386443
7: data1[5]!,
444+
8: data1[5]!,
387445
};
388446

389447
final finalData1 = data1[databaseVersion]!;

0 commit comments

Comments
 (0)