diff --git a/packages/sqlite_async/lib/src/update_notification.dart b/packages/sqlite_async/lib/src/update_notification.dart index 8141df9..0c8f2c6 100644 --- a/packages/sqlite_async/lib/src/update_notification.dart +++ b/packages/sqlite_async/lib/src/update_notification.dart @@ -62,12 +62,8 @@ class UpdateNotification { static StreamTransformer filterTablesTransformer(Iterable tables) { Set normalized = {for (var table in tables) table.toLowerCase()}; - return StreamTransformer.fromHandlers(handleData: (data, sink) { - if (data.containsAny(normalized)) { - sink.add(data); - } - }); + return StreamTransformer.fromBind( + (source) => source.where((data) => data.containsAny(normalized))); } } @@ -77,20 +73,27 @@ class UpdateNotification { /// Behaviour: /// If there was no event in "timeout", and one comes in, it is pushed immediately. /// Otherwise, we wait until the timeout is over. -Stream _throttleStream(Stream input, Duration timeout, +Stream _throttleStream(Stream input, Duration timeout, {bool throttleFirst = false, T Function(T, T)? add, T? addOne}) async* { var nextPing = Completer(); + var done = false; T? lastData; var listener = input.listen((data) { - if (lastData is T && add != null) { - lastData = add(lastData as T, data); + if (lastData != null && add != null) { + lastData = add(lastData!, data); } else { lastData = data; } if (!nextPing.isCompleted) { nextPing.complete(); } + }, onDone: () { + if (!nextPing.isCompleted) { + nextPing.complete(); + } + + done = true; }); try { @@ -100,10 +103,12 @@ Stream _throttleStream(Stream input, Duration timeout, if (throttleFirst) { await Future.delayed(timeout); } - while (true) { + while (!done) { // If a value is available now, we'll use it immediately. // If not, this waits for it. await nextPing.future; + if (done) break; + // Capture any new values coming in while we wait. nextPing = Completer(); T data = lastData as T; @@ -114,6 +119,10 @@ Stream _throttleStream(Stream input, Duration timeout, await Future.delayed(timeout); } } finally { - listener.cancel(); + if (lastData case final data?) { + yield data; + } + + await listener.cancel(); } } diff --git a/packages/sqlite_async/pubspec.yaml b/packages/sqlite_async/pubspec.yaml index d333166..8aac3d6 100644 --- a/packages/sqlite_async/pubspec.yaml +++ b/packages/sqlite_async/pubspec.yaml @@ -34,6 +34,7 @@ dev_dependencies: stream_channel: ^2.1.2 path: ^1.9.0 test_descriptor: ^2.0.2 + fake_async: ^1.3.3 platforms: android: diff --git a/packages/sqlite_async/test/update_notification_test.dart b/packages/sqlite_async/test/update_notification_test.dart new file mode 100644 index 0000000..0a00ccb --- /dev/null +++ b/packages/sqlite_async/test/update_notification_test.dart @@ -0,0 +1,154 @@ +import 'dart:async'; + +import 'package:fake_async/fake_async.dart'; +import 'package:sqlite_async/src/update_notification.dart'; +import 'package:test/test.dart'; + +void main() { + group('Update notifications', () { + const timeout = Duration(seconds: 10); + const halfTimeout = Duration(seconds: 5); + + group('throttle', () { + test('can add initial', () { + fakeAsync((control) { + final source = StreamController(sync: true); + final events = []; + + UpdateNotification.throttleStream(source.stream, timeout, + addOne: UpdateNotification({'a'})).listen(events.add); + + control.flushMicrotasks(); + expect(events, hasLength(1)); + control.elapse(halfTimeout); + + source.add(UpdateNotification({'b'})); + expect(events, hasLength(1)); // Still a delay from the initial one + + control.elapse(halfTimeout); + expect(events, hasLength(2)); + }); + }); + + test('sends events after initial throttle', () { + fakeAsync((control) { + final source = StreamController(sync: true); + final events = []; + + UpdateNotification.throttleStream(source.stream, timeout) + .listen(events.add); + + source.add(UpdateNotification({'a'})); + control.elapse(halfTimeout); + expect(events, isEmpty); + + control.elapse(halfTimeout); + expect(events, hasLength(1)); + }); + }); + + test('merges events', () { + fakeAsync((control) { + final source = StreamController(sync: true); + final events = []; + + UpdateNotification.throttleStream(source.stream, timeout) + .listen(events.add); + + source.add(UpdateNotification({'a'})); + control.elapse(halfTimeout); + expect(events, isEmpty); + + source.add(UpdateNotification({'b'})); + control.elapse(halfTimeout); + expect(events, [ + UpdateNotification({'a', 'b'}) + ]); + }); + }); + + test('forwards cancellations', () { + fakeAsync((control) { + var cancelled = false; + final source = StreamController(sync: true) + ..onCancel = () => cancelled = true; + + final sub = UpdateNotification.throttleStream(source.stream, timeout) + .listen((_) => fail('unexpected event'), + onDone: () => fail('unexpected done')); + + source.add(UpdateNotification({'a'})); + control.elapse(halfTimeout); + + sub.cancel(); + control.flushTimers(); + + expect(cancelled, isTrue); + expect(control.pendingTimers, isEmpty); + }); + }); + + test('closes when source closes', () { + fakeAsync((control) { + final source = StreamController(sync: true) + ..onCancel = () => Future.value(); + final events = []; + var done = false; + + UpdateNotification.throttleStream(source.stream, timeout) + .listen(events.add, onDone: () => done = true); + + source + // These two are combined due to throttleFirst + ..add(UpdateNotification({'a'})) + ..add(UpdateNotification({'b'})) + ..close(); + + control.flushTimers(); + expect(events, [ + UpdateNotification({'a', 'b'}) + ]); + expect(done, isTrue); + expect(control.pendingTimers, isEmpty); + }); + }); + + test('closes when source closes after delay', () { + fakeAsync((control) { + final source = StreamController(sync: true) + ..onCancel = () => Future.value(); + final events = []; + var done = false; + + UpdateNotification.throttleStream(source.stream, timeout) + .listen(events.add, onDone: () => done = true); + + control.elapse(const Duration(hours: 1)); + source.close(); + + control.flushTimers(); + expect(events, isEmpty); + expect(done, isTrue); + expect(control.pendingTimers, isEmpty); + }); + }); + }); + + test('filter tables', () async { + final source = StreamController(sync: true); + final events = []; + final subscription = UpdateNotification.filterTablesTransformer(['a']) + .bind(source.stream) + .listen(events.add); + + source.add(UpdateNotification({'a', 'b'})); + expect(events, hasLength(1)); + + source.add(UpdateNotification({'b'})); + expect(events, hasLength(1)); + + await subscription.cancel(); + expect(source.hasListener, isFalse); + }); + }); +}