Skip to content

Feat: Expose worker connection #72

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions packages/sqlite_async/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## 0.10.0

- Add the `exposeEndpoint()` method available on web databases. It returns a serializable
description of the database endpoint that can be sent across workers.
This allows sharing an opened database connection across workers.

## 0.9.1

- Support version ^0.2.0 of package:sqlite3_web
Expand Down
21 changes: 20 additions & 1 deletion packages/sqlite_async/lib/src/web/database.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ import 'package:sqlite3/common.dart';
import 'package:sqlite3_web/sqlite3_web.dart';
import 'package:sqlite_async/sqlite_async.dart';
import 'package:sqlite_async/src/utils/shared_utils.dart';
import 'package:sqlite_async/web.dart';
import 'protocol.dart';
import 'web_mutex.dart';

class WebDatabase
with SqliteQueries, SqliteDatabaseMixin
implements SqliteDatabase {
implements SqliteDatabase, WebSqliteConnection {
final Database _database;
final Mutex? _mutex;

Expand All @@ -24,6 +26,9 @@ class WebDatabase
closed = true;
}

@override
Future<void> get closedFuture => _database.closed;

@override
Future<bool> getAutoCommit() async {
final response = await _database.customRequest(
Expand Down Expand Up @@ -56,6 +61,20 @@ class WebDatabase
/// Not relevant for web.
Never get openFactory => throw UnimplementedError();

@override
Future<WebDatabaseEndpoint> exposeEndpoint() async {
final endpoint = await _database.additionalConnection();

return (
connectPort: endpoint.$1,
connectName: endpoint.$2,
lockName: switch (_mutex) {
MutexImpl(:final resolvedIdentifier) => resolvedIdentifier,
_ => null,
},
);
}

@override
Future<T> readLock<T>(Future<T> Function(SqliteReadContext tx) callback,
{Duration? lockTimeout, String? debugContext}) async {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,23 @@ import 'package:sqlite_async/src/sqlite_options.dart';
import 'package:sqlite_async/src/update_notification.dart';
import 'package:sqlite_async/src/web/web_mutex.dart';
import 'package:sqlite_async/src/web/web_sqlite_open_factory.dart';
import 'package:sqlite_async/web.dart';

import '../database.dart';

/// Web implementation of [SqliteDatabase]
/// Uses a web worker for SQLite connection
class SqliteDatabaseImpl
with SqliteQueries, SqliteDatabaseMixin
implements SqliteDatabase {
implements SqliteDatabase, WebSqliteConnection {
@override
bool get closed {
return _connection.closed;
}

@override
Future<void> get closedFuture => _connection.closedFuture;

final StreamController<UpdateNotification> updatesController =
StreamController.broadcast();

Expand All @@ -38,7 +44,7 @@ class SqliteDatabaseImpl
AbstractDefaultSqliteOpenFactory openFactory;

late final Mutex mutex;
late final SqliteConnection _connection;
late final WebDatabase _connection;

/// Open a SqliteDatabase.
///
Expand Down Expand Up @@ -78,8 +84,8 @@ class SqliteDatabaseImpl

Future<void> _init() async {
_connection = await openFactory.openConnection(SqliteOpenOptions(
primaryConnection: true, readOnly: false, mutex: mutex));
_connection.updates!.forEach((update) {
primaryConnection: true, readOnly: false, mutex: mutex)) as WebDatabase;
_connection.updates.forEach((update) {
updatesController.add(update);
});
}
Expand Down Expand Up @@ -139,4 +145,9 @@ class SqliteDatabaseImpl
await isInitialized;
return _connection.getAutoCommit();
}

@override
Future<WebDatabaseEndpoint> exposeEndpoint() async {
return await _connection.exposeEndpoint();
}
}
6 changes: 3 additions & 3 deletions packages/sqlite_async/lib/src/web/web_mutex.dart
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ external Navigator get _navigator;
class MutexImpl implements Mutex {
late final mutex.Mutex fallback;
String? identifier;
final String _resolvedIdentifier;
final String resolvedIdentifier;

MutexImpl({this.identifier})

Expand All @@ -29,7 +29,7 @@ class MutexImpl implements Mutex {
/// - The uuid package could be added for better uniqueness if required.
/// This would add another package dependency to `sqlite_async` which is potentially unnecessary at this point.
/// An identifier should be supplied for better exclusion.
: _resolvedIdentifier = identifier ??
: resolvedIdentifier = identifier ??
"${DateTime.now().microsecondsSinceEpoch}-${Random().nextDouble()}" {
fallback = mutex.Mutex();
}
Expand Down Expand Up @@ -125,7 +125,7 @@ class MutexImpl implements Mutex {
final lockOptions = JSObject();
lockOptions['signal'] = controller.signal;
final promise = _navigator.locks
.request(_resolvedIdentifier, lockOptions, jsCallback.toJS);
.request(resolvedIdentifier, lockOptions, jsCallback.toJS);
// A timeout abort will throw an exception which needs to be handled.
// There should not be any other unhandled lock errors.
js_util.promiseToFuture(promise).catchError((error) {});
Expand Down
68 changes: 68 additions & 0 deletions packages/sqlite_async/lib/web.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/// Exposes interfaces implemented by database implementations on the web.
///
/// These expose methods allowing database instances to be shared across web
/// workers.
library sqlite_async.web;

import 'package:sqlite3_web/sqlite3_web.dart';
import 'package:web/web.dart';
import 'sqlite_async.dart';
import 'src/web/database.dart';

/// An endpoint that can be used, by any running JavaScript context in the same
/// website, to connect to an existing [WebSqliteConnection].
///
/// These endpoints are created by calling [WebSqliteConnection.exposeEndpoint]
/// and consist of a [MessagePort] and two [String]s internally identifying the
/// connection. Both objects can be transferred over send ports towards another
/// worker or context. That context can then use
/// [WebSqliteConnection.connectToEndpoint] to connect to the port already
/// opened.
typedef WebDatabaseEndpoint = ({
MessagePort connectPort,
String connectName,
String? lockName,
});

/// A [SqliteConnection] interface implemented by opened connections when
/// running on the web.
///
/// This adds the [exposeEndpoint], which uses `dart:js_interop` types not
/// supported on native Dart platforms. The method can be used to access an
/// opened database across different JavaScript contexts
/// (e.g. document windows and workers).
abstract class WebSqliteConnection implements SqliteConnection {
/// Returns a future that completes when this connection is closed.
///
/// This usually only happens when calling [close], but on the web
/// specifically, it can also happen when a remote context closes a database
/// accessed via [connectToEndpoint].
Future<void> get closedFuture;

/// Returns a [WebDatabaseEndpoint] - a structure that consists only of types
/// that can be transferred across a [MessagePort] in JavaScript.
///
/// After transferring this endpoint to another JavaScript context (e.g. a
/// worker), the worker can call [connectToEndpoint] to obtain a connection to
/// the same sqlite database.
Future<WebDatabaseEndpoint> exposeEndpoint();

/// Connect to an endpoint obtained through [exposeEndpoint].
///
/// The endpoint is transferrable in JavaScript, allowing multiple JavaScript
/// contexts to exchange opened database connections.
static Future<WebSqliteConnection> connectToEndpoint(
WebDatabaseEndpoint endpoint) async {
final rawSqlite = await WebSqlite.connectToPort(
(endpoint.connectPort, endpoint.connectName));

final database = WebDatabase(
rawSqlite,
switch (endpoint.lockName) {
var lock? => Mutex(identifier: lock),
null => null,
},
);
return database;
}
}
2 changes: 1 addition & 1 deletion packages/sqlite_async/pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name: sqlite_async
description: High-performance asynchronous interface for SQLite on Dart and Flutter.
version: 0.9.1
version: 0.10.0
repository: https://github.com/powersync-ja/sqlite_async.dart
environment:
sdk: ">=3.4.0 <4.0.0"
Expand Down
Loading