@@ -56,35 +56,54 @@ extension AsyncSequence {
56
56
}
57
57
58
58
/// An `AsyncSequence` that chunks elements into collected `RangeReplaceableCollection` instances by either count or a signal from another `AsyncSequence`.
59
- public struct AsyncChunksOfCountOrSignalSequence< Base: AsyncSequence , Collected: RangeReplaceableCollection , Signal: AsyncSequence > : AsyncSequence , Sendable where Collected. Element == Base . Element , Base: Sendable , Signal: Sendable , Base. AsyncIterator : Sendable , Signal . AsyncIterator : Sendable , Base . Element: Sendable , Signal. Element: Sendable {
59
+ public struct AsyncChunksOfCountOrSignalSequence< Base: AsyncSequence , Collected: RangeReplaceableCollection , Signal: AsyncSequence > : AsyncSequence , Sendable where Collected. Element == Base . Element , Base: Sendable , Signal: Sendable , Base. Element: Sendable , Signal. Element: Sendable {
60
60
61
61
public typealias Element = Collected
62
62
63
+ enum Either {
64
+ case element( Base . Element )
65
+ case terminal
66
+ case signal
67
+ }
68
+
63
69
/// The iterator for a `AsyncChunksOfCountOrSignalSequence` instance.
64
- public struct Iterator: AsyncIteratorProtocol , Sendable {
70
+ public struct Iterator: AsyncIteratorProtocol {
71
+ typealias EitherMappedBase = AsyncMapSequence < Base , Either >
72
+ typealias EitherMappedSignal = AsyncMapSequence < Signal , Either >
73
+ typealias ChainedBase = AsyncChain2Sequence < EitherMappedBase , AsyncLazySequence < [ Either ] > >
74
+ typealias Merged = AsyncMerge2Sequence < ChainedBase , EitherMappedSignal >
75
+
65
76
let count : Int ?
66
- var state : Merge2StateMachine < Base , Signal >
67
- init ( base: Base . AsyncIterator , count: Int ? , signal: Signal . AsyncIterator ) {
77
+ var iterator : Merged . AsyncIterator
78
+ var terminated = false
79
+
80
+ init ( iterator: Merged . AsyncIterator , count: Int ? ) {
68
81
self . count = count
69
- self . state = Merge2StateMachine ( base , terminatesOnNil : true , signal )
82
+ self . iterator = iterator
70
83
}
71
84
72
85
public mutating func next( ) async rethrows -> Collected ? {
73
- var result : Collected ?
74
- while let next = try await state. next ( ) {
86
+ guard !terminated else {
87
+ return nil
88
+ }
89
+ var result : Collected ?
90
+ while let next = try await iterator. next ( ) {
75
91
switch next {
76
- case . first( let element) :
77
- if result == nil {
78
- result = Collected ( )
79
- }
80
- result!. append ( element)
81
- if result? . count == count {
82
- return result
83
- }
84
- case . second( _) :
85
- if result != nil {
86
- return result
87
- }
92
+ case . element( let element) :
93
+ if result == nil {
94
+ result = Collected ( )
95
+ }
96
+ result!. append ( element)
97
+ if result? . count == count {
98
+ return result
99
+ }
100
+ case . terminal:
101
+ terminated = true
102
+ return result
103
+ case . signal:
104
+ if result != nil {
105
+ return result
106
+ }
88
107
}
89
108
}
90
109
return result
@@ -105,6 +124,7 @@ public struct AsyncChunksOfCountOrSignalSequence<Base: AsyncSequence, Collected:
105
124
}
106
125
107
126
public func makeAsyncIterator( ) - > Iterator {
108
- return Iterator ( base: base. makeAsyncIterator ( ) , count: count, signal: signal. makeAsyncIterator ( ) )
127
+
128
+ return Iterator ( iterator: merge ( chain ( base. map { Either . element ( $0) } , [ . terminal] . async ) , signal. map { _ in Either . signal } ) . makeAsyncIterator ( ) , count: count)
109
129
}
110
130
}
0 commit comments