Skip to content

Commit d278237

Browse files
committed
Simplifying refreshSchema implementation by introducing exlusiveLock and refreshSchema in sqlite_database.
1 parent 3d34040 commit d278237

12 files changed

+186
-42
lines changed

packages/drift_sqlite_async/pubspec.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ environment:
1515
sdk: ">=3.0.0 <4.0.0"
1616
dependencies:
1717
drift: ">=2.15.0 <2.19.0"
18-
sqlite_async: ^0.8.1
18+
sqlite_async: ^0.8.2
1919
dev_dependencies:
2020
build_runner: ^2.4.8
2121
drift_dev: ">=2.15.0 <2.19.0"

packages/sqlite_async/lib/src/common/connection/sync_sqlite_connection.dart

-5
Original file line numberDiff line numberDiff line change
@@ -50,11 +50,6 @@ class SyncSqliteConnection extends SqliteConnection with SqliteQueries {
5050
Future<bool> getAutoCommit() async {
5151
return db.autocommit;
5252
}
53-
54-
@override
55-
Future<void> refreshSchema() async {
56-
db.execute("PRAGMA table_info('sqlite_master')");
57-
}
5853
}
5954

6055
class SyncReadContext implements SqliteReadContext {

packages/sqlite_async/lib/src/common/sqlite_database.dart

+15
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,21 @@ mixin SqliteDatabaseMixin implements SqliteConnection, SqliteQueries {
3838
///
3939
/// Use this to access the database in background isolates.
4040
IsolateConnectionFactory isolateConnectionFactory();
41+
42+
/// TODO Improve on this definition by supporting a writeable context.
43+
Future<void> exclusiveLock<T>(
44+
Future<T> Function(SqliteReadContext ctx) callback,
45+
) {
46+
return writeLock(callback);
47+
}
48+
49+
/// Ensures that all connections are aware of the latest schema changes applied (if any).
50+
/// Queries and watch calls can potentially use outdated schema information after a schema update.
51+
Future<void> refreshSchema() {
52+
return exclusiveLock((ctx) async {
53+
return ctx.get("PRAGMA table_info('sqlite_master')");
54+
});
55+
}
4156
}
4257

4358
/// A SQLite database instance.

packages/sqlite_async/lib/src/impl/stub_sqlite_database.dart

-5
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,4 @@ class SqliteDatabaseImpl
6464
Future<bool> getAutoCommit() {
6565
throw UnimplementedError();
6666
}
67-
68-
@override
69-
Future<void> refreshSchema() {
70-
throw UnimplementedError();
71-
}
7267
}

packages/sqlite_async/lib/src/native/database/connection_pool.dart

+63-9
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,69 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
5454
_writeConnection?.updates?.forEach(updatesController.add);
5555
}
5656

57+
/// Executes a provided callback function exclusively across all read and
58+
/// write connections in the pool.
59+
///
60+
/// This function first locks all read and write connections, collecting their
61+
/// contexts. It then executes the provided [callback] function on each of these
62+
/// contexts. After the [callback] completex for each context, the locks are released
63+
///
64+
/// Example usage:
65+
/// ```dart
66+
/// await runExclusive((ctx) async {
67+
/// // Perform some database operation with the ctx
68+
/// await ctx.execute('PRAGMA schema_version');
69+
/// });
70+
/// ```
71+
///
72+
///
73+
exclusiveLock<T>(
74+
Future<T> Function(SqliteReadContext tx) callback,
75+
) async {
76+
final List<Completer<SqliteReadContext>> completers = [];
77+
final List<Completer<void>> releasers = [];
78+
79+
for (final read in _allReadConnections) {
80+
final completer = Completer<SqliteReadContext>();
81+
82+
completers.add(completer);
83+
read.readLock((ctx) async {
84+
completer.complete(ctx);
85+
86+
final releaser = Completer();
87+
releasers.add(releaser);
88+
89+
// Keep this active, close the context when finished
90+
await releaser.future;
91+
});
92+
}
93+
94+
final writeCompleter = Completer<SqliteReadContext>();
95+
completers.add(writeCompleter);
96+
_writeConnection?.writeLock((ctx) async {
97+
writeCompleter.complete(ctx);
98+
99+
final releaser = Completer();
100+
releasers.add(releaser);
101+
await releaser.future;
102+
});
103+
104+
// Get all the connection contexts and execute the callback on each of them
105+
final List<SqliteReadContext> contexts = [];
106+
for (final completer in completers) {
107+
contexts.add(await completer.future);
108+
}
109+
110+
for (final c in contexts) {
111+
await callback(c);
112+
}
113+
114+
// Release all the releasers
115+
for (final r in releasers) {
116+
r.complete();
117+
}
118+
}
119+
57120
/// Returns true if the _write_ connection is currently in autocommit mode.
58121
@override
59122
Future<bool> getAutoCommit() async {
@@ -221,15 +284,6 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
221284
// read-only connections first.
222285
await _writeConnection?.close();
223286
}
224-
225-
@override
226-
Future<void> refreshSchema() async {
227-
final toRefresh = _allReadConnections.toList();
228-
229-
for (var connection in toRefresh) {
230-
await connection.refreshSchema();
231-
}
232-
}
233287
}
234288

235289
typedef ReadCallback<T> = Future<T> Function(SqliteReadContext tx);

packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart

-5
Original file line numberDiff line numberDiff line change
@@ -158,11 +158,6 @@ class SqliteConnectionImpl
158158
});
159159
}, timeout: lockTimeout);
160160
}
161-
162-
@override
163-
Future<void> refreshSchema() async {
164-
await get("PRAGMA table_info('sqlite_master')");
165-
}
166161
}
167162

168163
int _nextCtxId = 1;

