Skip to content

Commit 74e904e

Browse files
Multiple Client Connection Locks (#146)
1 parent bca578f commit 74e904e

File tree

11 files changed

+472
-67
lines changed

11 files changed

+472
-67
lines changed

CHANGELOG.md

+4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Changelog
22

3+
## 1.0.0-BETA29
4+
5+
* Added queing protection and warnings when connecting multiple PowerSync clients to the same database file.
6+
37
## 1.0.0-BETA28
48

59
* Update PowerSync SQLite core extension to 0.3.12.

core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt

+48-6
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,53 @@
11
package com.powersync
22

33
import app.cash.turbine.turbineScope
4+
import co.touchlab.kermit.ExperimentalKermitApi
5+
import co.touchlab.kermit.Logger
6+
import co.touchlab.kermit.Severity
7+
import co.touchlab.kermit.TestConfig
8+
import co.touchlab.kermit.TestLogWriter
9+
import com.powersync.db.PowerSyncDatabaseImpl
410
import com.powersync.db.schema.Schema
511
import com.powersync.testutils.UserRow
12+
import com.powersync.testutils.waitFor
613
import kotlinx.coroutines.runBlocking
714
import kotlinx.coroutines.test.runTest
815
import kotlin.test.AfterTest
916
import kotlin.test.BeforeTest
1017
import kotlin.test.Test
1118
import kotlin.test.assertEquals
19+
import kotlin.test.assertNotNull
1220

21+
@OptIn(ExperimentalKermitApi::class)
1322
class DatabaseTest {
23+
private val logWriter =
24+
TestLogWriter(
25+
loggable = Severity.Debug,
26+
)
27+
28+
private val logger =
29+
Logger(
30+
TestConfig(
31+
minSeverity = Severity.Debug,
32+
logWriterList = listOf(logWriter),
33+
),
34+
)
35+
1436
private lateinit var database: PowerSyncDatabase
1537

38+
private fun openDB() =
39+
PowerSyncDatabase(
40+
factory = com.powersync.testutils.factory,
41+
schema = Schema(UserRow.table),
42+
dbFilename = "testdb",
43+
logger = logger,
44+
)
45+
1646
@BeforeTest
1747
fun setupDatabase() {
18-
database =
19-
PowerSyncDatabase(
20-
factory = com.powersync.testutils.factory,
21-
schema = Schema(UserRow.table),
22-
dbFilename = "testdb",
23-
)
48+
logWriter.reset()
49+
50+
database = openDB()
2451

2552
runBlocking {
2653
database.disconnectAndClear(true)
@@ -86,4 +113,19 @@ class DatabaseTest {
86113
query.cancel()
87114
}
88115
}
116+
117+
@Test
118+
fun warnsMultipleInstances() =
119+
runTest {
120+
// Opens a second DB with the same database filename
121+
val db2 = openDB()
122+
waitFor {
123+
assertNotNull(
124+
logWriter.logs.find {
125+
it.message == PowerSyncDatabaseImpl.multipleInstancesMessage
126+
},
127+
)
128+
}
129+
db2.close()
130+
}
89131
}

core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt

+153-3
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
package com.powersync
22

33
import app.cash.turbine.turbineScope
4+
import co.touchlab.kermit.ExperimentalKermitApi
45
import co.touchlab.kermit.Logger
56
import co.touchlab.kermit.Severity
67
import co.touchlab.kermit.TestConfig
8+
import co.touchlab.kermit.TestLogWriter
79
import com.powersync.bucket.BucketChecksum
810
import com.powersync.bucket.BucketPriority
911
import com.powersync.bucket.Checkpoint
@@ -18,6 +20,7 @@ import com.powersync.sync.SyncStream
1820
import com.powersync.testutils.MockSyncService
1921
import com.powersync.testutils.UserRow
2022
import com.powersync.testutils.cleanup
23+
import com.powersync.testutils.factory
2124
import com.powersync.testutils.waitFor
2225
import com.powersync.utils.JsonUtil
2326
import dev.mokkery.answering.returns
@@ -35,16 +38,22 @@ import kotlin.test.BeforeTest
3538
import kotlin.test.Test
3639
import kotlin.test.assertEquals
3740
import kotlin.test.assertFalse
41+
import kotlin.test.assertNotNull
3842
import kotlin.test.assertTrue
3943
import kotlin.time.Duration.Companion.seconds
4044

41-
@OptIn(co.touchlab.kermit.ExperimentalKermitApi::class)
45+
@OptIn(ExperimentalKermitApi::class)
4246
class SyncIntegrationTest {
47+
private val logWriter =
48+
TestLogWriter(
49+
loggable = Severity.Debug,
50+
)
51+
4352
private val logger =
4453
Logger(
4554
TestConfig(
4655
minSeverity = Severity.Debug,
47-
logWriterList = listOf(),
56+
logWriterList = listOf(logWriter),
4857
),
4958
)
5059
private lateinit var database: PowerSyncDatabaseImpl
@@ -54,6 +63,7 @@ class SyncIntegrationTest {
5463
@BeforeTest
5564
fun setup() {
5665
cleanup("testdb")
66+
logWriter.reset()
5767
database = openDb()
5868
connector =
5969
mock<PowerSyncBackendConnector> {
@@ -75,12 +85,15 @@ class SyncIntegrationTest {
7585

7686
@AfterTest
7787
fun teardown() {
88+
runBlocking {
89+
database.close()
90+
}
7891
cleanup("testdb")
7992
}
8093

8194
private fun openDb() =
8295
PowerSyncDatabase(
83-
factory = com.powersync.testutils.factory,
96+
factory = factory,
8497
schema = Schema(UserRow.table),
8598
dbFilename = "testdb",
8699
) as PowerSyncDatabaseImpl
@@ -271,6 +284,26 @@ class SyncIntegrationTest {
271284
syncLines.close()
272285
}
273286

287+
@Test
288+
fun setsConnectingState() =
289+
runTest {
290+
turbineScope(timeout = 10.0.seconds) {
291+
val syncStream = syncStream()
292+
val turbine = database.currentStatus.asFlow().testIn(this)
293+
294+
database.connect(syncStream, 1000L)
295+
turbine.waitFor { it.connecting }
296+
297+
database.disconnect()
298+
299+
turbine.waitFor { !it.connecting && !it.connected }
300+
turbine.cancel()
301+
}
302+
303+
database.close()
304+
syncLines.close()
305+
}
306+
274307
@Test
275308
fun testMultipleSyncsDoNotCreateMultipleStatusEntries() =
276309
runTest {
@@ -312,6 +345,123 @@ class SyncIntegrationTest {
312345
turbine.cancel()
313346
}
314347

348+
database.close()
349+
syncLines.close()
350+
}
351+
352+
@Test
353+
fun warnsMultipleConnectionAttempts() =
354+
runTest {
355+
val db2 =
356+
PowerSyncDatabase(
357+
factory = factory,
358+
schema = Schema(UserRow.table),
359+
dbFilename = "testdb",
360+
logger = logger,
361+
) as PowerSyncDatabaseImpl
362+
363+
turbineScope(timeout = 10.0.seconds) {
364+
// Connect the first database
365+
database.connect(connector, 1000L)
366+
db2.connect(connector)
367+
368+
waitFor {
369+
assertNotNull(
370+
logWriter.logs.find {
371+
it.message == PowerSyncDatabaseImpl.streamConflictMessage
372+
},
373+
)
374+
}
375+
376+
db2.disconnect()
377+
database.disconnect()
378+
}
379+
380+
db2.close()
381+
database.close()
382+
syncLines.close()
383+
}
384+
385+
@Test
386+
fun queuesMultipleConnectionAttempts() =
387+
runTest {
388+
val db2 =
389+
PowerSyncDatabase(
390+
factory = factory,
391+
schema = Schema(UserRow.table),
392+
dbFilename = "testdb",
393+
logger = Logger,
394+
) as PowerSyncDatabaseImpl
395+
396+
turbineScope(timeout = 10.0.seconds) {
397+
val turbine1 = database.currentStatus.asFlow().testIn(this)
398+
val turbine2 = db2.currentStatus.asFlow().testIn(this)
399+
400+
// Connect the first database
401+
database.connect(connector, 1000L)
402+
403+
turbine1.waitFor { it.connecting }
404+
db2.connect(connector)
405+
406+
// Should not be connecting yet
407+
assertEquals(false, db2.currentStatus.connecting)
408+
409+
database.disconnect()
410+
turbine1.waitFor { !it.connecting }
411+
412+
// Should start connecting after the other database disconnected
413+
turbine2.waitFor { it.connecting }
414+
db2.disconnect()
415+
turbine2.waitFor { !it.connecting }
416+
417+
turbine1.cancel()
418+
turbine2.cancel()
419+
}
420+
421+
db2.close()
422+
database.close()
423+
syncLines.close()
424+
}
425+
426+
@Test
427+
fun reconnectsAfterDisconnecting() =
428+
runTest {
429+
turbineScope(timeout = 10.0.seconds) {
430+
val turbine = database.currentStatus.asFlow().testIn(this)
431+
432+
database.connect(connector, 1000L)
433+
turbine.waitFor { it.connecting }
434+
435+
database.disconnect()
436+
turbine.waitFor { !it.connecting }
437+
438+
database.connect(connector, 1000L)
439+
turbine.waitFor { it.connecting }
440+
database.disconnect()
441+
turbine.waitFor { !it.connecting }
442+
443+
turbine.cancel()
444+
}
445+
446+
database.close()
447+
syncLines.close()
448+
}
449+
450+
@Test
451+
fun reconnects() =
452+
runTest {
453+
turbineScope(timeout = 10.0.seconds) {
454+
val turbine = database.currentStatus.asFlow().testIn(this)
455+
456+
database.connect(connector, 1000L, retryDelayMs = 5000)
457+
turbine.waitFor { it.connecting }
458+
459+
database.connect(connector, 1000L, retryDelayMs = 5000)
460+
turbine.waitFor { it.connecting }
461+
462+
turbine.cancel()
463+
}
464+
315465
database.close()
316466
syncLines.close()
317467
}

core/src/commonMain/kotlin/com/powersync/PowerSyncDatabase.kt

+6
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,12 @@ import kotlin.coroutines.cancellation.CancellationException
1919
* All changes to local tables are automatically recorded, whether connected or not. Once connected, the changes are uploaded.
2020
*/
2121
public interface PowerSyncDatabase : Queries {
22+
/**
23+
* Identifies the database client.
24+
* This is typically the database name.
25+
*/
26+
public val identifier: String
27+
2228
/**
2329
* The current sync status.
2430
*/
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package com.powersync.db
2+
3+
import com.powersync.PowerSyncDatabase
4+
import com.powersync.utils.ExclusiveMethodProvider
5+
6+
internal class ActiveInstanceStore : ExclusiveMethodProvider() {
7+
private val instances = mutableListOf<PowerSyncDatabase>()
8+
9+
/**
10+
* Registers an instance. Returns true if multiple instances with the same identifier are
11+
* present.
12+
*/
13+
suspend fun registerAndCheckInstance(db: PowerSyncDatabase) =
14+
exclusiveMethod("instances") {
15+
instances.add(db)
16+
return@exclusiveMethod instances.filter { it.identifier == db.identifier }.size > 1
17+
}
18+
19+
suspend fun removeInstance(db: PowerSyncDatabase) =
20+
exclusiveMethod("instances") {
21+
instances.remove(db)
22+
}
23+
}

0 commit comments

Comments
 (0)