Skip to content

Commit 7d62a23

Browse files
authored
Merge pull request #148 from powersync-ja/refactor-connector-mutexes
Refactor mutexes and duplicate instance detection logic
2 parents 74e904e + 0d893f0 commit 7d62a23

File tree

10 files changed

+252
-99
lines changed

10 files changed

+252
-99
lines changed

core-tests-android/src/androidTest/java/com/powersync/AndroidDatabaseTest.kt

+4-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,10 @@ class AndroidDatabaseTest {
3434

3535
@After
3636
fun tearDown() {
37-
runBlocking { database.disconnectAndClear(true) }
37+
runBlocking {
38+
database.disconnectAndClear(true)
39+
database.close()
40+
}
3841
}
3942

4043
@Test

core/build.gradle.kts

+15-5
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,11 @@ kotlin {
211211
dependsOn(commonTest.get())
212212
}
213213

214+
val commonJava by creating {
215+
kotlin.srcDir("commonJava")
216+
dependsOn(commonMain.get())
217+
}
218+
214219
commonMain.dependencies {
215220
implementation(libs.uuid)
216221
implementation(libs.kotlin.stdlib)
@@ -226,13 +231,18 @@ kotlin {
226231
api(libs.kermit)
227232
}
228233

229-
androidMain.dependencies {
230-
implementation(libs.ktor.client.okhttp)
234+
androidMain {
235+
dependsOn(commonJava)
236+
dependencies.implementation(libs.ktor.client.okhttp)
231237
}
232238

233-
jvmMain.dependencies {
234-
implementation(libs.ktor.client.okhttp)
235-
implementation(libs.sqlite.jdbc)
239+
jvmMain {
240+
dependsOn(commonJava)
241+
242+
dependencies {
243+
implementation(libs.ktor.client.okhttp)
244+
implementation(libs.sqlite.jdbc)
245+
}
236246
}
237247

238248
iosMain.dependencies {

core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import co.touchlab.kermit.Logger
66
import co.touchlab.kermit.Severity
77
import co.touchlab.kermit.TestConfig
88
import co.touchlab.kermit.TestLogWriter
9-
import com.powersync.db.PowerSyncDatabaseImpl
9+
import com.powersync.db.ActiveDatabaseGroup
1010
import com.powersync.db.schema.Schema
1111
import com.powersync.testutils.UserRow
1212
import com.powersync.testutils.waitFor
@@ -122,7 +122,7 @@ class DatabaseTest {
122122
waitFor {
123123
assertNotNull(
124124
logWriter.logs.find {
125-
it.message == PowerSyncDatabaseImpl.multipleInstancesMessage
125+
it.message == ActiveDatabaseGroup.multipleInstancesMessage
126126
},
127127
)
128128
}

core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt

+5-5
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ class SyncIntegrationTest {
120120
fun testPartialSync() =
121121
runTest {
122122
val syncStream = syncStream()
123-
database.connect(syncStream, 1000L)
123+
database.connectInternal(syncStream, 1000L)
124124

125125
val checksums =
126126
buildList {
@@ -214,7 +214,7 @@ class SyncIntegrationTest {
214214
fun testRemembersLastPartialSync() =
215215
runTest {
216216
val syncStream = syncStream()
217-
database.connect(syncStream, 1000L)
217+
database.connectInternal(syncStream, 1000L)
218218

219219
syncLines.send(
220220
SyncLine.FullCheckpoint(
@@ -253,7 +253,7 @@ class SyncIntegrationTest {
253253
fun setsDownloadingState() =
254254
runTest {
255255
val syncStream = syncStream()
256-
database.connect(syncStream, 1000L)
256+
database.connectInternal(syncStream, 1000L)
257257

258258
turbineScope(timeout = 10.0.seconds) {
259259
val turbine = database.currentStatus.asFlow().testIn(this)
@@ -291,7 +291,7 @@ class SyncIntegrationTest {
291291
val syncStream = syncStream()
292292
val turbine = database.currentStatus.asFlow().testIn(this)
293293

294-
database.connect(syncStream, 1000L)
294+
database.connectInternal(syncStream, 1000L)
295295
turbine.waitFor { it.connecting }
296296

297297
database.disconnect()
@@ -308,7 +308,7 @@ class SyncIntegrationTest {
308308
fun testMultipleSyncsDoNotCreateMultipleStatusEntries() =
309309
runTest {
310310
val syncStream = syncStream()
311-
database.connect(syncStream, 1000L)
311+
database.connectInternal(syncStream, 1000L)
312312

313313
turbineScope(timeout = 10.0.seconds) {
314314
val turbine = database.currentStatus.asFlow().testIn(this)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package com.powersync.db
2+
3+
internal actual fun disposeWhenDeallocated(resource: ActiveDatabaseResource): Any {
4+
// We can't do this on Java 8 :(
5+
return object {}
6+
}
7+
8+
// This would require Java 9+
9+
10+
/*
11+
import java.lang.ref.Cleaner
12+
13+
internal actual fun disposeWhenDeallocated(resource: ActiveDatabaseResource): Any {
14+
// Note: It's important that the returned object does not reference the resource directly
15+
val wrapper = CleanableWrapper()
16+
CleanableWrapper.cleaner.register(wrapper, resource::dispose)
17+
return wrapper
18+
}
19+
20+
private class CleanableWrapper {
21+
var cleanable: Cleaner.Cleanable? = null
22+
23+
companion object {
24+
val cleaner: Cleaner = Cleaner.create()
25+
}
26+
}
27+
*/
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,94 @@
11
package com.powersync.db
22

3-
import com.powersync.PowerSyncDatabase
4-
import com.powersync.utils.ExclusiveMethodProvider
5-
6-
internal class ActiveInstanceStore : ExclusiveMethodProvider() {
7-
private val instances = mutableListOf<PowerSyncDatabase>()
8-
9-
/**
10-
* Registers an instance. Returns true if multiple instances with the same identifier are
11-
* present.
12-
*/
13-
suspend fun registerAndCheckInstance(db: PowerSyncDatabase) =
14-
exclusiveMethod("instances") {
15-
instances.add(db)
16-
return@exclusiveMethod instances.filter { it.identifier == db.identifier }.size > 1
3+
import co.touchlab.kermit.Logger
4+
import co.touchlab.stately.concurrency.AtomicBoolean
5+
import co.touchlab.stately.concurrency.Synchronizable
6+
import co.touchlab.stately.concurrency.synchronize
7+
import kotlinx.coroutines.sync.Mutex
8+
9+
/**
10+
* Returns an object that, when deallocated, calls [ActiveDatabaseResource.dispose].
11+
*/
12+
internal expect fun disposeWhenDeallocated(resource: ActiveDatabaseResource): Any
13+
14+
/**
15+
* An collection of PowerSync databases with the same path / identifier.
16+
*
17+
* We expect that each group will only ever have one database because we encourage users to write their databases as
18+
* singletons. We print a warning when two databases are part of the same group.
19+
* Additionally, we want to avoid two databases in the same group having a sync stream open at the same time to avoid
20+
* duplicate resources being used. For this reason, each active database group has a coroutine mutex guarding the
21+
* sync job.
22+
*/
23+
internal class ActiveDatabaseGroup(
24+
val identifier: String,
25+
private val collection: GroupsCollection,
26+
) {
27+
internal var refCount = 0 // Guarded by companion object
28+
internal val syncMutex = Mutex()
29+
30+
fun removeUsage() {
31+
collection.synchronize {
32+
if (--refCount == 0) {
33+
collection.allGroups.remove(this)
34+
}
1735
}
36+
}
37+
38+
internal open class GroupsCollection : Synchronizable() {
39+
internal val allGroups = mutableListOf<ActiveDatabaseGroup>()
40+
41+
private fun findGroup(
42+
warnOnDuplicate: Logger,
43+
identifier: String,
44+
): ActiveDatabaseGroup =
45+
synchronize {
46+
val existing = allGroups.asSequence().firstOrNull { it.identifier == identifier }
47+
val resolvedGroup =
48+
if (existing == null) {
49+
val added = ActiveDatabaseGroup(identifier, this)
50+
allGroups.add(added)
51+
added
52+
} else {
53+
existing
54+
}
55+
56+
if (resolvedGroup.refCount++ != 0) {
57+
warnOnDuplicate.w { multipleInstancesMessage }
58+
}
59+
60+
resolvedGroup
61+
}
62+
63+
internal fun referenceDatabase(
64+
warnOnDuplicate: Logger,
65+
identifier: String,
66+
): Pair<ActiveDatabaseResource, Any> {
67+
val group = findGroup(warnOnDuplicate, identifier)
68+
val resource = ActiveDatabaseResource(group)
69+
70+
return resource to disposeWhenDeallocated(resource)
71+
}
72+
}
73+
74+
companion object : GroupsCollection() {
75+
internal val multipleInstancesMessage =
76+
"""
77+
Multiple PowerSync instances for the same database have been detected.
78+
This can cause unexpected results.
79+
Please check your PowerSync client instantiation logic if this is not intentional.
80+
""".trimIndent()
81+
}
82+
}
83+
84+
internal class ActiveDatabaseResource(
85+
val group: ActiveDatabaseGroup,
86+
) {
87+
val disposed = AtomicBoolean(false)
1888

19-
suspend fun removeInstance(db: PowerSyncDatabase) =
20-
exclusiveMethod("instances") {
21-
instances.remove(db)
89+
fun dispose() {
90+
if (disposed.compareAndSet(false, true)) {
91+
group.removeUsage()
2292
}
93+
}
2394
}

core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt

+23-30
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import com.powersync.sync.PriorityStatusEntry
2020
import com.powersync.sync.SyncStatus
2121
import com.powersync.sync.SyncStatusData
2222
import com.powersync.sync.SyncStream
23-
import com.powersync.utils.ExclusiveMethodProvider
2423
import com.powersync.utils.JsonParam
2524
import com.powersync.utils.JsonUtil
2625
import com.powersync.utils.throttle
@@ -35,6 +34,8 @@ import kotlinx.coroutines.flow.filter
3534
import kotlinx.coroutines.flow.first
3635
import kotlinx.coroutines.launch
3736
import kotlinx.coroutines.runBlocking
37+
import kotlinx.coroutines.sync.Mutex
38+
import kotlinx.coroutines.sync.withLock
3839
import kotlinx.datetime.Instant
3940
import kotlinx.datetime.LocalDateTime
4041
import kotlinx.datetime.TimeZone
@@ -57,8 +58,7 @@ internal class PowerSyncDatabaseImpl(
5758
private val dbFilename: String,
5859
val logger: Logger = Logger,
5960
driver: PsSqlDriver = factory.createDriver(scope, dbFilename),
60-
) : ExclusiveMethodProvider(),
61-
PowerSyncDatabase {
61+
) : PowerSyncDatabase {
6262
companion object {
6363
internal val streamConflictMessage =
6464
"""
@@ -68,41 +68,33 @@ internal class PowerSyncDatabaseImpl(
6868
This connection attempt will be queued and will only be executed after
6969
currently connecting clients are disconnected.
7070
""".trimIndent()
71-
72-
internal val multipleInstancesMessage =
73-
"""
74-
Multiple PowerSync instances for the same database have been detected.
75-
This can cause unexpected results.
76-
Please check your PowerSync client instantiation logic if this is not intentional.
77-
""".trimIndent()
78-
79-
internal val instanceStore = ActiveInstanceStore()
8071
}
8172

8273
override val identifier = dbFilename
8374

8475
private val internalDb = InternalDatabaseImpl(driver, scope)
8576
internal val bucketStorage: BucketStorage = BucketStorageImpl(internalDb, logger)
77+
private val resource: ActiveDatabaseResource
78+
private val clearResourceWhenDisposed: Any
79+
8680
var closed = false
8781

8882
/**
8983
* The current sync status.
9084
*/
9185
override val currentStatus: SyncStatus = SyncStatus()
9286

87+
private val mutex = Mutex()
9388
private var syncStream: SyncStream? = null
94-
9589
private var syncJob: Job? = null
96-
9790
private var uploadJob: Job? = null
9891

9992
init {
100-
val db = this
93+
val res = ActiveDatabaseGroup.referenceDatabase(logger, identifier)
94+
resource = res.first
95+
clearResourceWhenDisposed = res.second
96+
10197
runBlocking {
102-
val isMultiple = instanceStore.registerAndCheckInstance(db)
103-
if (isMultiple) {
104-
logger.w { multipleInstancesMessage }
105-
}
10698
val sqliteVersion = internalDb.queries.sqliteVersion().executeAsOne()
10799
logger.d { "SQLiteVersion: $sqliteVersion" }
108100
checkVersion()
@@ -126,10 +118,10 @@ internal class PowerSyncDatabaseImpl(
126118
crudThrottleMs: Long,
127119
retryDelayMs: Long,
128120
params: Map<String, JsonParam?>,
129-
) = exclusiveMethod("connect") {
130-
disconnect()
121+
) = mutex.withLock {
122+
disconnectInternal()
131123

132-
connect(
124+
connectInternal(
133125
SyncStream(
134126
bucketStorage = bucketStorage,
135127
connector = connector,
@@ -143,7 +135,7 @@ internal class PowerSyncDatabaseImpl(
143135
}
144136

145137
@OptIn(FlowPreview::class)
146-
internal fun connect(
138+
internal fun connectInternal(
147139
stream: SyncStream,
148140
crudThrottleMs: Long,
149141
) {
@@ -154,8 +146,7 @@ internal class PowerSyncDatabaseImpl(
154146
syncJob =
155147
scope.launch {
156148
// Get a global lock for checking mutex maps
157-
val streamMutex =
158-
globalMutexFor("streaming-$identifier")
149+
val streamMutex = resource.group.syncMutex
159150

160151
// Poke the streaming mutex to see if another client is using it
161152
var obtainedLock = false
@@ -337,7 +328,9 @@ internal class PowerSyncDatabaseImpl(
337328
}
338329
}
339330

340-
override suspend fun disconnect() {
331+
override suspend fun disconnect() = mutex.withLock { disconnectInternal() }
332+
333+
private suspend fun disconnectInternal() {
341334
if (syncJob != null && syncJob!!.isActive) {
342335
syncJob?.cancelAndJoin()
343336
}
@@ -431,13 +424,13 @@ internal class PowerSyncDatabaseImpl(
431424
}
432425

433426
override suspend fun close() =
434-
exclusiveMethod("close") {
427+
mutex.withLock {
435428
if (closed) {
436-
return@exclusiveMethod
429+
return@withLock
437430
}
438-
disconnect()
431+
disconnectInternal()
439432
internalDb.close()
440-
instanceStore.removeInstance(this)
433+
resource.dispose()
441434
closed = true
442435
}
443436

0 commit comments

Comments
 (0)