Skip to content

Support being compiled with build_web_compilers #84

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,4 @@ jobs:
run: |
export LD_LIBRARY_PATH=$(pwd)/sqlite-autoconf-${{ matrix.sqlite_version }}/.libs
melos test
melos test_build
8 changes: 8 additions & 0 deletions melos.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,11 @@ scripts:
# as they could change the behaviour of how tests filter packages.
env:
MELOS_TEST: true

test_build:
description: Runs tests with build_test
run: dart run build_runner test -- -p chrome
exec:
concurrency: 1
packageFilters:
dependsOn: build_test
4 changes: 4 additions & 0 deletions packages/sqlite_async/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 0.11.3

- Support being compiled with `package:build_web_compilers`.

## 0.11.2

- Support latest version of `package:sqlite3_web`.
Expand Down
354 changes: 2 additions & 352 deletions packages/sqlite_async/lib/src/common/port_channel.dart
Original file line number Diff line number Diff line change
@@ -1,352 +1,2 @@
import 'dart:async';
import 'dart:collection';
import 'dart:isolate';

abstract class PortClient {
Future<T> post<T>(Object message);
void fire(Object message);

factory PortClient.parent() {
return ParentPortClient();
}

factory PortClient.child(SendPort upstream) {
return ChildPortClient(upstream);
}
}

class ParentPortClient implements PortClient {
late Future<SendPort> sendPortFuture;
SendPort? sendPort;
final ReceivePort _receivePort = ReceivePort();
final ReceivePort _errorPort = ReceivePort();
bool closed = false;
Object? _closeError;
String? _isolateDebugName;
int _nextId = 1;

Map<int, Completer<Object?>> handlers = HashMap();

ParentPortClient() {
final initCompleter = Completer<SendPort>.sync();
sendPortFuture = initCompleter.future;
sendPortFuture.then((value) {
sendPort = value;
});
_receivePort.listen((message) {
if (message is _InitMessage) {
assert(!initCompleter.isCompleted);
initCompleter.complete(message.port);
} else if (message is _PortChannelResult) {
final handler = handlers.remove(message.requestId);
assert(handler != null);
if (message.success) {
handler!.complete(message.result);
} else {
handler!.completeError(message.error, message.stackTrace);
}
} else if (message == _closeMessage) {
close();
}
}, onError: (e) {
if (!initCompleter.isCompleted) {
initCompleter.completeError(e);
}

close();
}, onDone: () {
if (!initCompleter.isCompleted) {
initCompleter.completeError(ClosedException());
}
close();
});
_errorPort.listen((message) {
final [error, stackTraceString] = message;
final stackTrace = stackTraceString == null
? null
: StackTrace.fromString(stackTraceString);
if (!initCompleter.isCompleted) {
initCompleter.completeError(error, stackTrace);
}
_close(IsolateError(cause: error, isolateDebugName: _isolateDebugName),
stackTrace);
});
}

Future<void> get ready async {
await sendPortFuture;
}

void _cancelAll(Object error, [StackTrace? stackTrace]) {
var handlers = this.handlers;
this.handlers = {};
for (var message in handlers.values) {
message.completeError(error, stackTrace);
}
}

@override
Future<T> post<T>(Object message) async {
if (closed) {
throw _closeError ?? const ClosedException();
}
var completer = Completer<T>.sync();
var id = _nextId++;
handlers[id] = completer;
final port = sendPort ?? await sendPortFuture;
port.send(_RequestMessage(id, message, null));
return await completer.future;
}

@override
void fire(Object message) async {
if (closed) {
throw _closeError ?? ClosedException();
}
final port = sendPort ?? await sendPortFuture;
port.send(_FireMessage(message));
}

RequestPortServer server() {
return RequestPortServer(_receivePort.sendPort);
}

void _close([Object? error, StackTrace? stackTrace]) {
if (!closed) {
closed = true;

_receivePort.close();
_errorPort.close();
if (error == null) {
_cancelAll(const ClosedException());
} else {
_closeError = error;
_cancelAll(error, stackTrace);
}
}
}

void close() {
_close();
}

tieToIsolate(Isolate isolate) {
_isolateDebugName = isolate.debugName;
isolate.addErrorListener(_errorPort.sendPort);
isolate.addOnExitListener(_receivePort.sendPort, response: _closeMessage);
}
}

