Skip to content

Commit 8048f3c

Browse files
committed
refactor: modify task operation to asynchronous wait to use stream
1 parent 3f3dd36 commit 8048f3c

File tree

3 files changed

+34
-111
lines changed

3 files changed

+34
-111
lines changed

Sources/AsyncObjects/AsyncSemaphore.swift

-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import Foundation
2-
32
import OrderedCollections
43

54
/// An object that controls access to a resource across multiple task contexts through use of a traditional counting semaphore.

Sources/AsyncObjects/TaskOperation.swift

+34-109
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@ import Dispatch
2828
/// // or wait synchronously for completion
2929
/// operation.waitUntilFinished()
3030
/// ```
31-
public final class TaskOperation<R: Sendable>: Operation, AsyncObject,
32-
ContinuableCollection, Loggable, @unchecked Sendable
31+
public final class TaskOperation<R: Sendable>: Operation, AsyncObject, Loggable,
32+
@unchecked Sendable
3333
{
3434
/// The asynchronous action to perform as part of the operation..
3535
private let underlyingAction: @Sendable () async throws -> R
@@ -40,6 +40,12 @@ public final class TaskOperation<R: Sendable>: Operation, AsyncObject,
4040
/// synchronize data access and modifications.
4141
@usableFromInline
4242
internal let locker: Locker
43+
/// The stream that propagates the operation
44+
/// finish event.
45+
internal var event: AsyncStream<Void>!
46+
/// The continuation of `stream` that controls the asynchronous wait
47+
/// for operation completion.
48+
internal var waiter: AsyncStream<Void>.Continuation!
4349

4450
/// A type representing a set of behaviors for the executed
4551
/// task type and task completion behavior.
@@ -116,9 +122,8 @@ public final class TaskOperation<R: Sendable>: Operation, AsyncObject,
116122
willChangeValue(forKey: "isFinished")
117123
locker.perform {
118124
_isFinished = newValue
119-
guard newValue, !continuations.isEmpty else { return }
120-
continuations.forEach { $1.resume() }
121-
continuations = [:]
125+
guard newValue else { return }
126+
waiter.finish()
122127
}
123128
didChangeValue(forKey: "isFinished")
124129
}
@@ -166,11 +171,14 @@ public final class TaskOperation<R: Sendable>: Operation, AsyncObject,
166171
self.flags = flags
167172
self.underlyingAction = operation
168173
super.init()
174+
self.event = AsyncStream(
175+
bufferingPolicy: .bufferingOldest(1)
176+
) { self.waiter = $0 }
169177
}
170178

171179
deinit {
172180
execTask?.cancel()
173-
locker.perform { self.continuations.forEach { $1.cancel() } }
181+
waiter.finish()
174182
}
175183

176184
/// Begins the execution of the operation.
@@ -222,99 +230,6 @@ public final class TaskOperation<R: Sendable>: Operation, AsyncObject,
222230
}
223231

224232
// MARK: AsyncObject
225-
/// The suspended tasks continuation type.
226-
@usableFromInline
227-
internal typealias Continuation = TrackedContinuation<
228-
GlobalContinuation<Void, Error>
229-
>
230-
/// The continuations stored with an associated key for all the suspended task that are waiting for operation completion.
231-
@usableFromInline
232-
internal private(set) var continuations: [UUID: Continuation] = [:]
233-
234-
/// Add continuation with the provided key in `continuations` map.
235-
///
236-
/// - Parameters:
237-
/// - continuation: The `continuation` to add.
238-
/// - key: The key in the map.
239-
/// - file: The file add request originates from (there's usually no need to pass it
240-
/// explicitly as it defaults to `#fileID`).
241-
/// - function: The function add request originates from (there's usually no need to
242-
/// pass it explicitly as it defaults to `#function`).
243-
/// - line: The line add request originates from (there's usually no need to pass it
244-
/// explicitly as it defaults to `#line`).
245-
/// - preinit: The pre-initialization handler to run
246-
/// in the beginning of this method.
247-
///
248-
/// - Important: The pre-initialization handler must run
249-
/// before any logic in this method.
250-
@inlinable
251-
internal func addContinuation(
252-
_ continuation: Continuation,
253-
withKey key: UUID,
254-
file: String, function: String, line: UInt,
255-
preinit: @Sendable () -> Void
256-
) {
257-
locker.perform {
258-
preinit()
259-
log("Adding", id: key, file: file, function: function, line: line)
260-
guard !continuation.resumed else {
261-
log(
262-
"Already resumed, not tracking", id: key,
263-
file: file, function: function, line: line
264-
)
265-
return
266-
}
267-
268-
guard !isFinished else {
269-
continuation.resume()
270-
log(
271-
"Resumed", id: key,
272-
file: file, function: function, line: line
273-
)
274-
return
275-
}
276-
277-
continuations[key] = continuation
278-
log("Tracking", id: key, file: file, function: function, line: line)
279-
}
280-
}
281-
282-
/// Remove continuation associated with provided key
283-
/// from `continuations` map.
284-
///
285-
/// - Parameters:
286-
/// - continuation: The continuation to remove and cancel.
287-
/// - key: The key in the map.
288-
/// - file: The file remove request originates from (there's usually no need to pass it
289-
/// explicitly as it defaults to `#fileID`).
290-
/// - function: The function remove request originates from (there's usually no need to
291-
/// pass it explicitly as it defaults to `#function`).
292-
/// - line: The line remove request originates from (there's usually no need to pass it
293-
/// explicitly as it defaults to `#line`).
294-
@inlinable
295-
internal func removeContinuation(
296-
_ continuation: Continuation,
297-
withKey key: UUID,
298-
file: String, function: String, line: UInt
299-
) {
300-
locker.perform {
301-
log("Removing", id: key, file: file, function: function, line: line)
302-
continuations.removeValue(forKey: key)
303-
guard !continuation.resumed else {
304-
log(
305-
"Already resumed, not cancelling", id: key,
306-
file: file, function: function, line: line
307-
)
308-
return
309-
}
310-
311-
continuation.cancel()
312-
log(
313-
"Cancelled", id: key,
314-
file: file, function: function, line: line
315-
)
316-
}
317-
}
318233

319234
/// Starts operation asynchronously
320235
/// as part of a new top-level task on behalf of the current actor.
@@ -356,18 +271,28 @@ public final class TaskOperation<R: Sendable>: Operation, AsyncObject,
356271
function: String = #function,
357272
line: UInt = #line
358273
) async throws {
359-
guard !isFinished else {
360-
log("Finished", file: file, function: function, line: line)
361-
return
362-
}
363-
364274
let key = UUID()
365275
log("Waiting", id: key, file: file, function: function, line: line)
366-
try await withPromisedContinuation(
367-
withKey: key,
368-
file: file, function: function, line: line
369-
)
370-
log("Finished", id: key, file: file, function: function, line: line)
276+
for await _ in event { break }
277+
do {
278+
try Task.checkCancellation()
279+
} catch {
280+
log(
281+
"Cancelled", id: key,
282+
file: file, function: function, line: line
283+
)
284+
throw error
285+
}
286+
do {
287+
let _ = try await execTask?.value
288+
log("Finished", id: key, file: file, function: function, line: line)
289+
} catch {
290+
log(
291+
"Finished with error: \(error)", id: key,
292+
file: file, function: function, line: line
293+
)
294+
throw error
295+
}
371296
}
372297
}
373298

Sources/AsyncObjects/TaskQueue.swift

-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import Foundation
2-
32
import OrderedCollections
43

54
/// An object that acts as a concurrent queue executing submitted tasks concurrently.

0 commit comments

Comments
 (0)