Skip to content

fix: transactions resulting in error when using different threads #3

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Package.resolved

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ let package = Package(
targets: ["PowerSyncSwift"]),
],
dependencies: [
.package(url: "https://github.com/powersync-ja/powersync-kotlin.git", exact: "1.0.0-BETA6.0"),
.package(url: "https://github.com/powersync-ja/powersync-kotlin.git", exact: "1.0.0-BETA9.0"),
.package(url: "https://github.com/powersync-ja/powersync-sqlite-core-swift.git", "0.3.1"..<"0.4.0"),
],
targets: [
Expand Down
79 changes: 32 additions & 47 deletions Sources/PowerSyncSwift/Kotlin/KotlinPowerSyncDatabaseImpl.swift
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ import PowerSync

final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol {
private let kotlinDatabase: PowerSync.PowerSyncDatabase

var currentStatus: SyncStatus {
get { kotlinDatabase.currentStatus }
}

init(
schema: Schema,
dbFilename: String
Expand All @@ -19,55 +19,55 @@ final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol {
dbFilename: dbFilename
)
}

init(kotlinDatabase: KotlinPowerSyncDatabase) {
self.kotlinDatabase = kotlinDatabase
}

func waitForFirstSync() async throws {
try await kotlinDatabase.waitForFirstSync()
}

func connect(
connector: PowerSyncBackendConnector,
crudThrottleMs: Int64 = 1000,
retryDelayMs: Int64 = 5000,
params: [String: JsonParam?] = [:]
) async throws {
let connectorProxy = PowerSyncBackendConnectorAdapter(swiftBackendConnector: connector)
let connectorAdapter = PowerSyncBackendConnectorAdapter(swiftBackendConnector: connector)

try await kotlinDatabase.connect(
connector: connectorProxy,
connector: connectorAdapter,
crudThrottleMs: crudThrottleMs,
retryDelayMs: retryDelayMs,
params: params
)
}

func getCrudBatch(limit: Int32 = 100) async throws -> CrudBatch? {
try await kotlinDatabase.getCrudBatch(limit: limit)
}

func getNextCrudTransaction() async throws -> CrudTransaction? {
try await kotlinDatabase.getNextCrudTransaction()
}

func getPowerSyncVersion() async throws -> String {
try await kotlinDatabase.getPowerSyncVersion()
}

func disconnect() async throws {
try await kotlinDatabase.disconnect()
}

func disconnectAndClear(clearLocal: Bool = true) async throws {
try await kotlinDatabase.disconnectAndClear(clearLocal: clearLocal)
}

func execute(sql: String, parameters: [Any]?) async throws -> Int64 {
Int64(truncating: try await kotlinDatabase.execute(sql: sql, parameters: parameters))
}

func get<RowType>(
sql: String,
parameters: [Any]?,
Expand All @@ -79,7 +79,7 @@ final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol {
mapper: mapper
) as! RowType
}

func getAll<RowType>(
sql: String,
parameters: [Any]?,
Expand All @@ -91,7 +91,7 @@ final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol {
mapper: mapper
) as! [RowType]
}

func getOptional<RowType>(
sql: String,
parameters: [Any]?,
Expand All @@ -103,7 +103,7 @@ final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol {
mapper: mapper
) as! RowType?
}

func watch<RowType>(
sql: String,
parameters: [Any]?,
Expand All @@ -122,31 +122,17 @@ final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol {
}
}
}

func writeTransaction<R>(callback: @escaping (any PowerSyncTransactionProtocol) async throws -> R) async throws -> R {
let wrappedCallback = SuspendTaskWrapper { [kotlinDatabase] in
// Create a wrapper that converts the KMP transaction to our Swift protocol
if let kmpTransaction = kotlinDatabase as? PowerSyncTransactionProtocol {
return try await callback(kmpTransaction)
} else {
throw PowerSyncError.invalidTransaction
}
}

return try await kotlinDatabase.writeTransaction(callback: wrappedCallback) as! R
}

func readTransaction<R>(callback: @escaping (any PowerSyncTransactionProtocol) async throws -> R) async throws -> R {
let wrappedCallback = SuspendTaskWrapper { [kotlinDatabase] in
// Create a wrapper that converts the KMP transaction to our Swift protocol
if let kmpTransaction = kotlinDatabase as? PowerSyncTransactionProtocol {
return try await callback(kmpTransaction)
} else {
throw PowerSyncError.invalidTransaction
}
}

return try await kotlinDatabase.readTransaction(callback: wrappedCallback) as! R

public func writeTransaction<R>(callback: @escaping (any PowerSyncTransaction) async throws -> R) async throws -> R {
return try await kotlinDatabase.writeTransaction(callback: SuspendTaskWrapper { transaction in
return try await callback(transaction)
}) as! R
}

public func readTransaction<R>(callback: @escaping (any PowerSyncTransaction) async throws -> R) async throws -> R {
return try await kotlinDatabase.writeTransaction(callback: SuspendTaskWrapper { transaction in
return try await callback(transaction)
}) as! R
}
}

Expand All @@ -155,17 +141,16 @@ enum PowerSyncError: Error {
}

class SuspendTaskWrapper: KotlinSuspendFunction1 {
let handle: () async throws -> Any
let handle: (any PowerSyncTransaction) async throws -> Any

init(_ handle: @escaping () async throws -> Any) {
init(_ handle: @escaping (any PowerSyncTransaction) async throws -> Any) {
self.handle = handle
}

@MainActor
func __invoke(p1: Any?, completionHandler: @escaping (Any?, Error?) -> Void) {
Task {
do {
let result = try await self.handle()
let result = try await self.handle(p1 as! any PowerSyncTransaction)
completionHandler(result, nil)
} catch {
completionHandler(nil, error)
Expand Down
29 changes: 0 additions & 29 deletions Sources/PowerSyncSwift/PowerSyncTransactionProtocol.swift

This file was deleted.

41 changes: 38 additions & 3 deletions Sources/PowerSyncSwift/QueriesProtocol.swift
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import Foundation
import Combine
import PowerSync

public protocol Queries {
/// Execute a write query (INSERT, UPDATE, DELETE)
Expand Down Expand Up @@ -37,8 +38,42 @@ public protocol Queries {
) -> AsyncStream<[RowType]>

/// Execute a write transaction with the given callback
func writeTransaction<R>(callback: @escaping (PowerSyncTransactionProtocol) async throws -> R) async throws -> R
func writeTransaction<R>(callback: @escaping (any PowerSyncTransaction) async throws -> R) async throws -> R

/// Execute a read transaction with the given callback
func readTransaction<R>(callback: @escaping (PowerSyncTransactionProtocol) async throws -> R) async throws -> R
func readTransaction<R>(callback: @escaping (any PowerSyncTransaction) async throws -> R) async throws -> R
}

extension Queries {
public func execute(_ sql: String) async throws -> Int64 {
return try await execute(sql: sql, parameters: [])
}

public func get<RowType>(
_ sql: String,
mapper: @escaping (SqlCursor) -> RowType
) async throws -> RowType {
return try await get(sql: sql, parameters: [], mapper: mapper)
}

public func getAll<RowType>(
_ sql: String,
mapper: @escaping (SqlCursor) -> RowType
) async throws -> [RowType] {
return try await getAll(sql: sql, parameters: [], mapper: mapper)
}

public func getOptional<RowType>(
_ sql: String,
mapper: @escaping (SqlCursor) -> RowType
) async throws -> RowType? {
return try await getOptional(sql: sql, parameters: [], mapper: mapper)
}

public func watch<RowType>(
_ sql: String,
mapper: @escaping (SqlCursor) -> RowType
) -> AsyncStream<[RowType]> {
return watch(sql: sql, parameters: [], mapper: mapper)
}
}
Loading