class SerializedPortClient {
final SendPort sendPort;

SerializedPortClient(this.sendPort);

ChildPortClient open() {
return ChildPortClient(sendPort);
}
}

class ChildPortClient implements PortClient {
final SendPort sendPort;
final ReceivePort receivePort = ReceivePort();
int _nextId = 1;
bool closed = false;

final Map<int, Completer<Object?>> handlers = HashMap();

ChildPortClient(this.sendPort) {
receivePort.listen((message) {
if (message is _PortChannelResult) {
final handler = handlers.remove(message.requestId);
assert(handler != null);
if (message.success) {
handler!.complete(message.result);
} else {
handler!.completeError(message.error, message.stackTrace);
}
}
});
}

@override
Future<T> post<T>(Object message) async {
if (closed) {
throw const ClosedException();
}
var completer = Completer<T>.sync();
var id = _nextId++;
handlers[id] = completer;
sendPort.send(_RequestMessage(id, message, receivePort.sendPort));
return await completer.future;
}

@override
void fire(Object message) {
if (closed) {
throw ClosedException();
}
sendPort.send(_FireMessage(message));
}

void _cancelAll(Object error) {
var handlers = HashMap<int, Completer<Object?>>.from(this.handlers);
this.handlers.clear();
for (var message in handlers.values) {
message.completeError(error);
}
}

void close() {
closed = true;
_cancelAll(const ClosedException());
receivePort.close();
}
}

class RequestPortServer {
final SendPort port;

RequestPortServer(this.port);

open(Future<Object?> Function(Object? message) handle) {
return PortServer.forSendPort(port, handle);
}
}

class PortServer {
final ReceivePort _receivePort = ReceivePort();
final Future<Object?> Function(Object? message) handle;
final SendPort? replyPort;

PortServer(this.handle) : replyPort = null {
_init();
}

PortServer.forSendPort(SendPort port, this.handle) : replyPort = port {
port.send(_InitMessage(_receivePort.sendPort));
_init();
}

SendPort get sendPort {
return _receivePort.sendPort;
}

SerializedPortClient client() {
return SerializedPortClient(sendPort);
}

void close() {
_receivePort.close();
}

void _init() {
_receivePort.listen((request) async {
if (request is _FireMessage) {
handle(request.message);
} else if (request is _RequestMessage) {
if (request.id == 0) {
// Fire and forget
handle(request.message);
} else {
final replyPort = request.reply ?? this.replyPort;
try {
var result = await handle(request.message);
replyPort!.send(_PortChannelResult.success(request.id, result));
} catch (e, stacktrace) {
replyPort!
.send(_PortChannelResult.error(request.id, e, stacktrace));
}
}
}
});
}
}

const _closeMessage = '_Close';

class _InitMessage {
final SendPort port;

_InitMessage(this.port);
}

class _FireMessage {
final Object message;

const _FireMessage(this.message);
}

class _RequestMessage {
final int id;
final Object message;
final SendPort? reply;

_RequestMessage(this.id, this.message, this.reply);
}

class ClosedException implements Exception {
const ClosedException();

@override
String toString() {
return 'ClosedException';
}
}

class IsolateError extends Error {
final Object cause;
final String? isolateDebugName;

IsolateError({required this.cause, this.isolateDebugName});

@override
String toString() {
if (isolateDebugName != null) {
return 'IsolateError in $isolateDebugName: $cause';
} else {
return 'IsolateError: $cause';
}
}
}

class _PortChannelResult<T> {
final int requestId;
final bool success;
final T? _result;
final Object? _error;
final StackTrace? stackTrace;

const _PortChannelResult.success(this.requestId, T result)
: success = true,
_error = null,
stackTrace = null,
_result = result;
const _PortChannelResult.error(this.requestId, Object error,
[this.stackTrace])
: success = false,
_result = null,
_error = error;

T get value {
if (success) {
return _result as T;
} else {
if (_error != null && stackTrace != null) {
Error.throwWithStackTrace(_error, stackTrace!);
} else {
throw _error!;
}
}
}

T get result {
assert(success);
return _result as T;
}

Object get error {
assert(!success);
return _error!;
}
}
export 'port_channel_native.dart'
if (dart.library.js_interop) 'port_channel_stub.dart';
Loading