Skip to content

Sync progress status #260

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 22 commits into from
May 5, 2025
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions demos/supabase-todolist/lib/widgets/guard_by_sync.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import 'package:flutter/material.dart';
import 'package:powersync/powersync.dart' hide Column;
import 'package:powersync_flutter_demo/powersync.dart';

/// A widget that shows [child] after a complete sync on the database has
/// completed and a progress bar before that.
class GuardBySync extends StatelessWidget {
final Widget child;

/// When set, wait only for a complete sync within the [BucketPriority]
/// instead of a full sync.
final BucketPriority? priority;

const GuardBySync({
super.key,
required this.child,
this.priority,
});

@override
Widget build(BuildContext context) {
return StreamBuilder<SyncStatus>(
stream: db.statusStream,
initialData: db.currentStatus,
builder: (context, snapshot) {
final status = snapshot.requireData;
final (didSync, progress) = switch (priority) {
null => (
status.hasSynced ?? false,
status.downloadProgress?.untilCompletion
),
var priority? => (
status.statusForPriority(priority).hasSynced ?? false,
status.downloadProgress?.untilPriority(priority)
),
};

if (didSync) {
return child;
} else {
return Center(
child: Column(
children: [
const Text('Busy with sync...'),
LinearProgressIndicator(value: progress?.fraction),
if (progress case final progress?)
Text('${progress.completed} out of ${progress.total}')
],
),
);
}
},
);
}
}
42 changes: 18 additions & 24 deletions demos/supabase-todolist/lib/widgets/lists_page.dart
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import 'package:flutter/material.dart';
import 'package:powersync/powersync.dart';
import 'package:powersync_flutter_demo/powersync.dart';
import 'package:powersync_flutter_demo/widgets/guard_by_sync.dart';

