diff --git a/lib/src/connection_pool.dart b/lib/src/connection_pool.dart index 23b1e0f..3a08dbb 100644 --- a/lib/src/connection_pool.dart +++ b/lib/src/connection_pool.dart @@ -1,4 +1,5 @@ import 'dart:async'; +import 'dart:collection'; import 'mutex.dart'; import 'port_channel.dart'; @@ -12,7 +13,9 @@ import 'update_notification.dart'; class SqliteConnectionPool with SqliteQueries implements SqliteConnection { SqliteConnection? _writeConnection; - final List _readConnections = []; + final Set _allReadConnections = {}; + final Queue _availableReadConnections = Queue(); + final Queue<_PendingItem> _queue = Queue(); final SqliteOpenFactory _factory; final SerializedPortClient _upstreamPort; @@ -53,72 +56,84 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { @override Future getAutoCommit() async { if (_writeConnection == null) { - throw AssertionError('Closed'); + throw ClosedException(); } return await _writeConnection!.getAutoCommit(); } - @override - Future readLock(Future Function(SqliteReadContext tx) callback, - {Duration? lockTimeout, String? debugContext}) async { - await _expandPool(); - - return _runZoned(() async { - bool haveLock = false; - var completer = Completer(); + void _nextRead() { + if (_queue.isEmpty) { + // Wait for queue item + return; + } else if (closed) { + while (_queue.isNotEmpty) { + final nextItem = _queue.removeFirst(); + nextItem.completer.completeError(const ClosedException()); + } + return; + } - var futures = _readConnections.sublist(0).map((connection) async { - if (connection.closed) { - _readConnections.remove(connection); - } - try { - return await connection.readLock((ctx) async { - if (haveLock) { - // Already have a different lock - release this one. - return false; - } - haveLock = true; - - var future = callback(ctx); - completer.complete(future); - - // We have to wait for the future to complete before we can release the - // lock. - try { - await future; - } catch (_) { - // Ignore - } - - return true; - }, lockTimeout: lockTimeout, debugContext: debugContext); - } on TimeoutException { - return false; - } - }); + while (_availableReadConnections.isNotEmpty && + _availableReadConnections.last.closed) { + // Remove connections that may have errored + final connection = _availableReadConnections.removeLast(); + _allReadConnections.remove(connection); + } - final stream = Stream.fromFutures(futures); - var gotAny = await stream.any((element) => element); + if (_availableReadConnections.isEmpty && + _allReadConnections.length == maxReaders) { + // Wait for available connection + return; + } - if (!gotAny) { - // All TimeoutExceptions - throw TimeoutException('Failed to get a read connection', lockTimeout); + var nextItem = _queue.removeFirst(); + while (nextItem.completer.isCompleted) { + // This item already timed out - try the next one if available + if (_queue.isEmpty) { + return; } + nextItem = _queue.removeFirst(); + } + + nextItem.lockTimer?.cancel(); + nextItem.completer.complete(Future.sync(() async { + final nextConnection = _availableReadConnections.isEmpty + ? await _expandPool() + : _availableReadConnections.removeLast(); try { - return await completer.future; - } catch (e) { - // throw e; - rethrow; + // At this point the connection is expected to be available immediately. + // No need to calculate a new lockTimeout here. + final result = await nextConnection.readLock(nextItem.callback); + return result; + } finally { + _availableReadConnections.add(nextConnection); + Timer.run(_nextRead); } - }, debugContext: debugContext ?? 'get*()'); + })); + } + + @override + Future readLock(ReadCallback callback, + {Duration? lockTimeout, String? debugContext}) async { + if (closed) { + throw ClosedException(); + } + final zone = _getZone(debugContext: debugContext ?? 'get*()'); + final item = _PendingItem((ctx) { + return zone.runUnary(callback, ctx); + }, lockTimeout: lockTimeout); + _queue.add(item); + _nextRead(); + + return (await item.future) as T; } @override Future writeLock(Future Function(SqliteWriteContext tx) callback, {Duration? lockTimeout, String? debugContext}) { if (closed) { - throw AssertionError('Closed'); + throw ClosedException(); } if (_writeConnection?.closed == true) { _writeConnection = null; @@ -144,46 +159,52 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { /// connection (with a different lock). /// 2. Give a more specific error message when it happens. T _runZoned(T Function() callback, {required String debugContext}) { + return _getZone(debugContext: debugContext).run(callback); + } + + Zone _getZone({required String debugContext}) { if (Zone.current[this] != null) { throw LockError( 'Recursive lock is not allowed. Use `tx.$debugContext` instead of `db.$debugContext`.'); } - var zone = Zone.current.fork(zoneValues: {this: true}); - return zone.run(callback); + return Zone.current.fork(zoneValues: {this: true}); } - Future _expandPool() async { - if (closed || _readConnections.length >= maxReaders) { - return; - } - bool hasCapacity = _readConnections.any((connection) => !connection.locked); - if (!hasCapacity) { - var name = debugName == null - ? null - : '$debugName-${_readConnections.length + 1}'; - var connection = SqliteConnectionImpl( - upstreamPort: _upstreamPort, - primary: false, - updates: updates, - debugName: name, - mutex: mutex, - readOnly: true, - openFactory: _factory); - _readConnections.add(connection); - - // Edge case: - // If we don't await here, there is a chance that a different connection - // is used for the transaction, and that it finishes and deletes the database - // while this one is still opening. This is specifically triggered in tests. - // To avoid that, we wait for the connection to be ready. - await connection.ready; - } + Future _expandPool() async { + var name = debugName == null + ? null + : '$debugName-${_allReadConnections.length + 1}'; + var connection = SqliteConnectionImpl( + upstreamPort: _upstreamPort, + primary: false, + updates: updates, + debugName: name, + mutex: mutex, + readOnly: true, + openFactory: _factory); + _allReadConnections.add(connection); + + // Edge case: + // If we don't await here, there is a chance that a different connection + // is used for the transaction, and that it finishes and deletes the database + // while this one is still opening. This is specifically triggered in tests. + // To avoid that, we wait for the connection to be ready. + await connection.ready; + return connection; } @override Future close() async { closed = true; - for (var connection in _readConnections) { + + // It is possible that `readLock()` removes connections from the pool while we're + // closing connections, but not possible for new connections to be added. + // Create a copy of the list, to avoid this triggering "Concurrent modification during iteration" + final toClose = _allReadConnections.toList(); + for (var connection in toClose) { + // Wait for connection initialization, so that any existing readLock() + // requests go through before closing. + await connection.ready; await connection.close(); } // Closing the write connection cleans up the journal files (-shm and -wal files). @@ -192,3 +213,34 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { await _writeConnection?.close(); } } + +typedef ReadCallback = Future Function(SqliteReadContext tx); + +class _PendingItem { + ReadCallback callback; + Completer completer = Completer.sync(); + late Future future = completer.future; + DateTime? deadline; + final Duration? lockTimeout; + late final Timer? lockTimer; + + _PendingItem(this.callback, {this.lockTimeout}) { + if (lockTimeout != null) { + deadline = DateTime.now().add(lockTimeout!); + lockTimer = Timer(lockTimeout!, () { + // Note: isCompleted is true when `nextItem.completer.complete` is called, not when the result is available. + // This matches the behavior we need for a timeout on the lock, but not the entire operation. + if (!completer.isCompleted) { + // completer.completeError( + // TimeoutException('Failed to get a read connection', lockTimeout)); + completer.complete(Future.sync(() async { + throw TimeoutException( + 'Failed to get a read connection', lockTimeout); + })); + } + }); + } else { + lockTimer = null; + } + } +} diff --git a/lib/src/port_channel.dart b/lib/src/port_channel.dart index c32b7b9..8b05feb 100644 --- a/lib/src/port_channel.dart +++ b/lib/src/port_channel.dart @@ -18,8 +18,11 @@ abstract class PortClient { class ParentPortClient implements PortClient { late Future sendPortFuture; SendPort? sendPort; - ReceivePort receivePort = ReceivePort(); + final ReceivePort _receivePort = ReceivePort(); + final ReceivePort _errorPort = ReceivePort(); bool closed = false; + Object? _closeError; + String? _isolateDebugName; int _nextId = 1; Map> handlers = HashMap(); @@ -30,7 +33,7 @@ class ParentPortClient implements PortClient { sendPortFuture.then((value) { sendPort = value; }); - receivePort.listen((message) { + _receivePort.listen((message) { if (message is _InitMessage) { assert(!initCompleter.isCompleted); initCompleter.complete(message.port); @@ -57,24 +60,35 @@ class ParentPortClient implements PortClient { } close(); }); + _errorPort.listen((message) { + final [error, stackTraceString] = message; + final stackTrace = stackTraceString == null + ? null + : StackTrace.fromString(stackTraceString); + if (!initCompleter.isCompleted) { + initCompleter.completeError(error, stackTrace); + } + _close(IsolateError(cause: error, isolateDebugName: _isolateDebugName), + stackTrace); + }); } Future get ready async { await sendPortFuture; } - void _cancelAll(Object error) { + void _cancelAll(Object error, [StackTrace? stackTrace]) { var handlers = this.handlers; this.handlers = {}; for (var message in handlers.values) { - message.completeError(error); + message.completeError(error, stackTrace); } } @override Future post(Object message) async { if (closed) { - throw ClosedException(); + throw _closeError ?? const ClosedException(); } var completer = Completer.sync(); var id = _nextId++; @@ -87,27 +101,39 @@ class ParentPortClient implements PortClient { @override void fire(Object message) async { if (closed) { - throw ClosedException(); + throw _closeError ?? ClosedException(); } final port = sendPort ?? await sendPortFuture; port.send(_FireMessage(message)); } RequestPortServer server() { - return RequestPortServer(receivePort.sendPort); + return RequestPortServer(_receivePort.sendPort); } - void close() async { + void _close([Object? error, StackTrace? stackTrace]) { if (!closed) { closed = true; - receivePort.close(); - _cancelAll(const ClosedException()); + _receivePort.close(); + _errorPort.close(); + if (error == null) { + _cancelAll(const ClosedException()); + } else { + _closeError = error; + _cancelAll(error, stackTrace); + } } } + void close() { + _close(); + } + tieToIsolate(Isolate isolate) { - isolate.addOnExitListener(receivePort.sendPort, response: _closeMessage); + _isolateDebugName = isolate.debugName; + isolate.addErrorListener(_errorPort.sendPort); + isolate.addOnExitListener(_receivePort.sendPort, response: _closeMessage); } } @@ -261,6 +287,27 @@ class _RequestMessage { class ClosedException implements Exception { const ClosedException(); + + @override + String toString() { + return 'ClosedException'; + } +} + +class IsolateError extends Error { + final Object cause; + final String? isolateDebugName; + + IsolateError({required this.cause, this.isolateDebugName}); + + @override + String toString() { + if (isolateDebugName != null) { + return 'IsolateError in $isolateDebugName: $cause'; + } else { + return 'IsolateError: $cause'; + } + } } class _PortChannelResult { diff --git a/lib/src/sqlite_connection_impl.dart b/lib/src/sqlite_connection_impl.dart index 643152e..7057a73 100644 --- a/lib/src/sqlite_connection_impl.dart +++ b/lib/src/sqlite_connection_impl.dart @@ -88,6 +88,9 @@ class SqliteConnectionImpl with SqliteQueries implements SqliteConnection { @override Future close() async { await _connectionMutex.lock(() async { + if (closed) { + return; + } if (readOnly) { await _isolateClient.post(const _SqliteIsolateConnectionClose()); } else { @@ -97,6 +100,7 @@ class SqliteConnectionImpl with SqliteQueries implements SqliteConnection { await _isolateClient.post(const _SqliteIsolateConnectionClose()); }); } + _isolateClient.close(); _isolate.kill(); }); } diff --git a/test/basic_test.dart b/test/basic_test.dart index 55a7d12..a3676cf 100644 --- a/test/basic_test.dart +++ b/test/basic_test.dart @@ -334,8 +334,11 @@ void main() { }).catchError((error) { caughtError = error; }); - // This may change into a better error in the future - expect(caughtError.toString(), equals("Instance of 'ClosedException'")); + // The specific error message may change + expect( + caughtError.toString(), + equals( + "IsolateError in sqlite-writer: Invalid argument(s): uncaught async error")); // Check that we can still continue afterwards final computed = await db.computeWithDatabase((db) async { @@ -361,8 +364,11 @@ void main() { }).catchError((error) { caughtError = error; }); - // This may change into a better error in the future - expect(caughtError.toString(), equals("Instance of 'ClosedException'")); + // The specific message may change + expect( + caughtError.toString(), + matches(RegExp( + r'IsolateError in sqlite-\d+: Invalid argument\(s\): uncaught async error'))); } // Check that we can still continue afterwards @@ -401,6 +407,53 @@ void main() { expect(await savedTx!.getAutoCommit(), equals(true)); expect(savedTx!.closed, equals(true)); }); + + test('closing', () async { + // Test race condition in SqliteConnectionPool: + // 1. Open two concurrent queries, which opens two connection. + // 2. Second connection takes longer to open than first. + // 3. Call db.close(). + // 4. Now second connection is ready. Second query has two connections to choose from. + // 5. However, first connection is closed, so it's removed from the pool. + // 6. Triggers `Concurrent modification during iteration: Instance(length:1) of '_GrowableList'` + final db = + SqliteDatabase.withFactory(testFactory(path: path, initStatements: [ + // Second connection to sleep more than first connection + 'SELECT test_sleep(test_connection_number() * 10)' + ])); + await db.initialize(); + + final future1 = db.get('SELECT test_sleep(10) as sleep'); + final future2 = db.get('SELECT test_sleep(10) as sleep'); + + await db.close(); + + await future1; + await future2; + }); + + test('lockTimeout', () async { + final db = + SqliteDatabase.withFactory(testFactory(path: path), maxReaders: 2); + await db.initialize(); + + final f1 = db.readTransaction((tx) async { + await tx.get('select test_sleep(100)'); + }, lockTimeout: const Duration(milliseconds: 200)); + + final f2 = db.readTransaction((tx) async { + await tx.get('select test_sleep(100)'); + }, lockTimeout: const Duration(milliseconds: 200)); + + // At this point, both read connections are in use + await expectLater(() async { + await db.readLock((tx) async { + await tx.get('select test_sleep(10)'); + }, lockTimeout: const Duration(milliseconds: 2)); + }, throwsA((e) => e is TimeoutException)); + + await Future.wait([f1, f2]); + }); }); } diff --git a/test/util.dart b/test/util.dart index 12458fc..5bcbb50 100644 --- a/test/util.dart +++ b/test/util.dart @@ -14,11 +14,13 @@ const defaultSqlitePath = 'libsqlite3.so.0'; class TestSqliteOpenFactory extends DefaultSqliteOpenFactory { String sqlitePath; + List initStatements; TestSqliteOpenFactory( {required super.path, super.sqliteOptions, - this.sqlitePath = defaultSqlitePath}); + this.sqlitePath = defaultSqlitePath, + this.initStatements = const []}); @override sqlite.Database open(SqliteOpenOptions options) { @@ -45,12 +47,29 @@ class TestSqliteOpenFactory extends DefaultSqliteOpenFactory { }, ); + db.createFunction( + functionName: 'test_connection_number', + argumentCount: const sqlite.AllowedArgumentCount(0), + function: (args) { + // write: 0, read: 1 - 5 + final name = Isolate.current.debugName ?? '-0'; + var nr = name.split('-').last; + return int.tryParse(nr) ?? 0; + }, + ); + + for (var s in initStatements) { + db.execute(s); + } + return db; } } -SqliteOpenFactory testFactory({String? path}) { - return TestSqliteOpenFactory(path: path ?? dbPath()); +SqliteOpenFactory testFactory( + {String? path, List initStatements = const []}) { + return TestSqliteOpenFactory( + path: path ?? dbPath(), initStatements: initStatements); } Future setupDatabase({String? path}) async {