Skip to content

Commit 96ea53b

Browse files
committed
Refactor watch queries to allow cancelling them
1 parent bb65cfa commit 96ea53b

File tree

3 files changed

+124
-71
lines changed

3 files changed

+124
-71
lines changed

packages/sqlite_async/lib/src/sqlite_queries.dart

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -44,25 +44,23 @@ mixin SqliteQueries implements SqliteWriteContext, SqliteConnection {
4444
Stream<sqlite.ResultSet> watch(String sql,
4545
{List<Object?> parameters = const [],
4646
Duration throttle = const Duration(milliseconds: 30),
47-
Iterable<String>? triggerOnTables}) async* {
47+
Iterable<String>? triggerOnTables}) {
4848
assert(updates != null,
4949
'updates stream must be provided to allow query watching');
50-
final tables =
51-
triggerOnTables ?? await getSourceTables(this, sql, parameters);
52-
final filteredStream =
53-
updates!.transform(UpdateNotification.filterTablesTransformer(tables));
54-
final throttledStream = UpdateNotification.throttleStream(
55-
filteredStream, throttle,
56-
addOne: UpdateNotification.empty());
5750

58-
// FIXME:
59-
// When the subscription is cancelled, this performs a final query on the next
60-
// update.
61-
// The loop only stops once the "yield" is reached.
62-
// Using asyncMap instead of a generator would solve it, but then the body
63-
// here can't be async for getSourceTables().
64-
await for (var _ in throttledStream) {
65-
yield await getAll(sql, parameters);
51+
Stream<sqlite.ResultSet> watchInner(Iterable<String> trigger) {
52+
return onChange(
53+
trigger,
54+
throttle: throttle,
55+
triggerImmediately: true,
56+
).asyncMap((_) => getAll(sql, parameters));
57+
}
58+
59+
if (triggerOnTables case final knownTrigger?) {
60+
return watchInner(knownTrigger);
61+
} else {
62+
return Stream.fromFuture(getSourceTables(this, sql, parameters))
63+
.asyncExpand(watchInner);
6664
}
6765
}
6866

packages/sqlite_async/lib/src/update_notification.dart

Lines changed: 105 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,13 @@ class UpdateNotification {
5252
static Stream<UpdateNotification> throttleStream(
5353
Stream<UpdateNotification> input, Duration timeout,
5454
{UpdateNotification? addOne}) {
55-
return _throttleStream(input, timeout, addOne: addOne, throttleFirst: true,
56-
add: (a, b) {
57-
return a.union(b);
58-
});
55+
return _throttleStream(
56+
input: input,
57+
timeout: timeout,
58+
throttleFirst: true,
59+
add: (a, b) => a.union(b),
60+
addOne: addOne,
61+
);
5962
}
6063

6164
/// Filter an update stream by specific tables.
@@ -67,62 +70,112 @@ class UpdateNotification {
6770
}
6871
}
6972

70-
/// Given a broadcast stream, return a singular throttled stream that is throttled.
71-
/// This immediately starts listening.
73+
/// Throttles an [input] stream to not emit events more often than with a
74+
/// frequency of 1/[timeout].
7275
///
73-
/// Behaviour:
74-
/// If there was no event in "timeout", and one comes in, it is pushed immediately.
75-
/// Otherwise, we wait until the timeout is over.
76-
Stream<T> _throttleStream<T extends Object>(Stream<T> input, Duration timeout,
77-
{bool throttleFirst = false, T Function(T, T)? add, T? addOne}) async* {
78-
var nextPing = Completer<void>();
79-
var done = false;
80-
T? lastData;
81-
82-
var listener = input.listen((data) {
83-
if (lastData != null && add != null) {
84-
lastData = add(lastData!, data);
85-
} else {
86-
lastData = data;
76+
/// When an event is received and no timeout window is active, it is forwarded
77+
/// downstream and a timeout window is started. For events received within a
78+
/// timeout window, [add] is called to fold events. Then when the window
79+
/// expires, pending events are emitted.
80+
/// The subscription to the [input] stream is never paused.
81+
///
82+
/// When the returned stream is paused, an active timeout window is reset and
83+
/// restarts after the stream is resumed.
84+
///
85+
/// If [addOne] is not null, that event will always be added when the stream is
86+
/// subscribed to.
87+
/// When [throttleFirst] is true, a timeout window begins immediately after
88+
/// listening (so that the first event, apart from [addOne], is emitted no
89+
/// earlier than after [timeout]).
90+
Stream<T> _throttleStream<T extends Object>({
91+
required Stream<T> input,
92+
required Duration timeout,
93+
required bool throttleFirst,
94+
required T Function(T, T) add,
95+
required T? addOne,
96+
}) {
97+
return Stream.multi((listener) {
98+
T? pendingData;
99+
Timer? activeTimeoutWindow;
100+
101+
/// Add pending data, bypassing the active timeout window.
102+
///
103+
/// This is used to forward error and done events immediately.
104+
bool addPendingEvents() {
105+
if (pendingData case final data?) {
106+
pendingData = null;
107+
listener.addSync(data);
108+
activeTimeoutWindow?.cancel();
109+
return true;
110+
} else {
111+
return false;
112+
}
87113
}
88-
if (!nextPing.isCompleted) {
89-
nextPing.complete();
114+
115+
/// Emits [pendingData] if no timeout window is active, and then starts a
116+
/// timeout window if necessary.
117+
void maybeEmit() {
118+
if (activeTimeoutWindow == null && !listener.isPaused) {
119+
final didAdd = addPendingEvents();
120+
if (didAdd) {
121+
activeTimeoutWindow = Timer(timeout, () {
122+
activeTimeoutWindow = null;
123+
maybeEmit();
124+
});
125+
}
126+
}
90127
}
91-
}, onDone: () {
92-
if (!nextPing.isCompleted) {
93-
nextPing.complete();
128+
129+
void setTimeout() {
130+
activeTimeoutWindow = Timer(timeout, () {
131+
activeTimeoutWindow = null;
132+
maybeEmit();
133+
});
94134
}
95135

96-
done = true;
97-
});
136+
void onData(T data) {
137+
pendingData = switch (pendingData) {
138+
null => data,
139+
final pending => add(pending, data),
140+
};
141+
maybeEmit();
142+
}
98143

99-
try {
100-
if (addOne != null) {
101-
yield addOne;
144+
void onError(Object error, StackTrace trace) {
145+
addPendingEvents();
146+
listener.addErrorSync(error, trace);
102147
}
103-
if (throttleFirst) {
104-
await Future.delayed(timeout);
148+
149+
void onDone() {
150+
addPendingEvents();
151+
listener.closeSync();
105152
}
106-
while (!done) {
107-
// If a value is available now, we'll use it immediately.
108-
// If not, this waits for it.
109-
await nextPing.future;
110-
if (done) break;
111-
112-
// Capture any new values coming in while we wait.
113-
nextPing = Completer<void>();
114-
T data = lastData as T;
115-
// Clear before we yield, so that we capture new changes while yielding
116-
lastData = null;
117-
yield data;
118-
// Wait a minimum of this duration between tasks
119-
await Future.delayed(timeout);
153+
154+
final subscription = input.listen(onData, onError: onError, onDone: onDone);
155+
var needsTimeoutWindowAfterResume = false;
156+
157+
listener.onPause = () {
158+
needsTimeoutWindowAfterResume = activeTimeoutWindow != null;
159+
activeTimeoutWindow?.cancel();
160+
};
161+
listener.onResume = () {
162+
if (needsTimeoutWindowAfterResume) {
163+
setTimeout();
164+
} else {
165+
maybeEmit();
166+
}
167+
};
168+
listener.onCancel = () async {
169+
activeTimeoutWindow?.cancel();
170+
return subscription.cancel();
171+
};
172+
173+
if (addOne != null) {
174+
// This must not be sync, we're doing this directly in onListen
175+
listener.add(addOne);
120176
}
121-
} finally {
122-
if (lastData case final data?) {
123-
yield data;
177+
if (throttleFirst) {
178+
setTimeout();
124179
}
125-
126-
await listener.cancel();
127-
}
180+
});
128181
}

packages/sqlite_async/test/watch_test.dart

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,8 +113,10 @@ void main() {
113113
lastCount = count;
114114
}
115115

116-
// The number of read queries must not be greater than the number of writes overall.
117-
expect(numberOfQueries, lessThanOrEqualTo(results.last.first['count']));
116+
// The number of read queries must not be greater than the number of
117+
// writes overall, plus one for the initial stream emission.
118+
expect(numberOfQueries,
119+
lessThanOrEqualTo(results.last.first['count'] + 1));
118120

119121
DateTime? lastTime;
120122
for (var r in times) {
@@ -283,7 +285,7 @@ void main() {
283285
});
284286
await Future.delayed(delay);
285287

286-
subscription.cancel();
288+
await subscription.cancel();
287289

288290
expect(
289291
counts,

0 commit comments

Comments
 (0)