import './list_item.dart';
import './list_item_dialog.dart';
Expand Down Expand Up @@ -46,29 +46,23 @@ final class ListsWidget extends StatelessWidget {

@override
Widget build(BuildContext context) {
return FutureBuilder(
future: db.waitForFirstSync(priority: _listsPriority),
builder: (context, snapshot) {
if (snapshot.connectionState == ConnectionState.done) {
return StreamBuilder(
stream: TodoList.watchListsWithStats(),
builder: (context, snapshot) {
if (snapshot.data case final todoLists?) {
return ListView(
padding: const EdgeInsets.symmetric(vertical: 8.0),
children: todoLists.map((list) {
return ListItemWidget(list: list);
}).toList(),
);
} else {
return const CircularProgressIndicator();
}
},
);
} else {
return const Text('Busy with sync...');
}
},
return GuardBySync(
priority: _listsPriority,
child: StreamBuilder(
stream: TodoList.watchListsWithStats(),
builder: (context, snapshot) {
if (snapshot.data case final todoLists?) {
return ListView(
padding: const EdgeInsets.symmetric(vertical: 8.0),
children: todoLists.map((list) {
return ListItemWidget(list: list);
}).toList(),
);
} else {
return const CircularProgressIndicator();
}
},
),
);
}

Expand Down
2 changes: 1 addition & 1 deletion packages/powersync_core/lib/powersync_core.dart
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ export 'src/exceptions.dart';
export 'src/log.dart';
export 'src/open_factory.dart';
export 'src/schema.dart';
export 'src/sync_status.dart';
export 'src/sync_status.dart' hide InternalSyncDownloadProgress;
export 'src/uuid.dart';
2 changes: 1 addition & 1 deletion packages/powersync_core/lib/src/bucket_storage.dart
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import 'package:powersync_core/sqlite3_common.dart';

import 'crud.dart';
import 'schema_logic.dart';
import 'sync_types.dart';
import 'sync/protocol.dart';

const compactOperationInterval = 1000;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import 'package:powersync_core/src/open_factory/abstract_powersync_open_factory.
import 'package:powersync_core/src/open_factory/native/native_open_factory.dart';
import 'package:powersync_core/src/schema.dart';
import 'package:powersync_core/src/schema_logic.dart';
import 'package:powersync_core/src/streaming_sync.dart';
import 'package:powersync_core/src/sync/streaming_sync.dart';
import 'package:powersync_core/src/sync_status.dart';
import 'package:sqlite_async/sqlite3_common.dart';
import 'package:sqlite_async/sqlite_async.dart';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import 'package:powersync_core/src/log.dart';
import 'package:powersync_core/src/open_factory/abstract_powersync_open_factory.dart';
import 'package:powersync_core/src/open_factory/web/web_open_factory.dart';
import 'package:powersync_core/src/schema.dart';
import 'package:powersync_core/src/streaming_sync.dart';
import 'package:powersync_core/src/sync/streaming_sync.dart';
import 'package:sqlite_async/sqlite_async.dart';
import 'package:powersync_core/src/schema_logic.dart' as schema_logic;

Expand Down
124 changes: 124 additions & 0 deletions packages/powersync_core/lib/src/sync/mutable_sync_status.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
import 'dart:async';

import 'package:collection/collection.dart';

import '../sync_status.dart';
import 'protocol.dart';

final class MutableSyncStatus {
bool connected = false;
bool connecting = false;
bool downloading = false;
bool uploading = false;

InternalSyncDownloadProgress? downloadProgress;
List<SyncPriorityStatus> priorityStatusEntries = const [];

DateTime? lastSyncedAt;

Object? uploadError;
Object? downloadError;

void setConnectingIfNotConnected() {
if (!connected) {
connecting = true;
}
}

void setConnected() {
connected = true;
connecting = false;
}

void applyDownloadError(Object error) {
connected = false;
connecting = false;
downloading = false;
downloadProgress = null;
downloadError = error;
}

void applyCheckpointReached(Checkpoint applied) {
downloading = false;
downloadError = null;
final now = lastSyncedAt = DateTime.now();
priorityStatusEntries = [
if (applied.checksums.isNotEmpty)
(
hasSynced: true,
lastSyncedAt: now,
priority: maxBy(
applied.checksums.map((cs) => BucketPriority(cs.priority)),
(priority) => priority,
compare: BucketPriority.comparator,
)!,
)
];
}

void applyCheckpointStarted(Checkpoint target) {
downloading = true;
// TODO: Include pending ops from interrupted download, if any...
downloadProgress = InternalSyncDownloadProgress.fromZero(target);
}

void applyUploadError(Object error) {
uploading = false;
uploadError = error;
}

void applyBatchReceived(
Map<String, BucketDescription?> currentBuckets, SyncDataBatch batch) {
downloading = true;
if (downloadProgress case final previousProgress?) {
downloadProgress = previousProgress.incrementDownloaded([
for (final bucket in batch.buckets)
if (currentBuckets[bucket.bucket] case final knownBucket?)
(BucketPriority(knownBucket.priority), bucket.data.length),
]);
}
}

SyncStatus immutableSnapsot() {
return SyncStatus(
connected: connected,
connecting: connecting,
downloading: downloading,
uploading: uploading,
downloadProgress: downloadProgress?.asSyncDownloadProgress,
priorityStatusEntries: UnmodifiableListView(priorityStatusEntries),
lastSyncedAt: lastSyncedAt,
hasSynced: lastSyncedAt != null,
uploadError: uploadError,
downloadError: downloadError,
);
}
}

final class SyncStatusStateStream {
final MutableSyncStatus status = MutableSyncStatus();
SyncStatus _lastPublishedStatus = const SyncStatus();

final StreamController<SyncStatus> _statusStreamController =
StreamController<SyncStatus>.broadcast();

Stream<SyncStatus> get statusStream => _statusStreamController.stream;

void updateStatus(void Function(MutableSyncStatus status) change) {
change(status);

if (_statusStreamController.isClosed) {
return;
}

final current = status.immutableSnapsot();
if (current != _lastPublishedStatus) {
_statusStreamController.add(current);
_lastPublishedStatus = current;
}
}

void close() {
_statusStreamController.close();
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import 'dart:async';
import 'dart:convert';

import 'bucket_storage.dart';
import '../bucket_storage.dart';

/// Messages sent from the sync service.
sealed class StreamingSyncLine {
Expand Down
Loading
Loading