Skip to content

Commit e197601

Browse files
authored
Merge pull request #68 from powersync-ja/fix-watch-transactions
Debounce update notifications
2 parents 9c23ce6 + deaaba5 commit e197601

File tree

4 files changed

+242
-5
lines changed

4 files changed

+242
-5
lines changed

Diff for: packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart

+15-4
Original file line numberDiff line numberDiff line change
@@ -286,12 +286,22 @@ Future<void> _sqliteConnectionIsolateInner(_SqliteConnectionParams params,
286286
Object? txError;
287287

288288
void maybeFireUpdates() {
289-
if (updatedTables.isNotEmpty) {
289+
// We keep buffering the set of updated tables until we are not
290+
// in a transaction. Firing transactions inside a transaction
291+
// has multiple issues:
292+
// 1. Watched queries would detect changes to the underlying tables,
293+
// but the data would not be visible to queries yet.
294+
// 2. It would trigger many more notifications than required.
295+
//
296+
// This still includes updates for transactions that are rolled back.
297+
// We could handle those better at a later stage.
298+
299+
if (updatedTables.isNotEmpty && db.autocommit) {
290300
client.fire(UpdateNotification(updatedTables));
291301
updatedTables.clear();
292-
updateDebouncer?.cancel();
293-
updateDebouncer = null;
294302
}
303+
updateDebouncer?.cancel();
304+
updateDebouncer = null;
295305
}
296306

297307
db.updates.listen((event) {
@@ -301,11 +311,12 @@ Future<void> _sqliteConnectionIsolateInner(_SqliteConnectionParams params,
301311
// 1. Update arrived after _SqliteIsolateClose (not sure if this could happen).
302312
// 2. Long-running _SqliteIsolateClosure that should fire updates while running.
303313
updateDebouncer ??=
304-
Timer(const Duration(milliseconds: 10), maybeFireUpdates);
314+
Timer(const Duration(milliseconds: 1), maybeFireUpdates);
305315
});
306316

307317
server.open((data) async {
308318
if (data is _SqliteIsolateClose) {
319+
// This is a transaction close message
309320
if (txId != null) {
310321
if (!db.autocommit) {
311322
db.execute('ROLLBACK');
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
import 'dart:async';
2+
3+
import 'package:sqlite_async/sqlite3_common.dart';
4+
5+
/// Wrap a CommonDatabase to throttle its updates stream.
6+
/// This is so that we can throttle the updates _within_
7+
/// the worker process, avoiding mass notifications over
8+
/// the MessagePort.
9+
class ThrottledCommonDatabase extends CommonDatabase {
10+
final CommonDatabase _db;
11+
final StreamController<bool> _transactionController =
12+
StreamController.broadcast();
13+
14+
ThrottledCommonDatabase(this._db);
15+
16+
@override
17+
int get userVersion => _db.userVersion;
18+
19+
@override
20+
set userVersion(int userVersion) {
21+
_db.userVersion = userVersion;
22+
}
23+
24+
@override
25+
bool get autocommit => _db.autocommit;
26+
27+
@override
28+
DatabaseConfig get config => _db.config;
29+
30+
@override
31+
void createAggregateFunction<V>(
32+
{required String functionName,
33+
required AggregateFunction<V> function,
34+
AllowedArgumentCount argumentCount = const AllowedArgumentCount.any(),
35+
bool deterministic = false,
36+
bool directOnly = true}) {
37+
_db.createAggregateFunction(functionName: functionName, function: function);
38+
}
39+
40+
@override
41+
void createCollation(
42+
{required String name, required CollatingFunction function}) {
43+
_db.createCollation(name: name, function: function);
44+
}
45+
46+
@override
47+
void createFunction(
48+
{required String functionName,
49+
required ScalarFunction function,
50+
AllowedArgumentCount argumentCount = const AllowedArgumentCount.any(),
51+
bool deterministic = false,
52+
bool directOnly = true}) {
53+
_db.createFunction(functionName: functionName, function: function);
54+
}
55+
56+
@override
57+
void dispose() {
58+
_db.dispose();
59+
}
60+
61+
@override
62+
void execute(String sql, [List<Object?> parameters = const []]) {
63+
_db.execute(sql, parameters);
64+
}
65+
66+
@override
67+
int getUpdatedRows() {
68+
// ignore: deprecated_member_use
69+
return _db.getUpdatedRows();
70+
}
71+
72+
@override
73+
int get lastInsertRowId => _db.lastInsertRowId;
74+
75+
@override
76+
CommonPreparedStatement prepare(String sql,
77+
{bool persistent = false, bool vtab = true, bool checkNoTail = false}) {
78+
return _db.prepare(sql,
79+
persistent: persistent, vtab: vtab, checkNoTail: checkNoTail);
80+
}
81+
82+
@override
83+
List<CommonPreparedStatement> prepareMultiple(String sql,
84+
{bool persistent = false, bool vtab = true}) {
85+
return _db.prepareMultiple(sql, persistent: persistent, vtab: vtab);
86+
}
87+
88+
@override
89+
ResultSet select(String sql, [List<Object?> parameters = const []]) {
90+
bool preAutocommit = _db.autocommit;
91+
final result = _db.select(sql, parameters);
92+
bool postAutocommit = _db.autocommit;
93+
if (!preAutocommit && postAutocommit) {
94+
_transactionController.add(true);
95+
}
96+
return result;
97+
}
98+
99+
@override
100+
int get updatedRows => _db.updatedRows;
101+
102+
@override
103+
Stream<SqliteUpdate> get updates {
104+
return throttledUpdates(_db, _transactionController.stream);
105+
}
106+
}
107+
108+
/// This throttles the database update stream to:
109+
/// 1. Trigger max once every 1ms.
110+
/// 2. Only trigger _after_ transactions.
111+
Stream<SqliteUpdate> throttledUpdates(
112+
CommonDatabase source, Stream transactionStream) {
113+
StreamController<SqliteUpdate>? controller;
114+
Set<SqliteUpdate> pendingUpdates = {};
115+
var paused = false;
116+
117+
Timer? updateDebouncer;
118+
119+
void maybeFireUpdates() {
120+
updateDebouncer?.cancel();
121+
updateDebouncer = null;
122+
123+
if (paused) {
124+
// Continue collecting updates, but don't fire any
125+
return;
126+
}
127+
128+
if (!source.autocommit) {
129+
// Inside a transaction - do not fire updates
130+
return;
131+
}
132+
133+
if (pendingUpdates.isNotEmpty) {
134+
for (var update in pendingUpdates) {
135+
controller!.add(update);
136+
}
137+
138+
pendingUpdates.clear();
139+
}
140+
}
141+
142+
void collectUpdate(SqliteUpdate event) {
143+
// We merge updates with the same kind and tableName.
144+
// rowId is never used in sqlite_async.
145+
pendingUpdates.add(SqliteUpdate(event.kind, event.tableName, 0));
146+
147+
updateDebouncer ??=
148+
Timer(const Duration(milliseconds: 1), maybeFireUpdates);
149+
}
150+
151+
StreamSubscription? txSubscription;
152+
StreamSubscription? sourceSubscription;
153+
154+
controller = StreamController(onListen: () {
155+
txSubscription = transactionStream.listen((event) {
156+
maybeFireUpdates();
157+
}, onError: (error) {
158+
controller?.addError(error);
159+
});
160+
161+
sourceSubscription = source.updates.listen(collectUpdate, onError: (error) {
162+
controller?.addError(error);
163+
});
164+
}, onPause: () {
165+
paused = true;
166+
}, onResume: () {
167+
paused = false;
168+
maybeFireUpdates();
169+
}, onCancel: () {
170+
txSubscription?.cancel();
171+
sourceSubscription?.cancel();
172+
});
173+
174+
return controller.stream;
175+
}

Diff for: packages/sqlite_async/lib/src/web/worker/worker_utils.dart

+4-1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import 'dart:js_util' as js_util;
44
import 'package:mutex/mutex.dart';
55
import 'package:sqlite3/wasm.dart';
66
import 'package:sqlite3_web/sqlite3_web.dart';
7+
import 'throttled_common_database.dart';
78

89
import '../protocol.dart';
910

@@ -18,7 +19,9 @@ base class AsyncSqliteController extends DatabaseController {
1819

1920
// Register any custom functions here if needed
2021

21-
return AsyncSqliteDatabase(database: db);
22+
final throttled = ThrottledCommonDatabase(db);
23+
24+
return AsyncSqliteDatabase(database: throttled);
2225
}
2326

2427
@override

Diff for: packages/sqlite_async/test/watch_test.dart

+48
Original file line numberDiff line numberDiff line change
@@ -253,5 +253,53 @@ void main() {
253253
done = true;
254254
}
255255
});
256+
257+
test('watch with transaction', () async {
258+
final db = await testUtils.setupDatabase(path: path);
259+
await createTables(db);
260+
261+
const baseTime = 10;
262+
263+
const throttleDuration = Duration(milliseconds: baseTime);
264+
// delay must be bigger than throttleDuration, and bigger
265+
// than any internal throttles.
266+
const delay = Duration(milliseconds: baseTime * 3);
267+
268+
final stream = db.watch('SELECT count() AS count FROM assets',
269+
throttle: throttleDuration);
270+
271+
List<int> counts = [];
272+
273+
final subscription = stream.listen((e) {
274+
counts.add(e.first['count']);
275+
});
276+
await Future.delayed(delay);
277+
278+
await db.writeTransaction((tx) async {
279+
await tx.execute('INSERT INTO assets(make) VALUES (?)', ['test1']);
280+
await Future.delayed(delay);
281+
await tx.execute('INSERT INTO assets(make) VALUES (?)', ['test2']);
282+
await Future.delayed(delay);
283+
});
284+
await Future.delayed(delay);
285+
286+
subscription.cancel();
287+
288+
expect(
289+
counts,
290+
equals([
291+
// one event when starting the subscription
292+
0,
293+
// one event after the transaction
294+
2
295+
]));
296+
297+
// Other observed results (failure scenarios):
298+
// [0, 0, 0]: The watch is triggered during the transaction
299+
// and executes concurrently with the transaction.
300+
// [0, 2, 2]: The watch is triggered during the transaction,
301+
// but executes after the transaction (single connection).
302+
// [0]: No updates triggered.
303+
});
256304
});
257305
}

0 commit comments

Comments
 (0)