Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deferred algorithm #222

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 114 additions & 0 deletions Evolution/NNNN-deferred.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
# Deferred

* Proposal: [NNNN](NNNN-deferred.md)
* Authors: [Tristan Celder](https://github.com/tcldr)
* Review Manager: TBD
* Status: **Awaiting implementation**

* Implementation: [[Source](https://github.com/tcldr/swift-async-algorithms/blob/pr/deferred/Sources/AsyncAlgorithms/AsyncDeferredSequence.swift) |
[Tests](https://github.com/tcldr/swift-async-algorithms/blob/pr/deferred/Tests/AsyncAlgorithmsTests/TestDeferred.swift)]
* Decision Notes: [Additional Commentary](https://forums.swift.org/)
* Bugs:

## Introduction

`AsyncDeferredSequence` provides a convenient way to postpone the initialization of a sequence to the point where it is requested by a sequence consumer.

## Motivation

Some source sequences may perform expensive work on initialization. This could be network activity, sensor activity, or anything else that consumes system resources. While this can be mitigated in some simple situtations by only passing around a sequence at the point of use, often it is favorable to be able to pass a sequence to its eventual point of use without commencing its initialization process. This is especially true for sequences which are intended for multicast/broadcast for which a reliable startup and shutdown procedure is essential.

A simple example of a seqeunce which may benefit from being deferred is provided in the documentation for AsyncStream:

```swift
extension QuakeMonitor {

static var quakes: AsyncStream<Quake> {
AsyncStream { continuation in
let monitor = QuakeMonitor()
monitor.quakeHandler = { quake in
continuation.yield(quake)
}
continuation.onTermination = { @Sendable _ in
monitor.stopMonitoring()
}
monitor.startMonitoring()
}
}
}
```

In the supplied code sample, the closure provided to the AsyncStream initializer will be executed immediately upon initialization; `QuakeMonitor.startMonitoring()` will be called, and the stream will then begin buffering its contents waiting to be iterated. Whilst this behavior is sometimes desirable, on other occasions it can cause system resources to be consumed unnecessarily.

```swift
let nonDeferredSequence = QuakeMonitor.quakes // `Quake.startMonitoring()` is called now!

...
// at some arbitrary point, possibly hours later...
for await quake in nonDeferredSequence {
print("Quake: \(quake.date)")
}
// Prints out hours of previously buffered quake data before showing the latest
```

## Proposed solution

`AsyncDeferredSequence` uses a supplied closure to create a new asynchronous sequence each time it is iterated. This has the effect of postponing the initialization of an an arbitrary async sequence until the point of use:

```swift
let deferredSequence = deferred(QuakeMonitor.quakes) // Now, initialization is postponed

...
// at some arbitrary point, possibly hours later...
for await quake in deferredSequence { // `Quake.startMonitoring()` is now called
print("Quake: \(quake.date)")
}
// Prints out only the latest quake data
```

Now, potentially expensive system resources are consumed only at the point they're needed.

## Detailed design

`AsyncDeferredSequence` is a trivial algorithm supported by some convenience functions.

### Functions

```swift
public func deferred<Base>(
_ createSequence: @escaping @Sendable () async -> Base
) -> AsyncDeferredSequence<Base> where Base: AsyncSequence, Base: Sendable

public func deferred<Base: AsyncSequence & Sendable>(
_ createSequence: @autoclosure @escaping @Sendable () -> Base
) -> AsyncDeferredSequence<Base> where Base: AsyncSequence, Base: Sendable
```

The synchronous function can be auto-escaped, simplifying the call-site. While the async variant allows a sequence to be initialized within a concurrency context other than that of the end consumer.

```swift
public struct AsyncDeferredSequence<Base> where Base: AsyncSequence, Base: Sendable {
public typealias Element = Base.Element
public struct Iterator: AsyncIteratorProtocol {
public mutating func next() async rethrows -> Element?
}
public func makeAsyncIterator() -> Iterator
}
```

### Naming

The `deferred(_:)` function takes its inspiration from the Combine publisher of the same name with similar functionality. However, `lazy(_:)` could be quite fitting, too.

### Comparison with other libraries

**ReactiveX** ReactiveX has an [API definition of Defer](https://reactivex.io/documentation/operators/defer.html) as a top level operator for generating observables.

**Combine** Combine has an [API definition of Deferred](https://developer.apple.com/documentation/combine/deferred) as a top-level convenience publisher.


## Effect on API resilience

Deferred has a trivial implementation and is marked as `@frozen` and `@inlinable`. This removes the ability of this type and functions to be ABI resilient boundaries at the benefit of being highly optimizable.

## Alternatives considered
99 changes: 99 additions & 0 deletions Sources/AsyncAlgorithms/AsyncDeferredSequence.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Async Algorithms open source project
//
// Copyright (c) 2022 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See https://swift.org/LICENSE.txt for license information
//
//===----------------------------------------------------------------------===//

/// Creates an asynchronous sequence that uses the supplied closure to create a new asynchronous sequence each
/// time it is iterated
@inlinable
public func deferred<Base>(
_ createSequence: @escaping @Sendable () async -> Base
) -> AsyncDeferredSequence<Base> where Base: AsyncSequence, Base: Sendable {
AsyncDeferredSequence(createSequence)
}

/// Creates an asynchronous sequence that uses the supplied closure to create a new asynchronous sequence each
/// time it is iterated
@inlinable
public func deferred<Base: AsyncSequence & Sendable>(
_ createSequence: @autoclosure @escaping @Sendable () -> Base
) -> AsyncDeferredSequence<Base> where Base: AsyncSequence, Base: Sendable {
AsyncDeferredSequence(createSequence)
}

@frozen
public struct AsyncDeferredSequence<Base> where Base: AsyncSequence, Base: Sendable {

@usableFromInline
let createSequence: @Sendable () async -> Base

@inlinable
init(_ createSequence: @escaping @Sendable () async -> Base) {
self.createSequence = createSequence
}
}

extension AsyncDeferredSequence: AsyncSequence {

public typealias Element = Base.Element

public struct Iterator: AsyncIteratorProtocol {

@usableFromInline
enum State {
case pending(@Sendable () async -> Base)
case active(Base.AsyncIterator)
case terminal
}

@usableFromInline
var state: State

@inlinable
init(_ createSequence: @escaping @Sendable () async -> Base) {
self.state = .pending(createSequence)
}

@inlinable
public mutating func next() async rethrows -> Element? {
switch state {
case .pending(let generator):
state = .active(await generator().makeAsyncIterator())
return try await next()
case .active(var base):
do {
if let value = try await base.next() {
state = .active(base)
return value
}
else {
state = .terminal
return nil
}
}
catch let error {
state = .terminal
throw error
}
case .terminal:
return nil
}
}
}

@inlinable
public func makeAsyncIterator() -> Iterator {
Iterator(createSequence)
}
}

extension AsyncDeferredSequence: Sendable { }

@available(*, unavailable)
extension AsyncDeferredSequence.Iterator: Sendable { }
114 changes: 114 additions & 0 deletions Tests/AsyncAlgorithmsTests/TestDeferred.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Async Algorithms open source project
//
// Copyright (c) 2022 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See https://swift.org/LICENSE.txt for license information
//
//===----------------------------------------------------------------------===//

@preconcurrency import XCTest
import AsyncAlgorithms

final class TestDeferred: XCTestCase {
func test_deferred() async {
let expected = [0,1,2,3,4]
let sequence = deferred {
return expected.async
}
var iterator = sequence.makeAsyncIterator()
var actual = [Int]()
while let item = await iterator.next() {
actual.append(item)
}
XCTAssertEqual(expected, actual)
let pastEnd = await iterator.next()
XCTAssertNil(pastEnd)
}

func test_deferred_remains_idle_pending_consumer() async {
let expectation = expectation(description: "pending")
expectation.isInverted = true
let _ = deferred {
AsyncStream { continuation in
expectation.fulfill()
continuation.yield(0)
continuation.finish()
}
}
wait(for: [expectation], timeout: 1.0)
}

func test_deferred_generates_new_sequence_per_consumer() async {
let expectation = expectation(description: "started")
expectation.expectedFulfillmentCount = 3
let sequence = deferred {
AsyncStream { continuation in
expectation.fulfill()
continuation.yield(0)
continuation.finish()
}
}
for await _ in sequence { }
for await _ in sequence { }
for await _ in sequence { }
wait(for: [expectation], timeout: 1.0)
}

func test_deferred_throws() async {
let expectation = expectation(description: "throws")
let sequence = deferred {
AsyncThrowingStream<Int, Error> { continuation in
continuation.finish(throwing: Failure())
}
}
var iterator = sequence.makeAsyncIterator()
do {
while let _ = try await iterator.next() { }
}
catch {
expectation.fulfill()
}
wait(for: [expectation], timeout: 1.0)
let pastEnd = try! await iterator.next()
XCTAssertNil(pastEnd)
}

func test_deferred_autoclosure() async {
let expected = [0,1,2,3,4]
let sequence = deferred(expected.async)
var iterator = sequence.makeAsyncIterator()
var actual = [Int]()
while let item = await iterator.next() {
actual.append(item)
}
XCTAssertEqual(expected, actual)
let pastEnd = await iterator.next()
XCTAssertNil(pastEnd)
}

func test_deferred_cancellation() async {
let source = Indefinite(value: 0)
let sequence = deferred(source.async)
let finished = expectation(description: "finished")
let iterated = expectation(description: "iterated")
let task = Task {
var firstIteration = false
for await _ in sequence {
if !firstIteration {
firstIteration = true
iterated.fulfill()
}
}
finished.fulfill()
}
// ensure the other task actually starts
wait(for: [iterated], timeout: 1.0)
// cancellation should ensure the loop finishes
// without regards to the remaining underlying sequence
task.cancel()
wait(for: [finished], timeout: 1.0)
}
}