diff --git a/lib/src/connection_pool.dart b/lib/src/connection_pool.dart index 23b1e0f..d0c8912 100644 --- a/lib/src/connection_pool.dart +++ b/lib/src/connection_pool.dart @@ -53,7 +53,7 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { @override Future getAutoCommit() async { if (_writeConnection == null) { - throw AssertionError('Closed'); + throw ClosedException(); } return await _writeConnection!.getAutoCommit(); } @@ -94,6 +94,8 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { }, lockTimeout: lockTimeout, debugContext: debugContext); } on TimeoutException { return false; + } on ClosedException { + return false; } }); @@ -118,7 +120,7 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { Future writeLock(Future Function(SqliteWriteContext tx) callback, {Duration? lockTimeout, String? debugContext}) { if (closed) { - throw AssertionError('Closed'); + throw ClosedException(); } if (_writeConnection?.closed == true) { _writeConnection = null; @@ -156,7 +158,8 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { if (closed || _readConnections.length >= maxReaders) { return; } - bool hasCapacity = _readConnections.any((connection) => !connection.locked); + bool hasCapacity = _readConnections + .any((connection) => !connection.locked && !connection.closed); if (!hasCapacity) { var name = debugName == null ? null @@ -183,7 +186,15 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { @override Future close() async { closed = true; - for (var connection in _readConnections) { + + // It is possible that `readLock()` removes connections from the pool while we're + // closing connections, but not possible for new connections to be added. + // Create a copy of the list, to avoid this triggering "Concurrent modification during iteration" + final toClose = _readConnections.sublist(0); + for (var connection in toClose) { + // Wait for connection initialization, so that any existing readLock() + // requests go through before closing. + await connection.ready; await connection.close(); } // Closing the write connection cleans up the journal files (-shm and -wal files). diff --git a/lib/src/port_channel.dart b/lib/src/port_channel.dart index c32b7b9..8b05feb 100644 --- a/lib/src/port_channel.dart +++ b/lib/src/port_channel.dart @@ -18,8 +18,11 @@ abstract class PortClient { class ParentPortClient implements PortClient { late Future sendPortFuture; SendPort? sendPort; - ReceivePort receivePort = ReceivePort(); + final ReceivePort _receivePort = ReceivePort(); + final ReceivePort _errorPort = ReceivePort(); bool closed = false; + Object? _closeError; + String? _isolateDebugName; int _nextId = 1; Map> handlers = HashMap(); @@ -30,7 +33,7 @@ class ParentPortClient implements PortClient { sendPortFuture.then((value) { sendPort = value; }); - receivePort.listen((message) { + _receivePort.listen((message) { if (message is _InitMessage) { assert(!initCompleter.isCompleted); initCompleter.complete(message.port); @@ -57,24 +60,35 @@ class ParentPortClient implements PortClient { } close(); }); + _errorPort.listen((message) { + final [error, stackTraceString] = message; + final stackTrace = stackTraceString == null + ? null + : StackTrace.fromString(stackTraceString); + if (!initCompleter.isCompleted) { + initCompleter.completeError(error, stackTrace); + } + _close(IsolateError(cause: error, isolateDebugName: _isolateDebugName), + stackTrace); + }); } Future get ready async { await sendPortFuture; } - void _cancelAll(Object error) { + void _cancelAll(Object error, [StackTrace? stackTrace]) { var handlers = this.handlers; this.handlers = {}; for (var message in handlers.values) { - message.completeError(error); + message.completeError(error, stackTrace); } } @override Future post(Object message) async { if (closed) { - throw ClosedException(); + throw _closeError ?? const ClosedException(); } var completer = Completer.sync(); var id = _nextId++; @@ -87,27 +101,39 @@ class ParentPortClient implements PortClient { @override void fire(Object message) async { if (closed) { - throw ClosedException(); + throw _closeError ?? ClosedException(); } final port = sendPort ?? await sendPortFuture; port.send(_FireMessage(message)); } RequestPortServer server() { - return RequestPortServer(receivePort.sendPort); + return RequestPortServer(_receivePort.sendPort); } - void close() async { + void _close([Object? error, StackTrace? stackTrace]) { if (!closed) { closed = true; - receivePort.close(); - _cancelAll(const ClosedException()); + _receivePort.close(); + _errorPort.close(); + if (error == null) { + _cancelAll(const ClosedException()); + } else { + _closeError = error; + _cancelAll(error, stackTrace); + } } } + void close() { + _close(); + } + tieToIsolate(Isolate isolate) { - isolate.addOnExitListener(receivePort.sendPort, response: _closeMessage); + _isolateDebugName = isolate.debugName; + isolate.addErrorListener(_errorPort.sendPort); + isolate.addOnExitListener(_receivePort.sendPort, response: _closeMessage); } } @@ -261,6 +287,27 @@ class _RequestMessage { class ClosedException implements Exception { const ClosedException(); + + @override + String toString() { + return 'ClosedException'; + } +} + +class IsolateError extends Error { + final Object cause; + final String? isolateDebugName; + + IsolateError({required this.cause, this.isolateDebugName}); + + @override + String toString() { + if (isolateDebugName != null) { + return 'IsolateError in $isolateDebugName: $cause'; + } else { + return 'IsolateError: $cause'; + } + } } class _PortChannelResult { diff --git a/lib/src/sqlite_connection_impl.dart b/lib/src/sqlite_connection_impl.dart index 643152e..7057a73 100644 --- a/lib/src/sqlite_connection_impl.dart +++ b/lib/src/sqlite_connection_impl.dart @@ -88,6 +88,9 @@ class SqliteConnectionImpl with SqliteQueries implements SqliteConnection { @override Future close() async { await _connectionMutex.lock(() async { + if (closed) { + return; + } if (readOnly) { await _isolateClient.post(const _SqliteIsolateConnectionClose()); } else { @@ -97,6 +100,7 @@ class SqliteConnectionImpl with SqliteQueries implements SqliteConnection { await _isolateClient.post(const _SqliteIsolateConnectionClose()); }); } + _isolateClient.close(); _isolate.kill(); }); } diff --git a/test/basic_test.dart b/test/basic_test.dart index 55a7d12..fa0d5ef 100644 --- a/test/basic_test.dart +++ b/test/basic_test.dart @@ -4,6 +4,7 @@ import 'dart:math'; import 'package:sqlite3/sqlite3.dart' as sqlite; import 'package:sqlite_async/mutex.dart'; import 'package:sqlite_async/sqlite_async.dart'; +import 'package:test/expect.dart'; import 'package:test/test.dart'; import 'util.dart'; @@ -334,8 +335,11 @@ void main() { }).catchError((error) { caughtError = error; }); - // This may change into a better error in the future - expect(caughtError.toString(), equals("Instance of 'ClosedException'")); + // The specific error message may change + expect( + caughtError.toString(), + equals( + "IsolateError in sqlite-writer: Invalid argument(s): uncaught async error")); // Check that we can still continue afterwards final computed = await db.computeWithDatabase((db) async { @@ -361,8 +365,11 @@ void main() { }).catchError((error) { caughtError = error; }); - // This may change into a better error in the future - expect(caughtError.toString(), equals("Instance of 'ClosedException'")); + // The specific message may change + expect( + caughtError.toString(), + matches(RegExp( + r'IsolateError in sqlite-\d+: Invalid argument\(s\): uncaught async error'))); } // Check that we can still continue afterwards @@ -401,6 +408,30 @@ void main() { expect(await savedTx!.getAutoCommit(), equals(true)); expect(savedTx!.closed, equals(true)); }); + + test('closing', () async { + // Test race condition in SqliteConnectionPool: + // 1. Open two concurrent queries, which opens two connection. + // 2. Second connection takes longer to open than first. + // 3. Call db.close(). + // 4. Now second connection is ready. Second query has two connections to choose from. + // 5. However, first connection is closed, so it's removed from the pool. + // 6. Triggers `Concurrent modification during iteration: Instance(length:1) of '_GrowableList'` + final db = + SqliteDatabase.withFactory(testFactory(path: path, initStatements: [ + // Second connection to sleep more than first connection + 'SELECT test_sleep(test_connection_number() * 10)' + ])); + await db.initialize(); + + final future1 = db.get('SELECT test_sleep(10) as sleep'); + final future2 = db.get('SELECT test_sleep(10) as sleep'); + + await db.close(); + + await future1; + await future2; + }); }); } diff --git a/test/util.dart b/test/util.dart index 12458fc..5bcbb50 100644 --- a/test/util.dart +++ b/test/util.dart @@ -14,11 +14,13 @@ const defaultSqlitePath = 'libsqlite3.so.0'; class TestSqliteOpenFactory extends DefaultSqliteOpenFactory { String sqlitePath; + List initStatements; TestSqliteOpenFactory( {required super.path, super.sqliteOptions, - this.sqlitePath = defaultSqlitePath}); + this.sqlitePath = defaultSqlitePath, + this.initStatements = const []}); @override sqlite.Database open(SqliteOpenOptions options) { @@ -45,12 +47,29 @@ class TestSqliteOpenFactory extends DefaultSqliteOpenFactory { }, ); + db.createFunction( + functionName: 'test_connection_number', + argumentCount: const sqlite.AllowedArgumentCount(0), + function: (args) { + // write: 0, read: 1 - 5 + final name = Isolate.current.debugName ?? '-0'; + var nr = name.split('-').last; + return int.tryParse(nr) ?? 0; + }, + ); + + for (var s in initStatements) { + db.execute(s); + } + return db; } } -SqliteOpenFactory testFactory({String? path}) { - return TestSqliteOpenFactory(path: path ?? dbPath()); +SqliteOpenFactory testFactory( + {String? path, List initStatements = const []}) { + return TestSqliteOpenFactory( + path: path ?? dbPath(), initStatements: initStatements); } Future setupDatabase({String? path}) async {