diff --git a/Sources/AsyncAlgorithms/AsyncThrottleSequence.swift b/Sources/AsyncAlgorithms/Throttle/AsyncThrottleSequence.swift similarity index 69% rename from Sources/AsyncAlgorithms/AsyncThrottleSequence.swift rename to Sources/AsyncAlgorithms/Throttle/AsyncThrottleSequence.swift index ae2b1db4..96d8f970 100644 --- a/Sources/AsyncAlgorithms/AsyncThrottleSequence.swift +++ b/Sources/AsyncAlgorithms/Throttle/AsyncThrottleSequence.swift @@ -12,19 +12,19 @@ extension AsyncSequence { /// Create a rate-limited `AsyncSequence` by emitting values at most every specified interval. @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) - public func throttle(for interval: C.Instant.Duration, clock: C, reducing: @Sendable @escaping (Reduced?, Element) async -> Reduced) -> AsyncThrottleSequence { + public func throttle(for interval: C.Instant.Duration, clock: C, reducing: @Sendable @escaping (Reduced?, Element) async -> Reduced) -> AsyncThrottleSequence where Self: Sendable { AsyncThrottleSequence(self, interval: interval, clock: clock, reducing: reducing) } /// Create a rate-limited `AsyncSequence` by emitting values at most every specified interval. @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) - public func throttle(for interval: Duration, reducing: @Sendable @escaping (Reduced?, Element) async -> Reduced) -> AsyncThrottleSequence { + public func throttle(for interval: Duration, reducing: @Sendable @escaping (Reduced?, Element) async -> Reduced) -> AsyncThrottleSequence where Self: Sendable { throttle(for: interval, clock: .continuous, reducing: reducing) } /// Create a rate-limited `AsyncSequence` by emitting values at most every specified interval. @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) - public func throttle(for interval: C.Instant.Duration, clock: C, latest: Bool = true) -> AsyncThrottleSequence { + public func throttle(for interval: C.Instant.Duration, clock: C, latest: Bool = true) -> AsyncThrottleSequence where Self: Sendable { throttle(for: interval, clock: clock) { previous, element in if latest { return element @@ -36,14 +36,14 @@ extension AsyncSequence { /// Create a rate-limited `AsyncSequence` by emitting values at most every specified interval. @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) - public func throttle(for interval: Duration, latest: Bool = true) -> AsyncThrottleSequence { + public func throttle(for interval: Duration, latest: Bool = true) -> AsyncThrottleSequence where Self: Sendable { throttle(for: interval, clock: .continuous, latest: latest) } } /// A rate-limited `AsyncSequence` by emitting values at most every specified interval. @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) -public struct AsyncThrottleSequence { +public struct AsyncThrottleSequence { let base: Base let interval: C.Instant.Duration let clock: C @@ -60,44 +60,47 @@ public struct AsyncThrottleSequence { @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) extension AsyncThrottleSequence: AsyncSequence { public typealias Element = Reduced - - /// The iterator for an `AsyncThrottleSequence` instance. + + public func makeAsyncIterator() -> Iterator { + let storage = ThrottleStorage( + base, + interval: interval, + clock: clock, + reducing: reducing + ) + return Iterator(storage: storage) + } +} + +@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) +extension AsyncThrottleSequence { public struct Iterator: AsyncIteratorProtocol { - var base: Base.AsyncIterator - var last: C.Instant? - let interval: C.Instant.Duration - let clock: C - let reducing: @Sendable (Reduced?, Base.Element) async -> Reduced + final class InternalClass: Sendable { + private let storage: ThrottleStorage + + fileprivate init(storage: ThrottleStorage) { + self.storage = storage + } + + deinit { + self.storage.iteratorDeinitialized() + } + + func next() async rethrows -> Element? { + try await self.storage.next() + } + } + + let internalClass: InternalClass - init(_ base: Base.AsyncIterator, interval: C.Instant.Duration, clock: C, reducing: @Sendable @escaping (Reduced?, Base.Element) async -> Reduced) { - self.base = base - self.interval = interval - self.clock = clock - self.reducing = reducing + fileprivate init(storage: ThrottleStorage) { + self.internalClass = InternalClass(storage: storage) } - public mutating func next() async rethrows -> Reduced? { - var reduced: Reduced? - let start = last ?? clock.now - repeat { - guard let element = try await base.next() else { - return nil - } - let reduction = await reducing(reduced, element) - let now = clock.now - if start.duration(to: now) >= interval || last == nil { - last = now - return reduction - } else { - reduced = reduction - } - } while true + public mutating func next() async rethrows -> Element? { + try await self.internalClass.next() } } - - public func makeAsyncIterator() -> Iterator { - Iterator(base.makeAsyncIterator(), interval: interval, clock: clock, reducing: reducing) - } } @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) diff --git a/Sources/AsyncAlgorithms/Throttle/ThrottleStateMachine.swift b/Sources/AsyncAlgorithms/Throttle/ThrottleStateMachine.swift new file mode 100644 index 00000000..0a2083bd --- /dev/null +++ b/Sources/AsyncAlgorithms/Throttle/ThrottleStateMachine.swift @@ -0,0 +1,520 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2023 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) +struct ThrottleStateMachine { + typealias Element = Reduced + + private enum State { + /// The initial state before a call to `next` happened. + case initial(base: Base) + + /// The state while we are waiting for downstream demand. + case waitingForDemand( + task: Task, + upstreamContinuation: UnsafeContinuation?, + bufferedElement: Element? + ) + + /// The state once the downstream signalled demand but before we received + /// the first element from the upstream. + case demandSignalled( + task: Task, + downstreamContinuation: UnsafeContinuation, Never> + ) + + /// The state while we are consuming the upstream and waiting for the Clock.sleep to finish. + case throttling( + task: Task, + upstreamContinuation: UnsafeContinuation?, + downstreamContinuation: UnsafeContinuation, Never>, + currentElement: Element + ) + + /// The state once any of the upstream sequences threw an `Error`. + case upstreamFailure( + error: Error + ) + + /// The state once all upstream sequences finished or the downstream consumer stopped, i.e. by dropping all references + /// or by getting their `Task` cancelled. + case finished + } + + /// The state machine's current state. + private var state: State + /// The interval to debounce. + private let interval: C.Instant.Duration + /// The clock. + private let clock: C + + init(base: Base, clock: C, interval: C.Instant.Duration) { + self.state = .initial(base: base) + self.clock = clock + self.interval = interval + } + + /// Actions returned by `iteratorDeinitialized()`. + enum IteratorDeinitializedAction { + /// Indicates that the `Task` needs to be cancelled and + /// the upstream and clock continuation need to be resumed with a `CancellationError`. + case cancelTaskAndUpstreamAndClockContinuations( + task: Task, + upstreamContinuation: UnsafeContinuation? + ) + } + + mutating func iteratorDeinitialized() -> IteratorDeinitializedAction? { + switch self.state { + case .initial: + // Nothing to do here. No demand was signalled until now + return .none + + case .throttling, .demandSignalled: + // An iterator was deinitialized while we have a suspended continuation. + preconditionFailure("Internal inconsistency current state \(self.state) and received iteratorDeinitialized()") + + case .waitingForDemand(let task, let upstreamContinuation, _): + // The iterator was dropped which signals that the consumer is finished. + // We can transition to finished now and need to clean everything up. + self.state = .finished + + return .cancelTaskAndUpstreamAndClockContinuations( + task: task, + upstreamContinuation: upstreamContinuation + ) + + case .upstreamFailure: + // The iterator was dropped which signals that the consumer is finished. + // We can transition to finished now. The cleanup already happened when we + // transitioned to `upstreamFailure`. + self.state = .finished + + return .none + + case .finished: + // We are already finished so there is nothing left to clean up. + // This is just the references dropping afterwards. + return .none + } + } + + mutating func taskStarted(_ task: Task, downstreamContinuation: UnsafeContinuation, Never>) { + switch self.state { + case .initial: + // The user called `next` and we are starting the `Task` + // to consume the upstream sequence + self.state = .demandSignalled( + task: task, + downstreamContinuation: downstreamContinuation + ) + + case .throttling, .demandSignalled, .waitingForDemand, .upstreamFailure, .finished: + // We only a single iterator to be created so this must never happen. + preconditionFailure("Internal inconsistency current state \(self.state) and received taskStarted()") + } + } + + /// Actions returned by `upstreamTaskSuspended()`. + enum UpstreamTaskSuspendedAction { + /// Indicates that the continuation should be resumed which will lead to calling `next` on the upstream. + case resumeContinuation( + upstreamContinuation: UnsafeContinuation + ) + /// Indicates that the continuation should be resumed with an Error because another upstream sequence threw. + case resumeContinuationWithError( + upstreamContinuation: UnsafeContinuation, + error: Error + ) + } + + mutating func upstreamTaskSuspended(_ continuation: UnsafeContinuation) -> UpstreamTaskSuspendedAction? { + switch self.state { + case .initial: + // Child tasks are only created after we transitioned to `merging` + preconditionFailure("Internal inconsistency current state \(self.state) and received childTaskSuspended()") + + case .waitingForDemand(_, .some, _), .throttling(_, .some, _, _): + // We already have an upstream continuation so we can never get a second one + preconditionFailure("Internal inconsistency current state \(self.state) and received childTaskSuspended()") + + case .upstreamFailure: + // The upstream already failed so it should never suspend again since the child task + // should have exited + preconditionFailure("Internal inconsistency current state \(self.state) and received childTaskSuspended()") + + case .waitingForDemand(let task, .none, let bufferedElement): + // The upstream task is ready to consume the next element + // we are just waiting to get demand + self.state = .waitingForDemand( + task: task, + upstreamContinuation: continuation, + bufferedElement: bufferedElement + ) + + return .none + + case .demandSignalled: + // It can happen that the demand got signalled before our upstream suspended for the first time + // We need to resume it right away to demand the first element from the upstream + return .resumeContinuation(upstreamContinuation: continuation) + + case .throttling(_, .none, _, _): + // We are currently debouncing and the upstream task suspended again + // We need to resume the continuation right away so that it continues to + // consume new elements from the upstream + + return .resumeContinuation(upstreamContinuation: continuation) + + case .finished: + // Since cancellation is cooperative it might be that child tasks are still getting + // suspended even though we already cancelled them. We must tolerate this and just resume + // the continuation with an error. + return .resumeContinuationWithError( + upstreamContinuation: continuation, + error: CancellationError() + ) + } + } + + mutating func elementProduced(_ element: Element) { + switch self.state { + case .initial: + // Child tasks that are producing elements are only created after we transitioned to `merging` + preconditionFailure("Internal inconsistency current state \(self.state) and received elementProduced()") + + case .waitingForDemand(_, _, .some): + // We can only ever buffer one element because of the race of both child tasks + // After that element got buffered we are not resuming the upstream continuation + // and should never get another element until we get downstream demand signalled + preconditionFailure("Internal inconsistency current state \(self.state) and received elementProduced()") + + case .upstreamFailure: + // The upstream already failed so it should never have produced another element + preconditionFailure("Internal inconsistency current state \(self.state) and received childTaskSuspended()") + + case .waitingForDemand(let task, let upstreamContinuation, .none): + // We got an element even though we don't have an outstanding demand + // this can happen because we race the upstream and Clock child tasks + // and the upstream might finish after the Clock. We just need + // to buffer the element for the next demand. + self.state = .waitingForDemand( + task: task, + upstreamContinuation: upstreamContinuation, + bufferedElement: element + ) + + case .demandSignalled(let task, let downstreamContinuation): + state = .waitingForDemand(task: task, upstreamContinuation: nil, bufferedElement: nil) + downstreamContinuation.resume(returning: .success(element)) + + + case .throttling(let task, let upstreamContinuation, let downstreamContinuation, _): + // We just got another element and the Clock hasn't finished sleeping yet + // We just need to store the new element + self.state = .throttling( + task: task, + upstreamContinuation: upstreamContinuation, + downstreamContinuation: downstreamContinuation, + currentElement: element + ) + + case .finished: + // Since cancellation is cooperative it might be that child tasks + // are still producing elements after we finished. + // We are just going to drop them since there is nothing we can do + break + } + } + + /// Actions returned by `upstreamFinished()`. + enum UpstreamFinishedAction { + /// Indicates that the task and the clock continuation should be cancelled. + case cancelTask( + task: Task + ) + /// Indicates that the downstream continuation should be resumed with `nil` and + /// the task and the upstream continuation should be cancelled. + case resumeContinuationWithNilAndCancelTaskAndUpstream( + downstreamContinuation: UnsafeContinuation, Never>, + task: Task, + upstreamContinuation: UnsafeContinuation? + ) + /// Indicates that the downstream continuation should be resumed with `nil` and + /// the task and the upstream continuation should be cancelled. + case resumeContinuationWithElementAndCancelTaskAndUpstream( + downstreamContinuation: UnsafeContinuation, Never>, + element: Element, + task: Task, + upstreamContinuation: UnsafeContinuation? + ) + } + + mutating func upstreamFinished() -> UpstreamFinishedAction? { + switch self.state { + case .initial: + preconditionFailure("Internal inconsistency current state \(self.state) and received upstreamFinished()") + + case .waitingForDemand(_, .some, _): + // We will never receive an upstream finished and have an outstanding continuation + // since we only receive finish after resuming the upstream continuation + preconditionFailure("Internal inconsistency current state \(self.state) and received upstreamFinished()") + + case .waitingForDemand(_, .none, .some): + // We will never receive an upstream finished while we have a buffered element + // To get there we would need to have received the buffered element and then + // received upstream finished all while waiting for demand; however, we should have + // never demanded the next element from upstream in the first place + preconditionFailure("Internal inconsistency current state \(self.state) and received upstreamFinished()") + + case .upstreamFailure: + // The upstream already failed so it should never have finished again + preconditionFailure("Internal inconsistency current state \(self.state) and received childTaskSuspended()") + + case .waitingForDemand(let task, .none, .none): + // We don't have any buffered element so we can just go ahead + // and transition to finished and cancel everything + self.state = .finished + + return .cancelTask( + task: task + ) + + case .demandSignalled(let task, let downstreamContinuation): + // We demanded the next element from the upstream after we got signalled demand + // and the upstream finished. This means we need to resume the downstream with nil + self.state = .finished + + return .resumeContinuationWithNilAndCancelTaskAndUpstream( + downstreamContinuation: downstreamContinuation, + task: task, + upstreamContinuation: nil + ) + + case .throttling(let task, let upstreamContinuation, let downstreamContinuation, let currentElement): + // We are debouncing and the upstream finished. At this point + // we can just resume the downstream continuation with element and cancel everything else + self.state = .finished + + return .resumeContinuationWithElementAndCancelTaskAndUpstream( + downstreamContinuation: downstreamContinuation, + element: currentElement, + task: task, + upstreamContinuation: upstreamContinuation + ) + + case .finished: + // This is just everything finishing up, nothing to do here + return .none + } + } + + /// Actions returned by `upstreamThrew()`. + enum UpstreamThrewAction { + /// Indicates that the task and the clock continuation should be cancelled. + case cancelTask( + task: Task + ) + /// Indicates that the downstream continuation should be resumed with the `error` and + /// the task and the upstream continuation should be cancelled. + case resumeContinuationWithErrorAndCancelTaskAndUpstreamContinuation( + downstreamContinuation: UnsafeContinuation, Never>, + error: Error, + task: Task, + upstreamContinuation: UnsafeContinuation? + ) + } + + mutating func upstreamThrew(_ error: Error) -> UpstreamThrewAction? { + switch self.state { + case .initial: + preconditionFailure("Internal inconsistency current state \(self.state) and received upstreamThrew()") + + case .waitingForDemand(_, .some, _): + // We will never receive an upstream threw and have an outstanding continuation + // since we only receive threw after resuming the upstream continuation + preconditionFailure("Internal inconsistency current state \(self.state) and received upstreamFinished()") + + case .waitingForDemand(_, .none, .some): + // We will never receive an upstream threw while we have a buffered element + // To get there we would need to have received the buffered element and then + // received upstream threw all while waiting for demand; however, we should have + // never demanded the next element from upstream in the first place + preconditionFailure("Internal inconsistency current state \(self.state) and received upstreamFinished()") + + case .upstreamFailure: + // We need to tolerate multiple upstreams failing + return .none + + case .waitingForDemand(let task, .none, .none): + // We don't have any buffered element so we can just go ahead + // and transition to finished and cancel everything + self.state = .finished + + return .cancelTask( + task: task + ) + + case .demandSignalled(let task, let downstreamContinuation): + // We demanded the next element from the upstream after we got signalled demand + // and the upstream threw. This means we need to resume the downstream with the error + self.state = .finished + + return .resumeContinuationWithErrorAndCancelTaskAndUpstreamContinuation( + downstreamContinuation: downstreamContinuation, + error: error, + task: task, + upstreamContinuation: nil + ) + + case .throttling(let task, let upstreamContinuation, let downstreamContinuation, _): + // We are debouncing and the upstream threw. At this point + // we can just resume the downstream continuation with error and cancel everything else + self.state = .finished + + return .resumeContinuationWithErrorAndCancelTaskAndUpstreamContinuation( + downstreamContinuation: downstreamContinuation, + error: error, + task: task, + upstreamContinuation: upstreamContinuation + ) + + case .finished: + // This is just everything finishing up, nothing to do here + return .none + } + } + + /// Actions returned by `cancelled()`. + enum CancelledAction { + /// Indicates that the downstream continuation needs to be resumed and + /// task and the upstream continuations should be cancelled. + case resumeDownstreamContinuationWithNilAndCancelTaskAndUpstream( + downstreamContinuation: UnsafeContinuation, Never>, + task: Task, + upstreamContinuation: UnsafeContinuation? + ) + } + + mutating func cancelled() -> CancelledAction? { + switch self.state { + case .initial: + state = .finished + return .none + + case .waitingForDemand: + // We got cancelled before we event got any demand. This can happen if a cancelled task + // calls next and the onCancel handler runs first. We can transition to finished right away. + self.state = .finished + + return .none + + case .demandSignalled(let task, let downstreamContinuation): + // We got cancelled while we were waiting for the first upstream element + // We can cancel everything at this point and return nil + self.state = .finished + + return .resumeDownstreamContinuationWithNilAndCancelTaskAndUpstream( + downstreamContinuation: downstreamContinuation, + task: task, + upstreamContinuation: nil + ) + + case .throttling(let task, let upstreamContinuation, let downstreamContinuation, _): + // We got cancelled while debouncing. + // We can cancel everything at this point and return nil + self.state = .finished + + return .resumeDownstreamContinuationWithNilAndCancelTaskAndUpstream( + downstreamContinuation: downstreamContinuation, + task: task, + upstreamContinuation: upstreamContinuation + ) + + case .upstreamFailure: + // An upstream already threw and we cancelled everything already. + // We should stay in the upstream failure state until the error is consumed + return .none + + case .finished: + // We are already finished so nothing to do here: + self.state = .finished + + return .none + } + } + + /// Actions returned by `next()`. + enum NextAction { + /// Indicates that a new `Task` should be created that consumes the sequence. + case startTask(Base) + case resumeUpstreamContinuation( + upstreamContinuation: UnsafeContinuation? + ) + /// Indicates that the downstream continuation should be resumed with `nil`. + case resumeDownstreamContinuationWithNil(UnsafeContinuation, Never>) + /// Indicates that the downstream continuation should be resumed with the error. + case resumeDownstreamContinuationWithError( + UnsafeContinuation, Never>, + Error + ) + } + + mutating func next(for continuation: UnsafeContinuation, Never>) -> NextAction { + switch self.state { + case .initial(let base): + // This is the first time we get demand singalled so we have to start the task + // The transition to the next state is done in the taskStarted method + return .startTask(base) + + case .demandSignalled, .throttling: + // We already got demand signalled and have suspended the downstream task + // Getting a second next calls means the iterator was transferred across Tasks which is not allowed + preconditionFailure("Internal inconsistency current state \(self.state) and received next()") + + case .waitingForDemand(let task, let upstreamContinuation, let bufferedElement): + if let bufferedElement = bufferedElement { + // We already got an element from the last buffered one + // We can kick of the clock and upstream consumption right away and transition to debouncing + self.state = .throttling( + task: task, + upstreamContinuation: nil, + downstreamContinuation: continuation, + currentElement: bufferedElement + ) + + return .resumeUpstreamContinuation( + upstreamContinuation: upstreamContinuation + ) + } else { + // We don't have a buffered element so have to resume the upstream continuation + // to get the first one and transition to demandSignalled + self.state = .demandSignalled( + task: task, + downstreamContinuation: continuation + ) + + return .resumeUpstreamContinuation(upstreamContinuation: upstreamContinuation) + } + + case .upstreamFailure(let error): + // The upstream threw and haven't delivered the error yet + // Let's deliver it and transition to finished + self.state = .finished + + return .resumeDownstreamContinuationWithError(continuation, error) + + case .finished: + // We are already finished so we are just returning `nil` + return .resumeDownstreamContinuationWithNil(continuation) + } + } +} diff --git a/Sources/AsyncAlgorithms/Throttle/ThrottleStorage.swift b/Sources/AsyncAlgorithms/Throttle/ThrottleStorage.swift new file mode 100644 index 00000000..f3a605f3 --- /dev/null +++ b/Sources/AsyncAlgorithms/Throttle/ThrottleStorage.swift @@ -0,0 +1,229 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2023 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) +final class ThrottleStorage: @unchecked Sendable where Base: Sendable { + typealias Element = Reduced + + /// The state machine protected with a lock. + private let stateMachine: ManagedCriticalState> + /// The interval to throttle. + private let interval: C.Instant.Duration + /// The clock. + private let clock: C + + private let reducing: @Sendable (Reduced?, Base.Element) async -> Reduced + + init(_ base: Base, interval: C.Instant.Duration, clock: C, reducing: @Sendable @escaping (Reduced?, Base.Element) async -> Reduced) { + self.stateMachine = .init(.init(base: base, clock: clock, interval: interval)) + self.interval = interval + self.clock = clock + self.reducing = reducing + } + + func iteratorDeinitialized() { + let action = self.stateMachine.withCriticalRegion { $0.iteratorDeinitialized() } + + switch action { + case .cancelTaskAndUpstreamAndClockContinuations( + let task, + let upstreamContinuation + ): + upstreamContinuation?.resume(throwing: CancellationError()) + + task.cancel() + + case .none: + break + } + } + + func next() async rethrows -> Element? { + // We need to handle cancellation here because we are creating a continuation + // and because we need to cancel the `Task` we created to consume the upstream + return try await withTaskCancellationHandler { + // We always suspend since we can never return an element right away + + let result: Result = await withUnsafeContinuation { continuation in + self.stateMachine.withCriticalRegion { + let action = $0.next(for: continuation) + + switch action { + case .startTask(let base): + self.startTask( + stateMachine: &$0, + base: base, + downstreamContinuation: continuation + ) + + case .resumeUpstreamContinuation(let upstreamContinuation): + // This is signalling the upstream task that is consuming the upstream + // sequence to signal demand. + upstreamContinuation?.resume(returning: ()) + case .resumeDownstreamContinuationWithNil(let continuation): + continuation.resume(returning: .success(nil)) + + case .resumeDownstreamContinuationWithError(let continuation, let error): + continuation.resume(returning: .failure(error)) + } + } + } + + return try result._rethrowGet() + } onCancel: { + let action = self.stateMachine.withCriticalRegion { $0.cancelled() } + + switch action { + case .resumeDownstreamContinuationWithNilAndCancelTaskAndUpstream( + let downstreamContinuation, + let task, + let upstreamContinuation + ): + upstreamContinuation?.resume(throwing: CancellationError()) + + task.cancel() + + downstreamContinuation.resume(returning: .success(nil)) + + case .none: + break + } + } + } + + private func startTask( + stateMachine: inout ThrottleStateMachine, + base: Base, + downstreamContinuation: UnsafeContinuation, Never> + ) { + let task = Task { + var reduced: Reduced? + var last: C.Instant? + var iterator = base.makeAsyncIterator() + + do { + // This is our upstream consumption loop + loop: while true { + // We are creating a continuation before requesting the next + // element from upstream. This continuation is only resumed + // if the downstream consumer called `next` to signal his demand + // and until the Clock sleep finished. + try await withUnsafeThrowingContinuation { continuation in + let action = self.stateMachine.withCriticalRegion { $0.upstreamTaskSuspended(continuation) } + + switch action { + case .resumeContinuation(let continuation): + // This happens if there is outstanding demand + // and we need to demand from upstream right away + continuation.resume(returning: ()) + + case .resumeContinuationWithError(let continuation, let error): + // This happens if the task got cancelled. + continuation.resume(throwing: error) + + case .none: + break + } + } + + // We got signalled from the downstream that we have demand so let's + // request a new element from the upstream + if let item = try await iterator.next() { + let element = await self.reducing(reduced, item) + reduced = element + let now = self.clock.now + if let prev = last { + let elapsed = prev.duration(to: now) + // ensure the interval since the last emission is greater than or equal to the period of throttling + if elapsed >= interval { + last = now + reduced = nil + self.stateMachine.withCriticalRegion { + $0.elementProduced(element) + } + } + } else { + // nothing has previously been emitted so consider this not to be rate limited + last = now + reduced = nil + self.stateMachine.withCriticalRegion { + $0.elementProduced(element) + } + } + + } else { + // The upstream returned `nil` which indicates that it finished + let action = self.stateMachine.withCriticalRegion { $0.upstreamFinished() } + + // All of this is mostly cleanup around the Task and the outstanding + // continuations used for signalling. + switch action { + case .cancelTask(let task): + task.cancel() + + break loop + case .resumeContinuationWithNilAndCancelTaskAndUpstream( + let downstreamContinuation, + let task, + let upstreamContinuation + ): + upstreamContinuation?.resume(throwing: CancellationError()) + task.cancel() + + downstreamContinuation.resume(returning: .success(nil)) + + break loop + + case .resumeContinuationWithElementAndCancelTaskAndUpstream( + let downstreamContinuation, + let element, + let task, + let upstreamContinuation + ): + upstreamContinuation?.resume(throwing: CancellationError()) + task.cancel() + + downstreamContinuation.resume(returning: .success(element)) + + break loop + + case .none: + + break loop + } + } + } + } catch { + self.stateMachine.withCriticalRegion { stateMachine in + let action = stateMachine.upstreamThrew(error) + switch action { + case .resumeContinuationWithErrorAndCancelTaskAndUpstreamContinuation( + let downstreamContinuation, + let error, + let task, + let upstreamContinuation + ): + upstreamContinuation?.resume(throwing: CancellationError()) + task.cancel() + downstreamContinuation.resume(returning: .failure(error)) + case .cancelTask( + let task + ): + task.cancel() + case .none: + break + } + } + } + } + stateMachine.taskStarted(task, downstreamContinuation: downstreamContinuation) + } +}