@@ -20,9 +20,10 @@ package final class AsyncValueSubject<Value: Sendable>: Sendable {
20
20
var value : Value
21
21
var continuations : [ UInt : AsyncStream < Value > . Continuation ] = [ : ]
22
22
var count : UInt = 0
23
+ var finished = false
23
24
}
24
25
25
- let bufferingPolicy : BufferingPolicy
26
+ let bufferingPolicy : UncheckedSendable < BufferingPolicy >
26
27
let mutableState : LockIsolated < MutableState >
27
28
28
29
/// Creates a new AsyncValueSubject with an initial value.
@@ -31,7 +32,7 @@ package final class AsyncValueSubject<Value: Sendable>: Sendable {
31
32
/// - bufferingPolicy: Determines how values are buffered in the AsyncStream (defaults to .unbounded)
32
33
package init ( _ initialValue: Value , bufferingPolicy: BufferingPolicy = . unbounded) {
33
34
self . mutableState = LockIsolated ( MutableState ( value: initialValue) )
34
- self . bufferingPolicy = bufferingPolicy
35
+ self . bufferingPolicy = UncheckedSendable ( bufferingPolicy)
35
36
}
36
37
37
38
deinit {
@@ -43,12 +44,17 @@ package final class AsyncValueSubject<Value: Sendable>: Sendable {
43
44
mutableState. value
44
45
}
45
46
46
- /// Sends a new value to the subject and notifies all observers.
47
- /// - Parameter value: The new value to send
47
+ /// Resume the task awaiting the next iteration point by having it return normally from its suspension point with a given element.
48
+ /// - Parameter value: The value to yield from the continuation.
49
+ ///
50
+ /// If nothing is awaiting the next value, this method attempts to buffer the result’s element.
51
+ ///
52
+ /// This can be called more than once and returns to the caller immediately without blocking for any awaiting consumption from the iteration.
48
53
package func yield( _ value: Value ) {
49
54
mutableState. withValue {
50
- $0. value = value
55
+ guard ! $0. finished else { return }
51
56
57
+ $0. value = value
52
58
for (_, continuation) in $0. continuations {
53
59
continuation. yield ( value)
54
60
}
@@ -62,14 +68,20 @@ package final class AsyncValueSubject<Value: Sendable>: Sendable {
62
68
/// finish, the stream enters a terminal state and doesn't produce any
63
69
/// additional elements.
64
70
package func finish( ) {
65
- for (_, continuation) in mutableState. continuations {
66
- continuation. finish ( )
71
+ mutableState. withValue {
72
+ guard $0. finished == false else { return }
73
+
74
+ $0. finished = true
75
+
76
+ for (_, continuation) in $0. continuations {
77
+ continuation. finish ( )
78
+ }
67
79
}
68
80
}
69
81
70
82
/// An AsyncStream that emits the current value and all subsequent updates.
71
83
package var values : AsyncStream < Value > {
72
- AsyncStream ( bufferingPolicy: bufferingPolicy) { continuation in
84
+ AsyncStream ( bufferingPolicy: bufferingPolicy. value ) { continuation in
73
85
insert ( continuation)
74
86
}
75
87
}
0 commit comments