diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index c9fa2df..ccebbe8 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -11,13 +11,15 @@ jobs: steps: - uses: actions/checkout@v3 - uses: dart-lang/setup-dart@v1 - - name: Install dependencies run: dart pub get - name: Check formatting run: dart format --output=none --set-exit-if-changed . - name: Lint - run: dart analyze + run: dart analyze lib test example + # This fails when clong into sqlite_async.dart. Disable for now. + # - name: Test Fixes + # run: dart fix --compare-to-golden test_fixes - name: Publish dry-run run: dart pub publish --dry-run - name: Check publish score @@ -32,19 +34,19 @@ jobs: include: - sqlite_version: "3440200" sqlite_url: "https://www.sqlite.org/2023/sqlite-autoconf-3440200.tar.gz" - dart_sdk: 3.2.4 + dart_sdk: 3.3.3 - sqlite_version: "3430200" sqlite_url: "https://www.sqlite.org/2023/sqlite-autoconf-3430200.tar.gz" - dart_sdk: 3.2.4 + dart_sdk: 3.3.3 - sqlite_version: "3420000" sqlite_url: "https://www.sqlite.org/2023/sqlite-autoconf-3420000.tar.gz" - dart_sdk: 3.2.4 + dart_sdk: 3.3.3 - sqlite_version: "3410100" sqlite_url: "https://www.sqlite.org/2023/sqlite-autoconf-3410100.tar.gz" - dart_sdk: 3.2.4 + dart_sdk: 3.3.3 - sqlite_version: "3380000" sqlite_url: "https://www.sqlite.org/2022/sqlite-autoconf-3380000.tar.gz" - dart_sdk: 3.2.0 + dart_sdk: 3.3.3 steps: - uses: actions/checkout@v3 - uses: dart-lang/setup-dart@v1 diff --git a/example/basic_example.dart b/example/basic_example.dart index 63fda84..2fee948 100644 --- a/example/basic_example.dart +++ b/example/basic_example.dart @@ -28,7 +28,7 @@ void main() async { // Combine multiple statements into a single write transaction for: // 1. Atomic persistence (all updates are either applied or rolled back). // 2. Improved throughput. - await db.writeTransaction((tx) async { + await db.transaction((tx) async { await tx.execute('INSERT INTO test_data(data) values(?)', ['Test3']); await tx.execute('INSERT INTO test_data(data) values(?)', ['Test4']); }); diff --git a/lib/fix_data.yaml b/lib/fix_data.yaml new file mode 100644 index 0000000..b101257 --- /dev/null +++ b/lib/fix_data.yaml @@ -0,0 +1,70 @@ +# This provides automatic fixes of deprecations using `dart fix`. +# See: https://github.com/flutter/flutter/wiki/Data-driven-Fixes +version: 1 +transforms: + - title: 'Rename writeLock to lock' + date: 2024-04-02 + element: + uris: ['src/sqlite_connection.dart', 'sqlite_async.dart'] + method: 'writeLock' + inClass: 'SqliteConnection' + changes: + - kind: 'rename' + newName: 'lock' + - title: 'Rename writeTransaction to transaction' + date: 2024-04-02 + element: + uris: ['src/sqlite_connection.dart', 'sqlite_async.dart'] + method: 'writeTransaction' + inClass: 'SqliteConnection' + changes: + - kind: 'rename' + newName: 'transaction' + - title: 'Rename writeLock to lock' + date: 2024-04-02 + element: + uris: ['src/sqlite_database.dart', 'sqlite_async.dart'] + method: 'writeLock' + inClass: 'SqliteDatabase' + changes: + - kind: 'rename' + newName: 'lock' + - title: 'Rename writeTransaction to transaction' + date: 2024-04-02 + element: + uris: ['src/sqlite_database.dart', 'sqlite_async.dart'] + method: 'writeTransaction' + inClass: 'SqliteDatabase' + changes: + - kind: 'rename' + newName: 'transaction' + - title: 'Rename readTransaction to transaction' + date: 2024-04-02 + element: + uris: ['src/sqlite_database.dart', 'sqlite_async.dart'] + method: 'readTransaction' + inClass: 'SqliteDatabase' + changes: + - kind: 'addParameter' + index: 1 + name: 'readOnly' + style: required_named + argumentValue: + expression: 'true' + - kind: 'rename' + newName: 'transaction' + - title: 'Rename readLock to lock' + date: 2024-04-02 + element: + uris: ['src/sqlite_database.dart', 'sqlite_async.dart'] + method: 'readLock' + inClass: 'SqliteDatabase' + changes: + - kind: 'addParameter' + index: 1 + name: 'readOnly' + style: required_named + argumentValue: + expression: 'true' + - kind: 'rename' + newName: 'lock' diff --git a/lib/src/connection_pool.dart b/lib/src/connection_pool.dart index 23b1e0f..e562f03 100644 --- a/lib/src/connection_pool.dart +++ b/lib/src/connection_pool.dart @@ -10,7 +10,7 @@ import 'update_notification.dart'; /// A connection pool with a single write connection and multiple read connections. class SqliteConnectionPool with SqliteQueries implements SqliteConnection { - SqliteConnection? _writeConnection; + SqliteConnectionImpl? _writeConnection; final List _readConnections = []; @@ -42,7 +42,7 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { SqliteConnectionPool(this._factory, {this.updates, this.maxReaders = 5, - SqliteConnection? writeConnection, + SqliteConnectionImpl? writeConnection, this.debugName, required this.mutex, required SerializedPortClient upstreamPort}) @@ -72,7 +72,7 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { _readConnections.remove(connection); } try { - return await connection.readLock((ctx) async { + return await connection.lock((ctx) async { if (haveLock) { // Already have a different lock - release this one. return false; @@ -91,7 +91,10 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { } return true; - }, lockTimeout: lockTimeout, debugContext: debugContext); + }, + lockTimeout: lockTimeout, + readOnly: true, + debugContext: debugContext); } on TimeoutException { return false; } @@ -117,6 +120,12 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { @override Future writeLock(Future Function(SqliteWriteContext tx) callback, {Duration? lockTimeout, String? debugContext}) { + return _writeLock(callback, + lockTimeout: lockTimeout, debugContext: debugContext, global: true); + } + + Future _writeLock(Future Function(SqliteWriteContext tx) callback, + {Duration? lockTimeout, String? debugContext, required bool global}) { if (closed) { throw AssertionError('Closed'); } @@ -132,11 +141,30 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { readOnly: false, openFactory: _factory); return _runZoned(() { - return _writeConnection!.writeLock(callback, - lockTimeout: lockTimeout, debugContext: debugContext); + if (global) { + // ignore: deprecated_member_use_from_same_package + return _writeConnection!.writeLock(callback, + lockTimeout: lockTimeout, debugContext: debugContext); + } else { + return _writeConnection!.lock(callback, + lockTimeout: lockTimeout, debugContext: debugContext); + } }, debugContext: debugContext ?? 'execute()'); } + @override + Future lock(Future Function(SqliteWriteContext tx) callback, + {bool? readOnly, Duration? lockTimeout, String? debugContext}) { + if (readOnly == true) { + // ignore: deprecated_member_use_from_same_package + return readLock((ctx) => callback(ctx as SqliteWriteContext), + lockTimeout: lockTimeout, debugContext: debugContext); + } else { + return _writeLock(callback, + lockTimeout: lockTimeout, debugContext: debugContext, global: false); + } + } + /// The [Mutex] on individual connections do already error in recursive locks. /// /// We duplicate the same check here, to: diff --git a/lib/src/sqlite_connection.dart b/lib/src/sqlite_connection.dart index 156f967..660c178 100644 --- a/lib/src/sqlite_connection.dart +++ b/lib/src/sqlite_connection.dart @@ -72,11 +72,28 @@ abstract class SqliteWriteContext extends SqliteReadContext { /// Abstract class representing a connection to the SQLite database. abstract class SqliteConnection extends SqliteWriteContext { + /// Open a transaction. + /// + /// Opens a read-write transaction by default. Set [readOnly] to open a + /// read-only transaction. + /// + /// Only one read-write transaction can execute against the database at a time. + /// + /// Statements within the transaction must be done on the provided + /// [SqliteWriteContext] - attempting statements on the [SqliteConnection] + /// instance will error. + /// + /// [lockTimeout] only controls the timeout for locking the connection. + /// Timeout for database-level locks can be configured on [SqliteOpenOptions]. + Future transaction(Future Function(SqliteWriteContext tx) callback, + {bool? readOnly, Duration? lockTimeout}); + /// Open a read-only transaction. /// /// Statements within the transaction must be done on the provided /// [SqliteReadContext] - attempting statements on the [SqliteConnection] /// instance will error. + @Deprecated('Use [transaction(callback)] instead.') Future readTransaction( Future Function(SqliteReadContext tx) callback, {Duration? lockTimeout}); @@ -89,6 +106,7 @@ abstract class SqliteConnection extends SqliteWriteContext { /// Statements within the transaction must be done on the provided /// [SqliteWriteContext] - attempting statements on the [SqliteConnection] /// instance will error. + @Deprecated('Use [transaction(callback)] instead.') Future writeTransaction( Future Function(SqliteWriteContext tx) callback, {Duration? lockTimeout}); @@ -102,18 +120,31 @@ abstract class SqliteConnection extends SqliteWriteContext { {List parameters = const [], Duration throttle = const Duration(milliseconds: 30)}); - /// Takes a read lock, without starting a transaction. + /// Takes a read lock on this connection, without starting a transaction. /// - /// In most cases, [readTransaction] should be used instead. + /// This is a low-level API. In most cases, [transaction] should be used instead. + @Deprecated('Use [lock] instead.') Future readLock(Future Function(SqliteReadContext tx) callback, {Duration? lockTimeout, String? debugContext}); - /// Takes a global lock, without starting a transaction. + /// Takes a global write lock, without starting a transaction. /// - /// In most cases, [writeTransaction] should be used instead. + /// This is a low-level API. In most cases, [transaction] should be used instead. + @Deprecated('Use [lock] instead.') Future writeLock(Future Function(SqliteWriteContext tx) callback, {Duration? lockTimeout, String? debugContext}); + /// Lock a connection for exclusive usage. + /// + /// When using a connection pool, use [readOnly] to select a read connection. + /// In other contexts, [readOnly] has no effect. + /// + /// This is a low-level API. In most cases, [transaction] should be used instead. + /// + /// Any direct query methods such as [getAll] or [execute] uses [lock] internally. + Future lock(Future Function(SqliteWriteContext tx) callback, + {bool? readOnly, Duration? lockTimeout, String? debugContext}); + Future close(); /// Returns true if the connection is closed diff --git a/lib/src/sqlite_connection_impl.dart b/lib/src/sqlite_connection_impl.dart index 643152e..ff1ed2a 100644 --- a/lib/src/sqlite_connection_impl.dart +++ b/lib/src/sqlite_connection_impl.dart @@ -106,8 +106,8 @@ class SqliteConnectionImpl with SqliteQueries implements SqliteConnection { } @override - Future readLock(Future Function(SqliteReadContext tx) callback, - {Duration? lockTimeout, String? debugContext}) async { + Future lock(Future Function(SqliteWriteContext tx) callback, + {bool? readOnly, Duration? lockTimeout, String? debugContext}) async { // Private lock to synchronize this with other statements on the same connection, // to ensure that transactions aren't interleaved. return _connectionMutex.lock(() async { @@ -120,6 +120,12 @@ class SqliteConnectionImpl with SqliteQueries implements SqliteConnection { }, timeout: lockTimeout); } + @override + Future readLock(Future Function(SqliteReadContext tx) callback, + {Duration? lockTimeout, String? debugContext}) async { + return lock(callback, lockTimeout: lockTimeout, debugContext: debugContext); + } + @override Future writeLock(Future Function(SqliteWriteContext tx) callback, {Duration? lockTimeout, String? debugContext}) async { @@ -271,7 +277,6 @@ Future _sqliteConnectionIsolateInner(_SqliteConnectionParams params, final server = params.portServer; final commandPort = ReceivePort(); - Timer? updateDebouncer; Set updatedTables = {}; int? txId; Object? txError; @@ -280,25 +285,18 @@ Future _sqliteConnectionIsolateInner(_SqliteConnectionParams params, if (updatedTables.isNotEmpty) { client.fire(UpdateNotification(updatedTables)); updatedTables.clear(); - updateDebouncer?.cancel(); - updateDebouncer = null; } } db.updates.listen((event) { updatedTables.add(event.tableName); - - // This handles two cases: - // 1. Update arrived after _SqliteIsolateClose (not sure if this could happen). - // 2. Long-running _SqliteIsolateClosure that should fire updates while running. - updateDebouncer ??= - Timer(const Duration(milliseconds: 10), maybeFireUpdates); }); server.open((data) async { if (data is _SqliteIsolateClose) { if (txId != null) { if (!db.autocommit) { + updatedTables.clear(); db.execute('ROLLBACK'); } txId = null; diff --git a/lib/src/sqlite_database.dart b/lib/src/sqlite_database.dart index c937743..0dc1095 100644 --- a/lib/src/sqlite_database.dart +++ b/lib/src/sqlite_database.dart @@ -117,26 +117,11 @@ class SqliteDatabase with SqliteQueries implements SqliteConnection { } void _listenForEvents() { - UpdateNotification? updates; - Map subscriptions = {}; _eventsPort = PortServer((message) async { if (message is UpdateNotification) { - if (updates == null) { - updates = message; - // Use the mutex to only send updates after the current transaction. - // Do take care to avoid getting a lock for each individual update - - // that could add massive performance overhead. - mutex.lock(() async { - if (updates != null) { - _updatesController.add(updates!); - updates = null; - } - }); - } else { - updates!.tables.addAll(message.tables); - } + _updatesController.add(message); return null; } else if (message is InitDb) { await _initialized; @@ -199,10 +184,12 @@ class SqliteDatabase with SqliteQueries implements SqliteConnection { /// Changes from any write transaction are not visible to read transactions /// started before it. @override + @Deprecated('Use [transaction] instead.') Future readTransaction( Future Function(SqliteReadContext tx) callback, {Duration? lockTimeout}) { - return _pool.readTransaction(callback, lockTimeout: lockTimeout); + return _pool.transaction(callback, + readOnly: true, lockTimeout: lockTimeout); } /// Open a read-write transaction. @@ -213,20 +200,23 @@ class SqliteDatabase with SqliteQueries implements SqliteConnection { /// The write transaction is automatically committed when the callback finishes, /// or rolled back on any error. @override + @Deprecated('Use [transaction] instead.') Future writeTransaction( Future Function(SqliteWriteContext tx) callback, {Duration? lockTimeout}) { - return _pool.writeTransaction(callback, lockTimeout: lockTimeout); + return _pool.transaction(callback, lockTimeout: lockTimeout); } @override + @Deprecated('Use [lock] instead.') Future readLock(Future Function(SqliteReadContext tx) callback, {Duration? lockTimeout, String? debugContext}) { - return _pool.readLock(callback, - lockTimeout: lockTimeout, debugContext: debugContext); + return _pool.lock(callback, + readOnly: true, lockTimeout: lockTimeout, debugContext: debugContext); } @override + @Deprecated('Use [lock] instead.') Future writeLock(Future Function(SqliteWriteContext tx) callback, {Duration? lockTimeout, String? debugContext}) { return _pool.writeLock(callback, diff --git a/lib/src/sqlite_migrations.dart b/lib/src/sqlite_migrations.dart index fe8e701..66afdf1 100644 --- a/lib/src/sqlite_migrations.dart +++ b/lib/src/sqlite_migrations.dart @@ -90,7 +90,7 @@ class SqliteMigrations { Future migrate(SqliteConnection db) async { _validateCreateDatabase(); - await db.writeTransaction((tx) async { + await db.transaction((tx) async { await tx.execute( 'CREATE TABLE IF NOT EXISTS $migrationTable(id INTEGER PRIMARY KEY, down_migrations TEXT)'); diff --git a/lib/src/sqlite_open_factory.dart b/lib/src/sqlite_open_factory.dart index 4f1845a..b4779ef 100644 --- a/lib/src/sqlite_open_factory.dart +++ b/lib/src/sqlite_open_factory.dart @@ -1,6 +1,6 @@ import 'dart:async'; -import 'package:sqlite3/sqlite3.dart' as sqlite; +import 'package:sqlite_async/sqlite3.dart' as sqlite; import 'sqlite_options.dart'; @@ -29,6 +29,12 @@ class DefaultSqliteOpenFactory implements SqliteOpenFactory { List pragmaStatements(SqliteOpenOptions options) { List statements = []; + if (sqliteOptions.lockTimeout != null) { + // May be replaced by a Dart-level retry mechanism in the future + statements.add( + 'PRAGMA busy_timeout = ${sqliteOptions.lockTimeout!.inMilliseconds}'); + } + if (options.primaryConnection && sqliteOptions.journalMode != null) { // Persisted - only needed on the primary connection statements @@ -51,8 +57,21 @@ class DefaultSqliteOpenFactory implements SqliteOpenFactory { final mode = options.openMode; var db = sqlite.sqlite3.open(path, mode: mode, mutex: false); + // Pragma statements don't have the same BUSY_TIMEOUT behavior as normal statements. + // We add a manual retry loop for those. for (var statement in pragmaStatements(options)) { - db.execute(statement); + for (var tries = 0; tries < 30; tries++) { + try { + db.execute(statement); + break; + } on sqlite.SqliteException catch (e) { + if (e.resultCode == sqlite.SqlError.SQLITE_BUSY && tries < 29) { + continue; + } else { + rethrow; + } + } + } } return db; } diff --git a/lib/src/sqlite_options.dart b/lib/src/sqlite_options.dart index 36beb7c..9602fdd 100644 --- a/lib/src/sqlite_options.dart +++ b/lib/src/sqlite_options.dart @@ -11,15 +11,22 @@ class SqliteOptions { /// attempt to truncate the file afterwards. final int? journalSizeLimit; + /// Timeout waiting for locks to be released by other connections. + /// Defaults to 30 seconds. + /// Set to null or [Duration.zero] to fail immediately when the database is locked. + final Duration? lockTimeout; + const SqliteOptions.defaults() : journalMode = SqliteJournalMode.wal, journalSizeLimit = 6 * 1024 * 1024, // 1.5x the default checkpoint size - synchronous = SqliteSynchronous.normal; + synchronous = SqliteSynchronous.normal, + lockTimeout = const Duration(seconds: 30); const SqliteOptions( {this.journalMode = SqliteJournalMode.wal, this.journalSizeLimit = 6 * 1024 * 1024, - this.synchronous = SqliteSynchronous.normal}); + this.synchronous = SqliteSynchronous.normal, + this.lockTimeout = const Duration(seconds: 30)}); } /// SQLite journal mode. Set on the primary connection. diff --git a/lib/src/sqlite_queries.dart b/lib/src/sqlite_queries.dart index 055c11c..3fadd43 100644 --- a/lib/src/sqlite_queries.dart +++ b/lib/src/sqlite_queries.dart @@ -6,8 +6,7 @@ import 'update_notification.dart'; /// Mixin to provide default query functionality. /// -/// Classes using this need to implement [SqliteConnection.readLock] -/// and [SqliteConnection.writeLock]. +/// Classes using this need to implement [SqliteConnection.lock]. mixin SqliteQueries implements SqliteWriteContext, SqliteConnection { /// Broadcast stream that is notified of any table updates Stream? get updates; @@ -15,7 +14,7 @@ mixin SqliteQueries implements SqliteWriteContext, SqliteConnection { @override Future execute(String sql, [List parameters = const []]) async { - return writeLock((ctx) async { + return lock((ctx) async { return ctx.execute(sql, parameters); }, debugContext: 'execute()'); } @@ -23,24 +22,24 @@ mixin SqliteQueries implements SqliteWriteContext, SqliteConnection { @override Future getAll(String sql, [List parameters = const []]) { - return readLock((ctx) async { + return lock((ctx) async { return ctx.getAll(sql, parameters); - }, debugContext: 'getAll()'); + }, readOnly: true, debugContext: 'getAll()'); } @override Future get(String sql, [List parameters = const []]) { - return readLock((ctx) async { + return lock((ctx) async { return ctx.get(sql, parameters); - }, debugContext: 'get()'); + }, readOnly: true, debugContext: 'get()'); } @override Future getOptional(String sql, [List parameters = const []]) { - return readLock((ctx) async { + return lock((ctx) async { return ctx.getOptional(sql, parameters); - }, debugContext: 'getOptional()'); + }, readOnly: true, debugContext: 'getOptional()'); } @override @@ -102,6 +101,7 @@ mixin SqliteQueries implements SqliteWriteContext, SqliteConnection { Future readTransaction( Future Function(SqliteReadContext tx) callback, {Duration? lockTimeout}) async { + // ignore: deprecated_member_use_from_same_package return readLock((ctx) async { return await internalReadTransaction(ctx, callback); }, lockTimeout: lockTimeout, debugContext: 'readTransaction()'); @@ -111,6 +111,7 @@ mixin SqliteQueries implements SqliteWriteContext, SqliteConnection { Future writeTransaction( Future Function(SqliteWriteContext tx) callback, {Duration? lockTimeout}) async { + // ignore: deprecated_member_use_from_same_package return writeLock((ctx) async { return await internalWriteTransaction(ctx, callback); }, lockTimeout: lockTimeout, debugContext: 'writeTransaction()'); @@ -140,4 +141,34 @@ mixin SqliteQueries implements SqliteWriteContext, SqliteConnection { return tx.executeBatch(sql, parameterSets); }); } + + @override + Future lock(Future Function(SqliteWriteContext tx) callback, + {bool? readOnly, Duration? lockTimeout, String? debugContext}) { + if (readOnly == true) { + // ignore: deprecated_member_use_from_same_package + return readLock((ctx) => callback(ctx as SqliteWriteContext), + lockTimeout: lockTimeout, debugContext: debugContext); + } else { + // ignore: deprecated_member_use_from_same_package + return writeLock(callback, + lockTimeout: lockTimeout, debugContext: debugContext); + } + } + + @override + Future transaction(Future Function(SqliteWriteContext tx) callback, + {bool? readOnly, Duration? lockTimeout}) { + if (readOnly == true) { + // ignore: deprecated_member_use_from_same_package + return readTransaction((ctx) => callback(ctx as SqliteWriteContext), + lockTimeout: lockTimeout); + } else { + // Uses connection-level lock, unlike writeTransaction which uses a + // database-level lock. + return lock((ctx) async { + return await internalWriteTransaction(ctx, callback); + }, lockTimeout: lockTimeout, debugContext: 'writeTransaction()'); + } + } } diff --git a/scripts/benchmark.dart b/scripts/benchmark.dart index 10a133e..6549843 100644 --- a/scripts/benchmark.dart +++ b/scripts/benchmark.dart @@ -25,7 +25,7 @@ class SqliteBenchmark { List benchmarks = [ SqliteBenchmark('Insert: JSON1', (SqliteDatabase db, List> parameters) async { - await db.writeTransaction((tx) async { + await db.transaction((tx) async { for (var i = 0; i < parameters.length; i += 5000) { var sublist = parameters.sublist(i, min(parameters.length, i + 5000)); await tx.execute( @@ -37,23 +37,23 @@ List benchmarks = [ }, maxBatchSize: 20000), SqliteBenchmark('Read: JSON1', (SqliteDatabase db, List> parameters) async { - await db.readTransaction((tx) async { + await db.transaction((tx) async { for (var i = 0; i < parameters.length; i += 10000) { var sublist = List.generate(10000, (index) => index); await tx.getAll( 'SELECT name, email FROM customers WHERE id IN (SELECT e.value FROM json_each(?) e)', [jsonEncode(sublist)]); } - }); + }, readOnly: true); }, maxBatchSize: 200000, enabled: false), - SqliteBenchmark('writeLock in isolate', + SqliteBenchmark('lock in isolate', (SqliteDatabase db, List> parameters) async { var factory = db.isolateConnectionFactory(); var len = parameters.length; await Isolate.run(() async { final db = factory.open(); for (var i = 0; i < len; i++) { - await db.writeLock((tx) async {}); + await db.lock((tx) async {}); } await db.close(); }); @@ -61,13 +61,14 @@ List benchmarks = [ SqliteBenchmark('Write lock', (SqliteDatabase db, List> parameters) async { for (var _ in parameters) { + // ignore: deprecated_member_use_from_same_package await db.writeLock((tx) async {}); } }, maxBatchSize: 5000, enabled: false), SqliteBenchmark('Read lock', (SqliteDatabase db, List> parameters) async { for (var _ in parameters) { - await db.readLock((tx) async {}); + await db.lock((tx) async {}, readOnly: true); } }, maxBatchSize: 5000, enabled: false), SqliteBenchmark('Insert: Direct', @@ -79,7 +80,7 @@ List benchmarks = [ }, maxBatchSize: 500), SqliteBenchmark('Insert: writeTransaction', (SqliteDatabase db, List> parameters) async { - await db.writeTransaction((tx) async { + await db.transaction((tx) async { for (var params in parameters) { await tx.execute( 'INSERT INTO customers(name, email) VALUES(?, ?)', params); @@ -110,7 +111,7 @@ List benchmarks = [ }, maxBatchSize: 2000), SqliteBenchmark('Insert: writeTransaction no await', (SqliteDatabase db, List> parameters) async { - await db.writeTransaction((tx) async { + await db.transaction((tx) async { for (var params in parameters) { tx.execute('INSERT INTO customers(name, email) VALUES(?, ?)', params); } @@ -139,7 +140,7 @@ List benchmarks = [ }), SqliteBenchmark('Insert: executeBatch', (SqliteDatabase db, List> parameters) async { - await db.writeTransaction((tx) async { + await db.transaction((tx) async { await tx.executeBatch( 'INSERT INTO customers(name, email) VALUES(?, ?)', parameters); }); @@ -167,7 +168,7 @@ void main() async { 20000, (index) => ['Test user $index', 'user$index@example.org']); createTables(SqliteDatabase db) async { - await db.writeTransaction((tx) async { + await db.transaction((tx) async { await tx.execute('DROP TABLE IF EXISTS customers'); await tx.execute( 'CREATE TABLE customers(id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT, email TEXT)'); diff --git a/test/basic_test.dart b/test/basic_test.dart index 9b563c8..2e30931 100644 --- a/test/basic_test.dart +++ b/test/basic_test.dart @@ -22,7 +22,7 @@ void main() { }); createTables(SqliteDatabase db) async { - await db.writeTransaction((tx) async { + await db.transaction((tx) async { await tx.execute( 'CREATE TABLE test_data(id INTEGER PRIMARY KEY AUTOINCREMENT, description TEXT)'); }); @@ -64,6 +64,39 @@ void main() { } }); + test('Concurrency 2', () async { + final db1 = + SqliteDatabase.withFactory(testFactory(path: path), maxReaders: 3); + + final db2 = + SqliteDatabase.withFactory(testFactory(path: path), maxReaders: 3); + await db1.initialize(); + await createTables(db1); + await db2.initialize(); + print("${DateTime.now()} start"); + + var futures1 = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11].map((i) { + return db1.execute( + "INSERT OR REPLACE INTO test_data(id, description) SELECT ? as i, test_sleep(?) || ' ' || test_connection_name() || ' 1 ' || datetime() as connection RETURNING *", + [ + i, + 5 + Random().nextInt(20) + ]).then((value) => print("${DateTime.now()} $value")); + }).toList(); + + var futures2 = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11].map((i) { + return db2.execute( + "INSERT OR REPLACE INTO test_data(id, description) SELECT ? as i, test_sleep(?) || ' ' || test_connection_name() || ' 2 ' || datetime() as connection RETURNING *", + [ + i, + 5 + Random().nextInt(20) + ]).then((value) => print("${DateTime.now()} $value")); + }).toList(); + await Future.wait(futures1); + await Future.wait(futures2); + print("${DateTime.now()} done"); + }); + test('read-only transactions', () async { final db = await setupDatabase(path: path); await createTables(db); @@ -94,7 +127,7 @@ void main() { e.message .contains('attempt to write in a read-only transaction'))); - await db.writeTransaction((tx) async { + await db.transaction((tx) async { // Within a write transaction, this is fine await tx.getAll( 'INSERT INTO test_data(description) VALUES(?) RETURNING *', @@ -107,7 +140,7 @@ void main() { final db = await setupDatabase(path: path); await createTables(db); - await db.writeTransaction((tx) async { + await db.transaction((tx) async { await expectLater(() async { await db.execute( 'INSERT INTO test_data(description) VALUES(?)', ['test']); @@ -120,7 +153,7 @@ void main() { final db = await setupDatabase(path: path); await createTables(db); - await db.writeTransaction((tx) async { + await db.transaction((tx) async { // This uses a different connection, so it _could_ work. // But it's likely unintentional and could cause weird bugs, so we don't // allow it by default. @@ -129,7 +162,7 @@ void main() { }, throwsA((e) => e is LockError && e.message.contains('tx.getAll'))); }); - await db.readTransaction((tx) async { + await db.transaction((tx) async { // This does actually attempt a lock on the same connection, so it // errors. // This also exposes an interesting test case where the read transaction @@ -137,7 +170,7 @@ void main() { await expectLater(() async { await db.getAll('SELECT * FROM test_data'); }, throwsA((e) => e is LockError && e.message.contains('tx.getAll'))); - }); + }, readOnly: true); }); test('should not allow read-only db calls within lock callback', () async { @@ -145,7 +178,7 @@ void main() { await createTables(db); // Locks - should behave the same as transactions above - await db.writeLock((tx) async { + await db.lock((tx) async { await expectLater(() async { await db.getOptional('SELECT * FROM test_data'); }, @@ -153,13 +186,13 @@ void main() { (e) => e is LockError && e.message.contains('tx.getOptional'))); }); - await db.readLock((tx) async { + await db.lock((tx) async { await expectLater(() async { await db.getOptional('SELECT * FROM test_data'); }, throwsA( (e) => e is LockError && e.message.contains('tx.getOptional'))); - }); + }, readOnly: true); }); test( @@ -174,27 +207,27 @@ void main() { // Each of these are fine, since it could use a separate connection. // Note: In highly concurrent cases, it could exhaust the connection pool and cause a deadlock. - await db.writeTransaction((tx) async { + await db.transaction((tx) async { // Use the parent zone to avoid the "recursive lock" error. await zone.fork().run(() async { await db.getAll('SELECT * FROM test_data'); }); }); - await db.readTransaction((tx) async { + await db.transaction((tx) async { await zone.fork().run(() async { await db.getAll('SELECT * FROM test_data'); }); - }); + }, readOnly: true); - await db.readTransaction((tx) async { + await db.transaction((tx) async { await zone.fork().run(() async { await db.execute('SELECT * FROM test_data'); }); - }); + }, readOnly: true); // Note: This would deadlock, since it shares a global write lock. - // await db.writeTransaction((tx) async { + // await db.transaction((tx) async { // await zone.fork().run(() async { // await db.execute('SELECT * FROM test_data'); // }); @@ -221,7 +254,7 @@ void main() { final db = await setupDatabase(path: path); await createTables(db); - var tp = db.writeTransaction((tx) async { + var tp = db.transaction((tx) async { await tx.execute( 'INSERT OR ROLLBACK INTO test_data(id, description) VALUES(?, ?)', [1, 'test1']); @@ -258,7 +291,7 @@ void main() { equals({'count': 0})); // Check that we can open another transaction afterwards - await db.writeTransaction((tx) async {}); + await db.transaction((tx) async {}); }); test('should error on dangling transactions', () async { @@ -317,7 +350,7 @@ void main() { for (var i = 0; i < 10; i++) { Object? caughtError; - await db.readTransaction((ctx) async { + await db.transaction((ctx) async { await ctx.computeWithDatabase((db) async { Future asyncCompute() async { throw ArgumentError('uncaught async error'); @@ -325,7 +358,7 @@ void main() { asyncCompute(); }); - }).catchError((error) { + }, readOnly: true).catchError((error) { caughtError = error; }); // This may change into a better error in the future @@ -333,11 +366,11 @@ void main() { } // Check that we can still continue afterwards - final computed = await db.readTransaction((ctx) async { + final computed = await db.transaction((ctx) async { return await ctx.computeWithDatabase((db) async { return 5; }); - }); + }, readOnly: true); expect(computed, equals(5)); }); @@ -345,7 +378,7 @@ void main() { final db = await setupDatabase(path: path); await createTables(db); SqliteWriteContext? savedTx; - await db.writeTransaction((tx) async { + await db.transaction((tx) async { savedTx = tx; var caught = false; try { diff --git a/test/close_test.dart b/test/close_test.dart index be3e933..d269489 100644 --- a/test/close_test.dart +++ b/test/close_test.dart @@ -19,7 +19,7 @@ void main() { }); createTables(SqliteDatabase db) async { - await db.writeTransaction((tx) async { + await db.transaction((tx) async { await tx.execute( 'CREATE TABLE test_data(id INTEGER PRIMARY KEY AUTOINCREMENT, description TEXT)'); }); diff --git a/test/json1_test.dart b/test/json1_test.dart index 62bd6c3..04ef509 100644 --- a/test/json1_test.dart +++ b/test/json1_test.dart @@ -33,7 +33,7 @@ void main() { }); createTables(SqliteDatabase db) async { - await db.writeTransaction((tx) async { + await db.transaction((tx) async { await tx.execute( 'CREATE TABLE users(id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT, email TEXT)'); }); diff --git a/test/watch_test.dart b/test/watch_test.dart index f102001..2e467c7 100644 --- a/test/watch_test.dart +++ b/test/watch_test.dart @@ -11,7 +11,7 @@ import 'util.dart'; void main() { createTables(SqliteDatabase db) async { - await db.writeTransaction((tx) async { + await db.transaction((tx) async { await tx.execute( 'CREATE TABLE assets(id INTEGER PRIMARY KEY AUTOINCREMENT, make TEXT, customer_id INTEGER)'); await tx.execute('CREATE INDEX assets_customer ON assets(customer_id)'); diff --git a/test_fixes/sqlite_database.dart b/test_fixes/sqlite_database.dart new file mode 100644 index 0000000..0a0631d --- /dev/null +++ b/test_fixes/sqlite_database.dart @@ -0,0 +1,13 @@ +import 'package:sqlite_async/src/sqlite_database.dart'; + +var db = SqliteDatabase(path: 'test.db'); + +void main() async { + db.writeTransaction((tx) async { + await tx.execute('select 1'); + }); + + db.readTransaction((tx) async { + await tx.getAll('select 1'); + }); +} diff --git a/test_fixes/sqlite_database.dart.expect b/test_fixes/sqlite_database.dart.expect new file mode 100644 index 0000000..dc6b8ee --- /dev/null +++ b/test_fixes/sqlite_database.dart.expect @@ -0,0 +1,13 @@ +import 'package:sqlite_async/src/sqlite_database.dart'; + +var db = SqliteDatabase(path: 'test.db'); + +void main() async { + db.transaction((tx) async { + await tx.execute('select 1'); + }); + + db.transaction((tx) async { + await tx.getAll('select 1'); + }, readOnly: true); +}