From 83e90ee15eccfe71d48f9028bfeb867f1daf7342 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 13 Mar 2025 17:05:01 +0100 Subject: [PATCH 01/16] Add sync download progress interfaces --- .../powersync_core/lib/powersync_core.dart | 3 +- .../powersync_core/lib/src/sync_status.dart | 88 +++++++++++++++++++ 2 files changed, 90 insertions(+), 1 deletion(-) diff --git a/packages/powersync_core/lib/powersync_core.dart b/packages/powersync_core/lib/powersync_core.dart index 54e67156..7eeb7388 100644 --- a/packages/powersync_core/lib/powersync_core.dart +++ b/packages/powersync_core/lib/powersync_core.dart @@ -10,5 +10,6 @@ export 'src/exceptions.dart'; export 'src/log.dart'; export 'src/open_factory.dart'; export 'src/schema.dart'; -export 'src/sync_status.dart'; +export 'src/sync_status.dart' + hide InternalSyncDownloadProgress, OperationCounter; export 'src/uuid.dart'; diff --git a/packages/powersync_core/lib/src/sync_status.dart b/packages/powersync_core/lib/src/sync_status.dart index 86579907..d66feb63 100644 --- a/packages/powersync_core/lib/src/sync_status.dart +++ b/packages/powersync_core/lib/src/sync_status.dart @@ -1,4 +1,5 @@ import 'package:collection/collection.dart'; +import 'package:meta/meta.dart'; final class SyncStatus { /// true if currently connected. @@ -18,6 +19,12 @@ final class SyncStatus { /// This is only true when [connected] is also true. final bool downloading; + /// A realtime progress report on how many operations have been downloaded and + /// how many are necessary in total to complete the next sync iteration. + /// + /// This field is only set when [downloading] is also true. + final SyncDownloadProgress? downloadProgress; + /// true if uploading changes final bool uploading; @@ -47,6 +54,7 @@ final class SyncStatus { this.connecting = false, this.lastSyncedAt, this.hasSynced, + this.downloadProgress, this.downloading = false, this.uploading = false, this.downloadError, @@ -202,3 +210,83 @@ class UploadQueueStats { } } } + +@internal +typedef OperationCounter = ({BucketPriority priority, int opCount}); + +@internal +final class InternalSyncDownloadProgress { + final List downloaded; + final List target; + + final int _totalDownloaded; + final int _totalTarget; + + InternalSyncDownloadProgress(this.downloaded, this.target) + : _totalDownloaded = downloaded.map((e) => e.opCount).sum, + _totalTarget = target.map((e) => e.opCount).sum; + + static int sumInPriority( + List counters, BucketPriority priority) { + return counters + .where((e) => e.priority >= priority) + .map((e) => e.opCount) + .sum; + } +} + +/// Provides realtime progress about how PowerSync is downloading rows. +/// +/// The reported progress always reflects the status towards the end of a +/// sync iteration (after which a consistent snapshot of all buckets is +/// available locally). Note that [downloaded] starts at `0` every time an +/// iteration begins. +/// This has an effect when iterations are interrupted. Consider this flow +/// as an example: +/// +/// 1. The client comes online for the first time and has to synchronize a +/// large amount of rows (say 100k). Here, [downloaded] starts at `0` and +/// [total] would be the `100,000` rows. +/// 2. The client makes some progress, so that [downloaded] is perhaps +/// `60,000`. +/// 3. The client briefly looses connectivity. +/// 4. Back online, a new sync iteration starts. This means that [downloaded] +/// is reset to `0`. However, since half of the target has already been +/// downloaded in the earlier iteration, [total] is now set to `40,000` to +/// reflect the remaining rows to download in the new iteration. +extension type SyncDownloadProgress._(InternalSyncDownloadProgress _internal) { + /// The amount of operations that have been downloaded in the current sync + /// iteration. + /// + /// This number always starts at zero as [SyncStatus.downloading] changes + /// from `false` to `true`. + int get downloaded => _internal._totalDownloaded; + + /// The total amount of operations expected for this sync operation. + int get total => _internal._totalTarget; + + /// The fraction of [total] operations that have already been [downloaded], as + /// a number between 0 and 1. + double get progress => _internal._totalDownloaded / _internal._totalTarget; + + int downloadedFor(BucketPriority priority) { + return InternalSyncDownloadProgress.sumInPriority( + _internal.downloaded, priority); + } + + int totalFor(BucketPriority priority) { + return InternalSyncDownloadProgress.sumInPriority( + _internal.target, priority); + } + + double progressFor(BucketPriority priority) { + final downloaded = downloadedFor(priority); + final total = totalFor(priority); + + if (total == 0) { + return 0; + } + + return downloaded / total; + } +} From 27623d9eb2f5be1d9adc4cf47d4a3efe5b2bb966 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Fri, 21 Mar 2025 15:57:31 +0100 Subject: [PATCH 02/16] Start tracking download operations --- .../lib/src/bucket_storage.dart | 2 +- .../native/native_powersync_database.dart | 2 +- .../database/web/web_powersync_database.dart | 2 +- .../lib/src/sync/mutable_sync_status.dart | 111 ++++++++++++ .../{sync_types.dart => sync/protocol.dart} | 2 +- .../lib/src/{ => sync}/stream_utils.dart | 0 .../lib/src/{ => sync}/streaming_sync.dart | 170 +++++------------- .../powersync_core/lib/src/sync_status.dart | 63 ++++++- .../lib/src/web/sync_controller.dart | 2 +- .../lib/src/web/sync_worker.dart | 2 +- .../test/bucket_storage_test.dart | 2 +- .../test/in_memory_sync_test.dart | 4 +- packages/powersync_core/test/stream_test.dart | 2 +- .../powersync_core/test/sync_types_test.dart | 2 +- .../test/utils/abstract_test_utils.dart | 2 +- 15 files changed, 229 insertions(+), 139 deletions(-) create mode 100644 packages/powersync_core/lib/src/sync/mutable_sync_status.dart rename packages/powersync_core/lib/src/{sync_types.dart => sync/protocol.dart} (99%) rename packages/powersync_core/lib/src/{ => sync}/stream_utils.dart (100%) rename packages/powersync_core/lib/src/{ => sync}/streaming_sync.dart (80%) diff --git a/packages/powersync_core/lib/src/bucket_storage.dart b/packages/powersync_core/lib/src/bucket_storage.dart index d79e4c05..1f18a41e 100644 --- a/packages/powersync_core/lib/src/bucket_storage.dart +++ b/packages/powersync_core/lib/src/bucket_storage.dart @@ -7,7 +7,7 @@ import 'package:powersync_core/sqlite3_common.dart'; import 'crud.dart'; import 'schema_logic.dart'; -import 'sync_types.dart'; +import 'sync/protocol.dart'; const compactOperationInterval = 1000; diff --git a/packages/powersync_core/lib/src/database/native/native_powersync_database.dart b/packages/powersync_core/lib/src/database/native/native_powersync_database.dart index 0b61898c..cfc70cf5 100644 --- a/packages/powersync_core/lib/src/database/native/native_powersync_database.dart +++ b/packages/powersync_core/lib/src/database/native/native_powersync_database.dart @@ -16,7 +16,7 @@ import 'package:powersync_core/src/open_factory/abstract_powersync_open_factory. import 'package:powersync_core/src/open_factory/native/native_open_factory.dart'; import 'package:powersync_core/src/schema.dart'; import 'package:powersync_core/src/schema_logic.dart'; -import 'package:powersync_core/src/streaming_sync.dart'; +import 'package:powersync_core/src/sync/streaming_sync.dart'; import 'package:powersync_core/src/sync_status.dart'; import 'package:sqlite_async/sqlite3_common.dart'; import 'package:sqlite_async/sqlite_async.dart'; diff --git a/packages/powersync_core/lib/src/database/web/web_powersync_database.dart b/packages/powersync_core/lib/src/database/web/web_powersync_database.dart index a55aa5b4..980dcae9 100644 --- a/packages/powersync_core/lib/src/database/web/web_powersync_database.dart +++ b/packages/powersync_core/lib/src/database/web/web_powersync_database.dart @@ -11,7 +11,7 @@ import 'package:powersync_core/src/log.dart'; import 'package:powersync_core/src/open_factory/abstract_powersync_open_factory.dart'; import 'package:powersync_core/src/open_factory/web/web_open_factory.dart'; import 'package:powersync_core/src/schema.dart'; -import 'package:powersync_core/src/streaming_sync.dart'; +import 'package:powersync_core/src/sync/streaming_sync.dart'; import 'package:sqlite_async/sqlite_async.dart'; import 'package:powersync_core/src/schema_logic.dart' as schema_logic; diff --git a/packages/powersync_core/lib/src/sync/mutable_sync_status.dart b/packages/powersync_core/lib/src/sync/mutable_sync_status.dart new file mode 100644 index 00000000..85ef87a6 --- /dev/null +++ b/packages/powersync_core/lib/src/sync/mutable_sync_status.dart @@ -0,0 +1,111 @@ +import 'dart:async'; + +import 'package:collection/collection.dart'; + +import '../sync_status.dart'; +import 'protocol.dart'; + +final class MutableSyncStatus { + bool connected = false; + bool connecting = false; + bool downloading = false; + bool uploading = false; + + InternalSyncDownloadProgress? downloadProgress; + List priorityStatusEntries = const []; + + DateTime? lastSyncedAt; + + Object? uploadError; + Object? downloadError; + + void setConnectingIfNotConnected() { + if (!connected) { + connecting = true; + } + } + + void setConnected() { + connected = true; + connecting = false; + } + + void applyDownloadError(Object error) { + connected = false; + connecting = false; + downloading = false; + downloadProgress = null; + downloadError = error; + } + + void applyCheckpointReached(Checkpoint applied) { + downloading = false; + downloadError = null; + final now = lastSyncedAt = DateTime.now(); + priorityStatusEntries = [ + if (applied.checksums.isNotEmpty) + ( + hasSynced: true, + lastSyncedAt: now, + priority: maxBy( + applied.checksums.map((cs) => BucketPriority(cs.priority)), + (priority) => priority, + compare: BucketPriority.comparator, + )!, + ) + ]; + } + + void applyCheckpointStarted(Checkpoint target) { + downloading = true; + downloadProgress = InternalSyncDownloadProgress.fromZero(target); + } + + void applyUploadError(Object error) { + uploading = false; + uploadError = error; + } + + SyncStatus immutableSnapsot() { + return SyncStatus( + connected: connected, + connecting: connecting, + downloading: downloading, + uploading: uploading, + downloadProgress: downloadProgress?.asSyncDownloadProgress, + priorityStatusEntries: UnmodifiableListView(priorityStatusEntries), + lastSyncedAt: lastSyncedAt, + hasSynced: lastSyncedAt != null, + uploadError: uploadError, + downloadError: downloadError, + ); + } +} + +final class SyncStatusStateStream { + final MutableSyncStatus status = MutableSyncStatus(); + SyncStatus _lastPublishedStatus = const SyncStatus(); + + final StreamController _statusStreamController = + StreamController.broadcast(); + + Stream get statusStream => _statusStreamController.stream; + + void updateStatus(void Function(MutableSyncStatus status) change) { + change(status); + + if (_statusStreamController.isClosed) { + return; + } + + final current = status.immutableSnapsot(); + if (current != _lastPublishedStatus) { + _statusStreamController.add(current); + _lastPublishedStatus = current; + } + } + + void close() { + _statusStreamController.close(); + } +} diff --git a/packages/powersync_core/lib/src/sync_types.dart b/packages/powersync_core/lib/src/sync/protocol.dart similarity index 99% rename from packages/powersync_core/lib/src/sync_types.dart rename to packages/powersync_core/lib/src/sync/protocol.dart index 968b53d4..5e9c6362 100644 --- a/packages/powersync_core/lib/src/sync_types.dart +++ b/packages/powersync_core/lib/src/sync/protocol.dart @@ -1,7 +1,7 @@ import 'dart:async'; import 'dart:convert'; -import 'bucket_storage.dart'; +import '../bucket_storage.dart'; /// Messages sent from the sync service. sealed class StreamingSyncLine { diff --git a/packages/powersync_core/lib/src/stream_utils.dart b/packages/powersync_core/lib/src/sync/stream_utils.dart similarity index 100% rename from packages/powersync_core/lib/src/stream_utils.dart rename to packages/powersync_core/lib/src/sync/stream_utils.dart diff --git a/packages/powersync_core/lib/src/streaming_sync.dart b/packages/powersync_core/lib/src/sync/streaming_sync.dart similarity index 80% rename from packages/powersync_core/lib/src/streaming_sync.dart rename to packages/powersync_core/lib/src/sync/streaming_sync.dart index 716b38bf..5a3cb4cb 100644 --- a/packages/powersync_core/lib/src/streaming_sync.dart +++ b/packages/powersync_core/lib/src/sync/streaming_sync.dart @@ -1,24 +1,21 @@ import 'dart:async'; import 'dart:convert' as convert; -import 'package:collection/collection.dart'; import 'package:http/http.dart' as http; +import 'package:meta/meta.dart'; import 'package:powersync_core/src/abort_controller.dart'; import 'package:powersync_core/src/exceptions.dart'; import 'package:powersync_core/src/log_internal.dart'; import 'package:powersync_core/src/user_agent/user_agent.dart'; import 'package:sqlite_async/mutex.dart'; -import 'bucket_storage.dart'; -import 'connector.dart'; -import 'crud.dart'; +import '../bucket_storage.dart'; +import '../connector.dart'; +import '../crud.dart'; +import 'mutable_sync_status.dart'; import 'stream_utils.dart'; -import 'sync_status.dart'; -import 'sync_types.dart'; - -/// Since we use null to indicate "no change" in status updates, we need -/// a different value to indicate "no error". -const _noError = Object(); +import '../sync_status.dart'; +import 'protocol.dart'; abstract interface class StreamingSync { Stream get statusStream; @@ -29,13 +26,14 @@ abstract interface class StreamingSync { Future abort(); } +@internal class StreamingSyncImplementation implements StreamingSync { BucketStorage adapter; final Future Function() credentialsCallback; final Future Function()? invalidCredentialsCallback; - final Future Function() uploadCrud; + final Stream crudUpdateTriggerStream; // An internal controller which is used to trigger CRUD uploads internally // e.g. when reconnecting. @@ -44,15 +42,8 @@ class StreamingSyncImplementation implements StreamingSync { final StreamController _internalCrudTriggerController = StreamController.broadcast(); - final Stream crudUpdateTriggerStream; - - final StreamController _statusStreamController = - StreamController.broadcast(); - - @override - late final Stream statusStream; - - late final http.Client _client; + final http.Client _client; + final SyncStatusStateStream _state = SyncStatusStateStream(); final StreamController _localPingController = StreamController.broadcast(); @@ -61,8 +52,6 @@ class StreamingSyncImplementation implements StreamingSync { final Map? syncParameters; - SyncStatus lastStatus = const SyncStatus(); - AbortController? _abort; bool _safeToClose = true; @@ -70,7 +59,6 @@ class StreamingSyncImplementation implements StreamingSync { final Mutex syncMutex, crudMutex; final Map _userAgentHeaders; - String? clientId; StreamingSyncImplementation( @@ -86,12 +74,13 @@ class StreamingSyncImplementation implements StreamingSync { /// A unique identifier for this streaming sync implementation /// A good value is typically the DB file path which it will mutate when syncing. String? identifier = "unknown"}) - : syncMutex = Mutex(identifier: "sync-$identifier"), + : _client = client, + syncMutex = Mutex(identifier: "sync-$identifier"), crudMutex = Mutex(identifier: "crud-$identifier"), - _userAgentHeaders = userAgentHeaders() { - _client = client; - statusStream = _statusStreamController.stream; - } + _userAgentHeaders = userAgentHeaders(); + + @override + Stream get statusStream => _state.statusStream; @override Future abort() async { @@ -119,17 +108,13 @@ class StreamingSyncImplementation implements StreamingSync { // Now close the client in all cases not covered above _client.close(); - _statusStreamController.close(); + _state.close(); } bool get aborted { return _abort?.aborted ?? false; } - bool get isConnected { - return lastStatus.connected; - } - @override Future streamingSync() async { try { @@ -138,7 +123,7 @@ class StreamingSyncImplementation implements StreamingSync { crudLoop(); var invalidCredentials = false; while (!aborted) { - _updateStatus(connecting: true); + _state.updateStatus((s) => s.setConnectingIfNotConnected()); try { if (invalidCredentials && invalidCredentialsCallback != null) { // This may error. In that case it will be retried again on the next @@ -160,11 +145,7 @@ class StreamingSyncImplementation implements StreamingSync { isolateLogger.warning('Sync error: $message', e, stacktrace); invalidCredentials = true; - _updateStatus( - connected: false, - connecting: true, - downloading: false, - downloadError: e); + _state.updateStatus((s) => s.applyDownloadError(e)); // On error, wait a little before retrying // When aborting, don't wait @@ -208,7 +189,7 @@ class StreamingSyncImplementation implements StreamingSync { // This is the first item in the FIFO CRUD queue. CrudEntry? nextCrudItem = await adapter.nextCrudItem(); if (nextCrudItem != null) { - _updateStatus(uploading: true); + _state.updateStatus((s) => s.uploading = true); if (nextCrudItem.clientId == checkedCrudItem?.clientId) { // This will force a higher log level than exceptions which are caught here. isolateLogger.warning( @@ -221,7 +202,7 @@ class StreamingSyncImplementation implements StreamingSync { checkedCrudItem = nextCrudItem; await uploadCrud(); - _updateStatus(uploadError: _noError); + _state.updateStatus((s) => s.uploadError = null); } else { // Uploading is completed await adapter.updateLocalTarget(() => getWriteCheckpoint()); @@ -230,9 +211,10 @@ class StreamingSyncImplementation implements StreamingSync { } catch (e, stacktrace) { checkedCrudItem = null; isolateLogger.warning('Data upload error', e, stacktrace); - _updateStatus(uploading: false, uploadError: e); + _state.updateStatus((s) => s.applyUploadError(e)); await _delayRetry(); - if (!isConnected) { + + if (!_state.status.connected) { // Exit the upload loop if the sync stream is no longer connected break; } @@ -241,7 +223,7 @@ class StreamingSyncImplementation implements StreamingSync { e, stacktrace); } finally { - _updateStatus(uploading: false); + _state.updateStatus((s) => s.uploading = false); } } }, timeout: retryDelay); @@ -276,50 +258,15 @@ class StreamingSyncImplementation implements StreamingSync { } void _updateStatusForPriority(SyncPriorityStatus completed) { - // All status entries with a higher priority can be deleted since this - // partial sync includes them. - _updateStatus(priorityStatusEntries: [ - for (final entry in lastStatus.priorityStatusEntries) - if (entry.priority < completed.priority) entry, - completed - ]); - } - - /// Update sync status based on any non-null parameters. - /// To clear errors, use [_noError] instead of null. - void _updateStatus({ - DateTime? lastSyncedAt, - bool? hasSynced, - bool? connected, - bool? connecting, - bool? downloading, - bool? uploading, - Object? uploadError, - Object? downloadError, - List? priorityStatusEntries, - }) { - final c = connected ?? lastStatus.connected; - var newStatus = SyncStatus( - connected: c, - connecting: !c && (connecting ?? lastStatus.connecting), - lastSyncedAt: lastSyncedAt ?? lastStatus.lastSyncedAt, - hasSynced: hasSynced ?? lastStatus.hasSynced, - downloading: downloading ?? lastStatus.downloading, - uploading: uploading ?? lastStatus.uploading, - uploadError: uploadError == _noError - ? null - : (uploadError ?? lastStatus.uploadError), - downloadError: downloadError == _noError - ? null - : (downloadError ?? lastStatus.downloadError), - priorityStatusEntries: - priorityStatusEntries ?? lastStatus.priorityStatusEntries, - ); - - if (!_statusStreamController.isClosed) { - lastStatus = newStatus; - _statusStreamController.add(newStatus); - } + _state.updateStatus((s) { + // All status entries with a higher priority can be deleted since this + // partial sync includes them. + s.priorityStatusEntries = [ + for (final entry in s.priorityStatusEntries) + if (entry.priority < completed.priority) entry, + completed + ]; + }); } Future<(List, Map)> @@ -362,7 +309,7 @@ class StreamingSyncImplementation implements StreamingSync { break; } - _updateStatus(connected: true, connecting: false); + _state.updateStatus((s) => s.setConnected()); switch (line) { case Checkpoint(): targetCheckpoint = line; @@ -377,7 +324,7 @@ class StreamingSyncImplementation implements StreamingSync { } bucketMap = newBuckets; await adapter.removeBuckets([...bucketsToDelete]); - _updateStatus(downloading: true); + _state.updateStatus((s) => s.downloading = true); case StreamingSyncCheckpointComplete(): final result = await adapter.syncLocalDatabase(targetCheckpoint!); if (!result.checkpointValid) { @@ -389,27 +336,9 @@ class StreamingSyncImplementation implements StreamingSync { // Checksums valid, but need more data for a consistent checkpoint. // Continue waiting. } else { - appliedCheckpoint = targetCheckpoint; - - final now = DateTime.now(); - _updateStatus( - downloading: false, - downloadError: _noError, - lastSyncedAt: now, - priorityStatusEntries: [ - if (appliedCheckpoint.checksums.isNotEmpty) - ( - hasSynced: true, - lastSyncedAt: now, - priority: maxBy( - appliedCheckpoint.checksums - .map((cs) => BucketPriority(cs.priority)), - (priority) => priority, - compare: BucketPriority.comparator, - )!, - ) - ], - ); + final thisCheckpoint = appliedCheckpoint = targetCheckpoint; + _state + .updateStatus((s) => s.applyCheckpointReached(thisCheckpoint)); } validatedCheckpoint = targetCheckpoint; @@ -438,7 +367,7 @@ class StreamingSyncImplementation implements StreamingSync { throw PowerSyncProtocolException( 'Checkpoint diff without previous checkpoint'); } - _updateStatus(downloading: true); + _state.updateStatus((s) => s.downloading = true); final diff = line; final Map newBuckets = {}; for (var checksum in targetCheckpoint.checksums) { @@ -462,7 +391,7 @@ class StreamingSyncImplementation implements StreamingSync { await adapter.removeBuckets(diff.removedBuckets); adapter.setTargetCheckpoint(targetCheckpoint); case SyncDataBatch(): - _updateStatus(downloading: true); + _state.updateStatus((s) => s.downloading = true); await adapter.saveSyncData(line); case StreamingSyncKeepalive(:final tokenExpiresIn): if (tokenExpiresIn == 0) { @@ -489,10 +418,9 @@ class StreamingSyncImplementation implements StreamingSync { isolateLogger.fine('Unknown sync line: $rawData'); case null: // Local ping if (targetCheckpoint == appliedCheckpoint) { - _updateStatus( - downloading: false, - downloadError: _noError, - lastSyncedAt: DateTime.now()); + if (appliedCheckpoint case final completed?) { + _state.updateStatus((s) => s.applyCheckpointReached(completed)); + } } else if (validatedCheckpoint == targetCheckpoint) { final result = await adapter.syncLocalDatabase(targetCheckpoint!); if (!result.checkpointValid) { @@ -504,12 +432,8 @@ class StreamingSyncImplementation implements StreamingSync { // Checksums valid, but need more data for a consistent checkpoint. // Continue waiting. } else { - appliedCheckpoint = targetCheckpoint; - - _updateStatus( - downloading: false, - downloadError: _noError, - lastSyncedAt: DateTime.now()); + final completed = appliedCheckpoint = targetCheckpoint; + _state.updateStatus((s) => s.applyCheckpointReached(completed)); } } } diff --git a/packages/powersync_core/lib/src/sync_status.dart b/packages/powersync_core/lib/src/sync_status.dart index d66feb63..0bdbd1cd 100644 --- a/packages/powersync_core/lib/src/sync_status.dart +++ b/packages/powersync_core/lib/src/sync_status.dart @@ -1,6 +1,8 @@ import 'package:collection/collection.dart'; import 'package:meta/meta.dart'; +import 'sync/protocol.dart'; + final class SyncStatus { /// true if currently connected. /// @@ -77,6 +79,7 @@ final class SyncStatus { other.priorityStatusEntries, priorityStatusEntries)); } + @Deprecated('Should not be used in user code') SyncStatus copyWith({ bool? connected, bool? downloading, @@ -158,12 +161,12 @@ final class SyncStatus { String toString() { return "SyncStatus"; } - - // This should be a ListEquality, but that appears to - // cause weird type errors with DDC (but only after hot reloads?!) - static const _statusEquality = ListEquality(); } +// This should be a ListEquality, but that appears to +// cause weird type errors with DDC (but only after hot reloads?!) +const _statusEquality = ListEquality(); + /// The priority of a PowerSync bucket. extension type const BucketPriority._(int priorityNumber) { static const _highest = 0; @@ -226,6 +229,23 @@ final class InternalSyncDownloadProgress { : _totalDownloaded = downloaded.map((e) => e.opCount).sum, _totalTarget = target.map((e) => e.opCount).sum; + factory InternalSyncDownloadProgress.fromZero(Checkpoint target) { + final totalOpsPerPriority = + target.checksums.groupFoldBy( + (cs) => BucketPriority(cs.priority), + (prev, cs) => (prev ?? 0) + (cs.count ?? 0), + ); + final downloaded = [ + for (final involvedPriority in totalOpsPerPriority.keys) + (priority: involvedPriority, opCount: 0), + ]; + final targetOps = totalOpsPerPriority.entries + .map((e) => (priority: e.key, opCount: e.value)) + .toList(); + + return InternalSyncDownloadProgress(downloaded, targetOps); + } + static int sumInPriority( List counters, BucketPriority priority) { return counters @@ -233,6 +253,26 @@ final class InternalSyncDownloadProgress { .map((e) => e.opCount) .sum; } + + SyncDownloadProgress get asSyncDownloadProgress => + SyncDownloadProgress._(this); + + @override + int get hashCode => Object.hash( + _statusEquality.hash(downloaded), + _statusEquality.hash(target), + ); + + @override + bool operator ==(Object other) { + return other is InternalSyncDownloadProgress && + // _totalDownloaded and _totalTarget are derived values, but comparing + // them first helps find a difference faster. + _totalDownloaded == other._totalDownloaded && + _totalTarget == other._totalTarget && + _statusEquality.equals(downloaded, other.downloaded) && + _statusEquality.equals(target, other.target); + } } /// Provides realtime progress about how PowerSync is downloading rows. @@ -269,16 +309,31 @@ extension type SyncDownloadProgress._(InternalSyncDownloadProgress _internal) { /// a number between 0 and 1. double get progress => _internal._totalDownloaded / _internal._totalTarget; + /// Returns how many operations have been downloaded for buckets in + /// [priority]. + /// + /// Under the consistency guarantees offered by PowerSync, this will also + /// include operations from higher-priority buckets. int downloadedFor(BucketPriority priority) { return InternalSyncDownloadProgress.sumInPriority( _internal.downloaded, priority); } + /// Returns how many operations in total need to be downloaded before the + /// client has reached a consistent states for buckets with the given + /// [priority]. + /// + /// Under the consistency guarantees offered by PowerSync, this will also + /// include operations from higher-priority buckets. int totalFor(BucketPriority priority) { return InternalSyncDownloadProgress.sumInPriority( _internal.target, priority); } + /// The progress towards syncing the given [priority]. + /// + /// Returns the fraction of [downloadedFor] to [totalFor], as a number between + /// 0 and 1. double progressFor(BucketPriority priority) { final downloaded = downloadedFor(priority); final total = totalFor(priority); diff --git a/packages/powersync_core/lib/src/web/sync_controller.dart b/packages/powersync_core/lib/src/web/sync_controller.dart index be930fb4..574c2b84 100644 --- a/packages/powersync_core/lib/src/web/sync_controller.dart +++ b/packages/powersync_core/lib/src/web/sync_controller.dart @@ -6,7 +6,7 @@ import 'package:sqlite_async/web.dart'; import 'package:web/web.dart'; import '../database/web/web_powersync_database.dart'; -import '../streaming_sync.dart'; +import '../sync/streaming_sync.dart'; import 'sync_worker_protocol.dart'; class SyncWorkerHandle implements StreamingSync { diff --git a/packages/powersync_core/lib/src/web/sync_worker.dart b/packages/powersync_core/lib/src/web/sync_worker.dart index ebe6d586..f1dae4eb 100644 --- a/packages/powersync_core/lib/src/web/sync_worker.dart +++ b/packages/powersync_core/lib/src/web/sync_worker.dart @@ -13,7 +13,7 @@ import 'package:logging/logging.dart'; import 'package:powersync_core/powersync_core.dart'; import 'package:powersync_core/sqlite_async.dart'; import 'package:powersync_core/src/database/powersync_db_mixin.dart'; -import 'package:powersync_core/src/streaming_sync.dart'; +import 'package:powersync_core/src/sync/streaming_sync.dart'; import 'package:sqlite_async/web.dart'; import 'package:web/web.dart' hide RequestMode; diff --git a/packages/powersync_core/test/bucket_storage_test.dart b/packages/powersync_core/test/bucket_storage_test.dart index d1b29a3f..541fd9a0 100644 --- a/packages/powersync_core/test/bucket_storage_test.dart +++ b/packages/powersync_core/test/bucket_storage_test.dart @@ -1,6 +1,6 @@ import 'package:powersync_core/powersync_core.dart'; import 'package:powersync_core/src/bucket_storage.dart'; -import 'package:powersync_core/src/sync_types.dart'; +import 'package:powersync_core/src/sync/protocol.dart'; import 'package:sqlite_async/sqlite3_common.dart'; import 'package:test/test.dart'; diff --git a/packages/powersync_core/test/in_memory_sync_test.dart b/packages/powersync_core/test/in_memory_sync_test.dart index 359f68e7..b587715a 100644 --- a/packages/powersync_core/test/in_memory_sync_test.dart +++ b/packages/powersync_core/test/in_memory_sync_test.dart @@ -3,8 +3,8 @@ import 'package:logging/logging.dart'; import 'package:powersync_core/powersync_core.dart'; import 'package:powersync_core/sqlite3_common.dart'; import 'package:powersync_core/src/log_internal.dart'; -import 'package:powersync_core/src/streaming_sync.dart'; -import 'package:powersync_core/src/sync_types.dart'; +import 'package:powersync_core/src/sync/streaming_sync.dart'; +import 'package:powersync_core/src/sync/protocol.dart'; import 'package:test/test.dart'; import 'server/sync_server/in_memory_sync_server.dart'; diff --git a/packages/powersync_core/test/stream_test.dart b/packages/powersync_core/test/stream_test.dart index 4806269a..852be90d 100644 --- a/packages/powersync_core/test/stream_test.dart +++ b/packages/powersync_core/test/stream_test.dart @@ -6,7 +6,7 @@ import 'dart:convert'; import 'dart:io'; import 'package:http/http.dart'; -import 'package:powersync_core/src/stream_utils.dart'; +import 'package:powersync_core/src/sync/stream_utils.dart'; import 'package:test/test.dart'; void main() { diff --git a/packages/powersync_core/test/sync_types_test.dart b/packages/powersync_core/test/sync_types_test.dart index 18e06931..c3e0ed76 100644 --- a/packages/powersync_core/test/sync_types_test.dart +++ b/packages/powersync_core/test/sync_types_test.dart @@ -1,7 +1,7 @@ import 'dart:async'; import 'package:powersync_core/src/sync_status.dart'; -import 'package:powersync_core/src/sync_types.dart'; +import 'package:powersync_core/src/sync/protocol.dart'; import 'package:test/test.dart'; void main() { diff --git a/packages/powersync_core/test/utils/abstract_test_utils.dart b/packages/powersync_core/test/utils/abstract_test_utils.dart index 48c73e03..fcebed22 100644 --- a/packages/powersync_core/test/utils/abstract_test_utils.dart +++ b/packages/powersync_core/test/utils/abstract_test_utils.dart @@ -2,7 +2,7 @@ import 'package:http/http.dart'; import 'package:logging/logging.dart'; import 'package:powersync_core/powersync_core.dart'; import 'package:powersync_core/src/bucket_storage.dart'; -import 'package:powersync_core/src/streaming_sync.dart'; +import 'package:powersync_core/src/sync/streaming_sync.dart'; import 'package:sqlite_async/sqlite3_common.dart'; import 'package:sqlite_async/sqlite_async.dart'; import 'package:test_api/src/backend/invoker.dart'; From bd01bfdd063e5b0b9800c7a15b75aa3789bc130d Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 24 Mar 2025 15:33:33 +0200 Subject: [PATCH 03/16] Serialize download progress over worker --- .../powersync_core/lib/src/sync_status.dart | 4 ++ .../lib/src/web/sync_worker_protocol.dart | 71 +++++++++++++++++++ 2 files changed, 75 insertions(+) diff --git a/packages/powersync_core/lib/src/sync_status.dart b/packages/powersync_core/lib/src/sync_status.dart index 0bdbd1cd..a5531b35 100644 --- a/packages/powersync_core/lib/src/sync_status.dart +++ b/packages/powersync_core/lib/src/sync_status.dart @@ -246,6 +246,10 @@ final class InternalSyncDownloadProgress { return InternalSyncDownloadProgress(downloaded, targetOps); } + static InternalSyncDownloadProgress ofPublic(SyncDownloadProgress public) { + return public._internal; + } + static int sumInPriority( List counters, BucketPriority priority) { return counters diff --git a/packages/powersync_core/lib/src/web/sync_worker_protocol.dart b/packages/powersync_core/lib/src/web/sync_worker_protocol.dart index c8f343cf..cebdcacb 100644 --- a/packages/powersync_core/lib/src/web/sync_worker_protocol.dart +++ b/packages/powersync_core/lib/src/web/sync_worker_protocol.dart @@ -146,6 +146,69 @@ extension type SerializedCredentials._(JSObject _) implements JSObject { } } +@anonymous +extension type SerializedOperationCounter._(JSObject _) implements JSObject { + external factory SerializedOperationCounter({ + required int priority, + required int opCount, + }); + + factory SerializedOperationCounter.fromDart(OperationCounter progress) { + return SerializedOperationCounter( + priority: progress.priority.priorityNumber, + opCount: progress.opCount, + ); + } + + external JSNumber get priority; + external JSNumber get opCount; + + OperationCounter get toDart { + return ( + priority: BucketPriority(priority.toDartInt), + opCount: opCount.toDartInt + ); + } +} + +@anonymous +extension type SerializedDownloadProgress._(JSObject _) implements JSObject { + external factory SerializedDownloadProgress({ + required JSArray downloaded, + required JSArray target, + }); + + external JSArray get downloaded; + external JSArray get target; + + factory SerializedDownloadProgress.fromDart( + InternalSyncDownloadProgress progress) { + return SerializedDownloadProgress( + downloaded: _serializeCounters(progress.downloaded), + target: _serializeCounters(progress.target), + ); + } + + InternalSyncDownloadProgress get toDart { + return InternalSyncDownloadProgress( + _deserializeCounters(downloaded), + _deserializeCounters(target), + ); + } + + static JSArray _serializeCounters( + List counters) { + return [ + for (final entry in counters) SerializedOperationCounter.fromDart(entry) + ].toJS; + } + + static List _deserializeCounters( + JSArray counters) { + return [for (final entry in counters.toDart) entry.toDart]; + } +} + @anonymous extension type SerializedSyncStatus._(JSObject _) implements JSObject { external factory SerializedSyncStatus({ @@ -158,6 +221,7 @@ extension type SerializedSyncStatus._(JSObject _) implements JSObject { required String? uploadError, required String? downloadError, required JSArray? priorityStatusEntries, + required SerializedDownloadProgress? syncProgress, }); factory SerializedSyncStatus.from(SyncStatus status) { @@ -178,6 +242,11 @@ extension type SerializedSyncStatus._(JSObject _) implements JSObject { entry.hasSynced?.toJS, ].toJS ].toJS, + syncProgress: switch (status.downloadProgress) { + null => null, + var other => SerializedDownloadProgress.fromDart( + InternalSyncDownloadProgress.ofPublic(other)), + }, ); } @@ -190,6 +259,7 @@ extension type SerializedSyncStatus._(JSObject _) implements JSObject { external String? uploadError; external String? downloadError; external JSArray? priorityStatusEntries; + external SerializedDownloadProgress? syncProgress; SyncStatus asSyncStatus() { return SyncStatus( @@ -219,6 +289,7 @@ extension type SerializedSyncStatus._(JSObject _) implements JSObject { ); }) ], + downloadProgress: syncProgress?.toDart.asSyncDownloadProgress, ); } } From cadd6824e0b11905c7fb83aea77fbc1e81a87bb1 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 24 Mar 2025 15:56:05 +0200 Subject: [PATCH 04/16] Increment counters when receiving sync lines --- .../powersync_core/lib/powersync_core.dart | 3 +- .../lib/src/sync/mutable_sync_status.dart | 13 ++++ .../lib/src/sync/streaming_sync.dart | 3 + .../powersync_core/lib/src/sync_status.dart | 65 +++++++++++-------- .../lib/src/web/sync_worker_protocol.dart | 29 ++++----- 5 files changed, 65 insertions(+), 48 deletions(-) diff --git a/packages/powersync_core/lib/powersync_core.dart b/packages/powersync_core/lib/powersync_core.dart index 7eeb7388..3f16aef6 100644 --- a/packages/powersync_core/lib/powersync_core.dart +++ b/packages/powersync_core/lib/powersync_core.dart @@ -10,6 +10,5 @@ export 'src/exceptions.dart'; export 'src/log.dart'; export 'src/open_factory.dart'; export 'src/schema.dart'; -export 'src/sync_status.dart' - hide InternalSyncDownloadProgress, OperationCounter; +export 'src/sync_status.dart' hide InternalSyncDownloadProgress; export 'src/uuid.dart'; diff --git a/packages/powersync_core/lib/src/sync/mutable_sync_status.dart b/packages/powersync_core/lib/src/sync/mutable_sync_status.dart index 85ef87a6..b0088f53 100644 --- a/packages/powersync_core/lib/src/sync/mutable_sync_status.dart +++ b/packages/powersync_core/lib/src/sync/mutable_sync_status.dart @@ -58,6 +58,7 @@ final class MutableSyncStatus { void applyCheckpointStarted(Checkpoint target) { downloading = true; + // TODO: Include pending ops from interrupted download, if any... downloadProgress = InternalSyncDownloadProgress.fromZero(target); } @@ -66,6 +67,18 @@ final class MutableSyncStatus { uploadError = error; } + void applyBatchReceived( + Map currentBuckets, SyncDataBatch batch) { + downloading = true; + if (downloadProgress case final previousProgress?) { + downloadProgress = previousProgress.incrementDownloaded([ + for (final bucket in batch.buckets) + if (currentBuckets[bucket.bucket] case final knownBucket?) + (BucketPriority(knownBucket.priority), bucket.data.length), + ]); + } + } + SyncStatus immutableSnapsot() { return SyncStatus( connected: connected, diff --git a/packages/powersync_core/lib/src/sync/streaming_sync.dart b/packages/powersync_core/lib/src/sync/streaming_sync.dart index 5a3cb4cb..5c7c5d24 100644 --- a/packages/powersync_core/lib/src/sync/streaming_sync.dart +++ b/packages/powersync_core/lib/src/sync/streaming_sync.dart @@ -391,6 +391,9 @@ class StreamingSyncImplementation implements StreamingSync { await adapter.removeBuckets(diff.removedBuckets); adapter.setTargetCheckpoint(targetCheckpoint); case SyncDataBatch(): + // TODO: This increments the counters before actually saving sync + // data. Might be fine though? + _state.updateStatus((s) => s.applyBatchReceived(bucketMap, line)); _state.updateStatus((s) => s.downloading = true); await adapter.saveSyncData(line); case StreamingSyncKeepalive(:final tokenExpiresIn): diff --git a/packages/powersync_core/lib/src/sync_status.dart b/packages/powersync_core/lib/src/sync_status.dart index a5531b35..751cfa9d 100644 --- a/packages/powersync_core/lib/src/sync_status.dart +++ b/packages/powersync_core/lib/src/sync_status.dart @@ -1,3 +1,5 @@ +import 'dart:math'; + import 'package:collection/collection.dart'; import 'package:meta/meta.dart'; @@ -161,11 +163,11 @@ final class SyncStatus { String toString() { return "SyncStatus"; } -} -// This should be a ListEquality, but that appears to -// cause weird type errors with DDC (but only after hot reloads?!) -const _statusEquality = ListEquality(); + // This should be a ListEquality, but that appears to + // cause weird type errors with DDC (but only after hot reloads?!) + static const _statusEquality = ListEquality(); +} /// The priority of a PowerSync bucket. extension type const BucketPriority._(int priorityNumber) { @@ -214,34 +216,24 @@ class UploadQueueStats { } } -@internal -typedef OperationCounter = ({BucketPriority priority, int opCount}); - @internal final class InternalSyncDownloadProgress { - final List downloaded; - final List target; + final Map downloaded; + final Map target; final int _totalDownloaded; final int _totalTarget; InternalSyncDownloadProgress(this.downloaded, this.target) - : _totalDownloaded = downloaded.map((e) => e.opCount).sum, - _totalTarget = target.map((e) => e.opCount).sum; + : _totalDownloaded = target.values.sum, + _totalTarget = target.values.sum; factory InternalSyncDownloadProgress.fromZero(Checkpoint target) { - final totalOpsPerPriority = - target.checksums.groupFoldBy( + final targetOps = target.checksums.groupFoldBy( (cs) => BucketPriority(cs.priority), (prev, cs) => (prev ?? 0) + (cs.count ?? 0), ); - final downloaded = [ - for (final involvedPriority in totalOpsPerPriority.keys) - (priority: involvedPriority, opCount: 0), - ]; - final targetOps = totalOpsPerPriority.entries - .map((e) => (priority: e.key, opCount: e.value)) - .toList(); + final downloaded = targetOps.map((k, v) => MapEntry(k, 0)); return InternalSyncDownloadProgress(downloaded, targetOps); } @@ -251,20 +243,35 @@ final class InternalSyncDownloadProgress { } static int sumInPriority( - List counters, BucketPriority priority) { - return counters - .where((e) => e.priority >= priority) - .map((e) => e.opCount) + Map counters, BucketPriority priority) { + return counters.entries + .where((e) => e.key >= priority) + .map((e) => e.value) .sum; } + InternalSyncDownloadProgress incrementDownloaded( + List<(BucketPriority, int)> opsInPriority) { + var downloadedOps = {...downloaded}; + + for (final (priority, addedOps) in opsInPriority) { + assert(downloaded.containsKey(priority)); + assert(target.containsKey(priority)); + + downloadedOps[priority] = + max(downloadedOps[priority]! + addedOps, target[priority]!); + } + + return InternalSyncDownloadProgress(downloadedOps, target); + } + SyncDownloadProgress get asSyncDownloadProgress => SyncDownloadProgress._(this); @override int get hashCode => Object.hash( - _statusEquality.hash(downloaded), - _statusEquality.hash(target), + _mapEquality.hash(downloaded), + _mapEquality.hash(target), ); @override @@ -274,9 +281,11 @@ final class InternalSyncDownloadProgress { // them first helps find a difference faster. _totalDownloaded == other._totalDownloaded && _totalTarget == other._totalTarget && - _statusEquality.equals(downloaded, other.downloaded) && - _statusEquality.equals(target, other.target); + _mapEquality.equals(downloaded, other.downloaded) && + _mapEquality.equals(target, other.target); } + + static const _mapEquality = MapEquality(); } /// Provides realtime progress about how PowerSync is downloading rows. diff --git a/packages/powersync_core/lib/src/web/sync_worker_protocol.dart b/packages/powersync_core/lib/src/web/sync_worker_protocol.dart index cebdcacb..a358bb47 100644 --- a/packages/powersync_core/lib/src/web/sync_worker_protocol.dart +++ b/packages/powersync_core/lib/src/web/sync_worker_protocol.dart @@ -153,22 +153,8 @@ extension type SerializedOperationCounter._(JSObject _) implements JSObject { required int opCount, }); - factory SerializedOperationCounter.fromDart(OperationCounter progress) { - return SerializedOperationCounter( - priority: progress.priority.priorityNumber, - opCount: progress.opCount, - ); - } - external JSNumber get priority; external JSNumber get opCount; - - OperationCounter get toDart { - return ( - priority: BucketPriority(priority.toDartInt), - opCount: opCount.toDartInt - ); - } } @anonymous @@ -197,15 +183,22 @@ extension type SerializedDownloadProgress._(JSObject _) implements JSObject { } static JSArray _serializeCounters( - List counters) { + Map counters) { return [ - for (final entry in counters) SerializedOperationCounter.fromDart(entry) + for (final MapEntry(:key, :value) in counters.entries) + SerializedOperationCounter( + priority: key.priorityNumber, + opCount: value, + ) ].toJS; } - static List _deserializeCounters( + static Map _deserializeCounters( JSArray counters) { - return [for (final entry in counters.toDart) entry.toDart]; + return { + for (final entry in counters.toDart) + BucketPriority(entry.priority.toDartInt): entry.opCount.toDartInt, + }; } } From 720a4e9ce4d8bcd1171976f0c1fcad0ae225300a Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 24 Mar 2025 17:42:06 +0200 Subject: [PATCH 05/16] Adopt progress api in demo --- .../lib/widgets/guard_by_sync.dart | 53 +++++++++++++++++++ .../lib/widgets/lists_page.dart | 42 +++++++-------- 2 files changed, 71 insertions(+), 24 deletions(-) create mode 100644 demos/supabase-todolist/lib/widgets/guard_by_sync.dart diff --git a/demos/supabase-todolist/lib/widgets/guard_by_sync.dart b/demos/supabase-todolist/lib/widgets/guard_by_sync.dart new file mode 100644 index 00000000..45567681 --- /dev/null +++ b/demos/supabase-todolist/lib/widgets/guard_by_sync.dart @@ -0,0 +1,53 @@ +import 'package:flutter/material.dart'; +import 'package:powersync/powersync.dart' hide Column; +import 'package:powersync_flutter_demo/powersync.dart'; + +/// A widget that shows [child] after a complete sync on the database has +/// completed and a progress bar before that. +class GuardBySync extends StatelessWidget { + final Widget child; + + /// When set, wait only for a complete sync within the [BucketPriority] + /// instead of a full sync. + final BucketPriority? priority; + + const GuardBySync({ + super.key, + required this.child, + this.priority, + }); + + @override + Widget build(BuildContext context) { + return StreamBuilder( + stream: db.statusStream, + initialData: db.currentStatus, + builder: (context, snapshot) { + final status = snapshot.requireData; + final (didSync, progress) = switch (priority) { + null => ( + status.hasSynced ?? false, + status.downloadProgress?.progress + ), + var priority? => ( + status.statusForPriority(priority).hasSynced ?? false, + status.downloadProgress?.progressFor(priority) + ), + }; + + if (didSync) { + return child; + } else { + return Center( + child: Column( + children: [ + const Text('Busy with sync...'), + LinearProgressIndicator(value: progress), + ], + ), + ); + } + }, + ); + } +} diff --git a/demos/supabase-todolist/lib/widgets/lists_page.dart b/demos/supabase-todolist/lib/widgets/lists_page.dart index d60cb9f5..028dbe7e 100644 --- a/demos/supabase-todolist/lib/widgets/lists_page.dart +++ b/demos/supabase-todolist/lib/widgets/lists_page.dart @@ -1,6 +1,6 @@ import 'package:flutter/material.dart'; import 'package:powersync/powersync.dart'; -import 'package:powersync_flutter_demo/powersync.dart'; +import 'package:powersync_flutter_demo/widgets/guard_by_sync.dart'; import './list_item.dart'; import './list_item_dialog.dart'; @@ -46,29 +46,23 @@ final class ListsWidget extends StatelessWidget { @override Widget build(BuildContext context) { - return FutureBuilder( - future: db.waitForFirstSync(priority: _listsPriority), - builder: (context, snapshot) { - if (snapshot.connectionState == ConnectionState.done) { - return StreamBuilder( - stream: TodoList.watchListsWithStats(), - builder: (context, snapshot) { - if (snapshot.data case final todoLists?) { - return ListView( - padding: const EdgeInsets.symmetric(vertical: 8.0), - children: todoLists.map((list) { - return ListItemWidget(list: list); - }).toList(), - ); - } else { - return const CircularProgressIndicator(); - } - }, - ); - } else { - return const Text('Busy with sync...'); - } - }, + return GuardBySync( + priority: _listsPriority, + child: StreamBuilder( + stream: TodoList.watchListsWithStats(), + builder: (context, snapshot) { + if (snapshot.data case final todoLists?) { + return ListView( + padding: const EdgeInsets.symmetric(vertical: 8.0), + children: todoLists.map((list) { + return ListItemWidget(list: list); + }).toList(), + ); + } else { + return const CircularProgressIndicator(); + } + }, + ), ); } From 9d45ba37bfaff529246c7124231bcfcbab2613fb Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 24 Mar 2025 17:50:12 +0200 Subject: [PATCH 06/16] Extract into record --- .../lib/widgets/guard_by_sync.dart | 8 ++++--- .../powersync_core/lib/src/sync_status.dart | 24 +++++++++++++++++++ 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/demos/supabase-todolist/lib/widgets/guard_by_sync.dart b/demos/supabase-todolist/lib/widgets/guard_by_sync.dart index 45567681..b2514f86 100644 --- a/demos/supabase-todolist/lib/widgets/guard_by_sync.dart +++ b/demos/supabase-todolist/lib/widgets/guard_by_sync.dart @@ -27,11 +27,11 @@ class GuardBySync extends StatelessWidget { final (didSync, progress) = switch (priority) { null => ( status.hasSynced ?? false, - status.downloadProgress?.progress + status.downloadProgress?.untilCompletion ), var priority? => ( status.statusForPriority(priority).hasSynced ?? false, - status.downloadProgress?.progressFor(priority) + status.downloadProgress?.untilPriority(priority) ), }; @@ -42,7 +42,9 @@ class GuardBySync extends StatelessWidget { child: Column( children: [ const Text('Busy with sync...'), - LinearProgressIndicator(value: progress), + LinearProgressIndicator(value: progress?.fraction), + if (progress case final progress?) + Text('${progress.completed} out of ${progress.total}') ], ), ); diff --git a/packages/powersync_core/lib/src/sync_status.dart b/packages/powersync_core/lib/src/sync_status.dart index 751cfa9d..edfb220c 100644 --- a/packages/powersync_core/lib/src/sync_status.dart +++ b/packages/powersync_core/lib/src/sync_status.dart @@ -288,6 +288,12 @@ final class InternalSyncDownloadProgress { static const _mapEquality = MapEquality(); } +typedef ProgressWithOperations = ({ + int total, + int completed, + double fraction, +}); + /// Provides realtime progress about how PowerSync is downloading rows. /// /// The reported progress always reflects the status towards the end of a @@ -322,6 +328,24 @@ extension type SyncDownloadProgress._(InternalSyncDownloadProgress _internal) { /// a number between 0 and 1. double get progress => _internal._totalDownloaded / _internal._totalTarget; + ProgressWithOperations get untilCompletion => ( + total: total, + completed: downloaded, + fraction: progress, + ); + + ProgressWithOperations untilPriority(BucketPriority priority) { + final downloaded = downloadedFor(priority); + final total = totalFor(priority); + final progress = total == 0 ? 0.0 : downloaded / total; + + return ( + total: totalFor(priority), + completed: downloaded, + fraction: progress, + ); + } + /// Returns how many operations have been downloaded for buckets in /// [priority]. /// From 6e66936354f35e9c606419753ba567bf76a1e1b1 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 25 Mar 2025 16:46:22 +0200 Subject: [PATCH 07/16] Start migrating to new model --- .../powersync_core/lib/powersync_core.dart | 2 +- .../lib/src/bucket_storage.dart | 2 - .../lib/src/sync/mutable_sync_status.dart | 1 + .../lib/src/sync/streaming_sync.dart | 2 - .../powersync_core/lib/src/sync_status.dart | 50 ++++++++++--------- 5 files changed, 28 insertions(+), 29 deletions(-) diff --git a/packages/powersync_core/lib/powersync_core.dart b/packages/powersync_core/lib/powersync_core.dart index 3f16aef6..24318277 100644 --- a/packages/powersync_core/lib/powersync_core.dart +++ b/packages/powersync_core/lib/powersync_core.dart @@ -10,5 +10,5 @@ export 'src/exceptions.dart'; export 'src/log.dart'; export 'src/open_factory.dart'; export 'src/schema.dart'; -export 'src/sync_status.dart' hide InternalSyncDownloadProgress; +export 'src/sync_status.dart' hide BucketProgress, InternalSyncDownloadProgress; export 'src/uuid.dart'; diff --git a/packages/powersync_core/lib/src/bucket_storage.dart b/packages/powersync_core/lib/src/bucket_storage.dart index 1f18a41e..5a23dd20 100644 --- a/packages/powersync_core/lib/src/bucket_storage.dart +++ b/packages/powersync_core/lib/src/bucket_storage.dart @@ -29,8 +29,6 @@ class BucketStorage { return await _internalDb.execute(query, parameters); } - void startSession() {} - Future> getBucketStates() async { final rows = await select( 'SELECT name as bucket, cast(last_op as TEXT) as op_id FROM ps_buckets WHERE pending_delete = 0 AND name != \'\$local\''); diff --git a/packages/powersync_core/lib/src/sync/mutable_sync_status.dart b/packages/powersync_core/lib/src/sync/mutable_sync_status.dart index b0088f53..ca033b18 100644 --- a/packages/powersync_core/lib/src/sync/mutable_sync_status.dart +++ b/packages/powersync_core/lib/src/sync/mutable_sync_status.dart @@ -40,6 +40,7 @@ final class MutableSyncStatus { void applyCheckpointReached(Checkpoint applied) { downloading = false; + downloadProgress = null; downloadError = null; final now = lastSyncedAt = DateTime.now(); priorityStatusEntries = [ diff --git a/packages/powersync_core/lib/src/sync/streaming_sync.dart b/packages/powersync_core/lib/src/sync/streaming_sync.dart index 5c7c5d24..01f07b6c 100644 --- a/packages/powersync_core/lib/src/sync/streaming_sync.dart +++ b/packages/powersync_core/lib/src/sync/streaming_sync.dart @@ -285,8 +285,6 @@ class StreamingSyncImplementation implements StreamingSync { Future streamingSyncIteration( {AbortController? abortController}) async { - adapter.startSession(); - var (bucketRequests, bucketMap) = await _collectLocalBucketState(); Checkpoint? targetCheckpoint; diff --git a/packages/powersync_core/lib/src/sync_status.dart b/packages/powersync_core/lib/src/sync_status.dart index edfb220c..573a1154 100644 --- a/packages/powersync_core/lib/src/sync_status.dart +++ b/packages/powersync_core/lib/src/sync_status.dart @@ -216,17 +216,25 @@ class UploadQueueStats { } } +/// Per-bucket download progress information. +@internal +typedef BucketProgress = ({ + BucketPriority priority, + int atLast, + int sinceLast, + int targetCount, +}); + @internal final class InternalSyncDownloadProgress { - final Map downloaded; - final Map target; + final Map buckets; final int _totalDownloaded; final int _totalTarget; - InternalSyncDownloadProgress(this.downloaded, this.target) - : _totalDownloaded = target.values.sum, - _totalTarget = target.values.sum; + InternalSyncDownloadProgress(this.buckets) + : _totalDownloaded = buckets.values.map((e) => e.sinceLast).sum, + _totalTarget = buckets.values.map((e) => e.targetCount - e.atLast).sum; factory InternalSyncDownloadProgress.fromZero(Checkpoint target) { final targetOps = target.checksums.groupFoldBy( @@ -234,8 +242,6 @@ final class InternalSyncDownloadProgress { (prev, cs) => (prev ?? 0) + (cs.count ?? 0), ); final downloaded = targetOps.map((k, v) => MapEntry(k, 0)); - - return InternalSyncDownloadProgress(downloaded, targetOps); } static InternalSyncDownloadProgress ofPublic(SyncDownloadProgress public) { @@ -250,29 +256,26 @@ final class InternalSyncDownloadProgress { .sum; } - InternalSyncDownloadProgress incrementDownloaded( - List<(BucketPriority, int)> opsInPriority) { - var downloadedOps = {...downloaded}; - - for (final (priority, addedOps) in opsInPriority) { - assert(downloaded.containsKey(priority)); - assert(target.containsKey(priority)); - - downloadedOps[priority] = - max(downloadedOps[priority]! + addedOps, target[priority]!); + InternalSyncDownloadProgress incrementDownloaded(SyncDataBatch batch) { + final newBucketStates = Map.of(buckets); + for (final dataForBucket in batch.buckets) { + final previous = newBucketStates[dataForBucket.bucket]!; + newBucketStates[dataForBucket.bucket] = ( + priority: previous.priority, + atLast: previous.atLast, + sinceLast: previous.sinceLast, + targetCount: previous.targetCount, + ); } - return InternalSyncDownloadProgress(downloadedOps, target); + return InternalSyncDownloadProgress(newBucketStates); } SyncDownloadProgress get asSyncDownloadProgress => SyncDownloadProgress._(this); @override - int get hashCode => Object.hash( - _mapEquality.hash(downloaded), - _mapEquality.hash(target), - ); + int get hashCode => _mapEquality.hash(buckets); @override bool operator ==(Object other) { @@ -281,8 +284,7 @@ final class InternalSyncDownloadProgress { // them first helps find a difference faster. _totalDownloaded == other._totalDownloaded && _totalTarget == other._totalTarget && - _mapEquality.equals(downloaded, other.downloaded) && - _mapEquality.equals(target, other.target); + _mapEquality.equals(buckets, other.buckets); } static const _mapEquality = MapEquality(); From 8a3ddedcea7c588e1e85173227efa53fcbcfb58e Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 26 Mar 2025 12:11:24 +0200 Subject: [PATCH 08/16] Track operation counters from previous syncs --- .../powersync_core/lib/powersync_core.dart | 3 +- .../native/native_powersync_database.dart | 4 +- .../lib/src/database/powersync_db_mixin.dart | 2 +- .../database/web/web_powersync_database.dart | 2 +- .../lib/src/{ => sync}/bucket_storage.dart | 32 +++- .../lib/src/sync/mutable_sync_status.dart | 21 ++- .../powersync_core/lib/src/sync/protocol.dart | 4 +- .../lib/src/sync/streaming_sync.dart | 17 +- .../lib/src/{ => sync}/sync_status.dart | 154 +++++++----------- .../lib/src/web/sync_worker_protocol.dart | 93 +++++------ .../lib/src/web/web_bucket_storage.dart | 2 +- .../test/bucket_storage_test.dart | 2 +- .../powersync_core/test/sync_types_test.dart | 2 +- .../test/utils/abstract_test_utils.dart | 2 +- 14 files changed, 166 insertions(+), 174 deletions(-) rename packages/powersync_core/lib/src/{ => sync}/bucket_storage.dart (94%) rename packages/powersync_core/lib/src/{ => sync}/sync_status.dart (73%) diff --git a/packages/powersync_core/lib/powersync_core.dart b/packages/powersync_core/lib/powersync_core.dart index 24318277..998c0ce1 100644 --- a/packages/powersync_core/lib/powersync_core.dart +++ b/packages/powersync_core/lib/powersync_core.dart @@ -10,5 +10,6 @@ export 'src/exceptions.dart'; export 'src/log.dart'; export 'src/open_factory.dart'; export 'src/schema.dart'; -export 'src/sync_status.dart' hide BucketProgress, InternalSyncDownloadProgress; +export 'src/sync/sync_status.dart' + hide BucketProgress, InternalSyncDownloadProgress; export 'src/uuid.dart'; diff --git a/packages/powersync_core/lib/src/database/native/native_powersync_database.dart b/packages/powersync_core/lib/src/database/native/native_powersync_database.dart index cfc70cf5..f489f9bc 100644 --- a/packages/powersync_core/lib/src/database/native/native_powersync_database.dart +++ b/packages/powersync_core/lib/src/database/native/native_powersync_database.dart @@ -5,7 +5,7 @@ import 'package:meta/meta.dart'; import 'package:http/http.dart' as http; import 'package:logging/logging.dart'; import 'package:powersync_core/src/abort_controller.dart'; -import 'package:powersync_core/src/bucket_storage.dart'; +import 'package:powersync_core/src/sync/bucket_storage.dart'; import 'package:powersync_core/src/connector.dart'; import 'package:powersync_core/src/database/powersync_database.dart'; import 'package:powersync_core/src/database/powersync_db_mixin.dart'; @@ -17,7 +17,7 @@ import 'package:powersync_core/src/open_factory/native/native_open_factory.dart' import 'package:powersync_core/src/schema.dart'; import 'package:powersync_core/src/schema_logic.dart'; import 'package:powersync_core/src/sync/streaming_sync.dart'; -import 'package:powersync_core/src/sync_status.dart'; +import 'package:powersync_core/src/sync/sync_status.dart'; import 'package:sqlite_async/sqlite3_common.dart'; import 'package:sqlite_async/sqlite_async.dart'; diff --git a/packages/powersync_core/lib/src/database/powersync_db_mixin.dart b/packages/powersync_core/lib/src/database/powersync_db_mixin.dart index 6add250b..5874ba1b 100644 --- a/packages/powersync_core/lib/src/database/powersync_db_mixin.dart +++ b/packages/powersync_core/lib/src/database/powersync_db_mixin.dart @@ -11,7 +11,7 @@ import 'package:powersync_core/src/database/core_version.dart'; import 'package:powersync_core/src/powersync_update_notification.dart'; import 'package:powersync_core/src/schema.dart'; import 'package:powersync_core/src/schema_logic.dart'; -import 'package:powersync_core/src/sync_status.dart'; +import 'package:powersync_core/src/sync/sync_status.dart'; mixin PowerSyncDatabaseMixin implements SqliteConnection { /// Schema used for the local database. diff --git a/packages/powersync_core/lib/src/database/web/web_powersync_database.dart b/packages/powersync_core/lib/src/database/web/web_powersync_database.dart index 980dcae9..169e3e81 100644 --- a/packages/powersync_core/lib/src/database/web/web_powersync_database.dart +++ b/packages/powersync_core/lib/src/database/web/web_powersync_database.dart @@ -3,7 +3,7 @@ import 'package:meta/meta.dart'; import 'package:fetch_client/fetch_client.dart'; import 'package:logging/logging.dart'; import 'package:powersync_core/src/abort_controller.dart'; -import 'package:powersync_core/src/bucket_storage.dart'; +import 'package:powersync_core/src/sync/bucket_storage.dart'; import 'package:powersync_core/src/connector.dart'; import 'package:powersync_core/src/database/powersync_database.dart'; import 'package:powersync_core/src/database/powersync_db_mixin.dart'; diff --git a/packages/powersync_core/lib/src/bucket_storage.dart b/packages/powersync_core/lib/src/sync/bucket_storage.dart similarity index 94% rename from packages/powersync_core/lib/src/bucket_storage.dart rename to packages/powersync_core/lib/src/sync/bucket_storage.dart index 5a23dd20..78e4d7cc 100644 --- a/packages/powersync_core/lib/src/bucket_storage.dart +++ b/packages/powersync_core/lib/src/sync/bucket_storage.dart @@ -1,16 +1,22 @@ +@internal +library; + import 'dart:async'; import 'dart:convert'; import 'package:collection/collection.dart'; +import 'package:meta/meta.dart'; import 'package:powersync_core/sqlite_async.dart'; import 'package:powersync_core/sqlite3_common.dart'; -import 'crud.dart'; -import 'schema_logic.dart'; -import 'sync/protocol.dart'; +import '../crud.dart'; +import '../schema_logic.dart'; +import 'protocol.dart'; const compactOperationInterval = 1000; +typedef LocalOperationCounters = ({int atLast, int sinceLast}); + class BucketStorage { final SqliteConnection _internalDb; bool _hasCompletedSync = false; @@ -31,14 +37,30 @@ class BucketStorage { Future> getBucketStates() async { final rows = await select( - 'SELECT name as bucket, cast(last_op as TEXT) as op_id FROM ps_buckets WHERE pending_delete = 0 AND name != \'\$local\''); + "SELECT name, cast(last_op as TEXT) FROM ps_buckets WHERE pending_delete = 0 AND name != '\$local'"); return [ for (var row in rows) BucketState( - bucket: row['bucket'] as String, opId: row['op_id'] as String) + bucket: row.columnAt(0) as String, + opId: row.columnAt(1) as String, + ) ]; } + Future> + getBucketOperationProgress() async { + final rows = await select( + "SELECT name, count_at_last, count_since_last FROM ps_buckets"); + + return { + for (final row in rows) + (row.columnAt(0) as String): ( + atLast: row.columnAt(1) as int, + sinceLast: row.columnAt(2) as int, + ) + }; + } + Future getClientId() async { final rows = await select('SELECT powersync_client_id() as client_id'); return rows.first['client_id'] as String; diff --git a/packages/powersync_core/lib/src/sync/mutable_sync_status.dart b/packages/powersync_core/lib/src/sync/mutable_sync_status.dart index ca033b18..3a6663b7 100644 --- a/packages/powersync_core/lib/src/sync/mutable_sync_status.dart +++ b/packages/powersync_core/lib/src/sync/mutable_sync_status.dart @@ -2,7 +2,8 @@ import 'dart:async'; import 'package:collection/collection.dart'; -import '../sync_status.dart'; +import 'sync_status.dart'; +import 'bucket_storage.dart'; import 'protocol.dart'; final class MutableSyncStatus { @@ -57,10 +58,13 @@ final class MutableSyncStatus { ]; } - void applyCheckpointStarted(Checkpoint target) { + void applyCheckpointStarted( + Map localProgress, + Checkpoint target, + ) { downloading = true; - // TODO: Include pending ops from interrupted download, if any... - downloadProgress = InternalSyncDownloadProgress.fromZero(target); + downloadProgress = + InternalSyncDownloadProgress.forNewCheckpoint(localProgress, target); } void applyUploadError(Object error) { @@ -68,15 +72,10 @@ final class MutableSyncStatus { uploadError = error; } - void applyBatchReceived( - Map currentBuckets, SyncDataBatch batch) { + void applyBatchReceived(SyncDataBatch batch) { downloading = true; if (downloadProgress case final previousProgress?) { - downloadProgress = previousProgress.incrementDownloaded([ - for (final bucket in batch.buckets) - if (currentBuckets[bucket.bucket] case final knownBucket?) - (BucketPriority(knownBucket.priority), bucket.data.length), - ]); + downloadProgress = previousProgress.incrementDownloaded(batch); } } diff --git a/packages/powersync_core/lib/src/sync/protocol.dart b/packages/powersync_core/lib/src/sync/protocol.dart index 5e9c6362..f2bfc403 100644 --- a/packages/powersync_core/lib/src/sync/protocol.dart +++ b/packages/powersync_core/lib/src/sync/protocol.dart @@ -1,7 +1,7 @@ import 'dart:async'; import 'dart:convert'; -import '../bucket_storage.dart'; +import 'bucket_storage.dart'; /// Messages sent from the sync service. sealed class StreamingSyncLine { @@ -146,8 +146,6 @@ final class Checkpoint extends StreamingSyncLine { } } -typedef BucketDescription = ({String name, int priority}); - class BucketChecksum { final String bucket; final int priority; diff --git a/packages/powersync_core/lib/src/sync/streaming_sync.dart b/packages/powersync_core/lib/src/sync/streaming_sync.dart index 01f07b6c..cb479c38 100644 --- a/packages/powersync_core/lib/src/sync/streaming_sync.dart +++ b/packages/powersync_core/lib/src/sync/streaming_sync.dart @@ -9,12 +9,12 @@ import 'package:powersync_core/src/log_internal.dart'; import 'package:powersync_core/src/user_agent/user_agent.dart'; import 'package:sqlite_async/mutex.dart'; -import '../bucket_storage.dart'; +import 'bucket_storage.dart'; import '../connector.dart'; import '../crud.dart'; import 'mutable_sync_status.dart'; import 'stream_utils.dart'; -import '../sync_status.dart'; +import 'sync_status.dart'; import 'protocol.dart'; abstract interface class StreamingSync { @@ -28,7 +28,7 @@ abstract interface class StreamingSync { @internal class StreamingSyncImplementation implements StreamingSync { - BucketStorage adapter; + final BucketStorage adapter; final Future Function() credentialsCallback; final Future Function()? invalidCredentialsCallback; @@ -322,7 +322,9 @@ class StreamingSyncImplementation implements StreamingSync { } bucketMap = newBuckets; await adapter.removeBuckets([...bucketsToDelete]); - _state.updateStatus((s) => s.downloading = true); + final initialProgress = await adapter.getBucketOperationProgress(); + _state.updateStatus( + (s) => s.applyCheckpointStarted(initialProgress, line)); case StreamingSyncCheckpointComplete(): final result = await adapter.syncLocalDatabase(targetCheckpoint!); if (!result.checkpointValid) { @@ -391,7 +393,7 @@ class StreamingSyncImplementation implements StreamingSync { case SyncDataBatch(): // TODO: This increments the counters before actually saving sync // data. Might be fine though? - _state.updateStatus((s) => s.applyBatchReceived(bucketMap, line)); + _state.updateStatus((s) => s.applyBatchReceived(line)); _state.updateStatus((s) => s.downloading = true); await adapter.saveSyncData(line); case StreamingSyncKeepalive(:final tokenExpiresIn): @@ -520,3 +522,8 @@ String _syncErrorMessage(Object? error) { return '${error.runtimeType}'; } } + +typedef BucketDescription = ({ + String name, + int priority, +}); diff --git a/packages/powersync_core/lib/src/sync_status.dart b/packages/powersync_core/lib/src/sync/sync_status.dart similarity index 73% rename from packages/powersync_core/lib/src/sync_status.dart rename to packages/powersync_core/lib/src/sync/sync_status.dart index 573a1154..6a6be165 100644 --- a/packages/powersync_core/lib/src/sync_status.dart +++ b/packages/powersync_core/lib/src/sync/sync_status.dart @@ -1,9 +1,8 @@ -import 'dart:math'; - import 'package:collection/collection.dart'; import 'package:meta/meta.dart'; -import 'sync/protocol.dart'; +import 'bucket_storage.dart'; +import 'protocol.dart'; final class SyncStatus { /// true if currently connected. @@ -173,6 +172,13 @@ final class SyncStatus { extension type const BucketPriority._(int priorityNumber) { static const _highest = 0; + /// The a bucket priority lower than the lowest priority that will ever be + /// allowed by the sync service. + /// + /// This can be used as a priority that tracks complete syncs instead of + /// partial completions. + static const _sentinel = BucketPriority._(2147483647); + factory BucketPriority(int i) { assert(i >= _highest); return BucketPriority._(i); @@ -236,24 +242,35 @@ final class InternalSyncDownloadProgress { : _totalDownloaded = buckets.values.map((e) => e.sinceLast).sum, _totalTarget = buckets.values.map((e) => e.targetCount - e.atLast).sum; - factory InternalSyncDownloadProgress.fromZero(Checkpoint target) { - final targetOps = target.checksums.groupFoldBy( - (cs) => BucketPriority(cs.priority), - (prev, cs) => (prev ?? 0) + (cs.count ?? 0), - ); - final downloaded = targetOps.map((k, v) => MapEntry(k, 0)); + factory InternalSyncDownloadProgress.forNewCheckpoint( + Map localProgress, Checkpoint target) { + final buckets = {}; + for (final bucket in target.checksums) { + final savedProgress = localProgress[bucket.bucket]; + + buckets[bucket.bucket] = ( + priority: BucketPriority._(bucket.priority), + atLast: savedProgress?.atLast ?? 0, + sinceLast: savedProgress?.sinceLast ?? 0, + targetCount: bucket.count ?? 0, + ); + } + + return InternalSyncDownloadProgress(buckets); } static InternalSyncDownloadProgress ofPublic(SyncDownloadProgress public) { return public._internal; } - static int sumInPriority( - Map counters, BucketPriority priority) { - return counters.entries - .where((e) => e.key >= priority) - .map((e) => e.value) - .sum; + /// Sums the total target and completed operations for all buckets up until + /// the given [priority] (inclusive). + (int, int) targetAndCompletedCounts(BucketPriority priority) { + return buckets.values.fold((0, 0), (prev, entry) { + final downloaded = entry.sinceLast; + final total = entry.targetCount - entry.atLast; + return (prev.$1 + total, prev.$2 + downloaded); + }); } InternalSyncDownloadProgress incrementDownloaded(SyncDataBatch batch) { @@ -290,6 +307,11 @@ final class InternalSyncDownloadProgress { static const _mapEquality = MapEquality(); } +/// Information about a progressing download. +/// +/// This reports the `total` amount of operations to download, how many of them +/// have alreaady been `completed` and finally a `fraction` indicating relative +/// progress (as a number between `0.0` and `1.0`) typedef ProgressWithOperations = ({ int total, int completed, @@ -300,87 +322,37 @@ typedef ProgressWithOperations = ({ /// /// The reported progress always reflects the status towards the end of a /// sync iteration (after which a consistent snapshot of all buckets is -/// available locally). Note that [downloaded] starts at `0` every time an -/// iteration begins. -/// This has an effect when iterations are interrupted. Consider this flow -/// as an example: +/// available locally). /// -/// 1. The client comes online for the first time and has to synchronize a -/// large amount of rows (say 100k). Here, [downloaded] starts at `0` and -/// [total] would be the `100,000` rows. -/// 2. The client makes some progress, so that [downloaded] is perhaps -/// `60,000`. -/// 3. The client briefly looses connectivity. -/// 4. Back online, a new sync iteration starts. This means that [downloaded] -/// is reset to `0`. However, since half of the target has already been -/// downloaded in the earlier iteration, [total] is now set to `40,000` to -/// reflect the remaining rows to download in the new iteration. +/// In rare cases (in particular, when a [compacting] operation takes place +/// between syncs), it's possible for the returned numbers to be slightly +/// inaccurate. For this reason, [SyncDownloadProgress] should be seen as an +/// approximation of progress. The information returned is good enough to build +/// progress bars, but not exact enough to track individual download counts. +/// +/// Also note that data is downloaded in bulk, which means that individual +/// counters are unlikely to be updated one-by-one. +/// +/// [compacting]: https://docs.powersync.com/usage/lifecycle-maintenance/compacting-buckets extension type SyncDownloadProgress._(InternalSyncDownloadProgress _internal) { - /// The amount of operations that have been downloaded in the current sync - /// iteration. + /// Returns download progress towards a complete checkpoint being received. /// - /// This number always starts at zero as [SyncStatus.downloading] changes - /// from `false` to `true`. - int get downloaded => _internal._totalDownloaded; - - /// The total amount of operations expected for this sync operation. - int get total => _internal._totalTarget; - - /// The fraction of [total] operations that have already been [downloaded], as - /// a number between 0 and 1. - double get progress => _internal._totalDownloaded / _internal._totalTarget; - - ProgressWithOperations get untilCompletion => ( - total: total, - completed: downloaded, - fraction: progress, - ); - + /// The returned [ProgressWithOperations] tracks the target amount of + /// operations that need to be downloaded in total and how many of them have + /// already been received. + ProgressWithOperations get untilCompletion => + untilPriority(BucketPriority._sentinel); + + /// Returns download progress towards all data up until the specified + /// [priority] being received. + /// + /// The returned [ProgressWithOperations] tracks the target amount of + /// operations that need to be downloaded in total and how many of them have + /// already been received. ProgressWithOperations untilPriority(BucketPriority priority) { - final downloaded = downloadedFor(priority); - final total = totalFor(priority); + final (total, downloaded) = _internal.targetAndCompletedCounts(priority); final progress = total == 0 ? 0.0 : downloaded / total; - return ( - total: totalFor(priority), - completed: downloaded, - fraction: progress, - ); - } - - /// Returns how many operations have been downloaded for buckets in - /// [priority]. - /// - /// Under the consistency guarantees offered by PowerSync, this will also - /// include operations from higher-priority buckets. - int downloadedFor(BucketPriority priority) { - return InternalSyncDownloadProgress.sumInPriority( - _internal.downloaded, priority); - } - - /// Returns how many operations in total need to be downloaded before the - /// client has reached a consistent states for buckets with the given - /// [priority]. - /// - /// Under the consistency guarantees offered by PowerSync, this will also - /// include operations from higher-priority buckets. - int totalFor(BucketPriority priority) { - return InternalSyncDownloadProgress.sumInPriority( - _internal.target, priority); - } - - /// The progress towards syncing the given [priority]. - /// - /// Returns the fraction of [downloadedFor] to [totalFor], as a number between - /// 0 and 1. - double progressFor(BucketPriority priority) { - final downloaded = downloadedFor(priority); - final total = totalFor(priority); - - if (total == 0) { - return 0; - } - - return downloaded / total; + return (total: total, completed: downloaded, fraction: progress); } } diff --git a/packages/powersync_core/lib/src/web/sync_worker_protocol.dart b/packages/powersync_core/lib/src/web/sync_worker_protocol.dart index a358bb47..cd6b241a 100644 --- a/packages/powersync_core/lib/src/web/sync_worker_protocol.dart +++ b/packages/powersync_core/lib/src/web/sync_worker_protocol.dart @@ -7,7 +7,7 @@ import 'package:web/web.dart'; import '../connector.dart'; import '../log.dart'; -import '../sync_status.dart'; +import '../sync/sync_status.dart'; /// Names used in [SyncWorkerMessage] enum SyncWorkerMessageType { @@ -147,57 +147,45 @@ extension type SerializedCredentials._(JSObject _) implements JSObject { } @anonymous -extension type SerializedOperationCounter._(JSObject _) implements JSObject { - external factory SerializedOperationCounter({ +extension type SerializedBucketProgress._(JSObject _) implements JSObject { + external factory SerializedBucketProgress({ + required String name, required int priority, - required int opCount, + required int atLast, + required int sinceLast, + required int targetCount, }); - external JSNumber get priority; - external JSNumber get opCount; -} - -@anonymous -extension type SerializedDownloadProgress._(JSObject _) implements JSObject { - external factory SerializedDownloadProgress({ - required JSArray downloaded, - required JSArray target, - }); - - external JSArray get downloaded; - external JSArray get target; - - factory SerializedDownloadProgress.fromDart( - InternalSyncDownloadProgress progress) { - return SerializedDownloadProgress( - downloaded: _serializeCounters(progress.downloaded), - target: _serializeCounters(progress.target), - ); - } - - InternalSyncDownloadProgress get toDart { - return InternalSyncDownloadProgress( - _deserializeCounters(downloaded), - _deserializeCounters(target), - ); - } - - static JSArray _serializeCounters( - Map counters) { - return [ - for (final MapEntry(:key, :value) in counters.entries) - SerializedOperationCounter( - priority: key.priorityNumber, - opCount: value, - ) + external String name; + external int priority; + external int atLast; + external int sinceLast; + external int targetCount; + + static JSArray serialize( + Map buckets) { + return [ + for (final MapEntry(:key, :value) in buckets.entries) + SerializedBucketProgress( + name: key, + priority: value.priority.priorityNumber, + atLast: value.atLast, + sinceLast: value.sinceLast, + targetCount: value.targetCount, + ), ].toJS; } - static Map _deserializeCounters( - JSArray counters) { + static Map deserialize( + JSArray array) { return { - for (final entry in counters.toDart) - BucketPriority(entry.priority.toDartInt): entry.opCount.toDartInt, + for (final entry in array.toDart) + entry.name: ( + priority: BucketPriority(entry.priority), + atLast: entry.atLast, + sinceLast: entry.sinceLast, + targetCount: entry.targetCount, + ), }; } } @@ -214,7 +202,7 @@ extension type SerializedSyncStatus._(JSObject _) implements JSObject { required String? uploadError, required String? downloadError, required JSArray? priorityStatusEntries, - required SerializedDownloadProgress? syncProgress, + required JSArray? syncProgress, }); factory SerializedSyncStatus.from(SyncStatus status) { @@ -237,8 +225,8 @@ extension type SerializedSyncStatus._(JSObject _) implements JSObject { ].toJS, syncProgress: switch (status.downloadProgress) { null => null, - var other => SerializedDownloadProgress.fromDart( - InternalSyncDownloadProgress.ofPublic(other)), + var other => SerializedBucketProgress.serialize( + InternalSyncDownloadProgress.ofPublic(other).buckets), }, ); } @@ -252,7 +240,7 @@ extension type SerializedSyncStatus._(JSObject _) implements JSObject { external String? uploadError; external String? downloadError; external JSArray? priorityStatusEntries; - external SerializedDownloadProgress? syncProgress; + external JSArray? syncProgress; SyncStatus asSyncStatus() { return SyncStatus( @@ -282,7 +270,12 @@ extension type SerializedSyncStatus._(JSObject _) implements JSObject { ); }) ], - downloadProgress: syncProgress?.toDart.asSyncDownloadProgress, + downloadProgress: switch (syncProgress) { + null => null, + final serializedProgress => InternalSyncDownloadProgress( + SerializedBucketProgress.deserialize(serializedProgress)) + .asSyncDownloadProgress, + }, ); } } diff --git a/packages/powersync_core/lib/src/web/web_bucket_storage.dart b/packages/powersync_core/lib/src/web/web_bucket_storage.dart index c2f276e8..4ba46b07 100644 --- a/packages/powersync_core/lib/src/web/web_bucket_storage.dart +++ b/packages/powersync_core/lib/src/web/web_bucket_storage.dart @@ -1,5 +1,5 @@ import 'package:powersync_core/sqlite_async.dart'; -import 'package:powersync_core/src/bucket_storage.dart'; +import 'package:powersync_core/src/sync/bucket_storage.dart'; import 'package:sqlite_async/web.dart'; class WebBucketStorage extends BucketStorage { diff --git a/packages/powersync_core/test/bucket_storage_test.dart b/packages/powersync_core/test/bucket_storage_test.dart index 541fd9a0..4676efd4 100644 --- a/packages/powersync_core/test/bucket_storage_test.dart +++ b/packages/powersync_core/test/bucket_storage_test.dart @@ -1,5 +1,5 @@ import 'package:powersync_core/powersync_core.dart'; -import 'package:powersync_core/src/bucket_storage.dart'; +import 'package:powersync_core/src/sync/bucket_storage.dart'; import 'package:powersync_core/src/sync/protocol.dart'; import 'package:sqlite_async/sqlite3_common.dart'; import 'package:test/test.dart'; diff --git a/packages/powersync_core/test/sync_types_test.dart b/packages/powersync_core/test/sync_types_test.dart index c3e0ed76..261152b2 100644 --- a/packages/powersync_core/test/sync_types_test.dart +++ b/packages/powersync_core/test/sync_types_test.dart @@ -1,6 +1,6 @@ import 'dart:async'; -import 'package:powersync_core/src/sync_status.dart'; +import 'package:powersync_core/src/sync/sync_status.dart'; import 'package:powersync_core/src/sync/protocol.dart'; import 'package:test/test.dart'; diff --git a/packages/powersync_core/test/utils/abstract_test_utils.dart b/packages/powersync_core/test/utils/abstract_test_utils.dart index fcebed22..8f7c3d94 100644 --- a/packages/powersync_core/test/utils/abstract_test_utils.dart +++ b/packages/powersync_core/test/utils/abstract_test_utils.dart @@ -1,7 +1,7 @@ import 'package:http/http.dart'; import 'package:logging/logging.dart'; import 'package:powersync_core/powersync_core.dart'; -import 'package:powersync_core/src/bucket_storage.dart'; +import 'package:powersync_core/src/sync/bucket_storage.dart'; import 'package:powersync_core/src/sync/streaming_sync.dart'; import 'package:sqlite_async/sqlite3_common.dart'; import 'package:sqlite_async/sqlite_async.dart'; From a2322df5216eb4c7ff6980baed9fe37b237a549b Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 26 Mar 2025 15:08:12 +0200 Subject: [PATCH 09/16] Add tests --- .../lib/src/database/powersync_db_mixin.dart | 31 +- .../powersync_core/lib/src/sync/protocol.dart | 1 + .../lib/src/sync/streaming_sync.dart | 1 - .../lib/src/sync/sync_status.dart | 46 ++- .../test/in_memory_sync_test.dart | 284 +++++++++++++++++- .../test/utils/native_test_utils.dart | 6 + 6 files changed, 327 insertions(+), 42 deletions(-) diff --git a/packages/powersync_core/lib/src/database/powersync_db_mixin.dart b/packages/powersync_core/lib/src/database/powersync_db_mixin.dart index 5874ba1b..81adb4fc 100644 --- a/packages/powersync_core/lib/src/database/powersync_db_mixin.dart +++ b/packages/powersync_core/lib/src/database/powersync_db_mixin.dart @@ -175,16 +175,27 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { @visibleForTesting void setStatus(SyncStatus status) { if (status != currentStatus) { - // Note that currently the streaming sync implementation will never set hasSynced. - // lastSyncedAt implies that syncing has completed at some point (hasSynced = true). - // The previous values of hasSynced should be preserved here. - final newStatus = status.copyWith( - hasSynced: status.lastSyncedAt != null - ? true - : status.hasSynced ?? currentStatus.hasSynced, - lastSyncedAt: status.lastSyncedAt ?? currentStatus.lastSyncedAt); - // If the absence of hasSync was the only difference, the new states would be equal - // and don't require an event. So, check again. + final newStatus = SyncStatus( + connected: status.connected, + downloading: status.downloading, + uploading: status.uploading, + connecting: status.connecting, + uploadError: status.uploadError, + downloadError: status.downloadError, + priorityStatusEntries: status.priorityStatusEntries, + downloadProgress: status.downloadProgress, + // Note that currently the streaming sync implementation will never set + // hasSynced. lastSyncedAt implies that syncing has completed at some + // point (hasSynced = true). + // The previous values of hasSynced should be preserved here. + lastSyncedAt: status.lastSyncedAt ?? currentStatus.lastSyncedAt, + hasSynced: status.lastSyncedAt != null + ? true + : status.hasSynced ?? currentStatus.hasSynced, + ); + + // If the absence of hasSynced was the only difference, the new states + // would be equal and don't require an event. So, check again. if (newStatus != currentStatus) { currentStatus = newStatus; statusStreamController.add(currentStatus); diff --git a/packages/powersync_core/lib/src/sync/protocol.dart b/packages/powersync_core/lib/src/sync/protocol.dart index f2bfc403..983baa86 100644 --- a/packages/powersync_core/lib/src/sync/protocol.dart +++ b/packages/powersync_core/lib/src/sync/protocol.dart @@ -140,6 +140,7 @@ final class Checkpoint extends StreamingSyncLine { 'bucket': c.bucket, 'checksum': c.checksum, 'priority': c.priority, + 'count': c.count, }) .toList(growable: false) }; diff --git a/packages/powersync_core/lib/src/sync/streaming_sync.dart b/packages/powersync_core/lib/src/sync/streaming_sync.dart index cb479c38..0e4d3397 100644 --- a/packages/powersync_core/lib/src/sync/streaming_sync.dart +++ b/packages/powersync_core/lib/src/sync/streaming_sync.dart @@ -394,7 +394,6 @@ class StreamingSyncImplementation implements StreamingSync { // TODO: This increments the counters before actually saving sync // data. Might be fine though? _state.updateStatus((s) => s.applyBatchReceived(line)); - _state.updateStatus((s) => s.downloading = true); await adapter.saveSyncData(line); case StreamingSyncKeepalive(:final tokenExpiresIn): if (tokenExpiresIn == 0) { diff --git a/packages/powersync_core/lib/src/sync/sync_status.dart b/packages/powersync_core/lib/src/sync/sync_status.dart index 6a6be165..d90d05eb 100644 --- a/packages/powersync_core/lib/src/sync/sync_status.dart +++ b/packages/powersync_core/lib/src/sync/sync_status.dart @@ -77,9 +77,11 @@ final class SyncStatus { other.lastSyncedAt == lastSyncedAt && other.hasSynced == hasSynced && _statusEquality.equals( - other.priorityStatusEntries, priorityStatusEntries)); + other.priorityStatusEntries, priorityStatusEntries) && + other.downloadProgress == downloadProgress); } + // Deprecated because it can't set fields back to null @Deprecated('Should not be used in user code') SyncStatus copyWith({ bool? connected, @@ -148,19 +150,21 @@ final class SyncStatus { @override int get hashCode { return Object.hash( - connected, - downloading, - uploading, - connecting, - uploadError, - downloadError, - lastSyncedAt, - _statusEquality.hash(priorityStatusEntries)); + connected, + downloading, + uploading, + connecting, + uploadError, + downloadError, + lastSyncedAt, + _statusEquality.hash(priorityStatusEntries), + downloadProgress, + ); } @override String toString() { - return "SyncStatus"; + return "SyncStatus"; } // This should be a ListEquality, but that appears to @@ -266,11 +270,14 @@ final class InternalSyncDownloadProgress { /// Sums the total target and completed operations for all buckets up until /// the given [priority] (inclusive). (int, int) targetAndCompletedCounts(BucketPriority priority) { - return buckets.values.fold((0, 0), (prev, entry) { - final downloaded = entry.sinceLast; - final total = entry.targetCount - entry.atLast; - return (prev.$1 + total, prev.$2 + downloaded); - }); + return buckets.values.where((e) => e.priority >= priority).fold( + (0, 0), + (prev, entry) { + final downloaded = entry.sinceLast; + final total = entry.targetCount - entry.atLast; + return (prev.$1 + total, prev.$2 + downloaded); + }, + ); } InternalSyncDownloadProgress incrementDownloaded(SyncDataBatch batch) { @@ -280,7 +287,7 @@ final class InternalSyncDownloadProgress { newBucketStates[dataForBucket.bucket] = ( priority: previous.priority, atLast: previous.atLast, - sinceLast: previous.sinceLast, + sinceLast: previous.sinceLast + dataForBucket.data.length, targetCount: previous.targetCount, ); } @@ -304,6 +311,13 @@ final class InternalSyncDownloadProgress { _mapEquality.equals(buckets, other.buckets); } + @override + String toString() { + final asView = asSyncDownloadProgress; + final all = asView.untilCompletion; + return 'for total: ${all.completed} / ${all.total}'; + } + static const _mapEquality = MapEquality(); } diff --git a/packages/powersync_core/test/in_memory_sync_test.dart b/packages/powersync_core/test/in_memory_sync_test.dart index b587715a..19e97dab 100644 --- a/packages/powersync_core/test/in_memory_sync_test.dart +++ b/packages/powersync_core/test/in_memory_sync_test.dart @@ -20,19 +20,14 @@ void main() { late CommonDatabase raw; late PowerSyncDatabase database; late MockSyncService syncService; + late StreamingSync syncClient; var credentialsCallbackCount = 0; - setUp(() async { - credentialsCallbackCount = 0; + void createSyncClient() { final (client, server) = inMemoryServer(); - syncService = MockSyncService(); server.mount(syncService.router.call); - factory = await testUtils.testFactory(); - (raw, database) = await factory.openInMemoryDatabase(); - await database.initialize(); - syncClient = database.connectWithMockService( client, TestConnector(() async { @@ -44,6 +39,16 @@ void main() { ); }), ); + } + + setUp(() async { + credentialsCallbackCount = 0; + syncService = MockSyncService(); + + factory = await testUtils.testFactory(); + (raw, database) = await factory.openInMemoryDatabase(); + await database.initialize(); + createSyncClient(); }); tearDown(() async { @@ -222,7 +227,8 @@ void main() { await expectLater( status, - emits(isSyncStatus(downloading: true, hasSynced: false).having( + emitsThrough( + isSyncStatus(downloading: true, hasSynced: false).having( (e) => e.statusForPriority(BucketPriority(0)).hasSynced, 'status for $prio', isTrue, @@ -240,8 +246,8 @@ void main() { 'checkpoint_complete': {'last_op_id': operationId.toString()} }); - await expectLater( - status, emits(isSyncStatus(downloading: false, hasSynced: true))); + await expectLater(status, + emitsThrough(isSyncStatus(downloading: false, hasSynced: true))); await database.waitForFirstSync(); expect(await database.getAll('SELECT * FROM customers'), hasLength(4)); }); @@ -329,14 +335,238 @@ void main() { expect(nextRequest.headers['Authorization'], 'Token token2'); expect(credentialsCallbackCount, 2); }); + + group('reports progress', () { + var lastOpId = 0; + + setUp(() => lastOpId = 0); + + BucketChecksum bucket(String name, int count, {int priority = 3}) { + return BucketChecksum( + bucket: name, priority: priority, checksum: 0, count: count); + } + + void addDataLine(String bucket, int amount) { + syncService.addLine({ + 'data': { + 'bucket': bucket, + 'data': >[ + for (var i = 0; i < amount; i++) + { + 'op_id': '${++lastOpId}', + 'op': 'PUT', + 'object_type': bucket, + 'object_id': '$lastOpId', + 'checksum': 0, + 'data': {}, + } + ], + } + }); + } + + void addCheckpointComplete([int? priority]) { + if (priority case final partial?) { + syncService.addLine({ + 'partial_checkpoint_complete': { + 'last_op_id': '$lastOpId', + 'priority': partial, + } + }); + } else { + syncService.addLine({ + 'checkpoint_complete': { + 'last_op_id': '$lastOpId', + } + }); + } + } + + Future expectProgress( + StreamQueue status, { + required Object total, + Map priorities = const {}, + }) async { + await expectLater( + status, + emits(isSyncStatus( + downloading: true, + downloadProgress: isSyncDownloadProgress( + progress: total, + priorities: priorities, + ), + )), + ); + } + + test('without priorities', () async { + final status = await waitForConnection(); + syncService.addLine({ + 'checkpoint': Checkpoint( + lastOpId: '10', + checksums: [bucket('a', 10)], + ) + }); + await expectProgress(status, total: progress(0, 10)); + + addDataLine('a', 10); + await expectProgress(status, total: progress(10, 10)); + + addCheckpointComplete(); + await expectLater(status, + emits(isSyncStatus(downloading: false, downloadProgress: isNull))); + + // Emit new data, progress should be 0/2 instead of 10/12 + syncService.addLine({ + 'checkpoint': Checkpoint( + lastOpId: '12', + checksums: [bucket('a', 12)], + ) + }); + await expectProgress(status, total: progress(0, 2)); + addDataLine('a', 2); + await expectProgress(status, total: progress(2, 2)); + addCheckpointComplete(); + await expectLater(status, + emits(isSyncStatus(downloading: false, downloadProgress: isNull))); + }); + + test('interrupted sync', () async { + var status = await waitForConnection(); + syncService.addLine({ + 'checkpoint': Checkpoint( + lastOpId: '10', + checksums: [bucket('a', 10)], + ) + }); + await expectProgress(status, total: progress(0, 10)); + addDataLine('a', 5); + await expectProgress(status, total: progress(5, 10)); + + // Emulate the app closing - create a new independent sync client. + await syncClient.abort(); + syncService.endCurrentListener(); + + createSyncClient(); + status = await waitForConnection(); + + // Send same checkpoint again + syncService.addLine({ + 'checkpoint': Checkpoint( + lastOpId: '10', + checksums: [bucket('a', 10)], + ) + }); + + // Progress should be restored instead of saying e.g 0/5 now. + await expectProgress(status, total: progress(5, 10)); + addCheckpointComplete(); + await expectLater(status, + emits(isSyncStatus(downloading: false, downloadProgress: isNull))); + }); + + test('interrupted sync with new checkpoint', () async { + var status = await waitForConnection(); + syncService.addLine({ + 'checkpoint': Checkpoint( + lastOpId: '10', + checksums: [bucket('a', 10)], + ) + }); + await expectProgress(status, total: progress(0, 10)); + addDataLine('a', 5); + await expectProgress(status, total: progress(5, 10)); + + // Emulate the app closing - create a new independent sync client. + await syncClient.abort(); + syncService.endCurrentListener(); + + createSyncClient(); + status = await waitForConnection(); + + // Send checkpoint with additional data + syncService.addLine({ + 'checkpoint': Checkpoint( + lastOpId: '12', + checksums: [bucket('a', 12)], + ) + }); + + await expectProgress(status, total: progress(5, 12)); + addCheckpointComplete(); + await expectLater(status, + emits(isSyncStatus(downloading: false, downloadProgress: isNull))); + }); + + test('different priorities', () async { + var status = await waitForConnection(); + Future checkProgress(Object prio0, Object prio2) async { + await expectProgress( + status, + priorities: { + BucketPriority(0): prio0, + BucketPriority(2): prio2, + }, + total: prio2, + ); + } + + syncService.addLine({ + 'checkpoint': Checkpoint( + lastOpId: '10', + checksums: [ + bucket('a', 5, priority: 0), + bucket('b', 5, priority: 2) + ], + ), + }); + await checkProgress(progress(0, 5), progress(0, 10)); + + addDataLine('a', 5); + await checkProgress(progress(5, 5), progress(5, 10)); + + addCheckpointComplete(0); + await checkProgress(progress(5, 5), progress(5, 10)); + + addDataLine('b', 2); + await checkProgress(progress(5, 5), progress(7, 10)); + + // Before syncing b fully, send a new checkpoint + syncService.addLine({ + 'checkpoint': Checkpoint( + lastOpId: '14', + checksums: [ + bucket('a', 8, priority: 0), + bucket('b', 6, priority: 2) + ], + ), + }); + await checkProgress(progress(5, 8), progress(7, 14)); + + addDataLine('a', 3); + await checkProgress(progress(8, 8), progress(10, 14)); + + addCheckpointComplete(0); + await checkProgress(progress(8, 8), progress(10, 14)); + + addDataLine('b', 4); + await checkProgress(progress(8, 8), progress(14, 14)); + + addCheckpointComplete(); + await expectLater(status, + emits(isSyncStatus(downloading: false, downloadProgress: isNull))); + }); + }); }); } -TypeMatcher isSyncStatus( - {Object? downloading, - Object? connected, - Object? connecting, - Object? hasSynced}) { +TypeMatcher isSyncStatus({ + Object? downloading, + Object? connected, + Object? connecting, + Object? hasSynced, + Object? downloadProgress, +}) { var matcher = isA(); if (downloading != null) { matcher = matcher.having((e) => e.downloading, 'downloading', downloading); @@ -350,6 +580,30 @@ TypeMatcher isSyncStatus( if (hasSynced != null) { matcher = matcher.having((e) => e.hasSynced, 'hasSynced', hasSynced); } + if (downloadProgress != null) { + matcher = matcher.having( + (e) => e.downloadProgress, 'downloadProgress', downloadProgress); + } return matcher; } + +TypeMatcher isSyncDownloadProgress({ + required Object progress, + Map priorities = const {}, +}) { + var matcher = isA() + .having((e) => e.untilCompletion, 'untilCompletion', progress); + priorities.forEach((priority, expected) { + matcher = matcher.having( + (e) => e.untilPriority(priority), 'untilPriority($priority)', expected); + }); + + return matcher; +} + +TypeMatcher progress(int completed, int total) { + return isA() + .having((e) => e.completed, 'completed', completed) + .having((e) => e.total, 'total', total); +} diff --git a/packages/powersync_core/test/utils/native_test_utils.dart b/packages/powersync_core/test/utils/native_test_utils.dart index 65916a5f..04bc0e73 100644 --- a/packages/powersync_core/test/utils/native_test_utils.dart +++ b/packages/powersync_core/test/utils/native_test_utils.dart @@ -19,6 +19,12 @@ class TestOpenFactory extends PowerSyncOpenFactory with TestPowerSyncFactory { return DynamicLibrary.open('libsqlite3.so.0'); }); sqlite_open.open.overrideFor(sqlite_open.OperatingSystem.macOS, () { + // Prefer using Homebrew's SQLite which allows loading extensions. + const fromHomebrew = '/opt/homebrew/opt/sqlite/lib/libsqlite3.dylib'; + if (File(fromHomebrew).existsSync()) { + return DynamicLibrary.open(fromHomebrew); + } + return DynamicLibrary.open('libsqlite3.dylib'); }); } From 0ecd9064717a7ab2fb81045d08b1eb884594a69e Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 26 Mar 2025 15:33:15 +0200 Subject: [PATCH 10/16] Set count_at_last in SDK --- .../lib/src/sync/bucket_storage.dart | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/packages/powersync_core/lib/src/sync/bucket_storage.dart b/packages/powersync_core/lib/src/sync/bucket_storage.dart index 78e4d7cc..fe0d1168 100644 --- a/packages/powersync_core/lib/src/sync/bucket_storage.dart +++ b/packages/powersync_core/lib/src/sync/bucket_storage.dart @@ -181,6 +181,21 @@ class BucketStorage { final rs = await tx.execute('SELECT last_insert_rowid() as result'); final result = rs[0]['result']; if (result == 1) { + if (forPartialPriority == null) { + // Reset progress counters. We only do this for a complete sync, as we + // want a download progress to always cover a complete checkpoint + // instead of resetting for partial completions. + await tx.execute(''' +UPDATE ps_buckets SET count_since_last = 0, count_at_last = ?1->name + WHERE ?1->name IS NOT NULL +''', [ + json.encode({ + for (final bucket in checkpoint.checksums) + if (bucket.count case final count?) bucket.bucket: count, + }), + ]); + } + return true; } else { // can_update_local(db) == false From 86623f4159186fb49df6b2f12f41bc3f5e0ef56e Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 27 Mar 2025 09:01:05 +0200 Subject: [PATCH 11/16] Improve docs --- packages/powersync_core/lib/src/sync/sync_status.dart | 3 +++ 1 file changed, 3 insertions(+) diff --git a/packages/powersync_core/lib/src/sync/sync_status.dart b/packages/powersync_core/lib/src/sync/sync_status.dart index d90d05eb..90b35d39 100644 --- a/packages/powersync_core/lib/src/sync/sync_status.dart +++ b/packages/powersync_core/lib/src/sync/sync_status.dart @@ -326,6 +326,9 @@ final class InternalSyncDownloadProgress { /// This reports the `total` amount of operations to download, how many of them /// have alreaady been `completed` and finally a `fraction` indicating relative /// progress (as a number between `0.0` and `1.0`) +/// +/// To obtain these values, use [SyncDownloadProgress] available through +/// [SyncStatus.downloadProgress]. typedef ProgressWithOperations = ({ int total, int completed, From 2be5919b55dc99828613d1a05d3029fe0e3f878d Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 27 Mar 2025 12:43:04 +0200 Subject: [PATCH 12/16] Send partial checkpoint in sync test --- .../powersync_core/lib/src/sync/protocol.dart | 16 ++++++++++------ .../lib/src/sync/streaming_sync.dart | 4 +++- .../powersync_core/lib/src/sync/sync_status.dart | 2 +- .../powersync_core/test/in_memory_sync_test.dart | 9 +++++---- 4 files changed, 19 insertions(+), 12 deletions(-) diff --git a/packages/powersync_core/lib/src/sync/protocol.dart b/packages/powersync_core/lib/src/sync/protocol.dart index 983baa86..4e07334b 100644 --- a/packages/powersync_core/lib/src/sync/protocol.dart +++ b/packages/powersync_core/lib/src/sync/protocol.dart @@ -136,12 +136,7 @@ final class Checkpoint extends StreamingSyncLine { 'write_checkpoint': writeCheckpoint, 'buckets': checksums .where((c) => priority == null || c.priority <= priority) - .map((c) => { - 'bucket': c.bucket, - 'checksum': c.checksum, - 'priority': c.priority, - 'count': c.count, - }) + .map((c) => c.toJson()) .toList(growable: false) }; } @@ -172,6 +167,15 @@ class BucketChecksum { checksum = json['checksum'] as int, count = json['count'] as int?, lastOpId = json['last_op_id'] as String?; + + Map toJson() { + return { + 'bucket': bucket, + 'checksum': checksum, + 'priority': priority, + 'count': count, + }; + } } /// A variant of [Checkpoint] that may be sent when the server has already sent diff --git a/packages/powersync_core/lib/src/sync/streaming_sync.dart b/packages/powersync_core/lib/src/sync/streaming_sync.dart index 0e4d3397..e30fdca9 100644 --- a/packages/powersync_core/lib/src/sync/streaming_sync.dart +++ b/packages/powersync_core/lib/src/sync/streaming_sync.dart @@ -367,7 +367,6 @@ class StreamingSyncImplementation implements StreamingSync { throw PowerSyncProtocolException( 'Checkpoint diff without previous checkpoint'); } - _state.updateStatus((s) => s.downloading = true); final diff = line; final Map newBuckets = {}; for (var checksum in targetCheckpoint.checksums) { @@ -385,6 +384,9 @@ class StreamingSyncImplementation implements StreamingSync { checksums: [...newBuckets.values], writeCheckpoint: diff.writeCheckpoint); targetCheckpoint = newCheckpoint; + final initialProgress = await adapter.getBucketOperationProgress(); + _state.updateStatus( + (s) => s.applyCheckpointStarted(initialProgress, newCheckpoint)); bucketMap = newBuckets.map((name, checksum) => MapEntry(name, (name: name, priority: checksum.priority))); diff --git a/packages/powersync_core/lib/src/sync/sync_status.dart b/packages/powersync_core/lib/src/sync/sync_status.dart index 90b35d39..509fc235 100644 --- a/packages/powersync_core/lib/src/sync/sync_status.dart +++ b/packages/powersync_core/lib/src/sync/sync_status.dart @@ -335,7 +335,7 @@ typedef ProgressWithOperations = ({ double fraction, }); -/// Provides realtime progress about how PowerSync is downloading rows. +/// Provides realtime progress on how PowerSync is downloading rows. /// /// The reported progress always reflects the status towards the end of a /// sync iteration (after which a consistent snapshot of all buckets is diff --git a/packages/powersync_core/test/in_memory_sync_test.dart b/packages/powersync_core/test/in_memory_sync_test.dart index 19e97dab..d18aca90 100644 --- a/packages/powersync_core/test/in_memory_sync_test.dart +++ b/packages/powersync_core/test/in_memory_sync_test.dart @@ -418,10 +418,11 @@ void main() { // Emit new data, progress should be 0/2 instead of 10/12 syncService.addLine({ - 'checkpoint': Checkpoint( - lastOpId: '12', - checksums: [bucket('a', 12)], - ) + 'checkpoint_diff': { + 'last_op_id': '12', + 'updated_buckets': [bucket('a', 12)], + 'removed_buckets': const [], + }, }); await expectProgress(status, total: progress(0, 2)); addDataLine('a', 2); From c9d77556487944d480ad767764d4dacdae6e239a Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 31 Mar 2025 11:30:04 +0200 Subject: [PATCH 13/16] Fix hasSynced briefly becoming false --- packages/powersync_core/lib/src/database/core_version.dart | 2 +- packages/powersync_core/lib/src/sync/mutable_sync_status.dart | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/powersync_core/lib/src/database/core_version.dart b/packages/powersync_core/lib/src/database/core_version.dart index b0d5ff1c..dc14f169 100644 --- a/packages/powersync_core/lib/src/database/core_version.dart +++ b/packages/powersync_core/lib/src/database/core_version.dart @@ -60,7 +60,7 @@ extension type const PowerSyncCoreVersion((int, int, int) _tuple) { // Note: When updating this, also update the download URL in // scripts/init_powersync_core_binary.dart and the version ref in // packages/sqlite3_wasm_build/build.sh - static const minimum = PowerSyncCoreVersion((0, 3, 11)); + static const minimum = PowerSyncCoreVersion((0, 3, 12)); /// The first version of the core extensions that this version of the Dart /// SDK doesn't support. diff --git a/packages/powersync_core/lib/src/sync/mutable_sync_status.dart b/packages/powersync_core/lib/src/sync/mutable_sync_status.dart index 3a6663b7..551a8d03 100644 --- a/packages/powersync_core/lib/src/sync/mutable_sync_status.dart +++ b/packages/powersync_core/lib/src/sync/mutable_sync_status.dart @@ -88,7 +88,7 @@ final class MutableSyncStatus { downloadProgress: downloadProgress?.asSyncDownloadProgress, priorityStatusEntries: UnmodifiableListView(priorityStatusEntries), lastSyncedAt: lastSyncedAt, - hasSynced: lastSyncedAt != null, + hasSynced: null, // Stream client is not supposed to set this value. uploadError: uploadError, downloadError: downloadError, ); From ccf67ef9d6f0549b3aebf54abdc41482a62c1fe5 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 31 Mar 2025 11:31:38 +0200 Subject: [PATCH 14/16] fix typo --- packages/powersync_core/lib/src/sync/mutable_sync_status.dart | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/powersync_core/lib/src/sync/mutable_sync_status.dart b/packages/powersync_core/lib/src/sync/mutable_sync_status.dart index 551a8d03..6da63654 100644 --- a/packages/powersync_core/lib/src/sync/mutable_sync_status.dart +++ b/packages/powersync_core/lib/src/sync/mutable_sync_status.dart @@ -79,7 +79,7 @@ final class MutableSyncStatus { } } - SyncStatus immutableSnapsot() { + SyncStatus immutableSnapshot() { return SyncStatus( connected: connected, connecting: connecting, @@ -111,7 +111,7 @@ final class SyncStatusStateStream { return; } - final current = status.immutableSnapsot(); + final current = status.immutableSnapshot(); if (current != _lastPublishedStatus) { _statusStreamController.add(current); _lastPublishedStatus = current; From 6c7bae4d76bb6fc2a3f462fc19fd584cb726c2da Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 31 Mar 2025 13:33:26 +0200 Subject: [PATCH 15/16] Adopt in django todolist demo too --- .../lib/widgets/guard_by_sync.dart | 55 ++++++++++++++++++ .../lib/widgets/lists_page.dart | 58 ++++++------------- demos/django-todolist/macos/Podfile.lock | 4 +- .../xcshareddata/xcschemes/Runner.xcscheme | 1 + 4 files changed, 76 insertions(+), 42 deletions(-) create mode 100644 demos/django-todolist/lib/widgets/guard_by_sync.dart diff --git a/demos/django-todolist/lib/widgets/guard_by_sync.dart b/demos/django-todolist/lib/widgets/guard_by_sync.dart new file mode 100644 index 00000000..a6d6bb48 --- /dev/null +++ b/demos/django-todolist/lib/widgets/guard_by_sync.dart @@ -0,0 +1,55 @@ +import 'package:flutter/material.dart'; +import 'package:powersync/powersync.dart' hide Column; +import 'package:powersync_django_todolist_demo/powersync.dart'; + +/// A widget that shows [child] after a complete sync on the database has +/// completed and a progress bar before that. +class GuardBySync extends StatelessWidget { + final Widget child; + + /// When set, wait only for a complete sync within the [BucketPriority] + /// instead of a full sync. + final BucketPriority? priority; + + const GuardBySync({ + super.key, + required this.child, + this.priority, + }); + + @override + Widget build(BuildContext context) { + return StreamBuilder( + stream: db.statusStream, + initialData: db.currentStatus, + builder: (context, snapshot) { + final status = snapshot.requireData; + final (didSync, progress) = switch (priority) { + null => ( + status.hasSynced ?? false, + status.downloadProgress?.untilCompletion + ), + var priority? => ( + status.statusForPriority(priority).hasSynced ?? false, + status.downloadProgress?.untilPriority(priority) + ), + }; + + if (didSync) { + return child; + } else { + return Center( + child: Column( + children: [ + const Text('Busy with sync...'), + LinearProgressIndicator(value: progress?.fraction), + if (progress case final progress?) + Text('${progress.completed} out of ${progress.total}') + ], + ), + ); + } + }, + ); + } +} diff --git a/demos/django-todolist/lib/widgets/lists_page.dart b/demos/django-todolist/lib/widgets/lists_page.dart index e31c2fc8..17e7f512 100644 --- a/demos/django-todolist/lib/widgets/lists_page.dart +++ b/demos/django-todolist/lib/widgets/lists_page.dart @@ -1,6 +1,5 @@ -import 'dart:async'; - import 'package:flutter/material.dart'; +import 'package:powersync_django_todolist_demo/widgets/guard_by_sync.dart'; import './list_item.dart'; import './list_item_dialog.dart'; @@ -41,48 +40,27 @@ class ListsPage extends StatelessWidget { } } -class ListsWidget extends StatefulWidget { +final class ListsWidget extends StatelessWidget { const ListsWidget({super.key}); - @override - State createState() { - return _ListsWidgetState(); - } -} - -class _ListsWidgetState extends State { - List _data = []; - StreamSubscription? _subscription; - - _ListsWidgetState(); - - @override - void initState() { - super.initState(); - final stream = TodoList.watchListsWithStats(); - _subscription = stream.listen((data) { - if (!context.mounted) { - return; - } - setState(() { - _data = data; - }); - }); - } - - @override - void dispose() { - super.dispose(); - _subscription?.cancel(); - } - @override Widget build(BuildContext context) { - return ListView( - padding: const EdgeInsets.symmetric(vertical: 8.0), - children: _data.map((list) { - return ListItemWidget(list: list); - }).toList(), + return GuardBySync( + child: StreamBuilder( + stream: TodoList.watchListsWithStats(), + builder: (context, snapshot) { + if (snapshot.data case final todoLists?) { + return ListView( + padding: const EdgeInsets.symmetric(vertical: 8.0), + children: todoLists.map((list) { + return ListItemWidget(list: list); + }).toList(), + ); + } else { + return const CircularProgressIndicator(); + } + }, + ), ); } } diff --git a/demos/django-todolist/macos/Podfile.lock b/demos/django-todolist/macos/Podfile.lock index c1d67d1d..ec19cbfa 100644 --- a/demos/django-todolist/macos/Podfile.lock +++ b/demos/django-todolist/macos/Podfile.lock @@ -24,7 +24,7 @@ PODS: - sqlite3_flutter_libs (0.0.1): - Flutter - FlutterMacOS - - sqlite3 (~> 3.49.1) + - sqlite3 (~> 3.49.0) - sqlite3/dbstatvtab - sqlite3/fts5 - sqlite3/perf-threadsafe @@ -61,7 +61,7 @@ SPEC CHECKSUMS: powersync_flutter_libs: 011c1704766d154faf2373bb9c973d26910d322b shared_preferences_foundation: 9e1978ff2562383bd5676f64ec4e9aa8fa06a6f7 sqlite3: fc1400008a9b3525f5914ed715a5d1af0b8f4983 - sqlite3_flutter_libs: f8fc13346870e73fe35ebf6dbb997fbcd156b241 + sqlite3_flutter_libs: 3c323550ef3b928bc0aa9513c841e45a7d242832 PODFILE CHECKSUM: 236401fc2c932af29a9fcf0e97baeeb2d750d367 diff --git a/demos/django-todolist/macos/Runner.xcodeproj/xcshareddata/xcschemes/Runner.xcscheme b/demos/django-todolist/macos/Runner.xcodeproj/xcshareddata/xcschemes/Runner.xcscheme index 2c526589..a0e59441 100644 --- a/demos/django-todolist/macos/Runner.xcodeproj/xcshareddata/xcschemes/Runner.xcscheme +++ b/demos/django-todolist/macos/Runner.xcodeproj/xcshareddata/xcschemes/Runner.xcscheme @@ -59,6 +59,7 @@ ignoresPersistentStateOnLaunch = "NO" debugDocumentVersioning = "YES" debugServiceExtension = "internal" + enableGPUValidationMode = "1" allowLocationSimulation = "YES"> From 8271c2a974ea276ccb1f8e2fbc6c13c549467c80 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 14 Apr 2025 13:56:04 +0200 Subject: [PATCH 16/16] Simplify API --- .../lib/widgets/guard_by_sync.dart | 10 +-- .../lib/src/sync/sync_status.dart | 82 ++++++++++--------- .../test/in_memory_sync_test.dart | 8 +- 3 files changed, 50 insertions(+), 50 deletions(-) diff --git a/demos/supabase-todolist/lib/widgets/guard_by_sync.dart b/demos/supabase-todolist/lib/widgets/guard_by_sync.dart index b2514f86..d55ed4e3 100644 --- a/demos/supabase-todolist/lib/widgets/guard_by_sync.dart +++ b/demos/supabase-todolist/lib/widgets/guard_by_sync.dart @@ -25,10 +25,7 @@ class GuardBySync extends StatelessWidget { builder: (context, snapshot) { final status = snapshot.requireData; final (didSync, progress) = switch (priority) { - null => ( - status.hasSynced ?? false, - status.downloadProgress?.untilCompletion - ), + null => (status.hasSynced ?? false, status.downloadProgress), var priority? => ( status.statusForPriority(priority).hasSynced ?? false, status.downloadProgress?.untilPriority(priority) @@ -42,9 +39,10 @@ class GuardBySync extends StatelessWidget { child: Column( children: [ const Text('Busy with sync...'), - LinearProgressIndicator(value: progress?.fraction), + LinearProgressIndicator(value: progress?.downloadedFraction), if (progress case final progress?) - Text('${progress.completed} out of ${progress.total}') + Text( + '${progress.downloadedOperations} out of ${progress.totalOperations}') ], ), ); diff --git a/packages/powersync_core/lib/src/sync/sync_status.dart b/packages/powersync_core/lib/src/sync/sync_status.dart index 509fc235..fabe4322 100644 --- a/packages/powersync_core/lib/src/sync/sync_status.dart +++ b/packages/powersync_core/lib/src/sync/sync_status.dart @@ -176,13 +176,6 @@ final class SyncStatus { extension type const BucketPriority._(int priorityNumber) { static const _highest = 0; - /// The a bucket priority lower than the lowest priority that will ever be - /// allowed by the sync service. - /// - /// This can be used as a priority that tracks complete syncs instead of - /// partial completions. - static const _sentinel = BucketPriority._(2147483647); - factory BucketPriority(int i) { assert(i >= _highest); return BucketPriority._(i); @@ -236,15 +229,14 @@ typedef BucketProgress = ({ }); @internal -final class InternalSyncDownloadProgress { +final class InternalSyncDownloadProgress extends ProgressWithOperations { final Map buckets; - final int _totalDownloaded; - final int _totalTarget; - InternalSyncDownloadProgress(this.buckets) - : _totalDownloaded = buckets.values.map((e) => e.sinceLast).sum, - _totalTarget = buckets.values.map((e) => e.targetCount - e.atLast).sum; + : super._( + buckets.values.map((e) => e.targetCount - e.atLast).sum, + buckets.values.map((e) => e.sinceLast).sum, + ); factory InternalSyncDownloadProgress.forNewCheckpoint( Map localProgress, Checkpoint target) { @@ -269,8 +261,9 @@ final class InternalSyncDownloadProgress { /// Sums the total target and completed operations for all buckets up until /// the given [priority] (inclusive). - (int, int) targetAndCompletedCounts(BucketPriority priority) { - return buckets.values.where((e) => e.priority >= priority).fold( + ProgressWithOperations untilPriority(BucketPriority priority) { + final (total, downloaded) = + buckets.values.where((e) => e.priority >= priority).fold( (0, 0), (prev, entry) { final downloaded = entry.sinceLast; @@ -278,6 +271,8 @@ final class InternalSyncDownloadProgress { return (prev.$1 + total, prev.$2 + downloaded); }, ); + + return ProgressWithOperations._(total, downloaded); } InternalSyncDownloadProgress incrementDownloaded(SyncDataBatch batch) { @@ -304,18 +299,17 @@ final class InternalSyncDownloadProgress { @override bool operator ==(Object other) { return other is InternalSyncDownloadProgress && - // _totalDownloaded and _totalTarget are derived values, but comparing - // them first helps find a difference faster. - _totalDownloaded == other._totalDownloaded && - _totalTarget == other._totalTarget && + // totalOperations and downloadedOperations are derived values, but + // comparing them first helps find a difference faster. + totalOperations == other.totalOperations && + downloadedOperations == other.downloadedOperations && _mapEquality.equals(buckets, other.buckets); } @override String toString() { - final asView = asSyncDownloadProgress; - final all = asView.untilCompletion; - return 'for total: ${all.completed} / ${all.total}'; + final all = asSyncDownloadProgress; + return 'for total: ${all.downloadedOperations} / ${all.totalOperations}'; } static const _mapEquality = MapEquality(); @@ -329,14 +323,32 @@ final class InternalSyncDownloadProgress { /// /// To obtain these values, use [SyncDownloadProgress] available through /// [SyncStatus.downloadProgress]. -typedef ProgressWithOperations = ({ - int total, - int completed, - double fraction, -}); +final class ProgressWithOperations { + /// How many operations need to be downloaded in total until the current + /// download is complete. + final int totalOperations; + + /// How many operations have already been downloaded since the last complete + /// download. + final int downloadedOperations; + + ProgressWithOperations._(this.totalOperations, this.downloadedOperations); + + /// Relative progress (as a number between `0.0` and `1.0`). + double get downloadedFraction { + return totalOperations == 0 ? 0.0 : downloadedOperations / totalOperations; + } +} /// Provides realtime progress on how PowerSync is downloading rows. /// +/// This type reports progress by implementing [ProgressWithOperations], meaning +/// that [downloadedOperations], [totalOperations] and [downloadedFraction] are +/// available on instances of [SyncDownloadProgress]. +/// Additionally, it's possible to obtain the progress towards a specific +/// priority only (instead of tracking progress for the entire download) by +/// using [untilPriority]. +/// /// The reported progress always reflects the status towards the end of a /// sync iteration (after which a consistent snapshot of all buckets is /// available locally). @@ -351,15 +363,8 @@ typedef ProgressWithOperations = ({ /// counters are unlikely to be updated one-by-one. /// /// [compacting]: https://docs.powersync.com/usage/lifecycle-maintenance/compacting-buckets -extension type SyncDownloadProgress._(InternalSyncDownloadProgress _internal) { - /// Returns download progress towards a complete checkpoint being received. - /// - /// The returned [ProgressWithOperations] tracks the target amount of - /// operations that need to be downloaded in total and how many of them have - /// already been received. - ProgressWithOperations get untilCompletion => - untilPriority(BucketPriority._sentinel); - +extension type SyncDownloadProgress._(InternalSyncDownloadProgress _internal) + implements ProgressWithOperations { /// Returns download progress towards all data up until the specified /// [priority] being received. /// @@ -367,9 +372,6 @@ extension type SyncDownloadProgress._(InternalSyncDownloadProgress _internal) { /// operations that need to be downloaded in total and how many of them have /// already been received. ProgressWithOperations untilPriority(BucketPriority priority) { - final (total, downloaded) = _internal.targetAndCompletedCounts(priority); - final progress = total == 0 ? 0.0 : downloaded / total; - - return (total: total, completed: downloaded, fraction: progress); + return _internal.untilPriority(priority); } } diff --git a/packages/powersync_core/test/in_memory_sync_test.dart b/packages/powersync_core/test/in_memory_sync_test.dart index d18aca90..4469d576 100644 --- a/packages/powersync_core/test/in_memory_sync_test.dart +++ b/packages/powersync_core/test/in_memory_sync_test.dart @@ -593,8 +593,8 @@ TypeMatcher isSyncDownloadProgress({ required Object progress, Map priorities = const {}, }) { - var matcher = isA() - .having((e) => e.untilCompletion, 'untilCompletion', progress); + var matcher = + isA().having((e) => e, 'untilCompletion', progress); priorities.forEach((priority, expected) { matcher = matcher.having( (e) => e.untilPriority(priority), 'untilPriority($priority)', expected); @@ -605,6 +605,6 @@ TypeMatcher isSyncDownloadProgress({ TypeMatcher progress(int completed, int total) { return isA() - .having((e) => e.completed, 'completed', completed) - .having((e) => e.total, 'total', total); + .having((e) => e.downloadedOperations, 'completed', completed) + .having((e) => e.totalOperations, 'total', total); }