-
Notifications
You must be signed in to change notification settings - Fork 161
/
Copy pathAsyncReplaySequence.swift
87 lines (73 loc) · 2.69 KB
/
AsyncReplaySequence.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Async Algorithms open source project
//
// Copyright (c) 2022 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See https://swift.org/LICENSE.txt for license information
//
//===----------------------------------------------------------------------===//
import DequeModule
public extension AsyncSequence {
func replay(count: Int) -> AsyncReplaySequence<Self> {
AsyncReplaySequence(base: self, count: count)
}
}
public struct AsyncReplaySequence<Base: AsyncSequence>: AsyncSequence {
public typealias Element = Base.Element
public typealias AsyncIterator = Iterator
private let base: Base
private let count: Int
private let history: ManagedCriticalState<Deque<Result<Base.Element?, Error>>>
public init(base: Base, count: Int) {
self.base = base
self.count = count
self.history = ManagedCriticalState([])
}
private func push(element: Result<Element?, Error>) {
self.history.withCriticalRegion { history in
if history.count >= count {
_ = history.popFirst()
}
history.append(element)
}
}
private func dumpHistory(into localHistory: inout Deque<Result<Base.Element?, Error>>?) {
self.history.withCriticalRegion { localHistory = $0 }
}
public func makeAsyncIterator() -> AsyncIterator {
return Iterator(
asyncReplaySequence: self,
base: self.base.makeAsyncIterator()
)
}
public struct Iterator: AsyncIteratorProtocol {
let asyncReplaySequence: AsyncReplaySequence<Base>
var base: Base.AsyncIterator
var history: Deque<Result<Base.Element?, Error>>?
public mutating func next() async rethrows -> Element? {
if self.history == nil {
// first call to next, we make sure we have the latest available history
self.asyncReplaySequence.dumpHistory(into: &self.history)
}
if self.history!.isEmpty {
// nothing to replay, we request the next element from the base and push it in the history
let element: Result<Base.Element?, Error>
do {
element = .success(try await self.base.next())
} catch {
element = .failure(error)
}
self.asyncReplaySequence.push(element: element)
return try element._rethrowGet()
} else {
guard !Task.isCancelled else { return nil }
// we replay the oldest element from the history
let element = self.history!.popFirst()!
return try element._rethrowGet()
}
}
}
}
extension AsyncReplaySequence: Sendable where Base: Sendable, Base.Element: Sendable { }