Skip to content

Commit 0ab431e

Browse files
authored
refactor: add identifier metadata for cancellation source registered tasks (#18)
* feat: add cooperative cancellation to cancellation source registered tasks completion wait * wip: remove generics to fix Swift 5.6 build * wip: use for loop instead of iterator * wip: revert concurrent async stream iteration calls * wip: fix cancellation handling * wip: make cancellation task detached
1 parent 5ce6b1a commit 0ab431e

File tree

4 files changed

+149
-39
lines changed

4 files changed

+149
-39
lines changed

Sources/AsyncObjects/CancellationSource/Cancellable.swift

+38
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import Foundation
2+
13
/// A type representing a unit of work or task that supports cancellation.
24
///
35
/// Cancellation should be initiated on invoking ``cancel(file:function:line:)``
@@ -92,3 +94,39 @@ extension Task: Cancellable {
9294
let _ = try await self.value
9395
}
9496
}
97+
98+
/// Waits asynchronously for the work or task to complete
99+
/// handling cooperative cancellation initiation.
100+
///
101+
/// - Parameters:
102+
/// - work: The work for which completion to wait and handle cooperative cancellation.
103+
/// - id: The identifier associated with work.
104+
/// - file: The file wait request originates from (there's usually no need to pass it
105+
/// explicitly as it defaults to `#fileID`).
106+
/// - function: The function wait request originates from (there's usually no need to
107+
/// pass it explicitly as it defaults to `#function`).
108+
/// - line: The line wait request originates from (there's usually no need to pass it
109+
/// explicitly as it defaults to `#line`).
110+
///
111+
/// - Throws: If waiting for the work completes with an error.
112+
@inlinable
113+
func waitHandlingCancelation(
114+
for work: Cancellable,
115+
associatedId id: UUID,
116+
file: String = #fileID,
117+
function: String = #function,
118+
line: UInt = #line
119+
) async throws {
120+
try await withTaskCancellationHandler {
121+
defer {
122+
log("Finished", id: id, file: file, function: function, line: line)
123+
}
124+
try await work.wait(file: file, function: function, line: line)
125+
} onCancel: {
126+
work.cancel(file: file, function: function, line: line)
127+
log(
128+
"Cancellation initiated", id: id,
129+
file: file, function: function, line: line
130+
)
131+
}
132+
}

Sources/AsyncObjects/CancellationSource/CancellationSource.swift

