@@ -16,16 +16,19 @@ struct BoundedBufferStateMachine<Base: AsyncSequence> {
16
16
typealias SuspendedProducer = UnsafeContinuation < Void , Never >
17
17
typealias SuspendedConsumer = UnsafeContinuation < Result < Base . Element , Error > ? , Never >
18
18
19
+ // We are using UnsafeTransfer here since we have to get the elements from the task
20
+ // into the consumer task. This is a transfer but we cannot prove this to the compiler at this point
21
+ // since next is not marked as transferring the return value.
19
22
fileprivate enum State {
20
23
case initial( base: Base )
21
24
case buffering(
22
25
task: Task < Void , Never > ,
23
- buffer: Deque < Result < Element , Error > > ,
26
+ buffer: Deque < Result < UnsafeTransfer < Element > , Error > > ,
24
27
suspendedProducer: SuspendedProducer ? ,
25
28
suspendedConsumer: SuspendedConsumer ?
26
29
)
27
30
case modifying
28
- case finished( buffer: Deque < Result < Element , Error > > )
31
+ case finished( buffer: Deque < Result < UnsafeTransfer < Element > , Error > > )
29
32
}
30
33
31
34
private var state : State
@@ -139,7 +142,7 @@ struct BoundedBufferStateMachine<Base: AsyncSequence> {
139
142
// we have to stack the new element or suspend the producer if the buffer is full
140
143
precondition ( buffer. count < limit, " Invalid state. The buffer should be available for stacking a new element. " )
141
144
self . state = . modifying
142
- buffer. append ( . success( element) )
145
+ buffer. append ( . success( . init ( element) ) )
143
146
self . state = . buffering( task: task, buffer: buffer, suspendedProducer: nil , suspendedConsumer: nil )
144
147
return . none
145
148
@@ -218,7 +221,7 @@ struct BoundedBufferStateMachine<Base: AsyncSequence> {
218
221
self . state = . modifying
219
222
let result = buffer. popFirst ( ) !
220
223
self . state = . buffering( task: task, buffer: buffer, suspendedProducer: nil , suspendedConsumer: nil )
221
- return . returnResult( producerContinuation: suspendedProducer, result: result)
224
+ return . returnResult( producerContinuation: suspendedProducer, result: result. map { $0 . wrapped } )
222
225
223
226
case . buffering( _, _, _, . some) :
224
227
preconditionFailure ( " Invalid states. There is already a suspended consumer. " )
@@ -233,7 +236,7 @@ struct BoundedBufferStateMachine<Base: AsyncSequence> {
233
236
self . state = . modifying
234
237
let result = buffer. popFirst ( ) !
235
238
self . state = . finished( buffer: buffer)
236
- return . returnResult( producerContinuation: nil , result: result)
239
+ return . returnResult( producerContinuation: nil , result: result. map { $0 . wrapped } )
237
240
}
238
241
}
239
242
@@ -257,7 +260,7 @@ struct BoundedBufferStateMachine<Base: AsyncSequence> {
257
260
self . state = . modifying
258
261
let result = buffer. popFirst ( ) !
259
262
self . state = . buffering( task: task, buffer: buffer, suspendedProducer: nil , suspendedConsumer: nil )
260
- return . returnResult( producerContinuation: suspendedProducer, result: result)
263
+ return . returnResult( producerContinuation: suspendedProducer, result: result. map { $0 . wrapped } )
261
264
262
265
case . buffering( _, _, _, . some) :
263
266
preconditionFailure ( " Invalid states. There is already a suspended consumer. " )
@@ -272,7 +275,7 @@ struct BoundedBufferStateMachine<Base: AsyncSequence> {
272
275
self . state = . modifying
273
276
let result = buffer. popFirst ( ) !
274
277
self . state = . finished( buffer: buffer)
275
- return . returnResult( producerContinuation: nil , result: result)
278
+ return . returnResult( producerContinuation: nil , result: result. map { $0 . wrapped } )
276
279
}
277
280
}
278
281
0 commit comments