diff --git a/CHANGELOG.md b/CHANGELOG.md index a3b7860..828c923 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,12 +1,18 @@ # Changelog +# 1.0.0-Beta.12 + +- Added attachment sync helpers +- Added support for cancellations in watched queries + # 1.0.0-beta.11 -* Fix deadlock when `connect()` is called immediately after opening a database. +- Fix deadlock when `connect()` is called immediately after opening a database. # 1.0.0-Beta.10 -* Added the ability to specify a custom logging implementation +- Added the ability to specify a custom logging implementation + ```swift let db = PowerSyncDatabase( schema: Schema( @@ -23,69 +29,70 @@ logger: DefaultLogger(minSeverity: .debug) ) ``` -* added `.close()` method on `PowerSyncDatabaseProtocol` -* Update `powersync-kotlin` dependency to version `1.0.0-BETA29`, which fixes these issues: - * Fix potential race condition between jobs in `connect()` and `disconnect()`. - * Fix race condition causing data received during uploads not to be applied. - * Fixed issue where automatic driver migrations would fail with the error: + +- added `.close()` method on `PowerSyncDatabaseProtocol` +- Update `powersync-kotlin` dependency to version `1.0.0-BETA29`, which fixes these issues: + - Fix potential race condition between jobs in `connect()` and `disconnect()`. + - Fix race condition causing data received during uploads not to be applied. + - Fixed issue where automatic driver migrations would fail with the error: + ``` Sqlite operation failure database is locked attempted to run migration and failed. closing connection ``` ## 1.0.0-Beta.9 -* Update PowerSync SQLite core extension to 0.3.12. -* Added queuing protection and warnings when connecting multiple PowerSync clients to the same database file. -* Improved concurrent SQLite connection support. A single write connection and multiple read connections are used for concurrent read queries. -* Internally improved the linking of SQLite. -* Enabled Full Text Search support. -* Added the ability to update the schema for existing PowerSync clients. -* Fixed bug where local only, insert only and view name overrides were not applied for schema tables. +- Update PowerSync SQLite core extension to 0.3.12. +- Added queuing protection and warnings when connecting multiple PowerSync clients to the same database file. +- Improved concurrent SQLite connection support. A single write connection and multiple read connections are used for concurrent read queries. +- Internally improved the linking of SQLite. +- Enabled Full Text Search support. +- Added the ability to update the schema for existing PowerSync clients. +- Fixed bug where local only, insert only and view name overrides were not applied for schema tables. ## 1.0.0-Beta.8 -* Improved watch query internals. Added the ability to throttle watched queries. -* Added support for sync bucket priorities. +- Improved watch query internals. Added the ability to throttle watched queries. +- Added support for sync bucket priorities. ## 1.0.0-Beta.7 -* Fixed an issue where throwing exceptions in the query `mapper` could cause a runtime crash. -* Internally improved type casting. +- Fixed an issue where throwing exceptions in the query `mapper` could cause a runtime crash. +- Internally improved type casting. ## 1.0.0-Beta.6 -* BREAKING CHANGE: `watch` queries are now throwable and therefore will need to be accompanied by a `try` e.g. +- BREAKING CHANGE: `watch` queries are now throwable and therefore will need to be accompanied by a `try` e.g. ```swift try database.watch() ``` -* BREAKING CHANGE: `transaction` functions are now throwable and therefore will need to be accompanied by a `try` e.g. +- BREAKING CHANGE: `transaction` functions are now throwable and therefore will need to be accompanied by a `try` e.g. ```swift try await database.writeTransaction { transaction in try transaction.execute(...) } ``` -* Allow `execute` errors to be handled -* `userId` is now set to `nil` by default and therefore it is no longer required to be set to `nil` when instantiating `PowerSyncCredentials` and can therefore be left out. -## 1.0.0-Beta.5 +- Allow `execute` errors to be handled +- `userId` is now set to `nil` by default and therefore it is no longer required to be set to `nil` when instantiating `PowerSyncCredentials` and can therefore be left out. -* Implement improvements to errors originating in Kotlin so that they can be handled in Swift -* Improve `__fetchCredentials`to log the error but not cause an app crash on error +## 1.0.0-Beta.5 +- Implement improvements to errors originating in Kotlin so that they can be handled in Swift +- Improve `__fetchCredentials`to log the error but not cause an app crash on error ## 1.0.0-Beta.4 -* Allow cursor to use column name to get value by including the following functions that accept a column name parameter: -`getBoolean`,`getBooleanOptional`,`getString`,`getStringOptional`, `getLong`,`getLongOptional`, `getDouble`,`getDoubleOptional` -* BREAKING CHANGE: This should not affect anyone but made `KotlinPowerSyncCredentials`, `KotlinPowerSyncDatabase` and `KotlinPowerSyncBackendConnector` private as these should never have been public. - +- Allow cursor to use column name to get value by including the following functions that accept a column name parameter: + `getBoolean`,`getBooleanOptional`,`getString`,`getStringOptional`, `getLong`,`getLongOptional`, `getDouble`,`getDoubleOptional` +- BREAKING CHANGE: This should not affect anyone but made `KotlinPowerSyncCredentials`, `KotlinPowerSyncDatabase` and `KotlinPowerSyncBackendConnector` private as these should never have been public. ## 1.0.0-Beta.3 -* BREAKING CHANGE: Update underlying powersync-kotlin package to BETA18.0 which requires transactions to become synchronous as opposed to asynchronous. +- BREAKING CHANGE: Update underlying powersync-kotlin package to BETA18.0 which requires transactions to become synchronous as opposed to asynchronous. ```swift try await database.writeTransaction { transaction in try await transaction.execute( @@ -106,8 +113,8 @@ try await database.writeTransaction { transaction in ## 1.0.0-Beta.2 -* Upgrade PowerSyncSqliteCore to 0.3.8 +- Upgrade PowerSyncSqliteCore to 0.3.8 ## 1.0.0-Beta.1 -* Initial Beta release +- Initial Beta release diff --git a/Demo/PowerSyncExample.xcodeproj/project.pbxproj b/Demo/PowerSyncExample.xcodeproj/project.pbxproj index e1035ae..e814d01 100644 --- a/Demo/PowerSyncExample.xcodeproj/project.pbxproj +++ b/Demo/PowerSyncExample.xcodeproj/project.pbxproj @@ -40,6 +40,7 @@ B69F7D862C8EE27400565448 /* AnyCodable in Frameworks */ = {isa = PBXBuildFile; productRef = B69F7D852C8EE27400565448 /* AnyCodable */; }; B6B3698A2C64F4B30033C307 /* Navigation.swift in Sources */ = {isa = PBXBuildFile; fileRef = B6B369892C64F4B30033C307 /* Navigation.swift */; }; B6FFD5322D06DA8000EEE60F /* PowerSync in Frameworks */ = {isa = PBXBuildFile; productRef = B6FFD5312D06DA8000EEE60F /* PowerSync */; }; + BE2F26EC2DA54B2F0080F1AE /* SupabaseRemoteStorage.swift in Sources */ = {isa = PBXBuildFile; fileRef = BE2F26EB2DA54B2A0080F1AE /* SupabaseRemoteStorage.swift */; }; /* End PBXBuildFile section */ /* Begin PBXCopyFilesBuildPhase section */ @@ -106,6 +107,7 @@ B6F421372BC42F450005D0D0 /* core.klib */ = {isa = PBXFileReference; lastKnownFileType = file; path = core.klib; sourceTree = ""; }; B6F4213D2BC42F5B0005D0D0 /* sqlite3.c */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.c; name = sqlite3.c; path = "../powersync-kotlin/core/build/interop/sqlite/sqlite3.c"; sourceTree = ""; }; B6F421402BC430B60005D0D0 /* sqlite3.h */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.h; name = sqlite3.h; path = "../powersync-kotlin/core/build/interop/sqlite/sqlite3.h"; sourceTree = ""; }; + BE2F26EB2DA54B2A0080F1AE /* SupabaseRemoteStorage.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = SupabaseRemoteStorage.swift; sourceTree = ""; }; /* End PBXFileReference section */ /* Begin PBXFrameworksBuildPhase section */ @@ -218,6 +220,7 @@ B65C4D6F2C60D58500176007 /* PowerSync */ = { isa = PBXGroup; children = ( + BE2F26EB2DA54B2A0080F1AE /* SupabaseRemoteStorage.swift */, 6A7315BA2B98BDD30004CB17 /* SystemManager.swift */, 6A4AD3842B9EE763005CBFD4 /* SupabaseConnector.swift */, 6ABD78772B9F2D2800558A41 /* Schema.swift */, @@ -564,6 +567,7 @@ B65C4D6D2C60D38B00176007 /* HomeScreen.swift in Sources */, 6A7315882B9854220004CB17 /* PowerSyncExampleApp.swift in Sources */, B666585F2C62115300159A81 /* ListRow.swift in Sources */, + BE2F26EC2DA54B2F0080F1AE /* SupabaseRemoteStorage.swift in Sources */, B66658632C621CA700159A81 /* AddTodoListView.swift in Sources */, B666585D2C620E9E00159A81 /* WifiIcon.swift in Sources */, 6A9669042B9EE6FA00B05DCF /* SignInScreen.swift in Sources */, @@ -705,10 +709,11 @@ CODE_SIGN_STYLE = Automatic; CURRENT_PROJECT_VERSION = 1; DEVELOPMENT_ASSET_PATHS = "\"PowerSyncExample/Preview Content\""; - DEVELOPMENT_TEAM = 6WA62GTJNA; + DEVELOPMENT_TEAM = ZGT7463CVJ; ENABLE_PREVIEWS = YES; ENABLE_USER_SCRIPT_SANDBOXING = NO; GENERATE_INFOPLIST_FILE = YES; + INFOPLIST_KEY_NSCameraUsageDescription = "Take Photos for Todo Completion"; INFOPLIST_KEY_UIApplicationSceneManifest_Generation = YES; INFOPLIST_KEY_UIApplicationSupportsIndirectInputEvents = YES; INFOPLIST_KEY_UILaunchScreen_Generation = YES; @@ -721,7 +726,7 @@ MARKETING_VERSION = 1.0; ONLY_ACTIVE_ARCH = YES; OTHER_LDFLAGS = "$(inherited)"; - PRODUCT_BUNDLE_IDENTIFIER = com.powersync.PowerSyncExample; + PRODUCT_BUNDLE_IDENTIFIER = com.powersync.PowerSyncSwiftExample; PRODUCT_NAME = "$(TARGET_NAME)"; SWIFT_EMIT_LOC_STRINGS = YES; SWIFT_OBJC_BRIDGING_HEADER = "PowerSyncExample/PowerSyncExample-Bridging-Header.h"; @@ -742,10 +747,11 @@ CODE_SIGN_STYLE = Automatic; CURRENT_PROJECT_VERSION = 1; DEVELOPMENT_ASSET_PATHS = "\"PowerSyncExample/Preview Content\""; - DEVELOPMENT_TEAM = 6WA62GTJNA; + DEVELOPMENT_TEAM = ZGT7463CVJ; ENABLE_PREVIEWS = YES; ENABLE_USER_SCRIPT_SANDBOXING = NO; GENERATE_INFOPLIST_FILE = YES; + INFOPLIST_KEY_NSCameraUsageDescription = "Take Photos for Todo Completion"; INFOPLIST_KEY_UIApplicationSceneManifest_Generation = YES; INFOPLIST_KEY_UIApplicationSupportsIndirectInputEvents = YES; INFOPLIST_KEY_UILaunchScreen_Generation = YES; @@ -757,7 +763,7 @@ ); MARKETING_VERSION = 1.0; OTHER_LDFLAGS = "$(inherited)"; - PRODUCT_BUNDLE_IDENTIFIER = com.powersync.PowerSyncExample; + PRODUCT_BUNDLE_IDENTIFIER = com.powersync.PowerSyncSwiftExample; PRODUCT_NAME = "$(TARGET_NAME)"; SWIFT_EMIT_LOC_STRINGS = YES; SWIFT_OBJC_BRIDGING_HEADER = "PowerSyncExample/PowerSyncExample-Bridging-Header.h"; diff --git a/Demo/PowerSyncExample.xcodeproj/project.xcworkspace/xcshareddata/swiftpm/Package.resolved b/Demo/PowerSyncExample.xcodeproj/project.xcworkspace/xcshareddata/swiftpm/Package.resolved index bdb812a..d2cc323 100644 --- a/Demo/PowerSyncExample.xcodeproj/project.xcworkspace/xcshareddata/swiftpm/Package.resolved +++ b/Demo/PowerSyncExample.xcodeproj/project.xcworkspace/xcshareddata/swiftpm/Package.resolved @@ -15,8 +15,8 @@ "kind" : "remoteSourceControl", "location" : "https://github.com/powersync-ja/powersync-kotlin.git", "state" : { - "revision" : "443df078f4b9352de137000b993d564d4ab019b7", - "version" : "1.0.0-BETA28.0" + "revision" : "633a2924f7893f7ebeb064cbcd9c202937673633", + "version" : "1.0.0-BETA30.0" } }, { diff --git a/Demo/PowerSyncExample.xcodeproj/xcshareddata/xcschemes/PowerSyncExample.xcscheme b/Demo/PowerSyncExample.xcodeproj/xcshareddata/xcschemes/PowerSyncExample.xcscheme index 50e13c1..af00361 100644 --- a/Demo/PowerSyncExample.xcodeproj/xcshareddata/xcschemes/PowerSyncExample.xcscheme +++ b/Demo/PowerSyncExample.xcodeproj/xcshareddata/xcschemes/PowerSyncExample.xcscheme @@ -39,6 +39,7 @@ ignoresPersistentStateOnLaunch = "NO" debugDocumentVersioning = "YES" debugServiceExtension = "internal" + enableGPUValidationMode = "1" allowLocationSimulation = "YES"> diff --git a/Demo/PowerSyncExample/Components/TodoListRow.swift b/Demo/PowerSyncExample/Components/TodoListRow.swift index 2bbf184..f3c3be7 100644 --- a/Demo/PowerSyncExample/Components/TodoListRow.swift +++ b/Demo/PowerSyncExample/Components/TodoListRow.swift @@ -1,36 +1,116 @@ import SwiftUI struct TodoListRow: View { - let todo: Todo - let completeTapped: () -> Void - - var body: some View { - HStack { - Text(todo.description) - Spacer() - Button { - completeTapped() - } label: { - Image(systemName: todo.isComplete ? "checkmark.circle.fill" : "circle") - } - .buttonStyle(.plain) + let todo: Todo + let isCameraAvailable: Bool + let completeTapped: () -> Void + let deletePhotoTapped: () -> Void + let capturePhotoTapped: () -> Void + let selectPhotoTapped: () -> Void + + @State private var image: UIImage? = nil + + var body: some View { + HStack { + Text(todo.description) + Group { + if let image = image { + Image(uiImage: image) + .resizable() + .scaledToFit() + + } else if todo.photoUri != nil { + // Show progress while loading the image + ProgressView() + .onAppear { + Task { + await loadImage() + } + } + } else if todo.photoId != nil { + // Show progres, wait for a URI to be present + ProgressView() + } else { + EmptyView() + } + } + Spacer() + VStack { + if todo.photoId == nil { + HStack { + if isCameraAvailable { + Button { + capturePhotoTapped() + } label: { + Image(systemName: "camera.fill") + } + .buttonStyle(.plain) + } + Button { + selectPhotoTapped() + } label: { + Image(systemName: "photo.on.rectangle") + } + .buttonStyle(.plain) + } + } else { + Button { + deletePhotoTapped() + } label: { + Image(systemName: "trash.fill") + } + .buttonStyle(.plain) + } + Spacer() + Button { + completeTapped() + } label: { + Image(systemName: todo.isComplete ? "checkmark.circle.fill" : "circle") + } + .buttonStyle(.plain) + }.onChange(of: todo.photoId) { _, newPhotoId in + if newPhotoId == nil { + // Clear the image when photoId becomes nil + image = nil + } + } + } } - } -} + private func loadImage() async { + guard let urlString = todo.photoUri else { return } + let url = URL(fileURLWithPath: urlString) + + do { + let data = try Data(contentsOf: url) + if let loadedImage = UIImage(data: data) { + image = loadedImage + } else { + print("Failed to decode image from data.") + } + } catch { + print("Error loading image from disk:", error) + } + } +} #Preview { TodoListRow( - todo: .init( - id: UUID().uuidString.lowercased(), - listId: UUID().uuidString.lowercased(), - photoId: nil, - description: "description", - isComplete: false, - createdAt: "", - completedAt: nil, - createdBy: UUID().uuidString.lowercased(), - completedBy: nil - ) + todo: .init( + id: UUID().uuidString.lowercased(), + listId: UUID().uuidString.lowercased(), + photoId: nil, + description: "description", + isComplete: false, + createdAt: "", + completedAt: nil, + createdBy: UUID().uuidString.lowercased(), + completedBy: nil, + + ), + isCameraAvailable: true, + completeTapped: {}, + deletePhotoTapped: {}, + capturePhotoTapped: {} ) {} } diff --git a/Demo/PowerSyncExample/Components/TodoListView.swift b/Demo/PowerSyncExample/Components/TodoListView.swift index b006cd0..ea7d4f7 100644 --- a/Demo/PowerSyncExample/Components/TodoListView.swift +++ b/Demo/PowerSyncExample/Components/TodoListView.swift @@ -1,5 +1,6 @@ -import SwiftUI +import AVFoundation import IdentifiedCollections +import SwiftUI import SwiftUINavigation struct TodoListView: View { @@ -11,6 +12,12 @@ struct TodoListView: View { @State private var newTodo: NewTodo? @State private var editing: Bool = false + // Called when a photo has been captured. Individual widgets should register the listener + @State private var onMediaSelect: ((_: Data) async throws -> Void)? + @State private var pickMediaType: UIImagePickerController.SourceType = .camera + @State private var showMediaPicker = false + @State private var isCameraAvailable: Bool = false + var body: some View { List { if let error { @@ -18,7 +25,7 @@ struct TodoListView: View { } IfLet($newTodo) { $newTodo in - AddTodoListView(newTodo: $newTodo, listId: listId) { result in + AddTodoListView(newTodo: $newTodo, listId: listId) { _ in withAnimation { self.newTodo = nil } @@ -26,23 +33,64 @@ struct TodoListView: View { } ForEach(todos) { todo in - TodoListRow(todo: todo) { - Task { - try await toggleCompletion(of: todo) + TodoListRow( + todo: todo, + isCameraAvailable: isCameraAvailable, + completeTapped: { + Task { + await toggleCompletion(of: todo) + } + }, + deletePhotoTapped: { + guard let attachments = system.attachments, + let attachmentID = todo.photoId + else { + return + } + Task { + do { + try await attachments.deleteFile(attachmentId: attachmentID) { tx, _ in + _ = try tx.execute(sql: "UPDATE \(TODOS_TABLE) SET photo_id = NULL WHERE id = ?", parameters: [todo.id]) + } + } catch { + self.error = error + } + } + + }, + capturePhotoTapped: { + registerMediaCallback(todo: todo) + pickMediaType = .camera + showMediaPicker = true } + ) { + registerMediaCallback(todo: todo) + pickMediaType = .photoLibrary + showMediaPicker = true } } .onDelete { indexSet in Task { - await delete(at: indexSet) + let selectedItems = indexSet.compactMap { index in + todos.indices.contains(index) ? todos[index] : nil + } + for try todo in selectedItems { + await delete(todo: todo) + } } } } + .sheet(isPresented: $showMediaPicker) { + CameraView( + onMediaSelect: $onMediaSelect, + mediaType: $pickMediaType + ) + } .animation(.default, value: todos) .navigationTitle("Todos") .toolbar { ToolbarItem(placement: .primaryAction) { - if (newTodo == nil) { + if newTodo == nil { Button { withAnimation { newTodo = .init( @@ -63,6 +111,9 @@ struct TodoListView: View { } } } + .onAppear { + checkCameraAvailability() + } .task { await system.watchTodos(listId) { tds in withAnimation { @@ -83,17 +134,53 @@ struct TodoListView: View { } } - func delete(at offset: IndexSet) async { + func delete(todo: Todo) async { do { error = nil - let todosToDelete = offset.map { todos[$0] } - - try await system.deleteTodo(id: todosToDelete[0].id) + try await system.deleteTodo(todo: todo) } catch { self.error = error } } + + /// Registers a callback which saves a photo for the specified Todo item if media is sucessfully loaded. + func registerMediaCallback(todo: Todo) { + // Register a callback for successful image capture + onMediaSelect = { (_ fileData: Data) in + guard let attachments = system.attachments + else { + return + } + + do { + try await attachments.saveFile( + data: fileData, + mediaType: "image/jpeg", + fileExtension: "jpg" + ) { tx, record in + _ = try tx.execute( + sql: "UPDATE \(TODOS_TABLE) SET photo_id = ? WHERE id = ?", + parameters: [record.id, todo.id] + ) + } + } catch { + self.error = error + } + } + } + + private func checkCameraAvailability() { + // https://developer.apple.com/forums/thread/748448 + // On MacOS MetalAPI validation needs to be disabled + +#if targetEnvironment(simulator) + // Camera does not work on the simulator + isCameraAvailable = false +#else + isCameraAvailable = UIImagePickerController.isSourceTypeAvailable(.camera) +#endif + } } #Preview { @@ -103,3 +190,57 @@ struct TodoListView: View { ).environment(SystemManager()) } } + +struct CameraView: UIViewControllerRepresentable { + @Binding var onMediaSelect: ((_: Data) async throws -> Void)? + @Binding var mediaType: UIImagePickerController.SourceType + + @Environment(\.presentationMode) var presentationMode + + func makeUIViewController(context: Context) -> UIImagePickerController { + let picker = UIImagePickerController() + picker.delegate = context.coordinator + picker.sourceType = mediaType + return picker + } + + func updateUIViewController(_: UIImagePickerController, context _: Context) {} + + func makeCoordinator() -> Coordinator { + Coordinator(self) + } + + class Coordinator: NSObject, UINavigationControllerDelegate, UIImagePickerControllerDelegate { + let parent: CameraView + + init(_ parent: CameraView) { + self.parent = parent + } + + func imagePickerController(_: UIImagePickerController, didFinishPickingMediaWithInfo info: [UIImagePickerController.InfoKey: Any]) { + if let image = info[.originalImage] as? UIImage { + // Convert UIImage to Data + if let jpegData = image.jpegData(compressionQuality: 0.8) { + if let photoCapture = parent.onMediaSelect { + Task { + do { + try await photoCapture(jpegData) + } catch { + // The photoCapture method should handle errors + print("Error saving photo: \(error)") + } + } + } + parent.onMediaSelect = nil + } + } + + parent.presentationMode.wrappedValue.dismiss() + } + + func imagePickerControllerDidCancel(_: UIImagePickerController) { + parent.presentationMode.wrappedValue.dismiss() + parent.onMediaSelect = nil + } + } +} diff --git a/Demo/PowerSyncExample/PowerSync/Schema.swift b/Demo/PowerSyncExample/PowerSync/Schema.swift index 7355a31..1865de4 100644 --- a/Demo/PowerSyncExample/PowerSync/Schema.swift +++ b/Demo/PowerSyncExample/PowerSync/Schema.swift @@ -36,4 +36,8 @@ let todos = Table( ] ) -let AppSchema = Schema(lists, todos) +let AppSchema = Schema( + lists, + todos, + createAttachmentTable(name: "attachments") +) diff --git a/Demo/PowerSyncExample/PowerSync/SupabaseConnector.swift b/Demo/PowerSyncExample/PowerSync/SupabaseConnector.swift index fe5b184..9c3ef46 100644 --- a/Demo/PowerSyncExample/PowerSync/SupabaseConnector.swift +++ b/Demo/PowerSyncExample/PowerSync/SupabaseConnector.swift @@ -1,10 +1,10 @@ +import AnyCodable import Auth -import SwiftUI -import Supabase import PowerSync -import AnyCodable +import Supabase +import SwiftUI -private struct PostgresFatalCodes { +private enum PostgresFatalCodes { /// Postgres Response codes that we cannot recover from by retrying. static let fatalResponseCodes: [String] = [ // Class 22 — Data Exception @@ -14,7 +14,7 @@ private struct PostgresFatalCodes { // Examples include NOT NULL, FOREIGN KEY and UNIQUE violations. "23...", // INSUFFICIENT PRIVILEGE - typically a row-level security violation - "42501" + "42501", ] static func isFatalError(_ code: String) -> Bool { @@ -27,20 +27,20 @@ private struct PostgresFatalCodes { // Look for code: Optional("XXXXX") pattern let errorString = String(describing: error) if let range = errorString.range(of: "code: Optional\\(\"([^\"]+)\"\\)", options: .regularExpression), - let codeRange = errorString[range].range(of: "\"([^\"]+)\"", options: .regularExpression) { - // Extract just the code from within the quotes - let code = errorString[codeRange].dropFirst().dropLast() - return String(code) - } + let codeRange = errorString[range].range(of: "\"([^\"]+)\"", options: .regularExpression) + { + // Extract just the code from within the quotes + let code = errorString[codeRange].dropFirst().dropLast() + return String(code) + } return nil } } - @Observable class SupabaseConnector: PowerSyncBackendConnector { let powerSyncEndpoint: String = Secrets.powerSyncEndpoint - let client: SupabaseClient = SupabaseClient(supabaseURL: Secrets.supabaseURL, supabaseKey: Secrets.supabaseAnonKey) + let client: SupabaseClient = .init(supabaseURL: Secrets.supabaseURL, supabaseKey: Secrets.supabaseAnonKey) var session: Session? private var errorCode: String? @@ -68,20 +68,27 @@ class SupabaseConnector: PowerSyncBackendConnector { return id.uuidString.lowercased() } + func getStorageBucket() -> StorageFileApi? { + guard let bucket = Secrets.supabaseStorageBucket else { + return nil + } + + return client.storage.from(bucket) + } + override func fetchCredentials() async throws -> PowerSyncCredentials? { session = try await client.auth.session - if (self.session == nil) { + if session == nil { throw AuthError.sessionMissing } let token = session!.accessToken - return PowerSyncCredentials(endpoint: self.powerSyncEndpoint, token: token) + return PowerSyncCredentials(endpoint: powerSyncEndpoint, token: token) } override func uploadData(database: PowerSyncDatabaseProtocol) async throws { - guard let transaction = try await database.getNextCrudTransaction() else { return } var lastEntry: CrudEntry? @@ -96,13 +103,13 @@ class SupabaseConnector: PowerSyncBackendConnector { case .put: var data: [String: AnyCodable] = entry.opData?.mapValues { AnyCodable($0) } ?? [:] data["id"] = AnyCodable(entry.id) - try await table.upsert(data).execute(); + try await table.upsert(data).execute() case .patch: guard let opData = entry.opData else { continue } let encodableData = opData.mapValues { AnyCodable($0) } try await table.update(encodableData).eq("id", value: entry.id).execute() case .delete: - try await table.delete().eq( "id", value: entry.id).execute() + try await table.delete().eq("id", value: entry.id).execute() } } @@ -110,18 +117,19 @@ class SupabaseConnector: PowerSyncBackendConnector { } catch { if let errorCode = PostgresFatalCodes.extractErrorCode(from: error), - PostgresFatalCodes.isFatalError(errorCode) { - /// Instead of blocking the queue with these errors, - /// discard the (rest of the) transaction. - /// - /// Note that these errors typically indicate a bug in the application. - /// If protecting against data loss is important, save the failing records - /// elsewhere instead of discarding, and/or notify the user. - print("Data upload error: \(error)") - print("Discarding entry: \(lastEntry!)") - _ = try await transaction.complete.invoke(p1: nil) - return - } + PostgresFatalCodes.isFatalError(errorCode) + { + /// Instead of blocking the queue with these errors, + /// discard the (rest of the) transaction. + /// + /// Note that these errors typically indicate a bug in the application. + /// If protecting against data loss is important, save the failing records + /// elsewhere instead of discarding, and/or notify the user. + print("Data upload error: \(error)") + print("Discarding entry: \(lastEntry!)") + _ = try await transaction.complete.invoke(p1: nil) + return + } print("Data upload error - retrying last entry: \(lastEntry!), \(error)") throw error diff --git a/Demo/PowerSyncExample/PowerSync/SupabaseRemoteStorage.swift b/Demo/PowerSyncExample/PowerSync/SupabaseRemoteStorage.swift new file mode 100644 index 0000000..a79ca95 --- /dev/null +++ b/Demo/PowerSyncExample/PowerSync/SupabaseRemoteStorage.swift @@ -0,0 +1,23 @@ +import Foundation +import PowerSync +import Supabase + +class SupabaseRemoteStorage: RemoteStorageAdapter { + let storage: Supabase.StorageFileApi + + init(storage: Supabase.StorageFileApi) { + self.storage = storage + } + + func uploadFile(fileData: Data, attachment: PowerSync.Attachment) async throws { + try await storage.upload(attachment.filename, data: fileData) + } + + func downloadFile(attachment: PowerSync.Attachment) async throws -> Data { + try await storage.download(path: attachment.filename) + } + + func deleteFile(attachment: PowerSync.Attachment) async throws { + _ = try await storage.remove(paths: [attachment.filename]) + } +} diff --git a/Demo/PowerSyncExample/PowerSync/SystemManager.swift b/Demo/PowerSyncExample/PowerSync/SystemManager.swift index cd0aa0b..1c0e693 100644 --- a/Demo/PowerSyncExample/PowerSync/SystemManager.swift +++ b/Demo/PowerSyncExample/PowerSync/SystemManager.swift @@ -1,26 +1,83 @@ import Foundation import PowerSync +func getAttachmentsDirectoryPath() throws -> String { + guard let documentsURL = FileManager.default.urls( + for: .documentDirectory, + in: .userDomainMask + ).first else { + throw PowerSyncAttachmentError.invalidPath("Could not determine attachments directory path") + } + return documentsURL.appendingPathComponent("attachments").path +} + +let logTag = "SystemManager" + @Observable class SystemManager { let connector = SupabaseConnector() let schema = AppSchema - var db: PowerSyncDatabaseProtocol! + let db: PowerSyncDatabaseProtocol + + var attachments: AttachmentQueue? + + init() { + db = PowerSyncDatabase( + schema: schema, + dbFilename: "powersync-swift.sqlite" + ) + attachments = Self.createAttachmentQueue( + db: db, + connector: connector + ) + } - // openDb must be called before connect - func openDb() { - db = PowerSyncDatabase(schema: schema, dbFilename: "powersync-swift.sqlite") + /// Creates an AttachmentQueue if a Supabase Storage bucket has been specified in the config + private static func createAttachmentQueue( + db: PowerSyncDatabaseProtocol, + connector: SupabaseConnector + ) -> AttachmentQueue? { + guard let bucket = connector.getStorageBucket() else { + db.logger.info("No Supabase Storage bucket specified. Skipping attachment queue setup.", tag: logTag) + return nil + } + + do { + let attachmentsDir = try getAttachmentsDirectoryPath() + + return AttachmentQueue( + db: db, + remoteStorage: SupabaseRemoteStorage(storage: bucket), + attachmentsDirectory: attachmentsDir, + watchAttachments: { try db.watch( + options: WatchOptions( + sql: "SELECT photo_id FROM \(TODOS_TABLE) WHERE photo_id IS NOT NULL", + parameters: [], + mapper: { cursor in + try WatchedAttachmentItem( + id: cursor.getString(name: "photo_id"), + fileExtension: "jpg" + ) + } + ) + ) } + ) + } catch { + db.logger.error("Failed to initialize attachments queue: \(error)", tag: logTag) + return nil + } } func connect() async { do { try await db.connect(connector: connector) + try await attachments?.startSync() } catch { print("Unexpected error: \(error.localizedDescription)") // Catches any other error } } - func version() async -> String { + func version() async -> String { do { return try await db.getPowerSyncVersion() } catch { @@ -28,14 +85,16 @@ class SystemManager { } } - func signOut() async throws -> Void { + func signOut() async throws { try await db.disconnectAndClear() try await connector.client.auth.signOut() + try await attachments?.stopSyncing() + try await attachments?.clearQueue() } - func watchLists(_ callback: @escaping (_ lists: [ListContent]) -> Void ) async { + func watchLists(_ callback: @escaping (_ lists: [ListContent]) -> Void) async { do { - for try await lists in try self.db.watch( + for try await lists in try db.watch( options: WatchOptions( sql: "SELECT * FROM \(LISTS_TABLE)", mapper: { cursor in @@ -56,30 +115,57 @@ class SystemManager { } func insertList(_ list: NewListContent) async throws { - let result = try await self.db.execute( + _ = try await db.execute( sql: "INSERT INTO \(LISTS_TABLE) (id, created_at, name, owner_id) VALUES (uuid(), datetime(), ?, ?)", parameters: [list.name, connector.currentUserID] ) } func deleteList(id: String) async throws { - _ = try await db.writeTransaction(callback: { transaction in + let attachmentIds = try await db.writeTransaction(callback: { transaction in + let attachmentIDs = try transaction.getAll( + sql: "SELECT photo_id FROM \(TODOS_TABLE) WHERE list_id = ? AND photo_id IS NOT NULL", + parameters: [id] + ) { cursor in + // FIXME Transactions should allow throwing in the mapper and should use generics correctly + cursor.getString(index: 0) ?? "invalid" // :( + } as? [String] // :( + _ = try transaction.execute( sql: "DELETE FROM \(LISTS_TABLE) WHERE id = ?", parameters: [id] ) + _ = try transaction.execute( sql: "DELETE FROM \(TODOS_TABLE) WHERE list_id = ?", parameters: [id] ) - return + + return attachmentIDs ?? [] // :( }) + + if let attachments { + for id in attachmentIds { + try await attachments.deleteFile( + attachmentId: id + ) { _, _ in } + } + } } - func watchTodos(_ listId: String, _ callback: @escaping (_ todos: [Todo]) -> Void ) async { + func watchTodos(_ listId: String, _ callback: @escaping (_ todos: [Todo]) -> Void) async { do { - for try await todos in try self.db.watch( - sql: "SELECT * FROM \(TODOS_TABLE) WHERE list_id = ?", + for try await todos in try db.watch( + sql: """ + SELECT + t.*, a.local_uri + FROM + \(TODOS_TABLE) t + LEFT JOIN attachments a ON t.photo_id = a.id + WHERE + t.list_id = ? + ORDER BY t.id; + """, parameters: [listId], mapper: { cursor in try Todo( @@ -91,7 +177,8 @@ class SystemManager { createdAt: cursor.getString(name: "created_at"), completedAt: cursor.getStringOptional(name: "completed_at"), createdBy: cursor.getStringOptional(name: "created_by"), - completedBy: cursor.getStringOptional(name: "completed_by") + completedBy: cursor.getStringOptional(name: "completed_by"), + photoUri: cursor.getStringOptional(name: "local_uri") ) } ) { @@ -103,7 +190,7 @@ class SystemManager { } func insertTodo(_ todo: NewTodo, _ listId: String) async throws { - _ = try await self.db.execute( + _ = try await db.execute( sql: "INSERT INTO \(TODOS_TABLE) (id, created_at, created_by, description, list_id, completed) VALUES (uuid(), datetime(), ?, ?, ?, ?)", parameters: [connector.currentUserID, todo.description, listId, todo.isComplete] ) @@ -111,25 +198,43 @@ class SystemManager { func updateTodo(_ todo: Todo) async throws { // Do this to avoid needing to handle date time from Swift to Kotlin - if(todo.isComplete) { - _ = try await self.db.execute( + if todo.isComplete { + _ = try await db.execute( sql: "UPDATE \(TODOS_TABLE) SET description = ?, completed = ?, completed_at = datetime(), completed_by = ? WHERE id = ?", parameters: [todo.description, todo.isComplete, connector.currentUserID, todo.id] ) } else { - _ = try await self.db.execute( + _ = try await db.execute( sql: "UPDATE \(TODOS_TABLE) SET description = ?, completed = ?, completed_at = NULL, completed_by = NULL WHERE id = ?", parameters: [todo.description, todo.isComplete, todo.id] ) } } - func deleteTodo(id: String) async throws { - _ = try await db.writeTransaction(callback: { transaction in - try transaction.execute( - sql: "DELETE FROM \(TODOS_TABLE) WHERE id = ?", - parameters: [id] - ) - }) + func deleteTodo(todo: Todo) async throws { + if let attachments, let photoId = todo.photoId { + try await attachments.deleteFile( + attachmentId: photoId + ) { tx, _ in + try self.deleteTodoInTX( + id: todo.id, + tx: tx + ) + } + } else { + try await db.writeTransaction { tx in + try self.deleteTodoInTX( + id: todo.id, + tx: tx + ) + } + } + } + + private func deleteTodoInTX(id: String, tx: ConnectionContext) throws { + _ = try tx.execute( + sql: "DELETE FROM \(TODOS_TABLE) WHERE id = ?", + parameters: [id] + ) } } diff --git a/Demo/PowerSyncExample/PowerSync/Todos.swift b/Demo/PowerSyncExample/PowerSync/Todos.swift index dd53e31..fce55f3 100644 --- a/Demo/PowerSyncExample/PowerSync/Todos.swift +++ b/Demo/PowerSyncExample/PowerSync/Todos.swift @@ -11,7 +11,8 @@ struct Todo: Identifiable, Hashable, Decodable { var completedAt: String? var createdBy: String? var completedBy: String? - + var photoUri: String? + enum CodingKeys: String, CodingKey { case id case listId = "list_id" @@ -22,7 +23,7 @@ struct Todo: Identifiable, Hashable, Decodable { case createdBy = "created_by" case completedBy = "completed_by" case photoId = "photo_id" - + case photoUri = "photo_uri" } } diff --git a/Demo/PowerSyncExample/RootView.swift b/Demo/PowerSyncExample/RootView.swift index 9450aa1..e4a03e1 100644 --- a/Demo/PowerSyncExample/RootView.swift +++ b/Demo/PowerSyncExample/RootView.swift @@ -18,24 +18,18 @@ struct RootView: View { } .navigationDestination(for: Route.self) { route in switch route { - case .home: - HomeScreen() - case .signIn: - SignInScreen() - case .signUp: - SignUpScreen() - } - } - } - .task { - if(system.db == nil) { - system.openDb() + case .home: + HomeScreen() + case .signIn: + SignInScreen() + case .signUp: + SignUpScreen() + } } } .environment(authModel) .environment(navigationModel) } - } #Preview { diff --git a/Demo/PowerSyncExample/_Secrets.swift b/Demo/PowerSyncExample/_Secrets.swift index 1e1b04e..771af3b 100644 --- a/Demo/PowerSyncExample/_Secrets.swift +++ b/Demo/PowerSyncExample/_Secrets.swift @@ -5,4 +5,6 @@ enum Secrets { static let powerSyncEndpoint = "https://your-id.powersync.journeyapps.com" static let supabaseURL = URL(string: "https://your-id.supabase.co")! static let supabaseAnonKey = "anon-key" + // Optional storage bucket name. Set to nil if you don't want to use storage. + static let supabaseStorageBucket = "media" } \ No newline at end of file diff --git a/README.md b/README.md index 682258c..8e938f6 100644 --- a/README.md +++ b/README.md @@ -83,3 +83,7 @@ The PowerSync Swift SDK currently makes use of the [PowerSync Kotlin Multiplatfo ## Migration from Alpha to Beta See these [developer notes](https://docs.powersync.com/client-sdk-references/swift#migrating-from-the-alpha-to-the-beta-sdk) if you are migrating from the alpha to the beta version of the Swift SDK. + +## Attachments + +See the attachments [README](./Sources/PowerSync/attachments/README.md) for more information. diff --git a/Sources/PowerSync/Kotlin/KotlinAdapter.swift b/Sources/PowerSync/Kotlin/KotlinAdapter.swift index 8f864de..0418709 100644 --- a/Sources/PowerSync/Kotlin/KotlinAdapter.swift +++ b/Sources/PowerSync/Kotlin/KotlinAdapter.swift @@ -1,5 +1,6 @@ import PowerSyncKotlin + internal struct KotlinAdapter { struct Index { static func toKotlin(_ index: IndexProtocol) -> PowerSyncKotlin.Index { diff --git a/Sources/PowerSync/Kotlin/KotlinPowerSyncDatabaseImpl.swift b/Sources/PowerSync/Kotlin/KotlinPowerSyncDatabaseImpl.swift index ea78b55..874d4ca 100644 --- a/Sources/PowerSync/Kotlin/KotlinPowerSyncDatabaseImpl.swift +++ b/Sources/PowerSync/Kotlin/KotlinPowerSyncDatabaseImpl.swift @@ -2,6 +2,8 @@ import Foundation import PowerSyncKotlin final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol { + let logger: any LoggerProtocol + private let kotlinDatabase: PowerSyncKotlin.PowerSyncDatabase var currentStatus: SyncStatus { kotlinDatabase.currentStatus } @@ -9,19 +11,16 @@ final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol { init( schema: Schema, dbFilename: String, - logger: DatabaseLogger? = nil + logger: DatabaseLogger ) { let factory = PowerSyncKotlin.DatabaseDriverFactory() kotlinDatabase = PowerSyncDatabase( factory: factory, schema: KotlinAdapter.Schema.toKotlin(schema), dbFilename: dbFilename, - logger: logger?.kLogger + logger: logger.kLogger ) - } - - init(kotlinDatabase: KotlinPowerSyncDatabase) { - self.kotlinDatabase = kotlinDatabase + self.logger = logger } func waitForFirstSync() async throws { @@ -42,7 +41,10 @@ final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol { retryDelayMs: Int64 = 5000, params: [String: JsonParam?] = [:] ) async throws { - let connectorAdapter = PowerSyncBackendConnectorAdapter(swiftBackendConnector: connector) + let connectorAdapter = PowerSyncBackendConnectorAdapter( + swiftBackendConnector: connector, + db: self + ) try await kotlinDatabase.connect( connector: connectorAdapter, @@ -186,7 +188,8 @@ final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol { options: WatchOptions ) throws -> AsyncThrowingStream<[RowType], Error> { AsyncThrowingStream { continuation in - Task { + // Create an outer task to monitor cancellation + let task = Task { do { var mapperError: Error? // HACK! @@ -196,34 +199,51 @@ final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol { // This attempts to EXPLAIN the query before passing it to Kotlin // We could introduce an onChange API in Kotlin which we use to implement watches here. // This would prevent most issues with exceptions. + // EXPLAIN statement to prevent crashes in SKIEE _ = try await self.kotlinDatabase.getAll( sql: "EXPLAIN \(options.sql)", parameters: options.parameters, mapper: { _ in "" } ) + + // Watching for changes in the database for try await values in try self.kotlinDatabase.watch( sql: options.sql, parameters: options.parameters, throttleMs: KotlinLong(value: options.throttleMs), - mapper: { cursor in do { - return try options.mapper(cursor) - } catch { - mapperError = error - // The value here does not matter. We will throw the exception later - // This is not ideal, this is only a workaround until we expose fine grained access to Kotlin SDK internals. - return nil as RowType? - } } + mapper: { cursor in + do { + return try options.mapper(cursor) + } catch { + mapperError = error + return () + } + } ) { + // Check if the outer task is cancelled + try Task.checkCancellation() // This checks if the calling task was cancelled + if mapperError != nil { throw mapperError! } + try continuation.yield(safeCast(values, to: [RowType].self)) } + continuation.finish() } catch { - continuation.finish(throwing: error) + if error is CancellationError { + continuation.finish() + } else { + continuation.finish(throwing: error) + } } } + + // Propagate cancellation from the outer task to the inner task + continuation.onTermination = { @Sendable _ in + task.cancel() // This cancels the inner task when the stream is terminated + } } } @@ -234,8 +254,8 @@ final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol { func readTransaction(callback: @escaping (any PowerSyncTransaction) throws -> R) async throws -> R { return try safeCast(await kotlinDatabase.readTransaction(callback: TransactionCallback(callback: callback)), to: R.self) } - - func close() async throws{ + + func close() async throws { try await kotlinDatabase.close() } } diff --git a/Sources/PowerSync/Kotlin/KotlinTypes.swift b/Sources/PowerSync/Kotlin/KotlinTypes.swift index ffb47a9..23c361f 100644 --- a/Sources/PowerSync/Kotlin/KotlinTypes.swift +++ b/Sources/PowerSync/Kotlin/KotlinTypes.swift @@ -9,3 +9,5 @@ public typealias JsonParam = PowerSyncKotlin.JsonParam public typealias CrudTransaction = PowerSyncKotlin.CrudTransaction typealias KotlinPowerSyncCredentials = PowerSyncKotlin.PowerSyncCredentials typealias KotlinPowerSyncDatabase = PowerSyncKotlin.PowerSyncDatabase +public typealias Transaction = PowerSyncKotlin.PowerSyncTransaction +public typealias ConnectionContext = PowerSyncKotlin.ConnectionContext diff --git a/Sources/PowerSync/PowerSyncBackendConnectorAdapter.swift b/Sources/PowerSync/PowerSyncBackendConnectorAdapter.swift index 199cb06..b41c2b3 100644 --- a/Sources/PowerSync/PowerSyncBackendConnectorAdapter.swift +++ b/Sources/PowerSync/PowerSyncBackendConnectorAdapter.swift @@ -1,12 +1,16 @@ import OSLog -class PowerSyncBackendConnectorAdapter: KotlinPowerSyncBackendConnector { +internal class PowerSyncBackendConnectorAdapter: KotlinPowerSyncBackendConnector { let swiftBackendConnector: PowerSyncBackendConnector - + let db: any PowerSyncDatabaseProtocol + let logTag = "PowerSyncBackendConnector" + init( - swiftBackendConnector: PowerSyncBackendConnector + swiftBackendConnector: PowerSyncBackendConnector, + db: any PowerSyncDatabaseProtocol ) { self.swiftBackendConnector = swiftBackendConnector + self.db = db } override func __fetchCredentials() async throws -> KotlinPowerSyncCredentials? { @@ -14,25 +18,17 @@ class PowerSyncBackendConnectorAdapter: KotlinPowerSyncBackendConnector { let result = try await swiftBackendConnector.fetchCredentials() return result?.kotlinCredentials } catch { - if #available(iOS 14.0, macOS 11.0, *) { - Logger().error("🔴 Failed to fetch credentials: \(error.localizedDescription)") - } else { - print("🔴 Failed to fetch credentials: \(error.localizedDescription)") - } + db.logger.error("Error while fetching credentials", tag: logTag) return nil } } override func __uploadData(database: KotlinPowerSyncDatabase) async throws { - let swiftDatabase = KotlinPowerSyncDatabaseImpl(kotlinDatabase: database) do { - return try await swiftBackendConnector.uploadData(database: swiftDatabase) + // Pass the Swift DB protocal to the connector + return try await swiftBackendConnector.uploadData(database: db) } catch { - if #available(iOS 14.0, macOS 11.0, *) { - Logger().error("🔴 Failed to upload data: \(error)") - } else { - print("🔴 Failed to upload data: \(error)") - } + db.logger.error("Error while uploading data: \(error)", tag: logTag) } } } diff --git a/Sources/PowerSync/PowerSyncDatabaseProtocol.swift b/Sources/PowerSync/PowerSyncDatabaseProtocol.swift index 9de5f00..8fa6a0e 100644 --- a/Sources/PowerSync/PowerSyncDatabaseProtocol.swift +++ b/Sources/PowerSync/PowerSyncDatabaseProtocol.swift @@ -11,6 +11,9 @@ public protocol PowerSyncDatabaseProtocol: Queries { /// The current sync status. var currentStatus: SyncStatus { get } + /// Logger used for PowerSync operations + var logger: any LoggerProtocol { get } + /// Wait for the first sync to occur func waitForFirstSync() async throws diff --git a/Sources/PowerSync/QueriesProtocol.swift b/Sources/PowerSync/QueriesProtocol.swift index 0f1bbab..755707f 100644 --- a/Sources/PowerSync/QueriesProtocol.swift +++ b/Sources/PowerSync/QueriesProtocol.swift @@ -1,6 +1,5 @@ import Combine import Foundation -import PowerSyncKotlin public let DEFAULT_WATCH_THROTTLE_MS = Int64(30) @@ -90,10 +89,10 @@ public protocol Queries { ) throws -> AsyncThrowingStream<[RowType], Error> /// Execute a write transaction with the given callback - func writeTransaction(callback: @escaping (any PowerSyncTransaction) throws -> R) async throws -> R + func writeTransaction(callback: @escaping (any Transaction) throws -> R) async throws -> R /// Execute a read transaction with the given callback - func readTransaction(callback: @escaping (any PowerSyncTransaction) throws -> R) async throws -> R + func readTransaction(callback: @escaping (any Transaction) throws -> R) async throws -> R } public extension Queries { diff --git a/Sources/PowerSync/attachments/Attachment.swift b/Sources/PowerSync/attachments/Attachment.swift new file mode 100644 index 0000000..6076015 --- /dev/null +++ b/Sources/PowerSync/attachments/Attachment.swift @@ -0,0 +1,132 @@ +/// Enum representing the state of an attachment +public enum AttachmentState: Int { + /// The attachment has been queued for download from the cloud storage + case queuedDownload + /// The attachment has been queued for upload to the cloud storage + case queuedUpload + /// The attachment has been queued for delete in the cloud storage (and locally) + case queuedDelete + /// The attachment has been synced + case synced + /// The attachment has been orphaned, i.e., the associated record has been deleted + case archived + + enum AttachmentStateError: Error { + case invalidState(Int) + } + + static func from(_ rawValue: Int) throws -> AttachmentState { + guard let state = AttachmentState(rawValue: rawValue) else { + throw AttachmentStateError.invalidState(rawValue) + } + return state + } +} + +/// Struct representing an attachment +public struct Attachment { + /// Unique identifier for the attachment + public let id: String + + /// Timestamp for the last record update + public let timestamp: Int + + /// Attachment filename, e.g. `[id].jpg` + public let filename: String + + /// Current attachment state + public let state: AttachmentState + + /// Local URI pointing to the attachment file + public let localUri: String? + + /// Attachment media type (usually a MIME type) + public let mediaType: String? + + /// Attachment byte size + public let size: Int64? + + /// Specifies if the attachment has been synced locally before. + /// This is particularly useful for restoring archived attachments in edge cases. + public let hasSynced: Int? + + /// Extra attachment metadata + public let metaData: String? + + /// Initializes a new `Attachment` instance + public init( + id: String, + filename: String, + state: AttachmentState, + timestamp: Int = 0, + hasSynced: Int? = 0, + localUri: String? = nil, + mediaType: String? = nil, + size: Int64? = nil, + metaData: String? = nil + ) { + self.id = id + self.timestamp = timestamp + self.filename = filename + self.state = state + self.localUri = localUri + self.mediaType = mediaType + self.size = size + self.hasSynced = hasSynced + self.metaData = metaData + } + + /// Returns a new `Attachment` instance with the option to override specific fields. + /// + /// - Parameters: + /// - filename: Optional new filename. + /// - state: Optional new state. + /// - timestamp: Optional new timestamp. + /// - hasSynced: Optional new `hasSynced` flag. + /// - localUri: Optional new local URI. + /// - mediaType: Optional new media type. + /// - size: Optional new size. + /// - metaData: Optional new metadata. + /// - Returns: A new `Attachment` with updated values. + func with( + filename _: String? = nil, + state: AttachmentState? = nil, + timestamp _: Int = 0, + hasSynced: Int?? = 0, + localUri: String?? = .none, + mediaType: String?? = .none, + size: Int64?? = .none, + metaData: String?? = .none + ) -> Attachment { + return Attachment( + id: id, + filename: filename ?? filename, + state: state.map { $0 } ?? self.state, + timestamp: timestamp > 0 ? timestamp : timestamp, + hasSynced: hasSynced.map { $0 } ?? self.hasSynced, + localUri: localUri.map { $0 } ?? self.localUri, + mediaType: mediaType.map { $0 } ?? self.mediaType, + size: size.map { $0 } ?? self.size, + metaData: metaData.map { $0 } ?? self.metaData + ) + } + + /// Constructs an `Attachment` from a `SqlCursor`. + /// + /// - Parameter cursor: The `SqlCursor` containing the attachment data. + /// - Throws: If required fields are missing or of incorrect type. + /// - Returns: A fully constructed `Attachment` instance. + public static func fromCursor(_ cursor: SqlCursor) throws -> Attachment { + return try Attachment( + id: cursor.getString(name: "id"), + filename: cursor.getString(name: "filename"), + state: AttachmentState.from(cursor.getLong(name: "state")), + timestamp: cursor.getLong(name: "timestamp"), + hasSynced: cursor.getLongOptional(name: "has_synced"), + localUri: cursor.getStringOptional(name: "local_uri"), + mediaType: cursor.getStringOptional(name: "media_type"), + size: cursor.getLongOptional(name: "size")?.int64Value, + metaData: cursor.getStringOptional(name: "meta_data") + ) + } +} diff --git a/Sources/PowerSync/attachments/AttachmentContext.swift b/Sources/PowerSync/attachments/AttachmentContext.swift new file mode 100644 index 0000000..c7f01a6 --- /dev/null +++ b/Sources/PowerSync/attachments/AttachmentContext.swift @@ -0,0 +1,220 @@ +import Foundation + +/// Context which performs actions on the attachment records +public class AttachmentContext { + private let db: any PowerSyncDatabaseProtocol + private let tableName: String + private let logger: any LoggerProtocol + private let logTag = "AttachmentService" + private let maxArchivedCount: Int64 + + /// Table used for storing attachments in the attachment queue. + private var table: String { + return tableName + } + + /// Initializes a new `AttachmentContext`. + public init( + db: PowerSyncDatabaseProtocol, + tableName: String, + logger: any LoggerProtocol, + maxArchivedCount: Int64 + ) { + self.db = db + self.tableName = tableName + self.logger = logger + self.maxArchivedCount = maxArchivedCount + } + + /// Deletes the attachment from the attachment queue. + public func deleteAttachment(id: String) async throws { + _ = try await db.execute( + sql: "DELETE FROM \(table) WHERE id = ?", + parameters: [id] + ) + } + + /// Sets the state of the attachment to ignored (archived). + public func ignoreAttachment(id: String) async throws { + _ = try await db.execute( + sql: "UPDATE \(table) SET state = ? WHERE id = ?", + parameters: [AttachmentState.archived.rawValue, id] + ) + } + + /// Gets the attachment from the attachment queue using an ID. + public func getAttachment(id: String) async throws -> Attachment? { + return try await db.getOptional( + sql: "SELECT * FROM \(table) WHERE id = ?", + parameters: [id] + ) { cursor in + try Attachment.fromCursor(cursor) + } + } + + /// Saves the attachment to the attachment queue. + public func saveAttachment(attachment: Attachment) async throws -> Attachment { + return try await db.writeTransaction { ctx in + try self.upsertAttachment(attachment, context: ctx) + } + } + + /// Saves multiple attachments to the attachment queue. + public func saveAttachments(attachments: [Attachment]) async throws { + if attachments.isEmpty { + return + } + + try await db.writeTransaction { tx in + for attachment in attachments { + _ = try self.upsertAttachment(attachment, context: tx) + } + } + } + + /// Gets all the IDs of attachments in the attachment queue. + public func getAttachmentIds() async throws -> [String] { + return try await db.getAll( + sql: "SELECT id FROM \(table) WHERE id IS NOT NULL", + parameters: [] + ) { cursor in + try cursor.getString(name: "id") + } + } + + /// Gets all attachments in the attachment queue. + public func getAttachments() async throws -> [Attachment] { + return try await db.getAll( + sql: """ + SELECT + * + FROM + \(table) + WHERE + id IS NOT NULL + ORDER BY + timestamp ASC + """, + parameters: [] + ) { cursor in + try Attachment.fromCursor(cursor) + } + } + + /// Gets all active attachments that require an operation to be performed. + public func getActiveAttachments() async throws -> [Attachment] { + return try await db.getAll( + sql: """ + SELECT + * + FROM + \(table) + WHERE + state = ? + OR state = ? + OR state = ? + ORDER BY + timestamp ASC + """, + parameters: [ + AttachmentState.queuedUpload.rawValue, + AttachmentState.queuedDownload.rawValue, + AttachmentState.queuedDelete.rawValue, + ] + ) { cursor in + try Attachment.fromCursor(cursor) + } + } + + /// Clears the attachment queue. + /// + /// - Note: Currently only used for testing purposes. + public func clearQueue() async throws { + _ = try await db.execute("DELETE FROM \(table)") + } + + /// Deletes attachments that have been archived. + /// + /// - Parameter callback: A callback invoked with the list of archived attachments before deletion. + /// - Returns: `true` if all items have been deleted, `false` if there may be more archived items remaining. + public func deleteArchivedAttachments(callback: @escaping ([Attachment]) async throws -> Void) async throws -> Bool { + let limit = 1000 + let attachments = try await db.getAll( + sql: """ + SELECT + * + FROM + \(table) + WHERE + state = ? + ORDER BY + timestamp DESC + LIMIT ? OFFSET ? + """, + parameters: [ + AttachmentState.archived.rawValue, + limit, + maxArchivedCount, + ] + ) { cursor in + try Attachment.fromCursor(cursor) + } + + try await callback(attachments) + + let ids = try JSONEncoder().encode(attachments.map { $0.id }) + let idsString = String(data: ids, encoding: .utf8)! + + _ = try await db.execute( + sql: "DELETE FROM \(table) WHERE id IN (SELECT value FROM json_each(?));", + parameters: [idsString] + ) + + return attachments.count < limit + } + + /// Upserts an attachment record synchronously using a database transaction context. + /// + /// - Parameters: + /// - attachment: The attachment to upsert. + /// - context: The database transaction context. + /// - Returns: The original attachment. + public func upsertAttachment( + _ attachment: Attachment, + context: ConnectionContext + ) throws -> Attachment { + let timestamp = Int(Date().timeIntervalSince1970 * 1000) + let updatedRecord = Attachment( + id: attachment.id, + filename: attachment.filename, + state: attachment.state, + timestamp: timestamp, + hasSynced: attachment.hasSynced, + localUri: attachment.localUri, + mediaType: attachment.mediaType, + size: attachment.size + ) + + try context.execute( + sql: """ + INSERT OR REPLACE INTO + \(table) (id, timestamp, filename, local_uri, media_type, size, state, has_synced, meta_data) + VALUES + (?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + parameters: [ + updatedRecord.id, + updatedRecord.timestamp, + updatedRecord.filename, + updatedRecord.localUri ?? NSNull(), + updatedRecord.mediaType ?? NSNull(), + updatedRecord.size ?? NSNull(), + updatedRecord.state.rawValue, + updatedRecord.hasSynced ?? 0, + updatedRecord.metaData ?? NSNull() + ] + ) + + return attachment + } +} diff --git a/Sources/PowerSync/attachments/AttachmentQueue.swift b/Sources/PowerSync/attachments/AttachmentQueue.swift new file mode 100644 index 0000000..b56164b --- /dev/null +++ b/Sources/PowerSync/attachments/AttachmentQueue.swift @@ -0,0 +1,431 @@ +import Combine +import Foundation + +/// Class used to implement the attachment queue +/// Requires a PowerSyncDatabase, a RemoteStorageAdapter implementation, and a directory name for attachments. +public class AttachmentQueue { + /// Default name of the attachments table + public static let defaultTableName = "attachments" + + let logTag = "AttachmentQueue" + + /// PowerSync database client + public let db: PowerSyncDatabaseProtocol + + /// Remote storage adapter + public let remoteStorage: RemoteStorageAdapter + + /// Directory name for attachments + private let attachmentsDirectory: String + + /// Closure which creates a Stream of ``WatchedAttachmentItem`` + private let watchAttachments: () throws -> AsyncThrowingStream<[WatchedAttachmentItem], Error> + + /// Local file system adapter + public let localStorage: LocalStorageAdapter + + /// Attachments table name in SQLite + private let attachmentsQueueTableName: String + + /// Optional sync error handler + private let errorHandler: SyncErrorHandler? + + /// Interval between periodic syncs + private let syncInterval: TimeInterval + + /// Limit on number of archived attachments + private let archivedCacheLimit: Int64 + + /// Duration for throttling sync operations + private let syncThrottleDuration: TimeInterval + + /// Subdirectories to be created in attachments directory + private let subdirectories: [String]? + + /// Whether to allow downloading of attachments + private let downloadAttachments: Bool + + /** + * Logging interface used for all log operations + */ + public let logger: any LoggerProtocol + + /// Attachment service for interacting with attachment records + public let attachmentsService: AttachmentService + + private var syncStatusTask: Task? + private var cancellables = Set() + + /// Indicates whether the queue has been closed + public private(set) var closed: Bool = false + + /// Syncing service instance + private(set) lazy var syncingService: SyncingService = .init( + remoteStorage: self.remoteStorage, + localStorage: self.localStorage, + attachmentsService: self.attachmentsService, + logger: self.logger, + getLocalUri: { [weak self] filename in + guard let self = self else { return filename } + return self.getLocalUri(filename) + }, + errorHandler: self.errorHandler, + syncThrottle: self.syncThrottleDuration + ) + + private let lock: LockActor + + /// Initializes the attachment queue + /// - Parameters match the stored properties + public init( + db: PowerSyncDatabaseProtocol, + remoteStorage: RemoteStorageAdapter, + attachmentsDirectory: String, + watchAttachments: @escaping () throws -> AsyncThrowingStream<[WatchedAttachmentItem], Error>, + localStorage: LocalStorageAdapter = FileManagerStorageAdapter(), + attachmentsQueueTableName: String = defaultTableName, + errorHandler: SyncErrorHandler? = nil, + syncInterval: TimeInterval = 30.0, + archivedCacheLimit: Int64 = 100, + syncThrottleDuration: TimeInterval = 1.0, + subdirectories: [String]? = nil, + downloadAttachments: Bool = true, + logger: (any LoggerProtocol)? = nil + ) { + self.db = db + self.remoteStorage = remoteStorage + self.attachmentsDirectory = attachmentsDirectory + self.watchAttachments = watchAttachments + self.localStorage = localStorage + self.attachmentsQueueTableName = attachmentsQueueTableName + self.errorHandler = errorHandler + self.syncInterval = syncInterval + self.archivedCacheLimit = archivedCacheLimit + self.syncThrottleDuration = syncThrottleDuration + self.subdirectories = subdirectories + self.downloadAttachments = downloadAttachments + self.logger = logger ?? db.logger + self.attachmentsService = AttachmentService( + db: db, + tableName: attachmentsQueueTableName, + logger: self.logger, + maxArchivedCount: archivedCacheLimit + ) + self.lock = LockActor() + } + + /// Starts the attachment sync process + public func startSync() async throws { + try await lock.withLock { + try guardClosed() + + // Stop any active syncing before starting new Tasks + try await _stopSyncing() + + // Ensure the directory where attachments are downloaded exists + try await localStorage.makeDir(path: attachmentsDirectory) + + if let subdirectories = subdirectories { + for subdirectory in subdirectories { + let path = URL(fileURLWithPath: attachmentsDirectory).appendingPathComponent(subdirectory).path + try await localStorage.makeDir(path: path) + } + } + + // Verify initial state + try await attachmentsService.withContext { context in + try await self.verifyAttachments(context: context) + } + + try await syncingService.startSync(period: syncInterval) + + syncStatusTask = Task { + do { + try await withThrowingTaskGroup(of: Void.self) { group in + // Add connectivity monitoring task + group.addTask { + var previousConnected = self.db.currentStatus.connected + for await status in self.db.currentStatus.asFlow() { + try Task.checkCancellation() + if !previousConnected && status.connected { + try await self.syncingService.triggerSync() + } + previousConnected = status.connected + } + } + + // Add attachment watching task + group.addTask { + for try await items in try self.watchAttachments() { + try await self.processWatchedAttachments(items: items) + } + } + + // Wait for any task to complete (which should only happen on cancellation) + try await group.next() + } + } catch { + if !(error is CancellationError) { + logger.error("Error in attachment sync job: \(error.localizedDescription)", tag: logTag) + } + } + } + } + } + + /// Stops active syncing tasks. Syncing can be resumed with ``startSync()`` + public func stopSyncing() async throws { + try await lock.withLock { + try await _stopSyncing() + } + } + + private func _stopSyncing() async throws { + try guardClosed() + + syncStatusTask?.cancel() + // Wait for the task to actually complete + do { + _ = try await syncStatusTask?.value + } catch { + // Task completed with error (likely cancellation) + // This is okay + } + syncStatusTask = nil + + try await syncingService.stopSync() + } + + /// Closes the attachment queue and cancels all sync tasks + public func close() async throws { + try await lock.withLock { + try guardClosed() + + try await _stopSyncing() + try await syncingService.close() + closed = true + } + } + + /// Resolves the filename for a new attachment + /// - Parameters: + /// - attachmentId: Attachment ID + /// - fileExtension: File extension + /// - Returns: Resolved filename + public func resolveNewAttachmentFilename( + attachmentId: String, + fileExtension: String? + ) -> String { + return "\(attachmentId).\(fileExtension ?? "attachment")" + } + + /// Processes watched attachment items and updates sync state + /// - Parameter items: List of watched attachment items + public func processWatchedAttachments(items: [WatchedAttachmentItem]) async throws { + // Need to get all the attachments which are tracked in the DB. + // We might need to restore an archived attachment. + try await attachmentsService.withContext { context in + let currentAttachments = try await context.getAttachments() + var attachmentUpdates = [Attachment]() + + for item in items { + let existingQueueItem = currentAttachments.first { $0.id == item.id } + + if existingQueueItem == nil { + if !self.downloadAttachments { + continue + } + // This item should be added to the queue + // This item is assumed to be coming from an upstream sync + // Locally created new items should be persisted using saveFile before + // this point. + let filename = self.resolveNewAttachmentFilename( + attachmentId: item.id, + fileExtension: item.fileExtension + ) + + attachmentUpdates.append( + Attachment( + id: item.id, + filename: filename, + state: AttachmentState.queuedDownload + ) + ) + } else if existingQueueItem!.state == AttachmentState.archived { + // The attachment is present again. Need to queue it for sync. + // We might be able to optimize this in future + if existingQueueItem!.hasSynced == 1 { + // No remote action required, we can restore the record (avoids deletion) + attachmentUpdates.append( + existingQueueItem!.with(state: AttachmentState.synced) + ) + } else { + // The localURI should be set if the record was meant to be downloaded + // and has been synced. If it's missing and hasSynced is false then + // it must be an upload operation + let newState = existingQueueItem!.localUri == nil ? + AttachmentState.queuedDownload : + AttachmentState.queuedUpload + + attachmentUpdates.append( + existingQueueItem!.with(state: newState) + ) + } + } + } + + /** + * Archive any items not specified in the watched items except for items pending delete. + */ + for attachment in currentAttachments { + if attachment.state != AttachmentState.queuedDelete, + items.first(where: { $0.id == attachment.id }) == nil + { + attachmentUpdates.append( + attachment.with(state: AttachmentState.archived) + ) + } + } + + if !attachmentUpdates.isEmpty { + try await context.saveAttachments(attachments: attachmentUpdates) + } + } + } + + /// Saves a new file and schedules it for upload + /// - Parameters: + /// - data: File data + /// - mediaType: MIME type + /// - fileExtension: File extension + /// - updateHook: Hook to assign attachment relationships in the same transaction + /// - Returns: The created attachment + @discardableResult + public func saveFile( + data: Data, + mediaType: String, + fileExtension: String?, + updateHook: @escaping (ConnectionContext, Attachment) throws -> Void + ) async throws -> Attachment { + let id = try await db.get(sql: "SELECT uuid() as id", parameters: [], mapper: { cursor in + try cursor.getString(name: "id") + }) + + let filename = resolveNewAttachmentFilename(attachmentId: id, fileExtension: fileExtension) + let localUri = getLocalUri(filename) + + // Write the file to the filesystem + let fileSize = try await localStorage.saveFile(filePath: localUri, data: data) + + return try await attachmentsService.withContext { context in + // Start a write transaction. The attachment record and relevant local relationship + // assignment should happen in the same transaction. + try await self.db.writeTransaction { tx in + let attachment = Attachment( + id: id, + filename: filename, + state: AttachmentState.queuedUpload, + localUri: localUri, + mediaType: mediaType, + size: fileSize + ) + + // Allow consumers to set relationships to this attachment id + try updateHook(tx, attachment) + + return try context.upsertAttachment(attachment, context: tx) + } + } + } + + /// Queues a file for deletion + /// - Parameters: + /// - attachmentId: ID of the attachment to delete + /// - updateHook: Hook to perform additional DB updates in the same transaction + @discardableResult + public func deleteFile( + attachmentId: String, + updateHook: @escaping (ConnectionContext, Attachment) throws -> Void + ) async throws -> Attachment { + try await attachmentsService.withContext { context in + guard let attachment = try await context.getAttachment(id: attachmentId) else { + throw PowerSyncAttachmentError.notFound("Attachment record with id \(attachmentId) was not found.") + } + + let result = try await self.db.writeTransaction { tx in + try updateHook(tx, attachment) + + let updatedAttachment = Attachment( + id: attachment.id, + filename: attachment.filename, + state: AttachmentState.queuedDelete, + hasSynced: attachment.hasSynced, + localUri: attachment.localUri, + mediaType: attachment.mediaType, + size: attachment.size + ) + + return try context.upsertAttachment(updatedAttachment, context: tx) + } + return result + } + } + + /// Returns the local URI where a file is stored based on filename + /// - Parameter filename: The name of the file + /// - Returns: The file path + public func getLocalUri(_ filename: String) -> String { + return URL(fileURLWithPath: attachmentsDirectory).appendingPathComponent(filename).path + } + + /// Removes all archived items + public func expireCache() async throws { + try await attachmentsService.withContext { context in + var done = false + repeat { + done = try await self.syncingService.deleteArchivedAttachments(context) + } while !done + } + } + + /// Clears the attachment queue and deletes all attachment files + public func clearQueue() async throws { + try await attachmentsService.withContext { context in + try await context.clearQueue() + // Remove the attachments directory + try await self.localStorage.rmDir(path: self.attachmentsDirectory) + } + } + + /// Verifies attachment records are present in the filesystem + private func verifyAttachments(context: AttachmentContext) async throws { + let attachments = try await context.getAttachments() + var updates: [Attachment] = [] + + for attachment in attachments { + guard let localUri = attachment.localUri else { + continue + } + + let exists = try await localStorage.fileExists(filePath: localUri) + if attachment.state == AttachmentState.synced || + attachment.state == AttachmentState.queuedUpload && + !exists + { + // The file must have been removed from the local storage + updates.append(attachment.with( + state: .archived, + localUri: .some(nil) // Clears the value + )) + } + } + + try await context.saveAttachments(attachments: updates) + } + + private func guardClosed() throws { + if closed { + throw PowerSyncAttachmentError.closed("Attachment queue is closed") + } + } +} diff --git a/Sources/PowerSync/attachments/AttachmentService.swift b/Sources/PowerSync/attachments/AttachmentService.swift new file mode 100644 index 0000000..b5736d4 --- /dev/null +++ b/Sources/PowerSync/attachments/AttachmentService.swift @@ -0,0 +1,65 @@ +import Foundation + +/// Service which manages attachment records. +public class AttachmentService { + private let db: any PowerSyncDatabaseProtocol + private let tableName: String + private let logger: any LoggerProtocol + private let logTag = "AttachmentService" + + private let context: AttachmentContext + private let lock: LockActor + + /// Initializes the attachment service with the specified database, table name, logger, and max archived count. + public init( + db: PowerSyncDatabaseProtocol, + tableName: String, + logger: any LoggerProtocol, + maxArchivedCount: Int64 + ) { + self.db = db + self.tableName = tableName + self.logger = logger + context = AttachmentContext( + db: db, + tableName: tableName, + logger: logger, + maxArchivedCount: maxArchivedCount + ) + lock = LockActor() + } + + /// Watches for changes to the attachments table. + public func watchActiveAttachments() throws -> AsyncThrowingStream<[String], Error> { + logger.info("Watching attachments...", tag: logTag) + + return try db.watch( + sql: """ + SELECT + id + FROM + \(tableName) + WHERE + state = ? + OR state = ? + OR state = ? + ORDER BY + timestamp ASC + """, + parameters: [ + AttachmentState.queuedUpload.rawValue, + AttachmentState.queuedDownload.rawValue, + AttachmentState.queuedDelete.rawValue, + ] + ) { cursor in + try cursor.getString(name: "id") + } + } + + /// Executes a callback with exclusive access to the attachment context. + public func withContext(callback: @Sendable @escaping (AttachmentContext) async throws -> R) async throws -> R { + try await lock.withLock { + try await callback(context) + } + } +} diff --git a/Sources/PowerSync/attachments/AttachmentTable.swift b/Sources/PowerSync/attachments/AttachmentTable.swift new file mode 100644 index 0000000..d70c166 --- /dev/null +++ b/Sources/PowerSync/attachments/AttachmentTable.swift @@ -0,0 +1,17 @@ +/// Creates a PowerSync Schema table for attachment state +public func createAttachmentTable(name: String) -> Table { + return Table( + name: name, + columns: [ + .integer("timestamp"), + .integer("state"), + .text("filename"), + .integer("has_synced"), + .text("local_uri"), + .text("media_type"), + .integer("size"), + .text("meta_data"), + ], + localOnly: true + ) +} diff --git a/Sources/PowerSync/attachments/FileManagerLocalStorage.swift b/Sources/PowerSync/attachments/FileManagerLocalStorage.swift new file mode 100644 index 0000000..cc3915e --- /dev/null +++ b/Sources/PowerSync/attachments/FileManagerLocalStorage.swift @@ -0,0 +1,93 @@ +import Foundation + +/** + * Implementation of LocalStorageAdapter using FileManager + */ +public class FileManagerStorageAdapter: LocalStorageAdapter { + private let fileManager = FileManager.default + + public init() {} + + public func saveFile(filePath: String, data: Data) async throws -> Int64 { + return try await Task { + let url = URL(fileURLWithPath: filePath) + + // Make sure the parent directory exists + try fileManager.createDirectory(at: url.deletingLastPathComponent(), + withIntermediateDirectories: true) + + // Write data to file + try data.write(to: url) + + // Return the size of the data + return Int64(data.count) + }.value + } + + public func readFile(filePath: String, mediaType _: String?) async throws -> Data { + return try await Task { + let url = URL(fileURLWithPath: filePath) + + if !fileManager.fileExists(atPath: filePath) { + throw PowerSyncAttachmentError.fileNotFound(filePath) + } + + // Read data from file + do { + return try Data(contentsOf: url) + } catch { + throw PowerSyncAttachmentError.ioError(error) + } + }.value + } + + public func deleteFile(filePath: String) async throws { + try await Task { + if fileManager.fileExists(atPath: filePath) { + try fileManager.removeItem(atPath: filePath) + } + }.value + } + + public func fileExists(filePath: String) async throws -> Bool { + return await Task { + fileManager.fileExists(atPath: filePath) + }.value + } + + public func makeDir(path: String) async throws { + try await Task { + try fileManager.createDirectory(atPath: path, + withIntermediateDirectories: true, + attributes: nil) + }.value + } + + public func rmDir(path: String) async throws { + try await Task { + if fileManager.fileExists(atPath: path) { + try fileManager.removeItem(atPath: path) + } + }.value + } + + public func copyFile(sourcePath: String, targetPath: String) async throws { + try await Task { + if !fileManager.fileExists(atPath: sourcePath) { + throw PowerSyncAttachmentError.fileNotFound(sourcePath) + } + + // Ensure target directory exists + let targetUrl = URL(fileURLWithPath: targetPath) + try fileManager.createDirectory(at: targetUrl.deletingLastPathComponent(), + withIntermediateDirectories: true) + + // If target already exists, remove it first + if fileManager.fileExists(atPath: targetPath) { + try fileManager.removeItem(atPath: targetPath) + } + + try fileManager.copyItem(atPath: sourcePath, toPath: targetPath) + }.value + } +} diff --git a/Sources/PowerSync/attachments/LocalStorage.swift b/Sources/PowerSync/attachments/LocalStorage.swift new file mode 100644 index 0000000..071e522 --- /dev/null +++ b/Sources/PowerSync/attachments/LocalStorage.swift @@ -0,0 +1,97 @@ +import Foundation + +/// Error type for PowerSync operations +public enum PowerSyncAttachmentError: Error { + /// A general error with an associated message + case generalError(String) + + /// Indicates no matching attachment record could be found + case notFound(String) + + /// Indicates that a file was not found at the given path + case fileNotFound(String) + + /// An I/O error occurred + case ioError(Error) + + /// The given file or directory path was invalid + case invalidPath(String) + + /// The attachments queue or sub services have been closed + case closed(String) +} + +/// Protocol defining an adapter interface for local file storage +public protocol LocalStorageAdapter { + /// Saves data to a file at the specified path. + /// + /// - Parameters: + /// - filePath: The full path where the file should be saved. + /// - data: The binary data to save. + /// - Returns: The byte size of the saved file. + /// - Throws: `PowerSyncAttachmentError` if saving fails. + func saveFile( + filePath: String, + data: Data + ) async throws -> Int64 + + /// Reads a file from the specified path. + /// + /// - Parameters: + /// - filePath: The full path to the file. + /// - mediaType: An optional media type (MIME type) to help determine how to handle the file. + /// - Returns: The contents of the file as `Data`. + /// - Throws: `PowerSyncAttachmentError` if reading fails or the file doesn't exist. + func readFile( + filePath: String, + mediaType: String? + ) async throws -> Data + + /// Deletes a file at the specified path. + /// + /// - Parameter filePath: The full path to the file to delete. + /// - Throws: `PowerSyncAttachmentError` if deletion fails or file doesn't exist. + func deleteFile(filePath: String) async throws + + /// Checks if a file exists at the specified path. + /// + /// - Parameter filePath: The path to the file. + /// - Returns: `true` if the file exists, `false` otherwise. + /// - Throws: `PowerSyncAttachmentError` if checking fails. + func fileExists(filePath: String) async throws -> Bool + + /// Creates a directory at the specified path. + /// + /// - Parameter path: The full path to the directory. + /// - Throws: `PowerSyncAttachmentError` if creation fails. + func makeDir(path: String) async throws + + /// Removes a directory at the specified path. + /// + /// - Parameter path: The full path to the directory. + /// - Throws: `PowerSyncAttachmentError` if removal fails. + func rmDir(path: String) async throws + + /// Copies a file from the source path to the target path. + /// + /// - Parameters: + /// - sourcePath: The original file path. + /// - targetPath: The destination file path. + /// - Throws: `PowerSyncAttachmentError` if the copy operation fails. + func copyFile( + sourcePath: String, + targetPath: String + ) async throws +} + +/// Extension providing a default implementation of `readFile` without a media type +public extension LocalStorageAdapter { + /// Reads a file from the specified path without specifying a media type. + /// + /// - Parameter filePath: The full path to the file. + /// - Returns: The contents of the file as `Data`. + /// - Throws: `PowerSyncAttachmentError` if reading fails. + func readFile(filePath: String) async throws -> Data { + return try await readFile(filePath: filePath, mediaType: nil) + } +} diff --git a/Sources/PowerSync/attachments/LockActor.swift b/Sources/PowerSync/attachments/LockActor.swift new file mode 100644 index 0000000..94f41db --- /dev/null +++ b/Sources/PowerSync/attachments/LockActor.swift @@ -0,0 +1,48 @@ +import Foundation + +actor LockActor { + private var isLocked = false + private var waiters: [(id: UUID, continuation: CheckedContinuation)] = [] + + func withLock(_ operation: @Sendable () async throws -> T) async throws -> T { + try await waitUntilUnlocked() + + isLocked = true + defer { unlockNext() } + + try Task.checkCancellation() // cancellation check after acquiring lock + return try await operation() + } + + private func waitUntilUnlocked() async throws { + if !isLocked { return } + + let id = UUID() + + // Use withTaskCancellationHandler to manage cancellation + await withTaskCancellationHandler { + await withCheckedContinuation { continuation in + waiters.append((id: id, continuation: continuation)) + } + } onCancel: { + // Cancellation logic: remove the waiter when cancelled + Task { + await self.removeWaiter(id: id) + } + } + } + + private func removeWaiter(id: UUID) async { + // Safely remove the waiter from the actor's waiters list + waiters.removeAll { $0.id == id } + } + + private func unlockNext() { + if let next = waiters.first { + waiters.removeFirst() + next.continuation.resume() + } else { + isLocked = false + } + } +} diff --git a/Sources/PowerSync/attachments/README.md b/Sources/PowerSync/attachments/README.md new file mode 100644 index 0000000..9d85fca --- /dev/null +++ b/Sources/PowerSync/attachments/README.md @@ -0,0 +1,262 @@ +# PowerSync Attachment Helpers + +A [PowerSync](https://powersync.com) library to manage attachments (such as images or files) in Swift apps. + +### Alpha Release + +Attachment helpers are currently in an alpha state, intended strictly for testing. Expect breaking changes and instability as development continues. + +Do not rely on this package for production use. + +## Usage + +An `AttachmentQueue` is used to manage and sync attachments in your app. The attachments' state is stored in a local-only attachments table. + +### Key Assumptions + +- Each attachment is identified by a unique ID +- Attachments are immutable once created +- Relational data should reference attachments using a foreign key column +- Relational data should reflect the holistic state of attachments at any given time. An existing local attachment will deleted locally if no relational data references it. + +### Example Implementation + +See the [PowerSync Example Demo](../../../Demo/PowerSyncExample) for a basic example of attachment syncing. + +In the example below, the user captures photos when checklist items are completed as part of an inspection workflow. + +1. First, define your schema including the `checklist` table and the local-only attachments table: + +```swift +let checklists = Table( + name: "checklists", + columns: [ + Column.text("description"), + Column.integer("completed"), + Column.text("photo_id"), + ] +) + +let schema = Schema( + tables: [ + checklists, + // Add the local-only table which stores attachment states + // Learn more about this function below + createAttachmentTable(name: "attachments") + ] +) +``` + +2. Create an `AttachmentQueue` instance. This class provides default syncing utilities and implements a default sync strategy. It can be subclassed for custom functionality: + +```swift +func getAttachmentsDirectoryPath() throws -> String { + guard let documentsURL = FileManager.default.urls( + for: .documentDirectory, + in: .userDomainMask + ).first else { + throw PowerSyncAttachmentError.attachmentError("Could not determine attachments directory path") + } + return documentsURL.appendingPathComponent("attachments").path +} + +let queue = AttachmentQueue( + db: db, + attachmentsDirectory: try getAttachmentsDirectoryPath(), + remoteStorage: RemoteStorage(), + watchAttachments: { try db.watch( + options: WatchOptions( + sql: "SELECT photo_id FROM checklists WHERE photo_id IS NOT NULL", + parameters: [], + mapper: { cursor in + try WatchedAttachmentItem( + id: cursor.getString(name: "photo_id"), + fileExtension: "jpg" + ) + } + ) + ) } +) +``` +- The `attachmentsDirectory` specifies where local attachment files should be stored. `FileManager.default.urls(for: .documentDirectory, in: .userDomainMask).first?.appendingPathComponent("attachments")` is a good choice. +- The `remoteStorage` is responsible for connecting to the attachments backend. See the `RemoteStorageAdapter` protocol definition. +- `watchAttachments` is closure which generates a publisher of `WatchedAttachmentItem`. These items represent the attachments that should be present in the application. + +3. Implement a `RemoteStorageAdapter` which interfaces with a remote storage provider. This will be used for downloading, uploading, and deleting attachments. + +```swift +class RemoteStorage: RemoteStorageAdapter { + func uploadFile(data: Data, attachment: Attachment) async throws { + // TODO: Make a request to the backend + } + + func downloadFile(attachment: Attachment) async throws -> Data { + // TODO: Make a request to the backend + } + + func deleteFile(attachment: Attachment) async throws { + // TODO: Make a request to the backend + } +} +``` + +4. Start the sync process: + +```swift +queue.startSync() +``` + +5. Create and save attachments using `saveFile()`. This method will + save the file to the local storage, create an attachment record which queues the file for upload + to the remote storage and allows assigning the newly created attachment ID to a checklist item: + +```swift +try await queue.saveFile( + data: Data(), // The attachment's data + mediaType: "image/jpg", + fileExtension: "jpg" +) { tx, attachment in + // Assign the attachment ID to a checklist item in the same transaction + try tx.execute( + sql: """ + UPDATE + checklists + SET + photo_id = ? + WHERE + id = ? + """, + arguments: [attachment.id, checklistId] + ) +} +``` + +## Implementation Details + +### Attachment Table Structure + +The `createAttachmentsTable` function creates a local-only table for tracking attachment states. + +An attachments table definition can be created with the following options: + +| Option | Description | Default | +|--------|-----------------------|---------------| +| `name` | The name of the table | `attachments` | + +The default columns are: + +| Column Name | Type | Description | +| ------------ | --------- | ------------------------------------------------------------------------------------------------------------------ | +| `id` | `TEXT` | The ID of the attachment record | +| `filename` | `TEXT` | The filename of the attachment | +| `media_type` | `TEXT` | The media type of the attachment | +| `state` | `INTEGER` | The state of the attachment, one of `AttachmentState` enum values | +| `timestamp` | `INTEGER` | The timestamp of the last update to the attachment record | +| `size` | `INTEGER` | The size of the attachment in bytes | +| `has_synced` | `INTEGER` | Internal tracker which tracks if the attachment has ever been synced. This is used for caching/archiving purposes. | +| `meta_data` | `TEXT` | Any extra meta data for the attachment. JSON is usually a good choice. | + +### Attachment States + +Attachments are managed through the following states: + +| State | Description | +| ----------------- | ------------------------------------------------------------------------------ | +| `QUEUED_UPLOAD` | The attachment has been queued for upload to the cloud storage | +| `QUEUED_DELETE` | The attachment has been queued for delete in the cloud storage (and locally) | +| `QUEUED_DOWNLOAD` | The attachment has been queued for download from the cloud storage | +| `SYNCED` | The attachment has been synced | +| `ARCHIVED` | The attachment has been orphaned, i.e., the associated record has been deleted | + +### Sync Process + + +The `AttachmentQueue` implements a sync process with these components: + +1. **State Monitoring**: The queue watches the attachments table for records in `QUEUED_UPLOAD`, `QUEUED_DELETE`, and `QUEUED_DOWNLOAD` states. An event loop triggers calls to the remote storage for these operations. + +2. **Periodic Sync**: By default, the queue triggers a sync every 30 seconds to retry failed uploads/downloads, in particular after the app was offline. This interval can be configured by setting `syncInterval` in the `AttachmentQueue` constructor options, or disabled by setting the interval to `0`. + +3. **Watching State**: The `watchedAttachments` flow in the `AttachmentQueue` constructor is used to maintain consistency between local and remote states: + - New items trigger downloads - see the Download Process below. + - Missing items trigger archiving - see Cache Management below. + +#### Upload Process + +The `saveFile` method handles attachment creation and upload: + +1. The attachment is saved to local storage +2. An `AttachmentRecord` is created with `QUEUED_UPLOAD` state, linked to the local file using `localURI` +3. The attachment must be assigned to relational data in the same transaction, since this data is constantly watched and should always represent the attachment queue state +4. The `RemoteStorage` `uploadFile` function is called +5. On successful upload, the state changes to `SYNCED` +6. If upload fails, the record stays in `QUEUED_UPLOAD` state for retry + +#### Download Process + +Attachments are scheduled for download when the `watchedAttachments` flow emits a new item that is not present locally: + +1. An `AttachmentRecord` is created with `QUEUED_DOWNLOAD` state +2. The `RemoteStorage` `downloadFile` function is called +3. The received data is saved to local storage +4. On successful download, the state changes to `SYNCED` +5. If download fails, the operation is retried in the next sync cycle + +### Delete Process + +The `deleteFile` method deletes attachments from both local and remote storage: + +1. The attachment record moves to `QUEUED_DELETE` state +2. The attachment must be unassigned from relational data in the same transaction, since this data is constantly watched and should always represent the attachment queue state +3. On successful deletion, the record is removed +4. If deletion fails, the operation is retried in the next sync cycle + +### Cache Management + +The `AttachmentQueue` implements a caching system for archived attachments: + +1. Local attachments are marked as `ARCHIVED` if the `watchedAttachments` flow no longer references them +2. Archived attachments are kept in the cache for potential future restoration +3. The cache size is controlled by the `archivedCacheLimit` parameter in the `AttachmentQueue` constructor +4. By default, the queue keeps the last 100 archived attachment records +5. When the cache limit is reached, the oldest archived attachments are permanently deleted +6. If an archived attachment is referenced again while still in the cache, it can be restored +7. The cache limit can be configured in the `AttachmentQueue` constructor + +### Error Handling + +1. **Automatic Retries**: + - Failed uploads/downloads/deletes are automatically retried + - The sync interval (default 30 seconds) ensures periodic retry attempts + - Retries continue indefinitely until successful + +2. **Custom Error Handling**: + - A `SyncErrorHandler` can be implemented to customize retry behavior (see example below) + - The handler can decide whether to retry or archive failed operations + - Different handlers can be provided for upload, download, and delete operations + +Example of a custom `SyncErrorHandler`: + +```swift +class ErrorHandler: SyncErrorHandler { + func onDownloadError(attachment: Attachment, error: Error) async -> Bool { + // TODO: Return if the attachment sync should be retried + } + + func onUploadError(attachment: Attachment, error: Error) async -> Bool { + // TODO: Return if the attachment sync should be retried + } + + func onDeleteError(attachment: Attachment, error: Error) async -> Bool { + // TODO: Return if the attachment sync should be retried + } +} + +// Pass the handler to the queue constructor +let queue = AttachmentQueue( + db: db, + attachmentsDirectory: attachmentsDirectory, + remoteStorage: remoteStorage, + errorHandler: ErrorHandler() +) +``` \ No newline at end of file diff --git a/Sources/PowerSync/attachments/RemoteStorage.swift b/Sources/PowerSync/attachments/RemoteStorage.swift new file mode 100644 index 0000000..bd94a42 --- /dev/null +++ b/Sources/PowerSync/attachments/RemoteStorage.swift @@ -0,0 +1,28 @@ +import Foundation + +/// Adapter for interfacing with remote attachment storage. +public protocol RemoteStorageAdapter { + /// Uploads a file to remote storage. + /// + /// - Parameters: + /// - fileData: The binary content of the file to upload. + /// - attachment: The associated `Attachment` metadata describing the file. + /// - Throws: An error if the upload fails. + func uploadFile( + fileData: Data, + attachment: Attachment + ) async throws + + /// Downloads a file from remote storage. + /// + /// - Parameter attachment: The `Attachment` describing the file to download. + /// - Returns: The binary data of the downloaded file. + /// - Throws: An error if the download fails or the file is not found. + func downloadFile(attachment: Attachment) async throws -> Data + + /// Deletes a file from remote storage. + /// + /// - Parameter attachment: The `Attachment` describing the file to delete. + /// - Throws: An error if the deletion fails or the file does not exist. + func deleteFile(attachment: Attachment) async throws +} diff --git a/Sources/PowerSync/attachments/SyncErrorHandler.swift b/Sources/PowerSync/attachments/SyncErrorHandler.swift new file mode 100644 index 0000000..b9a2b55 --- /dev/null +++ b/Sources/PowerSync/attachments/SyncErrorHandler.swift @@ -0,0 +1,64 @@ +import Foundation + +/// Handles attachment operation errors. +/// +/// The handlers defined in this protocol specify whether corresponding attachment +/// operations (download, upload, delete) should be retried upon failure. +/// +/// If an operation fails and should not be retried, the attachment record is archived. +public protocol SyncErrorHandler { + /// Handles a download error for a specific attachment. + /// + /// - Parameters: + /// - attachment: The `Attachment` that failed to download. + /// - error: The error encountered during the download operation. + /// - Returns: `true` if the operation should be retried, `false` if it should be archived. + func onDownloadError( + attachment: Attachment, + error: Error + ) async -> Bool + + /// Handles an upload error for a specific attachment. + /// + /// - Parameters: + /// - attachment: The `Attachment` that failed to upload. + /// - error: The error encountered during the upload operation. + /// - Returns: `true` if the operation should be retried, `false` if it should be archived. + func onUploadError( + attachment: Attachment, + error: Error + ) async -> Bool + + /// Handles a delete error for a specific attachment. + /// + /// - Parameters: + /// - attachment: The `Attachment` that failed to be deleted. + /// - error: The error encountered during the delete operation. + /// - Returns: `true` if the operation should be retried, `false` if it should be archived. + func onDeleteError( + attachment: Attachment, + error: Error + ) async -> Bool +} + +/// Default implementation of `SyncErrorHandler`. +/// +/// By default, all operations return `false`, indicating no retry. +public class DefaultSyncErrorHandler: SyncErrorHandler { + public init() {} + + public func onDownloadError(attachment _: Attachment, error _: Error) async -> Bool { + // Default: do not retry failed downloads + return false + } + + public func onUploadError(attachment _: Attachment, error _: Error) async -> Bool { + // Default: do not retry failed uploads + return false + } + + public func onDeleteError(attachment _: Attachment, error _: Error) async -> Bool { + // Default: do not retry failed deletions + return false + } +} diff --git a/Sources/PowerSync/attachments/SyncingService.swift b/Sources/PowerSync/attachments/SyncingService.swift new file mode 100644 index 0000000..e9ca92f --- /dev/null +++ b/Sources/PowerSync/attachments/SyncingService.swift @@ -0,0 +1,297 @@ +import Combine +import Foundation + +/// A service that synchronizes attachments between local and remote storage. +/// +/// This watches for changes to active attachments and performs queued +/// download, upload, and delete operations. Syncs can be triggered manually, +/// periodically, or based on database changes. +public class SyncingService { + private let remoteStorage: RemoteStorageAdapter + private let localStorage: LocalStorageAdapter + private let attachmentsService: AttachmentService + private let getLocalUri: (String) async -> String + private let errorHandler: SyncErrorHandler? + private let syncThrottle: TimeInterval + private var cancellables = Set() + private let syncTriggerSubject = PassthroughSubject() + private var periodicSyncTimer: Timer? + private var syncTask: Task? + private let lock: LockActor + let logger: any LoggerProtocol + + let logTag = "AttachmentSync" + var closed: Bool + + /// Initializes a new instance of `SyncingService`. + /// + /// - Parameters: + /// - remoteStorage: Adapter for remote storage access. + /// - localStorage: Adapter for local storage access. + /// - attachmentsService: Service for querying and updating attachments. + /// - getLocalUri: Callback used to resolve a local path for saving downloaded attachments. + /// - errorHandler: Optional handler to determine if sync errors should be retried. + /// - syncThrottle: Throttle interval to control frequency of sync triggers. + init( + remoteStorage: RemoteStorageAdapter, + localStorage: LocalStorageAdapter, + attachmentsService: AttachmentService, + logger: any LoggerProtocol, + getLocalUri: @escaping (String) async -> String, + errorHandler: SyncErrorHandler? = nil, + syncThrottle: TimeInterval = 5.0 + ) { + self.remoteStorage = remoteStorage + self.localStorage = localStorage + self.attachmentsService = attachmentsService + self.getLocalUri = getLocalUri + self.errorHandler = errorHandler + self.syncThrottle = syncThrottle + self.logger = logger + self.closed = false + self.lock = LockActor() + } + + /// Starts periodic syncing of attachments. + /// + /// - Parameter period: The time interval in seconds between each sync. + public func startSync(period: TimeInterval) async throws { + try await lock.withLock { + try guardClosed() + + // Close any active sync operations + try await _stopSync() + + setupSyncFlow(period: period) + } + } + + public func stopSync() async throws { + try await lock.withLock { + try guardClosed() + try await _stopSync() + } + } + + private func _stopSync() async throws { + if let timer = periodicSyncTimer { + timer.invalidate() + periodicSyncTimer = nil + } + + syncTask?.cancel() + + // Wait for the task to actually complete + _ = await syncTask?.value + syncTask = nil + + for cancellable in cancellables { + cancellable.cancel() + } + cancellables.removeAll() + } + + /// Cleans up internal resources and cancels any ongoing syncing. + func close() async throws { + try await lock.withLock { + try guardClosed() + + try await _stopSync() + closed = true + } + } + + /// Triggers a sync operation. Can be called manually. + func triggerSync() async throws { + try guardClosed() + syncTriggerSubject.send(()) + } + + /// Deletes attachments marked as archived that exist on local storage. + /// + /// - Returns: `true` if any deletions occurred, `false` otherwise. + func deleteArchivedAttachments(_ context: AttachmentContext) async throws -> Bool { + return try await context.deleteArchivedAttachments { pendingDelete in + for attachment in pendingDelete { + guard let localUri = attachment.localUri else { continue } + if try await !self.localStorage.fileExists(filePath: localUri) { continue } + try await self.localStorage.deleteFile(filePath: localUri) + } + } + } + + private func guardClosed() throws { + if closed { + throw PowerSyncAttachmentError.closed("Syncing service is closed") + } + } + + private func createSyncTrigger() -> AsyncStream { + AsyncStream { continuation in + let cancellable = syncTriggerSubject + .throttle( + for: .seconds(syncThrottle), + scheduler: DispatchQueue.global(), + latest: true + ) + .sink { _ in continuation.yield(()) } + + continuation.onTermination = { _ in + cancellable.cancel() + } + self.cancellables.insert(cancellable) + } + } + + /// Sets up the main attachment syncing pipeline and starts watching for changes. + private func setupSyncFlow(period: TimeInterval) { + syncTask = Task { + do { + try await withThrowingTaskGroup(of: Void.self) { group in + // Handle sync trigger events + group.addTask { + let syncTrigger = self.createSyncTrigger() + + for await _ in syncTrigger { + try Task.checkCancellation() + + try await self.attachmentsService.withContext { context in + let attachments = try await context.getActiveAttachments() + try await self.handleSync(context: context, attachments: attachments) + _ = try await self.deleteArchivedAttachments(context) + } + } + } + + // Watch attachment records. Trigger a sync on change + group.addTask { + for try await _ in try self.attachmentsService.watchActiveAttachments() { + try Task.checkCancellation() + self.syncTriggerSubject.send(()) + } + } + + group.addTask { + let delay = UInt64(period * 1_000_000_000) + while !Task.isCancelled { + try await Task.sleep(nanoseconds: delay) + try await self.triggerSync() + } + } + + // Wait for any task to complete + try await group.next() + } + } catch { + if !(error is CancellationError) { + logger.error("Sync error: \(error)", tag: logTag) + } + } + } + } + + /// Handles syncing for a given list of attachments. + /// + /// This includes queued downloads, uploads, and deletions. + /// + /// - Parameter attachments: The attachments to process. + private func handleSync(context: AttachmentContext, attachments: [Attachment]) async throws { + var updatedAttachments = [Attachment]() + + for attachment in attachments { + switch attachment.state { + case .queuedDownload: + let updated = try await downloadAttachment(attachment: attachment) + updatedAttachments.append(updated) + case .queuedUpload: + let updated = try await uploadAttachment(attachment: attachment) + updatedAttachments.append(updated) + case .queuedDelete: + let updated = try await deleteAttachment(attachment: attachment) + updatedAttachments.append(updated) + default: + break + } + } + + try await context.saveAttachments(attachments: updatedAttachments) + } + + /// Uploads an attachment to remote storage. + /// + /// - Parameter attachment: The attachment to upload. + /// - Returns: The updated attachment with new sync state. + private func uploadAttachment(attachment: Attachment) async throws -> Attachment { + logger.info("Uploading attachment \(attachment.filename)", tag: logTag) + do { + guard let localUri = attachment.localUri else { + throw PowerSyncAttachmentError.generalError("No localUri for attachment \(attachment.id)") + } + + let fileData = try await localStorage.readFile(filePath: localUri) + try await remoteStorage.uploadFile(fileData: fileData, attachment: attachment) + + return attachment.with(state: AttachmentState.synced, hasSynced: 1) + } catch { + if let errorHandler = errorHandler { + let shouldRetry = await errorHandler.onUploadError(attachment: attachment, error: error) + if !shouldRetry { + return attachment.with(state: AttachmentState.archived) + } + } + return attachment + } + } + + /// Downloads an attachment from remote storage and stores it locally. + /// + /// - Parameter attachment: The attachment to download. + /// - Returns: The updated attachment with new sync state. + private func downloadAttachment(attachment: Attachment) async throws -> Attachment { + logger.info("Downloading attachment \(attachment.filename)", tag: logTag) + do { + let attachmentPath = await getLocalUri(attachment.filename) + let fileData = try await remoteStorage.downloadFile(attachment: attachment) + _ = try await localStorage.saveFile(filePath: attachmentPath, data: fileData) + + return attachment.with( + state: AttachmentState.synced, + hasSynced: 1, + localUri: attachmentPath + ) + } catch { + if let errorHandler = errorHandler { + let shouldRetry = await errorHandler.onDownloadError(attachment: attachment, error: error) + if !shouldRetry { + return attachment.with(state: AttachmentState.archived) + } + } + return attachment + } + } + + /// Deletes an attachment from remote and local storage. + /// + /// - Parameter attachment: The attachment to delete. + /// - Returns: The updated attachment with archived state. + private func deleteAttachment(attachment: Attachment) async throws -> Attachment { + logger.info("Deleting attachment \(attachment.filename)", tag: logTag) + do { + try await remoteStorage.deleteFile(attachment: attachment) + + if let localUri = attachment.localUri { + try await localStorage.deleteFile(filePath: localUri) + } + + return attachment.with(state: AttachmentState.archived) + } catch { + if let errorHandler = errorHandler { + let shouldRetry = await errorHandler.onDeleteError(attachment: attachment, error: error) + if !shouldRetry { + return attachment.with(state: AttachmentState.archived) + } + } + return attachment + } + } +} diff --git a/Sources/PowerSync/attachments/WatchedAttachmentItem.swift b/Sources/PowerSync/attachments/WatchedAttachmentItem.swift new file mode 100644 index 0000000..b4cddc7 --- /dev/null +++ b/Sources/PowerSync/attachments/WatchedAttachmentItem.swift @@ -0,0 +1,39 @@ + +import Combine +import Foundation + +/// A watched attachment record item. +/// This is usually returned from watching all relevant attachment IDs. +public struct WatchedAttachmentItem { + /// Id for the attachment record + public let id: String + + /// File extension used to determine an internal filename for storage if no `filename` is provided + public let fileExtension: String? + + /// Filename to store the attachment with + public let filename: String? + + /// Metadata for the attachment (optional) + public let metaData: String? + + /// Initializes a new `WatchedAttachmentItem` + /// - Parameters: + /// - id: Attachment record ID + /// - fileExtension: Optional file extension + /// - filename: Optional filename + /// - metaData: Optional metadata + public init( + id: String, + fileExtension: String? = nil, + filename: String? = nil, + metaData: String? = nil + ) { + self.id = id + self.fileExtension = fileExtension + self.filename = filename + self.metaData = metaData + + precondition(fileExtension != nil || filename != nil, "Either fileExtension or filename must be provided.") + } +} diff --git a/Tests/PowerSyncTests/AttachmentTests.swift b/Tests/PowerSyncTests/AttachmentTests.swift new file mode 100644 index 0000000..cecc6f5 --- /dev/null +++ b/Tests/PowerSyncTests/AttachmentTests.swift @@ -0,0 +1,208 @@ + +@testable import PowerSync +import XCTest + +final class AttachmentTests: XCTestCase { + private var database: PowerSyncDatabaseProtocol! + private var schema: Schema! + + override func setUp() async throws { + try await super.setUp() + schema = Schema(tables: [ + Table(name: "users", columns: [ + .text("name"), + .text("email"), + .text("photo_id") + ]), + createAttachmentTable(name: "attachments") + ]) + + database = PowerSyncDatabase( + schema: schema, + dbFilename: ":memory:" + ) + try await database.disconnectAndClear() + } + + override func tearDown() async throws { + try await database.disconnectAndClear() + database = nil + try await super.tearDown() + } + + func getAttachmentDirectory() -> String { + URL(fileURLWithPath: NSTemporaryDirectory()).appendingPathComponent("attachments").path + } + + func testAttachmentDownload() async throws { + let queue = AttachmentQueue( + db: database, + remoteStorage: { + struct MockRemoteStorage: RemoteStorageAdapter { + func uploadFile( + fileData: Data, + attachment: Attachment + ) async throws {} + + /** + * Download a file from remote storage + */ + func downloadFile(attachment: Attachment) async throws -> Data { + return Data([1, 2, 3]) + } + + /** + * Delete a file from remote storage + */ + func deleteFile(attachment: Attachment) async throws {} + } + + return MockRemoteStorage() + }(), + attachmentsDirectory: getAttachmentDirectory(), + watchAttachments: { try self.database.watch(options: WatchOptions( + sql: "SELECT photo_id FROM users WHERE photo_id IS NOT NULL", + mapper: { cursor in try WatchedAttachmentItem( + id: cursor.getString(name: "photo_id"), + fileExtension: "jpg" + ) } + )) } + ) + + try await queue.startSync() + + // Create a user which has a photo_id associated. + // This will be treated as a download since no attachment record was created. + // saveFile creates the attachment record before the updates are made. + _ = try await database.execute( + sql: "INSERT INTO users (id, name, email, photo_id) VALUES (uuid(), 'steven', 'steven@example.com', uuid())", + parameters: [] + ) + + let attachmentsWatch = try database.watch( + options: WatchOptions( + sql: "SELECT * FROM attachments", + mapper: { cursor in try Attachment.fromCursor(cursor) } + )).makeAsyncIterator() + + let attachmentRecord = try await waitForMatch( + iterator: attachmentsWatch, + where: { results in results.first?.state == AttachmentState.synced }, + timeout: 5 + ).first + + // The file should exist + let localData = try await queue.localStorage.readFile(filePath: attachmentRecord!.localUri!) + XCTAssertEqual(localData.count, 3) + + try await queue.clearQueue() + try await queue.close() + } + + func testAttachmentUpload() async throws { + class MockRemoteStorage: RemoteStorageAdapter { + public var uploadCalled = false + + func uploadFile( + fileData: Data, + attachment: Attachment + ) async throws { + uploadCalled = true + } + + /** + * Download a file from remote storage + */ + func downloadFile(attachment: Attachment) async throws -> Data { + return Data([1, 2, 3]) + } + + /** + * Delete a file from remote storage + */ + func deleteFile(attachment: Attachment) async throws {} + } + + let mockedRemote = MockRemoteStorage() + + let queue = AttachmentQueue( + db: database, + remoteStorage: mockedRemote, + attachmentsDirectory: getAttachmentDirectory(), + watchAttachments: { try self.database.watch(options: WatchOptions( + sql: "SELECT photo_id FROM users WHERE photo_id IS NOT NULL", + mapper: { cursor in try WatchedAttachmentItem( + id: cursor.getString(name: "photo_id"), + fileExtension: "jpg" + ) } + )) } + ) + + try await queue.startSync() + + let attachmentsWatch = try database.watch( + options: WatchOptions( + sql: "SELECT * FROM attachments", + mapper: { cursor in try Attachment.fromCursor(cursor) } + )).makeAsyncIterator() + + _ = try await queue.saveFile( + data: Data([3, 4, 5]), + mediaType: "image/jpg", + fileExtension: "jpg" + ) { tx, attachment in + _ = try tx.execute( + sql: "INSERT INTO users (id, name, email, photo_id) VALUES (uuid(), 'john', 'j@j.com', ?)", + parameters: [attachment.id] + ) + } + + _ = try await waitForMatch( + iterator: attachmentsWatch, + where: { results in results.first?.state == AttachmentState.synced }, + timeout: 5 + ).first + + // Upload should have been called + XCTAssertTrue(mockedRemote.uploadCalled) + + try await queue.clearQueue() + try await queue.close() + } +} + +enum WaitForMatchError: Error { + case timeout +} + +func waitForMatch( + iterator: AsyncThrowingStream.Iterator, + where predicate: @escaping (T) -> Bool, + timeout: TimeInterval +) async throws -> T { + let timeoutNanoseconds = UInt64(timeout * 1_000_000_000) + + return try await withThrowingTaskGroup(of: T.self) { group in + // Task to wait for a matching value + group.addTask { + var localIterator = iterator + while let value = try await localIterator.next() { + if predicate(value) { + return value + } + } + throw WaitForMatchError.timeout // stream ended before match + } + + // Task to enforce timeout + group.addTask { + try await Task.sleep(nanoseconds: timeoutNanoseconds) + throw WaitForMatchError.timeout + } + + // First one to succeed or fail + let result = try await group.next() + group.cancelAll() + return result! + } +} diff --git a/Tests/PowerSyncTests/Kotlin/KotlinPowerSyncDatabaseImplTests.swift b/Tests/PowerSyncTests/Kotlin/KotlinPowerSyncDatabaseImplTests.swift index a97ff9f..4eef1b6 100644 --- a/Tests/PowerSyncTests/Kotlin/KotlinPowerSyncDatabaseImplTests.swift +++ b/Tests/PowerSyncTests/Kotlin/KotlinPowerSyncDatabaseImplTests.swift @@ -11,7 +11,8 @@ final class KotlinPowerSyncDatabaseImplTests: XCTestCase { Table(name: "users", columns: [ .text("name"), .text("email"), - ]), + .text("photo_id") + ]) ]) database = KotlinPowerSyncDatabaseImpl( @@ -70,6 +71,8 @@ final class KotlinPowerSyncDatabaseImplTests: XCTestCase { XCTAssertEqual(user.1, "Test User") XCTAssertEqual(user.2, "test@example.com") } + + func testGetError() async throws { do { diff --git a/Tests/PowerSyncTests/Kotlin/SqlCursorTests.swift b/Tests/PowerSyncTests/Kotlin/SqlCursorTests.swift index b8ae92b..26fa833 100644 --- a/Tests/PowerSyncTests/Kotlin/SqlCursorTests.swift +++ b/Tests/PowerSyncTests/Kotlin/SqlCursorTests.swift @@ -33,7 +33,8 @@ final class SqlCursorTests: XCTestCase { database = KotlinPowerSyncDatabaseImpl( schema: schema, - dbFilename: ":memory:" + dbFilename: ":memory:", + logger: DatabaseLogger(DefaultLogger()) ) try await database.disconnectAndClear() }