Skip to content

Add tests for update stream throttling #87

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Feb 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 20 additions & 11 deletions packages/sqlite_async/lib/src/update_notification.dart
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,8 @@ class UpdateNotification {
static StreamTransformer<UpdateNotification, UpdateNotification>
filterTablesTransformer(Iterable<String> tables) {
Set<String> normalized = {for (var table in tables) table.toLowerCase()};
return StreamTransformer<UpdateNotification,
UpdateNotification>.fromHandlers(handleData: (data, sink) {
if (data.containsAny(normalized)) {
sink.add(data);
}
});
return StreamTransformer.fromBind(
(source) => source.where((data) => data.containsAny(normalized)));
}
}

Expand All @@ -77,20 +73,27 @@ class UpdateNotification {
/// Behaviour:
/// If there was no event in "timeout", and one comes in, it is pushed immediately.
/// Otherwise, we wait until the timeout is over.
Stream<T> _throttleStream<T>(Stream<T> input, Duration timeout,
Stream<T> _throttleStream<T extends Object>(Stream<T> input, Duration timeout,
{bool throttleFirst = false, T Function(T, T)? add, T? addOne}) async* {
var nextPing = Completer<void>();
var done = false;
T? lastData;

var listener = input.listen((data) {
if (lastData is T && add != null) {
lastData = add(lastData as T, data);
if (lastData != null && add != null) {
lastData = add(lastData!, data);
} else {
lastData = data;
}
if (!nextPing.isCompleted) {
nextPing.complete();
}
}, onDone: () {
if (!nextPing.isCompleted) {
nextPing.complete();
}

done = true;
});

try {
Expand All @@ -100,10 +103,12 @@ Stream<T> _throttleStream<T>(Stream<T> input, Duration timeout,
if (throttleFirst) {
await Future.delayed(timeout);
}
while (true) {
while (!done) {
// If a value is available now, we'll use it immediately.
// If not, this waits for it.
await nextPing.future;
if (done) break;

// Capture any new values coming in while we wait.
nextPing = Completer<void>();
T data = lastData as T;
Expand All @@ -114,6 +119,10 @@ Stream<T> _throttleStream<T>(Stream<T> input, Duration timeout,
await Future.delayed(timeout);
}
} finally {
listener.cancel();
if (lastData case final data?) {
yield data;
}

await listener.cancel();
}
}
1 change: 1 addition & 0 deletions packages/sqlite_async/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ dev_dependencies:
stream_channel: ^2.1.2
path: ^1.9.0
test_descriptor: ^2.0.2
fake_async: ^1.3.3

platforms:
android:
Expand Down
154 changes: 154 additions & 0 deletions packages/sqlite_async/test/update_notification_test.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
import 'dart:async';

import 'package:fake_async/fake_async.dart';
import 'package:sqlite_async/src/update_notification.dart';
import 'package:test/test.dart';

void main() {
group('Update notifications', () {
const timeout = Duration(seconds: 10);
const halfTimeout = Duration(seconds: 5);

group('throttle', () {
test('can add initial', () {
fakeAsync((control) {
final source = StreamController<UpdateNotification>(sync: true);
final events = <UpdateNotification>[];

UpdateNotification.throttleStream(source.stream, timeout,
addOne: UpdateNotification({'a'})).listen(events.add);

control.flushMicrotasks();
expect(events, hasLength(1));
control.elapse(halfTimeout);

source.add(UpdateNotification({'b'}));
expect(events, hasLength(1)); // Still a delay from the initial one

control.elapse(halfTimeout);
expect(events, hasLength(2));
});
});

test('sends events after initial throttle', () {
fakeAsync((control) {
final source = StreamController<UpdateNotification>(sync: true);
final events = <UpdateNotification>[];

UpdateNotification.throttleStream(source.stream, timeout)
.listen(events.add);

source.add(UpdateNotification({'a'}));
control.elapse(halfTimeout);
expect(events, isEmpty);

control.elapse(halfTimeout);
expect(events, hasLength(1));
});
});

test('merges events', () {
fakeAsync((control) {
final source = StreamController<UpdateNotification>(sync: true);
final events = <UpdateNotification>[];

UpdateNotification.throttleStream(source.stream, timeout)
.listen(events.add);

source.add(UpdateNotification({'a'}));
control.elapse(halfTimeout);
expect(events, isEmpty);

source.add(UpdateNotification({'b'}));
control.elapse(halfTimeout);
expect(events, [
UpdateNotification({'a', 'b'})
]);
});
});

test('forwards cancellations', () {
fakeAsync((control) {
var cancelled = false;
final source = StreamController<UpdateNotification>(sync: true)
..onCancel = () => cancelled = true;

final sub = UpdateNotification.throttleStream(source.stream, timeout)
.listen((_) => fail('unexpected event'),
onDone: () => fail('unexpected done'));

source.add(UpdateNotification({'a'}));
control.elapse(halfTimeout);

sub.cancel();
control.flushTimers();

expect(cancelled, isTrue);
expect(control.pendingTimers, isEmpty);
});
});

test('closes when source closes', () {
fakeAsync((control) {
final source = StreamController<UpdateNotification>(sync: true)
..onCancel = () => Future.value();
final events = <UpdateNotification>[];
var done = false;

UpdateNotification.throttleStream(source.stream, timeout)
.listen(events.add, onDone: () => done = true);

source
// These two are combined due to throttleFirst
..add(UpdateNotification({'a'}))
..add(UpdateNotification({'b'}))
..close();

control.flushTimers();
expect(events, [
UpdateNotification({'a', 'b'})
]);
expect(done, isTrue);
expect(control.pendingTimers, isEmpty);
});
});

test('closes when source closes after delay', () {
fakeAsync((control) {
final source = StreamController<UpdateNotification>(sync: true)
..onCancel = () => Future.value();
final events = <UpdateNotification>[];
var done = false;

UpdateNotification.throttleStream(source.stream, timeout)
.listen(events.add, onDone: () => done = true);

control.elapse(const Duration(hours: 1));
source.close();

control.flushTimers();
expect(events, isEmpty);
expect(done, isTrue);
expect(control.pendingTimers, isEmpty);
});
});
});

test('filter tables', () async {
final source = StreamController<UpdateNotification>(sync: true);
final events = <UpdateNotification>[];
final subscription = UpdateNotification.filterTablesTransformer(['a'])
.bind(source.stream)
.listen(events.add);

source.add(UpdateNotification({'a', 'b'}));
expect(events, hasLength(1));

source.add(UpdateNotification({'b'}));
expect(events, hasLength(1));

await subscription.cancel();
expect(source.hasListener, isFalse);
});
});
}