+16-24
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,10 @@ import Foundation
3333
/// tasks in that case.
3434
public struct CancellationSource: AsyncObject, Cancellable, Loggable {
3535
/// The continuation type controlling task group lifetime.
36-
@usableFromInline
3736
internal typealias Continuation = GlobalContinuation<Void, Error>
3837
/// The cancellable work with invocation context.
39-
@usableFromInline
4038
internal typealias WorkItem = (
41-
Cancellable, file: String, function: String, line: UInt
39+
Cancellable, id: UUID, file: String, function: String, line: UInt
4240
)
4341

4442
/// The lifetime task that is cancelled when
@@ -47,7 +45,6 @@ public struct CancellationSource: AsyncObject, Cancellable, Loggable {
4745
var lifetime: Task<Void, Error>!
4846
/// The stream continuation used to register work items
4947
/// for cooperative cancellation.
50-
@usableFromInline
5148
var pipe: AsyncStream<WorkItem>.Continuation!
5249

5350
/// A Boolean value that indicates whether cancellation is already
@@ -57,30 +54,24 @@ public struct CancellationSource: AsyncObject, Cancellable, Loggable {
5754
/// There is no way to uncancel on this source. Create a new
5855
/// `CancellationSource` to manage cancellation of newly spawned
5956
/// tasks in that case.
57+
@inlinable
6058
public var isCancelled: Bool { lifetime.isCancelled }
6159

6260
/// Creates a new cancellation source object.
6361
///
6462
/// - Returns: The newly created cancellation source.
6563
public init() {
6664
let stream = AsyncStream<WorkItem> { self.pipe = $0 }
67-
self.lifetime = Task {
65+
self.lifetime = Task.detached {
6866
try await withThrowingTaskGroup(of: Void.self) { group in
6967
for await item in stream {
7068
group.addTask {
71-
try? await withTaskCancellationHandler {
72-
try await item.0.wait(
73-
file: item.file,
74-
function: item.function,
75-
line: item.line
76-
)
77-
} onCancel: {
78-
item.0.cancel(
79-
file: item.file,
80-
function: item.function,
81-
line: item.line
82-
)
83-
}
69+
try? await waitHandlingCancelation(
70+
for: item.0, associatedId: item.id,
71+
file: item.file,
72+
function: item.function,
73+
line: item.line
74+
)
8475
}
8576
}
8677

@@ -110,18 +101,19 @@ public struct CancellationSource: AsyncObject, Cancellable, Loggable {
110101
function: String = #function,
111102
line: UInt = #line
112103
) {
113-
let result = pipe.yield((task, file, function, line))
104+
let id = UUID()
105+
let result = pipe.yield((task, id, file, function, line))
114106
switch result {
115107
case .enqueued:
116108
log(
117-
"Registered \(task)",
109+
"Registered \(task)", id: id,
118110
file: file, function: function, line: line
119111
)
120112
case .dropped, .terminated: fallthrough
121113
@unknown default:
122114
task.cancel(file: file, function: function, line: line)
123115
log(
124-
"Cancelled \(task) due to result: \(result)",
116+
"Cancelled \(task) due to result: \(result)", id: id,
125117
file: file, function: function, line: line
126118
)
127119
}
@@ -172,10 +164,10 @@ public struct CancellationSource: AsyncObject, Cancellable, Loggable {
172164
function: String = #function,
173165
line: UInt = #line
174166
) async {
175-
let key = UUID()
176-
log("Waiting", id: key, file: file, function: function, line: line)
167+
let id = UUID()
168+
log("Waiting", id: id, file: file, function: function, line: line)
177169
let _ = await lifetime.result
178-
log("Completed", id: key, file: file, function: function, line: line)
170+
log("Completed", id: id, file: file, function: function, line: line)
179171
}
180172
}
181173

Sources/AsyncObjects/Logging/Loggable.swift

+54
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,36 @@ let level: Logger.Level = .debug
3434
let level: Logger.Level = .info
3535
#endif
3636

37+
/// Log a message attaching an optional identifier.
38+
///
39+
/// If `ASYNCOBJECTS_ENABLE_LOGGING_LEVEL_TRACE` is set log level is set to `trace`.
40+
/// If `ASYNCOBJECTS_ENABLE_LOGGING_LEVEL_DEBUG` is set log level is set to `debug`.
41+
/// Otherwise log level is set to `info`.
42+
///
43+
/// - Parameters:
44+
/// - message: The message to be logged.
45+
/// - id: Optional identifier associated with message.
46+
/// - file: The file this log message originates from (there's usually
47+
/// no need to pass it explicitly as it defaults to `#fileID`).
48+
/// - function: The function this log message originates from (there's usually
49+
/// no need to pass it explicitly as it defaults to `#function`).
50+
/// - line: The line this log message originates from (there's usually
51+
/// no need to pass it explicitly as it defaults to `#line`).
52+
@inlinable
53+
func log(
54+
_ message: @autoclosure () -> Logger.Message,
55+
id: UUID? = nil,
56+
file: String = #fileID,
57+
function: String = #function,
58+
line: UInt = #line
59+
) {
60+
let metadata: Logger.Metadata = (id != nil) ? ["id": "\(id!)"] : [:]
61+
logger.log(
62+
level: level, message(), metadata: metadata,
63+
file: file, function: function, line: line
64+
)
65+
}
66+
3767
extension Loggable {
3868
/// Log a message attaching the default type specific metadata
3969
/// and optional identifier.
@@ -102,6 +132,30 @@ extension LoggableActor {
102132
}
103133
}
104134
#else
135+
/// Log a message attaching an optional identifier.
136+
///
137+
/// If `ASYNCOBJECTS_ENABLE_LOGGING_LEVEL_TRACE` is set log level is set to `trace`.
138+
/// If `ASYNCOBJECTS_ENABLE_LOGGING_LEVEL_DEBUG` is set log level is set to `debug`.
139+
/// Otherwise log level is set to `info`.
140+
///
141+
/// - Parameters:
142+
/// - message: The message to be logged.
143+
/// - id: Optional identifier associated with message.
144+
/// - file: The file this log message originates from (there's usually
145+
/// no need to pass it explicitly as it defaults to `#fileID`).
146+
/// - function: The function this log message originates from (there's usually
147+
/// no need to pass it explicitly as it defaults to `#function`).
148+
/// - line: The line this log message originates from (there's usually
149+
/// no need to pass it explicitly as it defaults to `#line`).
150+
@inlinable
151+
func log(
152+
_ message: @autoclosure () -> String,
153+
id: UUID? = nil,
154+
file: String = #fileID,
155+
function: String = #function,
156+
line: UInt = #line
157+
) { /* Do nothing */ }
158+
105159
/// A type that emits log messages with specific metadata.
106160
@usableFromInline
107161
protocol Loggable {}

Tests/AsyncObjectsTests/CancellationSourceTests.swift

+41-15
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,18 @@ class CancellationSourceTests: XCTestCase {
99
let task = Task { try await Task.sleep(seconds: 10) }
1010
source.register(task: task)
1111
source.cancel()
12-
try await waitUntil(task, timeout: 5) { $0.isCancelled }
12+
try await source.wait(forSeconds: 5)
13+
XCTAssertTrue(source.isCancelled)
14+
XCTAssertTrue(task.isCancelled)
1315
}
1416

1517
func testTaskCancellationWithTimeout() async throws {
1618
let task = Task { try await Task.sleep(seconds: 10) }
1719
let source = CancellationSource(cancelAfterNanoseconds: UInt64(1E9))
1820
source.register(task: task)
19-
try await waitUntil(task, timeout: 5) { $0.isCancelled }
21+
try await source.wait(forSeconds: 5)
22+
XCTAssertTrue(source.isCancelled)
23+
XCTAssertTrue(task.isCancelled)
2024
}
2125

2226
#if swift(>=5.7)
@@ -33,7 +37,9 @@ class CancellationSourceTests: XCTestCase {
3337
)
3438
let task = Task { try await Task.sleep(seconds: 10, clock: clock) }
3539
source.register(task: task)
36-
try await waitUntil(task, timeout: 5) { $0.isCancelled }
40+
try await source.wait(forSeconds: 5, clock: clock)
41+
XCTAssertTrue(source.isCancelled)
42+
XCTAssertTrue(task.isCancelled)
3743
}
3844
#endif
3945

@@ -43,7 +49,9 @@ class CancellationSourceTests: XCTestCase {
4349
let task = Task { try await Task.sleep(seconds: 10) }
4450
source.register(task: task)
4551
pSource.cancel()
46-
try await waitUntil(task, timeout: 5) { $0.isCancelled }
52+
try await source.wait(forSeconds: 5)
53+
XCTAssertTrue(source.isCancelled)
54+
XCTAssertTrue(task.isCancelled)
4755
}
4856

4957
func testTaskCancellationWithMultipleLinkedSources() async throws {
@@ -53,7 +61,9 @@ class CancellationSourceTests: XCTestCase {
5361
let task = Task { try await Task.sleep(seconds: 10) }
5462
source.register(task: task)
5563
pSource1.cancel()
56-
try await waitUntil(task, timeout: 5) { $0.isCancelled }
64+
try await source.wait(forSeconds: 5)
65+
XCTAssertTrue(source.isCancelled)
66+
XCTAssertTrue(task.isCancelled)
5767
}
5868

5969
func testAlreadyCancelledTask() async throws {
@@ -76,7 +86,9 @@ class CancellationSourceTests: XCTestCase {
7686
source.register(task: task)
7787
try await task.value
7888
source.cancel()
79-
XCTAssertFalse(task.isCancelled)
89+
try await source.wait(forSeconds: 5)
90+
XCTAssertTrue(source.isCancelled)
91+
try await task.value
8092
}
8193

8294
func testConcurrentCancellation() async throws {
@@ -87,15 +99,19 @@ class CancellationSourceTests: XCTestCase {
8799
for _ in 0..<10 { group.addTask { source.cancel() } }
88100
await group.waitForAll()
89101
}
90-
try await waitUntil(task, timeout: 5) { $0.isCancelled }
102+
try await source.wait(forSeconds: 5)
103+
XCTAssertTrue(source.isCancelled)
104+
XCTAssertTrue(task.isCancelled)
91105
}
92106

93107
func testRegistrationAfterCancellation() async throws {
94108
let source = CancellationSource()
95109
let task = Task { try await Task.sleep(seconds: 10) }
96110
source.cancel()
97111
source.register(task: task)
98-
try await waitUntil(task, timeout: 5) { $0.isCancelled }
112+
try await source.wait(forSeconds: 5)
113+
XCTAssertTrue(source.isCancelled)
114+
XCTAssertTrue(task.isCancelled)
99115
}
100116

101117
func testMultipleTaskCancellation() async throws {
@@ -107,9 +123,11 @@ class CancellationSourceTests: XCTestCase {
107123
source.register(task: task2)
108124
source.register(task: task3)
109125
source.cancel()
110-
try await waitUntil(task1, timeout: 5) { $0.isCancelled }
111-
try await waitUntil(task2, timeout: 5) { $0.isCancelled }
112-
try await waitUntil(task3, timeout: 5) { $0.isCancelled }
126+
try await source.wait(forSeconds: 5)
127+
XCTAssertTrue(source.isCancelled)
128+
XCTAssertTrue(task1.isCancelled)
129+
XCTAssertTrue(task2.isCancelled)
130+
XCTAssertTrue(task3.isCancelled)
113131
}
114132
}
115133

@@ -125,7 +143,9 @@ class CancellationSourceInitializationTests: XCTestCase {
125143
} catch {}
126144
}
127145
source.cancel()
128-
try await waitUntil(task, timeout: 5) { $0.isCancelled }
146+
try await source.wait(forSeconds: 5)
147+
XCTAssertTrue(source.isCancelled)
148+
XCTAssertTrue(task.isCancelled)
129149
}
130150

131151
func testDetachedTaskCancellation() async throws {
@@ -137,7 +157,9 @@ class CancellationSourceInitializationTests: XCTestCase {
137157
} catch {}
138158
}
139159
source.cancel()
140-
try await waitUntil(task, timeout: 5) { $0.isCancelled }
160+
try await source.wait(forSeconds: 5)
161+
XCTAssertTrue(source.isCancelled)
162+
XCTAssertTrue(task.isCancelled)
141163
}
142164

143165
func testThrowingTaskCancellation() async throws {
@@ -146,7 +168,9 @@ class CancellationSourceInitializationTests: XCTestCase {
146168
try await Task.sleep(seconds: 10)
147169
}
148170
source.cancel()
149-
try await waitUntil(task, timeout: 5) { $0.isCancelled }
171+
try await source.wait(forSeconds: 5)
172+
XCTAssertTrue(source.isCancelled)
173+
XCTAssertTrue(task.isCancelled)
150174
}
151175

152176
func testThrowingDetachedTaskCancellation() async throws {
@@ -155,6 +179,8 @@ class CancellationSourceInitializationTests: XCTestCase {
155179
try await Task.sleep(seconds: 10)
156180
}
157181
source.cancel()
158-
try await waitUntil(task, timeout: 5) { $0.isCancelled }
182+
try await source.wait(forSeconds: 5)
183+
XCTAssertTrue(source.isCancelled)
184+
XCTAssertTrue(task.isCancelled)
159185
}
160186
}

0 commit comments

Comments
 (0)