Skip to content

Commit de4726c

Browse files
committed
Throttle improvements
1 parent aef33fc commit de4726c

File tree

2 files changed

+51
-90
lines changed

2 files changed

+51
-90
lines changed

Diff for: packages/sqlite_async/lib/src/update_notification.dart

+49-88
Original file line numberDiff line numberDiff line change
@@ -62,102 +62,63 @@ class UpdateNotification {
6262
static StreamTransformer<UpdateNotification, UpdateNotification>
6363
filterTablesTransformer(Iterable<String> tables) {
6464
Set<String> normalized = {for (var table in tables) table.toLowerCase()};
65-
66-
return StreamTransformer.fromBind(
67-
(source) => source.where((data) => data.containsAny(normalized)));
65+
return StreamTransformer<UpdateNotification,
66+
UpdateNotification>.fromHandlers(handleData: (data, sink) {
67+
if (data.containsAny(normalized)) {
68+
sink.add(data);
69+
}
70+
});
6871
}
6972
}
7073

71-
/// Given an [input] stream, returns a stream that will throttle events for each
72-
/// listener.
74+
/// Given a broadcast stream, return a singular throttled stream that is throttled.
75+
/// This immediately starts listening.
7376
///
7477
/// Behaviour:
7578
/// If there was no event in "timeout", and one comes in, it is pushed immediately.
7679
/// Otherwise, we wait until the timeout is over.
7780
Stream<T> _throttleStream<T extends Object>(Stream<T> input, Duration timeout,
78-
{bool throttleFirst = false, T Function(T, T)? add, T? addOne}) {
79-
return Stream.multi(
80-
(downstream) {
81-
Timer? activeDelay;
82-
T? pendingData;
83-
84-
bool needsToDelay() {
85-
return downstream.isPaused || activeDelay != null;
86-
}
87-
88-
void scheduleDelay() {
89-
assert(activeDelay == null);
90-
91-
if (!needsToDelay()) {
92-
activeDelay = Timer(timeout, () {
93-
activeDelay = null;
94-
95-
if (!needsToDelay()) {
96-
if (pendingData case final pending?) {
97-
pendingData = null;
98-
downstream.addSync(pending);
99-
scheduleDelay();
100-
}
101-
}
102-
});
103-
}
104-
}
105-
106-
void cancelTimer() {
107-
activeDelay?.cancel();
108-
activeDelay = null;
109-
}
110-
111-
var listener = input.listen(
112-
(data) {
113-
if (needsToDelay()) {
114-
// We can't send this yet, so combine / replace the value to send when
115-
// the wait is over.
116-
if (pendingData != null && add != null) {
117-
pendingData = add(pendingData as T, data);
118-
} else {
119-
pendingData = data;
120-
}
121-
} else {
122-
// We can forward this event directly, but need to wait for a timeout
123-
// before sending the next event.
124-
assert(pendingData == null);
125-
downstream.addSync(data);
126-
scheduleDelay();
127-
}
128-
},
129-
onError: downstream.addErrorSync,
130-
onDone: () {
131-
cancelTimer();
132-
if (pendingData case final pending?) {
133-
downstream.addSync(pending);
134-
}
135-
downstream.closeSync();
136-
},
137-
);
138-
139-
if (addOne != null) {
140-
downstream.add(addOne);
141-
}
142-
if (throttleFirst) {
143-
scheduleDelay();
144-
}
81+
{bool throttleFirst = false, T Function(T, T)? add, T? addOne}) async* {
82+
var nextPing = Completer<void>();
83+
var done = false;
84+
T? lastData;
85+
86+
var listener = input.listen((data) {
87+
if (lastData != null && add != null) {
88+
lastData = add(lastData!, data);
89+
} else {
90+
lastData = data;
91+
}
92+
if (!nextPing.isCompleted) {
93+
nextPing.complete();
94+
}
95+
}, onDone: () => done = true);
14596

146-
downstream.onResume = () {
147-
if (!needsToDelay()) {
148-
if (pendingData case final pending?) {
149-
pendingData = null;
150-
downstream.add(pending);
151-
scheduleDelay();
152-
}
153-
}
154-
};
97+
try {
98+
if (addOne != null) {
99+
yield addOne;
100+
}
101+
if (throttleFirst) {
102+
await Future.delayed(timeout);
103+
}
104+
while (!done) {
105+
// If a value is available now, we'll use it immediately.
106+
// If not, this waits for it.
107+
await nextPing.future;
108+
// Capture any new values coming in while we wait.
109+
nextPing = Completer<void>();
110+
T data = lastData as T;
111+
// Clear before we yield, so that we capture new changes while yielding
112+
lastData = null;
113+
yield data;
114+
// Wait a minimum of this duration between tasks
115+
await Future.delayed(timeout);
116+
}
117+
} finally {
118+
if (lastData case final data?) {
119+
yield data;
120+
}
155121

156-
downstream.onCancel = () {
157-
cancelTimer();
158-
return listener.cancel();
159-
};
160-
},
161-
isBroadcast: input.isBroadcast,
162-
);
122+
await listener.cancel();
123+
}
163124
}

Diff for: packages/sqlite_async/test/update_notification_test.dart

+2-2
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ void main() {
8181
control.elapse(halfTimeout);
8282

8383
sub.cancel();
84-
control.flushMicrotasks();
84+
control.flushTimers();
8585

8686
expect(cancelled, isTrue);
8787
expect(control.pendingTimers, isEmpty);
@@ -104,7 +104,7 @@ void main() {
104104
..add(UpdateNotification({'b'}))
105105
..close();
106106

107-
control.flushMicrotasks();
107+
control.flushTimers();
108108
expect(events, [
109109
UpdateNotification({'a', 'b'})
110110
]);

0 commit comments

Comments
 (0)