Skip to content

Commit 75eb7ea

Browse files
update locks in syncing service
1 parent 88dbb16 commit 75eb7ea

File tree

1 file changed

+32
-21
lines changed

1 file changed

+32
-21
lines changed

Sources/PowerSync/attachments/SyncingService.swift

Lines changed: 32 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,10 @@ import Foundation
33

44
/// A service that synchronizes attachments between local and remote storage.
55
///
6-
/// This actor watches for changes to active attachments and performs queued
6+
/// This watches for changes to active attachments and performs queued
77
/// download, upload, and delete operations. Syncs can be triggered manually,
88
/// periodically, or based on database changes.
9-
actor SyncingService {
9+
public class SyncingService {
1010
private let remoteStorage: RemoteStorageAdapter
1111
private let localStorage: LocalStorageAdapter
1212
private let attachmentsService: AttachmentService
@@ -17,6 +17,7 @@ actor SyncingService {
1717
private let syncTriggerSubject = PassthroughSubject<Void, Never>()
1818
private var periodicSyncTimer: Timer?
1919
private var syncTask: Task<Void, Never>?
20+
private let lock: LockActor
2021
let logger: any LoggerProtocol
2122

2223
let logTag = "AttachmentSync"
@@ -47,32 +48,32 @@ actor SyncingService {
4748
self.errorHandler = errorHandler
4849
self.syncThrottle = syncThrottle
4950
self.logger = logger
50-
closed = false
51+
self.closed = false
52+
self.lock = LockActor()
5153
}
5254

5355
/// Starts periodic syncing of attachments.
5456
///
5557
/// - Parameter period: The time interval in seconds between each sync.
5658
public func startSync(period: TimeInterval) async throws {
57-
try guardClosed()
58-
59-
// Close any active sync operations
60-
try await stopSync()
59+
try await lock.withLock {
60+
try guardClosed()
6161

62-
setupSyncFlow()
62+
// Close any active sync operations
63+
try await _stopSync()
6364

64-
periodicSyncTimer = Timer.scheduledTimer(
65-
withTimeInterval: period,
66-
repeats: true
67-
) { [weak self] _ in
68-
guard let self = self else { return }
69-
Task { try? await self.triggerSync() }
65+
setupSyncFlow(period: period)
7066
}
7167
}
7268

7369
public func stopSync() async throws {
74-
try guardClosed()
70+
try await lock.withLock {
71+
try guardClosed()
72+
try await _stopSync()
73+
}
74+
}
7575

76+
private func _stopSync() async throws {
7677
if let timer = periodicSyncTimer {
7778
timer.invalidate()
7879
periodicSyncTimer = nil
@@ -92,10 +93,12 @@ actor SyncingService {
9293

9394
/// Cleans up internal resources and cancels any ongoing syncing.
9495
func close() async throws {
95-
try guardClosed()
96+
try await lock.withLock {
97+
try guardClosed()
9698

97-
try await stopSync()
98-
closed = true
99+
try await _stopSync()
100+
closed = true
101+
}
99102
}
100103

101104
/// Triggers a sync operation. Can be called manually.
@@ -141,13 +144,13 @@ actor SyncingService {
141144
}
142145

143146
/// Sets up the main attachment syncing pipeline and starts watching for changes.
144-
private func setupSyncFlow() {
147+
private func setupSyncFlow(period: TimeInterval) {
145148
syncTask = Task {
146149
do {
147150
try await withThrowingTaskGroup(of: Void.self) { group in
148151
// Handle sync trigger events
149152
group.addTask {
150-
let syncTrigger = await self.createSyncTrigger()
153+
let syncTrigger = self.createSyncTrigger()
151154

152155
for await _ in syncTrigger {
153156
try Task.checkCancellation()
@@ -162,12 +165,20 @@ actor SyncingService {
162165

163166
// Watch attachment records. Trigger a sync on change
164167
group.addTask {
165-
for try await _ in try await self.attachmentsService.watchActiveAttachments() {
168+
for try await _ in try self.attachmentsService.watchActiveAttachments() {
166169
self.syncTriggerSubject.send(())
167170
try Task.checkCancellation()
168171
}
169172
}
170173

174+
group.addTask {
175+
let delay = UInt64(period * 1_000_000_000)
176+
while !Task.isCancelled {
177+
try await Task.sleep(nanoseconds: delay)
178+
try await self.triggerSync()
179+
}
180+
}
181+
171182
// Wait for any task to complete
172183
try await group.next()
173184
}

0 commit comments

Comments
 (0)