Skip to content

Commit 22c96e9

Browse files
improve concurrency and closing of queues
1 parent 4765bb2 commit 22c96e9

File tree

4 files changed

+201
-138
lines changed

4 files changed

+201
-138
lines changed

Demo/PowerSyncExample.xcodeproj/project.xcworkspace/xcshareddata/swiftpm/Package.resolved

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Sources/PowerSync/attachments/AttachmentQueue.swift

Lines changed: 64 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -1,42 +1,5 @@
11
import Combine
22
import Foundation
3-
import OSLog
4-
5-
/// A watched attachment record item.
6-
/// This is usually returned from watching all relevant attachment IDs.
7-
public struct WatchedAttachmentItem {
8-
/// Id for the attachment record
9-
public let id: String
10-
11-
/// File extension used to determine an internal filename for storage if no `filename` is provided
12-
public let fileExtension: String?
13-
14-
/// Filename to store the attachment with
15-
public let filename: String?
16-
17-
/// Metadata for the attachment (optional)
18-
public let metaData: String?
19-
20-
/// Initializes a new `WatchedAttachmentItem`
21-
/// - Parameters:
22-
/// - id: Attachment record ID
23-
/// - fileExtension: Optional file extension
24-
/// - filename: Optional filename
25-
/// - metaData: Optional metadata
26-
public init(
27-
id: String,
28-
fileExtension: String? = nil,
29-
filename: String? = nil,
30-
metaData: String? = nil
31-
) {
32-
self.id = id
33-
self.fileExtension = fileExtension
34-
self.filename = filename
35-
self.metaData = metaData
36-
37-
precondition(fileExtension != nil || filename != nil, "Either fileExtension or filename must be provided.")
38-
}
39-
}
403

