From eef16ade3d7ea7fb02dc34ffc90694bdf710ff8e Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Wed, 3 Apr 2024 12:04:17 +0200 Subject: [PATCH 1/8] Add failing test. --- test/basic_test.dart | 24 ++++++++++++++++++++++++ test/util.dart | 25 ++++++++++++++++++++++--- 2 files changed, 46 insertions(+), 3 deletions(-) diff --git a/test/basic_test.dart b/test/basic_test.dart index 9b563c8..2e4804a 100644 --- a/test/basic_test.dart +++ b/test/basic_test.dart @@ -368,6 +368,30 @@ void main() { expect(await savedTx!.getAutoCommit(), equals(true)); expect(savedTx!.closed, equals(true)); }); + + test('closing', () async { + // Test race condition in SqliteConnectionPool: + // 1. Open two concurrent queries, which opens two connection. + // 2. Second connection takes longer to open than first. + // 3. Call db.close(). + // 4. Now second connection is ready. Second query has two connections to choose from. + // 5. However, first connection is closed, so it's removed from the pool. + // 6. Triggers `Concurrent modification during iteration: Instance(length:1) of '_GrowableList'` + + final db = + SqliteDatabase.withFactory(testFactory(path: path, initStatements: [ + // Second connection to sleep more than first connection + 'SELECT test_sleep(test_connection_number() * 10)' + ])); + await createTables(db); + + final future1 = db.get('SELECT test_sleep(10) as sleep'); + final future2 = db.get('SELECT test_sleep(10) as sleep'); + final closeFuture = db.close(); + await closeFuture; + await future1; + await future2; + }); }); } diff --git a/test/util.dart b/test/util.dart index 12458fc..5bcbb50 100644 --- a/test/util.dart +++ b/test/util.dart @@ -14,11 +14,13 @@ const defaultSqlitePath = 'libsqlite3.so.0'; class TestSqliteOpenFactory extends DefaultSqliteOpenFactory { String sqlitePath; + List initStatements; TestSqliteOpenFactory( {required super.path, super.sqliteOptions, - this.sqlitePath = defaultSqlitePath}); + this.sqlitePath = defaultSqlitePath, + this.initStatements = const []}); @override sqlite.Database open(SqliteOpenOptions options) { @@ -45,12 +47,29 @@ class TestSqliteOpenFactory extends DefaultSqliteOpenFactory { }, ); + db.createFunction( + functionName: 'test_connection_number', + argumentCount: const sqlite.AllowedArgumentCount(0), + function: (args) { + // write: 0, read: 1 - 5 + final name = Isolate.current.debugName ?? '-0'; + var nr = name.split('-').last; + return int.tryParse(nr) ?? 0; + }, + ); + + for (var s in initStatements) { + db.execute(s); + } + return db; } } -SqliteOpenFactory testFactory({String? path}) { - return TestSqliteOpenFactory(path: path ?? dbPath()); +SqliteOpenFactory testFactory( + {String? path, List initStatements = const []}) { + return TestSqliteOpenFactory( + path: path ?? dbPath(), initStatements: initStatements); } Future setupDatabase({String? path}) async { From 84173344b0e2b3fed0be4553cb2a6c5e8cd778b9 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Wed, 3 Apr 2024 12:04:48 +0200 Subject: [PATCH 2/8] Use ClosedException. --- lib/src/connection_pool.dart | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/src/connection_pool.dart b/lib/src/connection_pool.dart index 23b1e0f..5b5316b 100644 --- a/lib/src/connection_pool.dart +++ b/lib/src/connection_pool.dart @@ -53,7 +53,7 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { @override Future getAutoCommit() async { if (_writeConnection == null) { - throw AssertionError('Closed'); + throw ClosedException(); } return await _writeConnection!.getAutoCommit(); } @@ -118,7 +118,7 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { Future writeLock(Future Function(SqliteWriteContext tx) callback, {Duration? lockTimeout, String? debugContext}) { if (closed) { - throw AssertionError('Closed'); + throw ClosedException(); } if (_writeConnection?.closed == true) { _writeConnection = null; From a7221567a4427913609c12ffd69528748fba2ce7 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Wed, 3 Apr 2024 12:07:10 +0200 Subject: [PATCH 3/8] Fix "Concurrent modification during iteration" errors. --- lib/src/connection_pool.dart | 7 ++++++- test/basic_test.dart | 1 - 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/lib/src/connection_pool.dart b/lib/src/connection_pool.dart index 5b5316b..6a99a59 100644 --- a/lib/src/connection_pool.dart +++ b/lib/src/connection_pool.dart @@ -183,7 +183,12 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { @override Future close() async { closed = true; - for (var connection in _readConnections) { + + // It is possible that `readLock()` removes connections from the pool while we're + // closing connections, but not possible for new connections to be added. + // Create a copy of the list, to avoid this triggering "Concurrent modification during iteration" + final toClose = _readConnections.sublist(0); + for (var connection in toClose) { await connection.close(); } // Closing the write connection cleans up the journal files (-shm and -wal files). diff --git a/test/basic_test.dart b/test/basic_test.dart index 2e4804a..4bc9a42 100644 --- a/test/basic_test.dart +++ b/test/basic_test.dart @@ -383,7 +383,6 @@ void main() { // Second connection to sleep more than first connection 'SELECT test_sleep(test_connection_number() * 10)' ])); - await createTables(db); final future1 = db.get('SELECT test_sleep(10) as sleep'); final future2 = db.get('SELECT test_sleep(10) as sleep'); From dea85050b910e2c7629151370e8845f648095b38 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Wed, 3 Apr 2024 12:39:29 +0200 Subject: [PATCH 4/8] Handle ClosedException in SqliteConnectionPool. --- lib/src/connection_pool.dart | 2 ++ test/basic_test.dart | 1 + 2 files changed, 3 insertions(+) diff --git a/lib/src/connection_pool.dart b/lib/src/connection_pool.dart index 6a99a59..f6ae300 100644 --- a/lib/src/connection_pool.dart +++ b/lib/src/connection_pool.dart @@ -94,6 +94,8 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { }, lockTimeout: lockTimeout, debugContext: debugContext); } on TimeoutException { return false; + } on ClosedException { + return false; } }); diff --git a/test/basic_test.dart b/test/basic_test.dart index 4bc9a42..8ed43dc 100644 --- a/test/basic_test.dart +++ b/test/basic_test.dart @@ -383,6 +383,7 @@ void main() { // Second connection to sleep more than first connection 'SELECT test_sleep(test_connection_number() * 10)' ])); + await db.initialize(); final future1 = db.get('SELECT test_sleep(10) as sleep'); final future2 = db.get('SELECT test_sleep(10) as sleep'); From dd2bce05cca0951ae53f55b9a33e75a01417ffa5 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Wed, 3 Apr 2024 12:41:56 +0200 Subject: [PATCH 5/8] Fix race condition in closing. --- lib/src/connection_pool.dart | 3 +++ 1 file changed, 3 insertions(+) diff --git a/lib/src/connection_pool.dart b/lib/src/connection_pool.dart index f6ae300..7852b8e 100644 --- a/lib/src/connection_pool.dart +++ b/lib/src/connection_pool.dart @@ -191,6 +191,9 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { // Create a copy of the list, to avoid this triggering "Concurrent modification during iteration" final toClose = _readConnections.sublist(0); for (var connection in toClose) { + // Wait for connection initialization, so that any existing readLock() + // requests go through before closing. + await connection.ready; await connection.close(); } // Closing the write connection cleans up the journal files (-shm and -wal files). From 5cf85db6ebd9fafa0c3c2780f99c37cc0cd27b0c Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Wed, 3 Apr 2024 13:10:01 +0200 Subject: [PATCH 6/8] Slightly better handling of connection initialization errors. --- lib/src/port_channel.dart | 24 +++++++++++++++++++----- lib/src/sqlite_connection_impl.dart | 4 ++++ test/basic_test.dart | 6 +++--- 3 files changed, 26 insertions(+), 8 deletions(-) diff --git a/lib/src/port_channel.dart b/lib/src/port_channel.dart index c32b7b9..9701023 100644 --- a/lib/src/port_channel.dart +++ b/lib/src/port_channel.dart @@ -18,7 +18,8 @@ abstract class PortClient { class ParentPortClient implements PortClient { late Future sendPortFuture; SendPort? sendPort; - ReceivePort receivePort = ReceivePort(); + final ReceivePort _receivePort = ReceivePort(); + final ReceivePort _errorPort = ReceivePort(); bool closed = false; int _nextId = 1; @@ -30,7 +31,7 @@ class ParentPortClient implements PortClient { sendPortFuture.then((value) { sendPort = value; }); - receivePort.listen((message) { + _receivePort.listen((message) { if (message is _InitMessage) { assert(!initCompleter.isCompleted); initCompleter.complete(message.port); @@ -57,6 +58,17 @@ class ParentPortClient implements PortClient { } close(); }); + _errorPort.listen((message) { + var [error, stackTrace] = message; + print('got an error ${initCompleter.isCompleted} $error'); + if (!initCompleter.isCompleted) { + if (stackTrace == null) { + initCompleter.completeError(error); + } else { + initCompleter.completeError(error, StackTrace.fromString(stackTrace)); + } + } + }); } Future get ready async { @@ -94,20 +106,22 @@ class ParentPortClient implements PortClient { } RequestPortServer server() { - return RequestPortServer(receivePort.sendPort); + return RequestPortServer(_receivePort.sendPort); } void close() async { if (!closed) { closed = true; - receivePort.close(); + _receivePort.close(); + _errorPort.close(); _cancelAll(const ClosedException()); } } tieToIsolate(Isolate isolate) { - isolate.addOnExitListener(receivePort.sendPort, response: _closeMessage); + isolate.addErrorListener(_errorPort.sendPort); + isolate.addOnExitListener(_receivePort.sendPort, response: _closeMessage); } } diff --git a/lib/src/sqlite_connection_impl.dart b/lib/src/sqlite_connection_impl.dart index 643152e..7057a73 100644 --- a/lib/src/sqlite_connection_impl.dart +++ b/lib/src/sqlite_connection_impl.dart @@ -88,6 +88,9 @@ class SqliteConnectionImpl with SqliteQueries implements SqliteConnection { @override Future close() async { await _connectionMutex.lock(() async { + if (closed) { + return; + } if (readOnly) { await _isolateClient.post(const _SqliteIsolateConnectionClose()); } else { @@ -97,6 +100,7 @@ class SqliteConnectionImpl with SqliteQueries implements SqliteConnection { await _isolateClient.post(const _SqliteIsolateConnectionClose()); }); } + _isolateClient.close(); _isolate.kill(); }); } diff --git a/test/basic_test.dart b/test/basic_test.dart index 8ed43dc..113ccb2 100644 --- a/test/basic_test.dart +++ b/test/basic_test.dart @@ -377,7 +377,6 @@ void main() { // 4. Now second connection is ready. Second query has two connections to choose from. // 5. However, first connection is closed, so it's removed from the pool. // 6. Triggers `Concurrent modification during iteration: Instance(length:1) of '_GrowableList'` - final db = SqliteDatabase.withFactory(testFactory(path: path, initStatements: [ // Second connection to sleep more than first connection @@ -387,8 +386,9 @@ void main() { final future1 = db.get('SELECT test_sleep(10) as sleep'); final future2 = db.get('SELECT test_sleep(10) as sleep'); - final closeFuture = db.close(); - await closeFuture; + + await db.close(); + await future1; await future2; }); From 3b79bce346cc9dfe800f2f9b6cd5eee0d92c7073 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Wed, 3 Apr 2024 13:14:15 +0200 Subject: [PATCH 7/8] Cleanup. --- lib/src/port_channel.dart | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/src/port_channel.dart b/lib/src/port_channel.dart index 9701023..4b952bf 100644 --- a/lib/src/port_channel.dart +++ b/lib/src/port_channel.dart @@ -60,7 +60,6 @@ class ParentPortClient implements PortClient { }); _errorPort.listen((message) { var [error, stackTrace] = message; - print('got an error ${initCompleter.isCompleted} $error'); if (!initCompleter.isCompleted) { if (stackTrace == null) { initCompleter.completeError(error); From 1af625b2d6c6ad6e9cce811644253eccb6b312c5 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Wed, 3 Apr 2024 13:40:12 +0200 Subject: [PATCH 8/8] Improve handling of uncaught errors in Isolates. --- lib/src/connection_pool.dart | 3 +- lib/src/port_channel.dart | 58 ++++++++++++++++++++++++++++-------- test/basic_test.dart | 15 +++++++--- 3 files changed, 59 insertions(+), 17 deletions(-) diff --git a/lib/src/connection_pool.dart b/lib/src/connection_pool.dart index 7852b8e..d0c8912 100644 --- a/lib/src/connection_pool.dart +++ b/lib/src/connection_pool.dart @@ -158,7 +158,8 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { if (closed || _readConnections.length >= maxReaders) { return; } - bool hasCapacity = _readConnections.any((connection) => !connection.locked); + bool hasCapacity = _readConnections + .any((connection) => !connection.locked && !connection.closed); if (!hasCapacity) { var name = debugName == null ? null diff --git a/lib/src/port_channel.dart b/lib/src/port_channel.dart index 4b952bf..8b05feb 100644 --- a/lib/src/port_channel.dart +++ b/lib/src/port_channel.dart @@ -21,6 +21,8 @@ class ParentPortClient implements PortClient { final ReceivePort _receivePort = ReceivePort(); final ReceivePort _errorPort = ReceivePort(); bool closed = false; + Object? _closeError; + String? _isolateDebugName; int _nextId = 1; Map> handlers = HashMap(); @@ -59,14 +61,15 @@ class ParentPortClient implements PortClient { close(); }); _errorPort.listen((message) { - var [error, stackTrace] = message; + final [error, stackTraceString] = message; + final stackTrace = stackTraceString == null + ? null + : StackTrace.fromString(stackTraceString); if (!initCompleter.isCompleted) { - if (stackTrace == null) { - initCompleter.completeError(error); - } else { - initCompleter.completeError(error, StackTrace.fromString(stackTrace)); - } + initCompleter.completeError(error, stackTrace); } + _close(IsolateError(cause: error, isolateDebugName: _isolateDebugName), + stackTrace); }); } @@ -74,18 +77,18 @@ class ParentPortClient implements PortClient { await sendPortFuture; } - void _cancelAll(Object error) { + void _cancelAll(Object error, [StackTrace? stackTrace]) { var handlers = this.handlers; this.handlers = {}; for (var message in handlers.values) { - message.completeError(error); + message.completeError(error, stackTrace); } } @override Future post(Object message) async { if (closed) { - throw ClosedException(); + throw _closeError ?? const ClosedException(); } var completer = Completer.sync(); var id = _nextId++; @@ -98,7 +101,7 @@ class ParentPortClient implements PortClient { @override void fire(Object message) async { if (closed) { - throw ClosedException(); + throw _closeError ?? ClosedException(); } final port = sendPort ?? await sendPortFuture; port.send(_FireMessage(message)); @@ -108,17 +111,27 @@ class ParentPortClient implements PortClient { return RequestPortServer(_receivePort.sendPort); } - void close() async { + void _close([Object? error, StackTrace? stackTrace]) { if (!closed) { closed = true; _receivePort.close(); _errorPort.close(); - _cancelAll(const ClosedException()); + if (error == null) { + _cancelAll(const ClosedException()); + } else { + _closeError = error; + _cancelAll(error, stackTrace); + } } } + void close() { + _close(); + } + tieToIsolate(Isolate isolate) { + _isolateDebugName = isolate.debugName; isolate.addErrorListener(_errorPort.sendPort); isolate.addOnExitListener(_receivePort.sendPort, response: _closeMessage); } @@ -274,6 +287,27 @@ class _RequestMessage { class ClosedException implements Exception { const ClosedException(); + + @override + String toString() { + return 'ClosedException'; + } +} + +class IsolateError extends Error { + final Object cause; + final String? isolateDebugName; + + IsolateError({required this.cause, this.isolateDebugName}); + + @override + String toString() { + if (isolateDebugName != null) { + return 'IsolateError in $isolateDebugName: $cause'; + } else { + return 'IsolateError: $cause'; + } + } } class _PortChannelResult { diff --git a/test/basic_test.dart b/test/basic_test.dart index 113ccb2..c3ad150 100644 --- a/test/basic_test.dart +++ b/test/basic_test.dart @@ -4,6 +4,7 @@ import 'dart:math'; import 'package:sqlite3/sqlite3.dart' as sqlite; import 'package:sqlite_async/mutex.dart'; import 'package:sqlite_async/sqlite_async.dart'; +import 'package:test/expect.dart'; import 'package:test/test.dart'; import 'util.dart'; @@ -301,8 +302,11 @@ void main() { }).catchError((error) { caughtError = error; }); - // This may change into a better error in the future - expect(caughtError.toString(), equals("Instance of 'ClosedException'")); + // The specific error message may change + expect( + caughtError.toString(), + equals( + "IsolateError in sqlite-writer: Invalid argument(s): uncaught async error")); // Check that we can still continue afterwards final computed = await db.computeWithDatabase((db) async { @@ -328,8 +332,11 @@ void main() { }).catchError((error) { caughtError = error; }); - // This may change into a better error in the future - expect(caughtError.toString(), equals("Instance of 'ClosedException'")); + // The specific message may change + expect( + caughtError.toString(), + matches(RegExp( + r'IsolateError in sqlite-\d+: Invalid argument\(s\): uncaught async error'))); } // Check that we can still continue afterwards