From 450c073b3f496846897ddb43887c683b05ed1f97 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 27 Mar 2025 11:55:55 +0200 Subject: [PATCH 01/11] Report sync download progress --- core/build.gradle.kts | 5 +- .../powersync/sync/BaseInMemorySyncTest.kt | 97 ++++++ .../{ => sync}/SyncIntegrationTest.kt | 94 +----- .../com/powersync/sync/SyncProgressTest.kt | 300 ++++++++++++++++++ .../com/powersync/bucket/BucketStorage.kt | 1 + .../com/powersync/bucket/BucketStorageImpl.kt | 30 +- .../bucket/LocalOperationCounters.kt | 6 + .../com/powersync/db/PowerSyncDatabaseImpl.kt | 37 +-- .../kotlin/com/powersync/sync/Progress.kt | 115 +++++++ .../kotlin/com/powersync/sync/SyncStatus.kt | 42 +++ .../kotlin/com/powersync/sync/SyncStream.kt | 74 ++--- .../com/powersync/sync/SyncStreamTest.kt | 4 +- 12 files changed, 647 insertions(+), 158 deletions(-) create mode 100644 core/src/commonIntegrationTest/kotlin/com/powersync/sync/BaseInMemorySyncTest.kt rename core/src/commonIntegrationTest/kotlin/com/powersync/{ => sync}/SyncIntegrationTest.kt (84%) create mode 100644 core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncProgressTest.kt create mode 100644 core/src/commonMain/kotlin/com/powersync/bucket/LocalOperationCounters.kt create mode 100644 core/src/commonMain/kotlin/com/powersync/sync/Progress.kt diff --git a/core/build.gradle.kts b/core/build.gradle.kts index 96f77274..69aa3ed8 100644 --- a/core/build.gradle.kts +++ b/core/build.gradle.kts @@ -321,7 +321,10 @@ androidComponents.onVariants { } tasks.named(kotlin.jvm().compilations["main"].processResourcesTaskName) { - from(downloadPowersyncDesktopBinaries) + //from(downloadPowersyncDesktopBinaries) + from("/Users/simon/src/powersync-sqlite-core/target/debug/libpowersync.dylib") { + rename { "libpowersync_aarch64.dylib" } + } } // We want to build with recent JDKs, but need to make sure we support Java 8. https://jakewharton.com/build-on-latest-java-test-through-lowest-java/ diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/BaseInMemorySyncTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/BaseInMemorySyncTest.kt new file mode 100644 index 00000000..a2e4d01b --- /dev/null +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/BaseInMemorySyncTest.kt @@ -0,0 +1,97 @@ +package com.powersync.sync + +import co.touchlab.kermit.ExperimentalKermitApi +import co.touchlab.kermit.Logger +import co.touchlab.kermit.Severity +import co.touchlab.kermit.TestConfig +import co.touchlab.kermit.TestLogWriter +import com.powersync.PowerSyncDatabase +import com.powersync.connectors.PowerSyncBackendConnector +import com.powersync.connectors.PowerSyncCredentials +import com.powersync.db.PowerSyncDatabaseImpl +import com.powersync.db.schema.Schema +import com.powersync.testutils.MockSyncService +import com.powersync.testutils.UserRow +import com.powersync.testutils.cleanup +import com.powersync.testutils.factory +import com.powersync.testutils.generatePrintLogWriter +import dev.mokkery.answering.returns +import dev.mokkery.everySuspend +import dev.mokkery.mock +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.flow.receiveAsFlow +import kotlinx.coroutines.runBlocking +import kotlinx.serialization.json.JsonObject +import kotlin.test.AfterTest +import kotlin.test.BeforeTest + +@OptIn(ExperimentalKermitApi::class) +abstract class BaseInMemorySyncTest { + val logWriter = + TestLogWriter( + loggable = Severity.Debug, + ) + + val logger = + Logger( + TestConfig( + minSeverity = Severity.Debug, + logWriterList = listOf(logWriter, generatePrintLogWriter()), + ), + ) + internal lateinit var database: PowerSyncDatabaseImpl + internal lateinit var connector: PowerSyncBackendConnector + internal lateinit var syncLines: Channel + + @BeforeTest + fun setup() { + cleanup("testdb") + logWriter.reset() + database = openDb() + connector = + mock { + everySuspend { getCredentialsCached() } returns + PowerSyncCredentials( + token = "test-token", + userId = "test-user", + endpoint = "https://test.com", + ) + + everySuspend { invalidateCredentials() } returns Unit + } + syncLines = Channel() + + runBlocking { + database.disconnectAndClear(true) + } + } + + @AfterTest + fun teardown() { + runBlocking { + database.close() + } + cleanup("testdb") + } + + internal fun openDb() = + PowerSyncDatabase( + factory = factory, + schema = Schema(UserRow.table), + dbFilename = "testdb", + ) as PowerSyncDatabaseImpl + + internal fun syncStream(): SyncStream { + val client = MockSyncService(syncLines) + return SyncStream( + bucketStorage = database.bucketStorage, + connector = connector, + httpEngine = client, + uploadCrud = { }, + retryDelayMs = 10, + logger = logger, + params = JsonObject(emptyMap()), + ) + } +} diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt similarity index 84% rename from core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt rename to core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt index cfefbf7b..c2831948 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt @@ -1,41 +1,24 @@ -package com.powersync +package com.powersync.sync import app.cash.turbine.turbineScope -import co.touchlab.kermit.ExperimentalKermitApi import co.touchlab.kermit.Logger -import co.touchlab.kermit.Severity -import co.touchlab.kermit.TestConfig -import co.touchlab.kermit.TestLogWriter +import com.powersync.PowerSyncDatabase +import com.powersync.PowerSyncException import com.powersync.bucket.BucketChecksum import com.powersync.bucket.BucketPriority import com.powersync.bucket.Checkpoint import com.powersync.bucket.OpType import com.powersync.bucket.OplogEntry -import com.powersync.connectors.PowerSyncBackendConnector -import com.powersync.connectors.PowerSyncCredentials import com.powersync.db.PowerSyncDatabaseImpl import com.powersync.db.schema.Schema -import com.powersync.sync.SyncLine -import com.powersync.sync.SyncStream -import com.powersync.testutils.MockSyncService import com.powersync.testutils.UserRow -import com.powersync.testutils.cleanup import com.powersync.testutils.factory -import com.powersync.testutils.generatePrintLogWriter import com.powersync.testutils.waitFor import com.powersync.utils.JsonUtil -import dev.mokkery.answering.returns -import dev.mokkery.everySuspend -import dev.mokkery.mock import dev.mokkery.verify import kotlinx.coroutines.DelicateCoroutinesApi -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.runBlocking import kotlinx.coroutines.test.runTest import kotlinx.serialization.encodeToString -import kotlinx.serialization.json.JsonObject -import kotlin.test.AfterTest -import kotlin.test.BeforeTest import kotlin.test.Test import kotlin.test.assertEquals import kotlin.test.assertFailsWith @@ -44,75 +27,8 @@ import kotlin.test.assertNotNull import kotlin.test.assertTrue import kotlin.time.Duration.Companion.seconds -@OptIn(ExperimentalKermitApi::class) -class SyncIntegrationTest { - private val logWriter = - TestLogWriter( - loggable = Severity.Debug, - ) - - private val logger = - Logger( - TestConfig( - minSeverity = Severity.Debug, - logWriterList = listOf(logWriter, generatePrintLogWriter()), - ), - ) - private lateinit var database: PowerSyncDatabaseImpl - private lateinit var connector: PowerSyncBackendConnector - private lateinit var syncLines: Channel - - @BeforeTest - fun setup() { - cleanup("testdb") - logWriter.reset() - database = openDb() - connector = - mock { - everySuspend { getCredentialsCached() } returns - PowerSyncCredentials( - token = "test-token", - userId = "test-user", - endpoint = "https://test.com", - ) - - everySuspend { invalidateCredentials() } returns Unit - } - syncLines = Channel() - - runBlocking { - database.disconnectAndClear(true) - } - } - - @AfterTest - fun teardown() { - runBlocking { - database.close() - } - cleanup("testdb") - } - - private fun openDb() = - PowerSyncDatabase( - factory = factory, - schema = Schema(UserRow.table), - dbFilename = "testdb", - ) as PowerSyncDatabaseImpl - - private fun syncStream(): SyncStream { - val client = MockSyncService(syncLines) - return SyncStream( - bucketStorage = database.bucketStorage, - connector = connector, - httpEngine = client, - uploadCrud = { }, - retryDelayMs = 10, - logger = logger, - params = JsonObject(emptyMap()), - ) - } - +@OptIn(co.touchlab.kermit.ExperimentalKermitApi::class) +class SyncIntegrationTest: BaseInMemorySyncTest() { private suspend fun expectUserCount(amount: Int) { val users = database.getAll("SELECT * FROM users;") { UserRow.from(it) } assertEquals(amount, users.size, "Expected $amount users, got $users") diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncProgressTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncProgressTest.kt new file mode 100644 index 00000000..e66c0c12 --- /dev/null +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncProgressTest.kt @@ -0,0 +1,300 @@ +package com.powersync.sync + +import app.cash.turbine.ReceiveTurbine +import app.cash.turbine.turbineScope +import com.powersync.bucket.BucketChecksum +import com.powersync.bucket.BucketPriority +import com.powersync.bucket.Checkpoint +import com.powersync.bucket.OpType +import com.powersync.bucket.OplogEntry +import com.powersync.testutils.waitFor +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.test.runTest +import kotlin.test.BeforeTest +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertFalse +import kotlin.test.assertNull +import kotlin.test.assertTrue + +class SyncProgressTest: BaseInMemorySyncTest() { + private var lastOpId = 0 + + @BeforeTest + fun resetOpId() { + lastOpId = 0 + } + + private fun bucket(name: String, count: Int, priority: BucketPriority = BucketPriority(3)): BucketChecksum { + return BucketChecksum( + bucket = name, + priority = priority, + checksum = 0, + count = count, + ) + } + + private suspend fun addDataLine(bucket: String, amount: Int) { + syncLines.send(SyncLine.SyncDataBucket( + bucket=bucket, + data = List(amount) { + OplogEntry( + checksum = 0, + opId = (++lastOpId).toString(), + op = OpType.PUT, + rowId = lastOpId.toString(), + rowType = bucket, + data = "{}", + ) + }, + after = null, + nextAfter = null, + )) + } + + private suspend fun addCheckpointComplete(priority: BucketPriority? = null) { + if (priority != null) { + syncLines.send(SyncLine.CheckpointPartiallyComplete( + lastOpId=lastOpId.toString(), + priority = priority, + )) + } else { + syncLines.send(SyncLine.CheckpointComplete(lastOpId=lastOpId.toString())) + } + } + + private suspend fun ReceiveTurbine.expectProgress( + total: Pair, + priorities: Map> = emptyMap() + ) { + val item = awaitItem() + val progress = item.downloadProgress ?: error("Expected download progress on $item") + + assertTrue { item.downloading } + assertEquals(total.first, progress.untilCompletion.completed) + assertEquals(total.second, progress.untilCompletion.total) + + priorities.forEach { (priority, expected) -> + val (expectedDownloaded, expectedTotal) = expected + val progress = progress.untilPriority(priority) + assertEquals(expectedDownloaded, progress.completed) + assertEquals(expectedTotal, progress.total) + } + } + + private suspend fun ReceiveTurbine.expectNotDownloading() { + awaitItem().also { + assertFalse { it.downloading } + assertNull(it.downloadProgress) + } + } + + @Test + fun withoutPriorities() = runTest { + val syncStream = syncStream() + database.connectInternal(syncStream, 1000L) + + turbineScope { + val turbine = database.currentStatus.asFlow().testIn(this) + turbine.waitFor { it.connected && !it.downloading } + + // Send checkpoint with 10 ops, progress should be 0/10 + syncLines.send(SyncLine.FullCheckpoint(Checkpoint( + lastOpId="10", + checksums = listOf(bucket("a", 10)) + ))) + turbine.expectProgress(0 to 10) + + addDataLine("a", 10) + turbine.expectProgress(10 to 10) + + addCheckpointComplete() + turbine.expectNotDownloading() + + // Emit new data, progress should be 0/2 instead of 10/12 + syncLines.send(SyncLine.CheckpointDiff( + lastOpId="12", + updatedBuckets = listOf(bucket("a", 12)), + removedBuckets = emptyList(), + )) + turbine.expectProgress(0 to 2) + + addDataLine("a", 2) + turbine.expectProgress(2 to 2) + + addCheckpointComplete() + turbine.expectNotDownloading() + + turbine.cancel() + } + + database.close() + syncLines.close() + } + + @Test + fun interruptedSync() = runTest { + var syncStream = syncStream() + database.connectInternal(syncStream, 1000L) + + turbineScope { + val turbine = database.currentStatus.asFlow().testIn(this) + turbine.waitFor { it.connected && !it.downloading } + + // Send checkpoint with 10 ops, progress should be 0/10 + syncLines.send(SyncLine.FullCheckpoint(Checkpoint( + lastOpId="10", + checksums = listOf(bucket("a", 10)) + ))) + turbine.expectProgress(0 to 10) + + addDataLine("a", 5) + turbine.expectProgress(5 to 10) + + turbine.cancel() + } + + // Emulate the app closing + database.close() + syncLines.close() + + // And reconnecting + database = openDb() + syncLines = Channel() + syncStream = syncStream() + database.connectInternal(syncStream, 1000L) + + turbineScope { + val turbine = database.currentStatus.asFlow().testIn(this) + turbine.waitFor { it.connected && !it.downloading } + + // Send the same checkpoint as before + syncLines.send(SyncLine.FullCheckpoint(Checkpoint( + lastOpId="10", + checksums = listOf(bucket("a", 10)) + ))) + + // Progress should be restored: 5 / 10 instead of 0/5 + turbine.expectProgress(5 to 10) + + addDataLine("a", 5) + turbine.expectProgress(10 to 10) + addCheckpointComplete() + turbine.expectNotDownloading() + + turbine.cancel() + } + + database.close() + syncLines.close() + } + + @Test + fun interruptedSyncWithNewCheckpoint() = runTest { + var syncStream = syncStream() + database.connectInternal(syncStream, 1000L) + + turbineScope { + val turbine = database.currentStatus.asFlow().testIn(this) + turbine.waitFor { it.connected && !it.downloading } + syncLines.send(SyncLine.FullCheckpoint(Checkpoint( + lastOpId="10", + checksums = listOf(bucket("a", 10)) + ))) + turbine.expectProgress(0 to 10) + + addDataLine("a", 5) + turbine.expectProgress(5 to 10) + + turbine.cancel() + } + + // Close and re-connect + database.close() + syncLines.close() + database = openDb() + syncLines = Channel() + syncStream = syncStream() + database.connectInternal(syncStream, 1000L) + + turbineScope { + val turbine = database.currentStatus.asFlow().testIn(this) + turbine.waitFor { it.connected && !it.downloading } + + // Send checkpoint with two more ops + syncLines.send(SyncLine.FullCheckpoint(Checkpoint( + lastOpId="12", + checksums = listOf(bucket("a", 12)) + ))) + + turbine.expectProgress(5 to 12) + + addDataLine("a", 7) + turbine.expectProgress(12 to 12) + addCheckpointComplete() + turbine.expectNotDownloading() + + turbine.cancel() + } + + database.close() + syncLines.close() + } + + @Test + fun differentPriorities() = runTest { + val syncStream = syncStream() + database.connectInternal(syncStream, 1000L) + + turbineScope { + val turbine = database.currentStatus.asFlow().testIn(this) + turbine.waitFor { it.connected && !it.downloading } + + suspend fun expectProgress(prio0: Pair, prio2: Pair) { + turbine.expectProgress(prio2, mapOf(BucketPriority(0) to prio0, BucketPriority(2) to prio2)) + } + + syncLines.send(SyncLine.FullCheckpoint(Checkpoint( + lastOpId="10", + checksums = listOf( + bucket("a", 5, BucketPriority(0)), + bucket("b", 5, BucketPriority(2)), + ) + ))) + expectProgress(0 to 5, 0 to 10) + + addDataLine("a", 5) + expectProgress(5 to 5, 5 to 10) + + addCheckpointComplete(BucketPriority(0)) + expectProgress(5 to 5, 5 to 10) + + addDataLine("b", 2) + expectProgress(5 to 5, 7 to 10) + + // Before syncing b fully, send a new checkpoint + syncLines.send(SyncLine.CheckpointDiff( + lastOpId="14", + updatedBuckets = listOf( + bucket("a", 8, BucketPriority(0)), + bucket("b", 6, BucketPriority(2)), + ), + removedBuckets = emptyList(), + )) + expectProgress(5 to 8, 7 to 14) + + addDataLine("a", 3) + expectProgress(8 to 8, 10 to 14) + addDataLine("b", 4) + expectProgress(8 to 8, 14 to 14) + + addCheckpointComplete() + turbine.expectNotDownloading() + + turbine.cancel() + } + + database.close() + syncLines.close() + } +} diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt index 94ca52df..5176b5ac 100644 --- a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt +++ b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt @@ -23,6 +23,7 @@ internal interface BucketStorage { suspend fun saveSyncData(syncDataBatch: SyncDataBatch) suspend fun getBucketStates(): List + suspend fun getBucketOperationProgress(): Map suspend fun removeBuckets(bucketsToDelete: List) diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt index 2bdd340b..06282171 100644 --- a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt @@ -141,6 +141,19 @@ internal class BucketStorageImpl( }, ) + override suspend fun getBucketOperationProgress(): Map = buildMap { + val rows = db.getAll("SELECT name, count_at_last, count_since_last FROM ps_buckets") { cursor -> + cursor.getString(0)!! to LocalOperationCounters( + atLast = cursor.getLong(1)!!.toInt(), + sinceLast = cursor.getLong(2)!!.toInt(), + ) + } + + for ((name, counters) in rows) { + put(name, counters) + } + } + override suspend fun removeBuckets(bucketsToDelete: List) { bucketsToDelete.forEach { bucketName -> deleteBucket(bucketName) @@ -294,7 +307,6 @@ internal class BucketStorageImpl( } return db.writeTransaction { tx -> - tx.execute( "INSERT INTO powersync_operations(op, data) VALUES(?, ?)", listOf("sync_local", args), @@ -305,7 +317,21 @@ internal class BucketStorageImpl( cursor.getLong(0)!! } - return@writeTransaction res == 1L + val didApply = res == 1L + if (didApply && priority == null) { + // Reset progress counters. We only do this for a complete sync, as we want a download progress to + // always cover a complete checkpoint instead of resetting for partial completions. + tx.execute(""" + UPDATE ps_buckets SET count_since_last = 0, count_at_last = ?1->name + WHERE ?1->name IS NOT NULL + """.trimIndent(), listOf(JsonUtil.json.encodeToString(buildMap { + for (bucket in checkpoint.checksums) { + bucket.count?.let { put(bucket.bucket, it) } + } + }))) + } + + return@writeTransaction didApply } } diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/LocalOperationCounters.kt b/core/src/commonMain/kotlin/com/powersync/bucket/LocalOperationCounters.kt new file mode 100644 index 00000000..73f6a6fc --- /dev/null +++ b/core/src/commonMain/kotlin/com/powersync/bucket/LocalOperationCounters.kt @@ -0,0 +1,6 @@ +package com.powersync.bucket + +internal data class LocalOperationCounters( + val atLast: Int, + val sinceLast: Int, +) diff --git a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt index 244d9516..923663d8 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt @@ -199,21 +199,7 @@ internal class PowerSyncDatabaseImpl( } launch { - stream.status.asFlow().collect { - currentStatus.update( - connected = it.connected, - connecting = it.connecting, - uploading = it.uploading, - downloading = it.downloading, - lastSyncedAt = it.lastSyncedAt, - hasSynced = it.hasSynced, - uploadError = it.uploadError, - downloadError = it.downloadError, - clearDownloadError = it.downloadError == null, - clearUploadError = it.uploadError == null, - priorityStatusEntries = it.priorityStatusEntries, - ) - } + currentStatus.trackOther(stream.status) } launch { @@ -376,11 +362,12 @@ internal class PowerSyncDatabaseImpl( syncSupervisorJob = null } - currentStatus.update( + currentStatus.update { copy( connected = false, connecting = false, - lastSyncedAt = currentStatus.lastSyncedAt, - ) + downloading = false, + downloadProgress = null, + ) } } override suspend fun disconnectAndClear(clearLocal: Boolean) { @@ -389,7 +376,7 @@ internal class PowerSyncDatabaseImpl( internalDb.writeTransaction { tx -> tx.getOptional("SELECT powersync_clear(?)", listOf(if (clearLocal) "1" else "0")) {} } - currentStatus.update(lastSyncedAt = null, hasSynced = false) + currentStatus.update { copy(lastSyncedAt = null, hasSynced = false) } } private suspend fun updateHasSynced() { @@ -429,11 +416,13 @@ internal class PowerSyncDatabaseImpl( } } - currentStatus.update( - hasSynced = lastSyncedAt != null, - lastSyncedAt = lastSyncedAt, - priorityStatusEntries = priorityStatus, - ) + currentStatus.update { + copy( + hasSynced = lastSyncedAt != null, + lastSyncedAt = lastSyncedAt, + priorityStatusEntries = priorityStatus, + ) + } } override suspend fun waitForFirstSync() = waitForFirstSyncImpl(null) diff --git a/core/src/commonMain/kotlin/com/powersync/sync/Progress.kt b/core/src/commonMain/kotlin/com/powersync/sync/Progress.kt new file mode 100644 index 00000000..b0907339 --- /dev/null +++ b/core/src/commonMain/kotlin/com/powersync/sync/Progress.kt @@ -0,0 +1,115 @@ +package com.powersync.sync + +import com.powersync.bucket.BucketPriority +import com.powersync.bucket.Checkpoint +import com.powersync.bucket.LocalOperationCounters + +/** + * Information about a progressing download. + * + * This reports the [total] amount of operations to download, how many of them have already been [completed] and finally + * a [fraction] indicating relative progress. + * + * To obtain a [ProgressWithOperations] instance, use a method on [SyncDownloadProgress] which in turn is available + * on [SyncStatusData]. + */ +public data class ProgressWithOperations( + val total: Int, + val completed: Int +) { + /** + * The relative amount of [total] items that have been [completed], as a number between `0.0` and `1.0`. + */ + public val fraction: Double get() { + if (completed == 0) { + return 0.0; + } + + return completed.toDouble() / total.toDouble() + } +} + +/** + * Provides realtime progress on how PowerSync is downloading rows. + * + * The reported progress always reflects the status towards the end of a sync iteration (after which a consistent + * snapshot of all buckets is available locally). + * + * In rare cases (in particular, when a [compacting](https://docs.powersync.com/usage/lifecycle-maintenance/compacting-buckets) + * operation takes place between syncs), it's possible for the returned numbers to be slightly inaccurate. For this + * reason, [SyncDownloadProgress] should be seen as an approximation of progress. The information returned is good + * enough to build progress bars, but not exact enough to track individual download counts. + * + * Also note that data is downloaded in bulk, which means that individual counters are unlikely to be updated + * one-by-one. + */ +@ConsistentCopyVisibility +public data class SyncDownloadProgress private constructor( + private val buckets: Map +) { + /** + * Creates download progress information from the local progress counters since the last full sync and the target + * checkpoint. + */ + internal constructor(localProgress: Map, target: Checkpoint) : this(buildMap { + for (entry in target.checksums) { + val savedProgress = localProgress[entry.bucket] + + put(entry.bucket, BucketProgress( + priority = entry.priority, + atLast = savedProgress?.atLast ?: 0, + sinceLast = savedProgress?.sinceLast ?: 0, + targetCount = entry.count ?: 0, + )) + } + }) + + /** + * Download progress towards a complete checkpoint being received. + * + * The returned [ProgressWithOperations] instance tracks the target amount of operations that need to be downloaded + * in total and how many of them have already been received. + */ + public val untilCompletion: ProgressWithOperations get() = untilPriority(BucketPriority.FULL_SYNC_PRIORITY) + + /** + * Returns download progress towards all data up until the specified [priority] being received. + * + * The returned [ProgressWithOperations] instance tracks the target amount of operations that need to be downloaded + * in total and how many of them have already been received. + */ + public fun untilPriority(priority: BucketPriority): ProgressWithOperations { + val (total, completed) = targetAndCompletedCounts(priority) + return ProgressWithOperations(total, completed) + } + + internal fun incrementDownloaded(batch: SyncDataBatch): SyncDownloadProgress { + return SyncDownloadProgress(buildMap { + putAll(this@SyncDownloadProgress.buckets) + + for (bucket in batch.buckets) { + val previous = get(bucket.bucket) ?: continue + put(bucket.bucket, previous.copy( + sinceLast = previous.sinceLast + bucket.data.size + )) + } + }) + } + + private fun targetAndCompletedCounts(priority: BucketPriority): Pair { + return buckets.values.asSequence() + .filter { it.priority >= priority } + .fold(0 to 0) { (prevTarget, prevCompleted), entry -> + (prevTarget + entry.total) to (prevCompleted + entry.sinceLast) + } + } +} + +private data class BucketProgress( + val priority: BucketPriority, + val atLast: Int, + val sinceLast: Int, + val targetCount: Int +) { + val total get(): Int = targetCount - atLast +} diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncStatus.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncStatus.kt index ce4fde57..060c697d 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncStatus.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncStatus.kt @@ -5,6 +5,7 @@ import com.powersync.connectors.PowerSyncBackendConnector import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.SharedFlow import kotlinx.coroutines.flow.asSharedFlow +import kotlinx.datetime.Clock import kotlinx.datetime.Instant @ConsistentCopyVisibility @@ -36,6 +37,15 @@ public interface SyncStatusData { */ public val downloading: Boolean + /** + * Realtime progress information about downloaded operations during an active sync. + * + * + * For more information on what progress is reported, see [SyncDownloadProgress]. + * This value will be non-null only if [downloading] is true. + */ + public val downloadProgress: SyncDownloadProgress? + /** * true if uploading changes */ @@ -106,6 +116,7 @@ internal data class SyncStatusDataContainer( override val connected: Boolean = false, override val connecting: Boolean = false, override val downloading: Boolean = false, + override val downloadProgress: SyncDownloadProgress? = null, override val uploading: Boolean = false, override val lastSyncedAt: Instant? = null, override val hasSynced: Boolean? = null, @@ -115,6 +126,19 @@ internal data class SyncStatusDataContainer( ) : SyncStatusData { override val anyError get() = downloadError ?: uploadError + + internal fun abortedDownload() = copy( + downloading = false, + downloadProgress = null, + ) + + internal fun copyWithCompletedDownload() = copy( + lastSyncedAt = Clock.System.now(), + downloading = false, + downloadProgress = null, + hasSynced = true, + downloadError = null, + ) } @ConsistentCopyVisibility @@ -131,6 +155,7 @@ public data class SyncStatus internal constructor( /** * Updates the internal sync status indicators and emits Flow updates */ + @Deprecated("Use callback-based approach instead") internal fun update( connected: Boolean? = null, connecting: Boolean? = null, @@ -159,6 +184,20 @@ public data class SyncStatus internal constructor( stateFlow.value = data } + /** + * Updates the internal sync status indicators and emits Flow updates + */ + internal inline fun update(makeCopy: SyncStatusDataContainer.() -> SyncStatusDataContainer) { + data = data.makeCopy() + stateFlow.value = data + } + + internal suspend fun trackOther(source: SyncStatus) { + source.stateFlow.collect { + update { it } + } + } + override val anyError: Any? get() = data.anyError @@ -171,6 +210,9 @@ public data class SyncStatus internal constructor( override val downloading: Boolean get() = data.downloading + override val downloadProgress: SyncDownloadProgress? + get() = data.downloadProgress + override val uploading: Boolean get() = data.uploading diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt index 23b3e5ed..1d63c96a 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt @@ -82,7 +82,7 @@ internal class SyncStream( var invalidCredentials = false clientId = bucketStorage.getClientId() while (true) { - status.update(connecting = true) + status.update { copy(connecting = true) } try { if (invalidCredentials) { // This may error. In that case it will be retried again on the next @@ -102,15 +102,9 @@ internal class SyncStream( } logger.e("Error in streamingSync: ${e.message}") - status.update( - downloadError = e, - ) + status.update { copy(downloadError = e) } } finally { - status.update( - connected = false, - connecting = true, - downloading = false, - ) + status.update { copy(connected = false, connecting = true, downloading = false) } delay(retryDelayMs) } } @@ -145,7 +139,7 @@ internal class SyncStream( } checkedCrudItem = nextCrudItem - status.update(uploading = true) + status.update { copy(uploading = true) } uploadCrud() } else { // Uploading is completed @@ -154,12 +148,12 @@ internal class SyncStream( } } catch (e: Exception) { logger.e { "Error uploading crud: ${e.message}" } - status.update(uploading = false, uploadError = e) + status.update { copy(uploading = false, uploadError = e) } delay(retryDelayMs) break } } - status.update(uploading = false) + status.update { copy(uploading = false) } } private suspend fun getWriteCheckpoint(): String { @@ -215,7 +209,7 @@ internal class SyncStream( throw RuntimeException("Received error when connecting to sync stream: ${httpResponse.bodyAsText()}") } - status.update(connected = true, connecting = false) + status.update { copy(connected = true, connecting = false) } val channel: ByteReadChannel = httpResponse.body() while (!channel.isClosedForRead) { @@ -256,7 +250,7 @@ internal class SyncStream( state = handleInstruction(line, value, state) } - status.update(downloading = false) + status.update { abortedDownload() } return state } @@ -290,7 +284,6 @@ internal class SyncStream( ): SyncStreamState { val (checkpoint) = line state.targetCheckpoint = checkpoint - status.update(downloading = true) val bucketsToDelete = state.bucketSet!!.toMutableList() val newBuckets = mutableSetOf() @@ -302,15 +295,28 @@ internal class SyncStream( } } - if (bucketsToDelete.size > 0) { + state.bucketSet = newBuckets + startTrackingCheckpoint(checkpoint, bucketsToDelete) + + return state + } + + private suspend fun startTrackingCheckpoint( + checkpoint: Checkpoint, + bucketsToDelete: List, + ) { + val progress = bucketStorage.getBucketOperationProgress() + status.update { copy( + downloading = true, + downloadProgress = SyncDownloadProgress(progress, checkpoint), + ) } + + if (bucketsToDelete.isNotEmpty()) { logger.i { "Removing buckets [${bucketsToDelete.joinToString(separator = ", ")}]" } } - state.bucketSet = newBuckets bucketStorage.removeBuckets(bucketsToDelete) bucketStorage.setTargetCheckpoint(checkpoint) - - return state } private suspend fun handleStreamingSyncCheckpointComplete(state: SyncStreamState): SyncStreamState { @@ -332,12 +338,7 @@ internal class SyncStream( } state.validatedCheckpoint = state.targetCheckpoint - status.update( - lastSyncedAt = Clock.System.now(), - downloading = false, - hasSynced = true, - clearDownloadError = true, - ) + status.update { copyWithCompletedDownload() } return state } @@ -362,8 +363,8 @@ internal class SyncStream( logger.i { "validated partial checkpoint ${state.appliedCheckpoint} up to priority of $priority" } } - status.update( - priorityStatusEntries = + status.update { + copy(priorityStatusEntries = buildList { // All states with a higher priority can be deleted since this partial sync includes them. addAll(status.priorityStatusEntries.filter { it.priority >= line.priority }) @@ -374,8 +375,8 @@ internal class SyncStream( hasSynced = true, ), ) - }, - ) + },) + } return state } @@ -388,8 +389,6 @@ internal class SyncStream( throw Exception("Checkpoint diff without previous checkpoint") } - status.update(downloading = true) - val newBuckets = mutableMapOf() state.targetCheckpoint!!.checksums.forEach { checksum -> @@ -409,15 +408,7 @@ internal class SyncStream( ) state.targetCheckpoint = newCheckpoint - - state.bucketSet = newBuckets.keys.toMutableSet() - - val bucketsToDelete = checkpointDiff.removedBuckets - if (bucketsToDelete.isNotEmpty()) { - logger.d { "Remove buckets $bucketsToDelete" } - } - bucketStorage.removeBuckets(bucketsToDelete) - bucketStorage.setTargetCheckpoint(state.targetCheckpoint!!) + startTrackingCheckpoint(newCheckpoint, checkpointDiff.removedBuckets) return state } @@ -426,7 +417,8 @@ internal class SyncStream( data: SyncLine.SyncDataBucket, state: SyncStreamState, ): SyncStreamState { - status.update(downloading = true) + val batch = SyncDataBatch(listOf(data)) + status.update { copy(downloading = true, downloadProgress = downloadProgress?.incrementDownloaded(batch)) } bucketStorage.saveSyncData(SyncDataBatch(listOf(data))) return state } diff --git a/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt b/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt index 41573e3f..1dd37a18 100644 --- a/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt +++ b/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt @@ -77,6 +77,7 @@ class SyncStreamTest { checkpointValid = true, checkpointFailures = emptyList(), ) + everySuspend { getBucketOperationProgress() } returns mapOf() } connector = mock { @@ -143,7 +144,7 @@ class SyncStreamTest { params = JsonObject(emptyMap()), ) - syncStream.status.update(connected = true) + syncStream.status.update { copy(connected=true) } syncStream.triggerCrudUpload() testLogWriter.assertCount(2) @@ -289,6 +290,7 @@ class SyncStreamTest { verifySuspend(order) { if (priorityNo == 0) { + bucketStorage.getBucketOperationProgress() bucketStorage.removeBuckets(any()) bucketStorage.setTargetCheckpoint(any()) } From 931186d86c37f8e4aa050cbd13ed1a06c48eb07b Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 27 Mar 2025 12:10:18 +0200 Subject: [PATCH 02/11] Unit test for computing fractions --- .../kotlin/com/powersync/sync/Progress.kt | 4 ++-- .../kotlin/com/powersync/sync/ProgressTest.kt | 16 ++++++++++++++++ 2 files changed, 18 insertions(+), 2 deletions(-) create mode 100644 core/src/commonTest/kotlin/com/powersync/sync/ProgressTest.kt diff --git a/core/src/commonMain/kotlin/com/powersync/sync/Progress.kt b/core/src/commonMain/kotlin/com/powersync/sync/Progress.kt index b0907339..e9c33c5b 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/Progress.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/Progress.kt @@ -14,8 +14,8 @@ import com.powersync.bucket.LocalOperationCounters * on [SyncStatusData]. */ public data class ProgressWithOperations( + val completed: Int, val total: Int, - val completed: Int ) { /** * The relative amount of [total] items that have been [completed], as a number between `0.0` and `1.0`. @@ -80,7 +80,7 @@ public data class SyncDownloadProgress private constructor( */ public fun untilPriority(priority: BucketPriority): ProgressWithOperations { val (total, completed) = targetAndCompletedCounts(priority) - return ProgressWithOperations(total, completed) + return ProgressWithOperations(completed = completed, total = total) } internal fun incrementDownloaded(batch: SyncDataBatch): SyncDownloadProgress { diff --git a/core/src/commonTest/kotlin/com/powersync/sync/ProgressTest.kt b/core/src/commonTest/kotlin/com/powersync/sync/ProgressTest.kt new file mode 100644 index 00000000..048d20d0 --- /dev/null +++ b/core/src/commonTest/kotlin/com/powersync/sync/ProgressTest.kt @@ -0,0 +1,16 @@ +package com.powersync.sync + +import kotlin.test.Test +import kotlin.test.assertEquals + +class ProgressTest { + + @Test + fun reportsFraction() { + assertEquals(0.0, ProgressWithOperations(0, 10).fraction) + assertEquals(0.5, ProgressWithOperations(5, 10).fraction) + assertEquals(1.0, ProgressWithOperations(10, 10).fraction) + + assertEquals(0.0, ProgressWithOperations(0, 0).fraction) + } +} From ac5b01b5dd9c05ce513ccb4ad8d230527bfd0fa6 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 27 Mar 2025 12:20:19 +0200 Subject: [PATCH 03/11] Reformat --- .../powersync/sync/BaseInMemorySyncTest.kt | 10 +- .../com/powersync/sync/SyncIntegrationTest.kt | 2 +- .../com/powersync/sync/SyncProgressTest.kt | 450 ++++++++++-------- .../com/powersync/bucket/BucketStorage.kt | 1 + .../com/powersync/bucket/BucketStorageImpl.kt | 42 +- .../com/powersync/db/PowerSyncDatabaseImpl.kt | 14 +- .../kotlin/com/powersync/sync/Progress.kt | 65 +-- .../kotlin/com/powersync/sync/SyncStatus.kt | 26 +- .../kotlin/com/powersync/sync/SyncStream.kt | 36 +- .../kotlin/com/powersync/sync/ProgressTest.kt | 1 - .../com/powersync/sync/SyncStreamTest.kt | 2 +- 11 files changed, 362 insertions(+), 287 deletions(-) diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/BaseInMemorySyncTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/BaseInMemorySyncTest.kt index a2e4d01b..ef58595a 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/BaseInMemorySyncTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/BaseInMemorySyncTest.kt @@ -52,11 +52,11 @@ abstract class BaseInMemorySyncTest { connector = mock { everySuspend { getCredentialsCached() } returns - PowerSyncCredentials( - token = "test-token", - userId = "test-user", - endpoint = "https://test.com", - ) + PowerSyncCredentials( + token = "test-token", + userId = "test-user", + endpoint = "https://test.com", + ) everySuspend { invalidateCredentials() } returns Unit } diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt index c2831948..cc6a5fb2 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt @@ -28,7 +28,7 @@ import kotlin.test.assertTrue import kotlin.time.Duration.Companion.seconds @OptIn(co.touchlab.kermit.ExperimentalKermitApi::class) -class SyncIntegrationTest: BaseInMemorySyncTest() { +class SyncIntegrationTest : BaseInMemorySyncTest() { private suspend fun expectUserCount(amount: Int) { val users = database.getAll("SELECT * FROM users;") { UserRow.from(it) } assertEquals(amount, users.size, "Expected $amount users, got $users") diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncProgressTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncProgressTest.kt index e66c0c12..3a6693e9 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncProgressTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncProgressTest.kt @@ -17,7 +17,7 @@ import kotlin.test.assertFalse import kotlin.test.assertNull import kotlin.test.assertTrue -class SyncProgressTest: BaseInMemorySyncTest() { +class SyncProgressTest : BaseInMemorySyncTest() { private var lastOpId = 0 @BeforeTest @@ -25,47 +25,58 @@ class SyncProgressTest: BaseInMemorySyncTest() { lastOpId = 0 } - private fun bucket(name: String, count: Int, priority: BucketPriority = BucketPriority(3)): BucketChecksum { - return BucketChecksum( + private fun bucket( + name: String, + count: Int, + priority: BucketPriority = BucketPriority(3), + ): BucketChecksum = + BucketChecksum( bucket = name, priority = priority, checksum = 0, count = count, ) - } - private suspend fun addDataLine(bucket: String, amount: Int) { - syncLines.send(SyncLine.SyncDataBucket( - bucket=bucket, - data = List(amount) { - OplogEntry( - checksum = 0, - opId = (++lastOpId).toString(), - op = OpType.PUT, - rowId = lastOpId.toString(), - rowType = bucket, - data = "{}", - ) - }, - after = null, - nextAfter = null, - )) + private suspend fun addDataLine( + bucket: String, + amount: Int, + ) { + syncLines.send( + SyncLine.SyncDataBucket( + bucket = bucket, + data = + List(amount) { + OplogEntry( + checksum = 0, + opId = (++lastOpId).toString(), + op = OpType.PUT, + rowId = lastOpId.toString(), + rowType = bucket, + data = "{}", + ) + }, + after = null, + nextAfter = null, + ), + ) } private suspend fun addCheckpointComplete(priority: BucketPriority? = null) { if (priority != null) { - syncLines.send(SyncLine.CheckpointPartiallyComplete( - lastOpId=lastOpId.toString(), - priority = priority, - )) + syncLines.send( + SyncLine.CheckpointPartiallyComplete( + lastOpId = lastOpId.toString(), + priority = priority, + ), + ) } else { - syncLines.send(SyncLine.CheckpointComplete(lastOpId=lastOpId.toString())) + syncLines.send(SyncLine.CheckpointComplete(lastOpId = lastOpId.toString())) } } private suspend fun ReceiveTurbine.expectProgress( total: Pair, - priorities: Map> = emptyMap() + priorities: Map> = emptyMap(), ) { val item = awaitItem() val progress = item.downloadProgress ?: error("Expected download progress on $item") @@ -90,211 +101,248 @@ class SyncProgressTest: BaseInMemorySyncTest() { } @Test - fun withoutPriorities() = runTest { - val syncStream = syncStream() - database.connectInternal(syncStream, 1000L) - - turbineScope { - val turbine = database.currentStatus.asFlow().testIn(this) - turbine.waitFor { it.connected && !it.downloading } - - // Send checkpoint with 10 ops, progress should be 0/10 - syncLines.send(SyncLine.FullCheckpoint(Checkpoint( - lastOpId="10", - checksums = listOf(bucket("a", 10)) - ))) - turbine.expectProgress(0 to 10) - - addDataLine("a", 10) - turbine.expectProgress(10 to 10) - - addCheckpointComplete() - turbine.expectNotDownloading() - - // Emit new data, progress should be 0/2 instead of 10/12 - syncLines.send(SyncLine.CheckpointDiff( - lastOpId="12", - updatedBuckets = listOf(bucket("a", 12)), - removedBuckets = emptyList(), - )) - turbine.expectProgress(0 to 2) - - addDataLine("a", 2) - turbine.expectProgress(2 to 2) - - addCheckpointComplete() - turbine.expectNotDownloading() - - turbine.cancel() - } - - database.close() - syncLines.close() - } - - @Test - fun interruptedSync() = runTest { - var syncStream = syncStream() - database.connectInternal(syncStream, 1000L) - - turbineScope { - val turbine = database.currentStatus.asFlow().testIn(this) - turbine.waitFor { it.connected && !it.downloading } - - // Send checkpoint with 10 ops, progress should be 0/10 - syncLines.send(SyncLine.FullCheckpoint(Checkpoint( - lastOpId="10", - checksums = listOf(bucket("a", 10)) - ))) - turbine.expectProgress(0 to 10) - - addDataLine("a", 5) - turbine.expectProgress(5 to 10) - - turbine.cancel() - } + fun withoutPriorities() = + runTest { + val syncStream = syncStream() + database.connectInternal(syncStream, 1000L) + + turbineScope { + val turbine = database.currentStatus.asFlow().testIn(this) + turbine.waitFor { it.connected && !it.downloading } + + // Send checkpoint with 10 ops, progress should be 0/10 + syncLines.send( + SyncLine.FullCheckpoint( + Checkpoint( + lastOpId = "10", + checksums = listOf(bucket("a", 10)), + ), + ), + ) + turbine.expectProgress(0 to 10) - // Emulate the app closing - database.close() - syncLines.close() + addDataLine("a", 10) + turbine.expectProgress(10 to 10) - // And reconnecting - database = openDb() - syncLines = Channel() - syncStream = syncStream() - database.connectInternal(syncStream, 1000L) + addCheckpointComplete() + turbine.expectNotDownloading() - turbineScope { - val turbine = database.currentStatus.asFlow().testIn(this) - turbine.waitFor { it.connected && !it.downloading } + // Emit new data, progress should be 0/2 instead of 10/12 + syncLines.send( + SyncLine.CheckpointDiff( + lastOpId = "12", + updatedBuckets = listOf(bucket("a", 12)), + removedBuckets = emptyList(), + ), + ) + turbine.expectProgress(0 to 2) - // Send the same checkpoint as before - syncLines.send(SyncLine.FullCheckpoint(Checkpoint( - lastOpId="10", - checksums = listOf(bucket("a", 10)) - ))) + addDataLine("a", 2) + turbine.expectProgress(2 to 2) - // Progress should be restored: 5 / 10 instead of 0/5 - turbine.expectProgress(5 to 10) + addCheckpointComplete() + turbine.expectNotDownloading() - addDataLine("a", 5) - turbine.expectProgress(10 to 10) - addCheckpointComplete() - turbine.expectNotDownloading() + turbine.cancel() + } - turbine.cancel() + database.close() + syncLines.close() } - database.close() - syncLines.close() - } - @Test - fun interruptedSyncWithNewCheckpoint() = runTest { - var syncStream = syncStream() - database.connectInternal(syncStream, 1000L) - - turbineScope { - val turbine = database.currentStatus.asFlow().testIn(this) - turbine.waitFor { it.connected && !it.downloading } - syncLines.send(SyncLine.FullCheckpoint(Checkpoint( - lastOpId="10", - checksums = listOf(bucket("a", 10)) - ))) - turbine.expectProgress(0 to 10) - - addDataLine("a", 5) - turbine.expectProgress(5 to 10) - - turbine.cancel() - } + fun interruptedSync() = + runTest { + var syncStream = syncStream() + database.connectInternal(syncStream, 1000L) + + turbineScope { + val turbine = database.currentStatus.asFlow().testIn(this) + turbine.waitFor { it.connected && !it.downloading } + + // Send checkpoint with 10 ops, progress should be 0/10 + syncLines.send( + SyncLine.FullCheckpoint( + Checkpoint( + lastOpId = "10", + checksums = listOf(bucket("a", 10)), + ), + ), + ) + turbine.expectProgress(0 to 10) - // Close and re-connect - database.close() - syncLines.close() - database = openDb() - syncLines = Channel() - syncStream = syncStream() - database.connectInternal(syncStream, 1000L) + addDataLine("a", 5) + turbine.expectProgress(5 to 10) - turbineScope { - val turbine = database.currentStatus.asFlow().testIn(this) - turbine.waitFor { it.connected && !it.downloading } + turbine.cancel() + } - // Send checkpoint with two more ops - syncLines.send(SyncLine.FullCheckpoint(Checkpoint( - lastOpId="12", - checksums = listOf(bucket("a", 12)) - ))) + // Emulate the app closing + database.close() + syncLines.close() + + // And reconnecting + database = openDb() + syncLines = Channel() + syncStream = syncStream() + database.connectInternal(syncStream, 1000L) + + turbineScope { + val turbine = database.currentStatus.asFlow().testIn(this) + turbine.waitFor { it.connected && !it.downloading } + + // Send the same checkpoint as before + syncLines.send( + SyncLine.FullCheckpoint( + Checkpoint( + lastOpId = "10", + checksums = listOf(bucket("a", 10)), + ), + ), + ) - turbine.expectProgress(5 to 12) + // Progress should be restored: 5 / 10 instead of 0/5 + turbine.expectProgress(5 to 10) - addDataLine("a", 7) - turbine.expectProgress(12 to 12) - addCheckpointComplete() - turbine.expectNotDownloading() + addDataLine("a", 5) + turbine.expectProgress(10 to 10) + addCheckpointComplete() + turbine.expectNotDownloading() - turbine.cancel() - } + turbine.cancel() + } - database.close() - syncLines.close() - } + database.close() + syncLines.close() + } @Test - fun differentPriorities() = runTest { - val syncStream = syncStream() - database.connectInternal(syncStream, 1000L) + fun interruptedSyncWithNewCheckpoint() = + runTest { + var syncStream = syncStream() + database.connectInternal(syncStream, 1000L) + + turbineScope { + val turbine = database.currentStatus.asFlow().testIn(this) + turbine.waitFor { it.connected && !it.downloading } + syncLines.send( + SyncLine.FullCheckpoint( + Checkpoint( + lastOpId = "10", + checksums = listOf(bucket("a", 10)), + ), + ), + ) + turbine.expectProgress(0 to 10) - turbineScope { - val turbine = database.currentStatus.asFlow().testIn(this) - turbine.waitFor { it.connected && !it.downloading } + addDataLine("a", 5) + turbine.expectProgress(5 to 10) - suspend fun expectProgress(prio0: Pair, prio2: Pair) { - turbine.expectProgress(prio2, mapOf(BucketPriority(0) to prio0, BucketPriority(2) to prio2)) + turbine.cancel() } - syncLines.send(SyncLine.FullCheckpoint(Checkpoint( - lastOpId="10", - checksums = listOf( - bucket("a", 5, BucketPriority(0)), - bucket("b", 5, BucketPriority(2)), + // Close and re-connect + database.close() + syncLines.close() + database = openDb() + syncLines = Channel() + syncStream = syncStream() + database.connectInternal(syncStream, 1000L) + + turbineScope { + val turbine = database.currentStatus.asFlow().testIn(this) + turbine.waitFor { it.connected && !it.downloading } + + // Send checkpoint with two more ops + syncLines.send( + SyncLine.FullCheckpoint( + Checkpoint( + lastOpId = "12", + checksums = listOf(bucket("a", 12)), + ), + ), ) - ))) - expectProgress(0 to 5, 0 to 10) - addDataLine("a", 5) - expectProgress(5 to 5, 5 to 10) + turbine.expectProgress(5 to 12) + + addDataLine("a", 7) + turbine.expectProgress(12 to 12) + addCheckpointComplete() + turbine.expectNotDownloading() - addCheckpointComplete(BucketPriority(0)) - expectProgress(5 to 5, 5 to 10) + turbine.cancel() + } - addDataLine("b", 2) - expectProgress(5 to 5, 7 to 10) + database.close() + syncLines.close() + } - // Before syncing b fully, send a new checkpoint - syncLines.send(SyncLine.CheckpointDiff( - lastOpId="14", - updatedBuckets = listOf( - bucket("a", 8, BucketPriority(0)), - bucket("b", 6, BucketPriority(2)), - ), - removedBuckets = emptyList(), - )) - expectProgress(5 to 8, 7 to 14) + @Test + fun differentPriorities() = + runTest { + val syncStream = syncStream() + database.connectInternal(syncStream, 1000L) + + turbineScope { + val turbine = database.currentStatus.asFlow().testIn(this) + turbine.waitFor { it.connected && !it.downloading } + + suspend fun expectProgress( + prio0: Pair, + prio2: Pair, + ) { + turbine.expectProgress(prio2, mapOf(BucketPriority(0) to prio0, BucketPriority(2) to prio2)) + } + + syncLines.send( + SyncLine.FullCheckpoint( + Checkpoint( + lastOpId = "10", + checksums = + listOf( + bucket("a", 5, BucketPriority(0)), + bucket("b", 5, BucketPriority(2)), + ), + ), + ), + ) + expectProgress(0 to 5, 0 to 10) + + addDataLine("a", 5) + expectProgress(5 to 5, 5 to 10) + + addCheckpointComplete(BucketPriority(0)) + expectProgress(5 to 5, 5 to 10) + + addDataLine("b", 2) + expectProgress(5 to 5, 7 to 10) + + // Before syncing b fully, send a new checkpoint + syncLines.send( + SyncLine.CheckpointDiff( + lastOpId = "14", + updatedBuckets = + listOf( + bucket("a", 8, BucketPriority(0)), + bucket("b", 6, BucketPriority(2)), + ), + removedBuckets = emptyList(), + ), + ) + expectProgress(5 to 8, 7 to 14) - addDataLine("a", 3) - expectProgress(8 to 8, 10 to 14) - addDataLine("b", 4) - expectProgress(8 to 8, 14 to 14) + addDataLine("a", 3) + expectProgress(8 to 8, 10 to 14) + addDataLine("b", 4) + expectProgress(8 to 8, 14 to 14) - addCheckpointComplete() - turbine.expectNotDownloading() + addCheckpointComplete() + turbine.expectNotDownloading() - turbine.cancel() - } + turbine.cancel() + } - database.close() - syncLines.close() - } + database.close() + syncLines.close() + } } diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt index 5176b5ac..d8967c38 100644 --- a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt +++ b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt @@ -23,6 +23,7 @@ internal interface BucketStorage { suspend fun saveSyncData(syncDataBatch: SyncDataBatch) suspend fun getBucketStates(): List + suspend fun getBucketOperationProgress(): Map suspend fun removeBuckets(bucketsToDelete: List) diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt index 06282171..273988e7 100644 --- a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt @@ -141,18 +141,21 @@ internal class BucketStorageImpl( }, ) - override suspend fun getBucketOperationProgress(): Map = buildMap { - val rows = db.getAll("SELECT name, count_at_last, count_since_last FROM ps_buckets") { cursor -> - cursor.getString(0)!! to LocalOperationCounters( - atLast = cursor.getLong(1)!!.toInt(), - sinceLast = cursor.getLong(2)!!.toInt(), - ) - } + override suspend fun getBucketOperationProgress(): Map = + buildMap { + val rows = + db.getAll("SELECT name, count_at_last, count_since_last FROM ps_buckets") { cursor -> + cursor.getString(0)!! to + LocalOperationCounters( + atLast = cursor.getLong(1)!!.toInt(), + sinceLast = cursor.getLong(2)!!.toInt(), + ) + } - for ((name, counters) in rows) { - put(name, counters) + for ((name, counters) in rows) { + put(name, counters) + } } - } override suspend fun removeBuckets(bucketsToDelete: List) { bucketsToDelete.forEach { bucketName -> @@ -321,14 +324,21 @@ internal class BucketStorageImpl( if (didApply && priority == null) { // Reset progress counters. We only do this for a complete sync, as we want a download progress to // always cover a complete checkpoint instead of resetting for partial completions. - tx.execute(""" + tx.execute( + """ UPDATE ps_buckets SET count_since_last = 0, count_at_last = ?1->name WHERE ?1->name IS NOT NULL - """.trimIndent(), listOf(JsonUtil.json.encodeToString(buildMap { - for (bucket in checkpoint.checksums) { - bucket.count?.let { put(bucket.bucket, it) } - } - }))) + """.trimIndent(), + listOf( + JsonUtil.json.encodeToString( + buildMap { + for (bucket in checkpoint.checksums) { + bucket.count?.let { put(bucket.bucket, it) } + } + }, + ), + ), + ) } return@writeTransaction didApply diff --git a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt index 923663d8..78296ce2 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt @@ -362,12 +362,14 @@ internal class PowerSyncDatabaseImpl( syncSupervisorJob = null } - currentStatus.update { copy( - connected = false, - connecting = false, - downloading = false, - downloadProgress = null, - ) } + currentStatus.update { + copy( + connected = false, + connecting = false, + downloading = false, + downloadProgress = null, + ) + } } override suspend fun disconnectAndClear(clearLocal: Boolean) { diff --git a/core/src/commonMain/kotlin/com/powersync/sync/Progress.kt b/core/src/commonMain/kotlin/com/powersync/sync/Progress.kt index e9c33c5b..d7e77f80 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/Progress.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/Progress.kt @@ -22,7 +22,7 @@ public data class ProgressWithOperations( */ public val fraction: Double get() { if (completed == 0) { - return 0.0; + return 0.0 } return completed.toDouble() / total.toDouble() @@ -45,24 +45,29 @@ public data class ProgressWithOperations( */ @ConsistentCopyVisibility public data class SyncDownloadProgress private constructor( - private val buckets: Map + private val buckets: Map, ) { /** * Creates download progress information from the local progress counters since the last full sync and the target * checkpoint. */ - internal constructor(localProgress: Map, target: Checkpoint) : this(buildMap { - for (entry in target.checksums) { - val savedProgress = localProgress[entry.bucket] + internal constructor(localProgress: Map, target: Checkpoint) : this( + buildMap { + for (entry in target.checksums) { + val savedProgress = localProgress[entry.bucket] - put(entry.bucket, BucketProgress( - priority = entry.priority, - atLast = savedProgress?.atLast ?: 0, - sinceLast = savedProgress?.sinceLast ?: 0, - targetCount = entry.count ?: 0, - )) - } - }) + put( + entry.bucket, + BucketProgress( + priority = entry.priority, + atLast = savedProgress?.atLast ?: 0, + sinceLast = savedProgress?.sinceLast ?: 0, + targetCount = entry.count ?: 0, + ), + ) + } + }, + ) /** * Download progress towards a complete checkpoint being received. @@ -83,33 +88,37 @@ public data class SyncDownloadProgress private constructor( return ProgressWithOperations(completed = completed, total = total) } - internal fun incrementDownloaded(batch: SyncDataBatch): SyncDownloadProgress { - return SyncDownloadProgress(buildMap { - putAll(this@SyncDownloadProgress.buckets) + internal fun incrementDownloaded(batch: SyncDataBatch): SyncDownloadProgress = + SyncDownloadProgress( + buildMap { + putAll(this@SyncDownloadProgress.buckets) - for (bucket in batch.buckets) { - val previous = get(bucket.bucket) ?: continue - put(bucket.bucket, previous.copy( - sinceLast = previous.sinceLast + bucket.data.size - )) - } - }) - } + for (bucket in batch.buckets) { + val previous = get(bucket.bucket) ?: continue + put( + bucket.bucket, + previous.copy( + sinceLast = previous.sinceLast + bucket.data.size, + ), + ) + } + }, + ) - private fun targetAndCompletedCounts(priority: BucketPriority): Pair { - return buckets.values.asSequence() + private fun targetAndCompletedCounts(priority: BucketPriority): Pair = + buckets.values + .asSequence() .filter { it.priority >= priority } .fold(0 to 0) { (prevTarget, prevCompleted), entry -> (prevTarget + entry.total) to (prevCompleted + entry.sinceLast) } - } } private data class BucketProgress( val priority: BucketPriority, val atLast: Int, val sinceLast: Int, - val targetCount: Int + val targetCount: Int, ) { val total get(): Int = targetCount - atLast } diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncStatus.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncStatus.kt index 060c697d..5c42cb79 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncStatus.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncStatus.kt @@ -127,18 +127,20 @@ internal data class SyncStatusDataContainer( override val anyError get() = downloadError ?: uploadError - internal fun abortedDownload() = copy( - downloading = false, - downloadProgress = null, - ) - - internal fun copyWithCompletedDownload() = copy( - lastSyncedAt = Clock.System.now(), - downloading = false, - downloadProgress = null, - hasSynced = true, - downloadError = null, - ) + internal fun abortedDownload() = + copy( + downloading = false, + downloadProgress = null, + ) + + internal fun copyWithCompletedDownload() = + copy( + lastSyncedAt = Clock.System.now(), + downloading = false, + downloadProgress = null, + hasSynced = true, + downloadError = null, + ) } @ConsistentCopyVisibility diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt index 1d63c96a..0833e67b 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt @@ -306,10 +306,12 @@ internal class SyncStream( bucketsToDelete: List, ) { val progress = bucketStorage.getBucketOperationProgress() - status.update { copy( - downloading = true, - downloadProgress = SyncDownloadProgress(progress, checkpoint), - ) } + status.update { + copy( + downloading = true, + downloadProgress = SyncDownloadProgress(progress, checkpoint), + ) + } if (bucketsToDelete.isNotEmpty()) { logger.i { "Removing buckets [${bucketsToDelete.joinToString(separator = ", ")}]" } @@ -364,18 +366,20 @@ internal class SyncStream( } status.update { - copy(priorityStatusEntries = - buildList { - // All states with a higher priority can be deleted since this partial sync includes them. - addAll(status.priorityStatusEntries.filter { it.priority >= line.priority }) - add( - PriorityStatusEntry( - priority = priority, - lastSyncedAt = Clock.System.now(), - hasSynced = true, - ), - ) - },) + copy( + priorityStatusEntries = + buildList { + // All states with a higher priority can be deleted since this partial sync includes them. + addAll(status.priorityStatusEntries.filter { it.priority >= line.priority }) + add( + PriorityStatusEntry( + priority = priority, + lastSyncedAt = Clock.System.now(), + hasSynced = true, + ), + ) + }, + ) } return state } diff --git a/core/src/commonTest/kotlin/com/powersync/sync/ProgressTest.kt b/core/src/commonTest/kotlin/com/powersync/sync/ProgressTest.kt index 048d20d0..b51211a9 100644 --- a/core/src/commonTest/kotlin/com/powersync/sync/ProgressTest.kt +++ b/core/src/commonTest/kotlin/com/powersync/sync/ProgressTest.kt @@ -4,7 +4,6 @@ import kotlin.test.Test import kotlin.test.assertEquals class ProgressTest { - @Test fun reportsFraction() { assertEquals(0.0, ProgressWithOperations(0, 10).fraction) diff --git a/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt b/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt index 1dd37a18..8e360896 100644 --- a/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt +++ b/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt @@ -144,7 +144,7 @@ class SyncStreamTest { params = JsonObject(emptyMap()), ) - syncStream.status.update { copy(connected=true) } + syncStream.status.update { copy(connected = true) } syncStream.triggerCrudUpload() testLogWriter.assertCount(2) From a7ca311d26844fcd79ea578e9a60c805d7e77757 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 27 Mar 2025 12:24:37 +0200 Subject: [PATCH 04/11] Remove old copy method entirely --- .../kotlin/com/powersync/sync/SyncStatus.kt | 32 ------------------- 1 file changed, 32 deletions(-) diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncStatus.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncStatus.kt index 5c42cb79..067080b9 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncStatus.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncStatus.kt @@ -154,38 +154,6 @@ public data class SyncStatus internal constructor( */ public fun asFlow(): SharedFlow = stateFlow.asSharedFlow() - /** - * Updates the internal sync status indicators and emits Flow updates - */ - @Deprecated("Use callback-based approach instead") - internal fun update( - connected: Boolean? = null, - connecting: Boolean? = null, - downloading: Boolean? = null, - uploading: Boolean? = null, - hasSynced: Boolean? = null, - lastSyncedAt: Instant? = null, - uploadError: Any? = null, - downloadError: Any? = null, - clearUploadError: Boolean = false, - clearDownloadError: Boolean = false, - priorityStatusEntries: List? = null, - ) { - data = - data.copy( - connected = connected ?: data.connected, - connecting = connecting ?: data.connecting, - downloading = downloading ?: data.downloading, - uploading = uploading ?: data.uploading, - lastSyncedAt = lastSyncedAt ?: data.lastSyncedAt, - hasSynced = hasSynced ?: data.hasSynced, - priorityStatusEntries = priorityStatusEntries ?: data.priorityStatusEntries, - uploadError = if (clearUploadError) null else uploadError, - downloadError = if (clearDownloadError) null else downloadError, - ) - stateFlow.value = data - } - /** * Updates the internal sync status indicators and emits Flow updates */ From 6dba5b65ae8a9a6df1d24850772430af82a0d4e2 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 31 Mar 2025 14:42:14 +0200 Subject: [PATCH 05/11] Add sync progress to example --- .../com/powersync/compose/DatabaseState.kt | 18 +++++ .../kotlin/com/powersync/sync/Progress.kt | 6 +- demos/supabase-todolist/settings.gradle.kts | 21 +++--- .../supabase-todolist/shared/build.gradle.kts | 1 + .../kotlin/com/powersync/demos/App.kt | 8 +-- .../powersync/demos/components/GuardBySync.kt | 66 +++++++++++++++++++ .../com/powersync/demos/screens/HomeScreen.kt | 39 ++++------- 7 files changed, 115 insertions(+), 44 deletions(-) create mode 100644 compose/src/commonMain/kotlin/com/powersync/compose/DatabaseState.kt create mode 100644 demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/components/GuardBySync.kt diff --git a/compose/src/commonMain/kotlin/com/powersync/compose/DatabaseState.kt b/compose/src/commonMain/kotlin/com/powersync/compose/DatabaseState.kt new file mode 100644 index 00000000..14fb0fc3 --- /dev/null +++ b/compose/src/commonMain/kotlin/com/powersync/compose/DatabaseState.kt @@ -0,0 +1,18 @@ +package com.powersync.compose + +import androidx.compose.runtime.Composable +import androidx.compose.runtime.State +import androidx.compose.runtime.collectAsState +import com.powersync.sync.SyncStatus +import com.powersync.sync.SyncStatusData +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.flow.debounce +import kotlin.time.Duration +import kotlin.time.Duration.Companion.milliseconds + +@OptIn(FlowPreview::class) +@Composable +public fun SyncStatus.composeState(debounce: Duration=200.0.milliseconds): State = asFlow() + // Debouncing the status flow prevents flicker + .debounce(debounce) + .collectAsState(initial = this) diff --git a/core/src/commonMain/kotlin/com/powersync/sync/Progress.kt b/core/src/commonMain/kotlin/com/powersync/sync/Progress.kt index d7e77f80..dc301a0b 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/Progress.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/Progress.kt @@ -20,12 +20,12 @@ public data class ProgressWithOperations( /** * The relative amount of [total] items that have been [completed], as a number between `0.0` and `1.0`. */ - public val fraction: Double get() { + public val fraction: Float get() { if (completed == 0) { - return 0.0 + return 0.0f } - return completed.toDouble() / total.toDouble() + return completed.toFloat() / total.toFloat() } } diff --git a/demos/supabase-todolist/settings.gradle.kts b/demos/supabase-todolist/settings.gradle.kts index 73016629..3e4cd6c1 100644 --- a/demos/supabase-todolist/settings.gradle.kts +++ b/demos/supabase-todolist/settings.gradle.kts @@ -44,15 +44,18 @@ val useReleasedVersions = localProperties.getProperty("USE_RELEASED_POWERSYNC_VE if (!useReleasedVersions) { includeBuild("../..") { dependencySubstitution { - substitute(module("com.powersync:core")) - .using(project(":core")) - .because("we want to auto-wire up sample dependency") - substitute(module("com.powersync:persistence")) - .using(project(":persistence")) - .because("we want to auto-wire up sample dependency") - substitute(module("com.powersync:connector-supabase")) - .using(project(":connectors:supabase")) - .because("we want to auto-wire up sample dependency") + val replacements = mapOf( + "core" to "core", + "persistence" to "persistence", + "connector-supabase" to "connectors:supabase", + "compose" to "compose" + ) + + replacements.forEach { (moduleName, projectName) -> + substitute(module("com.powersync:$moduleName")) + .using(project(":$projectName")) + .because("we want to auto-wire up sample dependency") + } } } } diff --git a/demos/supabase-todolist/shared/build.gradle.kts b/demos/supabase-todolist/shared/build.gradle.kts index f2a59ab1..708ed52c 100644 --- a/demos/supabase-todolist/shared/build.gradle.kts +++ b/demos/supabase-todolist/shared/build.gradle.kts @@ -44,6 +44,7 @@ kotlin { // at: https://central.sonatype.com/artifact/com.powersync/core api("com.powersync:core:latest.release") implementation("com.powersync:connector-supabase:latest.release") + implementation("com.powersync:compose:latest.release") implementation(libs.uuid) implementation(compose.runtime) implementation(compose.foundation) diff --git a/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/App.kt b/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/App.kt index c1a1cb95..5e7edd28 100644 --- a/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/App.kt +++ b/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/App.kt @@ -14,6 +14,7 @@ import androidx.compose.ui.Modifier import com.powersync.DatabaseDriverFactory import com.powersync.PowerSyncDatabase import com.powersync.bucket.BucketPriority +import com.powersync.compose.composeState import com.powersync.connector.supabase.SupabaseConnector import com.powersync.connectors.PowerSyncBackendConnector import com.powersync.demos.components.EditDialog @@ -25,7 +26,6 @@ import com.powersync.demos.screens.HomeScreen import com.powersync.demos.screens.SignInScreen import com.powersync.demos.screens.SignUpScreen import com.powersync.demos.screens.TodosScreen -import kotlinx.coroutines.flow.debounce import kotlinx.coroutines.runBlocking import org.koin.compose.KoinApplication import org.koin.compose.koinInject @@ -71,11 +71,7 @@ fun AppContent( db: PowerSyncDatabase = koinInject(), modifier: Modifier = Modifier, ) { - // Debouncing the status flow prevents flicker - val status by db.currentStatus - .asFlow() - .debounce(200) - .collectAsState(initial = db.currentStatus) + val status by db.currentStatus.composeState() // This assumes that the buckets for lists has a priority of 1 (but it will work fine with sync // rules not defining any priorities at all too). When giving lists a higher priority than diff --git a/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/components/GuardBySync.kt b/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/components/GuardBySync.kt new file mode 100644 index 00000000..47f851f2 --- /dev/null +++ b/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/components/GuardBySync.kt @@ -0,0 +1,66 @@ +package com.powersync.demos.components + +import androidx.compose.foundation.background +import androidx.compose.foundation.layout.Arrangement +import androidx.compose.foundation.layout.Column +import androidx.compose.foundation.layout.fillMaxSize +import androidx.compose.foundation.layout.fillMaxWidth +import androidx.compose.foundation.layout.padding +import androidx.compose.material.LinearProgressIndicator +import androidx.compose.material.MaterialTheme +import androidx.compose.material.Text +import androidx.compose.runtime.Composable +import androidx.compose.runtime.getValue +import androidx.compose.ui.Alignment +import androidx.compose.ui.Modifier +import androidx.compose.ui.unit.dp +import com.powersync.PowerSyncDatabase +import com.powersync.compose.composeState +import org.koin.compose.koinInject + +/** + * A component that renders its [content] only after a first complete sync was completed on [db]. + * + * Before that, a progress indicator is shown instead. + */ +@Composable +fun GuardBySync( + db: PowerSyncDatabase = koinInject(), + content: @Composable () -> Unit +) { + val state by db.currentStatus.composeState() + + if (state.hasSynced == true) { + content() + return + } + + Column( + modifier = Modifier.fillMaxSize().background(MaterialTheme.colors.background), + horizontalAlignment = Alignment.CenterHorizontally, + verticalArrangement = Arrangement.Center, + ) { + Text( + text = "Busy with initial sync...", + style = MaterialTheme.typography.h6, + ) + + val progress = state.downloadProgress?.untilCompletion + if (progress != null) { + LinearProgressIndicator( + modifier = Modifier.fillMaxWidth().padding(8.dp), + progress = progress.fraction, + ) + + if (progress.total == progress.completed) { + Text("Applying server-side changes...") + } else { + Text("Downloaded ${progress.completed} out of ${progress.total}.") + } + } else { + LinearProgressIndicator( + modifier = Modifier.fillMaxWidth().padding(8.dp), + ) + } + } +} diff --git a/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/screens/HomeScreen.kt b/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/screens/HomeScreen.kt index c543be58..a4da28f4 100644 --- a/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/screens/HomeScreen.kt +++ b/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/screens/HomeScreen.kt @@ -17,6 +17,7 @@ import androidx.compose.ui.Modifier import androidx.compose.ui.text.style.TextAlign import androidx.compose.ui.unit.dp import com.powersync.demos.Screen +import com.powersync.demos.components.GuardBySync import com.powersync.demos.components.Input import com.powersync.demos.components.ListContent import com.powersync.demos.components.Menu @@ -57,34 +58,20 @@ internal fun HomeScreen( }, ) - when { - syncStatus.hasSynced == null || syncStatus.hasSynced == false -> { - Box( - modifier = Modifier.fillMaxSize().background(MaterialTheme.colors.background), - contentAlignment = Alignment.Center, - ) { - Text( - text = "Busy with initial sync...", - style = MaterialTheme.typography.h6, - ) - } - } + GuardBySync { + Input( + text = inputText, + onAddClicked = onAddItemClicked, + onTextChanged = onInputTextChanged, + screen = Screen.Home, + ) - else -> { - Input( - text = inputText, - onAddClicked = onAddItemClicked, - onTextChanged = onInputTextChanged, - screen = Screen.Home, + Box(Modifier.weight(1F)) { + ListContent( + items = items, + onItemClicked = onItemClicked, + onItemDeleteClicked = onItemDeleteClicked, ) - - Box(Modifier.weight(1F)) { - ListContent( - items = items, - onItemClicked = onItemClicked, - onItemDeleteClicked = onItemDeleteClicked, - ) - } } } } From 5aefa298dba8f3539a9a655220601e7790de426a Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 31 Mar 2025 14:52:59 +0200 Subject: [PATCH 06/11] Improve state on startup --- .../kotlin/com/powersync/demos/App.kt | 8 ------- .../powersync/demos/components/GuardBySync.kt | 24 +++++++++++++++---- .../com/powersync/demos/screens/HomeScreen.kt | 8 ++++++- 3 files changed, 26 insertions(+), 14 deletions(-) diff --git a/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/App.kt b/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/App.kt index 5e7edd28..9d326537 100644 --- a/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/App.kt +++ b/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/App.kt @@ -73,14 +73,6 @@ fun AppContent( ) { val status by db.currentStatus.composeState() - // This assumes that the buckets for lists has a priority of 1 (but it will work fine with sync - // rules not defining any priorities at all too). When giving lists a higher priority than - // items, we can have a consistent snapshot of lists without items. In the case where many items - // exist (that might take longer to sync initially), this allows us to display lists earlier. - val hasSyncedLists by remember { - derivedStateOf { status.statusForPriority(BucketPriority(1)).hasSynced } - } - val authViewModel = koinViewModel() val navController = koinInject() val authState by authViewModel.authState.collectAsState() diff --git a/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/components/GuardBySync.kt b/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/components/GuardBySync.kt index 47f851f2..001d912f 100644 --- a/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/components/GuardBySync.kt +++ b/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/components/GuardBySync.kt @@ -15,6 +15,7 @@ import androidx.compose.ui.Alignment import androidx.compose.ui.Modifier import androidx.compose.ui.unit.dp import com.powersync.PowerSyncDatabase +import com.powersync.bucket.BucketPriority import com.powersync.compose.composeState import org.koin.compose.koinInject @@ -26,6 +27,7 @@ import org.koin.compose.koinInject @Composable fun GuardBySync( db: PowerSyncDatabase = koinInject(), + priority: BucketPriority? = null, content: @Composable () -> Unit ) { val state by db.currentStatus.composeState() @@ -40,12 +42,24 @@ fun GuardBySync( horizontalAlignment = Alignment.CenterHorizontally, verticalArrangement = Arrangement.Center, ) { - Text( - text = "Busy with initial sync...", - style = MaterialTheme.typography.h6, - ) + // When we have no hasSynced information, the database is still being opened. We just show a + // generic progress bar in that case. + val databaseOpening = state.hasSynced == null - val progress = state.downloadProgress?.untilCompletion + if (!databaseOpening) { + Text( + text = "Busy with initial sync...", + style = MaterialTheme.typography.h6, + ) + } + + val progress = state.downloadProgress?.let { + if (priority == null) { + it.untilCompletion + } else { + it.untilPriority(priority) + } + } if (progress != null) { LinearProgressIndicator( modifier = Modifier.fillMaxWidth().padding(8.dp), diff --git a/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/screens/HomeScreen.kt b/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/screens/HomeScreen.kt index a4da28f4..4e53852e 100644 --- a/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/screens/HomeScreen.kt +++ b/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/screens/HomeScreen.kt @@ -16,6 +16,7 @@ import androidx.compose.ui.Alignment import androidx.compose.ui.Modifier import androidx.compose.ui.text.style.TextAlign import androidx.compose.ui.unit.dp +import com.powersync.bucket.BucketPriority import com.powersync.demos.Screen import com.powersync.demos.components.GuardBySync import com.powersync.demos.components.Input @@ -58,7 +59,12 @@ internal fun HomeScreen( }, ) - GuardBySync { + // This assumes that the buckets for lists has a priority of 1 (but it will work fine with + // sync rules not defining any priorities at all too). When giving lists a higher priority + // than items, we can have a consistent snapshot of lists without items. In the case where + // many items exist (that might take longer to sync initially), this allows us to display + // lists earlier. + GuardBySync(priority = BucketPriority(1)) { Input( text = inputText, onAddClicked = onAddItemClicked, From 7b6ce58ecd99afcc134534fdea59799f0a781137 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 31 Mar 2025 15:11:40 +0200 Subject: [PATCH 07/11] Don't debounce by default --- .../kotlin/com/powersync/compose/DatabaseState.kt | 14 +++++++++----- .../commonMain/kotlin/com/powersync/demos/App.kt | 3 ++- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/compose/src/commonMain/kotlin/com/powersync/compose/DatabaseState.kt b/compose/src/commonMain/kotlin/com/powersync/compose/DatabaseState.kt index 14fb0fc3..87599ddc 100644 --- a/compose/src/commonMain/kotlin/com/powersync/compose/DatabaseState.kt +++ b/compose/src/commonMain/kotlin/com/powersync/compose/DatabaseState.kt @@ -6,13 +6,17 @@ import androidx.compose.runtime.collectAsState import com.powersync.sync.SyncStatus import com.powersync.sync.SyncStatusData import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.debounce import kotlin.time.Duration -import kotlin.time.Duration.Companion.milliseconds @OptIn(FlowPreview::class) @Composable -public fun SyncStatus.composeState(debounce: Duration=200.0.milliseconds): State = asFlow() - // Debouncing the status flow prevents flicker - .debounce(debounce) - .collectAsState(initial = this) +public fun SyncStatus.composeState(debounce: Duration=Duration.ZERO): State { + var flow: Flow = asFlow() + if (debounce.isPositive()) { + flow = flow.debounce(debounce) + } + + return flow.collectAsState(initial = this) +} diff --git a/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/App.kt b/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/App.kt index 9d326537..de0bdd68 100644 --- a/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/App.kt +++ b/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/App.kt @@ -35,6 +35,7 @@ import org.koin.core.module.dsl.bind import org.koin.core.module.dsl.viewModelOf import org.koin.core.module.dsl.withOptions import org.koin.dsl.module +import kotlin.time.Duration.Companion.milliseconds val sharedAppModule = module { // This is overridden by the androidBackgroundSync example @@ -71,7 +72,7 @@ fun AppContent( db: PowerSyncDatabase = koinInject(), modifier: Modifier = Modifier, ) { - val status by db.currentStatus.composeState() + val status by db.currentStatus.composeState(debounce = 200.0.milliseconds) val authViewModel = koinViewModel() val navController = koinInject() From 0a92db70becff0a00d4e48842d539f18c252c0e1 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 31 Mar 2025 15:13:16 +0200 Subject: [PATCH 08/11] Add changelog entries --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index ca6dfc14..ac6b2ec8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,8 @@ ## 1.0.0-BETA29 (unreleased) * Fix potential race condition between jobs in `connect()` and `disconnect()`. +* Report real-time progress information about downloads through `SyncStatus.downloadProgress`. +* Compose: Add `composeState()` extension method on `SyncStatus`. ## 1.0.0-BETA28 From a99e7913d24a79d7330d171f8103e9f1ed17c1a7 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 31 Mar 2025 15:17:32 +0200 Subject: [PATCH 09/11] Fix progress test --- .../commonTest/kotlin/com/powersync/sync/ProgressTest.kt | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/commonTest/kotlin/com/powersync/sync/ProgressTest.kt b/core/src/commonTest/kotlin/com/powersync/sync/ProgressTest.kt index b51211a9..49b46d14 100644 --- a/core/src/commonTest/kotlin/com/powersync/sync/ProgressTest.kt +++ b/core/src/commonTest/kotlin/com/powersync/sync/ProgressTest.kt @@ -6,10 +6,10 @@ import kotlin.test.assertEquals class ProgressTest { @Test fun reportsFraction() { - assertEquals(0.0, ProgressWithOperations(0, 10).fraction) - assertEquals(0.5, ProgressWithOperations(5, 10).fraction) - assertEquals(1.0, ProgressWithOperations(10, 10).fraction) + assertEquals(0.0f, ProgressWithOperations(0, 10).fraction) + assertEquals(0.5f, ProgressWithOperations(5, 10).fraction) + assertEquals(1.0f, ProgressWithOperations(10, 10).fraction) - assertEquals(0.0, ProgressWithOperations(0, 0).fraction) + assertEquals(0.0f, ProgressWithOperations(0, 0).fraction) } } From 5113f5c1d636173351ee59fe178d41d31f441182 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 31 Mar 2025 15:36:10 +0200 Subject: [PATCH 10/11] Fix version check, raise minimum version --- .../com/powersync/db/PowerSyncDatabaseImpl.kt | 18 ++------ .../powersync/db/internal/PowerSyncVersion.kt | 44 +++++++++++++++++++ 2 files changed, 48 insertions(+), 14 deletions(-) create mode 100644 core/src/commonMain/kotlin/com/powersync/db/internal/PowerSyncVersion.kt diff --git a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt index 78296ce2..3c7abb3d 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt @@ -14,6 +14,7 @@ import com.powersync.db.crud.CrudRow import com.powersync.db.crud.CrudTransaction import com.powersync.db.internal.InternalDatabaseImpl import com.powersync.db.internal.InternalTable +import com.powersync.db.internal.PowerSyncVersion import com.powersync.db.schema.Schema import com.powersync.db.schema.toSerializable import com.powersync.sync.PriorityStatusEntry @@ -461,20 +462,9 @@ internal class PowerSyncDatabaseImpl( * Check that a supported version of the powersync extension is loaded. */ private fun checkVersion(powerSyncVersion: String) { - // Parse version - val versionInts: List = - try { - powerSyncVersion - .split(Regex("[./]")) - .take(3) - .map { it.toInt() } - } catch (e: Exception) { - throw Exception("Unsupported powersync extension version. Need ^0.2.0, got: $powerSyncVersion. Details: $e") - } - - // Validate ^0.2.0 - if (versionInts[0] != 0 || versionInts[1] < 2 || versionInts[2] < 0) { - throw Exception("Unsupported powersync extension version. Need ^0.2.0, got: $powerSyncVersion") + val version = PowerSyncVersion.parse(powerSyncVersion) + if (version < PowerSyncVersion.MINIMUM) { + PowerSyncVersion.mismatchError(powerSyncVersion) } } } diff --git a/core/src/commonMain/kotlin/com/powersync/db/internal/PowerSyncVersion.kt b/core/src/commonMain/kotlin/com/powersync/db/internal/PowerSyncVersion.kt new file mode 100644 index 00000000..779ef9c1 --- /dev/null +++ b/core/src/commonMain/kotlin/com/powersync/db/internal/PowerSyncVersion.kt @@ -0,0 +1,44 @@ +package com.powersync.db.internal + +internal data class PowerSyncVersion(val major: Int, val minor: Int, val patch: Int): Comparable { + override fun compareTo(other: PowerSyncVersion): Int { + return when (val compareMajor = major.compareTo(other.major)) { + 0 -> when (val compareMinor = minor.compareTo(other.minor)) { + 0 -> patch.compareTo(other.patch) + else -> compareMinor + } + else -> compareMajor + } + } + + override fun toString(): String { + return "$major.$minor.$patch" + } + + companion object { + val MINIMUM: PowerSyncVersion = PowerSyncVersion(0, 3, 13) + + fun parse(from: String): PowerSyncVersion { + val versionInts: List = + try { + from + .split(Regex("[./]")) + .take(3) + .map { it.toInt() } + } catch (e: Exception) { + mismatchError(from, e.toString()) + } + + return PowerSyncVersion(versionInts[0], versionInts[1], versionInts[2]) + } + + fun mismatchError(actualVersion: String, details: String? = null): Nothing { + var message = "Unsupported PowerSync extension version (need ^$MINIMUM, got $actualVersion)." + if (details != null) { + message = " Details: $details" + } + + throw Exception(message) + } + } +} From 83f25bc484ad065f84128e0566ea7f2c637b1868 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 14 Apr 2025 14:15:27 +0200 Subject: [PATCH 11/11] Simplify API --- .../com/powersync/sync/SyncProgressTest.kt | 8 +-- .../kotlin/com/powersync/sync/Progress.kt | 57 ++++++++++++------- .../kotlin/com/powersync/sync/ProgressTest.kt | 8 +-- .../powersync/demos/components/GuardBySync.kt | 9 +-- 4 files changed, 51 insertions(+), 31 deletions(-) diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncProgressTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncProgressTest.kt index 3a6693e9..c3b0c9c0 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncProgressTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncProgressTest.kt @@ -82,14 +82,14 @@ class SyncProgressTest : BaseInMemorySyncTest() { val progress = item.downloadProgress ?: error("Expected download progress on $item") assertTrue { item.downloading } - assertEquals(total.first, progress.untilCompletion.completed) - assertEquals(total.second, progress.untilCompletion.total) + assertEquals(total.first, progress.downloadedOperations) + assertEquals(total.second, progress.totalOperations) priorities.forEach { (priority, expected) -> val (expectedDownloaded, expectedTotal) = expected val progress = progress.untilPriority(priority) - assertEquals(expectedDownloaded, progress.completed) - assertEquals(expectedTotal, progress.total) + assertEquals(expectedDownloaded, progress.downloadedOperations) + assertEquals(expectedTotal, progress.totalOperations) } } diff --git a/core/src/commonMain/kotlin/com/powersync/sync/Progress.kt b/core/src/commonMain/kotlin/com/powersync/sync/Progress.kt index dc301a0b..1d2158d7 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/Progress.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/Progress.kt @@ -7,31 +7,48 @@ import com.powersync.bucket.LocalOperationCounters /** * Information about a progressing download. * - * This reports the [total] amount of operations to download, how many of them have already been [completed] and finally - * a [fraction] indicating relative progress. + * This reports the [totalOperations] amount of operations to download, how many of them have already been + * [downloadedOperations] and finally a [fraction] indicating relative progress. * * To obtain a [ProgressWithOperations] instance, use a method on [SyncDownloadProgress] which in turn is available * on [SyncStatusData]. */ -public data class ProgressWithOperations( - val completed: Int, - val total: Int, -) { +public interface ProgressWithOperations { + /** + * How many operations need to be downloaded in total for the current download to complete. + */ + public val totalOperations: Int + + /** + * How many operations, out of [totalOperations], have already been downloaded. + */ + public val downloadedOperations: Int + /** - * The relative amount of [total] items that have been [completed], as a number between `0.0` and `1.0`. + * The relative amount of [totalOperations] to items in [downloadedOperations], as a number between `0.0` and `1.0`. */ public val fraction: Float get() { - if (completed == 0) { + if (totalOperations == 0) { return 0.0f } - return completed.toFloat() / total.toFloat() + return downloadedOperations.toFloat() / totalOperations.toFloat() } } +internal data class ProgressInfo( + override val downloadedOperations: Int, + override val totalOperations: Int, +): ProgressWithOperations + /** * Provides realtime progress on how PowerSync is downloading rows. * + * This type reports progress by implementing [ProgressWithOperations], meaning that the [totalOperations], + * [downloadedOperations] and [fraction] getters are available on this instance. + * Additionally, it's possible to obtain the progress towards a specific priority only (instead of tracking progress for + * the entire download) by using [untilPriority]. + * * The reported progress always reflects the status towards the end of a sync iteration (after which a consistent * snapshot of all buckets is available locally). * @@ -46,7 +63,17 @@ public data class ProgressWithOperations( @ConsistentCopyVisibility public data class SyncDownloadProgress private constructor( private val buckets: Map, -) { +): ProgressWithOperations { + + override val downloadedOperations: Int + override val totalOperations: Int + + init { + val (target, completed) = targetAndCompletedCounts(BucketPriority.FULL_SYNC_PRIORITY) + totalOperations = target + downloadedOperations = completed + } + /** * Creates download progress information from the local progress counters since the last full sync and the target * checkpoint. @@ -69,14 +96,6 @@ public data class SyncDownloadProgress private constructor( }, ) - /** - * Download progress towards a complete checkpoint being received. - * - * The returned [ProgressWithOperations] instance tracks the target amount of operations that need to be downloaded - * in total and how many of them have already been received. - */ - public val untilCompletion: ProgressWithOperations get() = untilPriority(BucketPriority.FULL_SYNC_PRIORITY) - /** * Returns download progress towards all data up until the specified [priority] being received. * @@ -85,7 +104,7 @@ public data class SyncDownloadProgress private constructor( */ public fun untilPriority(priority: BucketPriority): ProgressWithOperations { val (total, completed) = targetAndCompletedCounts(priority) - return ProgressWithOperations(completed = completed, total = total) + return ProgressInfo(totalOperations = total, downloadedOperations = completed) } internal fun incrementDownloaded(batch: SyncDataBatch): SyncDownloadProgress = diff --git a/core/src/commonTest/kotlin/com/powersync/sync/ProgressTest.kt b/core/src/commonTest/kotlin/com/powersync/sync/ProgressTest.kt index 49b46d14..c43e7c51 100644 --- a/core/src/commonTest/kotlin/com/powersync/sync/ProgressTest.kt +++ b/core/src/commonTest/kotlin/com/powersync/sync/ProgressTest.kt @@ -6,10 +6,10 @@ import kotlin.test.assertEquals class ProgressTest { @Test fun reportsFraction() { - assertEquals(0.0f, ProgressWithOperations(0, 10).fraction) - assertEquals(0.5f, ProgressWithOperations(5, 10).fraction) - assertEquals(1.0f, ProgressWithOperations(10, 10).fraction) + assertEquals(0.0f, ProgressInfo(0, 10).fraction) + assertEquals(0.5f, ProgressInfo(5, 10).fraction) + assertEquals(1.0f, ProgressInfo(10, 10).fraction) - assertEquals(0.0f, ProgressWithOperations(0, 0).fraction) + assertEquals(0.0f, ProgressInfo(0, 0).fraction) } } diff --git a/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/components/GuardBySync.kt b/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/components/GuardBySync.kt index 001d912f..52e5b521 100644 --- a/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/components/GuardBySync.kt +++ b/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/components/GuardBySync.kt @@ -17,6 +17,7 @@ import androidx.compose.ui.unit.dp import com.powersync.PowerSyncDatabase import com.powersync.bucket.BucketPriority import com.powersync.compose.composeState +import com.powersync.sync.SyncStatusData import org.koin.compose.koinInject /** @@ -30,7 +31,7 @@ fun GuardBySync( priority: BucketPriority? = null, content: @Composable () -> Unit ) { - val state by db.currentStatus.composeState() + val state: SyncStatusData by db.currentStatus.composeState() if (state.hasSynced == true) { content() @@ -55,7 +56,7 @@ fun GuardBySync( val progress = state.downloadProgress?.let { if (priority == null) { - it.untilCompletion + it } else { it.untilPriority(priority) } @@ -66,10 +67,10 @@ fun GuardBySync( progress = progress.fraction, ) - if (progress.total == progress.completed) { + if (progress.downloadedOperations == progress.totalOperations) { Text("Applying server-side changes...") } else { - Text("Downloaded ${progress.completed} out of ${progress.total}.") + Text("Downloaded ${progress.downloadedOperations} out of ${progress.totalOperations}.") } } else { LinearProgressIndicator(