Skip to content

Commit 7d7d9a9

Browse files
update locks
1 parent 2bdef94 commit 7d7d9a9

File tree

3 files changed

+99
-89
lines changed

3 files changed

+99
-89
lines changed

Sources/PowerSync/attachments/AttachmentQueue.swift

Lines changed: 66 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import Foundation
33

44
/// Class used to implement the attachment queue
55
/// Requires a PowerSyncDatabase, a RemoteStorageAdapter implementation, and a directory name for attachments.
6-
public actor AttachmentQueue {
6+
public class AttachmentQueue {
77
/// Default name of the attachments table
88
public static let defaultTableName = "attachments"
99

@@ -67,12 +67,14 @@ public actor AttachmentQueue {
6767
logger: self.logger,
6868
getLocalUri: { [weak self] filename in
6969
guard let self = self else { return filename }
70-
return await self.getLocalUri(filename)
70+
return self.getLocalUri(filename)
7171
},
7272
errorHandler: self.errorHandler,
7373
syncThrottle: self.syncThrottleDuration
7474
)
7575

76+
private let lock: LockActor
77+
7678
/// Initializes the attachment queue
7779
/// - Parameters match the stored properties
7880
public init(
@@ -103,73 +105,81 @@ public actor AttachmentQueue {
103105
self.subdirectories = subdirectories
104106
self.downloadAttachments = downloadAttachments
105107
self.logger = logger ?? db.logger
106-
107-
attachmentsService = AttachmentService(
108+
self.attachmentsService = AttachmentService(
108109
db: db,
109110
tableName: attachmentsQueueTableName,
110111
logger: self.logger,
111112
maxArchivedCount: archivedCacheLimit
112113
)
114+
self.lock = LockActor()
113115
}
114116

115117
/// Starts the attachment sync process
116118
public func startSync() async throws {
117-
try guardClosed()
119+
try await lock.withLock {
120+
try guardClosed()
118121

119-
// Stop any active syncing before starting new Tasks
120-
try await stopSyncing()
122+
// Stop any active syncing before starting new Tasks
123+
try await _stopSyncing()
121124

122-
// Ensure the directory where attachments are downloaded exists
123-
try await localStorage.makeDir(path: attachmentsDirectory)
125+
// Ensure the directory where attachments are downloaded exists
126+
try await localStorage.makeDir(path: attachmentsDirectory)
124127

125-
if let subdirectories = subdirectories {
126-
for subdirectory in subdirectories {
127-
let path = URL(fileURLWithPath: attachmentsDirectory).appendingPathComponent(subdirectory).path
128-
try await localStorage.makeDir(path: path)
128+
if let subdirectories = subdirectories {
129+
for subdirectory in subdirectories {
130+
let path = URL(fileURLWithPath: attachmentsDirectory).appendingPathComponent(subdirectory).path
131+
try await localStorage.makeDir(path: path)
132+
}
129133
}
130-
}
131-
132-
// Verify initial state
133-
try await attachmentsService.withLock {context in
134-
try await self.verifyAttachments(context: context)
135-
}
136134

137-
try await syncingService.startSync(period: syncInterval)
138-
139-
syncStatusTask = Task {
140-
do {
141-
try await withThrowingTaskGroup(of: Void.self) { group in
142-
// Add connectivity monitoring task
143-
group.addTask {
144-
var previousConnected = self.db.currentStatus.connected
145-
for await status in self.db.currentStatus.asFlow() {
146-
if !previousConnected && status.connected {
147-
try await self.syncingService.triggerSync()
135+
// Verify initial state
136+
try await attachmentsService.withLock { context in
137+
try await self.verifyAttachments(context: context)
138+
}
139+
140+
try await syncingService.startSync(period: syncInterval)
141+
142+
syncStatusTask = Task {
143+
do {
144+
try await withThrowingTaskGroup(of: Void.self) { group in
145+
// Add connectivity monitoring task
146+
group.addTask {
147+
var previousConnected = self.db.currentStatus.connected
148+
for await status in self.db.currentStatus.asFlow() {
149+
if !previousConnected && status.connected {
150+
try await self.syncingService.triggerSync()
151+
}
152+
previousConnected = status.connected
148153
}
149-
previousConnected = status.connected
150154
}
151-
}
152155

153-
// Add attachment watching task
154-
group.addTask {
155-
for try await items in try self.watchAttachments() {
156-
try await self.processWatchedAttachments(items: items)
156+
// Add attachment watching task
157+
group.addTask {
158+
for try await items in try self.watchAttachments() {
159+
try await self.processWatchedAttachments(items: items)
160+
}
157161
}
158-
}
159162

160-
// Wait for any task to complete (which should only happen on cancellation)
161-
try await group.next()
162-
}
163-
} catch {
164-
if !(error is CancellationError) {
165-
logger.error("Error in attachment sync job: \(error.localizedDescription)", tag: logTag)
163+
// Wait for any task to complete (which should only happen on cancellation)
164+
try await group.next()
165+
}
166+
} catch {
167+
if !(error is CancellationError) {
168+
logger.error("Error in attachment sync job: \(error.localizedDescription)", tag: logTag)
169+
}
166170
}
167171
}
168172
}
169173
}
170174

171175
/// Stops active syncing tasks. Syncing can be resumed with ``startSync()``
172176
public func stopSyncing() async throws {
177+
try await lock.withLock {
178+
try await _stopSyncing()
179+
}
180+
}
181+
182+
private func _stopSyncing() async throws {
173183
try guardClosed()
174184

175185
syncStatusTask?.cancel()
@@ -187,11 +197,13 @@ public actor AttachmentQueue {
187197

188198
/// Closes the attachment queue and cancels all sync tasks
189199
public func close() async throws {
190-
try guardClosed()
200+
try await lock.withLock {
201+
try guardClosed()
191202

192-
try await stopSyncing()
193-
try await syncingService.close()
194-
closed = true
203+
try await _stopSyncing()
204+
try await syncingService.close()
205+
closed = true
206+
}
195207
}
196208

197209
/// Resolves the filename for a new attachment
@@ -226,7 +238,7 @@ public actor AttachmentQueue {
226238
// This item is assumed to be coming from an upstream sync
227239
// Locally created new items should be persisted using saveFile before
228240
// this point.
229-
let filename = await self.resolveNewAttachmentFilename(
241+
let filename = self.resolveNewAttachmentFilename(
230242
attachmentId: item.id,
231243
fileExtension: item.fileExtension
232244
)
@@ -385,29 +397,30 @@ public actor AttachmentQueue {
385397
try await self.localStorage.rmDir(path: self.attachmentsDirectory)
386398
}
387399
}
388-
400+
389401
/// Verifies attachment records are present in the filesystem
390402
private func verifyAttachments(context: AttachmentContext) async throws {
391403
let attachments = try await context.getAttachments()
392404
var updates: [Attachment] = []
393-
405+
394406
for attachment in attachments {
395407
guard let localUri = attachment.localUri else {
396408
continue
397409
}
398-
410+
399411
let exists = try await localStorage.fileExists(filePath: localUri)
400412
if attachment.state == AttachmentState.synced ||
401413
attachment.state == AttachmentState.queuedUpload &&
402-
!exists {
414+
!exists
415+
{
403416
// The file must have been removed from the local storage
404417
updates.append(attachment.with(
405418
state: .archived,
406419
localUri: .some(nil) // Clears the value
407420
))
408421
}
409422
}
410-
423+
411424
try await context.saveAttachments(attachments: updates)
412425
}
413426

Lines changed: 7 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,14 @@
11
import Foundation
22

33
/// Service which manages attachment records.
4-
public actor AttachmentService {
4+
public class AttachmentService {
55
private let db: any PowerSyncDatabaseProtocol
66
private let tableName: String
77
private let logger: any LoggerProtocol
88
private let logTag = "AttachmentService"
99

1010
private let context: AttachmentContext
11-
12-
/// Actor isolation does not automatically queue [withLock] async operations
13-
/// These variables are used to ensure FIFO serial queing
14-
private var lockQueue: [CheckedContinuation<Void, Never>] = []
15-
private var isLocked = false
11+
private let lock: LockActor
1612

1713
/// Initializes the attachment service with the specified database, table name, logger, and max archived count.
1814
public init(
@@ -30,6 +26,7 @@ public actor AttachmentService {
3026
logger: logger,
3127
maxArchivedCount: maxArchivedCount
3228
)
29+
lock = LockActor()
3330
}
3431

3532
/// Watches for changes to the attachments table.
@@ -61,34 +58,8 @@ public actor AttachmentService {
6158

6259
/// Executes a callback with exclusive access to the attachment context.
6360
public func withLock<R>(callback: @Sendable @escaping (AttachmentContext) async throws -> R) async throws -> R {
64-
// If locked, join the queue
65-
if isLocked {
66-
await withCheckedContinuation { continuation in
67-
lockQueue.append(continuation)
68-
}
69-
}
70-
71-
// Now we have the lock
72-
isLocked = true
73-
74-
do {
75-
let result = try await callback(context)
76-
// Release lock and notify next in queue
77-
releaseLock()
78-
return result
79-
} catch {
80-
// Release lock and notify next in queue
81-
releaseLock()
82-
throw error
83-
}
84-
}
85-
86-
private func releaseLock() {
87-
if let next = lockQueue.first {
88-
lockQueue.removeFirst()
89-
next.resume()
90-
} else {
91-
isLocked = false
92-
}
93-
}
61+
try await lock.withLock {
62+
try await callback(context)
63+
}
64+
}
9465
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
2+
internal actor LockActor {
3+
private var isLocked = false
4+
private var queue: [CheckedContinuation<Void, Never>] = []
5+
6+
func withLock<T>(_ execute: @Sendable () async throws -> T) async throws -> T {
7+
if isLocked {
8+
await withCheckedContinuation { continuation in
9+
queue.append(continuation)
10+
}
11+
}
12+
13+
isLocked = true
14+
defer { unlockNext() }
15+
return try await execute()
16+
}
17+
18+
private func unlockNext() {
19+
if let next = queue.first {
20+
queue.removeFirst()
21+
next.resume(returning: ())
22+
} else {
23+
isLocked = false
24+
}
25+
}
26+
}

0 commit comments

Comments
 (0)