diff --git a/CHANGELOG.md b/CHANGELOG.md index bfac0821..94996f76 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## 1.0.0-BETA31 + +* Added helpers for Attachment syncing. + ## 1.0.0-BETA30 * Fix a deadlock when calling `connect()` immediately after opening a database. @@ -10,22 +14,31 @@ * Fix potential race condition between jobs in `connect()` and `disconnect()`. * [JVM Windows] Fixed PowerSync Extension temporary file deletion error on process shutdown. * [iOS] Fixed issue where automatic driver migrations would fail with the error: + ``` Sqlite operation failure database is locked attempted to run migration and failed. closing connection ``` + * Fix race condition causing data received during uploads not to be applied. ## 1.0.0-BETA28 * Update PowerSync SQLite core extension to 0.3.12. -* Added queing protection and warnings when connecting multiple PowerSync clients to the same database file. -* Improved concurrent SQLite connection support accross various platforms. All platforms now use a single write connection and multiple read connections for concurrent read queries. -* Added the ability to open a SQLite database given a custom `dbDirectory` path. This is currently not supported on iOS due to internal driver restrictions. +* Added queing protection and warnings when connecting multiple PowerSync clients to the same + database file. +* Improved concurrent SQLite connection support accross various platforms. All platforms now use a + single write connection and multiple read connections for concurrent read queries. +* Added the ability to open a SQLite database given a custom `dbDirectory` path. This is currently + not supported on iOS due to internal driver restrictions. * Internaly improved the linking of SQLite for iOS. * Enabled Full Text Search on iOS platforms. * Added the ability to update the schema for existing PowerSync clients. -* Fixed bug where local only, insert only and view name overrides were not applied for schema tables. -* The Android SQLite driver now uses the [Xerial JDBC library](https://github.com/xerial/sqlite-jdbc). This removes the requirement for users to add the jitpack Maven repository to their projects. +* Fixed bug where local only, insert only and view name overrides were not applied for schema + tables. +* The Android SQLite driver now uses + the [Xerial JDBC library](https://github.com/xerial/sqlite-jdbc). This removes the requirement for + users to add the jitpack Maven repository to their projects. + ```diff // settings.gradle.kts example repositories { @@ -53,8 +66,10 @@ Sqlite operation failure database is locked attempted to run migration and faile ## 1.0.0-BETA24 -* Improve internal handling of watch queries to avoid issues where updates are not being received due to transaction commits occurring after the query is run. -* Fix issue in JVM build where `columnNames` was throwing an error due to the index of the JDBC driver starting at 1 instead of 0 as in the other drivers/ +* Improve internal handling of watch queries to avoid issues where updates are not being received + due to transaction commits occurring after the query is run. +* Fix issue in JVM build where `columnNames` was throwing an error due to the index of the JDBC + driver starting at 1 instead of 0 as in the other drivers/ * Throw and not just catch `CancellationExceptions` in `runWrappedSuspending` ## 1.0.0-BETA23 @@ -72,14 +87,18 @@ Sqlite operation failure database is locked attempted to run migration and faile ## 1.0.0-BETA20 -* Add cursor optional functions: `getStringOptional`, `getLongOptional`, `getDoubleOptional`, `getBooleanOptional` and `getBytesOptional` when using the column name which allow for optional return types +* Add cursor optional functions: `getStringOptional`, `getLongOptional`, `getDoubleOptional`, + `getBooleanOptional` and `getBytesOptional` when using the column name which allow for optional + return types * Throw errors for invalid column on all cursor functions -* `getString`, `getLong`, `getBytes`, `getDouble` and `getBoolean` used with the column name will now throw an error for non-null values and expect a non optional return type +* `getString`, `getLong`, `getBytes`, `getDouble` and `getBoolean` used with the column name will + now throw an error for non-null values and expect a non optional return type ## 1.0.0-BETA19 * Allow cursor to get values by column name e.g. `getStringOptional("id")` -* BREAKING CHANGE: If you were using `SqlCursor` from SqlDelight previously for your own custom mapper then you must now change to `SqlCursor` exported by the PowerSync module. +* BREAKING CHANGE: If you were using `SqlCursor` from SqlDelight previously for your own custom + mapper then you must now change to `SqlCursor` exported by the PowerSync module. Previously you would import it like this: @@ -95,7 +114,8 @@ Sqlite operation failure database is locked attempted to run migration and faile ## 1.0.0-BETA18 -* BREAKING CHANGE: Move from async sqldelight calls to synchronous calls. This will only affect `readTransaction` and `writeTransaction`where the callback function is no longer asynchronous. +* BREAKING CHANGE: Move from async sqldelight calls to synchronous calls. This will only affect + `readTransaction` and `writeTransaction`where the callback function is no longer asynchronous. ## 1.0.0-BETA17 @@ -104,7 +124,8 @@ Sqlite operation failure database is locked attempted to run migration and faile ## 1.0.0-BETA16 * Add `close` method to database methods -* Throw when error is a `CancellationError` and remove invalidation for all errors in `streamingSync` catch. +* Throw when error is a `CancellationError` and remove invalidation for all errors in + `streamingSync` catch. ## 1.0.0-BETA15 @@ -118,7 +139,8 @@ Sqlite operation failure database is locked attempted to run migration and faile ## 1.0.0-BETA13 -* Move iOS database driver to use IO dispatcher which should avoid race conditions and improve performance. +* Move iOS database driver to use IO dispatcher which should avoid race conditions and improve + performance. ## 1.0.0-BETA12 @@ -135,7 +157,8 @@ Sqlite operation failure database is locked attempted to run migration and faile ## 1.0.0-BETA9 * Re-enable SKIE `SuspendInterop` -* Move transaction functions out of `PowerSyncTransactionFactory` to avoid threading issues in Swift SDK +* Move transaction functions out of `PowerSyncTransactionFactory` to avoid threading issues in Swift + SDK ## 1.0.0-BETA8 @@ -164,23 +187,27 @@ Sqlite operation failure database is locked attempted to run migration and faile * Add `waitForFirstSync` function - which resolves after the initial sync is completed * Upgrade to Kotlin 2.0.20 - should not cause any issues with users who are still on Kotlin 1.9 * Upgrade `powersync-sqlite-core` to 0.3.0 - improves incremental sync performance -* Add client sync parameters - which allows you specify sync parameters from the client https://docs.powersync.com/usage/sync-rules/advanced-topics/client-parameters-beta +* Add client sync parameters - which allows you specify sync parameters from the + client https://docs.powersync.com/usage/sync-rules/advanced-topics/client-parameters-beta + ```kotlin val params = JsonParam.Map( - mapOf( - "name" to JsonParam.String("John Doe"), - "age" to JsonParam.Number(30), - "isStudent" to JsonParam.Boolean(false) - ) + mapOf( + "name" to JsonParam.String("John Doe"), + "age" to JsonParam.Number(30), + "isStudent" to JsonParam.Boolean(false) + ) ) connect( -... - params = params + ... +params = params ) ``` + * Add schema validation when schema is generated -* Add warning message if there is a crudItem in the queue that has not yet been synced and after a delay rerun the upload +* Add warning message if there is a crudItem in the queue that has not yet been synced and after a + delay rerun the upload ## 1.0.0-BETA2 @@ -188,13 +215,15 @@ connect( ## 1.0.0-BETA1 -* Improve API by changing from Builder pattern to simply instantiating the database `PowerSyncDatabase` +* Improve API by changing from Builder pattern to simply instantiating the database + `PowerSyncDatabase` E.g. `val db = PowerSyncDatabase(factory, schema)` * Use callback context in transactions E.g. `db.writeTransaction{ ctx -> ctx.execute(...) }` * Removed unnecessary expiredAt field * Added table max column validation as there is a hard limit of 63 columns * Moved SQLDelight models to a separate module to reduce export size -* Replaced default Logger with [Kermit Logger](https://kermit.touchlab.co/) which allows users to more easily use and/or change Logger settings +* Replaced default Logger with [Kermit Logger](https://kermit.touchlab.co/) which allows users to + more easily use and/or change Logger settings * Add `retryDelay` and `crudThrottle` options when setting up database connection * Changed `_viewNameOverride` to `viewNameOverride` diff --git a/connectors/supabase/build.gradle.kts b/connectors/supabase/build.gradle.kts index 984d62b6..5763e39d 100644 --- a/connectors/supabase/build.gradle.kts +++ b/connectors/supabase/build.gradle.kts @@ -28,6 +28,7 @@ kotlin { implementation(libs.kotlinx.coroutines.core) implementation(libs.supabase.client) api(libs.supabase.auth) + api(libs.supabase.storage) } } } diff --git a/connectors/supabase/src/commonMain/kotlin/com/powersync/connector/supabase/SupabaseConnector.kt b/connectors/supabase/src/commonMain/kotlin/com/powersync/connector/supabase/SupabaseConnector.kt index f4c33adb..388b70e6 100644 --- a/connectors/supabase/src/commonMain/kotlin/com/powersync/connector/supabase/SupabaseConnector.kt +++ b/connectors/supabase/src/commonMain/kotlin/com/powersync/connector/supabase/SupabaseConnector.kt @@ -17,6 +17,9 @@ import io.github.jan.supabase.auth.user.UserSession import io.github.jan.supabase.createSupabaseClient import io.github.jan.supabase.postgrest.Postgrest import io.github.jan.supabase.postgrest.from +import io.github.jan.supabase.storage.BucketApi +import io.github.jan.supabase.storage.Storage +import io.github.jan.supabase.storage.storage import io.ktor.client.plugins.HttpSend import io.ktor.client.plugins.plugin import io.ktor.client.statement.bodyAsText @@ -31,6 +34,7 @@ import kotlinx.serialization.json.Json public class SupabaseConnector( public val supabaseClient: SupabaseClient, public val powerSyncEndpoint: String, + private val storageBucket: String? = null, ) : PowerSyncBackendConnector() { private var errorCode: String? = null @@ -52,17 +56,29 @@ public class SupabaseConnector( } } + public fun storageBucket(): BucketApi { + if (storageBucket == null) { + throw Exception("No bucket has been specified") + } + return supabaseClient.storage[storageBucket] + } + public constructor( supabaseUrl: String, supabaseKey: String, powerSyncEndpoint: String, + storageBucket: String? = null, ) : this( supabaseClient = createSupabaseClient(supabaseUrl, supabaseKey) { install(Auth) install(Postgrest) + if (storageBucket != null) { + install(Storage) + } }, powerSyncEndpoint = powerSyncEndpoint, + storageBucket = storageBucket, ) init { @@ -81,7 +97,10 @@ public class SupabaseConnector( val responseText = response.bodyAsText() try { - val error = Json { coerceInputValues = true }.decodeFromString>(responseText) + val error = + Json { coerceInputValues = true }.decodeFromString>( + responseText, + ) errorCode = error["code"] } catch (e: Exception) { Logger.e("Failed to parse error response: $e") @@ -139,7 +158,9 @@ public class SupabaseConnector( check(supabaseClient.auth.sessionStatus.value is SessionStatus.Authenticated) { "Supabase client is not authenticated" } // Use Supabase token for PowerSync - val session = supabaseClient.auth.currentSessionOrNull() ?: error("Could not fetch Supabase credentials") + val session = + supabaseClient.auth.currentSessionOrNull() + ?: error("Could not fetch Supabase credentials") check(session.user != null) { "No user data" } diff --git a/connectors/supabase/src/commonMain/kotlin/com/powersync/connector/supabase/SupabaseRemoteStorage.kt b/connectors/supabase/src/commonMain/kotlin/com/powersync/connector/supabase/SupabaseRemoteStorage.kt new file mode 100644 index 00000000..423f039d --- /dev/null +++ b/connectors/supabase/src/commonMain/kotlin/com/powersync/connector/supabase/SupabaseRemoteStorage.kt @@ -0,0 +1,57 @@ +package com.powersync.connector.supabase + +import com.powersync.attachments.Attachment +import com.powersync.attachments.RemoteStorage +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.flowOf + +/** + * Implementation of [RemoteStorage] that uses Supabase as the backend storage provider. + * + * @property connector The Supabase connector used to interact with the Supabase storage bucket. + */ +public class SupabaseRemoteStorage( + public val connector: SupabaseConnector, +) : RemoteStorage { + /** + * Uploads a file to the Supabase storage bucket. + * + * @param fileData A [Flow] of [ByteArray] representing the file data to be uploaded. + * @param attachment The [Attachment] metadata associated with the file. + * @throws IllegalStateException If the attachment size is not specified. + */ + override suspend fun uploadFile( + fileData: Flow, + attachment: Attachment, + ) { + val byteSize = + attachment.size?.toInt() ?: error("Cannot upload a file with no byte size specified") + // Supabase wants a single ByteArray + val buffer = ByteArray(byteSize) + var position = 0 + fileData.collect { + it.copyInto(buffer, destinationOffset = position) + position += it.size + } + + connector.storageBucket().upload(attachment.filename, buffer) + } + + /** + * Downloads a file from the Supabase storage bucket. + * + * @param attachment The [Attachment] record associated with the file to be downloaded. + * @return A [Flow] of [ByteArray] representing the file data. + */ + override suspend fun downloadFile(attachment: Attachment): Flow = + flowOf(connector.storageBucket().downloadAuthenticated(attachment.filename)) + + /** + * Deletes a file from the Supabase storage bucket. + * + * @param attachment The [Attachment] record associated with the file to be deleted. + */ + override suspend fun deleteFile(attachment: Attachment) { + connector.storageBucket().delete(attachment.filename) + } +} diff --git a/core/README.md b/core/README.md index db89fbb3..ee09bf8a 100644 --- a/core/README.md +++ b/core/README.md @@ -24,8 +24,14 @@ structure: ## Note on SQLDelight The PowerSync core module, internally makes use -of [SQLDelight](https://sqldelight.github.io/sqldelight/latest/) for it database API and typesafe database +of [SQLDelight](https://sqldelight.github.io/sqldelight/latest/) for it database API and typesafe +database query generation. The PowerSync core module does not currently support integrating with SQLDelight from client applications. + +## Attachment Helpers + +This module contains attachment helpers under the `com.powersync.attachments` package. See +the [Attachment Helpers README](./src/commonMain/kotlin/com/powersync/attachments/README.md) diff --git a/core/build.gradle.kts b/core/build.gradle.kts index 802ecf0b..c1165245 100644 --- a/core/build.gradle.kts +++ b/core/build.gradle.kts @@ -4,10 +4,8 @@ import de.undercouch.gradle.tasks.download.Download import org.gradle.api.tasks.testing.logging.TestExceptionFormat import org.gradle.internal.os.OperatingSystem import org.jetbrains.kotlin.gradle.plugin.mpp.KotlinNativeTarget -import org.jetbrains.kotlin.gradle.plugin.mpp.TestExecutable import org.jetbrains.kotlin.gradle.targets.jvm.tasks.KotlinJvmTest import org.jetbrains.kotlin.gradle.tasks.KotlinTest -import org.jetbrains.kotlin.konan.target.Family plugins { @@ -18,6 +16,7 @@ plugins { alias(libs.plugins.downloadPlugin) alias(libs.plugins.kotlinter) id("com.powersync.plugins.sonatype") + id("com.powersync.plugins.sharedbuild") alias(libs.plugins.mokkery) alias(libs.plugins.kotlin.atomicfu) } @@ -71,36 +70,6 @@ val downloadPowersyncDesktopBinaries by tasks.registering(Download::class) { onlyIfModified(true) } -val downloadPowersyncFramework by tasks.registering(Download::class) { - val coreVersion = - libs.versions.powersync.core - .get() - val framework = - "https://github.com/powersync-ja/powersync-sqlite-core/releases/download/v$coreVersion/powersync-sqlite-core.xcframework.zip" - - src(framework) - dest(binariesFolder.map { it.file("framework/powersync-sqlite-core.xcframework.zip") }) - onlyIfModified(true) -} - -val unzipPowersyncFramework by tasks.registering(Exec::class) { - dependsOn(downloadPowersyncFramework) - - val zipfile = downloadPowersyncFramework.get().dest - inputs.file(zipfile) - val destination = File(zipfile.parentFile, "extracted") - doFirst { - destination.deleteRecursively() - destination.mkdir() - } - - // We're using unzip here because the Gradle copy task doesn't support symlinks. - executable = "unzip" - args(zipfile.absolutePath) - workingDir(destination) - outputs.dir(destination) -} - val sqliteJDBCFolder = project.layout.buildDirectory .dir("jdbc") @@ -165,33 +134,6 @@ kotlin { } } - if (konanTarget.family == Family.IOS && konanTarget.name.contains("simulator")) { - binaries.withType().configureEach { - linkTaskProvider.configure { dependsOn(unzipPowersyncFramework) } - linkerOpts("-framework", "powersync-sqlite-core") - val frameworkRoot = - binariesFolder - .map { it.dir("framework/extracted/powersync-sqlite-core.xcframework/ios-arm64_x86_64-simulator") } - .get() - .asFile.path - - linkerOpts("-F", frameworkRoot) - linkerOpts("-rpath", frameworkRoot) - } - } else if (konanTarget.family == Family.OSX) { - binaries.withType().configureEach { - linkTaskProvider.configure { dependsOn(unzipPowersyncFramework) } - linkerOpts("-framework", "powersync-sqlite-core") - var frameworkRoot = - binariesFolder - .map { it.dir("framework/extracted/powersync-sqlite-core.xcframework/macos-arm64_x86_64") } - .get() - .asFile.path - - linkerOpts("-F", frameworkRoot) - linkerOpts("-rpath", frameworkRoot) - } - } /* If we ever need macOS support: { @@ -317,8 +259,8 @@ android { } androidComponents.onVariants { - tasks.named("preBuild") { - dependsOn(moveJDBCJNIFiles) + tasks.named("preBuild") { + dependsOn(moveJDBCJNIFiles) } } diff --git a/core/src/appleTest/kotlin/com/powersync/testutils/TestUtils.apple.kt b/core/src/appleTest/kotlin/com/powersync/testutils/TestUtils.apple.kt index 1a4e93ea..396be545 100644 --- a/core/src/appleTest/kotlin/com/powersync/testutils/TestUtils.apple.kt +++ b/core/src/appleTest/kotlin/com/powersync/testutils/TestUtils.apple.kt @@ -3,6 +3,7 @@ package com.powersync.testutils import com.powersync.DatabaseDriverFactory import kotlinx.io.files.Path import kotlinx.io.files.SystemFileSystem +import platform.Foundation.NSTemporaryDirectory actual val factory: DatabaseDriverFactory get() = DatabaseDriverFactory() @@ -14,10 +15,6 @@ actual fun cleanup(path: String) { } } -/** - * We could use SystemTemporaryDirectory here in future, but we return null here - * to skip tests which rely on a temporary directory for iOS. - * The reason for skipping these tests is that the SQLiteR library does not currently - * support opening DB paths for custom directories. - */ -actual fun getTempDir(): String? = null +actual fun getTempDir(): String = NSTemporaryDirectory() + +actual fun isIOS(): Boolean = true diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/AttachmentsTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/AttachmentsTest.kt new file mode 100644 index 00000000..151e8560 --- /dev/null +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/AttachmentsTest.kt @@ -0,0 +1,586 @@ +package com.powersync + +import app.cash.turbine.turbineScope +import co.touchlab.kermit.ExperimentalKermitApi +import com.powersync.attachments.Attachment +import com.powersync.attachments.AttachmentQueue +import com.powersync.attachments.AttachmentState +import com.powersync.attachments.RemoteStorage +import com.powersync.attachments.SyncErrorHandler +import com.powersync.attachments.WatchedAttachmentItem +import com.powersync.attachments.createAttachmentsTable +import com.powersync.db.getString +import com.powersync.db.schema.Schema +import com.powersync.db.schema.Table +import com.powersync.testutils.MockedRemoteStorage +import com.powersync.testutils.UserRow +import com.powersync.testutils.databaseTest +import com.powersync.testutils.getTempDir +import com.powersync.testutils.waitFor +import dev.mokkery.answering.throws +import dev.mokkery.everySuspend +import dev.mokkery.matcher.ArgMatchersScope +import dev.mokkery.matcher.any +import dev.mokkery.matcher.matching +import dev.mokkery.mock +import dev.mokkery.spy +import dev.mokkery.verifySuspend +import io.kotest.matchers.shouldBe +import io.kotest.matchers.shouldNotBe +import kotlinx.coroutines.flow.flowOf +import kotlinx.io.files.Path +import kotlin.test.Test +import kotlin.time.Duration.Companion.seconds + +@OptIn(ExperimentalKermitApi::class) +class AttachmentsTest { + fun watchAttachments(database: PowerSyncDatabase) = + database.watch( + // language=SQL + sql = + """ + SELECT + photo_id + FROM + users + WHERE + photo_id IS NOT NULL + """, + ) { + WatchedAttachmentItem( + id = it.getString("photo_id"), + fileExtension = "jpg", + ) + } + + suspend fun updateSchema(db: PowerSyncDatabase) { + db.updateSchema( + Schema( + tables = + listOf( + UserRow.table, + createAttachmentsTable("attachments"), + ), + ), + ) + } + + fun getAttachmentsDir() = Path(getTempDir(), "attachments").toString() + + @Test + fun testAttachmentDownload() = + databaseTest { + turbineScope { + updateSchema(database) + + val remote = spy(MockedRemoteStorage()) + + // Monitor the attachments table for testing + val attachmentQuery = + database + // language=SQL + .watch("SELECT * FROM attachments") { Attachment.fromCursor(it) } + .testIn(this) + + val queue = + AttachmentQueue( + db = database, + remoteStorage = remote, + attachmentsDirectory = getAttachmentsDir(), + watchAttachments = { watchAttachments(database) }, + /** + * Sets the cache limit to zero for this test. Archived records will + * immediately be deleted. + */ + archivedCacheLimit = 0, + ) + + doOnCleanup { + attachmentQuery.cancel() + queue.stopSyncing() + queue.clearQueue() + queue.close() + } + + queue.startSync() + + val result = attachmentQuery.awaitItem() + + // There should not be any attachment records here + result.size shouldBe 0 + + // Create a user with a photo_id specified. + // This code did not save an attachment before assigning a photo_id. + // This is equivalent to requiring an attachment download + database.execute( + // language=SQL + """ + INSERT INTO + users (id, name, email, photo_id) + VALUES + (uuid(), "steven", "steven@journeyapps.com", uuid()) + """, + ) + + var attachmentRecord = attachmentQuery.awaitItem().first() + attachmentRecord shouldNotBe null + + /** + * The timing of the watched query resolving might differ slightly. + * We might get a watched query result where the attachment is QUEUED_DOWNLOAD + * or we could get the result once it has been DOWNLOADED. + * We should assert that the download happens eventually. + */ + + if (attachmentRecord.state == AttachmentState.QUEUED_DOWNLOAD) { + // Wait for the download to be triggered + attachmentRecord = attachmentQuery.awaitItem().first() + } + + attachmentRecord.state shouldBe AttachmentState.SYNCED + + // A download should have been attempted for this file + verifySuspend { remote.downloadFile(attachmentMatcher(attachmentRecord)) } + + // A file should now exist + val localUri = attachmentRecord.localUri!! + queue.localStorage.fileExists(localUri) shouldBe true + + // Now clear the user's photo_id. The attachment should be archived + database.execute( + // language=SQL + """ + UPDATE + users + SET + photo_id = NULL + """, + ) + + /** + * The timing of the watched query resolving might differ slightly. + * We might get a watched query result where the attachment is ARCHIVED + * or we could get the result once it has been deleted. + * The file should be deleted eventually + */ + var nextRecord: Attachment? = attachmentQuery.awaitItem().first() + if (nextRecord?.state == AttachmentState.ARCHIVED) { + nextRecord = attachmentQuery.awaitItem().getOrNull(0) + } + + // The record should have been deleted + nextRecord shouldBe null + + // The file should have been deleted from storage + val exists = queue.localStorage.fileExists(localUri) + exists shouldBe false + + attachmentQuery.cancel() + } + } + + @Test + fun testAttachmentUpload() = + databaseTest { + turbineScope { + updateSchema(database) + val remote = spy(MockedRemoteStorage()) + + // Monitor the attachments table for testing + val attachmentQuery = + database + // language=SQL + .watch("SELECT * FROM attachments") { Attachment.fromCursor(it) } + .testIn(this) + + val queue = + AttachmentQueue( + db = database, + remoteStorage = remote, + attachmentsDirectory = getAttachmentsDir(), + watchAttachments = { watchAttachments(database) }, + /** + * Sets the cache limit to zero for this test. Archived records will + * immediately be deleted. + */ + archivedCacheLimit = 0, + ) + + doOnCleanup { + attachmentQuery.cancel() + queue.stopSyncing() + queue.clearQueue() + queue.close() + } + + queue.startSync() + + val result = attachmentQuery.awaitItem() + + // There should not be any attachment records here + result.size shouldBe 0 + + /** + * Creates an attachment given a flow of bytes (the file data) then assigns this to + * a newly created user. + */ + val record = + queue.saveFile( + data = flowOf(ByteArray(1)), + mediaType = "image/jpg", + fileExtension = "jpg", + ) { tx, attachment -> + // Set the photo_id of a new user to the attachment id + tx.execute( + // language=SQL + """ + INSERT INTO + users (id, name, email, photo_id) + VALUES + (uuid(), "steven", "steven@steven.com", ?) + """, + listOf(attachment.id), + ) + } + + var attachmentRecord = attachmentQuery.awaitItem().first() + attachmentRecord shouldNotBe null + + if (attachmentRecord.state == AttachmentState.QUEUED_UPLOAD) { + // Wait for it to be synced + attachmentRecord = attachmentQuery.awaitItem().first() + } + + attachmentRecord.state shouldBe AttachmentState.SYNCED + + // A download should have been attempted for this file + verifySuspend { + remote.uploadFile( + any(), + attachmentMatcher(attachmentRecord), + ) + } + + val localUri = attachmentRecord.localUri!! + queue.localStorage.fileExists(localUri) shouldBe true + + // Now clear the user's photo_id. The attachment should be archived + database.execute( + // language=SQL + """ + UPDATE + users + SET + photo_id = NULL + """, + ) + + var nextRecord: Attachment? = attachmentQuery.awaitItem().first() + if (nextRecord?.state == AttachmentState.ARCHIVED) { + nextRecord = attachmentQuery.awaitItem().getOrNull(0) + } + + // The record should have been deleted + nextRecord shouldBe null + + // The file should have been deleted from storage + queue.localStorage.fileExists(localUri) shouldBe false + + attachmentQuery.cancel() + } + } + + @Test + fun testAttachmentDelete() = + databaseTest { + turbineScope { + updateSchema(database) + val remote = spy(MockedRemoteStorage()) + + // Monitor the attachments table for testing + val attachmentQuery = + database + // language=SQL + .watch("SELECT * FROM attachments") { Attachment.fromCursor(it) } + .testIn(this) + + val queue = + AttachmentQueue( + db = database, + remoteStorage = remote, + attachmentsDirectory = getAttachmentsDir(), + watchAttachments = { watchAttachments(database) }, + /** + * Sets the cache limit to zero for this test. Archived records will + * immediately be deleted. + */ + archivedCacheLimit = 0, + syncThrottleDuration = 0.seconds, + ) + + doOnCleanup { + queue.stopSyncing() + queue.clearQueue() + queue.close() + attachmentQuery.cancel() + } + + queue.startSync() + + val result = attachmentQuery.awaitItem() + + // There should not be any attachment records here + result.size shouldBe 0 + + // Create an attachment (simulates a download) + database.execute( + // language=SQL + """ + INSERT INTO + users (id, name, email, photo_id) + VALUES + (uuid(), "steven", "steven@steven.com", uuid()) + """, + ) + + // language=SQL + val attachmentID = + database.get("SELECT photo_id FROM users") { it.getString("photo_id") } + + // Wait for the record to be synced (mocked backend will allow it) + waitFor { + val record = attachmentQuery.awaitItem().first() + record shouldNotBe null + record.state shouldBe AttachmentState.SYNCED + } + + queue.deleteFile( + attachmentId = attachmentID, + ) { tx, attachment -> + tx.execute( + // language=SQL + """ + UPDATE + users + SET + photo_id = NULL + WHERE + photo_id = ? + """.trimIndent(), + listOf(attachment.id), + ) + } + + waitFor { + // Record should be deleted + val record = attachmentQuery.awaitItem().firstOrNull() + record shouldBe null + } + + // A delete should have been attempted for this file + verifySuspend { + remote.deleteFile( + any(), + ) + } + + attachmentQuery.cancel() + } + } + + @Test + fun testAttachmentCachedDownload() = + databaseTest { + turbineScope { + updateSchema(database) + + val remote = spy(MockedRemoteStorage()) + + // Monitor the attachments table for testing + val attachmentQuery = + database + // language=SQL + .watch("SELECT * FROM attachments") { Attachment.fromCursor(it) } + .testIn(this) + + val queue = + AttachmentQueue( + db = database, + remoteStorage = remote, + attachmentsDirectory = getAttachmentsDir(), + watchAttachments = { watchAttachments(database) }, + /** + * Keep some items in the cache + */ + archivedCacheLimit = 10, + ) + + doOnCleanup { + queue.stopSyncing() + queue.clearQueue() + queue.close() + attachmentQuery.cancel() + } + + queue.startSync() + + val result = attachmentQuery.awaitItem() + + // There should not be any attachment records here + result.size shouldBe 0 + + // Create a user with a photo_id specified. + // This code did not save an attachment before assigning a photo_id. + // This is equivalent to requiring an attachment download + database.execute( + // language=SQL + """ + INSERT INTO + users (id, name, email, photo_id) + VALUES + (uuid(), "steven", "steven@journeyapps.com", uuid()) + """, + ) + + var attachmentRecord = attachmentQuery.awaitItem().first() + attachmentRecord shouldNotBe null + + /** + * The timing of the watched query resolving might differ slightly. + * We might get a watched query result where the attachment is QUEUED_DOWNLOAD + * or we could get the result once it has been DOWNLOADED. + * We should assert that the download happens eventually. + */ + + if (attachmentRecord.state == AttachmentState.QUEUED_DOWNLOAD) { + // Wait for the download to be triggered + attachmentRecord = attachmentQuery.awaitItem().first() + } + + attachmentRecord.state shouldBe AttachmentState.SYNCED + + // A download should have been attempted for this file + verifySuspend { remote.downloadFile(attachmentMatcher(attachmentRecord)) } + + // A file should now exist + val localUri = attachmentRecord.localUri!! + queue.localStorage.fileExists(localUri) shouldBe true + + // Now clear the user's photo_id. The attachment should be archived + database.execute( + // language=SQL + """ + UPDATE + users + SET + photo_id = NULL + """, + ) + + attachmentRecord = attachmentQuery.awaitItem().first() + attachmentRecord.state shouldBe AttachmentState.ARCHIVED + + // Now if we set the photo_id, the archived record should be restored + database.execute( + // language=SQL + """ + UPDATE + users + SET + photo_id = ? + """, + listOf(attachmentRecord.id), + ) + + attachmentRecord = attachmentQuery.awaitItem().first() + attachmentRecord.state shouldBe AttachmentState.SYNCED + + attachmentQuery.cancel() + } + } + + @Test + fun testSkipFailedDownload() = + databaseTest { + turbineScope { + updateSchema(database) + + val remote = + mock { + everySuspend { downloadFile(any()) } throws (Exception("Test error")) + } + + // Monitor the attachments table for testing + val attachmentQuery = + database + .watch("SELECT * FROM attachments") { Attachment.fromCursor(it) } + .testIn(this) + + val queue = + AttachmentQueue( + db = database, + remoteStorage = remote, + attachmentsDirectory = getAttachmentsDir(), + watchAttachments = { watchAttachments(database) }, + archivedCacheLimit = 0, + errorHandler = + object : SyncErrorHandler { + override suspend fun onDownloadError( + attachment: Attachment, + exception: Exception, + ): Boolean = false + + override suspend fun onUploadError( + attachment: Attachment, + exception: Exception, + ): Boolean = false + + override suspend fun onDeleteError( + attachment: Attachment, + exception: Exception, + ): Boolean = false + }, + ) + doOnCleanup { + queue.stopSyncing() + queue.clearQueue() + queue.close() + attachmentQuery.cancel() + } + + queue.startSync() + + val result = attachmentQuery.awaitItem() + + // There should not be any attachment records here + result.size shouldBe 0 + + // Create a user with a photo_id specified. + // This code did not save an attachment before assigning a photo_id. + // This is equivalent to requiring an attachment download + database.execute( + """ + INSERT INTO + users (id, name, email, photo_id) + VALUES + (uuid(), "steven", "steven@journeyapps.com", uuid()) + """, + ) + + var attachmentRecord = attachmentQuery.awaitItem().first() + attachmentRecord shouldNotBe null + + attachmentRecord.state shouldBe AttachmentState.QUEUED_DOWNLOAD + + // The download should fail. We don't specify a retry. The record should be archived. + attachmentRecord = attachmentQuery.awaitItem().first() + + attachmentRecord.state shouldBe AttachmentState.ARCHIVED + + attachmentQuery.cancel() + } + } +} + +fun ArgMatchersScope.attachmentMatcher(attachment: Attachment): Attachment = + matching(toString = { "attachment($attachment)" }, predicate = { it.id == attachment.id }) diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt index d65d6eb4..63f1478d 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt @@ -7,6 +7,7 @@ import com.powersync.db.schema.Schema import com.powersync.testutils.UserRow import com.powersync.testutils.databaseTest import com.powersync.testutils.getTempDir +import com.powersync.testutils.isIOS import com.powersync.testutils.waitFor import io.kotest.assertions.throwables.shouldThrow import io.kotest.matchers.collections.shouldHaveSize @@ -231,9 +232,16 @@ class DatabaseTest { fun openDBWithDirectory() = databaseTest { val tempDir = - getTempDir() - ?: // SQLiteR, which is used on iOS, does not support opening dbs from directories - return@databaseTest + if (isIOS()) { + null + } else { + getTempDir() + } + + if (tempDir == null) { + // SQLiteR, which is used on iOS, does not support opening dbs from directories + return@databaseTest + } // On platforms that support it, openDatabase() from our test utils should use a temporary // location. diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/MockedRemoteStorage.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/MockedRemoteStorage.kt new file mode 100644 index 00000000..8baa0ed0 --- /dev/null +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/MockedRemoteStorage.kt @@ -0,0 +1,24 @@ +package com.powersync.testutils + +import com.powersync.attachments.Attachment +import com.powersync.attachments.RemoteStorage +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.flow + +class MockedRemoteStorage : RemoteStorage { + override suspend fun uploadFile( + fileData: Flow, + attachment: Attachment, + ) { + // No op + } + + override suspend fun downloadFile(attachment: Attachment): Flow = + flow { + emit(ByteArray(1)) + } + + override suspend fun deleteFile(attachment: Attachment) { + // No op + } +} diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt index 88e2382b..9dc16317 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt @@ -29,7 +29,9 @@ expect val factory: DatabaseDriverFactory expect fun cleanup(path: String) -expect fun getTempDir(): String? +expect fun getTempDir(): String + +expect fun isIOS(): Boolean fun generatePrintLogWriter() = object : LogWriter() { @@ -136,6 +138,8 @@ internal class ActiveDatabaseTest( } suspend fun cleanup() { + // Execute in reverse order + cleanupItems.reverse() for (item in cleanupItems) { item() } diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/UserRow.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/UserRow.kt index caf65765..78f339f5 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/UserRow.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/UserRow.kt @@ -2,6 +2,7 @@ package com.powersync.testutils import com.powersync.db.SqlCursor import com.powersync.db.getString +import com.powersync.db.getStringOptional import com.powersync.db.schema.Column import com.powersync.db.schema.Table @@ -9,6 +10,7 @@ data class UserRow( val id: String, val name: String, val email: String, + val photo_id: String?, ) { companion object { fun from(cursor: SqlCursor): UserRow = @@ -16,8 +18,18 @@ data class UserRow( id = cursor.getString("id"), name = cursor.getString("name"), email = cursor.getString("email"), + photo_id = cursor.getStringOptional("photo_id"), ) - val table = Table(name = "users", columns = listOf(Column.text("name"), Column.text("email"))) + val table = + Table( + name = "users", + columns = + listOf( + Column.text("name"), + Column.text("email"), + Column.text("photo_id"), + ), + ) } } diff --git a/core/src/commonMain/kotlin/com/powersync/attachments/Attachment.kt b/core/src/commonMain/kotlin/com/powersync/attachments/Attachment.kt new file mode 100644 index 00000000..377055e9 --- /dev/null +++ b/core/src/commonMain/kotlin/com/powersync/attachments/Attachment.kt @@ -0,0 +1,98 @@ +package com.powersync.attachments + +import com.powersync.db.SqlCursor +import com.powersync.db.getLong +import com.powersync.db.getLongOptional +import com.powersync.db.getString +import com.powersync.db.getStringOptional + +/** + * Represents the state of an attachment. + */ +public enum class AttachmentState { + /** + * The attachment is queued for download from the remote storage. + */ + QUEUED_DOWNLOAD, + + /** + * The attachment is queued for upload to the remote storage. + */ + QUEUED_UPLOAD, + + /** + * The attachment is queued for deletion from the remote storage. + */ + QUEUED_DELETE, + + /** + * The attachment is fully synchronized with the remote storage. + */ + SYNCED, + + /** + * The attachment is archived and no longer actively synchronized. + */ + ARCHIVED, + + ; + + public companion object { + /** + * Constructs an [AttachmentState] from the corresponding integer value. + * + * @param value The integer value representing the ordinal of the enum. + * @return The corresponding [AttachmentState]. + * @throws IllegalArgumentException If the value does not match any [AttachmentState]. + */ + public fun fromLong(value: Long): AttachmentState = + entries.getOrNull(value.toInt()) + ?: throw IllegalArgumentException("Invalid value for AttachmentState: $value") + } +} + +/** + * Represents an attachment with metadata and state information. + * + * @property id Unique identifier for the attachment. + * @property timestamp Timestamp of the last record update. + * @property filename Name of the attachment file, e.g., `[id].jpg`. + * @property state Current state of the attachment, represented as an ordinal of [AttachmentState]. + * @property localUri Local URI pointing to the attachment file, if available. + * @property mediaType Media type of the attachment, typically represented as a MIME type. + * @property size Size of the attachment in bytes, if available. + * @property hasSynced Indicates whether the attachment has been synced locally before. + * @property metaData Additional metadata associated with the attachment. + */ +public data class Attachment( + val id: String, + val timestamp: Long = 0, + val filename: String, + val state: AttachmentState = AttachmentState.QUEUED_DOWNLOAD, + val localUri: String? = null, + val mediaType: String? = null, + val size: Long? = null, + val hasSynced: Int = 0, + val metaData: String? = null, +) { + public companion object { + /** + * Creates an [Attachment] instance from a database cursor. + * + * @param cursor The [SqlCursor] containing attachment data. + * @return An [Attachment] instance populated with data from the cursor. + */ + public fun fromCursor(cursor: SqlCursor): Attachment = + Attachment( + id = cursor.getString(name = "id"), + timestamp = cursor.getLong("timestamp"), + filename = cursor.getString(name = "filename"), + localUri = cursor.getStringOptional(name = "local_uri"), + mediaType = cursor.getStringOptional(name = "media_type"), + size = cursor.getLongOptional("size"), + state = AttachmentState.fromLong(cursor.getLong("state")), + hasSynced = cursor.getLong("has_synced").toInt(), + metaData = cursor.getStringOptional("meta_data"), + ) + } +} diff --git a/core/src/commonMain/kotlin/com/powersync/attachments/AttachmentQueue.kt b/core/src/commonMain/kotlin/com/powersync/attachments/AttachmentQueue.kt new file mode 100644 index 00000000..8c897086 --- /dev/null +++ b/core/src/commonMain/kotlin/com/powersync/attachments/AttachmentQueue.kt @@ -0,0 +1,545 @@ +package com.powersync.attachments + +import co.touchlab.kermit.Logger +import com.powersync.PowerSyncDatabase +import com.powersync.PowerSyncException +import com.powersync.attachments.implementation.AttachmentServiceImpl +import com.powersync.attachments.storage.IOLocalStorageAdapter +import com.powersync.attachments.sync.SyncingService +import com.powersync.db.getString +import com.powersync.db.internal.ConnectionContext +import com.powersync.db.runWrappedSuspending +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.DelicateCoroutinesApi +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.IO +import kotlinx.coroutines.Job +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.cancelAndJoin +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.launch +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock +import kotlinx.io.files.Path +import kotlin.coroutines.cancellation.CancellationException +import kotlin.time.Duration +import kotlin.time.Duration.Companion.seconds + +/** + * A watched attachment record item. + * This is usually returned from watching all relevant attachment IDs. + */ +public data class WatchedAttachmentItem( + /** + * Id for the attachment record. + */ + public val id: String, + /** + * File extension used to determine an internal filename for storage if no [filename] is provided. + */ + public val fileExtension: String? = null, + /** + * Filename to store the attachment with. + */ + public val filename: String? = null, + /** + * Optional metadata for the attachment record. + */ + public val metaData: String? = null, +) { + init { + require(fileExtension != null || filename != null) { + "Either fileExtension or filename must be provided." + } + } +} + +/** + * Class used to implement the attachment queue. + * Requires a PowerSyncDatabase, an implementation of + * AbstractRemoteStorageAdapter, and an attachment directory name which will + * determine which folder attachments are stored into. + */ +public open class AttachmentQueue + @OptIn(DelicateCoroutinesApi::class) + constructor( + /** + * PowerSync database client. + */ + public val db: PowerSyncDatabase, + /** + * Adapter which interfaces with the remote storage backend. + */ + public val remoteStorage: RemoteStorage, + /** + * Directory name where attachment files will be written to disk. + * This will be created if it does not exist. + */ + private val attachmentsDirectory: String, + /** + * A flow generator for the current state of local attachments. + * Example: + * ```kotlin + * watchAttachments = { + * db.watch( + * sql = """ + * SELECT + * photo_id as id, + * 'jpg' as fileExtension + * FROM + * checklists + * WHERE + * photo_id IS NOT NULL + * """, + * ) { cursor -> + * WatchedAttachmentItem( + * id = cursor.getString("id"), + * fileExtension = "jpg", + * ) + * } + * } + * ``` + */ + private val watchAttachments: () -> Flow>, + /** + * Provides access to local filesystem storage methods. + */ + public val localStorage: LocalStorage = IOLocalStorageAdapter(), + /** + * SQLite table where attachment state will be recorded. + */ + private val attachmentsQueueTableName: String = DEFAULT_TABLE_NAME, + /** + * Attachment operation error handler. Specifies if failed attachment operations + * should be retried. + */ + private val errorHandler: SyncErrorHandler? = null, + /** + * Periodic interval to trigger attachment sync operations. + */ + private val syncInterval: Duration = 30.seconds, + /** + * Archived attachments can be used as a cache which can be restored if an attachment ID + * reappears after being removed. This parameter defines how many archived records are retained. + * Records are deleted once the number of items exceeds this value. + */ + private val archivedCacheLimit: Long = 100, + /** + * Throttles remote sync operations triggering. + */ + private val syncThrottleDuration: Duration = 1.seconds, + /** + * Creates a list of subdirectories in the [attachmentsDirectory] directory. + */ + private val subdirectories: List? = null, + /** + * Should attachments be downloaded. + */ + private val downloadAttachments: Boolean = true, + /** + * Logging interface used for all log operations. + */ + public val logger: Logger = Logger, + /** + * Optional scope to launch syncing jobs in. + */ + private val coroutineScope: CoroutineScope? = null, + ) { + public companion object { + /** + * Default table name for attachments. + */ + public const val DEFAULT_TABLE_NAME: String = "attachments" + + /** + * Default directory name for attachments. + */ + public const val DEFAULT_ATTACHMENTS_DIRECTORY_NAME: String = "attachments" + } + + /** + * Service which provides access to attachment records. + * Use this to: + * - Query all current attachment records. + * - Create new attachment records for upload/download. + */ + public val attachmentsService: AttachmentService = + AttachmentServiceImpl( + db, + attachmentsQueueTableName, + logger, + maxArchivedCount = archivedCacheLimit, + ) + + private val syncScope = coroutineScope ?: CoroutineScope(SupervisorJob() + Dispatchers.IO) + + private var syncStatusJob: Job? = null + private val mutex = Mutex() + + /** + * Syncing service for this attachment queue. + * This processes attachment records and performs relevant upload, download, and delete + * operations. + */ + private val syncingService = + SyncingService( + remoteStorage, + localStorage, + attachmentsService, + ::getLocalUri, + errorHandler, + logger, + syncScope, + syncThrottleDuration, + ) + + public var closed: Boolean = false + + /** + * Initialize the attachment queue by: + * 1. Creating the attachments directory. + * 2. Adding watches for uploads, downloads, and deletes. + * 3. Adding a trigger to run uploads, downloads, and deletes when the device is online after being offline. + */ + @Throws(PowerSyncException::class, CancellationException::class) + public suspend fun startSync(): Unit = + runWrappedSuspending { + mutex.withLock { + if (closed) { + throw Exception("Attachment queue has been closed") + } + + stopSyncingInternal() + + // Ensure the directory where attachments are downloaded exists. + localStorage.makeDir(attachmentsDirectory) + + subdirectories?.forEach { subdirectory -> + localStorage.makeDir(Path(attachmentsDirectory, subdirectory).toString()) + } + + attachmentsService.withContext { context -> + verifyAttachments(context) + } + + syncingService.startSync(syncInterval) + + // Listen for connectivity changes. + syncStatusJob = + syncScope.launch { + launch { + var previousConnected = db.currentStatus.connected + db.currentStatus.asFlow().collect { status -> + if (!previousConnected && status.connected) { + syncingService.triggerSync() + } + previousConnected = status.connected + } + } + + launch { + // Watch local attachment relationships and sync the attachment records. + watchAttachments().collect { items -> + processWatchedAttachments(items) + } + } + } + } + } + + /** + * Stops syncing. Syncing may be resumed with [startSync]. + */ + @Throws(PowerSyncException::class, CancellationException::class) + public suspend fun stopSyncing(): Unit = + mutex.withLock { + stopSyncingInternal() + } + + private suspend fun stopSyncingInternal(): Unit = + runWrappedSuspending { + if (closed) { + return@runWrappedSuspending + } + + syncStatusJob?.cancelAndJoin() + syncStatusJob = null + syncingService.stopSync() + } + + /** + * Closes the queue. + * The queue cannot be used after closing. + * A new queue should be created. + */ + @Throws(PowerSyncException::class, CancellationException::class) + public suspend fun close(): Unit = + runWrappedSuspending { + mutex.withLock { + if (closed) { + return@runWrappedSuspending + } + + syncStatusJob?.cancelAndJoin() + syncingService.close() + if (coroutineScope == null) { + // Only cancel the internal scope if we created it. + syncScope.coroutineContext[Job]?.cancelAndJoin() + } + closed = true + } + } + + /** + * Resolves the filename for new attachment items. + * A new attachment from [watchAttachments] might not include a filename. + * Concatenates the attachment ID and extension by default. + * This method can be overridden for custom behavior. + */ + @Throws(PowerSyncException::class, CancellationException::class) + public open suspend fun resolveNewAttachmentFilename( + attachmentId: String, + fileExtension: String?, + ): String = "$attachmentId.$fileExtension" + + /** + * Processes attachment items returned from [watchAttachments]. + * The default implementation asserts the items returned from [watchAttachments] as the definitive + * state for local attachments. + * + * Records currently in the attachment queue which are not present in the items are deleted from + * the queue. + * + * Received items which are not currently in the attachment queue are assumed scheduled for + * download. This requires that locally created attachments should be created with [saveFile] + * before assigning the attachment ID to the relevant watched tables. + * + * This method can be overridden for custom behavior. + */ + @Throws(PowerSyncException::class, CancellationException::class) + public open suspend fun processWatchedAttachments(items: List): Unit = + runWrappedSuspending { + /** + * Use a lock here to prevent conflicting state updates. + */ + attachmentsService.withContext { attachmentsContext -> + /** + * Need to get all the attachments which are tracked in the DB. + * We might need to restore an archived attachment. + */ + val currentAttachments = attachmentsContext.getAttachments() + val attachmentUpdates = mutableListOf() + + for (item in items) { + val existingQueueItem = currentAttachments.find { it.id == item.id } + + if (existingQueueItem == null) { + if (!downloadAttachments) { + continue + } + // This item should be added to the queue. + // This item is assumed to be coming from an upstream sync. + // Locally created new items should be persisted using [saveFile] before + // this point. + val filename = + item.filename ?: resolveNewAttachmentFilename( + attachmentId = item.id, + fileExtension = item.fileExtension, + ) + + attachmentUpdates.add( + Attachment( + id = item.id, + filename = filename, + state = AttachmentState.QUEUED_DOWNLOAD, + metaData = item.metaData, + ), + ) + } else if + (existingQueueItem.state == AttachmentState.ARCHIVED) { + // The attachment is present again. Need to queue it for sync. + // We might be able to optimize this in future. + if (existingQueueItem.hasSynced == 1) { + // No remote action required, we can restore the record (avoids deletion). + attachmentUpdates.add( + existingQueueItem.copy(state = AttachmentState.SYNCED), + ) + } else { + /** + * The localURI should be set if the record was meant to be downloaded + * and has been synced. If it's missing and hasSynced is false then + * it must be an upload operation. + */ + attachmentUpdates.add( + existingQueueItem.copy( + state = + if (existingQueueItem.localUri == null) { + AttachmentState.QUEUED_DOWNLOAD + } else { + AttachmentState.QUEUED_UPLOAD + }, + ), + ) + } + } + } + + /** + * Archive any items not specified in the watched items except for items pending delete. + */ + currentAttachments + .filter { + it.state != AttachmentState.QUEUED_DELETE && + it.state != AttachmentState.QUEUED_UPLOAD && + null == items.find { update -> update.id == it.id } + }.forEach { + attachmentUpdates.add(it.copy(state = AttachmentState.ARCHIVED)) + } + + attachmentsContext.saveAttachments(attachmentUpdates) + } + } + + /** + * A function which creates a new attachment locally. This new attachment is queued for upload + * after creation. + * + * The filename is resolved using [resolveNewAttachmentFilename]. + * + * A [updateHook] is provided which should be used when assigning relationships to the newly + * created attachment. This hook is executed in the same write transaction which creates the + * attachment record. + * + * This method can be overridden for custom behavior. + */ + @Throws(PowerSyncException::class, CancellationException::class) + public open suspend fun saveFile( + data: Flow, + mediaType: String, + fileExtension: String? = null, + metaData: String? = null, + updateHook: (context: ConnectionContext, attachment: Attachment) -> Unit, + ): Attachment = + runWrappedSuspending { + val id = db.get("SELECT uuid() as id") { it.getString("id") } + val filename = + resolveNewAttachmentFilename(attachmentId = id, fileExtension = fileExtension) + val localUri = getLocalUri(filename) + + // Write the file to the filesystem. + val fileSize = localStorage.saveFile(localUri, data) + + /** + * Starts a write transaction. The attachment record and relevant local relationship + * assignment should happen in the same transaction. + */ + attachmentsService.withContext { attachmentContext -> + db.writeTransaction { tx -> + val attachment = + Attachment( + id = id, + filename = filename, + size = fileSize, + mediaType = mediaType, + state = AttachmentState.QUEUED_UPLOAD, + localUri = localUri, + metaData = metaData, + ) + + /** + * Allow consumers to set relationships to this attachment ID. + */ + updateHook.invoke(tx, attachment) + + return@writeTransaction attachmentContext.upsertAttachment( + attachment, + tx, + ) + } + } + } + + /** + * A function which creates an attachment delete operation locally. This operation is queued + * for delete. + * The default implementation assumes the attachment record already exists locally. An exception + * is thrown if the record does not exist locally. + * This method can be overridden for custom behavior. + */ + @Throws(PowerSyncException::class, CancellationException::class) + public open suspend fun deleteFile( + attachmentId: String, + updateHook: (context: ConnectionContext, attachment: Attachment) -> Unit, + ): Attachment = + runWrappedSuspending { + attachmentsService.withContext { attachmentContext -> + val attachment = + attachmentContext.getAttachment(attachmentId) + ?: throw error("Attachment record with id $attachmentId was not found.") + + db.writeTransaction { tx -> + updateHook.invoke(tx, attachment) + return@writeTransaction attachmentContext.upsertAttachment( + attachment.copy(state = AttachmentState.QUEUED_DELETE), + tx, + ) + } + } + } + + /** + * Returns the user's storage directory with the attachment path used to load the file. + * Example: filePath: "attachment-1.jpg" returns "/data/user/0/com.yourdomain.app/files/attachments/attachment-1.jpg". + */ + public open fun getLocalUri(filename: String): String = Path(attachmentsDirectory, filename).toString() + + /** + * Removes all archived items. + */ + public suspend fun expireCache() { + var done: Boolean + attachmentsService.withContext { context -> + do { + done = syncingService.deleteArchivedAttachments(context) + } while (!done) + } + } + + /** + * Clears the attachment queue and deletes all attachment files. + */ + public suspend fun clearQueue() { + attachmentsService.withContext { + it.clearQueue() + } + // Remove the attachments directory. + localStorage.rmDir(attachmentsDirectory) + } + + /** + * Cleans up stale attachments. + */ + private suspend fun verifyAttachments(context: AttachmentContext) { + val attachments = context.getActiveAttachments() + val updates = mutableListOf() + + for (attachment in attachments) { + if (attachment.localUri == null) { + continue + } + val exists = localStorage.fileExists(attachment.localUri) + if ( + attachment.state == AttachmentState.SYNCED || attachment.state == AttachmentState.QUEUED_UPLOAD && !exists + ) { + updates.add( + attachment.copy( + state = AttachmentState.ARCHIVED, + localUri = null, + ), + ) + } + } + + context.saveAttachments(updates) + } + } diff --git a/core/src/commonMain/kotlin/com/powersync/attachments/AttachmentService.kt b/core/src/commonMain/kotlin/com/powersync/attachments/AttachmentService.kt new file mode 100644 index 00000000..d1b013b0 --- /dev/null +++ b/core/src/commonMain/kotlin/com/powersync/attachments/AttachmentService.kt @@ -0,0 +1,88 @@ +package com.powersync.attachments + +import com.powersync.db.internal.ConnectionContext +import kotlinx.coroutines.flow.Flow + +/** + * Context for performing Attachment operations. + * This typically is provided through a locking/exclusivity method. + */ +public interface AttachmentContext { + /** + * Delete the attachment from the attachment queue. + */ + public suspend fun deleteAttachment(id: String): Unit + + /** + * Set the state of the attachment to ignore. + */ + public suspend fun ignoreAttachment(id: String): Unit + + /** + * Get the attachment from the attachment queue using an ID. + */ + public suspend fun getAttachment(id: String): Attachment? + + /** + * Save the attachment to the attachment queue. + */ + public suspend fun saveAttachment(attachment: Attachment): Attachment + + /** + * Save the attachments to the attachment queue. + */ + public suspend fun saveAttachments(attachments: List): Unit + + /** + * Get all the ID's of attachments in the attachment queue. + */ + public suspend fun getAttachmentIds(): List + + /** + * Get all Attachment records present in the database. + */ + public suspend fun getAttachments(): List + + /** + * Gets all the active attachments which require an operation to be performed. + */ + public suspend fun getActiveAttachments(): List + + /** + * Helper function to clear the attachment queue + * Currently only used for testing purposes. + */ + public suspend fun clearQueue(): Unit + + /** + * Delete attachments which have been archived + * @returns true if all items have been deleted. Returns false if there might be more archived + * items remaining. + */ + public suspend fun deleteArchivedAttachments(callback: suspend (attachments: List) -> Unit): Boolean + + /** + * Upserts an attachment record synchronously given a database connection context. + */ + public fun upsertAttachment( + attachment: Attachment, + context: ConnectionContext, + ): Attachment +} + +/** + * Service for interacting with the local attachment records. + */ +public interface AttachmentService { + /** + * Watcher for changes to attachments table. + * Once a change is detected it will initiate a sync of the attachments + */ + public fun watchActiveAttachments(): Flow + + /** + * Executes a callback with an exclusive lock on all attachment operations. + * This helps prevent race conditions between different updates. + */ + public suspend fun withContext(action: suspend (context: AttachmentContext) -> R): R +} diff --git a/core/src/commonMain/kotlin/com/powersync/attachments/AttachmentTable.kt b/core/src/commonMain/kotlin/com/powersync/attachments/AttachmentTable.kt new file mode 100644 index 00000000..5b54831f --- /dev/null +++ b/core/src/commonMain/kotlin/com/powersync/attachments/AttachmentTable.kt @@ -0,0 +1,28 @@ +package com.powersync.attachments + +import com.powersync.db.schema.Column +import com.powersync.db.schema.ColumnType +import com.powersync.db.schema.Table + +/** + * Creates a PowerSync table for storing local attachment state. + * + * @param name The name of the table. + * @return A [Table] object configured for storing attachment data. + */ +public fun createAttachmentsTable(name: String): Table = + Table( + name = name, + columns = + listOf( + Column("filename", ColumnType.TEXT), + Column("local_uri", ColumnType.TEXT), + Column("timestamp", ColumnType.INTEGER), + Column("size", ColumnType.INTEGER), + Column("media_type", ColumnType.TEXT), + Column("state", ColumnType.INTEGER), + Column("has_synced", ColumnType.INTEGER), + Column("meta_data", ColumnType.TEXT), + ), + localOnly = true, + ) diff --git a/core/src/commonMain/kotlin/com/powersync/attachments/LocalStorage.kt b/core/src/commonMain/kotlin/com/powersync/attachments/LocalStorage.kt new file mode 100644 index 00000000..69ace829 --- /dev/null +++ b/core/src/commonMain/kotlin/com/powersync/attachments/LocalStorage.kt @@ -0,0 +1,95 @@ +package com.powersync.attachments + +import com.powersync.PowerSyncException +import kotlinx.coroutines.flow.Flow +import kotlin.coroutines.cancellation.CancellationException + +/** + * Provides access to local storage on a device. + */ +public interface LocalStorage { + /** + * Saves a source of data bytes to a path. + * + * @param filePath The path where the file will be saved. + * @param data A [Flow] of [ByteArray] representing the file data. + * @return The byte size of the saved file. + * @throws PowerSyncException If an error occurs during the save operation. + * @throws CancellationException If the operation is cancelled. + */ + @Throws(PowerSyncException::class, CancellationException::class) + public suspend fun saveFile( + filePath: String, + data: Flow, + ): Long + + /** + * Reads a file from the specified path. + * + * @param filePath The path of the file to read. + * @param mediaType Optional media type of the file. + * @return A [Flow] of [ByteArray] representing the file data. + * @throws PowerSyncException If an error occurs during the read operation. + * @throws CancellationException If the operation is cancelled. + */ + @Throws(PowerSyncException::class, CancellationException::class) + public suspend fun readFile( + filePath: String, + mediaType: String? = null, + ): Flow + + /** + * Deletes a file at the specified path. + * + * @param filePath The path of the file to delete. + * @throws PowerSyncException If an error occurs during the delete operation. + * @throws CancellationException If the operation is cancelled. + */ + @Throws(PowerSyncException::class, CancellationException::class) + public suspend fun deleteFile(filePath: String): Unit + + /** + * Checks if a file exists at the specified path. + * + * @param filePath The path of the file to check. + * @return `true` if the file exists, `false` otherwise. + * @throws PowerSyncException If an error occurs during the check. + * @throws CancellationException If the operation is cancelled. + */ + @Throws(PowerSyncException::class, CancellationException::class) + public suspend fun fileExists(filePath: String): Boolean + + /** + * Creates a directory at the specified path. + * + * @param path The path of the directory to create. + * @throws PowerSyncException If an error occurs during the directory creation. + * @throws CancellationException If the operation is cancelled. + */ + @Throws(PowerSyncException::class, CancellationException::class) + public suspend fun makeDir(path: String): Unit + + /** + * Removes a directory at the specified path. + * + * @param path The path of the directory to remove. + * @throws PowerSyncException If an error occurs during the directory removal. + * @throws CancellationException If the operation is cancelled. + */ + @Throws(PowerSyncException::class, CancellationException::class) + public suspend fun rmDir(path: String): Unit + + /** + * Copies a file from the source path to the target path. + * + * @param sourcePath The path of the source file. + * @param targetPath The path where the file will be copied to. + * @throws PowerSyncException If an error occurs during the copy operation. + * @throws CancellationException If the operation is cancelled. + */ + @Throws(PowerSyncException::class, CancellationException::class) + public suspend fun copyFile( + sourcePath: String, + targetPath: String, + ): Unit +} diff --git a/core/src/commonMain/kotlin/com/powersync/attachments/README.md b/core/src/commonMain/kotlin/com/powersync/attachments/README.md new file mode 100644 index 00000000..66b6de8d --- /dev/null +++ b/core/src/commonMain/kotlin/com/powersync/attachments/README.md @@ -0,0 +1,267 @@ +# PowerSync Attachment Helpers + +A [PowerSync](https://powersync.com) library to manage attachments (such as images or files) in Kotlin Multiplatform apps. + +This package is included in the PowerSync Core module. + +### Alpha Release + +Attachment helpers are currently in an alpha state, intended strictly for testing. Expect breaking changes and instability as development continues. + +Do not rely on this package for production use. + +## Usage + +An `AttachmentQueue` is used to manage and sync attachments in your app. The attachments' state is +stored in a local-only attachments table. + +### Key Assumptions + +- Each attachment is identified by a unique ID +- Attachments are immutable once created +- Relational data should reference attachments using a foreign key column +- Relational data should reflect the holistic state of attachments at any given time. An existing local attachment will deleted locally if no relational data references it. + +### Example Implementation + +See the [Android Supabase Demo](/demos/android-supabase-todolist/README.md) for a basic example of attachment syncing. + +In the example below, the user captures photos when checklist items are completed as part of an +inspection workflow. + +1. First, define your schema including the `checklist` table and the local-only attachments table: + +```kotlin +val checklists = Table( + name = "checklists", + columns = + listOf( + Column.text("description"), + Column.integer("completed"), + Column.text("photo_id"), + ), +) + +val schema = Schema( + UserRow.table, + // Add the local-only table which stores attachment states + // Learn more about this function below + createAttachmentsTable("attachments") +) +``` + +2. Create an `AttachmentQueue` instance. This class provides +default syncing utilities and implements a default sync strategy. This class is open and can be overridden for custom functionality: + +```kotlin +val queue = AttachmentQueue( + db = db, + attachmentsDirectory = attachmentsDirectory, + remoteStorage = SupabaseRemoteStorage(supabase), + watchAttachments = { + db.watch( + sql = """ + SELECT photo_id + FROM checklists + WHERE photo_id IS NOT NULL + """, + ) { + WatchedAttachmentItem(id = it.getString("photo_id"), fileExtension = "jpg") + } + } +) +``` + +* The `attachmentsDirectory`, specifies where local attachment files should be stored. This directory needs to be provided to the constructor. On Android + `"${applicationContext.filesDir.canonicalPath}/attachments"` is a good choice. +* The `remoteStorage` is responsible for connecting to the attachments backend. See the `RemoteStorageAdapter` interface + definition [here](https://github.com/powersync-ja/powersync-kotlin/blob/main/core/src/commonMain/kotlin/com.powersync/attachments/RemoteStorageAdapter.ts). +* `watchAttachments` is a `Flow` of `WatchedAttachmentItem`. The `WatchedAttachmentItem`s represent the attachments which should be present in the + application. We recommend using `PowerSync`'s `watch` query as shown above. In this example we provide the `fileExtension` for all photos. This information could also be + obtained from the query if necessary. + +3. Implement a `RemoteStorageAdapter` which interfaces with a remote storage provider. This will be + used for downloading, uploading and deleting attachments: + +```kotlin +val remote = object : RemoteStorage() { + override suspend fun uploadFile(fileData: Flow, attachment: Attachment) { + TODO("Implement upload to your backend") + } + + override suspend fun downloadFile(attachment: Attachment): Flow { + TODO("Implement download from your backend") + } + + override suspend fun deleteFile(attachment: Attachment) { + TODO("Implement delete in your backend") + } +} +``` + +4. Start the sync process: + +```kotlin +queue.startSync() +``` + +5. Create and save attachments using `saveFile()`. This method will + save the file to the local storage, create an attachment record which queues the file for upload + to the remote storage and allows assigning the newly created attachment ID to a checklist item: + +```kotlin +queue.saveFile( + data = flowOf(ByteArray(1)), // Your attachment data + mediaType = "image/jpg", + fileExtension = "jpg", +) { tx, attachment -> + /** + * This lambda is invoked in the same transaction which creates the attachment record. + * Assignments of the newly created photo_id should be done in the same transaction for maximum efficiency. + */ + tx.execute( + """ + UPDATE checklists + SET photo_id = ? + WHERE id = ? + """, + listOf(attachment.id, checklistId), + ) +} +``` + +## Implementation Details + +### Attachment Table Structure + +The `createAttachmentsTable` function creates a local-only table for tracking attachment states. + +An attachments table definition can be created with the following options: + +| Option | Description | Default | +|--------|-----------------------|---------------| +| `name` | The name of the table | `attachments` | + +The default columns are: + +| Column Name | Type | Description | +|--------------|-----------|--------------------------------------------------------------------------------------------------------------------| +| `id` | `TEXT` | Unique identifier for the attachment | +| `filename` | `TEXT` | The filename of the attachment | +| `media_type` | `TEXT` | The media type of the attachment | +| `state` | `INTEGER` | Current state of the attachment (see `AttachmentState` enum) | +| `timestamp` | `INTEGER` | The timestamp of last update to the attachment | +| `size` | `INTEGER` | File size in bytes | +| `has_synced` | `INTEGER` | Internal flag tracking if the attachment has ever been synced (used for caching) | +| `meta_data` | `TEXT` | Additional metadata in JSON format | + +### Attachment States + +Attachments are managed through the following states: + +| State | Description | +|-------------------|-------------------------------------------------------------------------------| +| `QUEUED_UPLOAD` | Attachment is queued for upload to cloud storage | +| `QUEUED_DELETE` | Attachment is queued for deletion from cloud storage and local storage | +| `QUEUED_DOWNLOAD` | Attachment is queued for download from cloud storage | +| `SYNCED` | Attachment is fully synced | +| `ARCHIVED` | Attachment is orphaned - i.e. no longer referenced by any data | + +### Sync Process + +The `AttachmentQueue` implements a sync process with these components: + +1. **State Monitoring**: The queue watches the attachments table for records in `QUEUED_UPLOAD`, `QUEUED_DELETE`, and `QUEUED_DOWNLOAD` states. An event loop triggers calls to the remote storage for these operations. + +2. **Periodic Sync**: By default, the queue triggers a sync every 30 seconds to retry failed uploads/downloads, in particular after the app was offline. This interval can be configured by setting `syncInterval` in the `AttachmentQueue` constructor options, or disabled by setting the interval to `0`. + +3. **Watching State**: The `watchAttachments` flow generator in the `AttachmentQueue` constructor is used to maintain consistency between local and remote states: + - New items trigger downloads - see the Download Process below. + - Missing items trigger archiving - see Cache Management below. + +### Upload Process + +The `saveFile` method handles attachment creation and upload: + +1. The attachment is saved to local storage +2. An `AttachmentRecord` is created with `QUEUED_UPLOAD` state, linked to the local file using `localURI` +3. The attachment must be assigned to relational data in the same transaction, since this data is constantly watched and should always represent the attachment queue state +4. The `RemoteStorage` `uploadFile` function is called +5. On successful upload, the state changes to `SYNCED` +6. If upload fails, the record stays in `QUEUED_UPLOAD` state for retry + +### Download Process + +Attachments are scheduled for download when the flow from `watchAttachments` emits a new item that is not present locally: + +1. An `AttachmentRecord` is created with `QUEUED_DOWNLOAD` state +2. The `RemoteStorage` `downloadFile` function is called +3. The received data is saved to local storage +4. On successful download, the state changes to `SYNCED` +5. If download fails, the operation is retried in the next sync cycle + +### Delete Process + +The `deleteFile` method deletes attachments from both local and remote storage: + +1. The attachment record moves to `QUEUED_DELETE` state +2. The attachment must be unassigned from relational data in the same transaction, since this data is constantly watched and should always represent the attachment queue state +3. On successful deletion, the record is removed +4. If deletion fails, the operation is retried in the next sync cycle + +### Cache Management + +The `AttachmentQueue` implements a caching system for archived attachments: + +1. Local attachments are marked as `ARCHIVED` if the flow from `watchAttachments` no longer references them +2. Archived attachments are kept in the cache for potential future restoration +3. The cache size is controlled by the `archivedCacheLimit` parameter in the `AttachmentQueue` constructor +4. By default, the queue keeps the last 100 archived attachment records +5. When the cache limit is reached, the oldest archived attachments are permanently deleted +6. If an archived attachment is referenced again while still in the cache, it can be restored +7. The cache limit can be configured in the `AttachmentQueue` constructor + +### Error Handling + +1. **Automatic Retries**: + - Failed uploads/downloads/deletes are automatically retried + - The sync interval (default 30 seconds) ensures periodic retry attempts + - Retries continue indefinitely until successful + +2. **Custom Error Handling**: + - A `SyncErrorHandler` can be implemented to customize retry behavior (see example below) + - The handler can decide whether to retry or archive failed operations + - Different handlers can be provided for upload, download, and delete operations + + +Example of a custom `SyncErrorHandler`: + +```kotlin +val errorHandler = object : SyncErrorHandler { + override suspend fun onDownloadError( + attachment: Attachment, + exception: Exception + ): Boolean { + TODO("Return if the attachment sync should be retried") + } + + override suspend fun onUploadError( + attachment: Attachment, + exception: Exception + ): Boolean { + TODO("Return if the attachment sync should be retried") + } + + override suspend fun onDeleteError( + attachment: Attachment, + exception: Exception + ): Boolean { + TODO("Return if the attachment sync should be retried") + } +} + +val queue = AttachmentQueue( + // ... other parameters ... + errorHandler = errorHandler +) +``` \ No newline at end of file diff --git a/core/src/commonMain/kotlin/com/powersync/attachments/RemoteStorage.kt b/core/src/commonMain/kotlin/com/powersync/attachments/RemoteStorage.kt new file mode 100644 index 00000000..0a700224 --- /dev/null +++ b/core/src/commonMain/kotlin/com/powersync/attachments/RemoteStorage.kt @@ -0,0 +1,34 @@ +package com.powersync.attachments + +import kotlinx.coroutines.flow.Flow + +/** + * Adapter for interfacing with remote attachment storage. + */ +public interface RemoteStorage { + /** + * Uploads a file to remote storage. + * + * @param fileData The file data as a flow of byte arrays. + * @param attachment The attachment record associated with the file. + */ + public suspend fun uploadFile( + fileData: Flow, + attachment: Attachment, + ): Unit + + /** + * Downloads a file from remote storage. + * + * @param attachment The attachment record associated with the file. + * @return A flow of byte arrays representing the file data. + */ + public suspend fun downloadFile(attachment: Attachment): Flow + + /** + * Deletes a file from remote storage. + * + * @param attachment The attachment record associated with the file. + */ + public suspend fun deleteFile(attachment: Attachment) +} diff --git a/core/src/commonMain/kotlin/com/powersync/attachments/SyncErrorHandler.kt b/core/src/commonMain/kotlin/com/powersync/attachments/SyncErrorHandler.kt new file mode 100644 index 00000000..f32b9193 --- /dev/null +++ b/core/src/commonMain/kotlin/com/powersync/attachments/SyncErrorHandler.kt @@ -0,0 +1,44 @@ +package com.powersync.attachments + +/** + * Interface for handling errors during attachment operations. + * Implementations determine whether failed operations should be retried. + * Attachment records are archived if an operation fails and should not be retried. + */ +public interface SyncErrorHandler { + /** + * Determines whether the provided attachment download operation should be retried. + * + * @param attachment The attachment involved in the failed download operation. + * @param exception The exception that caused the download failure. + * @return `true` if the download operation should be retried, `false` otherwise. + */ + public suspend fun onDownloadError( + attachment: Attachment, + exception: Exception, + ): Boolean + + /** + * Determines whether the provided attachment upload operation should be retried. + * + * @param attachment The attachment involved in the failed upload operation. + * @param exception The exception that caused the upload failure. + * @return `true` if the upload operation should be retried, `false` otherwise. + */ + public suspend fun onUploadError( + attachment: Attachment, + exception: Exception, + ): Boolean + + /** + * Determines whether the provided attachment delete operation should be retried. + * + * @param attachment The attachment involved in the failed delete operation. + * @param exception The exception that caused the delete failure. + * @return `true` if the delete operation should be retried, `false` otherwise. + */ + public suspend fun onDeleteError( + attachment: Attachment, + exception: Exception, + ): Boolean +} diff --git a/core/src/commonMain/kotlin/com/powersync/attachments/implementation/AttachmentContextImpl.kt b/core/src/commonMain/kotlin/com/powersync/attachments/implementation/AttachmentContextImpl.kt new file mode 100644 index 00000000..a013111f --- /dev/null +++ b/core/src/commonMain/kotlin/com/powersync/attachments/implementation/AttachmentContextImpl.kt @@ -0,0 +1,194 @@ +package com.powersync.attachments.implementation + +import co.touchlab.kermit.Logger +import com.powersync.PowerSyncDatabase +import com.powersync.attachments.Attachment +import com.powersync.attachments.AttachmentContext +import com.powersync.attachments.AttachmentState +import com.powersync.db.getString +import com.powersync.db.internal.ConnectionContext +import kotlinx.datetime.Clock +import kotlinx.serialization.encodeToString +import kotlinx.serialization.json.Json + +public open class AttachmentContextImpl( + public val db: PowerSyncDatabase, + public val table: String, + private val logger: Logger, + private val maxArchivedCount: Long, +) : AttachmentContext { + /** + * Delete the attachment from the attachment queue. + */ + public override suspend fun deleteAttachment(id: String) { + db.execute("DELETE FROM $table WHERE id = ?", listOf(id)) + } + + /** + * Set the state of the attachment to ignore. + */ + public override suspend fun ignoreAttachment(id: String) { + db.execute( + "UPDATE $table SET state = ? WHERE id = ?", + listOf(AttachmentState.ARCHIVED.ordinal, id), + ) + } + + /** + * Get the attachment from the attachment queue using an ID. + */ + public override suspend fun getAttachment(id: String): Attachment? = + db.getOptional("SELECT * FROM $table WHERE id = ?", listOf(id)) { + Attachment.fromCursor(it) + } + + /** + * Save the attachment to the attachment queue. + */ + public override suspend fun saveAttachment(attachment: Attachment): Attachment = + db.writeLock { ctx -> + upsertAttachment(attachment, ctx) + } + + /** + * Save the attachments to the attachment queue. + */ + public override suspend fun saveAttachments(attachments: List) { + if (attachments.isEmpty()) { + return + } + + db.writeTransaction { tx -> + for (attachment in attachments) { + upsertAttachment(attachment, tx) + } + } + } + + /** + * Get all the ID's of attachments in the attachment queue. + */ + public override suspend fun getAttachmentIds(): List = + db.getAll( + "SELECT id FROM $table WHERE id IS NOT NULL", + ) { it.getString("name") } + + public override suspend fun getAttachments(): List = + db.getAll( + """ + SELECT + * + FROM + $table + WHERE + id IS NOT NULL + ORDER BY + timestamp ASC + """, + ) { Attachment.fromCursor(it) } + + /** + * Gets all the active attachments which require an operation to be performed. + */ + public override suspend fun getActiveAttachments(): List = + db.getAll( + """ + SELECT + * + FROM + $table + WHERE + state = ? + OR state = ? + OR state = ? + ORDER BY + timestamp ASC + """, + listOf( + AttachmentState.QUEUED_UPLOAD.ordinal, + AttachmentState.QUEUED_DOWNLOAD.ordinal, + AttachmentState.QUEUED_DELETE.ordinal, + ), + ) { Attachment.fromCursor(it) } + + /** + * Helper function to clear the attachment queue + * Currently only used for testing purposes. + */ + public override suspend fun clearQueue() { + logger.i("Clearing attachment queue...") + db.execute("DELETE FROM $table") + } + + /** + * Delete attachments which have been archived + * @returns true if all items have been deleted. Returns false if there might be more archived + * items remaining. + */ + public override suspend fun deleteArchivedAttachments(callback: suspend (attachments: List) -> Unit): Boolean { + // First fetch the attachments in order to allow other cleanup + val limit = 1000 + val attachments = + db.getAll( + """ + SELECT + * + FROM + $table + WHERE + state = ? + ORDER BY + timestamp DESC + LIMIT ? OFFSET ? + """, + listOf( + AttachmentState.ARCHIVED.ordinal, + limit, + maxArchivedCount, + ), + ) { Attachment.fromCursor(it) } + callback(attachments) + db.execute( + "DELETE FROM $table WHERE id IN (SELECT value FROM json_each(?));", + listOf( + Json.encodeToString(attachments.map { it.id }), + ), + ) + return attachments.size < limit + } + + /** + * Upserts an attachment record synchronously given a database connection context. + */ + public override fun upsertAttachment( + attachment: Attachment, + context: ConnectionContext, + ): Attachment { + val updatedRecord = + attachment.copy( + timestamp = Clock.System.now().toEpochMilliseconds(), + ) + + context.execute( + """ + INSERT OR REPLACE INTO + $table (id, timestamp, filename, local_uri, media_type, size, state, has_synced, meta_data) + VALUES + (?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + listOf( + updatedRecord.id, + updatedRecord.timestamp, + updatedRecord.filename, + updatedRecord.localUri, + updatedRecord.mediaType, + updatedRecord.size, + updatedRecord.state.ordinal, + updatedRecord.hasSynced, + updatedRecord.metaData, + ), + ) + + return attachment + } +} diff --git a/core/src/commonMain/kotlin/com/powersync/attachments/implementation/AttachmentServiceImpl.kt b/core/src/commonMain/kotlin/com/powersync/attachments/implementation/AttachmentServiceImpl.kt new file mode 100644 index 00000000..c94c09eb --- /dev/null +++ b/core/src/commonMain/kotlin/com/powersync/attachments/implementation/AttachmentServiceImpl.kt @@ -0,0 +1,70 @@ +package com.powersync.attachments.implementation + +import co.touchlab.kermit.Logger +import com.powersync.PowerSyncDatabase +import com.powersync.attachments.AttachmentContext +import com.powersync.attachments.AttachmentService +import com.powersync.attachments.AttachmentState +import com.powersync.db.getString +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock + +/** + * Service for interacting with the local attachment records. + */ +public open class AttachmentServiceImpl( + private val db: PowerSyncDatabase, + private val tableName: String, + private val logger: Logger, + private val maxArchivedCount: Long, +) : AttachmentService { + /** + * Table used for storing attachments in the attachment queue. + */ + private val table: String + get() = tableName + + private val mutex = Mutex() + + private val context: AttachmentContext = + AttachmentContextImpl( + db = db, + table = table, + logger = logger, + maxArchivedCount = maxArchivedCount, + ) + + public override suspend fun withContext(action: suspend (AttachmentContext) -> R): R = mutex.withLock { action(context) } + + /** + * Watcher for changes to attachments table. + * Once a change is detected it will initiate a sync of the attachments + */ + public override fun watchActiveAttachments(): Flow { + logger.i("Watching attachments...") + return db + .watch( + """ + SELECT + id + FROM + $table + WHERE + state = ? + OR state = ? + OR state = ? + ORDER BY + timestamp ASC + """, + listOf( + AttachmentState.QUEUED_UPLOAD.ordinal, + AttachmentState.QUEUED_DOWNLOAD.ordinal, + AttachmentState.QUEUED_DELETE.ordinal, + ), + ) { it.getString("id") } + // We only use changes here to trigger a sync consolidation + .map { Unit } + } +} diff --git a/core/src/commonMain/kotlin/com/powersync/attachments/storage/IOLocalStorageAdapter.kt b/core/src/commonMain/kotlin/com/powersync/attachments/storage/IOLocalStorageAdapter.kt new file mode 100644 index 00000000..e194be2f --- /dev/null +++ b/core/src/commonMain/kotlin/com/powersync/attachments/storage/IOLocalStorageAdapter.kt @@ -0,0 +1,117 @@ +package com.powersync.attachments.storage + +import com.powersync.attachments.LocalStorage +import com.powersync.db.runWrappedSuspending +import io.ktor.utils.io.core.remaining +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.IO +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.flowOn +import kotlinx.coroutines.withContext +import kotlinx.io.Buffer +import kotlinx.io.buffered +import kotlinx.io.files.FileSystem +import kotlinx.io.files.Path +import kotlinx.io.files.SystemFileSystem +import kotlinx.io.readByteArray + +/** + * Storage adapter for local storage using the KotlinX IO library + */ +public class IOLocalStorageAdapter : LocalStorage { + private val fileSystem: FileSystem = SystemFileSystem + + public override suspend fun saveFile( + filePath: String, + data: Flow, + ): Long = + runWrappedSuspending { + withContext(Dispatchers.IO) { + var totalSize = 0L + fileSystem.sink(Path(filePath)).use { sink -> + // Copy to a buffer in order to write + Buffer().use { buffer -> + data.collect { chunk -> + // Copy into a buffer in order to sink the chunk + buffer.write(chunk, 0) + val chunkSize = chunk.size.toLong() + totalSize += chunkSize + sink.write(buffer, chunkSize) + } + } + sink.flush() + return@withContext totalSize + } + } + } + + public override suspend fun readFile( + filePath: String, + mediaType: String?, + ): Flow = + flow { + fileSystem.source(Path(filePath)).use { source -> + source.buffered().use { bufferedSource -> + var remaining = 0L + val bufferSize = 8192L + do { + bufferedSource.request(bufferSize) + remaining = bufferedSource.remaining + emit(bufferedSource.readByteArray(remaining.toInt())) + } while (remaining > 0) + } + } + }.flowOn(Dispatchers.IO) + + public override suspend fun deleteFile(filePath: String): Unit = + runWrappedSuspending { + withContext(Dispatchers.IO) { + fileSystem.delete(Path(filePath)) + } + } + + public override suspend fun fileExists(filePath: String): Boolean = + runWrappedSuspending { + withContext(Dispatchers.IO) { + fileSystem.exists(Path(filePath)) + } + } + + public override suspend fun makeDir(path: String): Unit = + runWrappedSuspending { + withContext(Dispatchers.IO) { + fileSystem.createDirectories(Path(path)) + } + } + + public override suspend fun rmDir(path: String): Unit = + runWrappedSuspending { + withContext(Dispatchers.IO) { + for (item in fileSystem.list(Path(path))) { + // Can't delete directories with files in them. Need to go down the file tree + // and clear the directory. + val meta = fileSystem.metadataOrNull(item) + if (meta?.isDirectory == true) { + rmDir(item.toString()) + } else if (meta?.isRegularFile == true) { + fileSystem.delete(item) + } + } + } + } + + public override suspend fun copyFile( + sourcePath: String, + targetPath: String, + ): Unit = + runWrappedSuspending { + withContext(Dispatchers.IO) { + fileSystem.source(Path(sourcePath)).use { source -> + fileSystem.sink(Path(targetPath)).use { sink -> + source.buffered().transferTo(sink.buffered()) + } + } + } + } +} diff --git a/core/src/commonMain/kotlin/com/powersync/attachments/sync/SyncingService.kt b/core/src/commonMain/kotlin/com/powersync/attachments/sync/SyncingService.kt new file mode 100644 index 00000000..cd9af5a7 --- /dev/null +++ b/core/src/commonMain/kotlin/com/powersync/attachments/sync/SyncingService.kt @@ -0,0 +1,311 @@ +package com.powersync.attachments.sync + +import co.touchlab.kermit.Logger +import com.powersync.PowerSyncException +import com.powersync.attachments.Attachment +import com.powersync.attachments.AttachmentContext +import com.powersync.attachments.AttachmentService +import com.powersync.attachments.AttachmentState +import com.powersync.attachments.LocalStorage +import com.powersync.attachments.RemoteStorage +import com.powersync.attachments.SyncErrorHandler +import com.powersync.utils.throttle +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.DelicateCoroutinesApi +import kotlinx.coroutines.Job +import kotlinx.coroutines.cancelAndJoin +import kotlinx.coroutines.channels.BufferOverflow +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.asSharedFlow +import kotlinx.coroutines.flow.buffer +import kotlinx.coroutines.flow.merge +import kotlinx.coroutines.launch +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock +import kotlin.time.Duration +import kotlin.time.Duration.Companion.seconds + +/** + * Service responsible for syncing attachments between local and remote storage. + * + * This service handles downloading, uploading, and deleting attachments, as well as + * periodically syncing attachment states. It ensures proper lifecycle management + * of sync operations and provides mechanisms for error handling and retries. + * + * @property remoteStorage The remote storage implementation for handling file operations. + * @property localStorage The local storage implementation for managing files locally. + * @property attachmentsService The service for managing attachment states and operations. + * @property getLocalUri A function to resolve the local URI for a given filename. + * @property errorHandler Optional error handler for managing sync-related errors. + * @property logger Logger instance for logging sync operations and errors. + * @property syncThrottle The minimum duration between consecutive sync operations. + * @property scope The coroutine scope used for managing sync operations. + */ +public class SyncingService + @OptIn(DelicateCoroutinesApi::class) + constructor( + private val remoteStorage: RemoteStorage, + private val localStorage: LocalStorage, + private val attachmentsService: AttachmentService, + private val getLocalUri: suspend (String) -> String, + private val errorHandler: SyncErrorHandler?, + private val logger: Logger, + private val scope: CoroutineScope, + private val syncThrottle: Duration = 5.seconds, + ) { + private val mutex = Mutex() + private var syncJob: Job? = null + + /** + * Used to trigger the sync process either manually or periodically + */ + private val syncTriggerFlow = MutableSharedFlow(replay = 0) + + /** + * Starts the syncing process, including periodic and event-driven sync operations. + * + * @param period The interval at which periodic sync operations are triggered. + */ + public suspend fun startSync(period: Duration = 30.seconds): Unit = + mutex.withLock { + syncJob?.cancelAndJoin() + + syncJob = + scope.launch { + launch { + merge( + // Handles manual triggers for sync events + syncTriggerFlow.asSharedFlow(), + // Triggers the sync process whenever an underlaying change to the + // attachments table happens + attachmentsService + .watchActiveAttachments(), + ) + // We only use these flows to trigger the process. We can skip multiple invocations + // while we are processing. We will always process on the trailing edge. + // This buffer operation should automatically be applied to all merged sources. + .buffer(1, onBufferOverflow = BufferOverflow.DROP_OLDEST) + .throttle(syncThrottle) + .collect { + attachmentsService.withContext { context -> + /** + * Gets and performs the operations for active attachments which are + * pending download, upload, or delete. + */ + try { + val attachments = context.getActiveAttachments() + // Performs pending operations and updates attachment states + handleSync(attachments, context) + + // Cleanup archived attachments + deleteArchivedAttachments(context) + } catch (ex: Exception) { + if (ex is CancellationException) { + throw ex + } + // Rare exceptions caught here will be swallowed and retried on the + // next tick. + logger.e("Caught exception when processing attachments $ex") + } + } + } + } + + launch { + logger.i("Periodically syncing attachments") + while (true) { + syncTriggerFlow.emit(Unit) + delay(period) + } + } + } + } + + /** + * Enqueues a sync operation + */ + public suspend fun triggerSync() { + syncTriggerFlow.emit(Unit) + } + + /** + * Stops all ongoing sync operations. + */ + public suspend fun stopSync(): Unit = + mutex.withLock { + syncJob?.cancelAndJoin() + } + + /** + * Closes the syncing service, stopping all operations and releasing resources. + */ + public suspend fun close() { + stopSync() + } + + /** + * Handles syncing operations for a list of attachments, including downloading, + * uploading, and deleting files based on their states. + * + * @param attachments The list of attachments to process. + * @param context The attachment context used for managing attachment states. + */ + private suspend fun handleSync( + attachments: List, + context: AttachmentContext, + ) { + val updatedAttachments = mutableListOf() + try { + for (attachment in attachments) { + when (attachment.state) { + AttachmentState.QUEUED_DOWNLOAD -> { + logger.i("Downloading ${attachment.filename}") + updatedAttachments.add(downloadAttachment(attachment)) + } + + AttachmentState.QUEUED_UPLOAD -> { + logger.i("Uploading ${attachment.filename}") + updatedAttachments.add(uploadAttachment(attachment)) + } + + AttachmentState.QUEUED_DELETE -> { + logger.i("Deleting ${attachment.filename}") + updatedAttachments.add(deleteAttachment(attachment)) + } + + AttachmentState.SYNCED -> {} + AttachmentState.ARCHIVED -> {} + } + } + + // Update the state of processed attachments + context.saveAttachments(updatedAttachments) + } catch (error: Exception) { + // We retry, on the next invocation, whenever there are errors on this level + logger.e("Error during sync: ${error.message}") + } + } + + /** + * Uploads an attachment from local storage to remote storage. + * + * @param attachment The attachment to upload. + * @return The updated attachment with its new state. + */ + private suspend fun uploadAttachment(attachment: Attachment): Attachment { + try { + if (attachment.localUri == null) { + throw PowerSyncException( + "No localUri for attachment $attachment", + cause = Exception("attachment.localUri == null"), + ) + } + + remoteStorage.uploadFile( + localStorage.readFile(attachment.localUri), + attachment, + ) + logger.i("Uploaded attachment \"${attachment.id}\" to Cloud Storage") + return attachment.copy(state = AttachmentState.SYNCED, hasSynced = 1) + } catch (e: Exception) { + logger.e("Upload attachment error for attachment $attachment: ${e.message}") + if (errorHandler != null) { + val shouldRetry = errorHandler.onUploadError(attachment, e) + if (!shouldRetry) { + logger.i("Attachment with ID ${attachment.id} has been archived") + return attachment.copy(state = AttachmentState.ARCHIVED) + } + } + + // Retry the upload (same state) + return attachment + } + } + + /** + * Downloads an attachment from remote storage and saves it to local storage. + * + * @param attachment The attachment to download. + * @return The updated attachment with its new state. + */ + private suspend fun downloadAttachment(attachment: Attachment): Attachment { + /** + * When downloading an attachment we take the filename and resolve + * the local_uri where the file will be stored + */ + val attachmentPath = getLocalUri(attachment.filename) + + try { + val fileFlow = remoteStorage.downloadFile(attachment) + localStorage.saveFile(attachmentPath, fileFlow) + logger.i("Downloaded file \"${attachment.id}\"") + + // The attachment has been downloaded locally + return attachment.copy( + localUri = attachmentPath, + state = AttachmentState.SYNCED, + hasSynced = 1, + ) + } catch (e: Exception) { + if (errorHandler != null) { + val shouldRetry = errorHandler.onDownloadError(attachment, e) + if (!shouldRetry) { + logger.i("Attachment with ID ${attachment.id} has been archived") + return attachment.copy(state = AttachmentState.ARCHIVED) + } + } + + logger.e("Download attachment error for attachment $attachment: ${e.message}") + // Return the same state, this will cause a retry + return attachment + } + } + + /** + * Deletes an attachment from remote and local storage, and removes it from the queue. + * + * @param attachment The attachment to delete. + * @return The updated attachment with its new state. + */ + private suspend fun deleteAttachment(attachment: Attachment): Attachment { + try { + remoteStorage.deleteFile(attachment) + if (attachment.localUri != null && localStorage.fileExists(attachment.localUri)) { + localStorage.deleteFile(attachment.localUri) + } + return attachment.copy(state = AttachmentState.ARCHIVED) + } catch (e: Exception) { + if (errorHandler != null) { + val shouldRetry = errorHandler.onDeleteError(attachment, e) + if (!shouldRetry) { + logger.i("Attachment with ID ${attachment.id} has been archived") + return attachment.copy(state = AttachmentState.ARCHIVED) + } + } + // We'll retry this + logger.e("Error deleting attachment: ${e.message}") + return attachment + } + } + + /** + * Deletes archived attachments from local storage. + * + * @param context The attachment context used to retrieve and manage archived attachments. + * @return `true` if all archived attachments were successfully deleted, `false` otherwise. + */ + public suspend fun deleteArchivedAttachments(context: AttachmentContext): Boolean = + context.deleteArchivedAttachments { pendingDelete -> + for (attachment in pendingDelete) { + if (attachment.localUri == null) { + continue + } + if (!localStorage.fileExists(attachment.localUri)) { + continue + } + localStorage.deleteFile(attachment.localUri) + } + } + } diff --git a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt index 87209f4f..504d8e16 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt @@ -46,6 +46,7 @@ import kotlinx.datetime.LocalDateTime import kotlinx.datetime.TimeZone import kotlinx.datetime.toInstant import kotlinx.serialization.encodeToString +import kotlin.time.Duration.Companion.milliseconds /** * A PowerSync managed database. @@ -237,7 +238,7 @@ internal class PowerSyncDatabaseImpl( internalDb .updatesOnTables() .filter { it.contains(InternalTable.CRUD.toString()) } - .throttle(crudThrottleMs) + .throttle(crudThrottleMs.milliseconds) .collect { stream.triggerCrudUploadAsync().join() } @@ -509,15 +510,17 @@ internal class PowerSyncDatabaseImpl( } override suspend fun close() = - mutex.withLock { - if (closed) { - return@withLock + runWrappedSuspending { + mutex.withLock { + if (closed) { + return@withLock + } + initializeJob.cancelAndJoin() + disconnectInternal() + internalDb.close() + resource.dispose() + closed = true } - initializeJob.cancelAndJoin() - disconnectInternal() - internalDb.close() - resource.dispose() - closed = true } /** diff --git a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt index ca577ca4..edf5130d 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt @@ -23,6 +23,7 @@ import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.withContext import kotlinx.serialization.encodeToString +import kotlin.time.Duration.Companion.milliseconds @OptIn(FlowPreview::class) internal class InternalDatabaseImpl( @@ -55,7 +56,7 @@ internal class InternalDatabaseImpl( private val dbContext = Dispatchers.IO companion object { - const val DEFAULT_WATCH_THROTTLE_MS = 30L + val DEFAULT_WATCH_THROTTLE = 30.milliseconds } override suspend fun execute( @@ -133,7 +134,7 @@ internal class InternalDatabaseImpl( // still trigger a trailing edge update. // Backpressure is avoided on the throttling and consumer level by buffering the last upstream value. // Note that the buffered upstream "value" only serves to trigger the getAll query. We don't buffer watch results. - .throttle(throttleMs ?: DEFAULT_WATCH_THROTTLE_MS) + .throttle(throttleMs?.milliseconds ?: DEFAULT_WATCH_THROTTLE) .collect { send(getAll(sql, parameters = parameters, mapper = mapper)) } @@ -281,6 +282,7 @@ internal fun getBindersFromParams(parameters: List?): (SqlPreparedStatemen is Boolean -> bindBoolean(index, parameter) is String -> bindString(index, parameter) is Long -> bindLong(index, parameter) + is Int -> bindLong(index, parameter.toLong()) is Double -> bindDouble(index, parameter) is ByteArray -> bindBytes(index, parameter) else -> { diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt index 3f07e0b5..1f6e00d6 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt @@ -81,11 +81,6 @@ internal class SyncStream( invalidCredentials = false } streamingSyncIteration() -// val state = streamingSyncIteration() -// TODO: We currently always retry -// if (!state.retry) { -// break; -// } } catch (e: Exception) { if (e is CancellationException) { throw e diff --git a/core/src/commonMain/kotlin/com/powersync/utils/ThrottleFlow.kt b/core/src/commonMain/kotlin/com/powersync/utils/ThrottleFlow.kt index 711b2cb4..82d5c976 100644 --- a/core/src/commonMain/kotlin/com/powersync/utils/ThrottleFlow.kt +++ b/core/src/commonMain/kotlin/com/powersync/utils/ThrottleFlow.kt @@ -5,6 +5,7 @@ import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.buffer import kotlinx.coroutines.flow.flow +import kotlin.time.Duration /** * Throttles a flow with emissions on the leading and trailing edge. @@ -13,7 +14,7 @@ import kotlinx.coroutines.flow.flow * This throttle method acts as a slow consumer, but backpressure is not a concern * due to the conflated buffer dropping events during the throttle window. */ -internal fun Flow.throttle(windowMs: Long): Flow = +internal fun Flow.throttle(window: Duration): Flow = flow { // Use a buffer before throttle (ensure only the latest event is kept) val bufferedFlow = this@throttle.buffer(Channel.CONFLATED) @@ -23,7 +24,7 @@ internal fun Flow.throttle(windowMs: Long): Flow = emit(value) // Delay for the throttle window to avoid emitting too frequently - delay(windowMs) + delay(window) // The next incoming event will be provided from the buffer. // The next collect will emit the trailing edge diff --git a/core/src/commonTest/kotlin/com/powersync/utils/JsonTest.kt b/core/src/commonTest/kotlin/com/powersync/utils/JsonTest.kt index aa2611e1..46182096 100644 --- a/core/src/commonTest/kotlin/com/powersync/utils/JsonTest.kt +++ b/core/src/commonTest/kotlin/com/powersync/utils/JsonTest.kt @@ -23,6 +23,7 @@ import kotlin.test.Test import kotlin.test.assertEquals import kotlin.test.assertNotNull import kotlin.test.assertTrue +import kotlin.time.Duration.Companion.milliseconds class JsonTest { @Test @@ -53,7 +54,7 @@ class JsonTest { emit(3) delay(100) emit(4) - }.throttle(100) + }.throttle(100.milliseconds) .map { // Adding a delay here to simulate a slow consumer delay(1000) diff --git a/core/src/jvmTest/kotlin/com/powersync/testutils/TestUtils.jvm.kt b/core/src/jvmTest/kotlin/com/powersync/testutils/TestUtils.jvm.kt index 3b53926a..9296387a 100644 --- a/core/src/jvmTest/kotlin/com/powersync/testutils/TestUtils.jvm.kt +++ b/core/src/jvmTest/kotlin/com/powersync/testutils/TestUtils.jvm.kt @@ -10,4 +10,6 @@ actual fun cleanup(path: String) { File(path).delete() } -actual fun getTempDir(): String? = System.getProperty("java.io.tmpdir") +actual fun getTempDir(): String = System.getProperty("java.io.tmpdir") + +actual fun isIOS(): Boolean = false diff --git a/demos/android-supabase-todolist/app/build.gradle.kts b/demos/android-supabase-todolist/app/build.gradle.kts index 7b24a362..a8011368 100644 --- a/demos/android-supabase-todolist/app/build.gradle.kts +++ b/demos/android-supabase-todolist/app/build.gradle.kts @@ -13,9 +13,10 @@ if (localPropertiesFile.exists()) { localPropertiesFile.inputStream().use { localProperties.load(it) } } -fun getLocalProperty(key: String, defaultValue: String): String { - return localProperties.getProperty(key, defaultValue) -} +fun getLocalProperty( + key: String, + defaultValue: String, +): String = localProperties.getProperty(key, defaultValue) android { namespace = "com.powersync.androidexample" @@ -38,7 +39,16 @@ android { } buildConfigField("String", "SUPABASE_URL", "\"${getLocalProperty("SUPABASE_URL", "")}\"") - buildConfigField("String", "SUPABASE_ANON_KEY", "\"${getLocalProperty("SUPABASE_ANON_KEY", "")}\"") + buildConfigField( + "String", + "SUPABASE_ANON_KEY", + "\"${getLocalProperty("SUPABASE_ANON_KEY", "")}\"", + ) + buildConfigField( + "String", + "SUPABASE_STORAGE_BUCKET", + "\"${getLocalProperty("SUPABASE_STORAGE_BUCKET", "null")}\"", + ) buildConfigField("String", "POWERSYNC_URL", "\"${getLocalProperty("POWERSYNC_URL", "")}\"") } @@ -47,7 +57,7 @@ android { isMinifyEnabled = false proguardFiles( getDefaultProguardFile("proguard-android-optimize.txt"), - "proguard-rules.pro" + "proguard-rules.pro", ) } } @@ -96,4 +106,5 @@ dependencies { implementation(libs.uuid) implementation(libs.kermit) implementation(libs.androidx.material.icons.extended) -} \ No newline at end of file + implementation("androidx.activity:activity-ktx:1.9.0") +} diff --git a/demos/android-supabase-todolist/app/src/main/AndroidManifest.xml b/demos/android-supabase-todolist/app/src/main/AndroidManifest.xml index 622607af..844bfbac 100644 --- a/demos/android-supabase-todolist/app/src/main/AndroidManifest.xml +++ b/demos/android-supabase-todolist/app/src/main/AndroidManifest.xml @@ -11,7 +11,9 @@ android:roundIcon="@mipmap/ic_launcher_round" android:supportsRtl="true" android:theme="@style/Theme.App.Starting" - tools:targetApi="31"> + tools:targetApi="31" + android:networkSecurityConfig="@xml/network_security_config" + > + + + \ No newline at end of file diff --git a/demos/android-supabase-todolist/app/src/main/java/com/powersync/androidexample/App.kt b/demos/android-supabase-todolist/app/src/main/java/com/powersync/androidexample/App.kt index 637062c6..70120158 100644 --- a/demos/android-supabase-todolist/app/src/main/java/com/powersync/androidexample/App.kt +++ b/demos/android-supabase-todolist/app/src/main/java/com/powersync/androidexample/App.kt @@ -11,10 +11,14 @@ import androidx.compose.runtime.mutableStateOf import androidx.compose.runtime.remember import androidx.compose.runtime.rememberUpdatedState import androidx.compose.ui.Modifier -import com.powersync.androidexample.BuildConfig import com.powersync.PowerSyncDatabase +import com.powersync.androidexample.ui.CameraService +import com.powersync.attachments.AttachmentQueue +import com.powersync.attachments.WatchedAttachmentItem import com.powersync.compose.rememberDatabaseDriverFactory import com.powersync.connector.supabase.SupabaseConnector +import com.powersync.connector.supabase.SupabaseRemoteStorage +import com.powersync.db.getString import com.powersync.demos.components.EditDialog import com.powersync.demos.powersync.ListContent import com.powersync.demos.powersync.ListItem @@ -25,26 +29,60 @@ import com.powersync.demos.screens.SignInScreen import com.powersync.demos.screens.SignUpScreen import com.powersync.demos.screens.TodosScreen import kotlinx.coroutines.runBlocking - +import com.powersync.androidexample.BuildConfig @Composable -fun App() { +fun App( + cameraService: CameraService, + attachmentDirectory: String, +) { val driverFactory = rememberDatabaseDriverFactory() - val supabase = remember { - SupabaseConnector( - powerSyncEndpoint = BuildConfig.POWERSYNC_URL, - supabaseUrl = BuildConfig.SUPABASE_URL, - supabaseKey = BuildConfig.SUPABASE_ANON_KEY - ) - } + val supabase = + remember { + SupabaseConnector( + powerSyncEndpoint = BuildConfig.POWERSYNC_URL, + supabaseUrl = BuildConfig.SUPABASE_URL, + supabaseKey = BuildConfig.SUPABASE_ANON_KEY, + storageBucket = BuildConfig.SUPABASE_STORAGE_BUCKET, + ) + } + val db = remember { PowerSyncDatabase(driverFactory, schema) } + val attachments = + remember { + if (BuildConfig.SUPABASE_STORAGE_BUCKET != "null") { + AttachmentQueue( + db = db, + remoteStorage = SupabaseRemoteStorage(supabase), + attachmentsDirectory = attachmentDirectory, + watchAttachments = { + db.watch( + "SELECT photo_id from todos WHERE photo_id IS NOT NULL", + ) { + WatchedAttachmentItem( + id = it.getString("photo_id"), + fileExtension = "jpg", + ) + } + } + ) + } else { + null + } + } + val syncStatus = db.currentStatus val status by syncStatus.asFlow().collectAsState(syncStatus) - val navController = remember { NavController(Screen.Home) } - val authViewModel = remember { - AuthViewModel(supabase, db, navController) - } + val navController = + remember { + NavController(Screen.Home) + } + + val authViewModel = + remember { + AuthViewModel(supabase, db, navController, attachments) + } val authState by authViewModel.authState.collectAsState() val currentScreen by navController.currentScreen.collectAsState() @@ -59,9 +97,9 @@ fun App() { val items by lists.value.watchItems().collectAsState(initial = emptyList()) val listsInputText by lists.value.inputText.collectAsState() - val todos = remember { mutableStateOf(Todo(db, userId)) } + val todos = remember { mutableStateOf(Todo(db, attachments, userId)) } LaunchedEffect(currentUserId.value) { - todos.value = Todo(db, currentUserId.value) + todos.value = Todo(db, attachments, currentUserId.value) } val todoItems by todos.value.watchItems(selectedListId).collectAsState(initial = emptyList()) val editingItem by todos.value.editingItem.collectAsState() @@ -75,7 +113,7 @@ fun App() { when (currentScreen) { is Screen.Home -> { - if(authState == AuthState.SignedOut) { + if (authState == AuthState.SignedOut) { navController.navigate(Screen.SignIn) } @@ -121,29 +159,32 @@ fun App() { onCloseClicked = todos.value::onEditorCloseClicked, onTextChanged = todos.value::onEditorTextChanged, onDoneChanged = todos.value::onEditorDoneChanged, + onPhotoClear = todos.value::onPhotoDelete, + onPhotoCapture = { todos.value::onPhotoCapture.invoke(cameraService) }, + attachmentsSupported = attachments != null, ) } } is Screen.SignIn -> { - if(authState == AuthState.SignedIn) { + if (authState == AuthState.SignedIn) { navController.navigate(Screen.Home) } SignInScreen( navController, - authViewModel + authViewModel, ) } is Screen.SignUp -> { - if(authState == AuthState.SignedIn) { + if (authState == AuthState.SignedIn) { navController.navigate(Screen.Home) } SignUpScreen( navController, - authViewModel + authViewModel, ) } } diff --git a/demos/android-supabase-todolist/app/src/main/java/com/powersync/androidexample/Auth.kt b/demos/android-supabase-todolist/app/src/main/java/com/powersync/androidexample/Auth.kt index 889b3456..598aa886 100644 --- a/demos/android-supabase-todolist/app/src/main/java/com/powersync/androidexample/Auth.kt +++ b/demos/android-supabase-todolist/app/src/main/java/com/powersync/androidexample/Auth.kt @@ -4,6 +4,7 @@ import androidx.lifecycle.ViewModel import androidx.lifecycle.viewModelScope import co.touchlab.kermit.Logger import com.powersync.PowerSyncDatabase +import com.powersync.attachments.AttachmentQueue import com.powersync.connector.supabase.SupabaseConnector import io.github.jan.supabase.auth.status.RefreshFailureCause import io.github.jan.supabase.auth.status.SessionStatus @@ -21,6 +22,7 @@ internal class AuthViewModel( private val supabase: SupabaseConnector, private val db: PowerSyncDatabase, private val navController: NavController, + private val attachmentsQueue: AttachmentQueue?, ) : ViewModel() { private val _authState = MutableStateFlow(AuthState.SignedOut) val authState: StateFlow = _authState @@ -35,9 +37,14 @@ internal class AuthViewModel( _authState.value = AuthState.SignedIn _userId.value = it.session.user?.id db.connect(supabase) - navController.navigate(Screen.Home) + attachmentsQueue?.startSync() + if (navController.currentScreen.value is Screen.SignIn || + navController.currentScreen.value is Screen.SignUp + ) { + navController.navigate(Screen.Home) + } } - SessionStatus.Initializing -> Logger.e("Loading from storage") + is SessionStatus.Initializing -> Logger.e("Loading from storage") is SessionStatus.RefreshFailure -> { when (it.cause) { is RefreshFailureCause.NetworkError -> Logger.e("Network error occurred") @@ -72,6 +79,8 @@ internal class AuthViewModel( suspend fun signOut() { try { + attachmentsQueue?.clearQueue() + attachmentsQueue?.close() supabase.signOut() } catch (e: Exception) { Logger.e("Error signing out: $e") diff --git a/demos/android-supabase-todolist/app/src/main/java/com/powersync/androidexample/MainActivity.kt b/demos/android-supabase-todolist/app/src/main/java/com/powersync/androidexample/MainActivity.kt index 39e1eceb..7f2cef02 100644 --- a/demos/android-supabase-todolist/app/src/main/java/com/powersync/androidexample/MainActivity.kt +++ b/demos/android-supabase-todolist/app/src/main/java/com/powersync/androidexample/MainActivity.kt @@ -4,16 +4,22 @@ import android.os.Bundle import androidx.activity.ComponentActivity import androidx.activity.compose.setContent import androidx.core.splashscreen.SplashScreen.Companion.installSplashScreen +import com.powersync.androidexample.ui.CameraService import com.powersync.demos.App class MainActivity : ComponentActivity() { + private val cameraService = CameraService(this) + override fun onCreate(savedInstanceState: Bundle?) { installSplashScreen() super.onCreate(savedInstanceState) setContent { - App() + App( + cameraService = cameraService, + attachmentDirectory = "${applicationContext.filesDir.canonicalPath}/attachments", + ) } } -} \ No newline at end of file +} diff --git a/demos/android-supabase-todolist/app/src/main/java/com/powersync/androidexample/components/EditDialog.kt b/demos/android-supabase-todolist/app/src/main/java/com/powersync/androidexample/components/EditDialog.kt index 6df9327d..6c4785db 100644 --- a/demos/android-supabase-todolist/app/src/main/java/com/powersync/androidexample/components/EditDialog.kt +++ b/demos/android-supabase-todolist/app/src/main/java/com/powersync/androidexample/components/EditDialog.kt @@ -1,8 +1,12 @@ package com.powersync.demos.components +import android.graphics.BitmapFactory +import androidx.compose.foundation.Image +import androidx.compose.foundation.clickable import androidx.compose.foundation.layout.Box import androidx.compose.foundation.layout.Column import androidx.compose.foundation.layout.IntrinsicSize +import androidx.compose.foundation.layout.PaddingValues import androidx.compose.foundation.layout.Row import androidx.compose.foundation.layout.Spacer import androidx.compose.foundation.layout.fillMaxWidth @@ -18,8 +22,11 @@ import androidx.compose.material3.ProvideTextStyle import androidx.compose.material3.Text import androidx.compose.material3.TextField import androidx.compose.runtime.Composable +import androidx.compose.runtime.remember import androidx.compose.ui.Alignment import androidx.compose.ui.Modifier +import androidx.compose.ui.graphics.Color +import androidx.compose.ui.graphics.asImageBitmap import androidx.compose.ui.unit.dp import androidx.compose.ui.window.Dialog import com.powersync.demos.powersync.TodoItem @@ -30,6 +37,9 @@ internal fun EditDialog( onCloseClicked: () -> Unit, onTextChanged: (String) -> Unit, onDoneChanged: (Boolean) -> Unit, + onPhotoClear: () -> Unit, + onPhotoCapture: () -> Unit, + attachmentsSupported: Boolean = false, ) { EditDialog( onCloseRequest = onCloseClicked, @@ -51,6 +61,40 @@ internal fun EditDialog( onCheckedChange = onDoneChanged, ) } + + val bitmap = + remember(item.photoURI) { + item.photoURI?.let { BitmapFactory.decodeFile(it)?.asImageBitmap() } + } + + if (attachmentsSupported == true) { + Box( + modifier = + Modifier + .clickable { if (item.photoId == null) onPhotoCapture() } + .padding(8.dp), + contentAlignment = Alignment.Center, + ) { + if (bitmap == null) { + Button( + onClick = onPhotoCapture, + modifier = Modifier.align(Alignment.Center), + contentPadding = PaddingValues(0.dp), + ) { + Text("Add Photo", color = Color.Gray) + } + } else { + Image(bitmap = bitmap, contentDescription = "Photo Preview") + Button( + onClick = onPhotoClear, + modifier = Modifier.align(Alignment.TopEnd), + contentPadding = PaddingValues(0.dp), + ) { + Text("Clear Photo", color = Color.Red) + } + } + } + } } } } @@ -58,16 +102,17 @@ internal fun EditDialog( @Composable private fun EditDialog( onCloseRequest: () -> Unit, - content: @Composable () -> Unit + content: @Composable () -> Unit, ) { Dialog( onDismissRequest = onCloseRequest, ) { - Card(elevation = CardDefaults.cardElevation(defaultElevation = 8.dp) ) { + Card(elevation = CardDefaults.cardElevation(defaultElevation = 8.dp)) { Column( - modifier = Modifier - .padding(8.dp) - .height(IntrinsicSize.Min) + modifier = + Modifier + .padding(8.dp) + .height(IntrinsicSize.Min), ) { ProvideTextStyle(MaterialTheme.typography.bodySmall) { Text(text = "Edit todo") @@ -83,7 +128,7 @@ private fun EditDialog( Button( onClick = onCloseRequest, - modifier = Modifier.align(Alignment.End) + modifier = Modifier.align(Alignment.End), ) { Text(text = "Done") } diff --git a/demos/android-supabase-todolist/app/src/main/java/com/powersync/androidexample/powersync/Schema.kt b/demos/android-supabase-todolist/app/src/main/java/com/powersync/androidexample/powersync/Schema.kt index 9ab757e4..8ce17635 100644 --- a/demos/android-supabase-todolist/app/src/main/java/com/powersync/androidexample/powersync/Schema.kt +++ b/demos/android-supabase-todolist/app/src/main/java/com/powersync/androidexample/powersync/Schema.kt @@ -1,5 +1,6 @@ package com.powersync.demos.powersync +import com.powersync.attachments.createAttachmentsTable import com.powersync.db.schema.Column import com.powersync.db.schema.Index import com.powersync.db.schema.IndexedColumn @@ -43,6 +44,7 @@ val schema: Schema = Schema( listOf( todos, lists, + createAttachmentsTable("attachments") ) ) @@ -57,6 +59,7 @@ data class TodoItem( val id: String, val listId: String, val photoId: String?, + val photoURI: String?, val createdAt: String?, val completedAt: String?, val description: String, diff --git a/demos/android-supabase-todolist/app/src/main/java/com/powersync/androidexample/powersync/Todo.kt b/demos/android-supabase-todolist/app/src/main/java/com/powersync/androidexample/powersync/Todo.kt index 2bfa54fe..5e141e29 100644 --- a/demos/android-supabase-todolist/app/src/main/java/com/powersync/androidexample/powersync/Todo.kt +++ b/demos/android-supabase-todolist/app/src/main/java/com/powersync/androidexample/powersync/Todo.kt @@ -4,34 +4,43 @@ import androidx.lifecycle.ViewModel import androidx.lifecycle.viewModelScope import co.touchlab.kermit.Logger import com.powersync.PowerSyncDatabase +import com.powersync.androidexample.ui.CameraService +import com.powersync.attachments.AttachmentQueue import com.powersync.db.getLongOptional import com.powersync.db.getString import com.powersync.db.getStringOptional +import kotlinx.coroutines.CancellationException import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.flow.flowOf import kotlinx.coroutines.launch import kotlinx.datetime.Clock internal class Todo( private val db: PowerSyncDatabase, - private val userId: String? -): ViewModel() { - + private val attachmentsQueue: AttachmentQueue?, + private val userId: String?, +) : ViewModel() { private val _inputText = MutableStateFlow("") val inputText: StateFlow = _inputText private val _editingItem = MutableStateFlow(null) val editingItem: StateFlow = _editingItem - fun watchItems(listId: String?): Flow> { - return db.watch(""" - SELECT * - FROM $TODOS_TABLE - WHERE list_id = ? - ORDER by id + fun watchItems(listId: String?): Flow> = + db.watch( + """ + SELECT + t.*, a.local_uri + FROM + $TODOS_TABLE t + LEFT JOIN attachments a ON t.photo_id = a.id + WHERE + t.list_id = ? + ORDER BY t.id; """, - if(listId != null) listOf(listId) else null + if (listId != null) listOf(listId) else null, ) { cursor -> TodoItem( id = cursor.getString("id"), @@ -42,37 +51,46 @@ internal class Todo( completedBy = cursor.getStringOptional("completed_by"), completed = cursor.getLongOptional("completed") == 1L, listId = cursor.getString("list_id"), - photoId = cursor.getStringOptional("photo_id") + photoId = cursor.getStringOptional("photo_id"), + photoURI = cursor.getStringOptional("local_uri"), ) } - } fun onItemClicked(item: TodoItem) { _editingItem.value = item } - fun onItemDoneChanged(item: TodoItem, isDone: Boolean) { + fun onItemDoneChanged( + item: TodoItem, + isDone: Boolean, + ) { updateItem(item = item) { it.copy( completed = isDone, - completedBy = if(isDone) userId else null, - completedAt = if(isDone) Clock.System.now().toString() else null + completedBy = if (isDone) userId else null, + completedAt = if (isDone) Clock.System.now().toString() else null, ) } } fun onItemDeleteClicked(item: TodoItem) { viewModelScope.launch { + if (item.photoId != null) { + attachmentsQueue?.deleteFile(item.photoId) { _, _ -> } + } db.writeTransaction { tx -> tx.execute("DELETE FROM $TODOS_TABLE WHERE id = ?", listOf(item.id)) } } } - fun onAddItemClicked(userId: String?, listId: String?) { + fun onAddItemClicked( + userId: String?, + listId: String?, + ) { if (_inputText.value.isBlank()) return - if(userId == null || listId == null) { + if (userId == null || listId == null) { throw Exception("userId or listId is null") } @@ -80,7 +98,7 @@ internal class Todo( db.writeTransaction { tx -> tx.execute( "INSERT INTO $TODOS_TABLE (id, created_at, created_by, description, list_id) VALUES (uuid(), datetime(), ?, ?, ?)", - listOf(userId, _inputText.value, listId) + listOf(userId, _inputText.value, listId), ) } _inputText.value = "" @@ -106,24 +124,69 @@ internal class Todo( updateEditingItem(item = requireNotNull(_editingItem.value)) { it.copy( completed = isDone, - completedBy = if(isDone) userId else null, - completedAt = if(isDone) Clock.System.now().toString() else null + completedBy = if (isDone) userId else null, + completedAt = if (isDone) Clock.System.now().toString() else null, ) } } - private fun updateEditingItem(item: TodoItem, transformer: (item: TodoItem) -> TodoItem) { + fun onPhotoCapture(cameraService: CameraService) { + viewModelScope.launch { + val item = requireNotNull(_editingItem.value) + val photoData = + try { + cameraService.takePicture() + } catch (ex: Exception) { + if (ex is CancellationException) { + throw ex + } else { + // otherwise ignore + return@launch + } + } + val attachment = + attachmentsQueue!!.saveFile(data = flowOf(photoData), mediaType = "image/jped", fileExtension = "jpg") { tx, attachment -> + tx.execute("UPDATE $TODOS_TABLE SET photo_id = ? WHERE id = ?", listOf(attachment.id, item.id)) + } + + updateEditingItem(item = item) { it.copy(photoURI = attachment.localUri) } + } + } + + fun onPhotoDelete() { + viewModelScope.launch { + val item = requireNotNull(_editingItem.value) + attachmentsQueue!!.deleteFile(item.photoId!!) { tx, _ -> + tx.execute("UPDATE $TODOS_TABLE SET photo_id = NULL WHERE id = ?", listOf(item.id)) + } + updateEditingItem(item = item) { it.copy(photoURI = null) } + } + } + + private fun updateEditingItem( + item: TodoItem, + transformer: (item: TodoItem) -> TodoItem, + ) { _editingItem.value = transformer(item) } - private fun updateItem(item: TodoItem, transformer: (item: TodoItem) -> TodoItem) { + private fun updateItem( + item: TodoItem, + transformer: (item: TodoItem) -> TodoItem, + ) { viewModelScope.launch { val updatedItem = transformer(item) Logger.i("Updating item: $updatedItem") db.writeTransaction { tx -> tx.execute( "UPDATE $TODOS_TABLE SET description = ?, completed = ?, completed_by = ?, completed_at = ? WHERE id = ?", - listOf(updatedItem.description, updatedItem.completed, updatedItem.completedBy, updatedItem.completedAt, item.id) + listOf( + updatedItem.description, + updatedItem.completed, + updatedItem.completedBy, + updatedItem.completedAt, + item.id, + ), ) } } diff --git a/demos/android-supabase-todolist/app/src/main/java/com/powersync/androidexample/screens/TodosScreen.kt b/demos/android-supabase-todolist/app/src/main/java/com/powersync/androidexample/screens/TodosScreen.kt index be261e26..4cb0508b 100644 --- a/demos/android-supabase-todolist/app/src/main/java/com/powersync/androidexample/screens/TodosScreen.kt +++ b/demos/android-supabase-todolist/app/src/main/java/com/powersync/androidexample/screens/TodosScreen.kt @@ -6,13 +6,13 @@ import androidx.compose.foundation.layout.Spacer import androidx.compose.foundation.layout.fillMaxWidth import androidx.compose.foundation.layout.padding import androidx.compose.foundation.layout.width +import androidx.compose.material.icons.Icons +import androidx.compose.material.icons.automirrored.filled.ArrowBack +import androidx.compose.material3.ExperimentalMaterial3Api import androidx.compose.material3.Icon import androidx.compose.material3.IconButton import androidx.compose.material3.Text import androidx.compose.material3.TopAppBar -import androidx.compose.material.icons.Icons -import androidx.compose.material.icons.automirrored.filled.ArrowBack -import androidx.compose.material3.ExperimentalMaterial3Api import androidx.compose.runtime.Composable import androidx.compose.ui.Modifier import androidx.compose.ui.text.style.TextAlign @@ -44,8 +44,9 @@ internal fun TodosScreen( Text( "Todo List", textAlign = TextAlign.Center, - modifier = Modifier.fillMaxWidth().padding(end = 36.dp) - ) }, + modifier = Modifier.fillMaxWidth().padding(end = 36.dp), + ) + }, navigationIcon = { IconButton(onClick = { navController.navigate(Screen.Home) }) { Icon(Icons.AutoMirrored.Filled.ArrowBack, contentDescription = "Go back") @@ -54,14 +55,14 @@ internal fun TodosScreen( actions = { WifiIcon(isConnected ?: false) Spacer(modifier = Modifier.width(16.dp)) - } + }, ) Input( text = inputText, onAddClicked = onAddItemClicked, onTextChanged = onInputTextChanged, - screen = Screen.Todos + screen = Screen.Todos, ) Box(Modifier.weight(1F)) { @@ -69,7 +70,7 @@ internal fun TodosScreen( items = items, onItemClicked = onItemClicked, onItemDoneChanged = onItemDoneChanged, - onItemDeleteClicked = onItemDeleteClicked + onItemDeleteClicked = onItemDeleteClicked, ) } } diff --git a/demos/android-supabase-todolist/app/src/main/java/com/powersync/androidexample/ui/CameraService.kt b/demos/android-supabase-todolist/app/src/main/java/com/powersync/androidexample/ui/CameraService.kt new file mode 100644 index 00000000..4057c8d1 --- /dev/null +++ b/demos/android-supabase-todolist/app/src/main/java/com/powersync/androidexample/ui/CameraService.kt @@ -0,0 +1,55 @@ +package com.powersync.androidexample.ui + +import android.net.Uri +import android.os.Environment +import androidx.activity.result.contract.ActivityResultContracts +import androidx.core.content.FileProvider +import com.powersync.androidexample.MainActivity +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock +import java.io.File +import java.text.SimpleDateFormat +import java.util.Date +import java.util.Locale + +/** + * A very basic camera service. This should not be used in production. + */ +class CameraService( + val activity: MainActivity, +) { + private var currentPhotoUri: Uri? = null + private var pictureResult: CompletableDeferred? = null + private var file: File? = null + private val mutex = Mutex() + + private val takePictureLauncher = + activity.registerForActivityResult(ActivityResultContracts.TakePicture()) { success -> + if (success && currentPhotoUri != null) { + activity.contentResolver.openInputStream(currentPhotoUri!!)?.use { + pictureResult!!.complete(it.readBytes()) + } + file!!.delete() + } else { + pictureResult!!.completeExceptionally(Exception("Could not capture photo")) + } + + file = null + currentPhotoUri = null + pictureResult = null + } + + suspend fun takePicture(): ByteArray = + mutex.withLock { + pictureResult = CompletableDeferred() + val timeStamp = SimpleDateFormat("yyyyMMdd_HHmmss", Locale.getDefault()).format(Date()) + val storageDir = activity.getExternalFilesDir(Environment.DIRECTORY_PICTURES) + file = File.createTempFile("JPEG_${timeStamp}_", ".jpg", storageDir) + currentPhotoUri = FileProvider.getUriForFile(activity, "${activity.packageName}.fileprovider", file!!) + + takePictureLauncher.launch(currentPhotoUri!!) + + return pictureResult!!.await() + } +} diff --git a/demos/android-supabase-todolist/app/src/main/res/xml/filepaths.xml b/demos/android-supabase-todolist/app/src/main/res/xml/filepaths.xml new file mode 100644 index 00000000..6f554b86 --- /dev/null +++ b/demos/android-supabase-todolist/app/src/main/res/xml/filepaths.xml @@ -0,0 +1,4 @@ + + + + diff --git a/demos/android-supabase-todolist/app/src/main/res/xml/network_security_config.xml b/demos/android-supabase-todolist/app/src/main/res/xml/network_security_config.xml new file mode 100644 index 00000000..de61259a --- /dev/null +++ b/demos/android-supabase-todolist/app/src/main/res/xml/network_security_config.xml @@ -0,0 +1,6 @@ + + + + localhost + + diff --git a/demos/android-supabase-todolist/gradle/libs.versions.toml b/demos/android-supabase-todolist/gradle/libs.versions.toml index 40dd839b..4268a9db 100644 --- a/demos/android-supabase-todolist/gradle/libs.versions.toml +++ b/demos/android-supabase-todolist/gradle/libs.versions.toml @@ -12,7 +12,7 @@ composeBom = "2025.02.00" materialIconsExtended = "1.7.8" uuid = "0.8.2" kermit = "2.0.5" -sqldelight= "2.0.2" +sqldelight = "2.0.2" [libraries] androidx-core-ktx = { group = "androidx.core", name = "core-ktx", version.ref = "coreKtx" } diff --git a/demos/android-supabase-todolist/local.properties.example b/demos/android-supabase-todolist/local.properties.example index 0b8f0d04..a1ac1145 100644 --- a/demos/android-supabase-todolist/local.properties.example +++ b/demos/android-supabase-todolist/local.properties.example @@ -11,6 +11,7 @@ sdk.dir=/Users/dominic/Library/Android/sdk SUPABASE_URL=https://foo.supabase.co SUPABASE_ANON_KEY=foo +SUPABASE_STORAGE_BUCKET=media # optional attachment bucket, set to null to disable POWERSYNC_URL=https://foo.powersync.journeyapps.com # Set to true to use released PowerSync packages instead of the ones built locally. USE_RELEASED_POWERSYNC_VERSIONS=false diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 666359ea..336a60e3 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -4,6 +4,7 @@ android-minSdk = "24" android-targetSdk = "35" android-compileSdk = "35" configurationAnnotations = "0.9.5" +gradleDownloadTask = "5.5.0" java = "17" idea = "243.22562.218" # Meerkat | 2024.3.1 (see https://plugins.jetbrains.com/docs/intellij/android-studio-releases-list.html) @@ -55,11 +56,12 @@ junitVersion = "1.2.1" [libraries] configuration-annotations = { module = "co.touchlab.skie:configuration-annotations", version.ref = "configurationAnnotations" } +gradle-download-task = { module = "de.undercouch:gradle-download-task", version.ref = "gradleDownloadTask" } kermit = { module = "co.touchlab:kermit", version.ref = "kermit" } kermit-test = { module = "co.touchlab:kermit-test", version.ref = "kermit" } +kotlin-gradle-plugin = { module = "org.jetbrains.kotlin:kotlin-gradle-plugin", version.ref = "kotlin" } powersync-sqlite-core-android = { module = "co.powersync:powersync-sqlite-core", version.ref = "powersync-core" } mavenPublishPlugin = { module = "com.vanniktech:gradle-maven-publish-plugin", version.ref = "maven-publish" } -kotlin-gradle-plugin = { module = "org.jetbrains.kotlin:kotlin-gradle-plugin", version.ref = "kotlin" } test-junit = { group = "junit", name = "junit", version.ref = "junit" } test-android-runner = { module = "androidx.test:runner", version.ref = "androidx-test-runner" } @@ -99,7 +101,7 @@ sqlite-jdbc = { module = "org.xerial:sqlite-jdbc", version.ref = "sqlite-jdbc" } stately-concurrency = { module = "co.touchlab:stately-concurrency", version.ref = "stately" } supabase-client = { module = "io.github.jan-tennert.supabase:postgrest-kt", version.ref = "supabase" } supabase-auth = { module = "io.github.jan-tennert.supabase:auth-kt", version.ref = "supabase" } - +supabase-storage = { module = "io.github.jan-tennert.supabase:storage-kt", version.ref = "supabase" } androidx-sqliteFramework = { module = "androidx.sqlite:sqlite-framework", version.ref = "androidxSqlite" } # Sample - Android diff --git a/plugins/build-plugin/build.gradle.kts b/plugins/build-plugin/build.gradle.kts new file mode 100644 index 00000000..1690f112 --- /dev/null +++ b/plugins/build-plugin/build.gradle.kts @@ -0,0 +1,16 @@ +plugins { + `kotlin-dsl` // Enables Kotlin DSL for writing Gradle build logic +} + +gradlePlugin { + // Define the plugin + val sonatypeCentralUpload by plugins.creating { + id = "com.powersync.plugins.sharedbuild" + implementationClass = "com.powersync.plugins.sharedbuild.SharedBuildPlugin" + } +} + +dependencies { + implementation(libs.gradle.download.task) + implementation(libs.kotlin.gradle.plugin) +} diff --git a/plugins/build-plugin/settings.gradle.kts b/plugins/build-plugin/settings.gradle.kts new file mode 100644 index 00000000..7fbbd448 --- /dev/null +++ b/plugins/build-plugin/settings.gradle.kts @@ -0,0 +1 @@ +rootProject.name = "build-logic" diff --git a/plugins/build-plugin/src/main/kotlin/SharedBuildPlugin.kt b/plugins/build-plugin/src/main/kotlin/SharedBuildPlugin.kt new file mode 100644 index 00000000..e9488485 --- /dev/null +++ b/plugins/build-plugin/src/main/kotlin/SharedBuildPlugin.kt @@ -0,0 +1,98 @@ +package com.powersync.plugins.sharedbuild + +import de.undercouch.gradle.tasks.download.Download +import org.gradle.api.Plugin +import org.gradle.api.Project +import org.gradle.api.artifacts.VersionCatalogsExtension +import org.gradle.api.tasks.Exec +import org.gradle.kotlin.dsl.withType +import org.jetbrains.kotlin.gradle.dsl.KotlinMultiplatformExtension +import org.jetbrains.kotlin.gradle.plugin.mpp.KotlinNativeTarget +import org.jetbrains.kotlin.konan.target.Family +import java.io.File + +class SharedBuildPlugin : Plugin { + override fun apply(project: Project) { + val binariesFolder = project.layout.buildDirectory.dir("binaries") + + val coreVersion = + project.extensions + .getByType(VersionCatalogsExtension::class.java) + .named("libs") + .findVersion("powersync.core") + .get() + .toString() + + val frameworkUrl = + "https://github.com/powersync-ja/powersync-sqlite-core/releases/download/v$coreVersion/powersync-sqlite-core.xcframework.zip" + + val downloadPowersyncFramework = + project.tasks.register("downloadPowersyncFramework", Download::class.java) { + src(frameworkUrl) + dest(binariesFolder.map { it.file("framework/powersync-sqlite-core.xcframework.zip") }) + onlyIfModified(true) + } + + val unzipPowersyncFramework = + project.tasks.register("unzipPowersyncFramework", Exec::class.java) { + dependsOn(downloadPowersyncFramework) + + val zipfile = downloadPowersyncFramework.get().dest + inputs.file(zipfile) + val destination = File(zipfile.parentFile, "extracted") + doFirst { + destination.deleteRecursively() + destination.mkdir() + } + + // We're using unzip here because the Gradle copy task doesn't support symlinks. + executable = "unzip" + args(zipfile.absolutePath) + workingDir(destination) + outputs.dir(destination) + } + + project.extensions + .getByType(KotlinMultiplatformExtension::class.java) + .targets + .withType() + .configureEach { + if (konanTarget.family == Family.IOS && + konanTarget.name.contains( + "simulator", + ) + ) { + binaries + .withType() + .configureEach { + linkTaskProvider.configure { dependsOn(unzipPowersyncFramework) } + linkerOpts("-framework", "powersync-sqlite-core") + + val frameworkRoot = + binariesFolder + .map { it.dir("framework/extracted/powersync-sqlite-core.xcframework/ios-arm64_x86_64-simulator") } + .get() + .asFile.path + + linkerOpts("-F", frameworkRoot) + linkerOpts("-rpath", frameworkRoot) + } + } else if (konanTarget.family == Family.OSX) { + binaries + .withType() + .configureEach { + linkTaskProvider.configure { dependsOn("unzipPowersyncFramework") } + linkerOpts("-framework", "powersync-sqlite-core") + var frameworkRoot = + binariesFolder + .map { it.dir("framework/extracted/powersync-sqlite-core.xcframework/macos-arm64_x86_64") } + .get() + .asFile.path + + linkerOpts("-F", frameworkRoot) + linkerOpts("-rpath", frameworkRoot) + } + } + } + } +} diff --git a/plugins/settings.gradle.kts b/plugins/settings.gradle.kts index 5b507f9a..cfc24791 100644 --- a/plugins/settings.gradle.kts +++ b/plugins/settings.gradle.kts @@ -22,4 +22,5 @@ dependencyResolutionManagement { rootProject.name = "plugins" -include(":sonatype") \ No newline at end of file +include(":sonatype") +include(":build-plugin")