diff --git a/Sources/AsyncAlgorithms/Internal/_TinyArray.swift b/Sources/AsyncAlgorithms/Internal/_TinyArray.swift index 07357ccb..d9d04411 100644 --- a/Sources/AsyncAlgorithms/Internal/_TinyArray.swift +++ b/Sources/AsyncAlgorithms/Internal/_TinyArray.swift @@ -72,12 +72,12 @@ extension _TinyArray: RandomAccessCollection { } extension _TinyArray { - @inlinable + //@inlinable init(_ elements: some Sequence) { self.storage = .init(elements) } - @inlinable + //@inlinable init() { self.storage = .init() } diff --git a/Sources/AsyncAlgorithms/MultiProducerSingleConsumerChannel/MultiProducerSingleConsumerChannel+Internal.swift b/Sources/AsyncAlgorithms/MultiProducerSingleConsumerChannel/MultiProducerSingleConsumerChannel+Internal.swift index 58b41ae1..39fb4c95 100644 --- a/Sources/AsyncAlgorithms/MultiProducerSingleConsumerChannel/MultiProducerSingleConsumerChannel+Internal.swift +++ b/Sources/AsyncAlgorithms/MultiProducerSingleConsumerChannel/MultiProducerSingleConsumerChannel+Internal.swift @@ -97,7 +97,7 @@ extension MultiProducerSingleConsumerChannel { @usableFromInline var description: String { - switch consume self { + switch /*consume*/ self { case .watermark(let strategy): return strategy.description case .unbounded(let unbounded): @@ -107,7 +107,7 @@ extension MultiProducerSingleConsumerChannel { @inlinable mutating func didSend(elements: Deque.SubSequence) -> Bool { - switch consume self { + switch /*consume*/ self { case .watermark(var strategy): let result = strategy.didSend(elements: elements) self = .watermark(strategy) @@ -121,7 +121,7 @@ extension MultiProducerSingleConsumerChannel { @inlinable mutating func didConsume(element: Element) -> Bool { - switch consume self { + switch /*consume*/ self { case .watermark(var strategy): let result = strategy.didConsume(element: element) self = .watermark(strategy) @@ -244,7 +244,7 @@ extension MultiProducerSingleConsumerChannel { } } - @inlinable + //@inlinable func send( contentsOf sequence: some Sequence ) throws -> MultiProducerSingleConsumerChannel.Source.SendResult { @@ -272,7 +272,7 @@ extension MultiProducerSingleConsumerChannel { } } - @inlinable + //@inlinable func enqueueProducer( callbackToken: UInt64, continuation: UnsafeContinuation @@ -293,7 +293,7 @@ extension MultiProducerSingleConsumerChannel { } } - @inlinable + //@inlinable func enqueueProducer( callbackToken: UInt64, onProduceMore: sending @escaping (Result) -> Void @@ -314,7 +314,7 @@ extension MultiProducerSingleConsumerChannel { } } - @inlinable + //@inlinable func cancelProducer( callbackToken: UInt64 ) { @@ -336,7 +336,7 @@ extension MultiProducerSingleConsumerChannel { } } - @inlinable + //@inlinable func finish(_ failure: Failure?) { let action = self._lock.withLock { self._stateMachine.finish(failure) @@ -371,7 +371,7 @@ extension MultiProducerSingleConsumerChannel { } } - @inlinable + //@inlinable func next(isolation actor: isolated (any Actor)?) async throws -> Element? { let action = self._lock.withLock { self._stateMachine.next() @@ -411,7 +411,7 @@ extension MultiProducerSingleConsumerChannel { } } - @inlinable + //@inlinable func suspendNext(isolation actor: isolated (any Actor)?) async throws -> Element? { return try await withTaskCancellationHandler { return try await withUnsafeThrowingContinuation { continuation in @@ -488,10 +488,10 @@ extension MultiProducerSingleConsumerChannel._Storage { @usableFromInline var _state: _State - @inlinable + //@inlinable var _onTermination: (@Sendable () -> Void)? { set { - switch consume self._state { + switch (consume self)._state { case .channeling(var channeling): channeling.onTermination = newValue self = .init(state: .channeling(channeling)) @@ -535,7 +535,7 @@ extension MultiProducerSingleConsumerChannel._Storage { ) } - @inlinable + //@inlinable init(state: consuming _State) { self._state = state } @@ -552,9 +552,9 @@ extension MultiProducerSingleConsumerChannel._Storage { ) } - @inlinable + //@inlinable mutating func sourceDeinitialized() -> SourceDeinitializedAction? { - switch consume self._state { + switch (consume self)._state { case .channeling(var channeling): channeling.activeProducers -= 1 @@ -605,10 +605,10 @@ extension MultiProducerSingleConsumerChannel._Storage { ) } - @inlinable + //@inlinable mutating func sequenceDeinitialized() -> SequenceDeinitializedAction? { - switch consume self._state { - case .channeling(let channeling): + switch (consume self)._state { + case .channeling(var channeling): guard channeling.iteratorInitialized else { // No iterator was created so we can transition to finished right away. self = .init(state: .finished(.init(iteratorInitialized: false, sourceFinished: false))) @@ -624,7 +624,7 @@ extension MultiProducerSingleConsumerChannel._Storage { return .none - case .sourceFinished(let sourceFinished): + case .sourceFinished(var sourceFinished): guard sourceFinished.iteratorInitialized else { // No iterator was created so we can transition to finished right away. self = .init(state: .finished(.init(iteratorInitialized: false, sourceFinished: true))) @@ -637,7 +637,7 @@ extension MultiProducerSingleConsumerChannel._Storage { return .none - case .finished(let finished): + case .finished(var finished): // We are already finished so there is nothing left to clean up. // This is just the references dropping afterwards. self = .init(state: .finished(finished)) @@ -646,9 +646,9 @@ extension MultiProducerSingleConsumerChannel._Storage { } } - @inlinable + //@inlinable mutating func iteratorInitialized() { - switch consume self._state { + switch (consume self)._state { case .channeling(var channeling): if channeling.iteratorInitialized { // Our sequence is a unicast sequence and does not support multiple AsyncIterator's @@ -691,9 +691,9 @@ extension MultiProducerSingleConsumerChannel._Storage { ) } - @inlinable + //@inlinable mutating func iteratorDeinitialized() -> IteratorDeinitializedAction? { - switch consume self._state { + switch (consume self)._state { case .channeling(let channeling): if channeling.iteratorInitialized { // An iterator was created and deinited. Since we only support @@ -721,7 +721,7 @@ extension MultiProducerSingleConsumerChannel._Storage { fatalError("MultiProducerSingleConsumerChannel internal inconsistency") } - case .finished(let finished): + case .finished(var finished): // We are already finished so there is nothing left to clean up. // This is just the references dropping afterwards. self = .init(state: .finished(finished)) @@ -781,9 +781,9 @@ extension MultiProducerSingleConsumerChannel._Storage { } } - @inlinable + //@inlinable mutating func send(_ sequence: some Sequence) -> SendAction { - switch consume self._state { + switch (consume self)._state { case .channeling(var channeling): // We have an element and can resume the continuation let bufferEndIndexBeforeAppend = channeling.buffer.endIndex @@ -846,12 +846,12 @@ extension MultiProducerSingleConsumerChannel._Storage { case resumeProducerWithError((Result) -> Void, Error) } - @inlinable + //@inlinable mutating func enqueueProducer( callbackToken: UInt64, onProduceMore: sending @escaping (Result) -> Void ) -> EnqueueProducerAction? { - switch consume self._state { + switch (consume self)._state { case .channeling(var channeling): if let index = channeling.cancelledAsyncProducers.firstIndex(of: callbackToken) { // Our producer got marked as cancelled. @@ -896,12 +896,12 @@ extension MultiProducerSingleConsumerChannel._Storage { case resumeProducerWithError(UnsafeContinuation, Error) } - @inlinable + //@inlinable mutating func enqueueContinuation( callbackToken: UInt64, continuation: UnsafeContinuation ) -> EnqueueContinuationAction? { - switch consume self._state { + switch (consume self)._state { case .channeling(var channeling): if let index = channeling.cancelledAsyncProducers.firstIndex(of: callbackToken) { // Our producer got marked as cancelled. @@ -944,11 +944,11 @@ extension MultiProducerSingleConsumerChannel._Storage { case resumeProducerWithCancellationError(_MultiProducerSingleConsumerSuspendedProducer) } - @inlinable + //@inlinable mutating func cancelProducer( callbackToken: UInt64 ) -> CancelProducerAction? { - switch consume self._state { + switch (consume self)._state { case .channeling(var channeling): guard let index = channeling.suspendedProducers.firstIndex(where: { $0.0 == callbackToken }) else { // The task that sends was cancelled before sending elements so the cancellation handler @@ -998,9 +998,9 @@ extension MultiProducerSingleConsumerChannel._Storage { ) } - @inlinable + //@inlinable mutating func finish(_ failure: Failure?) -> FinishAction? { - switch consume self._state { + switch (consume self)._state { case .channeling(let channeling): guard let consumerContinuation = channeling.consumerContinuation else { // We don't have a suspended consumer so we are just going to mark @@ -1057,9 +1057,9 @@ extension MultiProducerSingleConsumerChannel._Storage { case suspendTask } - @inlinable + //@inlinable mutating func next() -> NextAction { - switch consume self._state { + switch (consume self)._state { case .channeling(var channeling): guard channeling.consumerContinuation == nil else { // We have multiple AsyncIterators iterating the sequence @@ -1131,9 +1131,9 @@ extension MultiProducerSingleConsumerChannel._Storage { case resumeConsumerWithNil(UnsafeContinuation) } - @inlinable + //@inlinable mutating func suspendNext(continuation: UnsafeContinuation) -> SuspendNextAction? { - switch consume self._state { + switch (consume self)._state { case .channeling(var channeling): guard channeling.consumerContinuation == nil else { // We have multiple AsyncIterators iterating the sequence @@ -1198,9 +1198,9 @@ extension MultiProducerSingleConsumerChannel._Storage { case failProducersAndCallOnTermination(_TinyArray<_MultiProducerSingleConsumerSuspendedProducer>, (() -> Void)?) } - @inlinable + //@inlinable mutating func cancelNext() -> CancelNextAction? { - switch consume self._state { + switch (consume self)._state { case .channeling(let channeling): self = .init(state: .finished(.init(iteratorInitialized: channeling.iteratorInitialized, sourceFinished: false))) @@ -1219,12 +1219,12 @@ extension MultiProducerSingleConsumerChannel._Storage { channeling.onTermination ) - case .sourceFinished(let sourceFinished): + case .sourceFinished(var sourceFinished): self = .init(state: .sourceFinished(sourceFinished)) return .none - case .finished(let finished): + case .finished(var finished): self = .init(state: .finished(finished)) return .none @@ -1282,7 +1282,7 @@ extension MultiProducerSingleConsumerChannel._Storage._StateMachine { "backpressure:\(self.backpressureStrategy.description) iteratorInitialized:\(self.iteratorInitialized) buffer:\(self.buffer.count) consumerContinuation:\(self.consumerContinuation == nil) producerContinuations:\(self.suspendedProducers.count) cancelledProducers:\(self.cancelledAsyncProducers.count) hasOutstandingDemand:\(self.hasOutstandingDemand)" } - @inlinable + //@inlinable init( backpressureStrategy: MultiProducerSingleConsumerChannel._InternalBackpressureStrategy, iteratorInitialized: Bool, @@ -1338,7 +1338,7 @@ extension MultiProducerSingleConsumerChannel._Storage._StateMachine { "iteratorInitialized:\(self.iteratorInitialized) buffer:\(self.buffer.count) failure:\(self.failure == nil)" } - @inlinable + //@inlinable init( iteratorInitialized: Bool, buffer: Deque, @@ -1366,7 +1366,7 @@ extension MultiProducerSingleConsumerChannel._Storage._StateMachine { "iteratorInitialized:\(self.iteratorInitialized) sourceFinished:\(self.sourceFinished)" } - @inlinable + //@inlinable init( iteratorInitialized: Bool, sourceFinished: Bool diff --git a/Sources/AsyncAlgorithms/MultiProducerSingleConsumerChannel/MultiProducerSingleConsumerChannel.swift b/Sources/AsyncAlgorithms/MultiProducerSingleConsumerChannel/MultiProducerSingleConsumerChannel.swift index 5e860a89..2e847d0b 100644 --- a/Sources/AsyncAlgorithms/MultiProducerSingleConsumerChannel/MultiProducerSingleConsumerChannel.swift +++ b/Sources/AsyncAlgorithms/MultiProducerSingleConsumerChannel/MultiProducerSingleConsumerChannel.swift @@ -251,7 +251,7 @@ extension MultiProducerSingleConsumerChannel { /// /// - Parameter sequence: The elements to send to the channel. /// - Returns: The result that indicates if more elements should be produced at this time. - @inlinable + //@inlinable public mutating func send(contentsOf sequence: sending S) throws -> SendResult where Element == S.Element, S: Sequence { try self._storage.send(contentsOf: sequence) } @@ -264,7 +264,7 @@ extension MultiProducerSingleConsumerChannel { /// /// - Parameter element: The element to send to the channel. /// - Returns: The result that indicates if more elements should be produced at this time. - @inlinable + //@inlinable public mutating func send(_ element: sending Element) throws -> SendResult { try self._storage.send(contentsOf: CollectionOfOne(element)) } @@ -278,7 +278,7 @@ extension MultiProducerSingleConsumerChannel { /// - Parameters: /// - callbackToken: The callback token. /// - onProduceMore: The callback which gets invoked once more elements should be produced. - @inlinable + //@inlinable public mutating func enqueueCallback( callbackToken: consuming SendResult.CallbackToken, onProduceMore: sending @escaping (Result) -> Void @@ -294,7 +294,7 @@ extension MultiProducerSingleConsumerChannel { /// will mark the passed `callbackToken` as cancelled. /// /// - Parameter callbackToken: The callback token. - @inlinable + //@inlinable public mutating func cancelCallback(callbackToken: consuming SendResult.CallbackToken) { self._storage.cancelProducer(callbackToken: callbackToken._id) } @@ -357,7 +357,7 @@ extension MultiProducerSingleConsumerChannel { /// /// - Parameters: /// - sequence: The elements to send to the channel. - @inlinable + //@inlinable public mutating func send(contentsOf sequence: sending S) async throws where Element == S.Element, S: Sequence { let sendResult = try { try self.send(contentsOf: sequence) }() @@ -420,7 +420,7 @@ extension MultiProducerSingleConsumerChannel { /// /// - Parameters: /// - error: The error to throw, or `nil`, to finish normally. - @inlinable + //@inlinable public consuming func finish(throwing error: Failure? = nil) { self._storage.finish(error) } @@ -458,12 +458,12 @@ extension MultiProducerSingleConsumerChannel { } @_disfavoredOverload - @inlinable + //@inlinable public mutating func next() async throws -> Element? { try await self._backing.storage.next(isolation: nil) } - @inlinable + //@inlinable public mutating func next( isolation actor: isolated (any Actor)? = #isolation ) async throws(Failure) -> Element? {