packages/sqlite_async/lib/src/native/database/native_sqlite_database.dart

+3-2
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,8 @@ class SqliteDatabaseImpl
168168
}
169169

170170
@override
171-
Future<void> refreshSchema() async {
172-
await _pool.refreshSchema();
171+
Future<void> exclusiveLock<T>(
172+
Future<T> Function(SqliteReadContext ctx) callback) {
173+
return _pool.exclusiveLock(callback);
173174
}
174175
}

packages/sqlite_async/lib/src/sqlite_connection.dart

-4
Original file line numberDiff line numberDiff line change
@@ -128,10 +128,6 @@ abstract class SqliteConnection extends SqliteWriteContext {
128128
Future<T> writeLock<T>(Future<T> Function(SqliteWriteContext tx) callback,
129129
{Duration? lockTimeout, String? debugContext});
130130

131-
/// Ensures that all connections are aware of the latest schema changes applied (if any).
132-
/// Queries and watch calls can potentially use outdated schema information after a schema update.
133-
Future<void> refreshSchema();
134-
135131
Future<void> close();
136132

137133
/// Returns true if the connection is closed

packages/sqlite_async/lib/src/web/database.dart

-5
Original file line numberDiff line numberDiff line change
@@ -129,11 +129,6 @@ class WebDatabase
129129
}
130130
}
131131
}
132-
133-
@override
134-
Future<void> refreshSchema() async {
135-
_database.execute("PRAGMA table_info('sqlite_master')");
136-
}
137132
}
138133

139134
class _SharedContext implements SqliteReadContext {

packages/sqlite_async/lib/src/web/database/web_sqlite_database.dart

-5
Original file line numberDiff line numberDiff line change
@@ -139,9 +139,4 @@ class SqliteDatabaseImpl
139139
await isInitialized;
140140
return _connection.getAutoCommit();
141141
}
142-
143-
@override
144-
Future<void> refreshSchema() async {
145-
await _connection.refreshSchema();
146-
}
147142
}

packages/sqlite_async/pubspec.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
name: sqlite_async
22
description: High-performance asynchronous interface for SQLite on Dart and Flutter.
3-
version: 0.8.1
3+
version: 0.8.2
44
repository: https://github.com/powersync-ja/sqlite_async.dart
55
environment:
66
sdk: ">=3.4.0 <4.0.0"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
@TestOn('!browser')
2+
import 'dart:async';
3+
4+
import 'package:sqlite_async/sqlite_async.dart';
5+
import 'package:sqlite_async/src/utils/shared_utils.dart';
6+
import 'package:test/test.dart';
7+
8+
import '../utils/test_utils_impl.dart';
9+
10+
final testUtils = TestUtils();
11+
12+
void main() {
13+
group('Schema Tests', () {
14+
late String path;
15+
16+
setUp(() async {
17+
path = testUtils.dbPath();
18+
await testUtils.cleanDb(path: path);
19+
});
20+
21+
tearDown(() async {
22+
await testUtils.cleanDb(path: path);
23+
});
24+
25+
createTables(SqliteDatabase db) async {
26+
await db.writeTransaction((tx) async {
27+
await tx.execute(
28+
'CREATE TABLE _customers(id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT)');
29+
await tx.execute(
30+
'CREATE TABLE _local_customers(id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT)');
31+
await tx
32+
.execute('CREATE VIEW customers AS SELECT * FROM _local_customers');
33+
});
34+
}
35+
36+
updateTables(SqliteDatabase db) async {
37+
await db.writeTransaction((tx) async {
38+
await tx.execute('DROP VIEW IF EXISTS customers');
39+
await tx.execute('CREATE VIEW customers AS SELECT * FROM _customers');
40+
});
41+
}
42+
43+
test('should refresh schema views', () async {
44+
final db = await testUtils.setupDatabase(path: path);
45+
await createTables(db);
46+
47+
final customerTables =
48+
await getSourceTables(db, "select * from customers");
49+
expect(customerTables.contains('_local_customers'), true);
50+
await updateTables(db);
51+
52+
// without this, source tables are outdated
53+
await db.refreshSchema();
54+
55+
final updatedCustomerTables =
56+
await getSourceTables(db, "select * from customers");
57+
expect(updatedCustomerTables.contains('_customers'), true);
58+
});
59+
60+
test('should complete refresh schema after transaction', () async {
61+
var completer1 = Completer<void>();
62+
var transactionCompleted = false;
63+
64+
final db = await testUtils.setupDatabase(path: path);
65+
await createTables(db);
66+
67+
// Start a read transaction
68+
db.readTransaction((tx) async {
69+
completer1.complete();
70+
await tx.get('select test_sleep(2000)');
71+
72+
transactionCompleted = true;
73+
});
74+
75+
// Wait for the transaction to start
76+
await completer1.future;
77+
78+
var refreshSchemaFuture = db.refreshSchema();
79+
80+
// Setup check that refreshSchema completes after the transaction has completed
81+
var refreshAfterTransaction = false;
82+
refreshSchemaFuture.then((_) {
83+
if (transactionCompleted) {
84+
refreshAfterTransaction = true;
85+
}
86+
});
87+
88+
await refreshSchemaFuture;
89+
90+
expect(refreshAfterTransaction, isTrue,
91+
reason: 'refreshSchema completed before transaction finished');
92+
93+
// Sanity check
94+
expect(transactionCompleted, isTrue,
95+
reason: 'Transaction did not complete as expected');
96+
});
97+
});
98+
}
99+
100+
// For some reason, future.ignore() doesn't actually ignore errors in these tests.
101+
void ignore(Future future) {
102+
future.then((_) {}, onError: (_) {});
103+
}

0 commit comments

Comments
 (0)