Skip to content

Commit 7d47427

Browse files
committed
Restore has_synced on iteration start
1 parent a06684b commit 7d47427

File tree

4 files changed

+81
-7
lines changed

4 files changed

+81
-7
lines changed

crates/core/src/sync/storage_adapter.rs

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,10 @@ use crate::{
1010
operations::delete_bucket,
1111
schema::Schema,
1212
state::DatabaseState,
13-
sync::checkpoint::{validate_checkpoint, ChecksumMismatch},
13+
sync::{
14+
checkpoint::{validate_checkpoint, ChecksumMismatch},
15+
sync_status::SyncPriorityStatus,
16+
},
1417
sync_local::{PartialSyncOperation, SyncOperation},
1518
};
1619

@@ -68,6 +71,32 @@ impl StorageAdapter {
6871
Ok(requests)
6972
}
7073

74+
pub fn collect_sync_state(&self) -> Result<Vec<SyncPriorityStatus>, PowerSyncError> {
75+
// language=SQLite
76+
let statement = self
77+
.db
78+
.prepare_v2(
79+
"SELECT priority, unixepoch(last_synced_at) FROM ps_sync_state ORDER BY priority",
80+
)
81+
.into_db_result(self.db)?;
82+
83+
let mut items = Vec::<SyncPriorityStatus>::new();
84+
while statement.step()? == ResultCode::ROW {
85+
let priority = BucketPriority {
86+
number: statement.column_int(0),
87+
};
88+
let timestamp = statement.column_int64(1);
89+
90+
items.push(SyncPriorityStatus {
91+
priority,
92+
last_synced_at: Some(Timestamp(timestamp)),
93+
has_synced: Some(true),
94+
});
95+
}
96+
97+
return Ok(items);
98+
}
99+
71100
pub fn delete_buckets<'a>(
72101
&self,
73102
buckets: impl IntoIterator<Item = &'a str>,

crates/core/src/sync/streaming_sync.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -577,8 +577,11 @@ impl StreamingSyncIteration {
577577
));
578578
};
579579

580-
self.status
581-
.update(|s| s.start_connecting(), &mut event.instructions);
580+
let sync_state = self.adapter.collect_sync_state()?;
581+
self.status.update(
582+
move |s| s.start_connecting(sync_state),
583+
&mut event.instructions,
584+
);
582585

583586
let requests = self.adapter.collect_bucket_requests()?;
584587
let local_bucket_names: Vec<String> = requests.iter().map(|s| s.name.clone()).collect();

crates/core/src/sync/sync_status.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,12 @@ impl DownloadSyncStatus {
4646
self.downloading = None;
4747
}
4848

49-
pub fn start_connecting(&mut self) {
49+
pub fn start_connecting(&mut self, status: Vec<SyncPriorityStatus>) {
5050
self.connected = false;
5151
self.downloading = None;
5252
self.connecting = true;
53+
self.priority_status = status;
54+
self.debug_assert_priority_status_is_sorted();
5355
}
5456

5557
pub fn mark_connected(&mut self) {
@@ -161,9 +163,9 @@ pub struct Timestamp(pub i64);
161163

162164
#[derive(Serialize, Hash)]
163165
pub struct SyncPriorityStatus {
164-
priority: BucketPriority,
165-
last_synced_at: Option<Timestamp>,
166-
has_synced: Option<bool>,
166+
pub priority: BucketPriority,
167+
pub last_synced_at: Option<Timestamp>,
168+
pub has_synced: Option<bool>,
167169
}
168170

169171
/// Per-bucket download progress information.

dart/test/sync_test.dart

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,46 @@ void _syncTests<T>({
301301
}
302302
});
303303

304+
syncTest('remembers sync state', (controller) {
305+
invokeControl('start', null);
306+
307+
pushCheckpoint(buckets: priorityBuckets);
308+
pushCheckpointComplete();
309+
310+
controller.elapse(Duration(minutes: 10));
311+
pushCheckpoint(buckets: priorityBuckets);
312+
pushCheckpointComplete(priority: 2);
313+
invokeControl('stop', null);
314+
315+
final instructions = invokeControl('start', null);
316+
expect(
317+
instructions,
318+
contains(
319+
containsPair(
320+
'UpdateSyncStatus',
321+
containsPair(
322+
'status',
323+
containsPair(
324+
'priority_status',
325+
[
326+
{
327+
'priority': 2,
328+
'last_synced_at': 1740823800,
329+
'has_synced': true
330+
},
331+
{
332+
'priority': 2147483647,
333+
'last_synced_at': 1740823200,
334+
'has_synced': true
335+
},
336+
],
337+
),
338+
),
339+
),
340+
),
341+
);
342+
});
343+
304344
test('clearing database clears sync status', () {
305345
invokeControl('start', null);
306346
pushCheckpoint(buckets: priorityBuckets);

0 commit comments

Comments
 (0)