Skip to content

Commit 27623d9

Browse files
committed
Start tracking download operations
1 parent 83e90ee commit 27623d9

15 files changed

+229
-139
lines changed

packages/powersync_core/lib/src/bucket_storage.dart

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import 'package:powersync_core/sqlite3_common.dart';
77

88
import 'crud.dart';
99
import 'schema_logic.dart';
10-
import 'sync_types.dart';
10+
import 'sync/protocol.dart';
1111

1212
const compactOperationInterval = 1000;
1313

packages/powersync_core/lib/src/database/native/native_powersync_database.dart

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import 'package:powersync_core/src/open_factory/abstract_powersync_open_factory.
1616
import 'package:powersync_core/src/open_factory/native/native_open_factory.dart';
1717
import 'package:powersync_core/src/schema.dart';
1818
import 'package:powersync_core/src/schema_logic.dart';
19-
import 'package:powersync_core/src/streaming_sync.dart';
19+
import 'package:powersync_core/src/sync/streaming_sync.dart';
2020
import 'package:powersync_core/src/sync_status.dart';
2121
import 'package:sqlite_async/sqlite3_common.dart';
2222
import 'package:sqlite_async/sqlite_async.dart';

packages/powersync_core/lib/src/database/web/web_powersync_database.dart

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import 'package:powersync_core/src/log.dart';
1111
import 'package:powersync_core/src/open_factory/abstract_powersync_open_factory.dart';
1212
import 'package:powersync_core/src/open_factory/web/web_open_factory.dart';
1313
import 'package:powersync_core/src/schema.dart';
14-
import 'package:powersync_core/src/streaming_sync.dart';
14+
import 'package:powersync_core/src/sync/streaming_sync.dart';
1515
import 'package:sqlite_async/sqlite_async.dart';
1616
import 'package:powersync_core/src/schema_logic.dart' as schema_logic;
1717

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
import 'dart:async';
2+
3+
import 'package:collection/collection.dart';
4+
5+
import '../sync_status.dart';
6+
import 'protocol.dart';
7+
8+
final class MutableSyncStatus {
9+
bool connected = false;
10+
bool connecting = false;
11+
bool downloading = false;
12+
bool uploading = false;
13+
14+
InternalSyncDownloadProgress? downloadProgress;
15+
List<SyncPriorityStatus> priorityStatusEntries = const [];
16+
17+
DateTime? lastSyncedAt;
18+
19+
Object? uploadError;
20+
Object? downloadError;
21+
22+
void setConnectingIfNotConnected() {
23+
if (!connected) {
24+
connecting = true;
25+
}
26+
}
27+
28+
void setConnected() {
29+
connected = true;
30+
connecting = false;
31+
}
32+
33+
void applyDownloadError(Object error) {
34+
connected = false;
35+
connecting = false;
36+
downloading = false;
37+
downloadProgress = null;
38+
downloadError = error;
39+
}
40+
41+
void applyCheckpointReached(Checkpoint applied) {
42+
downloading = false;
43+
downloadError = null;
44+
final now = lastSyncedAt = DateTime.now();
45+
priorityStatusEntries = [
46+
if (applied.checksums.isNotEmpty)
47+
(
48+
hasSynced: true,
49+
lastSyncedAt: now,
50+
priority: maxBy(
51+
applied.checksums.map((cs) => BucketPriority(cs.priority)),
52+
(priority) => priority,
53+
compare: BucketPriority.comparator,
54+
)!,
55+
)
56+
];
57+
}
58+
59+
void applyCheckpointStarted(Checkpoint target) {
60+
downloading = true;
61+
downloadProgress = InternalSyncDownloadProgress.fromZero(target);
62+
}
63+
64+
void applyUploadError(Object error) {
65+
uploading = false;
66+
uploadError = error;
67+
}
68+
69+
SyncStatus immutableSnapsot() {
70+
return SyncStatus(
71+
connected: connected,
72+
connecting: connecting,
73+
downloading: downloading,
74+
uploading: uploading,
75+
downloadProgress: downloadProgress?.asSyncDownloadProgress,
76+
priorityStatusEntries: UnmodifiableListView(priorityStatusEntries),
77+
lastSyncedAt: lastSyncedAt,
78+
hasSynced: lastSyncedAt != null,
79+
uploadError: uploadError,
80+
downloadError: downloadError,
81+
);
82+
}
83+
}
84+
85+
final class SyncStatusStateStream {
86+
final MutableSyncStatus status = MutableSyncStatus();
87+
SyncStatus _lastPublishedStatus = const SyncStatus();
88+
89+
final StreamController<SyncStatus> _statusStreamController =
90+
StreamController<SyncStatus>.broadcast();
91+
92+
Stream<SyncStatus> get statusStream => _statusStreamController.stream;
93+
94+
void updateStatus(void Function(MutableSyncStatus status) change) {
95+
change(status);
96+
97+
if (_statusStreamController.isClosed) {
98+
return;
99+
}
100+
101+
final current = status.immutableSnapsot();
102+
if (current != _lastPublishedStatus) {
103+
_statusStreamController.add(current);
104+
_lastPublishedStatus = current;
105+
}
106+
}
107+
108+
void close() {
109+
_statusStreamController.close();
110+
}
111+
}

packages/powersync_core/lib/src/sync_types.dart renamed to packages/powersync_core/lib/src/sync/protocol.dart

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import 'dart:async';
22
import 'dart:convert';
33

4-
import 'bucket_storage.dart';
4+
import '../bucket_storage.dart';
55

66
/// Messages sent from the sync service.
77
sealed class StreamingSyncLine {

0 commit comments

Comments
 (0)