Skip to content

Commit aef33fc

Browse files
committed
Refactor throttle implementation
1 parent 2938232 commit aef33fc

File tree

3 files changed

+225
-46
lines changed

3 files changed

+225
-46
lines changed

Diff for: packages/sqlite_async/lib/src/update_notification.dart

+90-46
Original file line numberDiff line numberDiff line change
@@ -62,58 +62,102 @@ class UpdateNotification {
6262
static StreamTransformer<UpdateNotification, UpdateNotification>
6363
filterTablesTransformer(Iterable<String> tables) {
6464
Set<String> normalized = {for (var table in tables) table.toLowerCase()};
65-
return StreamTransformer<UpdateNotification,
66-
UpdateNotification>.fromHandlers(handleData: (data, sink) {
67-
if (data.containsAny(normalized)) {
68-
sink.add(data);
69-
}
70-
});
65+
66+
return StreamTransformer.fromBind(
67+
(source) => source.where((data) => data.containsAny(normalized)));
7168
}
7269
}
7370

74-
/// Given a broadcast stream, return a singular throttled stream that is throttled.
75-
/// This immediately starts listening.
71+
/// Given an [input] stream, returns a stream that will throttle events for each
72+
/// listener.
7673
///
7774
/// Behaviour:
7875
/// If there was no event in "timeout", and one comes in, it is pushed immediately.
7976
/// Otherwise, we wait until the timeout is over.
80-
Stream<T> _throttleStream<T>(Stream<T> input, Duration timeout,
81-
{bool throttleFirst = false, T Function(T, T)? add, T? addOne}) async* {
82-
var nextPing = Completer<void>();
83-
T? lastData;
84-
85-
var listener = input.listen((data) {
86-
if (lastData is T && add != null) {
87-
lastData = add(lastData as T, data);
88-
} else {
89-
lastData = data;
90-
}
91-
if (!nextPing.isCompleted) {
92-
nextPing.complete();
93-
}
94-
});
77+
Stream<T> _throttleStream<T extends Object>(Stream<T> input, Duration timeout,
78+
{bool throttleFirst = false, T Function(T, T)? add, T? addOne}) {
79+
return Stream.multi(
80+
(downstream) {
81+
Timer? activeDelay;
82+
T? pendingData;
9583

96-
try {
97-
if (addOne != null) {
98-
yield addOne;
99-
}
100-
if (throttleFirst) {
101-
await Future.delayed(timeout);
102-
}
103-
while (true) {
104-
// If a value is available now, we'll use it immediately.
105-
// If not, this waits for it.
106-
await nextPing.future;
107-
// Capture any new values coming in while we wait.
108-
nextPing = Completer<void>();
109-
T data = lastData as T;
110-
// Clear before we yield, so that we capture new changes while yielding
111-
lastData = null;
112-
yield data;
113-
// Wait a minimum of this duration between tasks
114-
await Future.delayed(timeout);
115-
}
116-
} finally {
117-
listener.cancel();
118-
}
84+
bool needsToDelay() {
85+
return downstream.isPaused || activeDelay != null;
86+
}
87+
88+
void scheduleDelay() {
89+
assert(activeDelay == null);
90+
91+
if (!needsToDelay()) {
92+
activeDelay = Timer(timeout, () {
93+
activeDelay = null;
94+
95+
if (!needsToDelay()) {
96+
if (pendingData case final pending?) {
97+
pendingData = null;
98+
downstream.addSync(pending);
99+
scheduleDelay();
100+
}
101+
}
102+
});
103+
}
104+
}
105+
106+
void cancelTimer() {
107+
activeDelay?.cancel();
108+
activeDelay = null;
109+
}
110+
111+
var listener = input.listen(
112+
(data) {
113+
if (needsToDelay()) {
114+
// We can't send this yet, so combine / replace the value to send when
115+
// the wait is over.
116+
if (pendingData != null && add != null) {
117+
pendingData = add(pendingData as T, data);
118+
} else {
119+
pendingData = data;
120+
}
121+
} else {
122+
// We can forward this event directly, but need to wait for a timeout
123+
// before sending the next event.
124+
assert(pendingData == null);
125+
downstream.addSync(data);
126+
scheduleDelay();
127+
}
128+
},
129+
onError: downstream.addErrorSync,
130+
onDone: () {
131+
cancelTimer();
132+
if (pendingData case final pending?) {
133+
downstream.addSync(pending);
134+
}
135+
downstream.closeSync();
136+
},
137+
);
138+
139+
if (addOne != null) {
140+
downstream.add(addOne);
141+
}
142+
if (throttleFirst) {
143+
scheduleDelay();
144+
}
145+
146+
downstream.onResume = () {
147+
if (!needsToDelay()) {
148+
if (pendingData case final pending?) {
149+
pendingData = null;
150+
downstream.add(pending);
151+
scheduleDelay();
152+
}
153+
}
154+
};
155+
156+
downstream.onCancel = () {
157+
cancelTimer();
158+
return listener.cancel();
159+
};
160+
},
161+
isBroadcast: input.isBroadcast,
162+
);
119163
}

Diff for: packages/sqlite_async/pubspec.yaml

+1
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ dev_dependencies:
3434
stream_channel: ^2.1.2
3535
path: ^1.9.0
3636
test_descriptor: ^2.0.2
37+
fake_async: ^1.3.3
3738

3839
platforms:
3940
android:
+134
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
import 'dart:async';
2+
3+
import 'package:fake_async/fake_async.dart';
4+
import 'package:sqlite_async/src/update_notification.dart';
5+
import 'package:test/test.dart';
6+
7+
void main() {
8+
group('Update notifications', () {
9+
const timeout = Duration(seconds: 10);
10+
const halfTimeout = Duration(seconds: 5);
11+
12+
group('throttle', () {
13+
test('can add initial', () {
14+
fakeAsync((control) {
15+
final source = StreamController<UpdateNotification>(sync: true);
16+
final events = <UpdateNotification>[];
17+
18+
UpdateNotification.throttleStream(source.stream, timeout,
19+
addOne: UpdateNotification({'a'})).listen(events.add);
20+
21+
control.flushMicrotasks();
22+
expect(events, hasLength(1));
23+
control.elapse(halfTimeout);
24+
25+
source.add(UpdateNotification({'b'}));
26+
expect(events, hasLength(1)); // Still a delay from the initial one
27+
28+
control.elapse(halfTimeout);
29+
expect(events, hasLength(2));
30+
});
31+
});
32+
33+
test('sends events after initial throttle', () {
34+
fakeAsync((control) {
35+
final source = StreamController<UpdateNotification>(sync: true);
36+
final events = <UpdateNotification>[];
37+
38+
UpdateNotification.throttleStream(source.stream, timeout)
39+
.listen(events.add);
40+
41+
source.add(UpdateNotification({'a'}));
42+
control.elapse(halfTimeout);
43+
expect(events, isEmpty);
44+
45+
control.elapse(halfTimeout);
46+
expect(events, hasLength(1));
47+
});
48+
});
49+
50+
test('merges events', () {
51+
fakeAsync((control) {
52+
final source = StreamController<UpdateNotification>(sync: true);
53+
final events = <UpdateNotification>[];
54+
55+
UpdateNotification.throttleStream(source.stream, timeout)
56+
.listen(events.add);
57+
58+
source.add(UpdateNotification({'a'}));
59+
control.elapse(halfTimeout);
60+
expect(events, isEmpty);
61+
62+
source.add(UpdateNotification({'b'}));
63+
control.elapse(halfTimeout);
64+
expect(events, [
65+
UpdateNotification({'a', 'b'})
66+
]);
67+
});
68+
});
69+
70+
test('forwards cancellations', () {
71+
fakeAsync((control) {
72+
var cancelled = false;
73+
final source = StreamController<UpdateNotification>(sync: true)
74+
..onCancel = () => cancelled = true;
75+
76+
final sub = UpdateNotification.throttleStream(source.stream, timeout)
77+
.listen((_) => fail('unexpected event'),
78+
onDone: () => fail('unexpected done'));
79+
80+
source.add(UpdateNotification({'a'}));
81+
control.elapse(halfTimeout);
82+
83+
sub.cancel();
84+
control.flushMicrotasks();
85+
86+
expect(cancelled, isTrue);
87+
expect(control.pendingTimers, isEmpty);
88+
});
89+
});
90+
91+
test('closes when source closes', () {
92+
fakeAsync((control) {
93+
final source = StreamController<UpdateNotification>(sync: true)
94+
..onCancel = () => Future.value();
95+
final events = <UpdateNotification>[];
96+
var done = false;
97+
98+
UpdateNotification.throttleStream(source.stream, timeout)
99+
.listen(events.add, onDone: () => done = true);
100+
101+
source
102+
// These two are combined due to throttleFirst
103+
..add(UpdateNotification({'a'}))
104+
..add(UpdateNotification({'b'}))
105+
..close();
106+
107+
control.flushMicrotasks();
108+
expect(events, [
109+
UpdateNotification({'a', 'b'})
110+
]);
111+
expect(done, isTrue);
112+
expect(control.pendingTimers, isEmpty);
113+
});
114+
});
115+
});
116+
117+
test('filter tables', () async {
118+
final source = StreamController<UpdateNotification>(sync: true);
119+
final events = <UpdateNotification>[];
120+
final subscription = UpdateNotification.filterTablesTransformer(['a'])
121+
.bind(source.stream)
122+
.listen(events.add);
123+
124+
source.add(UpdateNotification({'a', 'b'}));
125+
expect(events, hasLength(1));
126+
127+
source.add(UpdateNotification({'b'}));
128+
expect(events, hasLength(1));
129+
130+
await subscription.cancel();
131+
expect(source.hasListener, isFalse);
132+
});
133+
});
134+
}

0 commit comments

Comments
 (0)