diff --git a/Evolution/NNNN-channel.md b/Evolution/0012-channel.md similarity index 100% rename from Evolution/NNNN-channel.md rename to Evolution/0012-channel.md diff --git a/Evolution/NNNN-chunk.md b/Evolution/0013-chunk.md similarity index 100% rename from Evolution/NNNN-chunk.md rename to Evolution/0013-chunk.md diff --git a/Evolution/NNNN-rate-limits.md b/Evolution/0014-rate-limits.md similarity index 100% rename from Evolution/NNNN-rate-limits.md rename to Evolution/0014-rate-limits.md diff --git a/Evolution/NNNN-reductions.md b/Evolution/0015-reductions.md similarity index 100% rename from Evolution/NNNN-reductions.md rename to Evolution/0015-reductions.md diff --git a/Evolution/0016-mutli-producer-single-consumer-channel.md b/Evolution/0016-mutli-producer-single-consumer-channel.md new file mode 100644 index 00000000..596a1ccc --- /dev/null +++ b/Evolution/0016-mutli-producer-single-consumer-channel.md @@ -0,0 +1,652 @@ +# MutliProducerSingleConsumerChannel + +* Proposal: [SAA-0016](0016-multi-producer-single-consumer-channel.md) +* Authors: [Franz Busch](https://github.com/FranzBusch) +* Review Manager: TBD +* Status: **Implemented** + +## Revision +- 2023/12/18: Migrate proposal from Swift Evolution to Swift Async Algorithms. +- 2023/12/19: Add element size dependent strategy +- 2024/05/19: Rename to multi producer single consumer channel +- 2024/05/28: Add unbounded strategy + +## Introduction + +[SE-0314](https://github.com/apple/swift-evolution/blob/main/proposals/0314-async-stream.md) +introduced new `Async[Throwing]Stream` types which act as root asynchronous +sequences. These two types allow bridging from synchronous callbacks such as +delegates to an asynchronous sequence. This proposal adds a new root +asynchronous sequence with the goal to bridge multi producer systems +into an asynchronous sequence. + +## Motivation + +After using the `AsyncSequence` protocol and the `Async[Throwing]Stream` types +extensively over the past years, we learned that there are a few important +behavioral details that any `AsyncSequence` implementation needs to support. +These behaviors are: + +1. Backpressure +2. Multi/single consumer support +3. Downstream consumer termination +4. Upstream producer termination + +In general, `AsyncSequence` implementations can be divided into two kinds: Root +asynchronous sequences that are the source of values such as +`Async[Throwing]Stream` and transformational asynchronous sequences such as +`AsyncMapSequence`. Most transformational asynchronous sequences implicitly +fulfill the above behaviors since they forward any demand to a base asynchronous +sequence that should implement the behaviors. On the other hand, root +asynchronous sequences need to make sure that all of the above behaviors are +correctly implemented. Let's look at the current behavior of +`Async[Throwing]Stream` to see if and how it achieves these behaviors. + +### Backpressure + +Root asynchronous sequences need to relay the backpressure to the producing +system. `Async[Throwing]Stream` aims to support backpressure by providing a +configurable buffer and returning +`Async[Throwing]Stream.Continuation.YieldResult` which contains the current +buffer depth from the `yield()` method. However, only providing the current +buffer depth on `yield()` is not enough to bridge a backpressured system into +an asynchronous sequence since this can only be used as a "stop" signal but we +are missing a signal to indicate resuming the production. The only viable +backpressure strategy that can be implemented with the current API is a timed +backoff where we stop producing for some period of time and then speculatively +produce again. This is a very inefficient pattern that produces high latencies +and inefficient use of resources. + +### Multi/single consumer support + +The `AsyncSequence` protocol itself makes no assumptions about whether the +implementation supports multiple consumers or not. This allows the creation of +unicast and multicast asynchronous sequences. The difference between a unicast +and multicast asynchronous sequence is if they allow multiple iterators to be +created. `AsyncStream` does support the creation of multiple iterators and it +does handle multiple consumers correctly. On the other hand, +`AsyncThrowingStream` also supports multiple iterators but does `fatalError` +when more than one iterator has to suspend. The original proposal states: + +> As with any sequence, iterating over an AsyncStream multiple times, or +creating multiple iterators and iterating over them separately, may produce an +unexpected series of values. + +While that statement leaves room for any behavior we learned that a clear distinction +of behavior for root asynchronous sequences is beneficial; especially, when it comes to +how transformation algorithms are applied on top. + +### Downstream consumer termination + +Downstream consumer termination allows the producer to notify the consumer that +no more values are going to be produced. `Async[Throwing]Stream` does support +this by calling the `finish()` or `finish(throwing:)` methods of the +`Async[Throwing]Stream.Continuation`. However, `Async[Throwing]Stream` does not +handle the case that the `Continuation` may be `deinit`ed before one of the +finish methods is called. This currently leads to async streams that never +terminate. The behavior could be changed but it could result in semantically +breaking code. + +### Upstream producer termination + +Upstream producer termination is the inverse of downstream consumer termination +where the producer is notified once the consumption has terminated. Currently, +`Async[Throwing]Stream` does expose the `onTermination` property on the +`Continuation`. The `onTermination` closure is invoked once the consumer has +terminated. The consumer can terminate in four separate cases: + +1. The asynchronous sequence was `deinit`ed and no iterator was created +2. The iterator was `deinit`ed and the asynchronous sequence is unicast +3. The consuming task is canceled +4. The asynchronous sequence returned `nil` or threw + +`Async[Throwing]Stream` currently invokes `onTermination` in all cases; however, +since `Async[Throwing]Stream` supports multiple consumers (as discussed in the +`Multi/single consumer support` section), a single consumer task being canceled +leads to the termination of all consumers. This is not expected from multicast +asynchronous sequences in general. + +## Proposed solution + +The above motivation lays out the expected behaviors from a root asynchronous +sequence and compares them to the behaviors of `Async[Throwing]Stream`. These +are the behaviors where `Async[Throwing]Stream` diverges from the expectations. + +- Backpressure: Doesn't expose a "resumption" signal to the producer +- Multi/single consumer: + - Divergent implementation between throwing and non-throwing variant + - Supports multiple consumers even though proposal positions it as a unicast + asynchronous sequence +- Consumer termination: Doesn't handle the `Continuation` being `deinit`ed +- Producer termination: Happens on first consumer termination + +This section proposes a new type called `MutliProducerSingleConsumerChannel` that implement all of +the above-mentioned behaviors. + +### Creating an MutliProducerSingleConsumerChannel + +You can create an `MutliProducerSingleConsumerChannel` instance using the new +`makeChannel(of: backpressureStrategy:)` method. This method returns you the +channel and the source. The source can be used to send new values to the +asynchronous channel. The new API specifically provides a +multi-producer/single-consumer pattern. + +```swift +let (channel, source) = MutliProducerSingleConsumerChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 2, high: 4) +) +``` + +The new proposed APIs offer three different ways to bridge a backpressured +system. The foundation is the multi-step synchronous interface. Below is an +example of how it can be used: + +```swift +do { + let sendResult = try source.send(contentsOf: sequence) + + switch sendResult { + case .produceMore: + // Trigger more production + + case .enqueueCallback(let callbackToken): + source.enqueueCallback(token: callbackToken, onProduceMore: { result in + switch result { + case .success: + // Trigger more production + case .failure(let error): + // Terminate the underlying producer + } + }) + } +} catch { + // `send(contentsOf:)` throws if the asynchronous stream already terminated +} +``` + +The above API offers the most control and highest performance when bridging a +synchronous producer to an asynchronous sequence. First, you have to send +values using the `send(contentsOf:)` which returns a `SendResult`. The result +either indicates that more values should be produced or that a callback should +be enqueued by calling the `enqueueCallback(callbackToken: onProduceMore:)` +method. This callback is invoked once the backpressure strategy decided that +more values should be produced. This API aims to offer the most flexibility with +the greatest performance. The callback only has to be allocated in the case +where the producer needs to be suspended. + +Additionally, the above API is the building block for some higher-level and +easier-to-use APIs to send values to the channel. Below is an +example of the two higher-level APIs. + +```swift +// Writing new values and providing a callback when to produce more +try source.send(contentsOf: sequence, onProduceMore: { result in + switch result { + case .success: + // Trigger more production + case .failure(let error): + // Terminate the underlying producer + } +}) + +// This method suspends until more values should be produced +try await source.send(contentsOf: sequence) +``` + +With the above APIs, we should be able to effectively bridge any system into an +asynchronous sequence regardless if the system is callback-based, blocking or +asynchronous. + +### Downstream consumer termination + +> When reading the next two examples around termination behaviour keep in mind +that the newly proposed APIs are providing a strict unicast asynchronous sequence. + +Calling `finish()` terminates the downstream consumer. Below is an example of +this: + +```swift +// Termination through calling finish +let (channel, source) = MutliProducerSingleConsumerChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 2, high: 4) +) + +_ = try await source.send(1) +source.finish() + +for try await element in channel { + print(element) +} +print("Finished") + +// Prints +// 1 +// Finished +``` + +The other way to terminate the consumer is by deiniting the source. This has the +same effect as calling `finish()` and makes sure that no consumer is stuck +indefinitely. + +```swift +// Termination through deiniting the source +let (channel, _) = MutliProducerSingleConsumerChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 2, high: 4) +) + +for await element in channel { + print(element) +} +print("Finished") + +// Prints +// Finished +``` + +Trying to send more elements after the source has been finish will result in an +error thrown from the send methods. + +### Upstream producer termination + +The producer will get notified about termination through the `onTerminate` +callback. Termination of the producer happens in the following scenarios: + +```swift +// Termination through task cancellation +let (channel source) = MutliProducerSingleConsumerChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 2, high: 4) +) + +let task = Task { + for await element in channel { + + } +} +task.cancel() +``` + +```swift +// Termination through deiniting the sequence +let (_, source) = MutliProducerSingleConsumerChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 2, high: 4) +) +``` + +```swift +// Termination through deiniting the iterator +let (channel, source) = MutliProducerSingleConsumerChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 2, high: 4) +) +_ = channel.makeAsyncIterator() +``` + +```swift +// Termination through calling finish +let (channel, source) = MutliProducerSingleConsumerChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 2, high: 4) +) + +_ = try source.send(1) +source.finish() + +for await element in channel {} + +// onTerminate will be called after all elements have been consumed +``` + +Similar to the downstream consumer termination, trying to send more elements after the +producer has been terminated will result in an error thrown from the send methods. + +## Detailed design + +```swift +/// An error that is thrown from the various `send` methods of the +/// ``MultiProducerSingleConsumerChannel/Source``. +/// +/// This error is thrown when the channel is already finished when +/// trying to send new elements to the source. +public struct MultiProducerSingleConsumerChannelAlreadyFinishedError : Error { + + @usableFromInline + internal init() +} + +/// A multi producer single consumer channel. +/// +/// The ``MultiProducerSingleConsumerChannel`` provides a ``MultiProducerSingleConsumerChannel/Source`` to +/// send values to the channel. The source exposes the internal backpressure of the asynchronous sequence to the +/// producer. Additionally, the source can be used from synchronous and asynchronous contexts. +/// +/// +/// ## Using a MultiProducerSingleConsumerChannel +/// +/// To use a ``MultiProducerSingleConsumerChannel`` you have to create a new channel with it's source first by calling +/// the ``MultiProducerSingleConsumerChannel/makeChannel(of:throwing:BackpressureStrategy:)`` method. +/// Afterwards, you can pass the source to the producer and the channel to the consumer. +/// +/// ``` +/// let (channel, source) = MultiProducerSingleConsumerChannel.makeChannel( +/// backpressureStrategy: .watermark(low: 2, high: 4) +/// ) +/// ``` +/// +/// ### Asynchronous producers +/// +/// Values can be send to the source from asynchronous contexts using ``MultiProducerSingleConsumerChannel/Source/send(_:)-9b5do`` +/// and ``MultiProducerSingleConsumerChannel/Source/send(contentsOf:)-4myrz``. Backpressure results in calls +/// to the `send` methods to be suspended. Once more elements should be produced the `send` methods will be resumed. +/// +/// ``` +/// try await withThrowingTaskGroup(of: Void.self) { group in +/// group.addTask { +/// try await source.send(1) +/// try await source.send(2) +/// try await source.send(3) +/// } +/// +/// for await element in channel { +/// print(element) +/// } +/// } +/// ``` +/// +/// ### Synchronous producers +/// +/// Values can also be send to the source from synchronous context. Backpressure is also exposed on the synchronous contexts; however, +/// it is up to the caller to decide how to properly translate the backpressure to underlying producer e.g. by blocking the thread. +/// +/// ## Finishing the source +/// +/// To properly notify the consumer if the production of values has been finished the source's ``MultiProducerSingleConsumerChannel/Source/finish(throwing:)`` **must** be called. +public struct MultiProducerSingleConsumerChannel: AsyncSequence { + /// Initializes a new ``MultiProducerSingleConsumerChannel`` and an ``MultiProducerSingleConsumerChannel/Source``. + /// + /// - Parameters: + /// - elementType: The element type of the channel. + /// - failureType: The failure type of the channel. + /// - BackpressureStrategy: The backpressure strategy that the channel should use. + /// - Returns: A tuple containing the channel and its source. The source should be passed to the + /// producer while the channel should be passed to the consumer. + public static func makeChannel(of elementType: Element.Type = Element.self, throwing failureType: Failure.Type = Never.self, backpressureStrategy: Source.BackpressureStrategy) -> (`Self`, Source) +} + +extension MultiProducerSingleConsumerChannel { + /// A struct to send values to the channel. + /// + /// Use this source to provide elements to the channel by calling one of the `send` methods. + /// + /// - Important: You must terminate the source by calling ``finish(throwing:)``. + public struct Source: Sendable { + /// A strategy that handles the backpressure of the channel. + public struct BackpressureStrategy: Sendable { + + /// A backpressure strategy using a high and low watermark to suspend and resume production respectively. + /// + /// - Parameters: + /// - low: When the number of buffered elements drops below the low watermark, producers will be resumed. + /// - high: When the number of buffered elements rises above the high watermark, producers will be suspended. + public static func watermark(low: Int, high: Int) -> BackpressureStrategy + + /// A backpressure strategy using a high and low watermark to suspend and resume production respectively. + /// + /// - Parameters: + /// - low: When the number of buffered elements drops below the low watermark, producers will be resumed. + /// - high: When the number of buffered elements rises above the high watermark, producers will be suspended. + /// - waterLevelForElement: A closure used to compute the contribution of each buffered element to the current water level. + /// + /// - Note, `waterLevelForElement` will be called on each element when it is written into the source and when + /// it is consumed from the channel, so it is recommended to provide an function that runs in constant time. + public static func watermark(low: Int, high: Int, waterLevelForElement: @escaping @Sendable (Element) -> Int) -> BackpressureStrategy + } + + /// A type that indicates the result of sending elements to the source. + public enum SendResult: Sendable { + /// A token that is returned when the channel's backpressure strategy indicated that production should + /// be suspended. Use this token to enqueue a callback by calling the ``enqueueCallback(_:)`` method. + public struct CallbackToken: Sendable { } + + /// Indicates that more elements should be produced and written to the source. + case produceMore + + /// Indicates that a callback should be enqueued. + /// + /// The associated token should be passed to the ``enqueueCallback(_:)`` method. + case enqueueCallback(CallbackToken) + } + + /// A callback to invoke when the channel finished. + /// + /// The channel finishes and calls this closure in the following cases: + /// - No iterator was created and the sequence was deinited + /// - An iterator was created and deinited + /// - After ``finish(throwing:)`` was called and all elements have been consumed + public var onTermination: (@Sendable () -> Void)? { get set } + + /// Sends new elements to the channel. + /// + /// If there is a task consuming the channel and awaiting the next element then the task will get resumed with the + /// first element of the provided sequence. If the channel already terminated then this method will throw an error + /// indicating the failure. + /// + /// - Parameter sequence: The elements to send to the channel. + /// - Returns: The result that indicates if more elements should be produced at this time. + public func send(contentsOf sequence: S) throws -> SendResult where Element == S.Element, S : Sequence + + /// Send the element to the channel. + /// + /// If there is a task consuming the channel and awaiting the next element then the task will get resumed with the + /// provided element. If the channel already terminated then this method will throw an error + /// indicating the failure. + /// + /// - Parameter element: The element to send to the channel. + /// - Returns: The result that indicates if more elements should be produced at this time. + public func send(_ element: Element) throws -> SendResult + + /// Enqueues a callback that will be invoked once more elements should be produced. + /// + /// Call this method after ``send(contentsOf:)-5honm`` or ``send(_:)-3jxzb`` returned ``SendResult/enqueueCallback(_:)``. + /// + /// - Important: Enqueueing the same token multiple times is not allowed. + /// + /// - Parameters: + /// - callbackToken: The callback token. + /// - onProduceMore: The callback which gets invoked once more elements should be produced. + public func enqueueCallback(callbackToken: consuming SendResult.CallbackToken, onProduceMore: @escaping @Sendable (Result) -> Void) + + /// Cancel an enqueued callback. + /// + /// Call this method to cancel a callback enqueued by the ``enqueueCallback(callbackToken:onProduceMore:)`` method. + /// + /// - Note: This methods supports being called before ``enqueueCallback(callbackToken:onProduceMore:)`` is called and + /// will mark the passed `callbackToken` as cancelled. + /// + /// - Parameter callbackToken: The callback token. + public func cancelCallback(callbackToken: consuming SendResult.CallbackToken) + + /// Send new elements to the channel and provide a callback which will be invoked once more elements should be produced. + /// + /// If there is a task consuming the channel and awaiting the next element then the task will get resumed with the + /// first element of the provided sequence. If the channel already terminated then `onProduceMore` will be invoked with + /// a `Result.failure`. + /// + /// - Parameters: + /// - sequence: The elements to send to the channel. + /// - onProduceMore: The callback which gets invoked once more elements should be produced. This callback might be + /// invoked during the call to ``send(contentsOf:onProduceMore:)``. + public func send(contentsOf sequence: S, onProduceMore: @escaping @Sendable (Result) -> Void) where Element == S.Element, S : Sequence + + /// Sends the element to the channel. + /// + /// If there is a task consuming the channel and awaiting the next element then the task will get resumed with the + /// provided element. If the channel already terminated then `onProduceMore` will be invoked with + /// a `Result.failure`. + /// + /// - Parameters: + /// - element: The element to send to the channel. + /// - onProduceMore: The callback which gets invoked once more elements should be produced. This callback might be + /// invoked during the call to ``send(_:onProduceMore:)``. + public func send(_ element: Element, onProduceMore: @escaping @Sendable (Result) -> Void) + + /// Send new elements to the channel. + /// + /// If there is a task consuming the channel and awaiting the next element then the task will get resumed with the + /// first element of the provided sequence. If the channel already terminated then this method will throw an error + /// indicating the failure. + /// + /// This method returns once more elements should be produced. + /// + /// - Parameters: + /// - sequence: The elements to send to the channel. + public func send(contentsOf sequence: S) async throws where Element == S.Element, S : Sequence + + /// Send new element to the channel. + /// + /// If there is a task consuming the channel and awaiting the next element then the task will get resumed with the + /// provided element. If the channel already terminated then this method will throw an error + /// indicating the failure. + /// + /// This method returns once more elements should be produced. + /// + /// - Parameters: + /// - element: The element to send to the channel. + public func send(_ element: Element) async throws + + /// Send the elements of the asynchronous sequence to the channel. + /// + /// This method returns once the provided asynchronous sequence or the channel finished. + /// + /// - Important: This method does not finish the source if consuming the upstream sequence terminated. + /// + /// - Parameters: + /// - sequence: The elements to send to the channel. + public func send(contentsOf sequence: S) async throws where Element == S.Element, S : AsyncSequence + + /// Indicates that the production terminated. + /// + /// After all buffered elements are consumed the next iteration point will return `nil` or throw an error. + /// + /// Calling this function more than once has no effect. After calling finish, the channel enters a terminal state and doesn't accept + /// new elements. + /// + /// - Parameters: + /// - error: The error to throw, or `nil`, to finish normally. + public func finish(throwing error: Failure? = nil) + } +} + +extension MultiProducerSingleConsumerChannel { + /// The asynchronous iterator for iterating the channel. + /// + /// This type is not `Sendable`. Don't use it from multiple + /// concurrent contexts. It is a programmer error to invoke `next()` from a + /// concurrent context that contends with another such call, which + /// results in a call to `fatalError()`. + public struct Iterator: AsyncIteratorProtocol {} + + /// Creates the asynchronous iterator that produces elements of this + /// asynchronous sequence. + public func makeAsyncIterator() -> Iterator +} + +extension MultiProducerSingleConsumerChannel: Sendable where Element : Sendable {} +``` + +## Comparison to other root asynchronous sequences + +### swift-async-algorithm: AsyncChannel + +The `AsyncChannel` is a multi-consumer/multi-producer root asynchronous sequence +which can be used to communicate between two tasks. It only offers asynchronous +production APIs and has no internal buffer. This means that any producer will be +suspended until its value has been consumed. `AsyncChannel` can handle multiple +consumers and resumes them in FIFO order. + +### swift-nio: NIOAsyncSequenceProducer + +The NIO team have created their own root asynchronous sequence with the goal to +provide a high performance sequence that can be used to bridge a NIO `Channel` +inbound stream into Concurrency. The `NIOAsyncSequenceProducer` is a highly +generic and fully inlinable type and quite unwiedly to use. This proposal is +heavily inspired by the learnings from this type but tries to create a more +flexible and easier to use API that fits into the standard library. + +## Future directions + +### Adaptive backpressure strategy + +The high/low watermark strategy is common in networking code; however, there are +other strategies such as an adaptive strategy that we could offer in the future. +An adaptive strategy regulates the backpressure based on the rate of +consumption and production. With the proposed new APIs we can easily add further +strategies. + +## Alternatives considered + +### Provide the `onTermination` callback to the factory method + +During development of the new APIs, I first tried to provide the `onTermination` +callback in the `makeStream` method. However, that showed significant usability +problems in scenarios where one wants to store the source in a type and +reference `self` in the `onTermination` closure at the same time; hence, I kept +the current pattern of setting the `onTermination` closure on the source. + +### Provide a `onConsumerCancellation` callback + +During the pitch phase, it was raised that we should provide a +`onConsumerCancellation` callback which gets invoked once the asynchronous +stream notices that the consuming task got cancelled. This callback could be +used to customize how cancellation is handled by the stream e.g. one could +imagine writing a few more elements to the stream before finishing it. Right now +the stream immediately returns `nil` or throws a `CancellationError` when it +notices cancellation. This proposal decided to not provide this customization +because it opens up the possiblity that asynchronous streams are not terminating +when implemented incorrectly. Additionally, asynchronous sequences are not the +only place where task cancellation leads to an immediate error being thrown i.e. +`Task.sleep()` does the same. Hence, the value of the asynchronous not +terminating immediately brings little value when the next call in the iterating +task might throw. However, the implementation is flexible enough to add this in +the future and we can just default it to the current behaviour. + +### Create a custom type for the `Result` of the `onProduceMore` callback + +The `onProducerMore` callback takes a `Result` which is used to +indicate if the producer should produce more or if the asynchronous stream +finished. We could introduce a new type for this but the proposal decided +against it since it effectively is a result type. + +### Use an initializer instead of factory methods + +Instead of providing a `makeStream` factory method we could use an initializer +approach that takes a closure which gets the `Source` passed into. A similar API +has been offered with the `Continuation` based approach and +[SE-0388](https://github.com/apple/swift-evolution/blob/main/proposals/0388-async-stream-factory.md) +introduced new factory methods to solve some of the usability ergonomics with +the initializer based APIs. + +### Follow the `AsyncStream` & `AsyncThrowingStream` naming + +All other types that offer throwing and non-throwing variants are currently +following the naming scheme where the throwing variant gets an extra `Throwing` +in its name. Now that Swift is gaining typed throws support this would make the +type with the `Failure` parameter capable to express both throwing and +non-throwing variants. However, the less flexible type has the better name. +Hence, this proposal uses the good name for the throwing variant with the +potential in the future to deprecate the `AsyncNonThrowingBackpressuredStream` +in favour of adopting typed throws. + +## Acknowledgements + +- [Johannes Weiss](https://github.com/weissi) - For making me aware how +important this problem is and providing great ideas on how to shape the API. +- [Philippe Hausler](https://github.com/phausler) - For helping me designing the +APIs and continuously providing feedback +- [George Barnett](https://github.com/glbrntt) - For providing extensive code +reviews and testing the implementation. +- [Si Beaumont](https://github.com/simonjbeaumont) - For implementing the element size dependent strategy diff --git a/Package.swift b/Package.swift index 2932e199..c8b857fa 100644 --- a/Package.swift +++ b/Package.swift @@ -20,7 +20,10 @@ let package = Package( targets: [ .target( name: "AsyncAlgorithms", - dependencies: [.product(name: "Collections", package: "swift-collections")], + dependencies: [ + .product(name: "DequeModule", package: "swift-collections"), + .product(name: "OrderedCollections", package: "swift-collections"), + ], swiftSettings: [ .enableExperimentalFeature("StrictConcurrency=complete"), ] diff --git a/Sources/AsyncAlgorithms/Internal/_TinyArray.swift b/Sources/AsyncAlgorithms/Internal/_TinyArray.swift new file mode 100644 index 00000000..07357ccb --- /dev/null +++ b/Sources/AsyncAlgorithms/Internal/_TinyArray.swift @@ -0,0 +1,329 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2023 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftCertificates open source project +// +// Copyright (c) 2023 Apple Inc. and the SwiftCertificates project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftCertificates project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +/// ``_TinyArray`` is a ``RandomAccessCollection`` optimised to store zero or one ``Element``. +/// It supports arbitrary many elements but if only up to one ``Element`` is stored it does **not** allocate separate storage on the heap +/// and instead stores the ``Element`` inline. +@usableFromInline +struct _TinyArray { + @usableFromInline + enum Storage { + case one(Element) + case arbitrary([Element]) + } + + @usableFromInline + var storage: Storage +} + +// MARK: - TinyArray "public" interface + +extension _TinyArray: Equatable where Element: Equatable {} +extension _TinyArray: Hashable where Element: Hashable {} +extension _TinyArray: Sendable where Element: Sendable {} + +extension _TinyArray: RandomAccessCollection { + @usableFromInline + typealias Element = Element + + @usableFromInline + typealias Index = Int + + @inlinable + subscript(position: Int) -> Element { + get { + self.storage[position] + } + set { + self.storage[position] = newValue + } + } + + @inlinable + var startIndex: Int { + self.storage.startIndex + } + + @inlinable + var endIndex: Int { + self.storage.endIndex + } +} + +extension _TinyArray { + @inlinable + init(_ elements: some Sequence) { + self.storage = .init(elements) + } + + @inlinable + init() { + self.storage = .init() + } + + @inlinable + mutating func append(_ newElement: Element) { + self.storage.append(newElement) + } + + @inlinable + mutating func append(contentsOf newElements: some Sequence) { + self.storage.append(contentsOf: newElements) + } + + @discardableResult + @inlinable + mutating func remove(at index: Int) -> Element { + self.storage.remove(at: index) + } + + @inlinable + mutating func removeAll(where shouldBeRemoved: (Element) throws -> Bool) rethrows { + try self.storage.removeAll(where: shouldBeRemoved) + } + + @inlinable + mutating func sort(by areInIncreasingOrder: (Element, Element) throws -> Bool) rethrows { + try self.storage.sort(by: areInIncreasingOrder) + } +} + +// MARK: - TinyArray.Storage "private" implementation + +extension _TinyArray.Storage: Equatable where Element: Equatable { + @inlinable + static func == (lhs: Self, rhs: Self) -> Bool { + switch (lhs, rhs) { + case (.one(let lhs), .one(let rhs)): + return lhs == rhs + case (.arbitrary(let lhs), .arbitrary(let rhs)): + // we don't use lhs.elementsEqual(rhs) so we can hit the fast path from Array + // if both arrays share the same underlying storage: https://github.com/apple/swift/blob/b42019005988b2d13398025883e285a81d323efa/stdlib/public/core/Array.swift#L1775 + return lhs == rhs + + case (.one(let element), .arbitrary(let array)), + (.arbitrary(let array), .one(let element)): + guard array.count == 1 else { + return false + } + return element == array[0] + + } + } +} +extension _TinyArray.Storage: Hashable where Element: Hashable { + @inlinable + func hash(into hasher: inout Hasher) { + // same strategy as Array: https://github.com/apple/swift/blob/b42019005988b2d13398025883e285a81d323efa/stdlib/public/core/Array.swift#L1801 + hasher.combine(count) + for element in self { + hasher.combine(element) + } + } +} +extension _TinyArray.Storage: Sendable where Element: Sendable {} + +extension _TinyArray.Storage: RandomAccessCollection { + @inlinable + subscript(position: Int) -> Element { + get { + switch self { + case .one(let element): + guard position == 0 else { + fatalError("index \(position) out of bounds") + } + return element + case .arbitrary(let elements): + return elements[position] + } + } + set { + switch self { + case .one: + guard position == 0 else { + fatalError("index \(position) out of bounds") + } + self = .one(newValue) + case .arbitrary(var elements): + elements[position] = newValue + self = .arbitrary(elements) + } + } + } + + @inlinable + var startIndex: Int { + 0 + } + + @inlinable + var endIndex: Int { + switch self { + case .one: return 1 + case .arbitrary(let elements): return elements.endIndex + } + } +} + +extension _TinyArray.Storage { + @inlinable + init(_ elements: some Sequence) { + var iterator = elements.makeIterator() + guard let firstElement = iterator.next() else { + self = .arbitrary([]) + return + } + guard let secondElement = iterator.next() else { + // newElements just contains a single element + // and we hit the fast path + self = .one(firstElement) + return + } + + var elements: [Element] = [] + elements.reserveCapacity(elements.underestimatedCount) + elements.append(firstElement) + elements.append(secondElement) + while let nextElement = iterator.next() { + elements.append(nextElement) + } + self = .arbitrary(elements) + } + + @inlinable + init() { + self = .arbitrary([]) + } + + @inlinable + mutating func append(_ newElement: Element) { + self.append(contentsOf: CollectionOfOne(newElement)) + } + + @inlinable + mutating func append(contentsOf newElements: some Sequence) { + switch self { + case .one(let firstElement): + var iterator = newElements.makeIterator() + guard let secondElement = iterator.next() else { + // newElements is empty, nothing to do + return + } + var elements: [Element] = [] + elements.reserveCapacity(1 + newElements.underestimatedCount) + elements.append(firstElement) + elements.append(secondElement) + elements.appendRemainingElements(from: &iterator) + self = .arbitrary(elements) + + case .arbitrary(var elements): + if elements.isEmpty { + // if `self` is currently empty and `newElements` just contains a single + // element, we skip allocating an array and set `self` to `.one(firstElement)` + var iterator = newElements.makeIterator() + guard let firstElement = iterator.next() else { + // newElements is empty, nothing to do + return + } + guard let secondElement = iterator.next() else { + // newElements just contains a single element + // and we hit the fast path + self = .one(firstElement) + return + } + elements.reserveCapacity(elements.count + newElements.underestimatedCount) + elements.append(firstElement) + elements.append(secondElement) + elements.appendRemainingElements(from: &iterator) + self = .arbitrary(elements) + + } else { + elements.append(contentsOf: newElements) + self = .arbitrary(elements) + } + + } + } + + @discardableResult + @inlinable + mutating func remove(at index: Int) -> Element { + switch self { + case .one(let oldElement): + guard index == 0 else { + fatalError("index \(index) out of bounds") + } + self = .arbitrary([]) + return oldElement + + case .arbitrary(var elements): + defer { + self = .arbitrary(elements) + } + return elements.remove(at: index) + + } + } + + @inlinable + mutating func removeAll(where shouldBeRemoved: (Element) throws -> Bool) rethrows { + switch self { + case .one(let oldElement): + if try shouldBeRemoved(oldElement) { + self = .arbitrary([]) + } + + case .arbitrary(var elements): + defer { + self = .arbitrary(elements) + } + return try elements.removeAll(where: shouldBeRemoved) + + } + } + + @inlinable + mutating func sort(by areInIncreasingOrder: (Element, Element) throws -> Bool) rethrows { + switch self { + case .one: + // a collection of just one element is always sorted, nothing to do + break + case .arbitrary(var elements): + defer { + self = .arbitrary(elements) + } + + try elements.sort(by: areInIncreasingOrder) + } + } +} + +extension Array { + @inlinable + mutating func appendRemainingElements(from iterator: inout some IteratorProtocol) { + while let nextElement = iterator.next() { + append(nextElement) + } + } +} diff --git a/Sources/AsyncAlgorithms/Locking.swift b/Sources/AsyncAlgorithms/Locking.swift index 952b13c8..669c9b8c 100644 --- a/Sources/AsyncAlgorithms/Locking.swift +++ b/Sources/AsyncAlgorithms/Locking.swift @@ -17,24 +17,35 @@ import Glibc import WinSDK #endif -internal struct Lock { +@usableFromInline +internal class Lock { #if canImport(Darwin) + @usableFromInline typealias Primitive = os_unfair_lock #elseif canImport(Glibc) + @usableFromInline typealias Primitive = pthread_mutex_t #elseif canImport(WinSDK) + @usableFromInline typealias Primitive = SRWLOCK #else + @usableFromInline typealias Primitive = Int #endif + @usableFromInline typealias PlatformLock = UnsafeMutablePointer + @usableFromInline let platformLock: PlatformLock private init(_ platformLock: PlatformLock) { self.platformLock = platformLock } - + + deinit { + self.deinitialize() + } + fileprivate static func initialize(_ platformLock: PlatformLock) { #if canImport(Darwin) platformLock.initialize(to: os_unfair_lock()) @@ -54,7 +65,8 @@ internal struct Lock { platformLock.deinitialize(count: 1) } - fileprivate static func lock(_ platformLock: PlatformLock) { + @usableFromInline + static func lock(_ platformLock: PlatformLock) { #if canImport(Darwin) os_unfair_lock_lock(platformLock) #elseif canImport(Glibc) @@ -64,7 +76,8 @@ internal struct Lock { #endif } - fileprivate static func unlock(_ platformLock: PlatformLock) { + @usableFromInline + static func unlock(_ platformLock: PlatformLock) { #if canImport(Darwin) os_unfair_lock_unlock(platformLock) #elseif canImport(Glibc) @@ -85,10 +98,12 @@ internal struct Lock { Lock.deinitialize(platformLock) } + @inlinable func lock() { Lock.lock(platformLock) } + @inlinable func unlock() { Lock.unlock(platformLock) } @@ -101,6 +116,7 @@ internal struct Lock { /// /// - Parameter body: The block to execute while holding the lock. /// - Returns: The value returned by the block. + @inlinable func withLock(_ body: () throws -> T) rethrows -> T { self.lock() defer { @@ -115,14 +131,17 @@ internal struct Lock { } } +@usableFromInline struct ManagedCriticalState { - private final class LockedBuffer: ManagedBuffer { + @usableFromInline + final class LockedBuffer: ManagedBuffer { deinit { withUnsafeMutablePointerToElements { Lock.deinitialize($0) } } } - private let buffer: ManagedBuffer + @usableFromInline + let buffer: ManagedBuffer init(_ initial: State) { buffer = LockedBuffer.create(minimumCapacity: 1) { buffer in @@ -131,6 +150,7 @@ struct ManagedCriticalState { } } + @inlinable func withCriticalRegion(_ critical: (inout State) throws -> R) rethrows -> R { try buffer.withUnsafeMutablePointers { header, lock in Lock.lock(lock) diff --git a/Sources/AsyncAlgorithms/MultiProducerSingleConsumerChannel/MultiProducerSingleConsumerChannel+Internal.swift b/Sources/AsyncAlgorithms/MultiProducerSingleConsumerChannel/MultiProducerSingleConsumerChannel+Internal.swift new file mode 100644 index 00000000..58b41ae1 --- /dev/null +++ b/Sources/AsyncAlgorithms/MultiProducerSingleConsumerChannel/MultiProducerSingleConsumerChannel+Internal.swift @@ -0,0 +1,1409 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2023 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +#if compiler(>=6.0) +import DequeModule + +extension MultiProducerSingleConsumerChannel { + @usableFromInline + enum _InternalBackpressureStrategy: Sendable, CustomStringConvertible { + @usableFromInline + struct _Watermark: Sendable, CustomStringConvertible { + /// The low watermark where demand should start. + @usableFromInline + let _low: Int + + /// The high watermark where demand should be stopped. + @usableFromInline + let _high: Int + + /// The current watermark level. + @usableFromInline + var _currentWatermark: Int = 0 + + /// A closure that can be used to calculate the watermark impact of a single element + @usableFromInline + let _waterLevelForElement: (@Sendable (Element) -> Int)? + + @usableFromInline + var description: String { + "watermark(\(self._currentWatermark))" + } + + init(low: Int, high: Int, waterLevelForElement: (@Sendable (Element) -> Int)?) { + precondition(low <= high) + self._low = low + self._high = high + self._waterLevelForElement = waterLevelForElement + } + + @inlinable + mutating func didSend(elements: Deque.SubSequence) -> Bool { + if let waterLevelForElement = self._waterLevelForElement { + self._currentWatermark += elements.reduce(0) { $0 + waterLevelForElement($1) } + } else { + self._currentWatermark += elements.count + } + precondition(self._currentWatermark >= 0) + // We are demanding more until we reach the high watermark + return self._currentWatermark < self._high + } + + @inlinable + mutating func didConsume(element: Element) -> Bool { + if let waterLevelForElement = self._waterLevelForElement { + self._currentWatermark -= waterLevelForElement(element) + } else { + self._currentWatermark -= 1 + } + precondition(self._currentWatermark >= 0) + // We start demanding again once we are below the low watermark + return self._currentWatermark < self._low + } + } + + @usableFromInline + struct _Unbounded: Sendable, CustomStringConvertible { + @usableFromInline + var description: String { + return "unbounded" + } + + init() { } + + @inlinable + mutating func didSend(elements: Deque.SubSequence) -> Bool { + return true + } + + @inlinable + mutating func didConsume(element: Element) -> Bool { + return true + } + } + + /// A watermark based strategy. + case watermark(_Watermark) + /// An unbounded based strategy. + case unbounded(_Unbounded) + + @usableFromInline + var description: String { + switch consume self { + case .watermark(let strategy): + return strategy.description + case .unbounded(let unbounded): + return unbounded.description + } + } + + @inlinable + mutating func didSend(elements: Deque.SubSequence) -> Bool { + switch consume self { + case .watermark(var strategy): + let result = strategy.didSend(elements: elements) + self = .watermark(strategy) + return result + case .unbounded(var strategy): + let result = strategy.didSend(elements: elements) + self = .unbounded(strategy) + return result + } + } + + @inlinable + mutating func didConsume(element: Element) -> Bool { + switch consume self { + case .watermark(var strategy): + let result = strategy.didConsume(element: element) + self = .watermark(strategy) + return result + case .unbounded(var strategy): + let result = strategy.didConsume(element: element) + self = .unbounded(strategy) + return result + } + } + } +} + +extension MultiProducerSingleConsumerChannel { + @usableFromInline + final class _Storage { + @usableFromInline + let _lock = Lock.allocate() + /// The state machine + @usableFromInline + var _stateMachine: _StateMachine + + var onTermination: (@Sendable () -> Void)? { + set { + self._lock.withLockVoid { + self._stateMachine._onTermination = newValue + } + } + get { + self._lock.withLock { + self._stateMachine._onTermination + } + } + } + + init( + backpressureStrategy: _InternalBackpressureStrategy + ) { + self._stateMachine = .init(backpressureStrategy: backpressureStrategy) + } + + func sequenceDeinitialized() { + let action = self._lock.withLock { + self._stateMachine.sequenceDeinitialized() + } + + switch action { + case .callOnTermination(let onTermination): + onTermination?() + + case .failProducersAndCallOnTermination(let producerContinuations, let onTermination): + for producerContinuation in producerContinuations { + switch producerContinuation { + case .closure(let onProduceMore): + onProduceMore(.failure(MultiProducerSingleConsumerChannelAlreadyFinishedError())) + case .continuation(let continuation): + continuation.resume(throwing: MultiProducerSingleConsumerChannelAlreadyFinishedError()) + } + } + onTermination?() + + case .none: + break + } + } + + func iteratorInitialized() { + self._lock.withLockVoid { + self._stateMachine.iteratorInitialized() + } + } + + func iteratorDeinitialized() { + let action = self._lock.withLock { + self._stateMachine.iteratorDeinitialized() + } + + switch action { + case .callOnTermination(let onTermination): + onTermination?() + + case .failProducersAndCallOnTermination(let producerContinuations, let onTermination): + for producerContinuation in producerContinuations { + switch producerContinuation { + case .closure(let onProduceMore): + onProduceMore(.failure(MultiProducerSingleConsumerChannelAlreadyFinishedError())) + case .continuation(let continuation): + continuation.resume(throwing: MultiProducerSingleConsumerChannelAlreadyFinishedError()) + } + } + onTermination?() + + case .none: + break + } + } + + func sourceDeinitialized() { + let action = self._lock.withLock { + self._stateMachine.sourceDeinitialized() + } + + switch action { + case .callOnTermination(let onTermination): + onTermination?() + + case .failProducersAndCallOnTermination(let producerContinuations, let onTermination): + for producerContinuation in producerContinuations { + switch producerContinuation { + case .closure(let onProduceMore): + onProduceMore(.failure(MultiProducerSingleConsumerChannelAlreadyFinishedError())) + case .continuation(let continuation): + continuation.resume(throwing: MultiProducerSingleConsumerChannelAlreadyFinishedError()) + } + } + onTermination?() + + case .none: + break + } + } + + @inlinable + func send( + contentsOf sequence: some Sequence + ) throws -> MultiProducerSingleConsumerChannel.Source.SendResult { + let action = self._lock.withLock { + return self._stateMachine.send(sequence) + } + + switch action { + case .returnProduceMore: + return .produceMore + + case .returnEnqueue(let callbackToken): + return .enqueueCallback(.init(id: callbackToken)) + + case .resumeConsumerAndReturnProduceMore(let continuation, let element): + continuation.resume(returning: element) + return .produceMore + + case .resumeConsumerAndReturnEnqueue(let continuation, let element, let callbackToken): + continuation.resume(returning: element) + return .enqueueCallback(.init(id: callbackToken)) + + case .throwFinishedError: + throw MultiProducerSingleConsumerChannelAlreadyFinishedError() + } + } + + @inlinable + func enqueueProducer( + callbackToken: UInt64, + continuation: UnsafeContinuation + ) { + let action = self._lock.withLock { + self._stateMachine.enqueueContinuation(callbackToken: callbackToken, continuation: continuation) + } + + switch action { + case .resumeProducer(let continuation): + continuation.resume() + + case .resumeProducerWithError(let continuation, let error): + continuation.resume(throwing: error) + + case .none: + break + } + } + + @inlinable + func enqueueProducer( + callbackToken: UInt64, + onProduceMore: sending @escaping (Result) -> Void + ) { + let action = self._lock.withLock { + self._stateMachine.enqueueProducer(callbackToken: callbackToken, onProduceMore: onProduceMore) + } + + switch action { + case .resumeProducer(let onProduceMore): + onProduceMore(Result.success(())) + + case .resumeProducerWithError(let onProduceMore, let error): + onProduceMore(Result.failure(error)) + + case .none: + break + } + } + + @inlinable + func cancelProducer( + callbackToken: UInt64 + ) { + let action = self._lock.withLock { + self._stateMachine.cancelProducer(callbackToken: callbackToken) + } + + switch action { + case .resumeProducerWithCancellationError(let onProduceMore): + switch onProduceMore { + case .closure(let onProduceMore): + onProduceMore(.failure(CancellationError())) + case .continuation(let continuation): + continuation.resume(throwing: CancellationError()) + } + + case .none: + break + } + } + + @inlinable + func finish(_ failure: Failure?) { + let action = self._lock.withLock { + self._stateMachine.finish(failure) + } + + switch action { + case .callOnTermination(let onTermination): + onTermination?() + + case .resumeConsumerAndCallOnTermination(let consumerContinuation, let failure, let onTermination): + switch failure { + case .some(let error): + consumerContinuation.resume(throwing: error) + case .none: + consumerContinuation.resume(returning: nil) + } + + onTermination?() + + case .resumeProducers(let producerContinuations): + for producerContinuation in producerContinuations { + switch producerContinuation { + case .closure(let onProduceMore): + onProduceMore(.failure(MultiProducerSingleConsumerChannelAlreadyFinishedError())) + case .continuation(let continuation): + continuation.resume(throwing: MultiProducerSingleConsumerChannelAlreadyFinishedError()) + } + } + + case .none: + break + } + } + + @inlinable + func next(isolation actor: isolated (any Actor)?) async throws -> Element? { + let action = self._lock.withLock { + self._stateMachine.next() + } + + switch action { + case .returnElement(let element): + return element + + case .returnElementAndResumeProducers(let element, let producerContinuations): + for producerContinuation in producerContinuations { + switch producerContinuation { + case .closure(let onProduceMore): + onProduceMore(.success(())) + case .continuation(let continuation): + continuation.resume() + } + } + + return element + + case .returnFailureAndCallOnTermination(let failure, let onTermination): + onTermination?() + switch failure { + case .some(let error): + throw error + + case .none: + return nil + } + + case .returnNil: + return nil + + case .suspendTask: + return try await self.suspendNext(isolation: actor) + } + } + + @inlinable + func suspendNext(isolation actor: isolated (any Actor)?) async throws -> Element? { + return try await withTaskCancellationHandler { + return try await withUnsafeThrowingContinuation { continuation in + let action = self._lock.withLock { + self._stateMachine.suspendNext(continuation: continuation) + } + + switch action { + case .resumeConsumerWithElement(let continuation, let element): + continuation.resume(returning: element) + + case .resumeConsumerWithElementAndProducers(let continuation, let element, let producerContinuations): + continuation.resume(returning: element) + for producerContinuation in producerContinuations { + switch producerContinuation { + case .closure(let onProduceMore): + onProduceMore(.failure(CancellationError())) + case .continuation(let continuation): + continuation.resume() + } + } + + case .resumeConsumerWithFailureAndCallOnTermination(let continuation, let failure, let onTermination): + switch failure { + case .some(let error): + continuation.resume(throwing: error) + + case .none: + continuation.resume(returning: nil) + } + onTermination?() + + case .resumeConsumerWithNil(let continuation): + continuation.resume(returning: nil) + + case .none: + break + } + } + } onCancel: { + let action = self._lock.withLock { + self._stateMachine.cancelNext() + } + + switch action { + case .resumeConsumerWithNilAndCallOnTermination(let continuation, let onTermination): + continuation.resume(returning: nil) + onTermination?() + + case .failProducersAndCallOnTermination(let producerContinuations, let onTermination): + for producerContinuation in producerContinuations { + switch producerContinuation { + case .closure(let onProduceMore): + onProduceMore(.failure(MultiProducerSingleConsumerChannelAlreadyFinishedError())) + case .continuation(let continuation): + continuation.resume(throwing: MultiProducerSingleConsumerChannelAlreadyFinishedError()) + } + } + onTermination?() + + case .none: + break + } + } + } + } +} + +extension MultiProducerSingleConsumerChannel._Storage { + /// The state machine of the channel. + @usableFromInline + struct _StateMachine: ~Copyable { + /// The state machine's current state. + @usableFromInline + var _state: _State + + @inlinable + var _onTermination: (@Sendable () -> Void)? { + set { + switch consume self._state { + case .channeling(var channeling): + channeling.onTermination = newValue + self = .init(state: .channeling(channeling)) + + case .sourceFinished(var sourceFinished): + sourceFinished.onTermination = newValue + self = .init(state: .sourceFinished(sourceFinished)) + + case .finished(let finished): + self = .init(state: .finished(finished)) + } + } + get { + switch self._state { + case .channeling(let channeling): + return channeling.onTermination + + case .sourceFinished(let sourceFinished): + return sourceFinished.onTermination + + case .finished: + return nil + } + } + } + + init( + backpressureStrategy: MultiProducerSingleConsumerChannel._InternalBackpressureStrategy + ) { + self._state = .channeling( + .init( + backpressureStrategy: backpressureStrategy, + iteratorInitialized: false, + buffer: .init(), + producerContinuations: .init(), + cancelledAsyncProducers: .init(), + hasOutstandingDemand: true, + activeProducers: 1, + nextCallbackTokenID: 0 + ) + ) + } + + @inlinable + init(state: consuming _State) { + self._state = state + } + + /// Actions returned by `sourceDeinitialized()`. + @usableFromInline + enum SourceDeinitializedAction { + /// Indicates that `onTermination` should be called. + case callOnTermination((@Sendable () -> Void)?) + /// Indicates that all producers should be failed and `onTermination` should be called. + case failProducersAndCallOnTermination( + _TinyArray<_MultiProducerSingleConsumerSuspendedProducer>, + (@Sendable () -> Void)? + ) + } + + @inlinable + mutating func sourceDeinitialized() -> SourceDeinitializedAction? { + switch consume self._state { + case .channeling(var channeling): + channeling.activeProducers -= 1 + + if channeling.activeProducers == 0 { + // This was the last producer so we can transition to source finished now + + self = .init(state: .sourceFinished(.init( + iteratorInitialized: channeling.iteratorInitialized, + buffer: channeling.buffer + ))) + + if channeling.suspendedProducers.isEmpty { + return .callOnTermination(channeling.onTermination) + } else { + return .failProducersAndCallOnTermination( + .init(channeling.suspendedProducers.lazy.map { $0.1 }), + channeling.onTermination + ) + } + } else { + // We still have more producers + self = .init(state: .channeling(channeling)) + + return nil + } + case .sourceFinished(let sourceFinished): + // This can happen if one producer calls finish and another deinits afterwards + self = .init(state: .sourceFinished(sourceFinished)) + + return nil + case .finished(let finished): + // This can happen if the consumer finishes and the producers deinit + self = .init(state: .finished(finished)) + + return nil + } + } + + /// Actions returned by `sequenceDeinitialized()`. + @usableFromInline + enum SequenceDeinitializedAction { + /// Indicates that `onTermination` should be called. + case callOnTermination((@Sendable () -> Void)?) + /// Indicates that all producers should be failed and `onTermination` should be called. + case failProducersAndCallOnTermination( + _TinyArray<_MultiProducerSingleConsumerSuspendedProducer>, + (@Sendable () -> Void)? + ) + } + + @inlinable + mutating func sequenceDeinitialized() -> SequenceDeinitializedAction? { + switch consume self._state { + case .channeling(let channeling): + guard channeling.iteratorInitialized else { + // No iterator was created so we can transition to finished right away. + self = .init(state: .finished(.init(iteratorInitialized: false, sourceFinished: false))) + + return .failProducersAndCallOnTermination( + .init(channeling.suspendedProducers.lazy.map { $0.1 }), + channeling.onTermination + ) + } + // An iterator was created and we deinited the sequence. + // This is an expected pattern and we just continue on normal. + self = .init(state: .channeling(channeling)) + + return .none + + case .sourceFinished(let sourceFinished): + guard sourceFinished.iteratorInitialized else { + // No iterator was created so we can transition to finished right away. + self = .init(state: .finished(.init(iteratorInitialized: false, sourceFinished: true))) + + return .callOnTermination(sourceFinished.onTermination) + } + // An iterator was created and we deinited the sequence. + // This is an expected pattern and we just continue on normal. + self = .init(state: .sourceFinished(sourceFinished)) + + return .none + + case .finished(let finished): + // We are already finished so there is nothing left to clean up. + // This is just the references dropping afterwards. + self = .init(state: .finished(finished)) + + return .none + } + } + + @inlinable + mutating func iteratorInitialized() { + switch consume self._state { + case .channeling(var channeling): + if channeling.iteratorInitialized { + // Our sequence is a unicast sequence and does not support multiple AsyncIterator's + fatalError("Only a single AsyncIterator can be created") + } else { + // The first and only iterator was initialized. + channeling.iteratorInitialized = true + self = .init(state: .channeling(channeling)) + } + + case .sourceFinished(var sourceFinished): + if sourceFinished.iteratorInitialized { + // Our sequence is a unicast sequence and does not support multiple AsyncIterator's + fatalError("Only a single AsyncIterator can be created") + } else { + // The first and only iterator was initialized. + sourceFinished.iteratorInitialized = true + self = .init(state: .sourceFinished(sourceFinished)) + } + + case .finished(let finished): + if finished.iteratorInitialized { + // Our sequence is a unicast sequence and does not support multiple AsyncIterator's + fatalError("Only a single AsyncIterator can be created") + } else { + self = .init(state: .finished(.init(iteratorInitialized: true, sourceFinished: finished.sourceFinished))) + } + } + } + + /// Actions returned by `iteratorDeinitialized()`. + @usableFromInline + enum IteratorDeinitializedAction { + /// Indicates that `onTermination` should be called. + case callOnTermination((@Sendable () -> Void)?) + /// Indicates that all producers should be failed and `onTermination` should be called. + case failProducersAndCallOnTermination( + _TinyArray<_MultiProducerSingleConsumerSuspendedProducer>, + (@Sendable () -> Void)? + ) + } + + @inlinable + mutating func iteratorDeinitialized() -> IteratorDeinitializedAction? { + switch consume self._state { + case .channeling(let channeling): + if channeling.iteratorInitialized { + // An iterator was created and deinited. Since we only support + // a single iterator we can now transition to finish. + self = .init(state: .finished(.init(iteratorInitialized: true, sourceFinished: false))) + + return .failProducersAndCallOnTermination( + .init(channeling.suspendedProducers.lazy.map { $0.1 }), + channeling.onTermination + ) + } else { + // An iterator needs to be initialized before it can be deinitialized. + fatalError("MultiProducerSingleConsumerChannel internal inconsistency") + } + + case .sourceFinished(let sourceFinished): + if sourceFinished.iteratorInitialized { + // An iterator was created and deinited. Since we only support + // a single iterator we can now transition to finish. + self = .init(state: .finished(.init(iteratorInitialized: true, sourceFinished: true))) + + return .callOnTermination(sourceFinished.onTermination) + } else { + // An iterator needs to be initialized before it can be deinitialized. + fatalError("MultiProducerSingleConsumerChannel internal inconsistency") + } + + case .finished(let finished): + // We are already finished so there is nothing left to clean up. + // This is just the references dropping afterwards. + self = .init(state: .finished(finished)) + + return .none + } + } + + /// Actions returned by `send()`. + @usableFromInline + enum SendAction { + /// Indicates that the producer should be notified to produce more. + case returnProduceMore + /// Indicates that the producer should be suspended to stop producing. + case returnEnqueue( + callbackToken: UInt64 + ) + /// Indicates that the consumer should be resumed and the producer should be notified to produce more. + case resumeConsumerAndReturnProduceMore( + continuation: UnsafeContinuation, + element: Element + ) + /// Indicates that the consumer should be resumed and the producer should be suspended. + case resumeConsumerAndReturnEnqueue( + continuation: UnsafeContinuation, + element: Element, + callbackToken: UInt64 + ) + /// Indicates that the producer has been finished. + case throwFinishedError + + @inlinable + init( + callbackToken: UInt64?, + continuationAndElement: (UnsafeContinuation, Element)? = nil + ) { + switch (callbackToken, continuationAndElement) { + case (.none, .none): + self = .returnProduceMore + + case (.some(let callbackToken), .none): + self = .returnEnqueue(callbackToken: callbackToken) + + case (.none, .some((let continuation, let element))): + self = .resumeConsumerAndReturnProduceMore( + continuation: continuation, + element: element + ) + + case (.some(let callbackToken), .some((let continuation, let element))): + self = .resumeConsumerAndReturnEnqueue( + continuation: continuation, + element: element, + callbackToken: callbackToken + ) + } + } + } + + @inlinable + mutating func send(_ sequence: some Sequence) -> SendAction { + switch consume self._state { + case .channeling(var channeling): + // We have an element and can resume the continuation + let bufferEndIndexBeforeAppend = channeling.buffer.endIndex + channeling.buffer.append(contentsOf: sequence) + var shouldProduceMore = channeling.backpressureStrategy.didSend( + elements: channeling.buffer[bufferEndIndexBeforeAppend...] + ) + channeling.hasOutstandingDemand = shouldProduceMore + + guard let consumerContinuation = channeling.consumerContinuation else { + // We don't have a suspended consumer so we just buffer the elements + let callbackToken = shouldProduceMore ? nil : channeling.nextCallbackToken() + self = .init(state: .channeling(channeling)) + + return .init( + callbackToken: callbackToken + ) + } + guard let element = channeling.buffer.popFirst() else { + // We got a send of an empty sequence. We just tolerate this. + let callbackToken = shouldProduceMore ? nil : channeling.nextCallbackToken() + self = .init(state: .channeling(channeling)) + + return .init(callbackToken: callbackToken) + } + // We need to tell the back pressure strategy that we consumed + shouldProduceMore = channeling.backpressureStrategy.didConsume(element: element) + channeling.hasOutstandingDemand = shouldProduceMore + + // We got a consumer continuation and an element. We can resume the consumer now + channeling.consumerContinuation = nil + let callbackToken = shouldProduceMore ? nil : channeling.nextCallbackToken() + self = .init(state: .channeling(channeling)) + + return .init( + callbackToken: callbackToken, + continuationAndElement: (consumerContinuation, element) + ) + + case .sourceFinished(let sourceFinished): + // If the source has finished we are dropping the elements. + self = .init(state: .sourceFinished(sourceFinished)) + + return .throwFinishedError + + case .finished(let finished): + // If the source has finished we are dropping the elements. + self = .init(state: .finished(finished)) + + return .throwFinishedError + } + } + + /// Actions returned by `enqueueProducer()`. + @usableFromInline + enum EnqueueProducerAction { + /// Indicates that the producer should be notified to produce more. + case resumeProducer((Result) -> Void) + /// Indicates that the producer should be notified about an error. + case resumeProducerWithError((Result) -> Void, Error) + } + + @inlinable + mutating func enqueueProducer( + callbackToken: UInt64, + onProduceMore: sending @escaping (Result) -> Void + ) -> EnqueueProducerAction? { + switch consume self._state { + case .channeling(var channeling): + if let index = channeling.cancelledAsyncProducers.firstIndex(of: callbackToken) { + // Our producer got marked as cancelled. + channeling.cancelledAsyncProducers.remove(at: index) + self = .init(state: .channeling(channeling)) + + return .resumeProducerWithError(onProduceMore, CancellationError()) + } else if channeling.hasOutstandingDemand { + // We hit an edge case here where we wrote but the consuming thread got interleaved + self = .init(state: .channeling(channeling)) + + return .resumeProducer(onProduceMore) + } else { + channeling.suspendedProducers.append((callbackToken, .closure(onProduceMore))) + self = .init(state: .channeling(channeling)) + + return .none + } + + case .sourceFinished(let sourceFinished): + // Since we are unlocking between sending elements and suspending the send + // It can happen that the source got finished or the consumption fully finishes. + self = .init(state: .sourceFinished(sourceFinished)) + + return .resumeProducerWithError(onProduceMore, MultiProducerSingleConsumerChannelAlreadyFinishedError()) + + case .finished(let finished): + // Since we are unlocking between sending elements and suspending the send + // It can happen that the source got finished or the consumption fully finishes. + self = .init(state: .finished(finished)) + + return .resumeProducerWithError(onProduceMore, MultiProducerSingleConsumerChannelAlreadyFinishedError()) + } + } + + /// Actions returned by `enqueueContinuation()`. + @usableFromInline + enum EnqueueContinuationAction { + /// Indicates that the producer should be notified to produce more. + case resumeProducer(UnsafeContinuation) + /// Indicates that the producer should be notified about an error. + case resumeProducerWithError(UnsafeContinuation, Error) + } + + @inlinable + mutating func enqueueContinuation( + callbackToken: UInt64, + continuation: UnsafeContinuation + ) -> EnqueueContinuationAction? { + switch consume self._state { + case .channeling(var channeling): + if let index = channeling.cancelledAsyncProducers.firstIndex(of: callbackToken) { + // Our producer got marked as cancelled. + channeling.cancelledAsyncProducers.remove(at: index) + self = .init(state: .channeling(channeling)) + + return .resumeProducerWithError(continuation, CancellationError()) + } else if channeling.hasOutstandingDemand { + // We hit an edge case here where we wrote but the consuming thread got interleaved + self = .init(state: .channeling(channeling)) + + return .resumeProducer(continuation) + } else { + channeling.suspendedProducers.append((callbackToken, .continuation(continuation))) + self = .init(state: .channeling(channeling)) + + return .none + } + + case .sourceFinished(let sourceFinished): + // Since we are unlocking between sending elements and suspending the send + // It can happen that the source got finished or the consumption fully finishes. + self = .init(state: .sourceFinished(sourceFinished)) + + return .resumeProducerWithError(continuation, MultiProducerSingleConsumerChannelAlreadyFinishedError()) + + case .finished(let finished): + // Since we are unlocking between sending elements and suspending the send + // It can happen that the source got finished or the consumption fully finishes. + self = .init(state: .finished(finished)) + + return .resumeProducerWithError(continuation, MultiProducerSingleConsumerChannelAlreadyFinishedError()) + } + } + + /// Actions returned by `cancelProducer()`. + @usableFromInline + enum CancelProducerAction { + /// Indicates that the producer should be notified about cancellation. + case resumeProducerWithCancellationError(_MultiProducerSingleConsumerSuspendedProducer) + } + + @inlinable + mutating func cancelProducer( + callbackToken: UInt64 + ) -> CancelProducerAction? { + switch consume self._state { + case .channeling(var channeling): + guard let index = channeling.suspendedProducers.firstIndex(where: { $0.0 == callbackToken }) else { + // The task that sends was cancelled before sending elements so the cancellation handler + // got invoked right away + channeling.cancelledAsyncProducers.append(callbackToken) + self = .init(state: .channeling(channeling)) + + return .none + } + // We have an enqueued producer that we need to resume now + let continuation = channeling.suspendedProducers.remove(at: index).1 + self = .init(state: .channeling(channeling)) + + return .resumeProducerWithCancellationError(continuation) + + case .sourceFinished(let sourceFinished): + // Since we are unlocking between sending elements and suspending the send + // It can happen that the source got finished or the consumption fully finishes. + self = .init(state: .sourceFinished(sourceFinished)) + + return .none + + case .finished(let finished): + // Since we are unlocking between sending elements and suspending the send + // It can happen that the source got finished or the consumption fully finishes. + self = .init(state: .finished(finished)) + + return .none + } + } + + /// Actions returned by `finish()`. + @usableFromInline + enum FinishAction { + /// Indicates that `onTermination` should be called. + case callOnTermination((() -> Void)?) + /// Indicates that the consumer should be resumed with the failure, the producers + /// should be resumed with an error and `onTermination` should be called. + case resumeConsumerAndCallOnTermination( + consumerContinuation: UnsafeContinuation, + failure: Failure?, + onTermination: (() -> Void)? + ) + /// Indicates that the producers should be resumed with an error. + case resumeProducers( + producerContinuations: _TinyArray<_MultiProducerSingleConsumerSuspendedProducer> + ) + } + + @inlinable + mutating func finish(_ failure: Failure?) -> FinishAction? { + switch consume self._state { + case .channeling(let channeling): + guard let consumerContinuation = channeling.consumerContinuation else { + // We don't have a suspended consumer so we are just going to mark + // the source as finished and terminate the current suspended producers. + self = .init(state: .sourceFinished( + .init( + iteratorInitialized: channeling.iteratorInitialized, + buffer: channeling.buffer, + failure: failure, + onTermination: channeling.onTermination + )) + ) + + return .resumeProducers(producerContinuations: .init(channeling.suspendedProducers.lazy.map { $0.1 })) + } + // We have a continuation, this means our buffer must be empty + // Furthermore, we can now transition to finished + // and resume the continuation with the failure + precondition(channeling.buffer.isEmpty, "Expected an empty buffer") + + self = .init(state: .finished(.init(iteratorInitialized: channeling.iteratorInitialized, sourceFinished: true))) + + return .resumeConsumerAndCallOnTermination( + consumerContinuation: consumerContinuation, + failure: failure, + onTermination: channeling.onTermination + ) + + case .sourceFinished(let sourceFinished): + // If the source has finished, finishing again has no effect. + self = .init(state: .sourceFinished(sourceFinished)) + + return .none + + case .finished(var finished): + finished.sourceFinished = true + self = .init(state: .finished(finished)) + return .none + } + } + + /// Actions returned by `next()`. + @usableFromInline + enum NextAction { + /// Indicates that the element should be returned to the caller. + case returnElement(Element) + /// Indicates that the element should be returned to the caller and that all producers should be called. + case returnElementAndResumeProducers(Element, _TinyArray<_MultiProducerSingleConsumerSuspendedProducer>) + /// Indicates that the `Failure` should be returned to the caller and that `onTermination` should be called. + case returnFailureAndCallOnTermination(Failure?, (() -> Void)?) + /// Indicates that the `nil` should be returned to the caller. + case returnNil + /// Indicates that the `Task` of the caller should be suspended. + case suspendTask + } + + @inlinable + mutating func next() -> NextAction { + switch consume self._state { + case .channeling(var channeling): + guard channeling.consumerContinuation == nil else { + // We have multiple AsyncIterators iterating the sequence + fatalError("MultiProducerSingleConsumerChannel internal inconsistency") + } + + guard let element = channeling.buffer.popFirst() else { + // There is nothing in the buffer to fulfil the demand so we need to suspend. + // We are not interacting with the backpressure strategy here because + // we are doing this inside `suspendNext` + self = .init(state: .channeling(channeling)) + + return .suspendTask + } + // We have an element to fulfil the demand right away. + let shouldProduceMore = channeling.backpressureStrategy.didConsume(element: element) + channeling.hasOutstandingDemand = shouldProduceMore + + guard shouldProduceMore else { + // We don't have any new demand, so we can just return the element. + self = .init(state: .channeling(channeling)) + + return .returnElement(element) + } + // There is demand and we have to resume our producers + let producers = _TinyArray(channeling.suspendedProducers.lazy.map { $0.1 }) + channeling.suspendedProducers.removeAll(keepingCapacity: true) + self = .init(state: .channeling(channeling)) + + return .returnElementAndResumeProducers(element, producers) + + case .sourceFinished(var sourceFinished): + // Check if we have an element left in the buffer and return it + guard let element = sourceFinished.buffer.popFirst() else { + // We are returning the queued failure now and can transition to finished + self = .init(state: .finished(.init(iteratorInitialized: sourceFinished.iteratorInitialized, sourceFinished: true))) + + return .returnFailureAndCallOnTermination(sourceFinished.failure, sourceFinished.onTermination) + } + self = .init(state: .sourceFinished(sourceFinished)) + + return .returnElement(element) + + case .finished(let finished): + self = .init(state: .finished(finished)) + + return .returnNil + } + } + + /// Actions returned by `suspendNext()`. + @usableFromInline + enum SuspendNextAction { + /// Indicates that the consumer should be resumed. + case resumeConsumerWithElement(UnsafeContinuation, Element) + /// Indicates that the consumer and all producers should be resumed. + case resumeConsumerWithElementAndProducers( + UnsafeContinuation, + Element, + _TinyArray<_MultiProducerSingleConsumerSuspendedProducer> + ) + /// Indicates that the consumer should be resumed with the failure and that `onTermination` should be called. + case resumeConsumerWithFailureAndCallOnTermination( + UnsafeContinuation, + Failure?, + (() -> Void)? + ) + /// Indicates that the consumer should be resumed with `nil`. + case resumeConsumerWithNil(UnsafeContinuation) + } + + @inlinable + mutating func suspendNext(continuation: UnsafeContinuation) -> SuspendNextAction? { + switch consume self._state { + case .channeling(var channeling): + guard channeling.consumerContinuation == nil else { + // We have multiple AsyncIterators iterating the sequence + fatalError("MultiProducerSingleConsumerChannel internal inconsistency") + } + + // We have to check here again since we might have a producer interleave next and suspendNext + guard let element = channeling.buffer.popFirst() else { + // There is nothing in the buffer to fulfil the demand so we to store the continuation. + channeling.consumerContinuation = continuation + self = .init(state: .channeling(channeling)) + + return .none + } + // We have an element to fulfil the demand right away. + + let shouldProduceMore = channeling.backpressureStrategy.didConsume(element: element) + channeling.hasOutstandingDemand = shouldProduceMore + + guard shouldProduceMore else { + // We don't have any new demand, so we can just return the element. + self = .init(state: .channeling(channeling)) + + return .resumeConsumerWithElement(continuation, element) + } + // There is demand and we have to resume our producers + let producers = _TinyArray(channeling.suspendedProducers.lazy.map { $0.1 }) + channeling.suspendedProducers.removeAll(keepingCapacity: true) + self = .init(state: .channeling(channeling)) + + return .resumeConsumerWithElementAndProducers(continuation, element, producers) + + case .sourceFinished(var sourceFinished): + // Check if we have an element left in the buffer and return it + guard let element = sourceFinished.buffer.popFirst() else { + // We are returning the queued failure now and can transition to finished + self = .init(state: .finished(.init(iteratorInitialized: sourceFinished.iteratorInitialized, sourceFinished: true))) + + return .resumeConsumerWithFailureAndCallOnTermination( + continuation, + sourceFinished.failure, + sourceFinished.onTermination + ) + } + self = .init(state: .sourceFinished(sourceFinished)) + + return .resumeConsumerWithElement(continuation, element) + + case .finished(let finished): + self = .init(state: .finished(finished)) + + return .resumeConsumerWithNil(continuation) + } + } + + /// Actions returned by `cancelNext()`. + @usableFromInline + enum CancelNextAction { + /// Indicates that the continuation should be resumed with nil, the producers should be finished and call onTermination. + case resumeConsumerWithNilAndCallOnTermination(UnsafeContinuation, (() -> Void)?) + /// Indicates that the producers should be finished and call onTermination. + case failProducersAndCallOnTermination(_TinyArray<_MultiProducerSingleConsumerSuspendedProducer>, (() -> Void)?) + } + + @inlinable + mutating func cancelNext() -> CancelNextAction? { + switch consume self._state { + case .channeling(let channeling): + self = .init(state: .finished(.init(iteratorInitialized: channeling.iteratorInitialized, sourceFinished: false))) + + guard let consumerContinuation = channeling.consumerContinuation else { + return .failProducersAndCallOnTermination( + .init(channeling.suspendedProducers.lazy.map { $0.1 }), + channeling.onTermination + ) + } + precondition( + channeling.suspendedProducers.isEmpty, + "Internal inconsistency. Unexpected producer continuations." + ) + return .resumeConsumerWithNilAndCallOnTermination( + consumerContinuation, + channeling.onTermination + ) + + case .sourceFinished(let sourceFinished): + self = .init(state: .sourceFinished(sourceFinished)) + + return .none + + case .finished(let finished): + self = .init(state: .finished(finished)) + + return .none + } + } + } +} + +extension MultiProducerSingleConsumerChannel._Storage._StateMachine { + @usableFromInline + enum _State: ~Copyable { + @usableFromInline + struct Channeling: ~Copyable { + /// The backpressure strategy. + @usableFromInline + var backpressureStrategy: MultiProducerSingleConsumerChannel._InternalBackpressureStrategy + + /// Indicates if the iterator was initialized. + @usableFromInline + var iteratorInitialized: Bool + + /// The onTermination callback. + @usableFromInline + var onTermination: (@Sendable () -> Void)? + + /// The buffer of elements. + @usableFromInline + var buffer: Deque + + /// The optional consumer continuation. + @usableFromInline + var consumerContinuation: UnsafeContinuation? + + /// The producer continuations. + @usableFromInline + var suspendedProducers: Deque<(UInt64, _MultiProducerSingleConsumerSuspendedProducer)> + + /// The producers that have been cancelled. + @usableFromInline + var cancelledAsyncProducers: Deque + + /// Indicates if we currently have outstanding demand. + @usableFromInline + var hasOutstandingDemand: Bool + + /// The number of active producers. + @usableFromInline + var activeProducers: UInt64 + + /// The next callback token. + @usableFromInline + var nextCallbackTokenID: UInt64 + + var description: String { + "backpressure:\(self.backpressureStrategy.description) iteratorInitialized:\(self.iteratorInitialized) buffer:\(self.buffer.count) consumerContinuation:\(self.consumerContinuation == nil) producerContinuations:\(self.suspendedProducers.count) cancelledProducers:\(self.cancelledAsyncProducers.count) hasOutstandingDemand:\(self.hasOutstandingDemand)" + } + + @inlinable + init( + backpressureStrategy: MultiProducerSingleConsumerChannel._InternalBackpressureStrategy, + iteratorInitialized: Bool, + onTermination: (@Sendable () -> Void)? = nil, + buffer: Deque, + consumerContinuation: UnsafeContinuation? = nil, + producerContinuations: Deque<(UInt64, _MultiProducerSingleConsumerSuspendedProducer)>, + cancelledAsyncProducers: Deque, + hasOutstandingDemand: Bool, + activeProducers: UInt64, + nextCallbackTokenID: UInt64 + ) { + self.backpressureStrategy = backpressureStrategy + self.iteratorInitialized = iteratorInitialized + self.onTermination = onTermination + self.buffer = buffer + self.consumerContinuation = consumerContinuation + self.suspendedProducers = producerContinuations + self.cancelledAsyncProducers = cancelledAsyncProducers + self.hasOutstandingDemand = hasOutstandingDemand + self.activeProducers = activeProducers + self.nextCallbackTokenID = nextCallbackTokenID + } + + /// Generates the next callback token. + @inlinable + mutating func nextCallbackToken() -> UInt64 { + let id = self.nextCallbackTokenID + self.nextCallbackTokenID += 1 + return id + } + } + + @usableFromInline + struct SourceFinished: ~Copyable { + /// Indicates if the iterator was initialized. + @usableFromInline + var iteratorInitialized: Bool + + /// The buffer of elements. + @usableFromInline + var buffer: Deque + + /// The failure that should be thrown after the last element has been consumed. + @usableFromInline + var failure: Failure? + + /// The onTermination callback. + @usableFromInline + var onTermination: (@Sendable () -> Void)? + + var description: String { + "iteratorInitialized:\(self.iteratorInitialized) buffer:\(self.buffer.count) failure:\(self.failure == nil)" + } + + @inlinable + init( + iteratorInitialized: Bool, + buffer: Deque, + failure: Failure? = nil, + onTermination: (@Sendable () -> Void)? = nil + ) { + self.iteratorInitialized = iteratorInitialized + self.buffer = buffer + self.failure = failure + self.onTermination = onTermination + } + } + + @usableFromInline + struct Finished: ~Copyable { + /// Indicates if the iterator was initialized. + @usableFromInline + var iteratorInitialized: Bool + + /// Indicates if the source was finished. + @usableFromInline + var sourceFinished: Bool + + var description: String { + "iteratorInitialized:\(self.iteratorInitialized) sourceFinished:\(self.sourceFinished)" + } + + @inlinable + init( + iteratorInitialized: Bool, + sourceFinished: Bool + ) { + self.iteratorInitialized = iteratorInitialized + self.sourceFinished = sourceFinished + } + } + + /// The state once either any element was sent or `next()` was called. + case channeling(Channeling) + + /// The state once the underlying source signalled that it is finished. + case sourceFinished(SourceFinished) + + /// The state once there can be no outstanding demand. This can happen if: + /// 1. The iterator was deinited + /// 2. The underlying source finished and all buffered elements have been consumed + case finished(Finished) + + @usableFromInline + var description: String { + switch self { + case .channeling(let channeling): + return "channeling \(channeling.description)" + case .sourceFinished(let sourceFinished): + return "sourceFinished \(sourceFinished.description)" + case .finished(let finished): + return "finished \(finished.description)" + } + } + } +} + +@usableFromInline +enum _MultiProducerSingleConsumerSuspendedProducer { + case closure((Result) -> Void) + case continuation(UnsafeContinuation) +} +#endif diff --git a/Sources/AsyncAlgorithms/MultiProducerSingleConsumerChannel/MultiProducerSingleConsumerChannel.swift b/Sources/AsyncAlgorithms/MultiProducerSingleConsumerChannel/MultiProducerSingleConsumerChannel.swift new file mode 100644 index 00000000..5e860a89 --- /dev/null +++ b/Sources/AsyncAlgorithms/MultiProducerSingleConsumerChannel/MultiProducerSingleConsumerChannel.swift @@ -0,0 +1,489 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2023 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +#if compiler(>=6.0) +/// An error that is thrown from the various `send` methods of the +/// ``MultiProducerSingleConsumerChannel/Source``. +/// +/// This error is thrown when the channel is already finished when +/// trying to send new elements to the source. +public struct MultiProducerSingleConsumerChannelAlreadyFinishedError: Error { + @usableFromInline + init() {} +} + +/// A multi producer single consumer channel. +/// +/// The ``MultiProducerSingleConsumerChannel`` provides a ``MultiProducerSingleConsumerChannel/Source`` to +/// send values to the channel. The source exposes the internal backpressure of the asynchronous sequence to the +/// producer. Additionally, the source can be used from synchronous and asynchronous contexts. +/// +/// +/// ## Using a MultiProducerSingleConsumerChannel +/// +/// To use a ``MultiProducerSingleConsumerChannel`` you have to create a new channel with it's source first by calling +/// the ``MultiProducerSingleConsumerChannel/makeChannel(of:throwing:BackpressureStrategy:)`` method. +/// Afterwards, you can pass the source to the producer and the channel to the consumer. +/// +/// ``` +/// let (channel, source) = MultiProducerSingleConsumerChannel.makeChannel( +/// backpressureStrategy: .watermark(low: 2, high: 4) +/// ) +/// ``` +/// +/// ### Asynchronous producers +/// +/// Values can be send to the source from asynchronous contexts using ``MultiProducerSingleConsumerChannel/Source/send(_:)-9b5do`` +/// and ``MultiProducerSingleConsumerChannel/Source/send(contentsOf:)-4myrz``. Backpressure results in calls +/// to the `send` methods to be suspended. Once more elements should be produced the `send` methods will be resumed. +/// +/// ``` +/// try await withThrowingTaskGroup(of: Void.self) { group in +/// group.addTask { +/// try await source.send(1) +/// try await source.send(2) +/// try await source.send(3) +/// } +/// +/// for await element in channel { +/// print(element) +/// } +/// } +/// ``` +/// +/// ### Synchronous producers +/// +/// Values can also be send to the source from synchronous context. Backpressure is also exposed on the synchronous contexts; however, +/// it is up to the caller to decide how to properly translate the backpressure to underlying producer e.g. by blocking the thread. +/// +/// ## Finishing the source +/// +/// To properly notify the consumer if the production of values has been finished the source's ``MultiProducerSingleConsumerChannel/Source/finish(throwing:)`` **must** be called. +public struct MultiProducerSingleConsumerChannel: AsyncSequence { + /// A private class to give the ``MultiProducerSingleConsumerChannel`` a deinit so we + /// can tell the producer when any potential consumer went away. + private final class _Backing: Sendable { + /// The underlying storage. + fileprivate let storage: _Storage + + init(storage: _Storage) { + self.storage = storage + } + + deinit { + storage.sequenceDeinitialized() + } + } + + /// The backing storage. + private let backing: _Backing + + @frozen + public struct ChannelAndStream: ~Copyable { + public var channel: MultiProducerSingleConsumerChannel + public var source: Source + + public init( + channel: MultiProducerSingleConsumerChannel, + source: consuming Source + ) { + self.channel = channel + self.source = source + } + } + + /// Initializes a new ``MultiProducerSingleConsumerChannel`` and an ``MultiProducerSingleConsumerChannel/Source``. + /// + /// - Parameters: + /// - elementType: The element type of the channel. + /// - failureType: The failure type of the channel. + /// - BackpressureStrategy: The backpressure strategy that the channel should use. + /// - Returns: A tuple containing the channel and its source. The source should be passed to the + /// producer while the channel should be passed to the consumer. + public static func makeChannel( + of elementType: Element.Type = Element.self, + throwing failureType: Failure.Type = Never.self, + backpressureStrategy: Source.BackpressureStrategy + ) -> ChannelAndStream { + let storage = _Storage( + backpressureStrategy: backpressureStrategy.internalBackpressureStrategy + ) + let source = Source(storage: storage) + + return .init(channel: .init(storage: storage), source: source) + } + + init(storage: _Storage) { + self.backing = .init(storage: storage) + } +} + +extension MultiProducerSingleConsumerChannel { + /// A struct to send values to the channel. + /// + /// Use this source to provide elements to the channel by calling one of the `send` methods. + /// + /// - Important: You must terminate the source by calling ``finish(throwing:)``. + public struct Source: ~Copyable, Sendable { + /// A strategy that handles the backpressure of the channel. + public struct BackpressureStrategy: Sendable { + var internalBackpressureStrategy: _InternalBackpressureStrategy + + /// A backpressure strategy using a high and low watermark to suspend and resume production respectively. + /// + /// - Parameters: + /// - low: When the number of buffered elements drops below the low watermark, producers will be resumed. + /// - high: When the number of buffered elements rises above the high watermark, producers will be suspended. + public static func watermark(low: Int, high: Int) -> BackpressureStrategy { + .init( + internalBackpressureStrategy: .watermark( + .init(low: low, high: high, waterLevelForElement: nil) + ) + ) + } + + /// A backpressure strategy using a high and low watermark to suspend and resume production respectively. + /// + /// - Parameters: + /// - low: When the number of buffered elements drops below the low watermark, producers will be resumed. + /// - high: When the number of buffered elements rises above the high watermark, producers will be suspended. + /// - waterLevelForElement: A closure used to compute the contribution of each buffered element to the current water level. + /// + /// - Note, `waterLevelForElement` will be called on each element when it is written into the source and when + /// it is consumed from the channel, so it is recommended to provide an function that runs in constant time. + public static func watermark( + low: Int, + high: Int, + waterLevelForElement: @escaping @Sendable (Element) -> Int // TODO: In the future this should become sending + ) -> BackpressureStrategy { + .init( + internalBackpressureStrategy: .watermark( + .init(low: low, high: high, waterLevelForElement: waterLevelForElement) + ) + ) + } + + /// An unbounded backpressure strategy. + /// + /// - Important: Only use this strategy if the production of elements is limited through some other mean. Otherwise + /// an unbounded backpressure strategy can result in infinite memory usage and open your application to denial of service + /// attacks. + public static func unbounded() -> BackpressureStrategy { + .init( + internalBackpressureStrategy: .unbounded(.init()) + ) + } + } + + /// A type that indicates the result of sending elements to the source. + public enum SendResult: ~Copyable, Sendable { + /// A token that is returned when the channel's backpressure strategy indicated that production should + /// be suspended. Use this token to enqueue a callback by calling the ``enqueueCallback(_:)`` method. + public struct CallbackToken: Sendable { + @usableFromInline + let _id: UInt64 + + @usableFromInline + init(id: UInt64) { + self._id = id + } + } + + /// Indicates that more elements should be produced and written to the source. + case produceMore + + /// Indicates that a callback should be enqueued. + /// + /// The associated token should be passed to the ``enqueueCallback(_:)`` method. + case enqueueCallback(CallbackToken) + } + + + /// A callback to invoke when the channel finished. + /// + /// The channel finishes and calls this closure in the following cases: + /// - No iterator was created and the sequence was deinited + /// - An iterator was created and deinited + /// - After ``finish(throwing:)`` was called and all elements have been consumed + public var onTermination: (@Sendable () -> Void)? { + set { + self._storage.onTermination = newValue + } + get { + self._storage.onTermination + } + } + + @usableFromInline + let _storage: _Storage + + internal init(storage: _Storage) { + self._storage = storage + } + + deinit { + self._storage.sourceDeinitialized() + } + + + /// Creates a new source which can be used to send elements to the channel concurrently. + /// + /// The channel will only automatically be finished if all existing sources have been deinited. + /// + /// - Returns: A new source for sending elements to the channel. + public mutating func copy() -> Self { + .init(storage: self._storage) + } + + /// Sends new elements to the channel. + /// + /// If there is a task consuming the channel and awaiting the next element then the task will get resumed with the + /// first element of the provided sequence. If the channel already terminated then this method will throw an error + /// indicating the failure. + /// + /// - Parameter sequence: The elements to send to the channel. + /// - Returns: The result that indicates if more elements should be produced at this time. + @inlinable + public mutating func send(contentsOf sequence: sending S) throws -> SendResult where Element == S.Element, S: Sequence { + try self._storage.send(contentsOf: sequence) + } + + /// Send the element to the channel. + /// + /// If there is a task consuming the channel and awaiting the next element then the task will get resumed with the + /// provided element. If the channel already terminated then this method will throw an error + /// indicating the failure. + /// + /// - Parameter element: The element to send to the channel. + /// - Returns: The result that indicates if more elements should be produced at this time. + @inlinable + public mutating func send(_ element: sending Element) throws -> SendResult { + try self._storage.send(contentsOf: CollectionOfOne(element)) + } + + /// Enqueues a callback that will be invoked once more elements should be produced. + /// + /// Call this method after ``send(contentsOf:)-5honm`` or ``send(_:)-3jxzb`` returned ``SendResult/enqueueCallback(_:)``. + /// + /// - Important: Enqueueing the same token multiple times is not allowed. + /// + /// - Parameters: + /// - callbackToken: The callback token. + /// - onProduceMore: The callback which gets invoked once more elements should be produced. + @inlinable + public mutating func enqueueCallback( + callbackToken: consuming SendResult.CallbackToken, + onProduceMore: sending @escaping (Result) -> Void + ) { + self._storage.enqueueProducer(callbackToken: callbackToken._id, onProduceMore: onProduceMore) + } + + /// Cancel an enqueued callback. + /// + /// Call this method to cancel a callback enqueued by the ``enqueueCallback(callbackToken:onProduceMore:)`` method. + /// + /// - Note: This methods supports being called before ``enqueueCallback(callbackToken:onProduceMore:)`` is called and + /// will mark the passed `callbackToken` as cancelled. + /// + /// - Parameter callbackToken: The callback token. + @inlinable + public mutating func cancelCallback(callbackToken: consuming SendResult.CallbackToken) { + self._storage.cancelProducer(callbackToken: callbackToken._id) + } + + /// Send new elements to the channel and provide a callback which will be invoked once more elements should be produced. + /// + /// If there is a task consuming the channel and awaiting the next element then the task will get resumed with the + /// first element of the provided sequence. If the channel already terminated then `onProduceMore` will be invoked with + /// a `Result.failure`. + /// + /// - Parameters: + /// - sequence: The elements to send to the channel. + /// - onProduceMore: The callback which gets invoked once more elements should be produced. This callback might be + /// invoked during the call to ``send(contentsOf:onProduceMore:)``. + @inlinable + public mutating func send( + contentsOf sequence: sending S, + onProduceMore: @escaping @Sendable (Result) -> Void + ) where Element == S.Element, S: Sequence { + do { + let sendResult = try self.send(contentsOf: sequence) + + switch consume sendResult { + case .produceMore: + onProduceMore(Result.success(())) + + case .enqueueCallback(let callbackToken): + self.enqueueCallback(callbackToken: callbackToken, onProduceMore: onProduceMore) + } + } catch { + onProduceMore(.failure(error)) + } + } + + /// Sends the element to the channel. + /// + /// If there is a task consuming the channel and awaiting the next element then the task will get resumed with the + /// provided element. If the channel already terminated then `onProduceMore` will be invoked with + /// a `Result.failure`. + /// + /// - Parameters: + /// - element: The element to send to the channel. + /// - onProduceMore: The callback which gets invoked once more elements should be produced. This callback might be + /// invoked during the call to ``send(_:onProduceMore:)``. + @inlinable + public mutating func send( + _ element: sending Element, + onProduceMore: @escaping @Sendable (Result) -> Void + ) { + self.send(contentsOf: CollectionOfOne(element), onProduceMore: onProduceMore) + } + + /// Send new elements to the channel. + /// + /// If there is a task consuming the channel and awaiting the next element then the task will get resumed with the + /// first element of the provided sequence. If the channel already terminated then this method will throw an error + /// indicating the failure. + /// + /// This method returns once more elements should be produced. + /// + /// - Parameters: + /// - sequence: The elements to send to the channel. + @inlinable + public mutating func send(contentsOf sequence: sending S) async throws where Element == S.Element, S: Sequence { + let sendResult = try { try self.send(contentsOf: sequence) }() + + switch consume sendResult { + case .produceMore: + return () + + case .enqueueCallback(let callbackToken): + let id = callbackToken._id + let storage = self._storage + try await withTaskCancellationHandler { + try await withUnsafeThrowingContinuation { continuation in + self._storage.enqueueProducer( + callbackToken: id, + continuation: continuation + ) + } + } onCancel: { + storage.cancelProducer(callbackToken: id) + } + } + } + + /// Send new element to the channel. + /// + /// If there is a task consuming the channel and awaiting the next element then the task will get resumed with the + /// provided element. If the channel already terminated then this method will throw an error + /// indicating the failure. + /// + /// This method returns once more elements should be produced. + /// + /// - Parameters: + /// - element: The element to send to the channel. + @inlinable + public mutating func send(_ element: sending Element) async throws { + try await self.send(contentsOf: CollectionOfOne(element)) + } + + /// Send the elements of the asynchronous sequence to the channel. + /// + /// This method returns once the provided asynchronous sequence or the channel finished. + /// + /// - Important: This method does not finish the source if consuming the upstream sequence terminated. + /// + /// - Parameters: + /// - sequence: The elements to send to the channel. + @inlinable + public mutating func send(contentsOf sequence: sending S) async throws where Element == S.Element, S: AsyncSequence { + for try await element in sequence { + try await self.send(contentsOf: CollectionOfOne(element)) + } + } + + /// Indicates that the production terminated. + /// + /// After all buffered elements are consumed the next iteration point will return `nil` or throw an error. + /// + /// Calling this function more than once has no effect. After calling finish, the channel enters a terminal state and doesn't accept + /// new elements. + /// + /// - Parameters: + /// - error: The error to throw, or `nil`, to finish normally. + @inlinable + public consuming func finish(throwing error: Failure? = nil) { + self._storage.finish(error) + } + } +} + +extension MultiProducerSingleConsumerChannel { + /// The asynchronous iterator for iterating the channel. + /// + /// This type is not `Sendable`. Don't use it from multiple + /// concurrent contexts. It is a programmer error to invoke `next()` from a + /// concurrent context that contends with another such call, which + /// results in a call to `fatalError()`. + public struct Iterator: AsyncIteratorProtocol { + @usableFromInline + final class _Backing { + @usableFromInline + let storage: _Storage + + init(storage: _Storage) { + self.storage = storage + self.storage.iteratorInitialized() + } + + deinit { + self.storage.iteratorDeinitialized() + } + } + + @usableFromInline + let _backing: _Backing + + init(storage: _Storage) { + self._backing = .init(storage: storage) + } + + @_disfavoredOverload + @inlinable + public mutating func next() async throws -> Element? { + try await self._backing.storage.next(isolation: nil) + } + + @inlinable + public mutating func next( + isolation actor: isolated (any Actor)? = #isolation + ) async throws(Failure) -> Element? { + do { + return try await self._backing.storage.next(isolation: actor) + } catch { + throw error as! Failure + } + } + } + + /// Creates the asynchronous iterator that produces elements of this + /// asynchronous sequence. + public func makeAsyncIterator() -> Iterator { + Iterator(storage: self.backing.storage) + } +} + +extension MultiProducerSingleConsumerChannel: Sendable where Element: Sendable {} + +@available(*, unavailable) +extension MultiProducerSingleConsumerChannel.Iterator: Sendable {} +#endif diff --git a/Tests/AsyncAlgorithmsTests/MultiProducerSingleConsumerChannel/MultiProducerSingleConsumerChannelTests.swift b/Tests/AsyncAlgorithmsTests/MultiProducerSingleConsumerChannel/MultiProducerSingleConsumerChannelTests.swift new file mode 100644 index 00000000..cca9e25c --- /dev/null +++ b/Tests/AsyncAlgorithmsTests/MultiProducerSingleConsumerChannel/MultiProducerSingleConsumerChannelTests.swift @@ -0,0 +1,1076 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2023 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +import AsyncAlgorithms +import XCTest + +@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) +final class MultiProducerSingleConsumerChannelTests: XCTestCase { + // MARK: - sequenceDeinitialized + + // Following tests are disabled since the channel is not getting deinited due to a known bug + +// func testSequenceDeinitialized_whenNoIterator() async throws { +// var channelAndStream: MultiProducerSingleConsumerChannel.ChannelAndStream! = MultiProducerSingleConsumerChannel.makeChannel( +// of: Int.self, +// backpressureStrategy: .watermark(low: 5, high: 10) +// ) +// var channel: MultiProducerSingleConsumerChannel? = channelAndStream.channel +// var source = channelAndStream.source +// channelAndStream = nil +// +// let (onTerminationStream, onTerminationContinuation) = AsyncStream.makeStream() +// source.onTermination = { +// onTerminationContinuation.finish() +// } +// +// await withThrowingTaskGroup(of: Void.self) { group in +// group.addTask { +// onTerminationContinuation.yield() +// try await Task.sleep(for: .seconds(10)) +// } +// +// var onTerminationIterator = onTerminationStream.makeAsyncIterator() +// _ = await onTerminationIterator.next() +// +// withExtendedLifetime(channel) {} +// channel = nil +// +// let terminationResult: Void? = await onTerminationIterator.next() +// XCTAssertNil(terminationResult) +// +// do { +// _ = try { try source.send(2) }() +// XCTFail("Expected an error to be thrown") +// } catch { +// XCTAssertTrue(error is MultiProducerSingleConsumerChannelAlreadyFinishedError) +// } +// +// group.cancelAll() +// } +// } +// +// func testSequenceDeinitialized_whenIterator() async throws { +// let channelAndStream = MultiProducerSingleConsumerChannel.makeChannel( +// of: Int.self, +// backpressureStrategy: .watermark(low: 5, high: 10) +// ) +// var channel: MultiProducerSingleConsumerChannel? = channelAndStream.channel +// var source = consume channelAndStream.source +// +// var iterator = channel?.makeAsyncIterator() +// +// let (onTerminationStream, onTerminationContinuation) = AsyncStream.makeStream() +// source.onTermination = { +// onTerminationContinuation.finish() +// } +// +// try await withThrowingTaskGroup(of: Void.self) { group in +// group.addTask { +// while !Task.isCancelled { +// onTerminationContinuation.yield() +// try await Task.sleep(for: .seconds(0.2)) +// } +// } +// +// var onTerminationIterator = onTerminationStream.makeAsyncIterator() +// _ = await onTerminationIterator.next() +// +// try withExtendedLifetime(channel) { +// let writeResult = try source.send(1) +// writeResult.assertIsProducerMore() +// } +// +// channel = nil +// +// do { +// let writeResult = try { try source.send(2) }() +// writeResult.assertIsProducerMore() +// } catch { +// XCTFail("Expected no error to be thrown") +// } +// +// let element1 = await iterator?.next() +// XCTAssertEqual(element1, 1) +// let element2 = await iterator?.next() +// XCTAssertEqual(element2, 2) +// +// group.cancelAll() +// } +// } +// +// func testSequenceDeinitialized_whenFinished() async throws { +// let channelAndStream = MultiProducerSingleConsumerChannel.makeChannel( +// of: Int.self, +// backpressureStrategy: .watermark(low: 5, high: 10) +// ) +// var channel: MultiProducerSingleConsumerChannel? = channelAndStream.channel +// var source = consume channelAndStream.source +// +// let (onTerminationStream, onTerminationContinuation) = AsyncStream.makeStream() +// source.onTermination = { +// onTerminationContinuation.finish() +// } +// +// await withThrowingTaskGroup(of: Void.self) { group in +// group.addTask { +// while !Task.isCancelled { +// onTerminationContinuation.yield() +// try await Task.sleep(for: .seconds(0.2)) +// } +// } +// +// var onTerminationIterator = onTerminationStream.makeAsyncIterator() +// _ = await onTerminationIterator.next() +// +// channel = nil +// +// let terminationResult: Void? = await onTerminationIterator.next() +// XCTAssertNil(terminationResult) +// XCTAssertNil(channel) +// +// group.cancelAll() +// } +// } +// +// func testSequenceDeinitialized_whenChanneling_andSuspendedProducer() async throws { +// let channelAndStream = MultiProducerSingleConsumerChannel.makeChannel( +// of: Int.self, +// backpressureStrategy: .watermark(low: 1, high: 2) +// ) +// var channel: MultiProducerSingleConsumerChannel? = channelAndStream.channel +// var source = consume channelAndStream.source +// +// _ = try { try source.send(1) }() +// +// do { +// try await withCheckedThrowingContinuation { continuation in +// source.send(1) { result in +// continuation.resume(with: result) +// } +// +// channel = nil +// _ = channel?.makeAsyncIterator() +// } +// } catch { +// XCTAssertTrue(error is MultiProducerSingleConsumerChannelAlreadyFinishedError) +// } +// } + + // MARK: - iteratorInitialized + + func testIteratorInitialized_whenInitial() async throws { + let channelAndStream = MultiProducerSingleConsumerChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 5, high: 10) + ) + let channel = channelAndStream.channel + let source = consume channelAndStream.source + + _ = channel.makeAsyncIterator() + } + + func testIteratorInitialized_whenChanneling() async throws { + let channelAndStream = MultiProducerSingleConsumerChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 5, high: 10) + ) + let channel = channelAndStream.channel + var source = consume channelAndStream.source + + try await source.send(1) + + var iterator = channel.makeAsyncIterator() + let element = await iterator.next() + XCTAssertEqual(element, 1) + } + + func testIteratorInitialized_whenSourceFinished() async throws { + let channelAndStream = MultiProducerSingleConsumerChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 5, high: 10) + ) + let channel = channelAndStream.channel + var source = consume channelAndStream.source + + try await source.send(1) + source.finish(throwing: nil) + + var iterator = channel.makeAsyncIterator() + let element1 = await iterator.next() + XCTAssertEqual(element1, 1) + let element2 = await iterator.next() + XCTAssertNil(element2) + } + + func testIteratorInitialized_whenFinished() async throws { + let channelAndStream = MultiProducerSingleConsumerChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 5, high: 10) + ) + let channel = channelAndStream.channel + let source = consume channelAndStream.source + + source.finish(throwing: nil) + + var iterator = channel.makeAsyncIterator() + let element = await iterator.next() + XCTAssertNil(element) + } + + // MARK: - iteratorDeinitialized + + func testIteratorDeinitialized_whenInitial() async throws { + let channelAndStream = MultiProducerSingleConsumerChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 5, high: 10) + ) + let channel = channelAndStream.channel + var source = consume channelAndStream.source + + let (onTerminationStream, onTerminationContinuation) = AsyncStream.makeStream() + source.onTermination = { + onTerminationContinuation.finish() + } + + await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + while !Task.isCancelled { + onTerminationContinuation.yield() + try await Task.sleep(for: .seconds(0.2)) + } + } + + var onTerminationIterator = onTerminationStream.makeAsyncIterator() + _ = await onTerminationIterator.next() + + var iterator: MultiProducerSingleConsumerChannel.AsyncIterator? = channel.makeAsyncIterator() + iterator = nil + _ = await iterator?.next() + + let terminationResult: Void? = await onTerminationIterator.next() + XCTAssertNil(terminationResult) + + group.cancelAll() + } + } + + func testIteratorDeinitialized_whenChanneling() async throws { + let channelAndStream = MultiProducerSingleConsumerChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 5, high: 10) + ) + let channel = channelAndStream.channel + var source = consume channelAndStream.source + + let (onTerminationStream, onTerminationContinuation) = AsyncStream.makeStream() + source.onTermination = { + onTerminationContinuation.finish() + } + + try await source.send(1) + + await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + while !Task.isCancelled { + onTerminationContinuation.yield() + try await Task.sleep(for: .seconds(0.2)) + } + } + + var onTerminationIterator = onTerminationStream.makeAsyncIterator() + _ = await onTerminationIterator.next() + + var iterator: MultiProducerSingleConsumerChannel.AsyncIterator? = channel.makeAsyncIterator() + iterator = nil + _ = await iterator?.next(isolation: nil) + + let terminationResult: Void? = await onTerminationIterator.next() + XCTAssertNil(terminationResult) + + group.cancelAll() + } + } + + func testIteratorDeinitialized_whenSourceFinished() async throws { + let channelAndStream = MultiProducerSingleConsumerChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 5, high: 10) + ) + let channel = channelAndStream.channel + var source = consume channelAndStream.source + + let (onTerminationStream, onTerminationContinuation) = AsyncStream.makeStream() + source.onTermination = { + onTerminationContinuation.finish() + } + + try await source.send(1) + source.finish(throwing: nil) + + await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + while !Task.isCancelled { + onTerminationContinuation.yield() + try await Task.sleep(for: .seconds(0.2)) + } + } + + var onTerminationIterator = onTerminationStream.makeAsyncIterator() + _ = await onTerminationIterator.next() + + var iterator: MultiProducerSingleConsumerChannel.AsyncIterator? = channel.makeAsyncIterator() + iterator = nil + _ = await iterator?.next() + + let terminationResult: Void? = await onTerminationIterator.next() + XCTAssertNil(terminationResult) + + group.cancelAll() + } + } + + func testIteratorDeinitialized_whenFinished() async throws { + let channelAndStream = MultiProducerSingleConsumerChannel.makeChannel( + of: Int.self, + throwing: Error.self, + backpressureStrategy: .watermark(low: 5, high: 10) + ) + let channel = channelAndStream.channel + var source = consume channelAndStream.source + + let (onTerminationStream, onTerminationContinuation) = AsyncStream.makeStream() + source.onTermination = { + onTerminationContinuation.finish() + } + + source.finish(throwing: nil) + + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + while !Task.isCancelled { + onTerminationContinuation.yield() + try await Task.sleep(for: .seconds(0.2)) + } + } + + var onTerminationIterator = onTerminationStream.makeAsyncIterator() + _ = await onTerminationIterator.next() + + var iterator: MultiProducerSingleConsumerChannel.AsyncIterator? = channel.makeAsyncIterator() + iterator = nil + _ = try await iterator?.next() + + let terminationResult: Void? = await onTerminationIterator.next() + XCTAssertNil(terminationResult) + + group.cancelAll() + } + } + + func testIteratorDeinitialized_whenChanneling_andSuspendedProducer() async throws { + let channelAndStream = MultiProducerSingleConsumerChannel.makeChannel( + of: Int.self, + throwing: Error.self, + backpressureStrategy: .watermark(low: 5, high: 10) + ) + var channel: MultiProducerSingleConsumerChannel? = channelAndStream.channel + var source = consume channelAndStream.source + + var iterator: MultiProducerSingleConsumerChannel.AsyncIterator? = channel?.makeAsyncIterator() + channel = nil + + _ = try { try source.send(1) }() + + do { + try await withCheckedThrowingContinuation { continuation in + source.send(1) { result in + continuation.resume(with: result) + } + + iterator = nil + } + } catch { + XCTAssertTrue(error is MultiProducerSingleConsumerChannelAlreadyFinishedError) + } + + _ = try await iterator?.next() + } + + // MARK: - sourceDeinitialized + + func testSourceDeinitialized_whenSourceFinished() async throws { + let channelAndStream = MultiProducerSingleConsumerChannel.makeChannel( + of: Int.self, + throwing: Error.self, + backpressureStrategy: .watermark(low: 5, high: 10) + ) + let channel = channelAndStream.channel + var source: MultiProducerSingleConsumerChannel.Source? = consume channelAndStream.source + + let (onTerminationStream, onTerminationContinuation) = AsyncStream.makeStream() + source?.onTermination = { + onTerminationContinuation.finish() + } + + try await source?.send(1) + try await source?.send(2) + source?.finish(throwing: nil) + + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + while !Task.isCancelled { + onTerminationContinuation.yield() + try await Task.sleep(for: .seconds(0.2)) + } + } + + var onTerminationIterator = onTerminationStream.makeAsyncIterator() + _ = await onTerminationIterator.next() + + var iterator: MultiProducerSingleConsumerChannel.AsyncIterator? = channel.makeAsyncIterator() + _ = try await iterator?.next() + + _ = await onTerminationIterator.next() + + _ = try await iterator?.next() + _ = try await iterator?.next() + + let terminationResult: Void? = await onTerminationIterator.next() + XCTAssertNil(terminationResult) + + group.cancelAll() + } + } + + func testSourceDeinitialized_whenFinished() async throws { + let channelAndStream = MultiProducerSingleConsumerChannel.makeChannel( + of: Int.self, + throwing: Error.self, + backpressureStrategy: .watermark(low: 5, high: 10) + ) + let channel = channelAndStream.channel + var source: MultiProducerSingleConsumerChannel.Source? = consume channelAndStream.source + + let (onTerminationStream, onTerminationContinuation) = AsyncStream.makeStream() + source?.onTermination = { + onTerminationContinuation.finish() + } + + source?.finish(throwing: nil) + + await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + while !Task.isCancelled { + onTerminationContinuation.yield() + try await Task.sleep(for: .seconds(0.2)) + } + } + + var onTerminationIterator = onTerminationStream.makeAsyncIterator() + _ = await onTerminationIterator.next() + + _ = channel.makeAsyncIterator() + + _ = await onTerminationIterator.next() + + let terminationResult: Void? = await onTerminationIterator.next() + XCTAssertNil(terminationResult) + + group.cancelAll() + } + } + + func testOneOfTwoSourcesDeinitialized() async throws { + let channelAndStream = MultiProducerSingleConsumerChannel.makeChannel( + of: Int.self, + throwing: Never.self, + backpressureStrategy: .watermark(low: 5, high: 10) + ) + + let channel = channelAndStream.channel + let consumerTask = Task { + var count = 0 + for await _ in channel { + count += 1 + if count == 2 { + break + } + } + return count + } + + var source = consume channelAndStream.source + _ = try await { + var sourceCopy = source.copy() + _ = try await sourceCopy.send(1) + }() + + do { + _ = try await source.send(2) + } catch { + XCTFail("source.send() unexpectedly failed \(error)") + } + + let consumedEvents = await consumerTask.value + XCTAssertEqual(consumedEvents, 2) + } + + // MARK: - write + + func testWrite_whenInitial() async throws { + let channelAndStream = MultiProducerSingleConsumerChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 2, high: 5) + ) + let channel = channelAndStream.channel + var source = consume channelAndStream.source + + try await source.send(1) + + var iterator = channel.makeAsyncIterator() + let element = await iterator.next() + XCTAssertEqual(element, 1) + } + + func testWrite_whenChanneling_andNoConsumer() async throws { + let channelAndStream = MultiProducerSingleConsumerChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 2, high: 5) + ) + let channel = channelAndStream.channel + var source = consume channelAndStream.source + + try await source.send(1) + try await source.send(2) + + var iterator = channel.makeAsyncIterator() + let element1 = await iterator.next() + XCTAssertEqual(element1, 1) + let element2 = await iterator.next() + XCTAssertEqual(element2, 2) + } + + func testWrite_whenChanneling_andSuspendedConsumer() async throws { + let channelAndStream = MultiProducerSingleConsumerChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 2, high: 5) + ) + let channel = channelAndStream.channel + var source = consume channelAndStream.source + + try await withThrowingTaskGroup(of: Int?.self) { group in + group.addTask { + return await channel.first { _ in true } + } + + // This is always going to be a bit racy since we need the call to next() suspend + try await Task.sleep(for: .seconds(0.5)) + + try await source.send(1) + let element = try await group.next() + XCTAssertEqual(element, 1) + } + } + + func testWrite_whenChanneling_andSuspendedConsumer_andEmptySequence() async throws { + let channelAndStream = MultiProducerSingleConsumerChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 2, high: 5) + ) + let channel = channelAndStream.channel + var source = consume channelAndStream.source + + try await withThrowingTaskGroup(of: Int?.self) { group in + group.addTask { + return await channel.first { _ in true } + } + + // This is always going to be a bit racy since we need the call to next() suspend + try await Task.sleep(for: .seconds(0.5)) + + try await source.send(contentsOf: []) + try await source.send(contentsOf: [1]) + let element = try await group.next() + XCTAssertEqual(element, 1) + } + } + + // MARK: - enqueueProducer + + func testEnqueueProducer_whenChanneling_andAndCancelled() async throws { + let channelAndStream = MultiProducerSingleConsumerChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 1, high: 2) + ) + let channel = channelAndStream.channel + var source = consume channelAndStream.source + + let (producerStream, producerSource) = AsyncThrowingStream.makeStream() + + try await source.send(1) + + let writeResult = try { try source.send(2) }() + + switch consume writeResult { + case .produceMore: + preconditionFailure() + case .enqueueCallback(let callbackToken): + source.cancelCallback(callbackToken: callbackToken) + + source.enqueueCallback(callbackToken: callbackToken) { result in + producerSource.yield(with: result) + } + } + + do { + _ = try await producerStream.first { _ in true } + XCTFail("Expected an error to be thrown") + } catch { + XCTAssertTrue(error is CancellationError) + } + + let element = await channel.first { _ in true } + XCTAssertEqual(element, 1) + } + + func testEnqueueProducer_whenChanneling_andAndCancelled_andAsync() async throws { + let channelAndStream = MultiProducerSingleConsumerChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 1, high: 2) + ) + let channel = channelAndStream.channel + var source = consume channelAndStream.source + + try await source.send(1) + + await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + try await source.send(2) + } + + group.cancelAll() + do { + try await group.next() + XCTFail("Expected an error to be thrown") + } catch { + XCTAssertTrue(error is CancellationError) + } + } + + let element = await channel.first { _ in true } + XCTAssertEqual(element, 1) + } + + func testEnqueueProducer_whenChanneling_andInterleaving() async throws { + let channelAndStream = MultiProducerSingleConsumerChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 1, high: 1) + ) + let channel = channelAndStream.channel + var source = consume channelAndStream.source + var iterator = channel.makeAsyncIterator() + + let (producerStream, producerSource) = AsyncThrowingStream.makeStream() + + let writeResult = try { try source.send(1) }() + + switch writeResult { + case .produceMore: + preconditionFailure() + case .enqueueCallback(let callbackToken): + let element = await iterator.next() + XCTAssertEqual(element, 1) + + source.enqueueCallback(callbackToken: callbackToken) { result in + producerSource.yield(with: result) + } + } + + do { + _ = try await producerStream.first { _ in true } + } catch { + XCTFail("Expected no error to be thrown") + } + } + + func testEnqueueProducer_whenChanneling_andSuspending() async throws { + let channelAndStream = MultiProducerSingleConsumerChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 1, high: 1) + ) + let channel = channelAndStream.channel + var source = consume channelAndStream.source + var iterator = channel.makeAsyncIterator() + + let (producerStream, producerSource) = AsyncThrowingStream.makeStream() + + let writeResult = try { try source.send(1) }() + + switch writeResult { + case .produceMore: + preconditionFailure() + case .enqueueCallback(let callbackToken): + source.enqueueCallback(callbackToken: callbackToken) { result in + producerSource.yield(with: result) + } + } + + let element = await iterator.next() + XCTAssertEqual(element, 1) + + do { + _ = try await producerStream.first { _ in true } + } catch { + XCTFail("Expected no error to be thrown") + } + } + + // MARK: - cancelProducer + + func testCancelProducer_whenChanneling() async throws { + let channelAndStream = MultiProducerSingleConsumerChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 1, high: 2) + ) + let channel = channelAndStream.channel + var source = consume channelAndStream.source + + let (producerStream, producerSource) = AsyncThrowingStream.makeStream() + + try await source.send(1) + + let writeResult = try { try source.send(2) }() + + switch writeResult { + case .produceMore: + preconditionFailure() + case .enqueueCallback(let callbackToken): + source.enqueueCallback(callbackToken: callbackToken) { result in + producerSource.yield(with: result) + } + + source.cancelCallback(callbackToken: callbackToken) + } + + do { + _ = try await producerStream.first { _ in true } + XCTFail("Expected an error to be thrown") + } catch { + XCTAssertTrue(error is CancellationError) + } + + let element = await channel.first { _ in true } + XCTAssertEqual(element, 1) + } + + // MARK: - finish + + func testFinish_whenChanneling_andConsumerSuspended() async throws { + let channelAndStream = MultiProducerSingleConsumerChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 1, high: 1) + ) + let channel = channelAndStream.channel + var source: MultiProducerSingleConsumerChannel.Source? = consume channelAndStream.source + + try await withThrowingTaskGroup(of: Int?.self) { group in + group.addTask { + return await channel.first { $0 == 2 } + } + + // This is always going to be a bit racy since we need the call to next() suspend + try await Task.sleep(for: .seconds(0.5)) + + source?.finish(throwing: nil) + source = nil + + let element = try await group.next() + XCTAssertEqual(element, .some(nil)) + } + } + + func testFinish_whenInitial() async throws { + let channelAndStream = MultiProducerSingleConsumerChannel.makeChannel( + of: Int.self, + throwing: Error.self, + backpressureStrategy: .watermark(low: 1, high: 1) + ) + let channel = channelAndStream.channel + var source = consume channelAndStream.source + + source.finish(throwing: CancellationError()) + + do { + for try await _ in channel {} + XCTFail("Expected an error to be thrown") + } catch { + XCTAssertTrue(error is CancellationError) + } + + } + + // MARK: - Backpressure + + func testBackpressure() async throws { + let channelAndStream = MultiProducerSingleConsumerChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 2, high: 4) + ) + let channel = channelAndStream.channel + var source = consume channelAndStream.source + + let (backpressureEventStream, backpressureEventContinuation) = AsyncStream.makeStream(of: Void.self) + + await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + while true { + backpressureEventContinuation.yield(()) + try await source.send(contentsOf: [1]) + } + } + + var backpressureEventIterator = backpressureEventStream.makeAsyncIterator() + var iterator = channel.makeAsyncIterator() + + await backpressureEventIterator.next() + await backpressureEventIterator.next() + await backpressureEventIterator.next() + await backpressureEventIterator.next() + + _ = await iterator.next() + _ = await iterator.next() + _ = await iterator.next() + + await backpressureEventIterator.next() + await backpressureEventIterator.next() + await backpressureEventIterator.next() + + group.cancelAll() + } + } + + func testBackpressureSync() async throws { + let channelAndStream = MultiProducerSingleConsumerChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 2, high: 4) + ) + let channel = channelAndStream.channel + var source = consume channelAndStream.source + + let (backpressureEventStream, backpressureEventContinuation) = AsyncStream.makeStream(of: Void.self) + + await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + while true { + backpressureEventContinuation.yield(()) + try await withCheckedThrowingContinuation { continuation in + source.send(contentsOf: [1]) { result in + continuation.resume(with: result) + } + } + } + } + + var backpressureEventIterator = backpressureEventStream.makeAsyncIterator() + var iterator = channel.makeAsyncIterator() + + await backpressureEventIterator.next() + await backpressureEventIterator.next() + await backpressureEventIterator.next() + await backpressureEventIterator.next() + + _ = await iterator.next() + _ = await iterator.next() + _ = await iterator.next() + + await backpressureEventIterator.next() + await backpressureEventIterator.next() + await backpressureEventIterator.next() + + group.cancelAll() + } + } + + func testWatermarkWithCustomCoount() async throws { + let channelAndStream = MultiProducerSingleConsumerChannel.makeChannel( + of: [Int].self, + backpressureStrategy: .watermark(low: 2, high: 4, waterLevelForElement: { $0.count }) + ) + let channel = channelAndStream.channel + var source = consume channelAndStream.source + var iterator = channel.makeAsyncIterator() + + try await source.send([1, 1, 1]) + + _ = await iterator.next() + + try await source.send([1, 1, 1]) + + _ = await iterator.next() + } + + func testWatermarWithLotsOfElements() async throws { + // This test should in the future use a custom task executor to schedule to avoid sending + // 1000 elements. + let channelAndStream = MultiProducerSingleConsumerChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 2, high: 4) + ) + let channel = channelAndStream.channel + var source: MultiProducerSingleConsumerChannel.Source! = consume channelAndStream.source + await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + var source = source.take()! + for i in 0...10000 { + try await source.send(i) + } + source.finish() + } + + group.addTask { + var sum = 0 + for try await element in channel { + sum += element + } + } + } + } + + func testThrowsError() async throws { + let channelAndStream = MultiProducerSingleConsumerChannel.makeChannel( + of: Int.self, + throwing: Error.self, + backpressureStrategy: .watermark(low: 2, high: 4) + ) + let channel = channelAndStream.channel + var source = consume channelAndStream.source + + try await source.send(1) + try await source.send(2) + source.finish(throwing: CancellationError()) + + var elements = [Int]() + var iterator = channel.makeAsyncIterator() + + do { + while let element = try await iterator.next() { + elements.append(element) + } + XCTFail("Expected an error to be thrown") + } catch { + XCTAssertTrue(error is CancellationError) + XCTAssertEqual(elements, [1, 2]) + } + + let element = try await iterator.next() + XCTAssertNil(element) + } + + func testAsyncSequenceWrite() async throws { + let (stream, continuation) = AsyncStream.makeStream() + let channelAndStream = MultiProducerSingleConsumerChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 2, high: 4) + ) + let channel = channelAndStream.channel + var source = consume channelAndStream.source + + continuation.yield(1) + continuation.yield(2) + continuation.finish() + + try await source.send(contentsOf: stream) + source.finish(throwing: nil) + + let elements = await channel.collect() + XCTAssertEqual(elements, [1, 2]) + } + + // MARK: NonThrowing + + func testNonThrowing() async throws { + let channelAndStream = MultiProducerSingleConsumerChannel.makeChannel( + of: Int.self, + backpressureStrategy: .watermark(low: 2, high: 4) + ) + let channel = channelAndStream.channel + var source = consume channelAndStream.source + + let (backpressureEventStream, backpressureEventContinuation) = AsyncStream.makeStream(of: Void.self) + + await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + while true { + backpressureEventContinuation.yield(()) + try await source.send(contentsOf: [1]) + } + } + + var backpressureEventIterator = backpressureEventStream.makeAsyncIterator() + var iterator = channel.makeAsyncIterator() + + await backpressureEventIterator.next() + await backpressureEventIterator.next() + await backpressureEventIterator.next() + await backpressureEventIterator.next() + + _ = await iterator.next() + _ = await iterator.next() + _ = await iterator.next() + + await backpressureEventIterator.next() + await backpressureEventIterator.next() + await backpressureEventIterator.next() + + group.cancelAll() + } + } +} + +extension AsyncSequence { + /// Collect all elements in the sequence into an array. + fileprivate func collect() async rethrows -> [Element] { + try await self.reduce(into: []) { accumulated, next in + accumulated.append(next) + } + } +} + +extension MultiProducerSingleConsumerChannel.Source.SendResult { + func assertIsProducerMore() { + switch self { + case .produceMore: + return () + + case .enqueueCallback: + XCTFail("Expected produceMore") + } + } + + func assertIsEnqueueCallback() { + switch self { + case .produceMore: + XCTFail("Expected enqueueCallback") + + case .enqueueCallback: + return () + } + } +} + +extension Optional where Wrapped: ~Copyable { + fileprivate mutating func take() -> Self { + let result = consume self + self = nil + return result + } +}