-
Notifications
You must be signed in to change notification settings - Fork 161
/
Copy pathInput.swift
123 lines (109 loc) · 3.83 KB
/
Input.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Async Algorithms open source project
//
// Copyright (c) 2022 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See https://swift.org/LICENSE.txt for license information
//
//===----------------------------------------------------------------------===//
extension AsyncSequenceValidationDiagram {
/// A diagram specification string paired with a source location.
public struct Specification: Sendable {
  /// The raw specification text.
  public let specification: String
  /// The source location stored alongside `specification`.
  public let location: SourceLocation

  /// Creates a specification from its text and a source location.
  ///
  /// - Parameters:
  ///   - specification: The raw specification text.
  ///   - location: The source location to store with the text.
  init(specification: String, location: SourceLocation) {
    self.location = location
    self.specification = specification
  }
}
/// An asynchronous sequence of `String` elements whose emissions are
/// installed by `parse(_:theme:location:)` and delivered through a `WorkQueue`.
public struct Input: AsyncSequence, Sendable {
  public typealias Element = String

  /// Storage shared between an `Input` and the iterators it vends.
  struct State {
    /// Emissions that have not yet been picked up by an iterator, each paired
    /// with the instant stored alongside it.
    var emissions = [(Clock.Instant, Event)]()
  }

  let state = ManagedCriticalState(State())
  let queue: WorkQueue
  let index: Int

  /// Iterator that takes emissions from the shared state one at a time and
  /// delivers each of an emission's results in order.
  public struct Iterator: AsyncIteratorProtocol, Sendable {
    let state: ManagedCriticalState<State>
    let queue: WorkQueue
    let index: Int
    /// The emission currently being delivered (its instant and results), or
    /// `nil` when no emission is in progress.
    var active: (Clock.Instant, [Result<String?, Error>])?
    /// Position within the active emission's results of the next result to deliver.
    var eventIndex = 0

    /// Enqueues the result at `eventIndex` on the work queue with `when` as
    /// its deadline and suspends until the continuation is resumed.
    ///
    /// - Parameters:
    ///   - when: The deadline passed to the queue for this result.
    ///   - results: All results of the emission being delivered.
    /// - Returns: The value the continuation is resumed with.
    /// - Throws: The error the continuation is resumed with.
    ///
    /// NOTE(review): `results[eventIndex]` assumes `results` is non-empty;
    /// confirm that `Event.results` never yields an empty array.
    mutating func apply(when: Clock.Instant, results: [Result<String?, Error>]) async throws -> Element? {
      let token = queue.prepare()
      // This is the last result of the emission: clear `active` so the next
      // call to `next()` takes a fresh emission from the shared state.
      if eventIndex + 1 >= results.count {
        active = nil
      }
      // Runs after the enqueue below has read `eventIndex`, on both the
      // returning and the throwing path.
      defer {
        if active != nil {
          eventIndex += 1
        } else {
          eventIndex = 0
        }
      }
      return try await withTaskCancellationHandler {
        try await withUnsafeThrowingContinuation { continuation in
          queue.enqueue(Context.state.withCriticalRegion(\.currentJob), deadline: when, continuation: continuation, results[eventIndex], index: index, token: token)
        }
      } onCancel: { [queue] in
        // Hand the token back to the queue so it can cancel the pending entry.
        queue.cancel(token)
      }
    }

    /// Delivers the next result of the in-progress emission, or takes the
    /// next pending emission and delivers its first result.
    ///
    /// - Returns: The next element, or `nil` when no emissions remain (or
    ///   when the delivered result itself carries `nil`).
    /// - Throws: Any error produced while delivering the result.
    public mutating func next() async throws -> Element? {
      // Finish the in-progress emission before taking another one.
      if let (when, results) = active {
        return try await apply(when: when, results: results)
      }
      let upcoming = state.withCriticalRegion { state -> (Clock.Instant, Event)? in
        guard !state.emissions.isEmpty else {
          return nil
        }
        return state.emissions.removeFirst()
      }
      guard let (when, event) = upcoming else {
        return nil
      }
      let results = event.results
      active = (when, results)
      return try await apply(when: when, results: results)
    }
  }

  /// Creates an iterator that shares this input's state, queue, and index.
  public func makeAsyncIterator() -> Iterator {
    Iterator(state: state, queue: queue, index: index)
  }

  /// Parses `dsl` with `Event.parse` and replaces the pending emissions with
  /// the result.
  ///
  /// - Parameters:
  ///   - dsl: The text to parse.
  ///   - theme: The theme forwarded to `Event.parse`.
  ///   - location: The source location forwarded to `Event.parse`.
  /// - Throws: Any error thrown by `Event.parse`; the pending emissions are
  ///   left unchanged in that case.
  func parse<Theme: AsyncSequenceValidationTheme>(_ dsl: String, theme: Theme, location: SourceLocation) throws {
    let emissions = try Event.parse(dsl, theme: theme, location: location)
    state.withCriticalRegion { state in
      state.emissions = emissions
    }
  }

  /// The greatest instant among the emissions still pending, or `nil` when
  /// there are none.
  var end: Clock.Instant? {
    return state.withCriticalRegion { state in
      // Only the maximum is needed, so avoid sorting the whole list.
      state.emissions.map { $0.0 }.max()
    }
  }
}
/// A collection of `Input` values that grows on demand: reading a position at
/// or past the current count creates every missing input up to that position.
public struct InputList: RandomAccessCollection, Sendable {
  let state = ManagedCriticalState([Input]())
  let queue: WorkQueue

  public var startIndex: Int { return 0 }

  /// The number of inputs created so far.
  public var endIndex: Int {
    state.withCriticalRegion { $0.count }
  }

  /// Returns the input at `position`, creating it and any missing inputs
  /// before it if they do not exist yet.
  ///
  /// - Parameter position: A non-negative index; it may be at or beyond
  ///   `endIndex`, in which case the list is extended to include it.
  public subscript(position: Int) -> AsyncSequenceValidationDiagram.Input {
    get {
      return state.withCriticalRegion { state in
        if position >= state.count {
          // Each new input records the index of the slot it occupies, so
          // gap-filling inputs do not all share the requested position.
          for newIndex in state.count...position {
            state.append(Input(queue: queue, index: newIndex))
          }
        }
        return state[position]
      }
    }
  }
}
}