forked from apple/swift-async-algorithms
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathUnboundedBufferStorage.swift
123 lines (112 loc) · 3.74 KB
/
UnboundedBufferStorage.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Async Algorithms open source project
//
// Copyright (c) 2022 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See https://swift.org/LICENSE.txt for license information
//
//===----------------------------------------------------------------------===//
final class UnboundedBufferStorage<Base: AsyncSequence>: Sendable where Base: Sendable {
private let stateMachine: ManagedCriticalState<UnboundedBufferStateMachine<Base>>
init(base: Base, policy: UnboundedBufferStateMachine<Base>.Policy) {
self.stateMachine = ManagedCriticalState(UnboundedBufferStateMachine<Base>(base: base, policy: policy))
}
func next() async -> Result<Base.Element, Error>? {
return await withTaskCancellationHandler {
let action: UnboundedBufferStateMachine<Base>.NextAction? = self.stateMachine.withCriticalRegion { stateMachine in
let action = stateMachine.next()
switch action {
case .startTask(let base):
self.startTask(stateMachine: &stateMachine, base: base)
return nil
case .suspend:
return action
case .returnResult:
return action
}
}
switch action {
case .startTask:
// We are handling the startTask in the lock already because we want to avoid
// other inputs interleaving while starting the task
fatalError("Internal inconsistency")
case .suspend:
break
case .returnResult(let result):
return result
case .none:
break
}
return await withUnsafeContinuation { (continuation: UnsafeContinuation<Result<Base.Element, Error>?, Never>) in
let action = self.stateMachine.withCriticalRegion { stateMachine in
stateMachine.nextSuspended(continuation: continuation)
}
switch action {
case .none:
break
case .resumeConsumer(let result):
continuation.resume(returning: result)
}
}
} onCancel: {
self.interrupted()
}
}
private func startTask(
stateMachine: inout UnboundedBufferStateMachine<Base>,
base: Base
) {
let task = Task {
do {
for try await element in base {
let action = self.stateMachine.withCriticalRegion { stateMachine in
stateMachine.elementProduced(element: element)
}
switch action {
case .none:
break
case .resumeConsumer(let continuation, let result):
continuation.resume(returning: result)
}
}
let action = self.stateMachine.withCriticalRegion { stateMachine in
stateMachine.finish(error: nil)
}
switch action {
case .none:
break
case .resumeConsumer(let continuation):
continuation?.resume(returning: nil)
}
} catch {
let action = self.stateMachine.withCriticalRegion { stateMachine in
stateMachine.finish(error: error)
}
switch action {
case .none:
break
case .resumeConsumer(let continuation):
continuation?.resume(returning: .failure(error))
}
}
}
stateMachine.taskStarted(task: task)
}
func interrupted() {
let action = self.stateMachine.withCriticalRegion { stateMachine in
stateMachine.interrupted()
}
switch action {
case .none:
break
case .resumeConsumer(let task, let continuation):
task.cancel()
continuation?.resume(returning: nil)
}
}
deinit {
self.interrupted()
}
}