414
/// Class used to implement the attachment queue
425
/// Requires a PowerSyncDatabase, a RemoteStorageAdapter implementation, and a directory name for attachments.
@@ -151,9 +114,10 @@ public actor AttachmentQueue {
151114

152115
/// Starts the attachment sync process
153116
public func startSync() async throws {
154-
if closed {
155-
throw PowerSyncAttachmentError.closed("Cannot start syncing on closed attachment queue")
156-
}
117+
try guardClosed()
118+
119+
// Stop any active syncing before starting new Tasks
120+
try await stopSyncing()
157121

158122
// Ensure the directory where attachments are downloaded exists
159123
try await localStorage.makeDir(path: attachmentsDirectory)
@@ -165,48 +129,63 @@ public actor AttachmentQueue {
165129
}
166130
}
167131

168-
await syncingService.startPeriodicSync(period: syncInterval)
132+
try await syncingService.startSync(period: syncInterval)
169133

170134
syncStatusTask = Task {
171135
do {
172-
// Create a task for watching connectivity changes
173-
let connectivityTask = Task {
174-
var previousConnected = db.currentStatus.connected
175-
176-
for await status in db.currentStatus.asFlow() {
177-
if !previousConnected && status.connected {
178-
await syncingService.triggerSync()
136+
try await withThrowingTaskGroup(of: Void.self) { group in
137+
// Add connectivity monitoring task
138+
group.addTask {
139+
var previousConnected = self.db.currentStatus.connected
140+
for await status in self.db.currentStatus.asFlow() {
141+
if !previousConnected && status.connected {
142+
try await self.syncingService.triggerSync()
143+
}
144+
previousConnected = status.connected
179145
}
180-
previousConnected = status.connected
181146
}
182-
}
183147

184-
// Create a task for watching attachment changes
185-
let watchTask = Task {
186-
for try await items in self.watchedAttachments {
187-
try await self.processWatchedAttachments(items: items)
148+
// Add attachment watching task
149+
group.addTask {
150+
for try await items in self.watchedAttachments {
151+
try await self.processWatchedAttachments(items: items)
152+
}
188153
}
189-
}
190154

191-
// Wait for both tasks to complete (they shouldn't unless cancelled)
192-
await connectivityTask.value
193-
try await watchTask.value
155+
// Wait for any task to complete (which should only happen on cancellation)
156+
try await group.next()
157+
}
194158
} catch {
195159
if !(error is CancellationError) {
196-
logger.error("Error in sync job: \(error.localizedDescription)", tag: logTag)
160+
logger.error("Error in attachment sync job: \(error.localizedDescription)", tag: logTag)
197161
}
198162
}
199163
}
200164
}
201165

166+
/// Stops active syncing tasks. Syncing can be resumed with ``startSync()``
167+
public func stopSyncing() async throws {
168+
try guardClosed()
169+
170+
syncStatusTask?.cancel()
171+
// Wait for the task to actually complete
172+
do {
173+
_ = try await syncStatusTask?.value
174+
} catch {
175+
// Task completed with error (likely cancellation)
176+
// This is okay
177+
}
178+
syncStatusTask = nil
179+
180+
try await syncingService.stopSync()
181+
}
182+
202183
/// Closes the attachment queue and cancels all sync tasks
203184
public func close() async throws {
204-
if closed {
205-
return
206-
}
185+
try guardClosed()
207186

208-
syncStatusTask?.cancel()
209-
await syncingService.close()
187+
try await stopSyncing()
188+
try await syncingService.close()
210189
closed = true
211190
}
212191

@@ -219,7 +198,7 @@ public actor AttachmentQueue {
219198
attachmentId: String,
220199
fileExtension: String?
221200
) -> String {
222-
return "\(attachmentId).\(fileExtension ?? "")"
201+
return "\(attachmentId).\(fileExtension ?? "attachment")"
223202
}
224203

225204
/// Processes watched attachment items and updates sync state
@@ -230,10 +209,10 @@ public actor AttachmentQueue {
230209
try await attachmentsService.withLock { context in
231210
let currentAttachments = try await context.getAttachments()
232211
var attachmentUpdates = [Attachment]()
233-
212+
234213
for item in items {
235214
let existingQueueItem = currentAttachments.first { $0.id == item.id }
236-
215+
237216
if existingQueueItem == nil {
238217
if !self.downloadAttachments {
239218
continue
@@ -246,7 +225,7 @@ public actor AttachmentQueue {
246225
attachmentId: item.id,
247226
fileExtension: item.fileExtension
248227
)
249-
228+
250229
attachmentUpdates.append(
251230
Attachment(
252231
id: item.id,
@@ -267,29 +246,29 @@ public actor AttachmentQueue {
267246
// and has been synced. If it's missing and hasSynced is false then
268247
// it must be an upload operation
269248
let newState = existingQueueItem!.localUri == nil ?
270-
AttachmentState.queuedDownload :
271-
AttachmentState.queuedUpload
272-
249+
AttachmentState.queuedDownload :
250+
AttachmentState.queuedUpload
251+
273252
attachmentUpdates.append(
274253
existingQueueItem!.with(state: newState)
275254
)
276255
}
277256
}
278257
}
279-
280-
258+
281259
/**
282260
* Archive any items not specified in the watched items except for items pending delete.
283261
*/
284262
for attachment in currentAttachments {
285-
if attachment.state != AttachmentState.queuedDelete &&
286-
items.first(where: { $0.id == attachment.id }) == nil {
263+
if attachment.state != AttachmentState.queuedDelete,
264+
items.first(where: { $0.id == attachment.id }) == nil
265+
{
287266
attachmentUpdates.append(
288267
attachment.with(state: AttachmentState.archived)
289268
)
290269
}
291270
}
292-
271+
293272
if !attachmentUpdates.isEmpty {
294273
try await context.saveAttachments(attachments: attachmentUpdates)
295274
}
@@ -319,11 +298,11 @@ public actor AttachmentQueue {
319298

320299
// Write the file to the filesystem
321300
let fileSize = try await localStorage.saveFile(filePath: localUri, data: data)
322-
301+
323302
return try await attachmentsService.withLock { context in
324303
// Start a write transaction. The attachment record and relevant local relationship
325304
// assignment should happen in the same transaction.
326-
return try await self.db.writeTransaction { tx in
305+
try await self.db.writeTransaction { tx in
327306
let attachment = Attachment(
328307
id: id,
329308
filename: filename,
@@ -385,7 +364,7 @@ public actor AttachmentQueue {
385364

386365
/// Removes all archived items
387366
public func expireCache() async throws {
388-
try await attachmentsService.withLock { context in
367+
try await attachmentsService.withLock { context in
389368
var done = false
390369
repeat {
391370
done = try await self.syncingService.deleteArchivedAttachments(context)
@@ -401,4 +380,10 @@ public actor AttachmentQueue {
401380
try await self.localStorage.rmDir(path: self.attachmentsDirectory)
402381
}
403382
}
383+
384+
private func guardClosed() throws {
385+
if closed {
386+
throw PowerSyncAttachmentError.closed("Attachment queue is closed")
387+
}
388+
}
404389
}

0 commit comments

Comments
 (0)