Skip to content

Commit 9cb306f

Browse files
Watched Query Fixes (#26)
1 parent 7a32cfb commit 9cb306f

File tree

8 files changed

+95
-59
lines changed

8 files changed

+95
-59
lines changed

CHANGELOG.md

+5
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
# Changelog
22

3+
## 1.0.0-Beta.8
4+
5+
* Improved watch query internals. Added the ability to throttle watched queries.
6+
* Added support for sync bucket priorities.
7+
38
## 1.0.0-Beta.7
49

510
* Fixed an issue where throwing exceptions in the query `mapper` could cause a runtime crash.

Demo/PowerSyncExample.xcodeproj/project.xcworkspace/xcshareddata/swiftpm/Package.resolved

+5-5
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
{
2-
"originHash" : "25b8cd5d97789d7e497d6a5e0b04419a426018d83f0e80ab6817b213aa976748",
2+
"originHash" : "33297127250b66812faa920958a24bae46bf9e9d1c38ea6b84ca413efaf16afd",
33
"pins" : [
44
{
55
"identity" : "anycodable",
@@ -15,17 +15,17 @@
1515
"kind" : "remoteSourceControl",
1616
"location" : "https://github.com/powersync-ja/powersync-kotlin.git",
1717
"state" : {
18-
"revision" : "203db74889df8a20e3c6ac38aede6b0186d2e3b5",
19-
"version" : "1.0.0-BETA23.0"
18+
"revision" : "cb1a7d186144290b5ad706f5c2c9b67ff707356e",
19+
"version" : "1.0.0-BETA27.0"
2020
}
2121
},
2222
{
2323
"identity" : "powersync-sqlite-core-swift",
2424
"kind" : "remoteSourceControl",
2525
"location" : "https://github.com/powersync-ja/powersync-sqlite-core-swift.git",
2626
"state" : {
27-
"revision" : "5de629f7ddc649a1e89c64fde6113fe113fe14de",
28-
"version" : "0.3.9"
27+
"revision" : "fb313c473b17457d79bf3847905f5a288901d493",
28+
"version" : "0.3.11"
2929
}
3030
},
3131
{

Demo/PowerSyncExample/PowerSync/SystemManager.swift

+12-11
Original file line numberDiff line numberDiff line change
@@ -36,16 +36,17 @@ class SystemManager {
3636
func watchLists(_ callback: @escaping (_ lists: [ListContent]) -> Void ) async {
3737
do {
3838
for try await lists in try self.db.watch<ListContent>(
39-
sql: "SELECT * FROM \(LISTS_TABLE)",
40-
parameters: [],
41-
mapper: { cursor in
42-
try ListContent(
43-
id: cursor.getString(name: "id"),
44-
name: cursor.getString(name: "name"),
45-
createdAt: cursor.getString(name: "created_at"),
46-
ownerId: cursor.getString(name: "owner_id")
47-
)
48-
}
39+
options: WatchOptions(
40+
sql: "SELECT * FROM \(LISTS_TABLE)",
41+
mapper: { cursor in
42+
try ListContent(
43+
id: cursor.getString(name: "id"),
44+
name: cursor.getString(name: "name"),
45+
createdAt: cursor.getString(name: "created_at"),
46+
ownerId: cursor.getString(name: "owner_id")
47+
)
48+
}
49+
)
4950
) {
5051
callback(lists)
5152
}
@@ -55,7 +56,7 @@ class SystemManager {
5556
}
5657

5758
func insertList(_ list: NewListContent) async throws {
58-
_ = try await self.db.execute(
59+
let result = try await self.db.execute(
5960
sql: "INSERT INTO \(LISTS_TABLE) (id, created_at, name, owner_id) VALUES (uuid(), datetime(), ?, ?)",
6061
parameters: [list.name, connector.currentUserID]
6162
)

Package.resolved

+2-2
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@
55
"kind" : "remoteSourceControl",
66
"location" : "https://github.com/powersync-ja/powersync-kotlin.git",
77
"state" : {
8-
"revision" : "0541a4744088ea24084c47c158ab116db35f9345",
9-
"version" : "1.0.0-BETA26.0"
8+
"revision" : "cb1a7d186144290b5ad706f5c2c9b67ff707356e",
9+
"version" : "1.0.0-BETA27.0"
1010
}
1111
},
1212
{

Package.swift

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ let package = Package(
1616
targets: ["PowerSync"]),
1717
],
1818
dependencies: [
19-
.package(url: "https://github.com/powersync-ja/powersync-kotlin.git", exact: "1.0.0-BETA26.0"),
19+
.package(url: "https://github.com/powersync-ja/powersync-kotlin.git", exact: "1.0.0-BETA27.0"),
2020
.package(url: "https://github.com/powersync-ja/powersync-sqlite-core-swift.git", "0.3.11"..<"0.4.0")
2121
],
2222
targets: [

Sources/PowerSync/Kotlin/KotlinPowerSyncDatabaseImpl.swift

+26-23
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol {
2525
func waitForFirstSync() async throws {
2626
try await kotlinDatabase.waitForFirstSync()
2727
}
28-
28+
2929
func waitForFirstSync(priority: Int32) async throws {
3030
try await kotlinDatabase.waitForFirstSync(priority: priority)
3131
}
@@ -165,38 +165,42 @@ final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol {
165165
parameters: [Any]?,
166166
mapper: @escaping (SqlCursor) -> RowType
167167
) throws -> AsyncThrowingStream<[RowType], Error> {
168-
AsyncThrowingStream { continuation in
169-
Task {
170-
do {
171-
for await values in try self.kotlinDatabase.watch(
172-
sql: sql,
173-
parameters: parameters,
174-
mapper: mapper
175-
) {
176-
try continuation.yield(safeCast(values, to: [RowType].self))
177-
}
178-
continuation.finish()
179-
} catch {
180-
continuation.finish(throwing: error)
181-
}
182-
}
183-
}
168+
try watch(options: WatchOptions(sql: sql, parameters: parameters, mapper: mapper))
184169
}
185170

186171
func watch<RowType>(
187172
sql: String,
188173
parameters: [Any]?,
189174
mapper: @escaping (SqlCursor) throws -> RowType
175+
) throws -> AsyncThrowingStream<[RowType], Error> {
176+
try watch(options: WatchOptions(sql: sql, parameters: parameters, mapper: mapper))
177+
}
178+
179+
func watch<RowType>(
180+
options: WatchOptions<RowType>
190181
) throws -> AsyncThrowingStream<[RowType], Error> {
191182
AsyncThrowingStream { continuation in
192183
Task {
193184
do {
194185
var mapperError: Error?
186+
// HACK!
187+
// SKIEE doesn't support custom exceptions in Flows
188+
// Exceptions which occur in the Flow itself cause runtime crashes.
189+
// The most probable crash would be the internal EXPLAIN statement.
190+
// This attempts to EXPLAIN the query before passing it to Kotlin
191+
// We could introduce an onChange API in Kotlin which we use to implement watches here.
192+
// This would prevent most issues with exceptions.
193+
_ = try await self.kotlinDatabase.getAll(
194+
sql: "EXPLAIN \(options.sql)",
195+
parameters: options.parameters,
196+
mapper: { _ in "" }
197+
)
195198
for try await values in try self.kotlinDatabase.watch(
196-
sql: sql,
197-
parameters: parameters,
199+
sql: options.sql,
200+
parameters: options.parameters,
201+
throttleMs: KotlinLong(value: options.throttleMs),
198202
mapper: { cursor in do {
199-
return try mapper(cursor)
203+
return try options.mapper(cursor)
200204
} catch {
201205
mapperError = error
202206
// The value here does not matter. We will throw the exception later
@@ -217,12 +221,11 @@ final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol {
217221
}
218222
}
219223

220-
public func writeTransaction<R>(callback: @escaping (any PowerSyncTransaction) throws -> R) async throws -> R {
224+
func writeTransaction<R>(callback: @escaping (any PowerSyncTransaction) throws -> R) async throws -> R {
221225
return try safeCast(await kotlinDatabase.writeTransaction(callback: TransactionCallback(callback: callback)), to: R.self)
222226
}
223227

224-
public func readTransaction<R>(callback: @escaping (any PowerSyncTransaction) throws -> R) async throws -> R {
228+
func readTransaction<R>(callback: @escaping (any PowerSyncTransaction) throws -> R) async throws -> R {
225229
return try safeCast(await kotlinDatabase.readTransaction(callback: TransactionCallback(callback: callback)), to: R.self)
226230
}
227231
}
228-

Sources/PowerSync/QueriesProtocol.swift

+27-7
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,23 @@
1-
import Foundation
21
import Combine
2+
import Foundation
33
import PowerSyncKotlin
44

5+
public let DEFAULT_WATCH_THROTTLE_MS = Int64(30)
6+
7+
public struct WatchOptions<RowType> {
8+
public var sql: String
9+
public var parameters: [Any]
10+
public var throttleMs: Int64
11+
public var mapper: (SqlCursor) throws -> RowType
12+
13+
public init(sql: String, parameters: [Any]? = [], throttleMs: Int64? = DEFAULT_WATCH_THROTTLE_MS, mapper: @escaping (SqlCursor) throws -> RowType) {
14+
self.sql = sql
15+
self.parameters = parameters ?? [] // Default to empty array if nil
16+
self.throttleMs = throttleMs ?? DEFAULT_WATCH_THROTTLE_MS // Default to the constant if nil
17+
self.mapper = mapper
18+
}
19+
}
20+
521
public protocol Queries {
622
/// Execute a write query (INSERT, UPDATE, DELETE)
723
/// Using `RETURNING *` will result in an error.
@@ -69,40 +85,44 @@ public protocol Queries {
6985
mapper: @escaping (SqlCursor) throws -> RowType
7086
) throws -> AsyncThrowingStream<[RowType], Error>
7187

88+
func watch<RowType>(
89+
options: WatchOptions<RowType>
90+
) throws -> AsyncThrowingStream<[RowType], Error>
91+
7292
/// Execute a write transaction with the given callback
7393
func writeTransaction<R>(callback: @escaping (any PowerSyncTransaction) throws -> R) async throws -> R
7494

7595
/// Execute a read transaction with the given callback
7696
func readTransaction<R>(callback: @escaping (any PowerSyncTransaction) throws -> R) async throws -> R
7797
}
7898

79-
extension Queries {
80-
public func execute(_ sql: String) async throws -> Int64 {
99+
public extension Queries {
100+
func execute(_ sql: String) async throws -> Int64 {
81101
return try await execute(sql: sql, parameters: [])
82102
}
83103

84-
public func get<RowType>(
104+
func get<RowType>(
85105
_ sql: String,
86106
mapper: @escaping (SqlCursor) -> RowType
87107
) async throws -> RowType {
88108
return try await get(sql: sql, parameters: [], mapper: mapper)
89109
}
90110

91-
public func getAll<RowType>(
111+
func getAll<RowType>(
92112
_ sql: String,
93113
mapper: @escaping (SqlCursor) -> RowType
94114
) async throws -> [RowType] {
95115
return try await getAll(sql: sql, parameters: [], mapper: mapper)
96116
}
97117

98-
public func getOptional<RowType>(
118+
func getOptional<RowType>(
99119
_ sql: String,
100120
mapper: @escaping (SqlCursor) -> RowType
101121
) async throws -> RowType? {
102122
return try await getOptional(sql: sql, parameters: [], mapper: mapper)
103123
}
104124

105-
public func watch<RowType>(
125+
func watch<RowType>(
106126
_ sql: String,
107127
mapper: @escaping (SqlCursor) -> RowType
108128
) throws -> AsyncThrowingStream<[RowType], Error> {

Tests/PowerSyncTests/Kotlin/KotlinPowerSyncDatabaseImplTests.swift

+17-10
Original file line numberDiff line numberDiff line change
@@ -198,13 +198,13 @@ final class KotlinPowerSyncDatabaseImplTests: XCTestCase {
198198

199199
// Create an actor to handle concurrent mutations
200200
actor ResultsStore {
201-
private var results: [[String]] = []
201+
private var results: Set<String> = []
202202

203203
func append(_ names: [String]) {
204-
results.append(names)
204+
results.formUnion(names)
205205
}
206206

207-
func getResults() -> [[String]] {
207+
func getResults() -> Set<String> {
208208
results
209209
}
210210

@@ -213,14 +213,16 @@ final class KotlinPowerSyncDatabaseImplTests: XCTestCase {
213213
}
214214
}
215215

216+
216217
let resultsStore = ResultsStore()
217218

218219
let stream = try database.watch(
219-
sql: "SELECT name FROM users ORDER BY id",
220-
parameters: nil
221-
) { cursor in
222-
cursor.getString(index: 0)!
223-
}
220+
options: WatchOptions(
221+
sql: "SELECT name FROM users ORDER BY id",
222+
mapper: { cursor in
223+
cursor.getString(index: 0)!
224+
}
225+
))
224226

225227
let watchTask = Task {
226228
for try await names in stream {
@@ -240,13 +242,18 @@ final class KotlinPowerSyncDatabaseImplTests: XCTestCase {
240242
sql: "INSERT INTO users (id, name, email) VALUES (?, ?, ?)",
241243
parameters: ["2", "User 2", "[email protected]"]
242244
)
245+
243246

244247
await fulfillment(of: [expectation], timeout: 5)
245248
watchTask.cancel()
246249

247250
let finalResults = await resultsStore.getResults()
248-
XCTAssertEqual(finalResults.count, 2)
249-
XCTAssertEqual(finalResults[1], ["User 1", "User 2"])
251+
// The count of invocations here can vary a lot depending on the order of execution
252+
// In some cases the creation of the users can fire before the initial watched query
253+
// has emitted a result.
254+
// However the watched query should always emit the latest result set.
255+
XCTAssertLessThanOrEqual(finalResults.count, 3)
256+
XCTAssertEqual(finalResults, ["User 1", "User 2"])
250257
}
251258

252259
func testWatchError() async throws {

0 commit comments

Comments
 (0)