Skip to content

Fix errors when closing #37

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 9 commits into from
19 changes: 15 additions & 4 deletions lib/src/connection_pool.dart
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
@override
Future<bool> getAutoCommit() async {
if (_writeConnection == null) {
throw AssertionError('Closed');
throw ClosedException();
}
return await _writeConnection!.getAutoCommit();
}
Expand Down Expand Up @@ -94,6 +94,8 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
}, lockTimeout: lockTimeout, debugContext: debugContext);
} on TimeoutException {
return false;
} on ClosedException {
return false;
}
});

Expand All @@ -118,7 +120,7 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
Future<T> writeLock<T>(Future<T> Function(SqliteWriteContext tx) callback,
{Duration? lockTimeout, String? debugContext}) {
if (closed) {
throw AssertionError('Closed');
throw ClosedException();
}
if (_writeConnection?.closed == true) {
_writeConnection = null;
Expand Down Expand Up @@ -156,7 +158,8 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
if (closed || _readConnections.length >= maxReaders) {
return;
}
bool hasCapacity = _readConnections.any((connection) => !connection.locked);
bool hasCapacity = _readConnections
.any((connection) => !connection.locked && !connection.closed);
if (!hasCapacity) {
var name = debugName == null
? null
Expand All @@ -183,7 +186,15 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
@override
Future<void> close() async {
closed = true;
for (var connection in _readConnections) {

// It is possible that `readLock()` removes connections from the pool while we're
// closing connections, but not possible for new connections to be added.
// Create a copy of the list, to avoid this triggering "Concurrent modification during iteration"
final toClose = _readConnections.sublist(0);
for (var connection in toClose) {
// Wait for connection initialization, so that any existing readLock()
// requests go through before closing.
await connection.ready;
await connection.close();
}
// Closing the write connection cleans up the journal files (-shm and -wal files).
Expand Down
69 changes: 58 additions & 11 deletions lib/src/port_channel.dart
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ abstract class PortClient {
class ParentPortClient implements PortClient {
late Future<SendPort> sendPortFuture;
SendPort? sendPort;
ReceivePort receivePort = ReceivePort();
final ReceivePort _receivePort = ReceivePort();
final ReceivePort _errorPort = ReceivePort();
bool closed = false;
Object? _closeError;
String? _isolateDebugName;
int _nextId = 1;

Map<int, Completer<Object?>> handlers = HashMap();
Expand All @@ -30,7 +33,7 @@ class ParentPortClient implements PortClient {
sendPortFuture.then((value) {
sendPort = value;
});
receivePort.listen((message) {
_receivePort.listen((message) {
if (message is _InitMessage) {
assert(!initCompleter.isCompleted);
initCompleter.complete(message.port);
Expand All @@ -57,24 +60,35 @@ class ParentPortClient implements PortClient {
}
close();
});
_errorPort.listen((message) {
final [error, stackTraceString] = message;
final stackTrace = stackTraceString == null
? null
: StackTrace.fromString(stackTraceString);
if (!initCompleter.isCompleted) {
initCompleter.completeError(error, stackTrace);
}
_close(IsolateError(cause: error, isolateDebugName: _isolateDebugName),
stackTrace);
});
}

Future<void> get ready async {
await sendPortFuture;
}

void _cancelAll(Object error) {
void _cancelAll(Object error, [StackTrace? stackTrace]) {
var handlers = this.handlers;
this.handlers = {};
for (var message in handlers.values) {
message.completeError(error);
message.completeError(error, stackTrace);
}
}

@override
Future<T> post<T>(Object message) async {
if (closed) {
throw ClosedException();
throw _closeError ?? const ClosedException();
}
var completer = Completer<T>.sync();
var id = _nextId++;
Expand All @@ -87,27 +101,39 @@ class ParentPortClient implements PortClient {
@override
void fire(Object message) async {
if (closed) {
throw ClosedException();
throw _closeError ?? ClosedException();
}
final port = sendPort ?? await sendPortFuture;
port.send(_FireMessage(message));
}

RequestPortServer server() {
return RequestPortServer(receivePort.sendPort);
return RequestPortServer(_receivePort.sendPort);
}

void close() async {
void _close([Object? error, StackTrace? stackTrace]) {
if (!closed) {
closed = true;

receivePort.close();
_cancelAll(const ClosedException());
_receivePort.close();
_errorPort.close();
if (error == null) {
_cancelAll(const ClosedException());
} else {
_closeError = error;
_cancelAll(error, stackTrace);
}
}
}

void close() {
_close();
}

tieToIsolate(Isolate isolate) {
isolate.addOnExitListener(receivePort.sendPort, response: _closeMessage);
_isolateDebugName = isolate.debugName;
isolate.addErrorListener(_errorPort.sendPort);
isolate.addOnExitListener(_receivePort.sendPort, response: _closeMessage);
}
}

Expand Down Expand Up @@ -261,6 +287,27 @@ class _RequestMessage {

class ClosedException implements Exception {
const ClosedException();

@override
String toString() {
return 'ClosedException';
}
}

class IsolateError extends Error {
final Object cause;
final String? isolateDebugName;

IsolateError({required this.cause, this.isolateDebugName});

@override
String toString() {
if (isolateDebugName != null) {
return 'IsolateError in $isolateDebugName: $cause';
} else {
return 'IsolateError: $cause';
}
}
}

class _PortChannelResult<T> {
Expand Down
4 changes: 4 additions & 0 deletions lib/src/sqlite_connection_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ class SqliteConnectionImpl with SqliteQueries implements SqliteConnection {
@override
Future<void> close() async {
await _connectionMutex.lock(() async {
if (closed) {
return;
}
if (readOnly) {
await _isolateClient.post(const _SqliteIsolateConnectionClose());
} else {
Expand All @@ -97,6 +100,7 @@ class SqliteConnectionImpl with SqliteQueries implements SqliteConnection {
await _isolateClient.post(const _SqliteIsolateConnectionClose());
});
}
_isolateClient.close();
_isolate.kill();
});
}
Expand Down
39 changes: 35 additions & 4 deletions test/basic_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import 'dart:math';
import 'package:sqlite3/sqlite3.dart' as sqlite;
import 'package:sqlite_async/mutex.dart';
import 'package:sqlite_async/sqlite_async.dart';
import 'package:test/expect.dart';
import 'package:test/test.dart';

import 'util.dart';
Expand Down Expand Up @@ -334,8 +335,11 @@ void main() {
}).catchError((error) {
caughtError = error;
});
// This may change into a better error in the future
expect(caughtError.toString(), equals("Instance of 'ClosedException'"));
// The specific error message may change
expect(
caughtError.toString(),
equals(
"IsolateError in sqlite-writer: Invalid argument(s): uncaught async error"));

// Check that we can still continue afterwards
final computed = await db.computeWithDatabase((db) async {
Expand All @@ -361,8 +365,11 @@ void main() {
}).catchError((error) {
caughtError = error;
});
// This may change into a better error in the future
expect(caughtError.toString(), equals("Instance of 'ClosedException'"));
// The specific message may change
expect(
caughtError.toString(),
matches(RegExp(
r'IsolateError in sqlite-\d+: Invalid argument\(s\): uncaught async error')));
}

// Check that we can still continue afterwards
Expand Down Expand Up @@ -401,6 +408,30 @@ void main() {
expect(await savedTx!.getAutoCommit(), equals(true));
expect(savedTx!.closed, equals(true));
});

test('closing', () async {
// Test race condition in SqliteConnectionPool:
// 1. Open two concurrent queries, which opens two connection.
// 2. Second connection takes longer to open than first.
// 3. Call db.close().
// 4. Now second connection is ready. Second query has two connections to choose from.
// 5. However, first connection is closed, so it's removed from the pool.
// 6. Triggers `Concurrent modification during iteration: Instance(length:1) of '_GrowableList'`
final db =
SqliteDatabase.withFactory(testFactory(path: path, initStatements: [
// Second connection to sleep more than first connection
'SELECT test_sleep(test_connection_number() * 10)'
]));
await db.initialize();

final future1 = db.get('SELECT test_sleep(10) as sleep');
final future2 = db.get('SELECT test_sleep(10) as sleep');

await db.close();

await future1;
await future2;
});
});
}

Expand Down
25 changes: 22 additions & 3 deletions test/util.dart
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@ const defaultSqlitePath = 'libsqlite3.so.0';

class TestSqliteOpenFactory extends DefaultSqliteOpenFactory {
String sqlitePath;
List<String> initStatements;

TestSqliteOpenFactory(
{required super.path,
super.sqliteOptions,
this.sqlitePath = defaultSqlitePath});
this.sqlitePath = defaultSqlitePath,
this.initStatements = const []});

@override
sqlite.Database open(SqliteOpenOptions options) {
Expand All @@ -45,12 +47,29 @@ class TestSqliteOpenFactory extends DefaultSqliteOpenFactory {
},
);

db.createFunction(
functionName: 'test_connection_number',
argumentCount: const sqlite.AllowedArgumentCount(0),
function: (args) {
// write: 0, read: 1 - 5
final name = Isolate.current.debugName ?? '-0';
var nr = name.split('-').last;
return int.tryParse(nr) ?? 0;
},
);

for (var s in initStatements) {
db.execute(s);
}

return db;
}
}

SqliteOpenFactory testFactory({String? path}) {
return TestSqliteOpenFactory(path: path ?? dbPath());
SqliteOpenFactory testFactory(
{String? path, List<String> initStatements = const []}) {
return TestSqliteOpenFactory(
path: path ?? dbPath(), initStatements: initStatements);
}

Future<SqliteDatabase> setupDatabase({String? path}) async {
Expand Down
Loading