Skip to content

Added refreshSchema() #57

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion packages/sqlite_async/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
## 0.8.2

- Added `refreshSchema()` and `exclusiveLock()` to `SqliteDatabaseMixin`, allowing queries and watch calls to work against updated schemas.

## 0.8.1

- Added Navigator locks for web `Mutex`s.
- Added Navigator locks for web `Mutex`s.

## 0.8.0

Expand Down
15 changes: 15 additions & 0 deletions packages/sqlite_async/lib/src/common/sqlite_database.dart
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,21 @@ mixin SqliteDatabaseMixin implements SqliteConnection, SqliteQueries {
///
/// Use this to access the database in background isolates.
IsolateConnectionFactory isolateConnectionFactory();

/// TODO Improve on this definition by supporting a writeable context.
Future<void> exclusiveLock<T>(
Future<T> Function(SqliteReadContext ctx) callback,
) {
return writeLock(callback);
}

/// Ensures that all connections are aware of the latest schema changes applied (if any).
/// Queries and watch calls can potentially use outdated schema information after a schema update.
Future<void> refreshSchema() {
return exclusiveLock((ctx) async {
return ctx.get("PRAGMA table_info('sqlite_master')");
});
}
}

/// A SQLite database instance.
Expand Down
61 changes: 61 additions & 0 deletions packages/sqlite_async/lib/src/native/database/connection_pool.dart
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,67 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
_writeConnection?.updates?.forEach(updatesController.add);
}

/// Executes a provided callback function exclusively across all read and
/// write connections in the pool.
///
/// This function first locks all read and write connections, collecting their
/// contexts. It then executes the provided [callback] function on each of these
/// contexts. After the [callback] completes for each context, the locks are released.
///
/// Example usage:
/// ```dart
/// await runExclusive((ctx) async {
/// // Perform some database operation with the ctx
/// await ctx.execute('PRAGMA schema_version');
/// });
/// ```
exclusiveLock<T>(
Future<T> Function(SqliteReadContext tx) callback,
) async {
final List<Completer<SqliteReadContext>> completers = [];
final List<Completer<void>> releasers = [];

for (final read in _allReadConnections) {
final completer = Completer<SqliteReadContext>();

completers.add(completer);
read.readLock((ctx) async {
completer.complete(ctx);

final releaser = Completer();
releasers.add(releaser);

// Keep this active, close the context when finished
await releaser.future;
});
}

final writeCompleter = Completer<SqliteReadContext>();
completers.add(writeCompleter);
_writeConnection?.writeLock((ctx) async {
writeCompleter.complete(ctx);

final releaser = Completer();
releasers.add(releaser);
await releaser.future;
});

// Get all the connection contexts and execute the callback on each of them
final List<SqliteReadContext> contexts = [];
for (final completer in completers) {
contexts.add(await completer.future);
}

for (final c in contexts) {
await callback(c);
}

// Release all the releasers
for (final r in releasers) {
r.complete();
}
}

/// Returns true if the _write_ connection is currently in autocommit mode.
@override
Future<bool> getAutoCommit() async {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,4 +166,10 @@ class SqliteDatabaseImpl
readOnly: false,
openFactory: openFactory);
}

@override
Future<void> exclusiveLock<T>(
Future<T> Function(SqliteReadContext ctx) callback) {
return _pool.exclusiveLock(callback);
}
}
2 changes: 1 addition & 1 deletion packages/sqlite_async/pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name: sqlite_async
description: High-performance asynchronous interface for SQLite on Dart and Flutter.
version: 0.8.1
version: 0.8.2
repository: https://github.com/powersync-ja/sqlite_async.dart
environment:
sdk: ">=3.4.0 <4.0.0"
Expand Down
103 changes: 103 additions & 0 deletions packages/sqlite_async/test/native/schema_test.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
@TestOn('!browser')
import 'dart:async';

import 'package:sqlite_async/sqlite_async.dart';
import 'package:sqlite_async/src/utils/shared_utils.dart';
import 'package:test/test.dart';

import '../utils/test_utils_impl.dart';

final testUtils = TestUtils();

void main() {
group('Schema Tests', () {
late String path;

setUp(() async {
path = testUtils.dbPath();
await testUtils.cleanDb(path: path);
});

tearDown(() async {
await testUtils.cleanDb(path: path);
});

createTables(SqliteDatabase db) async {
await db.writeTransaction((tx) async {
await tx.execute(
'CREATE TABLE _customers(id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT)');
await tx.execute(
'CREATE TABLE _local_customers(id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT)');
await tx
.execute('CREATE VIEW customers AS SELECT * FROM _local_customers');
});
}

updateTables(SqliteDatabase db) async {
await db.writeTransaction((tx) async {
await tx.execute('DROP VIEW IF EXISTS customers');
await tx.execute('CREATE VIEW customers AS SELECT * FROM _customers');
});
}

test('should refresh schema views', () async {
final db = await testUtils.setupDatabase(path: path);
await createTables(db);

final customerTables =
await getSourceTables(db, "select * from customers");
expect(customerTables.contains('_local_customers'), true);
await updateTables(db);

// without this, source tables are outdated
await db.refreshSchema();

final updatedCustomerTables =
await getSourceTables(db, "select * from customers");
expect(updatedCustomerTables.contains('_customers'), true);
});

test('should complete refresh schema after transaction', () async {
var completer1 = Completer<void>();
var transactionCompleted = false;

final db = await testUtils.setupDatabase(path: path);
await createTables(db);

// Start a read transaction
db.readTransaction((tx) async {
completer1.complete();
await tx.get('select test_sleep(2000)');

transactionCompleted = true;
});

// Wait for the transaction to start
await completer1.future;

var refreshSchemaFuture = db.refreshSchema();

// Setup check that refreshSchema completes after the transaction has completed
var refreshAfterTransaction = false;
refreshSchemaFuture.then((_) {
if (transactionCompleted) {
refreshAfterTransaction = true;
}
});

await refreshSchemaFuture;

expect(refreshAfterTransaction, isTrue,
reason: 'refreshSchema completed before transaction finished');

// Sanity check
expect(transactionCompleted, isTrue,
reason: 'Transaction did not complete as expected');
});
});
}

// For some reason, future.ignore() doesn't actually ignore errors in these tests.
void ignore(Future future) {
future.then((_) {}, onError: (_) {});
}
Loading