diff --git a/demos/django-todolist/lib/widgets/guard_by_sync.dart b/demos/django-todolist/lib/widgets/guard_by_sync.dart new file mode 100644 index 00000000..a6d6bb48 --- /dev/null +++ b/demos/django-todolist/lib/widgets/guard_by_sync.dart @@ -0,0 +1,55 @@ +import 'package:flutter/material.dart'; +import 'package:powersync/powersync.dart' hide Column; +import 'package:powersync_django_todolist_demo/powersync.dart'; + +/// A widget that shows [child] after a complete sync on the database has +/// completed and a progress bar before that. +class GuardBySync extends StatelessWidget { + final Widget child; + + /// When set, wait only for a complete sync within the [BucketPriority] + /// instead of a full sync. + final BucketPriority? priority; + + const GuardBySync({ + super.key, + required this.child, + this.priority, + }); + + @override + Widget build(BuildContext context) { + return StreamBuilder( + stream: db.statusStream, + initialData: db.currentStatus, + builder: (context, snapshot) { + final status = snapshot.requireData; + final (didSync, progress) = switch (priority) { + null => ( + status.hasSynced ?? false, + status.downloadProgress?.untilCompletion + ), + var priority? => ( + status.statusForPriority(priority).hasSynced ?? false, + status.downloadProgress?.untilPriority(priority) + ), + }; + + if (didSync) { + return child; + } else { + return Center( + child: Column( + children: [ + const Text('Busy with sync...'), + LinearProgressIndicator(value: progress?.fraction), + if (progress case final progress?) + Text('${progress.completed} out of ${progress.total}') + ], + ), + ); + } + }, + ); + } +} diff --git a/demos/django-todolist/lib/widgets/lists_page.dart b/demos/django-todolist/lib/widgets/lists_page.dart index e31c2fc8..17e7f512 100644 --- a/demos/django-todolist/lib/widgets/lists_page.dart +++ b/demos/django-todolist/lib/widgets/lists_page.dart @@ -1,6 +1,5 @@ -import 'dart:async'; - import 'package:flutter/material.dart'; +import 'package:powersync_django_todolist_demo/widgets/guard_by_sync.dart'; import './list_item.dart'; import './list_item_dialog.dart'; @@ -41,48 +40,27 @@ class ListsPage extends StatelessWidget { } } -class ListsWidget extends StatefulWidget { +final class ListsWidget extends StatelessWidget { const ListsWidget({super.key}); - @override - State createState() { - return _ListsWidgetState(); - } -} - -class _ListsWidgetState extends State { - List _data = []; - StreamSubscription? _subscription; - - _ListsWidgetState(); - - @override - void initState() { - super.initState(); - final stream = TodoList.watchListsWithStats(); - _subscription = stream.listen((data) { - if (!context.mounted) { - return; - } - setState(() { - _data = data; - }); - }); - } - - @override - void dispose() { - super.dispose(); - _subscription?.cancel(); - } - @override Widget build(BuildContext context) { - return ListView( - padding: const EdgeInsets.symmetric(vertical: 8.0), - children: _data.map((list) { - return ListItemWidget(list: list); - }).toList(), + return GuardBySync( + child: StreamBuilder( + stream: TodoList.watchListsWithStats(), + builder: (context, snapshot) { + if (snapshot.data case final todoLists?) { + return ListView( + padding: const EdgeInsets.symmetric(vertical: 8.0), + children: todoLists.map((list) { + return ListItemWidget(list: list); + }).toList(), + ); + } else { + return const CircularProgressIndicator(); + } + }, + ), ); } } diff --git a/demos/django-todolist/macos/Podfile.lock b/demos/django-todolist/macos/Podfile.lock index c1d67d1d..ec19cbfa 100644 --- a/demos/django-todolist/macos/Podfile.lock +++ b/demos/django-todolist/macos/Podfile.lock @@ -24,7 +24,7 @@ PODS: - sqlite3_flutter_libs (0.0.1): - Flutter - FlutterMacOS - - sqlite3 (~> 3.49.1) + - sqlite3 (~> 3.49.0) - sqlite3/dbstatvtab - sqlite3/fts5 - sqlite3/perf-threadsafe @@ -61,7 +61,7 @@ SPEC CHECKSUMS: powersync_flutter_libs: 011c1704766d154faf2373bb9c973d26910d322b shared_preferences_foundation: 9e1978ff2562383bd5676f64ec4e9aa8fa06a6f7 sqlite3: fc1400008a9b3525f5914ed715a5d1af0b8f4983 - sqlite3_flutter_libs: f8fc13346870e73fe35ebf6dbb997fbcd156b241 + sqlite3_flutter_libs: 3c323550ef3b928bc0aa9513c841e45a7d242832 PODFILE CHECKSUM: 236401fc2c932af29a9fcf0e97baeeb2d750d367 diff --git a/demos/django-todolist/macos/Runner.xcodeproj/xcshareddata/xcschemes/Runner.xcscheme b/demos/django-todolist/macos/Runner.xcodeproj/xcshareddata/xcschemes/Runner.xcscheme index 2c526589..a0e59441 100644 --- a/demos/django-todolist/macos/Runner.xcodeproj/xcshareddata/xcschemes/Runner.xcscheme +++ b/demos/django-todolist/macos/Runner.xcodeproj/xcshareddata/xcschemes/Runner.xcscheme @@ -59,6 +59,7 @@ ignoresPersistentStateOnLaunch = "NO" debugDocumentVersioning = "YES" debugServiceExtension = "internal" + enableGPUValidationMode = "1" allowLocationSimulation = "YES"> diff --git a/demos/supabase-todolist/lib/widgets/guard_by_sync.dart b/demos/supabase-todolist/lib/widgets/guard_by_sync.dart new file mode 100644 index 00000000..d55ed4e3 --- /dev/null +++ b/demos/supabase-todolist/lib/widgets/guard_by_sync.dart @@ -0,0 +1,53 @@ +import 'package:flutter/material.dart'; +import 'package:powersync/powersync.dart' hide Column; +import 'package:powersync_flutter_demo/powersync.dart'; + +/// A widget that shows [child] after a complete sync on the database has +/// completed and a progress bar before that. +class GuardBySync extends StatelessWidget { + final Widget child; + + /// When set, wait only for a complete sync within the [BucketPriority] + /// instead of a full sync. + final BucketPriority? priority; + + const GuardBySync({ + super.key, + required this.child, + this.priority, + }); + + @override + Widget build(BuildContext context) { + return StreamBuilder( + stream: db.statusStream, + initialData: db.currentStatus, + builder: (context, snapshot) { + final status = snapshot.requireData; + final (didSync, progress) = switch (priority) { + null => (status.hasSynced ?? false, status.downloadProgress), + var priority? => ( + status.statusForPriority(priority).hasSynced ?? false, + status.downloadProgress?.untilPriority(priority) + ), + }; + + if (didSync) { + return child; + } else { + return Center( + child: Column( + children: [ + const Text('Busy with sync...'), + LinearProgressIndicator(value: progress?.downloadedFraction), + if (progress case final progress?) + Text( + '${progress.downloadedOperations} out of ${progress.totalOperations}') + ], + ), + ); + } + }, + ); + } +} diff --git a/demos/supabase-todolist/lib/widgets/lists_page.dart b/demos/supabase-todolist/lib/widgets/lists_page.dart index d60cb9f5..028dbe7e 100644 --- a/demos/supabase-todolist/lib/widgets/lists_page.dart +++ b/demos/supabase-todolist/lib/widgets/lists_page.dart @@ -1,6 +1,6 @@ import 'package:flutter/material.dart'; import 'package:powersync/powersync.dart'; -import 'package:powersync_flutter_demo/powersync.dart'; +import 'package:powersync_flutter_demo/widgets/guard_by_sync.dart'; import './list_item.dart'; import './list_item_dialog.dart'; @@ -46,29 +46,23 @@ final class ListsWidget extends StatelessWidget { @override Widget build(BuildContext context) { - return FutureBuilder( - future: db.waitForFirstSync(priority: _listsPriority), - builder: (context, snapshot) { - if (snapshot.connectionState == ConnectionState.done) { - return StreamBuilder( - stream: TodoList.watchListsWithStats(), - builder: (context, snapshot) { - if (snapshot.data case final todoLists?) { - return ListView( - padding: const EdgeInsets.symmetric(vertical: 8.0), - children: todoLists.map((list) { - return ListItemWidget(list: list); - }).toList(), - ); - } else { - return const CircularProgressIndicator(); - } - }, - ); - } else { - return const Text('Busy with sync...'); - } - }, + return GuardBySync( + priority: _listsPriority, + child: StreamBuilder( + stream: TodoList.watchListsWithStats(), + builder: (context, snapshot) { + if (snapshot.data case final todoLists?) { + return ListView( + padding: const EdgeInsets.symmetric(vertical: 8.0), + children: todoLists.map((list) { + return ListItemWidget(list: list); + }).toList(), + ); + } else { + return const CircularProgressIndicator(); + } + }, + ), ); } diff --git a/packages/powersync_core/lib/powersync_core.dart b/packages/powersync_core/lib/powersync_core.dart index 54e67156..998c0ce1 100644 --- a/packages/powersync_core/lib/powersync_core.dart +++ b/packages/powersync_core/lib/powersync_core.dart @@ -10,5 +10,6 @@ export 'src/exceptions.dart'; export 'src/log.dart'; export 'src/open_factory.dart'; export 'src/schema.dart'; -export 'src/sync_status.dart'; +export 'src/sync/sync_status.dart' + hide BucketProgress, InternalSyncDownloadProgress; export 'src/uuid.dart'; diff --git a/packages/powersync_core/lib/src/database/core_version.dart b/packages/powersync_core/lib/src/database/core_version.dart index b0d5ff1c..dc14f169 100644 --- a/packages/powersync_core/lib/src/database/core_version.dart +++ b/packages/powersync_core/lib/src/database/core_version.dart @@ -60,7 +60,7 @@ extension type const PowerSyncCoreVersion((int, int, int) _tuple) { // Note: When updating this, also update the download URL in // scripts/init_powersync_core_binary.dart and the version ref in // packages/sqlite3_wasm_build/build.sh - static const minimum = PowerSyncCoreVersion((0, 3, 11)); + static const minimum = PowerSyncCoreVersion((0, 3, 12)); /// The first version of the core extensions that this version of the Dart /// SDK doesn't support. diff --git a/packages/powersync_core/lib/src/database/native/native_powersync_database.dart b/packages/powersync_core/lib/src/database/native/native_powersync_database.dart index 0b61898c..f489f9bc 100644 --- a/packages/powersync_core/lib/src/database/native/native_powersync_database.dart +++ b/packages/powersync_core/lib/src/database/native/native_powersync_database.dart @@ -5,7 +5,7 @@ import 'package:meta/meta.dart'; import 'package:http/http.dart' as http; import 'package:logging/logging.dart'; import 'package:powersync_core/src/abort_controller.dart'; -import 'package:powersync_core/src/bucket_storage.dart'; +import 'package:powersync_core/src/sync/bucket_storage.dart'; import 'package:powersync_core/src/connector.dart'; import 'package:powersync_core/src/database/powersync_database.dart'; import 'package:powersync_core/src/database/powersync_db_mixin.dart'; @@ -16,8 +16,8 @@ import 'package:powersync_core/src/open_factory/abstract_powersync_open_factory. import 'package:powersync_core/src/open_factory/native/native_open_factory.dart'; import 'package:powersync_core/src/schema.dart'; import 'package:powersync_core/src/schema_logic.dart'; -import 'package:powersync_core/src/streaming_sync.dart'; -import 'package:powersync_core/src/sync_status.dart'; +import 'package:powersync_core/src/sync/streaming_sync.dart'; +import 'package:powersync_core/src/sync/sync_status.dart'; import 'package:sqlite_async/sqlite3_common.dart'; import 'package:sqlite_async/sqlite_async.dart'; diff --git a/packages/powersync_core/lib/src/database/powersync_db_mixin.dart b/packages/powersync_core/lib/src/database/powersync_db_mixin.dart index 6add250b..81adb4fc 100644 --- a/packages/powersync_core/lib/src/database/powersync_db_mixin.dart +++ b/packages/powersync_core/lib/src/database/powersync_db_mixin.dart @@ -11,7 +11,7 @@ import 'package:powersync_core/src/database/core_version.dart'; import 'package:powersync_core/src/powersync_update_notification.dart'; import 'package:powersync_core/src/schema.dart'; import 'package:powersync_core/src/schema_logic.dart'; -import 'package:powersync_core/src/sync_status.dart'; +import 'package:powersync_core/src/sync/sync_status.dart'; mixin PowerSyncDatabaseMixin implements SqliteConnection { /// Schema used for the local database. @@ -175,16 +175,27 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { @visibleForTesting void setStatus(SyncStatus status) { if (status != currentStatus) { - // Note that currently the streaming sync implementation will never set hasSynced. - // lastSyncedAt implies that syncing has completed at some point (hasSynced = true). - // The previous values of hasSynced should be preserved here. - final newStatus = status.copyWith( - hasSynced: status.lastSyncedAt != null - ? true - : status.hasSynced ?? currentStatus.hasSynced, - lastSyncedAt: status.lastSyncedAt ?? currentStatus.lastSyncedAt); - // If the absence of hasSync was the only difference, the new states would be equal - // and don't require an event. So, check again. + final newStatus = SyncStatus( + connected: status.connected, + downloading: status.downloading, + uploading: status.uploading, + connecting: status.connecting, + uploadError: status.uploadError, + downloadError: status.downloadError, + priorityStatusEntries: status.priorityStatusEntries, + downloadProgress: status.downloadProgress, + // Note that currently the streaming sync implementation will never set + // hasSynced. lastSyncedAt implies that syncing has completed at some + // point (hasSynced = true). + // The previous values of hasSynced should be preserved here. + lastSyncedAt: status.lastSyncedAt ?? currentStatus.lastSyncedAt, + hasSynced: status.lastSyncedAt != null + ? true + : status.hasSynced ?? currentStatus.hasSynced, + ); + + // If the absence of hasSynced was the only difference, the new states + // would be equal and don't require an event. So, check again. if (newStatus != currentStatus) { currentStatus = newStatus; statusStreamController.add(currentStatus); diff --git a/packages/powersync_core/lib/src/database/web/web_powersync_database.dart b/packages/powersync_core/lib/src/database/web/web_powersync_database.dart index a55aa5b4..169e3e81 100644 --- a/packages/powersync_core/lib/src/database/web/web_powersync_database.dart +++ b/packages/powersync_core/lib/src/database/web/web_powersync_database.dart @@ -3,7 +3,7 @@ import 'package:meta/meta.dart'; import 'package:fetch_client/fetch_client.dart'; import 'package:logging/logging.dart'; import 'package:powersync_core/src/abort_controller.dart'; -import 'package:powersync_core/src/bucket_storage.dart'; +import 'package:powersync_core/src/sync/bucket_storage.dart'; import 'package:powersync_core/src/connector.dart'; import 'package:powersync_core/src/database/powersync_database.dart'; import 'package:powersync_core/src/database/powersync_db_mixin.dart'; @@ -11,7 +11,7 @@ import 'package:powersync_core/src/log.dart'; import 'package:powersync_core/src/open_factory/abstract_powersync_open_factory.dart'; import 'package:powersync_core/src/open_factory/web/web_open_factory.dart'; import 'package:powersync_core/src/schema.dart'; -import 'package:powersync_core/src/streaming_sync.dart'; +import 'package:powersync_core/src/sync/streaming_sync.dart'; import 'package:sqlite_async/sqlite_async.dart'; import 'package:powersync_core/src/schema_logic.dart' as schema_logic; diff --git a/packages/powersync_core/lib/src/bucket_storage.dart b/packages/powersync_core/lib/src/sync/bucket_storage.dart similarity index 90% rename from packages/powersync_core/lib/src/bucket_storage.dart rename to packages/powersync_core/lib/src/sync/bucket_storage.dart index d79e4c05..fe0d1168 100644 --- a/packages/powersync_core/lib/src/bucket_storage.dart +++ b/packages/powersync_core/lib/src/sync/bucket_storage.dart @@ -1,16 +1,22 @@ +@internal +library; + import 'dart:async'; import 'dart:convert'; import 'package:collection/collection.dart'; +import 'package:meta/meta.dart'; import 'package:powersync_core/sqlite_async.dart'; import 'package:powersync_core/sqlite3_common.dart'; -import 'crud.dart'; -import 'schema_logic.dart'; -import 'sync_types.dart'; +import '../crud.dart'; +import '../schema_logic.dart'; +import 'protocol.dart'; const compactOperationInterval = 1000; +typedef LocalOperationCounters = ({int atLast, int sinceLast}); + class BucketStorage { final SqliteConnection _internalDb; bool _hasCompletedSync = false; @@ -29,18 +35,32 @@ class BucketStorage { return await _internalDb.execute(query, parameters); } - void startSession() {} - Future> getBucketStates() async { final rows = await select( - 'SELECT name as bucket, cast(last_op as TEXT) as op_id FROM ps_buckets WHERE pending_delete = 0 AND name != \'\$local\''); + "SELECT name, cast(last_op as TEXT) FROM ps_buckets WHERE pending_delete = 0 AND name != '\$local'"); return [ for (var row in rows) BucketState( - bucket: row['bucket'] as String, opId: row['op_id'] as String) + bucket: row.columnAt(0) as String, + opId: row.columnAt(1) as String, + ) ]; } + Future> + getBucketOperationProgress() async { + final rows = await select( + "SELECT name, count_at_last, count_since_last FROM ps_buckets"); + + return { + for (final row in rows) + (row.columnAt(0) as String): ( + atLast: row.columnAt(1) as int, + sinceLast: row.columnAt(2) as int, + ) + }; + } + Future getClientId() async { final rows = await select('SELECT powersync_client_id() as client_id'); return rows.first['client_id'] as String; @@ -161,6 +181,21 @@ class BucketStorage { final rs = await tx.execute('SELECT last_insert_rowid() as result'); final result = rs[0]['result']; if (result == 1) { + if (forPartialPriority == null) { + // Reset progress counters. We only do this for a complete sync, as we + // want a download progress to always cover a complete checkpoint + // instead of resetting for partial completions. + await tx.execute(''' +UPDATE ps_buckets SET count_since_last = 0, count_at_last = ?1->name + WHERE ?1->name IS NOT NULL +''', [ + json.encode({ + for (final bucket in checkpoint.checksums) + if (bucket.count case final count?) bucket.bucket: count, + }), + ]); + } + return true; } else { // can_update_local(db) == false diff --git a/packages/powersync_core/lib/src/sync/mutable_sync_status.dart b/packages/powersync_core/lib/src/sync/mutable_sync_status.dart new file mode 100644 index 00000000..6da63654 --- /dev/null +++ b/packages/powersync_core/lib/src/sync/mutable_sync_status.dart @@ -0,0 +1,124 @@ +import 'dart:async'; + +import 'package:collection/collection.dart'; + +import 'sync_status.dart'; +import 'bucket_storage.dart'; +import 'protocol.dart'; + +final class MutableSyncStatus { + bool connected = false; + bool connecting = false; + bool downloading = false; + bool uploading = false; + + InternalSyncDownloadProgress? downloadProgress; + List priorityStatusEntries = const []; + + DateTime? lastSyncedAt; + + Object? uploadError; + Object? downloadError; + + void setConnectingIfNotConnected() { + if (!connected) { + connecting = true; + } + } + + void setConnected() { + connected = true; + connecting = false; + } + + void applyDownloadError(Object error) { + connected = false; + connecting = false; + downloading = false; + downloadProgress = null; + downloadError = error; + } + + void applyCheckpointReached(Checkpoint applied) { + downloading = false; + downloadProgress = null; + downloadError = null; + final now = lastSyncedAt = DateTime.now(); + priorityStatusEntries = [ + if (applied.checksums.isNotEmpty) + ( + hasSynced: true, + lastSyncedAt: now, + priority: maxBy( + applied.checksums.map((cs) => BucketPriority(cs.priority)), + (priority) => priority, + compare: BucketPriority.comparator, + )!, + ) + ]; + } + + void applyCheckpointStarted( + Map localProgress, + Checkpoint target, + ) { + downloading = true; + downloadProgress = + InternalSyncDownloadProgress.forNewCheckpoint(localProgress, target); + } + + void applyUploadError(Object error) { + uploading = false; + uploadError = error; + } + + void applyBatchReceived(SyncDataBatch batch) { + downloading = true; + if (downloadProgress case final previousProgress?) { + downloadProgress = previousProgress.incrementDownloaded(batch); + } + } + + SyncStatus immutableSnapshot() { + return SyncStatus( + connected: connected, + connecting: connecting, + downloading: downloading, + uploading: uploading, + downloadProgress: downloadProgress?.asSyncDownloadProgress, + priorityStatusEntries: UnmodifiableListView(priorityStatusEntries), + lastSyncedAt: lastSyncedAt, + hasSynced: null, // Stream client is not supposed to set this value. + uploadError: uploadError, + downloadError: downloadError, + ); + } +} + +final class SyncStatusStateStream { + final MutableSyncStatus status = MutableSyncStatus(); + SyncStatus _lastPublishedStatus = const SyncStatus(); + + final StreamController _statusStreamController = + StreamController.broadcast(); + + Stream get statusStream => _statusStreamController.stream; + + void updateStatus(void Function(MutableSyncStatus status) change) { + change(status); + + if (_statusStreamController.isClosed) { + return; + } + + final current = status.immutableSnapshot(); + if (current != _lastPublishedStatus) { + _statusStreamController.add(current); + _lastPublishedStatus = current; + } + } + + void close() { + _statusStreamController.close(); + } +} diff --git a/packages/powersync_core/lib/src/sync_types.dart b/packages/powersync_core/lib/src/sync/protocol.dart similarity index 98% rename from packages/powersync_core/lib/src/sync_types.dart rename to packages/powersync_core/lib/src/sync/protocol.dart index 968b53d4..4e07334b 100644 --- a/packages/powersync_core/lib/src/sync_types.dart +++ b/packages/powersync_core/lib/src/sync/protocol.dart @@ -136,18 +136,12 @@ final class Checkpoint extends StreamingSyncLine { 'write_checkpoint': writeCheckpoint, 'buckets': checksums .where((c) => priority == null || c.priority <= priority) - .map((c) => { - 'bucket': c.bucket, - 'checksum': c.checksum, - 'priority': c.priority, - }) + .map((c) => c.toJson()) .toList(growable: false) }; } } -typedef BucketDescription = ({String name, int priority}); - class BucketChecksum { final String bucket; final int priority; @@ -173,6 +167,15 @@ class BucketChecksum { checksum = json['checksum'] as int, count = json['count'] as int?, lastOpId = json['last_op_id'] as String?; + + Map toJson() { + return { + 'bucket': bucket, + 'checksum': checksum, + 'priority': priority, + 'count': count, + }; + } } /// A variant of [Checkpoint] that may be sent when the server has already sent diff --git a/packages/powersync_core/lib/src/stream_utils.dart b/packages/powersync_core/lib/src/sync/stream_utils.dart similarity index 100% rename from packages/powersync_core/lib/src/stream_utils.dart rename to packages/powersync_core/lib/src/sync/stream_utils.dart diff --git a/packages/powersync_core/lib/src/streaming_sync.dart b/packages/powersync_core/lib/src/sync/streaming_sync.dart similarity index 80% rename from packages/powersync_core/lib/src/streaming_sync.dart rename to packages/powersync_core/lib/src/sync/streaming_sync.dart index 716b38bf..e30fdca9 100644 --- a/packages/powersync_core/lib/src/streaming_sync.dart +++ b/packages/powersync_core/lib/src/sync/streaming_sync.dart @@ -1,8 +1,8 @@ import 'dart:async'; import 'dart:convert' as convert; -import 'package:collection/collection.dart'; import 'package:http/http.dart' as http; +import 'package:meta/meta.dart'; import 'package:powersync_core/src/abort_controller.dart'; import 'package:powersync_core/src/exceptions.dart'; import 'package:powersync_core/src/log_internal.dart'; @@ -10,15 +10,12 @@ import 'package:powersync_core/src/user_agent/user_agent.dart'; import 'package:sqlite_async/mutex.dart'; import 'bucket_storage.dart'; -import 'connector.dart'; -import 'crud.dart'; +import '../connector.dart'; +import '../crud.dart'; +import 'mutable_sync_status.dart'; import 'stream_utils.dart'; import 'sync_status.dart'; -import 'sync_types.dart'; - -/// Since we use null to indicate "no change" in status updates, we need -/// a different value to indicate "no error". -const _noError = Object(); +import 'protocol.dart'; abstract interface class StreamingSync { Stream get statusStream; @@ -29,13 +26,14 @@ abstract interface class StreamingSync { Future abort(); } +@internal class StreamingSyncImplementation implements StreamingSync { - BucketStorage adapter; + final BucketStorage adapter; final Future Function() credentialsCallback; final Future Function()? invalidCredentialsCallback; - final Future Function() uploadCrud; + final Stream crudUpdateTriggerStream; // An internal controller which is used to trigger CRUD uploads internally // e.g. when reconnecting. @@ -44,15 +42,8 @@ class StreamingSyncImplementation implements StreamingSync { final StreamController _internalCrudTriggerController = StreamController.broadcast(); - final Stream crudUpdateTriggerStream; - - final StreamController _statusStreamController = - StreamController.broadcast(); - - @override - late final Stream statusStream; - - late final http.Client _client; + final http.Client _client; + final SyncStatusStateStream _state = SyncStatusStateStream(); final StreamController _localPingController = StreamController.broadcast(); @@ -61,8 +52,6 @@ class StreamingSyncImplementation implements StreamingSync { final Map? syncParameters; - SyncStatus lastStatus = const SyncStatus(); - AbortController? _abort; bool _safeToClose = true; @@ -70,7 +59,6 @@ class StreamingSyncImplementation implements StreamingSync { final Mutex syncMutex, crudMutex; final Map _userAgentHeaders; - String? clientId; StreamingSyncImplementation( @@ -86,12 +74,13 @@ class StreamingSyncImplementation implements StreamingSync { /// A unique identifier for this streaming sync implementation /// A good value is typically the DB file path which it will mutate when syncing. String? identifier = "unknown"}) - : syncMutex = Mutex(identifier: "sync-$identifier"), + : _client = client, + syncMutex = Mutex(identifier: "sync-$identifier"), crudMutex = Mutex(identifier: "crud-$identifier"), - _userAgentHeaders = userAgentHeaders() { - _client = client; - statusStream = _statusStreamController.stream; - } + _userAgentHeaders = userAgentHeaders(); + + @override + Stream get statusStream => _state.statusStream; @override Future abort() async { @@ -119,17 +108,13 @@ class StreamingSyncImplementation implements StreamingSync { // Now close the client in all cases not covered above _client.close(); - _statusStreamController.close(); + _state.close(); } bool get aborted { return _abort?.aborted ?? false; } - bool get isConnected { - return lastStatus.connected; - } - @override Future streamingSync() async { try { @@ -138,7 +123,7 @@ class StreamingSyncImplementation implements StreamingSync { crudLoop(); var invalidCredentials = false; while (!aborted) { - _updateStatus(connecting: true); + _state.updateStatus((s) => s.setConnectingIfNotConnected()); try { if (invalidCredentials && invalidCredentialsCallback != null) { // This may error. In that case it will be retried again on the next @@ -160,11 +145,7 @@ class StreamingSyncImplementation implements StreamingSync { isolateLogger.warning('Sync error: $message', e, stacktrace); invalidCredentials = true; - _updateStatus( - connected: false, - connecting: true, - downloading: false, - downloadError: e); + _state.updateStatus((s) => s.applyDownloadError(e)); // On error, wait a little before retrying // When aborting, don't wait @@ -208,7 +189,7 @@ class StreamingSyncImplementation implements StreamingSync { // This is the first item in the FIFO CRUD queue. CrudEntry? nextCrudItem = await adapter.nextCrudItem(); if (nextCrudItem != null) { - _updateStatus(uploading: true); + _state.updateStatus((s) => s.uploading = true); if (nextCrudItem.clientId == checkedCrudItem?.clientId) { // This will force a higher log level than exceptions which are caught here. isolateLogger.warning( @@ -221,7 +202,7 @@ class StreamingSyncImplementation implements StreamingSync { checkedCrudItem = nextCrudItem; await uploadCrud(); - _updateStatus(uploadError: _noError); + _state.updateStatus((s) => s.uploadError = null); } else { // Uploading is completed await adapter.updateLocalTarget(() => getWriteCheckpoint()); @@ -230,9 +211,10 @@ class StreamingSyncImplementation implements StreamingSync { } catch (e, stacktrace) { checkedCrudItem = null; isolateLogger.warning('Data upload error', e, stacktrace); - _updateStatus(uploading: false, uploadError: e); + _state.updateStatus((s) => s.applyUploadError(e)); await _delayRetry(); - if (!isConnected) { + + if (!_state.status.connected) { // Exit the upload loop if the sync stream is no longer connected break; } @@ -241,7 +223,7 @@ class StreamingSyncImplementation implements StreamingSync { e, stacktrace); } finally { - _updateStatus(uploading: false); + _state.updateStatus((s) => s.uploading = false); } } }, timeout: retryDelay); @@ -276,50 +258,15 @@ class StreamingSyncImplementation implements StreamingSync { } void _updateStatusForPriority(SyncPriorityStatus completed) { - // All status entries with a higher priority can be deleted since this - // partial sync includes them. - _updateStatus(priorityStatusEntries: [ - for (final entry in lastStatus.priorityStatusEntries) - if (entry.priority < completed.priority) entry, - completed - ]); - } - - /// Update sync status based on any non-null parameters. - /// To clear errors, use [_noError] instead of null. - void _updateStatus({ - DateTime? lastSyncedAt, - bool? hasSynced, - bool? connected, - bool? connecting, - bool? downloading, - bool? uploading, - Object? uploadError, - Object? downloadError, - List? priorityStatusEntries, - }) { - final c = connected ?? lastStatus.connected; - var newStatus = SyncStatus( - connected: c, - connecting: !c && (connecting ?? lastStatus.connecting), - lastSyncedAt: lastSyncedAt ?? lastStatus.lastSyncedAt, - hasSynced: hasSynced ?? lastStatus.hasSynced, - downloading: downloading ?? lastStatus.downloading, - uploading: uploading ?? lastStatus.uploading, - uploadError: uploadError == _noError - ? null - : (uploadError ?? lastStatus.uploadError), - downloadError: downloadError == _noError - ? null - : (downloadError ?? lastStatus.downloadError), - priorityStatusEntries: - priorityStatusEntries ?? lastStatus.priorityStatusEntries, - ); - - if (!_statusStreamController.isClosed) { - lastStatus = newStatus; - _statusStreamController.add(newStatus); - } + _state.updateStatus((s) { + // All status entries with a higher priority can be deleted since this + // partial sync includes them. + s.priorityStatusEntries = [ + for (final entry in s.priorityStatusEntries) + if (entry.priority < completed.priority) entry, + completed + ]; + }); } Future<(List, Map)> @@ -338,8 +285,6 @@ class StreamingSyncImplementation implements StreamingSync { Future streamingSyncIteration( {AbortController? abortController}) async { - adapter.startSession(); - var (bucketRequests, bucketMap) = await _collectLocalBucketState(); Checkpoint? targetCheckpoint; @@ -362,7 +307,7 @@ class StreamingSyncImplementation implements StreamingSync { break; } - _updateStatus(connected: true, connecting: false); + _state.updateStatus((s) => s.setConnected()); switch (line) { case Checkpoint(): targetCheckpoint = line; @@ -377,7 +322,9 @@ class StreamingSyncImplementation implements StreamingSync { } bucketMap = newBuckets; await adapter.removeBuckets([...bucketsToDelete]); - _updateStatus(downloading: true); + final initialProgress = await adapter.getBucketOperationProgress(); + _state.updateStatus( + (s) => s.applyCheckpointStarted(initialProgress, line)); case StreamingSyncCheckpointComplete(): final result = await adapter.syncLocalDatabase(targetCheckpoint!); if (!result.checkpointValid) { @@ -389,27 +336,9 @@ class StreamingSyncImplementation implements StreamingSync { // Checksums valid, but need more data for a consistent checkpoint. // Continue waiting. } else { - appliedCheckpoint = targetCheckpoint; - - final now = DateTime.now(); - _updateStatus( - downloading: false, - downloadError: _noError, - lastSyncedAt: now, - priorityStatusEntries: [ - if (appliedCheckpoint.checksums.isNotEmpty) - ( - hasSynced: true, - lastSyncedAt: now, - priority: maxBy( - appliedCheckpoint.checksums - .map((cs) => BucketPriority(cs.priority)), - (priority) => priority, - compare: BucketPriority.comparator, - )!, - ) - ], - ); + final thisCheckpoint = appliedCheckpoint = targetCheckpoint; + _state + .updateStatus((s) => s.applyCheckpointReached(thisCheckpoint)); } validatedCheckpoint = targetCheckpoint; @@ -438,7 +367,6 @@ class StreamingSyncImplementation implements StreamingSync { throw PowerSyncProtocolException( 'Checkpoint diff without previous checkpoint'); } - _updateStatus(downloading: true); final diff = line; final Map newBuckets = {}; for (var checksum in targetCheckpoint.checksums) { @@ -456,13 +384,18 @@ class StreamingSyncImplementation implements StreamingSync { checksums: [...newBuckets.values], writeCheckpoint: diff.writeCheckpoint); targetCheckpoint = newCheckpoint; + final initialProgress = await adapter.getBucketOperationProgress(); + _state.updateStatus( + (s) => s.applyCheckpointStarted(initialProgress, newCheckpoint)); bucketMap = newBuckets.map((name, checksum) => MapEntry(name, (name: name, priority: checksum.priority))); await adapter.removeBuckets(diff.removedBuckets); adapter.setTargetCheckpoint(targetCheckpoint); case SyncDataBatch(): - _updateStatus(downloading: true); + // TODO: This increments the counters before actually saving sync + // data. Might be fine though? + _state.updateStatus((s) => s.applyBatchReceived(line)); await adapter.saveSyncData(line); case StreamingSyncKeepalive(:final tokenExpiresIn): if (tokenExpiresIn == 0) { @@ -489,10 +422,9 @@ class StreamingSyncImplementation implements StreamingSync { isolateLogger.fine('Unknown sync line: $rawData'); case null: // Local ping if (targetCheckpoint == appliedCheckpoint) { - _updateStatus( - downloading: false, - downloadError: _noError, - lastSyncedAt: DateTime.now()); + if (appliedCheckpoint case final completed?) { + _state.updateStatus((s) => s.applyCheckpointReached(completed)); + } } else if (validatedCheckpoint == targetCheckpoint) { final result = await adapter.syncLocalDatabase(targetCheckpoint!); if (!result.checkpointValid) { @@ -504,12 +436,8 @@ class StreamingSyncImplementation implements StreamingSync { // Checksums valid, but need more data for a consistent checkpoint. // Continue waiting. } else { - appliedCheckpoint = targetCheckpoint; - - _updateStatus( - downloading: false, - downloadError: _noError, - lastSyncedAt: DateTime.now()); + final completed = appliedCheckpoint = targetCheckpoint; + _state.updateStatus((s) => s.applyCheckpointReached(completed)); } } } @@ -595,3 +523,8 @@ String _syncErrorMessage(Object? error) { return '${error.runtimeType}'; } } + +typedef BucketDescription = ({ + String name, + int priority, +}); diff --git a/packages/powersync_core/lib/src/sync/sync_status.dart b/packages/powersync_core/lib/src/sync/sync_status.dart new file mode 100644 index 00000000..fabe4322 --- /dev/null +++ b/packages/powersync_core/lib/src/sync/sync_status.dart @@ -0,0 +1,377 @@ +import 'package:collection/collection.dart'; +import 'package:meta/meta.dart'; + +import 'bucket_storage.dart'; +import 'protocol.dart'; + +final class SyncStatus { + /// true if currently connected. + /// + /// This means the PowerSync connection is ready to download, and + /// [PowerSyncBackendConnector.uploadData] may be called for any local changes. + final bool connected; + + /// true if the PowerSync connection is busy connecting. + /// + /// During this stage, [PowerSyncBackendConnector.uploadData] may already be called, + /// called, and [uploading] may be true. + final bool connecting; + + /// true if actively downloading changes. + /// + /// This is only true when [connected] is also true. + final bool downloading; + + /// A realtime progress report on how many operations have been downloaded and + /// how many are necessary in total to complete the next sync iteration. + /// + /// This field is only set when [downloading] is also true. + final SyncDownloadProgress? downloadProgress; + + /// true if uploading changes + final bool uploading; + + /// Time that a last sync has fully completed, if any. + /// + /// This is null while loading the database. + final DateTime? lastSyncedAt; + + /// Indicates whether there has been at least one full sync, if any. + /// Is null when unknown, for example when state is still being loaded from the database. + final bool? hasSynced; + + /// Error during uploading. + /// + /// Cleared on the next successful upload. + final Object? uploadError; + + /// Error during downloading (including connecting). + /// + /// Cleared on the next successful data download. + final Object? downloadError; + + final List priorityStatusEntries; + + const SyncStatus({ + this.connected = false, + this.connecting = false, + this.lastSyncedAt, + this.hasSynced, + this.downloadProgress, + this.downloading = false, + this.uploading = false, + this.downloadError, + this.uploadError, + this.priorityStatusEntries = const [], + }); + + @override + bool operator ==(Object other) { + return (other is SyncStatus && + other.connected == connected && + other.downloading == downloading && + other.uploading == uploading && + other.connecting == connecting && + other.downloadError == downloadError && + other.uploadError == uploadError && + other.lastSyncedAt == lastSyncedAt && + other.hasSynced == hasSynced && + _statusEquality.equals( + other.priorityStatusEntries, priorityStatusEntries) && + other.downloadProgress == downloadProgress); + } + + // Deprecated because it can't set fields back to null + @Deprecated('Should not be used in user code') + SyncStatus copyWith({ + bool? connected, + bool? downloading, + bool? uploading, + bool? connecting, + Object? uploadError, + Object? downloadError, + DateTime? lastSyncedAt, + bool? hasSynced, + List? priorityStatusEntries, + }) { + return SyncStatus( + connected: connected ?? this.connected, + downloading: downloading ?? this.downloading, + uploading: uploading ?? this.uploading, + connecting: connecting ?? this.connecting, + uploadError: uploadError ?? this.uploadError, + downloadError: downloadError ?? this.downloadError, + lastSyncedAt: lastSyncedAt ?? this.lastSyncedAt, + hasSynced: hasSynced ?? this.hasSynced, + priorityStatusEntries: + priorityStatusEntries ?? this.priorityStatusEntries, + ); + } + + /// Get the current [downloadError] or [uploadError]. + Object? get anyError { + return downloadError ?? uploadError; + } + + /// Returns information for [lastSyncedAt] and [hasSynced] information at a + /// partial sync priority, or `null` if the status for that priority is + /// unknown. + /// + /// The information returned may be more generic than requested. For instance, + /// a fully-completed sync cycle (as expressed by [lastSyncedAt]) necessarily + /// includes all buckets across all priorities. So, if no further partial + /// checkpoints have been received since that complete sync, + /// [statusForPriority] may return information for that complete sync. + /// Similarly, requesting the sync status for priority `1` may return + /// information extracted from the lower priority `2` since each partial sync + /// in priority `2` necessarily includes a consistent view over data in + /// priority `1`. + SyncPriorityStatus statusForPriority(BucketPriority priority) { + assert(priorityStatusEntries.isSortedByCompare( + (e) => e.priority, BucketPriority.comparator)); + + for (final known in priorityStatusEntries) { + // Lower-priority buckets are synchronized after higher-priority buckets, + // and since priorityStatusEntries is sorted we look for the first entry + // that doesn't have a higher priority. + if (known.priority <= priority) { + return known; + } + } + + // If we have a complete sync, that necessarily includes all priorities. + return ( + priority: priority, + hasSynced: hasSynced, + lastSyncedAt: lastSyncedAt + ); + } + + @override + int get hashCode { + return Object.hash( + connected, + downloading, + uploading, + connecting, + uploadError, + downloadError, + lastSyncedAt, + _statusEquality.hash(priorityStatusEntries), + downloadProgress, + ); + } + + @override + String toString() { + return "SyncStatus"; + } + + // This should be a ListEquality, but that appears to + // cause weird type errors with DDC (but only after hot reloads?!) + static const _statusEquality = ListEquality(); +} + +/// The priority of a PowerSync bucket. +extension type const BucketPriority._(int priorityNumber) { + static const _highest = 0; + + factory BucketPriority(int i) { + assert(i >= _highest); + return BucketPriority._(i); + } + + bool operator >(BucketPriority other) => comparator(this, other) > 0; + bool operator >=(BucketPriority other) => comparator(this, other) >= 0; + bool operator <(BucketPriority other) => comparator(this, other) < 0; + bool operator <=(BucketPriority other) => comparator(this, other) <= 0; + + /// A [Comparator] instance suitable for comparing [BucketPriority] values. + static int comparator(BucketPriority a, BucketPriority b) => + -a.priorityNumber.compareTo(b.priorityNumber); +} + +/// Partial information about the synchronization status for buckets within a +/// priority. +typedef SyncPriorityStatus = ({ + BucketPriority priority, + DateTime? lastSyncedAt, + bool? hasSynced, +}); + +/// Stats of the local upload queue. +class UploadQueueStats { + /// Number of records in the upload queue. + int count; + + /// Size of the upload queue in bytes. + int? size; + + UploadQueueStats({required this.count, this.size}); + + @override + String toString() { + if (size == null) { + return "UploadQueueStats"; + } else { + return "UploadQueueStats"; + } + } +} + +/// Per-bucket download progress information. +@internal +typedef BucketProgress = ({ + BucketPriority priority, + int atLast, + int sinceLast, + int targetCount, +}); + +@internal +final class InternalSyncDownloadProgress extends ProgressWithOperations { + final Map buckets; + + InternalSyncDownloadProgress(this.buckets) + : super._( + buckets.values.map((e) => e.targetCount - e.atLast).sum, + buckets.values.map((e) => e.sinceLast).sum, + ); + + factory InternalSyncDownloadProgress.forNewCheckpoint( + Map localProgress, Checkpoint target) { + final buckets = {}; + for (final bucket in target.checksums) { + final savedProgress = localProgress[bucket.bucket]; + + buckets[bucket.bucket] = ( + priority: BucketPriority._(bucket.priority), + atLast: savedProgress?.atLast ?? 0, + sinceLast: savedProgress?.sinceLast ?? 0, + targetCount: bucket.count ?? 0, + ); + } + + return InternalSyncDownloadProgress(buckets); + } + + static InternalSyncDownloadProgress ofPublic(SyncDownloadProgress public) { + return public._internal; + } + + /// Sums the total target and completed operations for all buckets up until + /// the given [priority] (inclusive). + ProgressWithOperations untilPriority(BucketPriority priority) { + final (total, downloaded) = + buckets.values.where((e) => e.priority >= priority).fold( + (0, 0), + (prev, entry) { + final downloaded = entry.sinceLast; + final total = entry.targetCount - entry.atLast; + return (prev.$1 + total, prev.$2 + downloaded); + }, + ); + + return ProgressWithOperations._(total, downloaded); + } + + InternalSyncDownloadProgress incrementDownloaded(SyncDataBatch batch) { + final newBucketStates = Map.of(buckets); + for (final dataForBucket in batch.buckets) { + final previous = newBucketStates[dataForBucket.bucket]!; + newBucketStates[dataForBucket.bucket] = ( + priority: previous.priority, + atLast: previous.atLast, + sinceLast: previous.sinceLast + dataForBucket.data.length, + targetCount: previous.targetCount, + ); + } + + return InternalSyncDownloadProgress(newBucketStates); + } + + SyncDownloadProgress get asSyncDownloadProgress => + SyncDownloadProgress._(this); + + @override + int get hashCode => _mapEquality.hash(buckets); + + @override + bool operator ==(Object other) { + return other is InternalSyncDownloadProgress && + // totalOperations and downloadedOperations are derived values, but + // comparing them first helps find a difference faster. + totalOperations == other.totalOperations && + downloadedOperations == other.downloadedOperations && + _mapEquality.equals(buckets, other.buckets); + } + + @override + String toString() { + final all = asSyncDownloadProgress; + return 'for total: ${all.downloadedOperations} / ${all.totalOperations}'; + } + + static const _mapEquality = MapEquality(); +} + +/// Information about a progressing download. +/// +/// This reports the `total` amount of operations to download, how many of them +/// have alreaady been `completed` and finally a `fraction` indicating relative +/// progress (as a number between `0.0` and `1.0`) +/// +/// To obtain these values, use [SyncDownloadProgress] available through +/// [SyncStatus.downloadProgress]. +final class ProgressWithOperations { + /// How many operations need to be downloaded in total until the current + /// download is complete. + final int totalOperations; + + /// How many operations have already been downloaded since the last complete + /// download. + final int downloadedOperations; + + ProgressWithOperations._(this.totalOperations, this.downloadedOperations); + + /// Relative progress (as a number between `0.0` and `1.0`). + double get downloadedFraction { + return totalOperations == 0 ? 0.0 : downloadedOperations / totalOperations; + } +} + +/// Provides realtime progress on how PowerSync is downloading rows. +/// +/// This type reports progress by implementing [ProgressWithOperations], meaning +/// that [downloadedOperations], [totalOperations] and [downloadedFraction] are +/// available on instances of [SyncDownloadProgress]. +/// Additionally, it's possible to obtain the progress towards a specific +/// priority only (instead of tracking progress for the entire download) by +/// using [untilPriority]. +/// +/// The reported progress always reflects the status towards the end of a +/// sync iteration (after which a consistent snapshot of all buckets is +/// available locally). +/// +/// In rare cases (in particular, when a [compacting] operation takes place +/// between syncs), it's possible for the returned numbers to be slightly +/// inaccurate. For this reason, [SyncDownloadProgress] should be seen as an +/// approximation of progress. The information returned is good enough to build +/// progress bars, but not exact enough to track individual download counts. +/// +/// Also note that data is downloaded in bulk, which means that individual +/// counters are unlikely to be updated one-by-one. +/// +/// [compacting]: https://docs.powersync.com/usage/lifecycle-maintenance/compacting-buckets +extension type SyncDownloadProgress._(InternalSyncDownloadProgress _internal) + implements ProgressWithOperations { + /// Returns download progress towards all data up until the specified + /// [priority] being received. + /// + /// The returned [ProgressWithOperations] tracks the target amount of + /// operations that need to be downloaded in total and how many of them have + /// already been received. + ProgressWithOperations untilPriority(BucketPriority priority) { + return _internal.untilPriority(priority); + } +} diff --git a/packages/powersync_core/lib/src/sync_status.dart b/packages/powersync_core/lib/src/sync_status.dart deleted file mode 100644 index 86579907..00000000 --- a/packages/powersync_core/lib/src/sync_status.dart +++ /dev/null @@ -1,204 +0,0 @@ -import 'package:collection/collection.dart'; - -final class SyncStatus { - /// true if currently connected. - /// - /// This means the PowerSync connection is ready to download, and - /// [PowerSyncBackendConnector.uploadData] may be called for any local changes. - final bool connected; - - /// true if the PowerSync connection is busy connecting. - /// - /// During this stage, [PowerSyncBackendConnector.uploadData] may already be called, - /// called, and [uploading] may be true. - final bool connecting; - - /// true if actively downloading changes. - /// - /// This is only true when [connected] is also true. - final bool downloading; - - /// true if uploading changes - final bool uploading; - - /// Time that a last sync has fully completed, if any. - /// - /// This is null while loading the database. - final DateTime? lastSyncedAt; - - /// Indicates whether there has been at least one full sync, if any. - /// Is null when unknown, for example when state is still being loaded from the database. - final bool? hasSynced; - - /// Error during uploading. - /// - /// Cleared on the next successful upload. - final Object? uploadError; - - /// Error during downloading (including connecting). - /// - /// Cleared on the next successful data download. - final Object? downloadError; - - final List priorityStatusEntries; - - const SyncStatus({ - this.connected = false, - this.connecting = false, - this.lastSyncedAt, - this.hasSynced, - this.downloading = false, - this.uploading = false, - this.downloadError, - this.uploadError, - this.priorityStatusEntries = const [], - }); - - @override - bool operator ==(Object other) { - return (other is SyncStatus && - other.connected == connected && - other.downloading == downloading && - other.uploading == uploading && - other.connecting == connecting && - other.downloadError == downloadError && - other.uploadError == uploadError && - other.lastSyncedAt == lastSyncedAt && - other.hasSynced == hasSynced && - _statusEquality.equals( - other.priorityStatusEntries, priorityStatusEntries)); - } - - SyncStatus copyWith({ - bool? connected, - bool? downloading, - bool? uploading, - bool? connecting, - Object? uploadError, - Object? downloadError, - DateTime? lastSyncedAt, - bool? hasSynced, - List? priorityStatusEntries, - }) { - return SyncStatus( - connected: connected ?? this.connected, - downloading: downloading ?? this.downloading, - uploading: uploading ?? this.uploading, - connecting: connecting ?? this.connecting, - uploadError: uploadError ?? this.uploadError, - downloadError: downloadError ?? this.downloadError, - lastSyncedAt: lastSyncedAt ?? this.lastSyncedAt, - hasSynced: hasSynced ?? this.hasSynced, - priorityStatusEntries: - priorityStatusEntries ?? this.priorityStatusEntries, - ); - } - - /// Get the current [downloadError] or [uploadError]. - Object? get anyError { - return downloadError ?? uploadError; - } - - /// Returns information for [lastSyncedAt] and [hasSynced] information at a - /// partial sync priority, or `null` if the status for that priority is - /// unknown. - /// - /// The information returned may be more generic than requested. For instance, - /// a fully-completed sync cycle (as expressed by [lastSyncedAt]) necessarily - /// includes all buckets across all priorities. So, if no further partial - /// checkpoints have been received since that complete sync, - /// [statusForPriority] may return information for that complete sync. - /// Similarly, requesting the sync status for priority `1` may return - /// information extracted from the lower priority `2` since each partial sync - /// in priority `2` necessarily includes a consistent view over data in - /// priority `1`. - SyncPriorityStatus statusForPriority(BucketPriority priority) { - assert(priorityStatusEntries.isSortedByCompare( - (e) => e.priority, BucketPriority.comparator)); - - for (final known in priorityStatusEntries) { - // Lower-priority buckets are synchronized after higher-priority buckets, - // and since priorityStatusEntries is sorted we look for the first entry - // that doesn't have a higher priority. - if (known.priority <= priority) { - return known; - } - } - - // If we have a complete sync, that necessarily includes all priorities. - return ( - priority: priority, - hasSynced: hasSynced, - lastSyncedAt: lastSyncedAt - ); - } - - @override - int get hashCode { - return Object.hash( - connected, - downloading, - uploading, - connecting, - uploadError, - downloadError, - lastSyncedAt, - _statusEquality.hash(priorityStatusEntries)); - } - - @override - String toString() { - return "SyncStatus"; - } - - // This should be a ListEquality, but that appears to - // cause weird type errors with DDC (but only after hot reloads?!) - static const _statusEquality = ListEquality(); -} - -/// The priority of a PowerSync bucket. -extension type const BucketPriority._(int priorityNumber) { - static const _highest = 0; - - factory BucketPriority(int i) { - assert(i >= _highest); - return BucketPriority._(i); - } - - bool operator >(BucketPriority other) => comparator(this, other) > 0; - bool operator >=(BucketPriority other) => comparator(this, other) >= 0; - bool operator <(BucketPriority other) => comparator(this, other) < 0; - bool operator <=(BucketPriority other) => comparator(this, other) <= 0; - - /// A [Comparator] instance suitable for comparing [BucketPriority] values. - static int comparator(BucketPriority a, BucketPriority b) => - -a.priorityNumber.compareTo(b.priorityNumber); -} - -/// Partial information about the synchronization status for buckets within a -/// priority. -typedef SyncPriorityStatus = ({ - BucketPriority priority, - DateTime? lastSyncedAt, - bool? hasSynced, -}); - -/// Stats of the local upload queue. -class UploadQueueStats { - /// Number of records in the upload queue. - int count; - - /// Size of the upload queue in bytes. - int? size; - - UploadQueueStats({required this.count, this.size}); - - @override - String toString() { - if (size == null) { - return "UploadQueueStats"; - } else { - return "UploadQueueStats"; - } - } -} diff --git a/packages/powersync_core/lib/src/web/sync_controller.dart b/packages/powersync_core/lib/src/web/sync_controller.dart index be930fb4..574c2b84 100644 --- a/packages/powersync_core/lib/src/web/sync_controller.dart +++ b/packages/powersync_core/lib/src/web/sync_controller.dart @@ -6,7 +6,7 @@ import 'package:sqlite_async/web.dart'; import 'package:web/web.dart'; import '../database/web/web_powersync_database.dart'; -import '../streaming_sync.dart'; +import '../sync/streaming_sync.dart'; import 'sync_worker_protocol.dart'; class SyncWorkerHandle implements StreamingSync { diff --git a/packages/powersync_core/lib/src/web/sync_worker.dart b/packages/powersync_core/lib/src/web/sync_worker.dart index ebe6d586..f1dae4eb 100644 --- a/packages/powersync_core/lib/src/web/sync_worker.dart +++ b/packages/powersync_core/lib/src/web/sync_worker.dart @@ -13,7 +13,7 @@ import 'package:logging/logging.dart'; import 'package:powersync_core/powersync_core.dart'; import 'package:powersync_core/sqlite_async.dart'; import 'package:powersync_core/src/database/powersync_db_mixin.dart'; -import 'package:powersync_core/src/streaming_sync.dart'; +import 'package:powersync_core/src/sync/streaming_sync.dart'; import 'package:sqlite_async/web.dart'; import 'package:web/web.dart' hide RequestMode; diff --git a/packages/powersync_core/lib/src/web/sync_worker_protocol.dart b/packages/powersync_core/lib/src/web/sync_worker_protocol.dart index c8f343cf..cd6b241a 100644 --- a/packages/powersync_core/lib/src/web/sync_worker_protocol.dart +++ b/packages/powersync_core/lib/src/web/sync_worker_protocol.dart @@ -7,7 +7,7 @@ import 'package:web/web.dart'; import '../connector.dart'; import '../log.dart'; -import '../sync_status.dart'; +import '../sync/sync_status.dart'; /// Names used in [SyncWorkerMessage] enum SyncWorkerMessageType { @@ -146,6 +146,50 @@ extension type SerializedCredentials._(JSObject _) implements JSObject { } } +@anonymous +extension type SerializedBucketProgress._(JSObject _) implements JSObject { + external factory SerializedBucketProgress({ + required String name, + required int priority, + required int atLast, + required int sinceLast, + required int targetCount, + }); + + external String name; + external int priority; + external int atLast; + external int sinceLast; + external int targetCount; + + static JSArray serialize( + Map buckets) { + return [ + for (final MapEntry(:key, :value) in buckets.entries) + SerializedBucketProgress( + name: key, + priority: value.priority.priorityNumber, + atLast: value.atLast, + sinceLast: value.sinceLast, + targetCount: value.targetCount, + ), + ].toJS; + } + + static Map deserialize( + JSArray array) { + return { + for (final entry in array.toDart) + entry.name: ( + priority: BucketPriority(entry.priority), + atLast: entry.atLast, + sinceLast: entry.sinceLast, + targetCount: entry.targetCount, + ), + }; + } +} + @anonymous extension type SerializedSyncStatus._(JSObject _) implements JSObject { external factory SerializedSyncStatus({ @@ -158,6 +202,7 @@ extension type SerializedSyncStatus._(JSObject _) implements JSObject { required String? uploadError, required String? downloadError, required JSArray? priorityStatusEntries, + required JSArray? syncProgress, }); factory SerializedSyncStatus.from(SyncStatus status) { @@ -178,6 +223,11 @@ extension type SerializedSyncStatus._(JSObject _) implements JSObject { entry.hasSynced?.toJS, ].toJS ].toJS, + syncProgress: switch (status.downloadProgress) { + null => null, + var other => SerializedBucketProgress.serialize( + InternalSyncDownloadProgress.ofPublic(other).buckets), + }, ); } @@ -190,6 +240,7 @@ extension type SerializedSyncStatus._(JSObject _) implements JSObject { external String? uploadError; external String? downloadError; external JSArray? priorityStatusEntries; + external JSArray? syncProgress; SyncStatus asSyncStatus() { return SyncStatus( @@ -219,6 +270,12 @@ extension type SerializedSyncStatus._(JSObject _) implements JSObject { ); }) ], + downloadProgress: switch (syncProgress) { + null => null, + final serializedProgress => InternalSyncDownloadProgress( + SerializedBucketProgress.deserialize(serializedProgress)) + .asSyncDownloadProgress, + }, ); } } diff --git a/packages/powersync_core/lib/src/web/web_bucket_storage.dart b/packages/powersync_core/lib/src/web/web_bucket_storage.dart index c2f276e8..4ba46b07 100644 --- a/packages/powersync_core/lib/src/web/web_bucket_storage.dart +++ b/packages/powersync_core/lib/src/web/web_bucket_storage.dart @@ -1,5 +1,5 @@ import 'package:powersync_core/sqlite_async.dart'; -import 'package:powersync_core/src/bucket_storage.dart'; +import 'package:powersync_core/src/sync/bucket_storage.dart'; import 'package:sqlite_async/web.dart'; class WebBucketStorage extends BucketStorage { diff --git a/packages/powersync_core/test/bucket_storage_test.dart b/packages/powersync_core/test/bucket_storage_test.dart index d1b29a3f..4676efd4 100644 --- a/packages/powersync_core/test/bucket_storage_test.dart +++ b/packages/powersync_core/test/bucket_storage_test.dart @@ -1,6 +1,6 @@ import 'package:powersync_core/powersync_core.dart'; -import 'package:powersync_core/src/bucket_storage.dart'; -import 'package:powersync_core/src/sync_types.dart'; +import 'package:powersync_core/src/sync/bucket_storage.dart'; +import 'package:powersync_core/src/sync/protocol.dart'; import 'package:sqlite_async/sqlite3_common.dart'; import 'package:test/test.dart'; diff --git a/packages/powersync_core/test/in_memory_sync_test.dart b/packages/powersync_core/test/in_memory_sync_test.dart index 359f68e7..4469d576 100644 --- a/packages/powersync_core/test/in_memory_sync_test.dart +++ b/packages/powersync_core/test/in_memory_sync_test.dart @@ -3,8 +3,8 @@ import 'package:logging/logging.dart'; import 'package:powersync_core/powersync_core.dart'; import 'package:powersync_core/sqlite3_common.dart'; import 'package:powersync_core/src/log_internal.dart'; -import 'package:powersync_core/src/streaming_sync.dart'; -import 'package:powersync_core/src/sync_types.dart'; +import 'package:powersync_core/src/sync/streaming_sync.dart'; +import 'package:powersync_core/src/sync/protocol.dart'; import 'package:test/test.dart'; import 'server/sync_server/in_memory_sync_server.dart'; @@ -20,19 +20,14 @@ void main() { late CommonDatabase raw; late PowerSyncDatabase database; late MockSyncService syncService; + late StreamingSync syncClient; var credentialsCallbackCount = 0; - setUp(() async { - credentialsCallbackCount = 0; + void createSyncClient() { final (client, server) = inMemoryServer(); - syncService = MockSyncService(); server.mount(syncService.router.call); - factory = await testUtils.testFactory(); - (raw, database) = await factory.openInMemoryDatabase(); - await database.initialize(); - syncClient = database.connectWithMockService( client, TestConnector(() async { @@ -44,6 +39,16 @@ void main() { ); }), ); + } + + setUp(() async { + credentialsCallbackCount = 0; + syncService = MockSyncService(); + + factory = await testUtils.testFactory(); + (raw, database) = await factory.openInMemoryDatabase(); + await database.initialize(); + createSyncClient(); }); tearDown(() async { @@ -222,7 +227,8 @@ void main() { await expectLater( status, - emits(isSyncStatus(downloading: true, hasSynced: false).having( + emitsThrough( + isSyncStatus(downloading: true, hasSynced: false).having( (e) => e.statusForPriority(BucketPriority(0)).hasSynced, 'status for $prio', isTrue, @@ -240,8 +246,8 @@ void main() { 'checkpoint_complete': {'last_op_id': operationId.toString()} }); - await expectLater( - status, emits(isSyncStatus(downloading: false, hasSynced: true))); + await expectLater(status, + emitsThrough(isSyncStatus(downloading: false, hasSynced: true))); await database.waitForFirstSync(); expect(await database.getAll('SELECT * FROM customers'), hasLength(4)); }); @@ -329,14 +335,239 @@ void main() { expect(nextRequest.headers['Authorization'], 'Token token2'); expect(credentialsCallbackCount, 2); }); + + group('reports progress', () { + var lastOpId = 0; + + setUp(() => lastOpId = 0); + + BucketChecksum bucket(String name, int count, {int priority = 3}) { + return BucketChecksum( + bucket: name, priority: priority, checksum: 0, count: count); + } + + void addDataLine(String bucket, int amount) { + syncService.addLine({ + 'data': { + 'bucket': bucket, + 'data': >[ + for (var i = 0; i < amount; i++) + { + 'op_id': '${++lastOpId}', + 'op': 'PUT', + 'object_type': bucket, + 'object_id': '$lastOpId', + 'checksum': 0, + 'data': {}, + } + ], + } + }); + } + + void addCheckpointComplete([int? priority]) { + if (priority case final partial?) { + syncService.addLine({ + 'partial_checkpoint_complete': { + 'last_op_id': '$lastOpId', + 'priority': partial, + } + }); + } else { + syncService.addLine({ + 'checkpoint_complete': { + 'last_op_id': '$lastOpId', + } + }); + } + } + + Future expectProgress( + StreamQueue status, { + required Object total, + Map priorities = const {}, + }) async { + await expectLater( + status, + emits(isSyncStatus( + downloading: true, + downloadProgress: isSyncDownloadProgress( + progress: total, + priorities: priorities, + ), + )), + ); + } + + test('without priorities', () async { + final status = await waitForConnection(); + syncService.addLine({ + 'checkpoint': Checkpoint( + lastOpId: '10', + checksums: [bucket('a', 10)], + ) + }); + await expectProgress(status, total: progress(0, 10)); + + addDataLine('a', 10); + await expectProgress(status, total: progress(10, 10)); + + addCheckpointComplete(); + await expectLater(status, + emits(isSyncStatus(downloading: false, downloadProgress: isNull))); + + // Emit new data, progress should be 0/2 instead of 10/12 + syncService.addLine({ + 'checkpoint_diff': { + 'last_op_id': '12', + 'updated_buckets': [bucket('a', 12)], + 'removed_buckets': const [], + }, + }); + await expectProgress(status, total: progress(0, 2)); + addDataLine('a', 2); + await expectProgress(status, total: progress(2, 2)); + addCheckpointComplete(); + await expectLater(status, + emits(isSyncStatus(downloading: false, downloadProgress: isNull))); + }); + + test('interrupted sync', () async { + var status = await waitForConnection(); + syncService.addLine({ + 'checkpoint': Checkpoint( + lastOpId: '10', + checksums: [bucket('a', 10)], + ) + }); + await expectProgress(status, total: progress(0, 10)); + addDataLine('a', 5); + await expectProgress(status, total: progress(5, 10)); + + // Emulate the app closing - create a new independent sync client. + await syncClient.abort(); + syncService.endCurrentListener(); + + createSyncClient(); + status = await waitForConnection(); + + // Send same checkpoint again + syncService.addLine({ + 'checkpoint': Checkpoint( + lastOpId: '10', + checksums: [bucket('a', 10)], + ) + }); + + // Progress should be restored instead of saying e.g 0/5 now. + await expectProgress(status, total: progress(5, 10)); + addCheckpointComplete(); + await expectLater(status, + emits(isSyncStatus(downloading: false, downloadProgress: isNull))); + }); + + test('interrupted sync with new checkpoint', () async { + var status = await waitForConnection(); + syncService.addLine({ + 'checkpoint': Checkpoint( + lastOpId: '10', + checksums: [bucket('a', 10)], + ) + }); + await expectProgress(status, total: progress(0, 10)); + addDataLine('a', 5); + await expectProgress(status, total: progress(5, 10)); + + // Emulate the app closing - create a new independent sync client. + await syncClient.abort(); + syncService.endCurrentListener(); + + createSyncClient(); + status = await waitForConnection(); + + // Send checkpoint with additional data + syncService.addLine({ + 'checkpoint': Checkpoint( + lastOpId: '12', + checksums: [bucket('a', 12)], + ) + }); + + await expectProgress(status, total: progress(5, 12)); + addCheckpointComplete(); + await expectLater(status, + emits(isSyncStatus(downloading: false, downloadProgress: isNull))); + }); + + test('different priorities', () async { + var status = await waitForConnection(); + Future checkProgress(Object prio0, Object prio2) async { + await expectProgress( + status, + priorities: { + BucketPriority(0): prio0, + BucketPriority(2): prio2, + }, + total: prio2, + ); + } + + syncService.addLine({ + 'checkpoint': Checkpoint( + lastOpId: '10', + checksums: [ + bucket('a', 5, priority: 0), + bucket('b', 5, priority: 2) + ], + ), + }); + await checkProgress(progress(0, 5), progress(0, 10)); + + addDataLine('a', 5); + await checkProgress(progress(5, 5), progress(5, 10)); + + addCheckpointComplete(0); + await checkProgress(progress(5, 5), progress(5, 10)); + + addDataLine('b', 2); + await checkProgress(progress(5, 5), progress(7, 10)); + + // Before syncing b fully, send a new checkpoint + syncService.addLine({ + 'checkpoint': Checkpoint( + lastOpId: '14', + checksums: [ + bucket('a', 8, priority: 0), + bucket('b', 6, priority: 2) + ], + ), + }); + await checkProgress(progress(5, 8), progress(7, 14)); + + addDataLine('a', 3); + await checkProgress(progress(8, 8), progress(10, 14)); + + addCheckpointComplete(0); + await checkProgress(progress(8, 8), progress(10, 14)); + + addDataLine('b', 4); + await checkProgress(progress(8, 8), progress(14, 14)); + + addCheckpointComplete(); + await expectLater(status, + emits(isSyncStatus(downloading: false, downloadProgress: isNull))); + }); + }); }); } -TypeMatcher isSyncStatus( - {Object? downloading, - Object? connected, - Object? connecting, - Object? hasSynced}) { +TypeMatcher isSyncStatus({ + Object? downloading, + Object? connected, + Object? connecting, + Object? hasSynced, + Object? downloadProgress, +}) { var matcher = isA(); if (downloading != null) { matcher = matcher.having((e) => e.downloading, 'downloading', downloading); @@ -350,6 +581,30 @@ TypeMatcher isSyncStatus( if (hasSynced != null) { matcher = matcher.having((e) => e.hasSynced, 'hasSynced', hasSynced); } + if (downloadProgress != null) { + matcher = matcher.having( + (e) => e.downloadProgress, 'downloadProgress', downloadProgress); + } return matcher; } + +TypeMatcher isSyncDownloadProgress({ + required Object progress, + Map priorities = const {}, +}) { + var matcher = + isA().having((e) => e, 'untilCompletion', progress); + priorities.forEach((priority, expected) { + matcher = matcher.having( + (e) => e.untilPriority(priority), 'untilPriority($priority)', expected); + }); + + return matcher; +} + +TypeMatcher progress(int completed, int total) { + return isA() + .having((e) => e.downloadedOperations, 'completed', completed) + .having((e) => e.totalOperations, 'total', total); +} diff --git a/packages/powersync_core/test/stream_test.dart b/packages/powersync_core/test/stream_test.dart index 4806269a..852be90d 100644 --- a/packages/powersync_core/test/stream_test.dart +++ b/packages/powersync_core/test/stream_test.dart @@ -6,7 +6,7 @@ import 'dart:convert'; import 'dart:io'; import 'package:http/http.dart'; -import 'package:powersync_core/src/stream_utils.dart'; +import 'package:powersync_core/src/sync/stream_utils.dart'; import 'package:test/test.dart'; void main() { diff --git a/packages/powersync_core/test/sync_types_test.dart b/packages/powersync_core/test/sync_types_test.dart index 18e06931..261152b2 100644 --- a/packages/powersync_core/test/sync_types_test.dart +++ b/packages/powersync_core/test/sync_types_test.dart @@ -1,7 +1,7 @@ import 'dart:async'; -import 'package:powersync_core/src/sync_status.dart'; -import 'package:powersync_core/src/sync_types.dart'; +import 'package:powersync_core/src/sync/sync_status.dart'; +import 'package:powersync_core/src/sync/protocol.dart'; import 'package:test/test.dart'; void main() { diff --git a/packages/powersync_core/test/utils/abstract_test_utils.dart b/packages/powersync_core/test/utils/abstract_test_utils.dart index 48c73e03..8f7c3d94 100644 --- a/packages/powersync_core/test/utils/abstract_test_utils.dart +++ b/packages/powersync_core/test/utils/abstract_test_utils.dart @@ -1,8 +1,8 @@ import 'package:http/http.dart'; import 'package:logging/logging.dart'; import 'package:powersync_core/powersync_core.dart'; -import 'package:powersync_core/src/bucket_storage.dart'; -import 'package:powersync_core/src/streaming_sync.dart'; +import 'package:powersync_core/src/sync/bucket_storage.dart'; +import 'package:powersync_core/src/sync/streaming_sync.dart'; import 'package:sqlite_async/sqlite3_common.dart'; import 'package:sqlite_async/sqlite_async.dart'; import 'package:test_api/src/backend/invoker.dart'; diff --git a/packages/powersync_core/test/utils/native_test_utils.dart b/packages/powersync_core/test/utils/native_test_utils.dart index 65916a5f..04bc0e73 100644 --- a/packages/powersync_core/test/utils/native_test_utils.dart +++ b/packages/powersync_core/test/utils/native_test_utils.dart @@ -19,6 +19,12 @@ class TestOpenFactory extends PowerSyncOpenFactory with TestPowerSyncFactory { return DynamicLibrary.open('libsqlite3.so.0'); }); sqlite_open.open.overrideFor(sqlite_open.OperatingSystem.macOS, () { + // Prefer using Homebrew's SQLite which allows loading extensions. + const fromHomebrew = '/opt/homebrew/opt/sqlite/lib/libsqlite3.dylib'; + if (File(fromHomebrew).existsSync()) { + return DynamicLibrary.open(fromHomebrew); + } + return DynamicLibrary.open('libsqlite3.dylib'); }); }