Skip to content

Commit 79fa2c2

Browse files
Burattisebsto
andauthored
[Draft] Detached tasks (#334)
* First prototype * Fix build * Removes task cancellation #334 (comment) * Force user to handle errors #334 (comment) * Remove EventLoop API #334 (comment) * Make DetachedTaskContainer internal #334 (comment) #334 (comment) * Removes @unchecked Sendable #334 (comment) * Invoke awaitAll() from async context * Fix ambiguous expression type for swift 5.7 * Fix visibility of detachedBackgroundTask * Add swift-doc * Add example usage to readme * Add tests --------- Co-authored-by: Sébastien Stormacq <[email protected]>
1 parent ed72bdd commit 79fa2c2

File tree

5 files changed

+239
-5
lines changed

5 files changed

+239
-5
lines changed

Diff for: Sources/AWSLambdaRuntimeCore/DetachedTasks.swift

+95
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the SwiftAWSLambdaRuntime open source project
4+
//
5+
// Copyright (c) 2022 Apple Inc. and the SwiftAWSLambdaRuntime project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
import Foundation
15+
import NIOConcurrencyHelpers
16+
import NIOCore
17+
import Logging
18+
19+
/// A container that allows tasks to finish after a synchronous invocation
20+
/// has produced its response.
21+
actor DetachedTasksContainer: Sendable {
22+
23+
struct Context: Sendable {
24+
let eventLoop: EventLoop
25+
let logger: Logger
26+
}
27+
28+
private var context: Context
29+
private var storage: [RegistrationKey: EventLoopFuture<Void>] = [:]
30+
31+
init(context: Context) {
32+
self.context = context
33+
}
34+
35+
/// Adds a detached async task.
36+
///
37+
/// - Parameters:
38+
/// - name: The name of the task.
39+
/// - task: The async task to execute.
40+
/// - Returns: A `RegistrationKey` for the registered task.
41+
func detached(task: @Sendable @escaping () async -> Void) {
42+
let key = RegistrationKey()
43+
let promise = context.eventLoop.makePromise(of: Void.self)
44+
promise.completeWithTask(task)
45+
let task = promise.futureResult.always { [weak self] result in
46+
guard let self else { return }
47+
Task {
48+
await self.removeTask(forKey: key)
49+
}
50+
}
51+
self.storage[key] = task
52+
}
53+
54+
func removeTask(forKey key: RegistrationKey) {
55+
self.storage.removeValue(forKey: key)
56+
}
57+
58+
/// Awaits all registered tasks to complete.
59+
///
60+
/// - Returns: An `EventLoopFuture<Void>` that completes when all tasks have finished.
61+
func awaitAll() -> EventLoopFuture<Void> {
62+
let tasks = storage.values
63+
if tasks.isEmpty {
64+
return context.eventLoop.makeSucceededVoidFuture()
65+
} else {
66+
let context = context
67+
return EventLoopFuture.andAllComplete(Array(tasks), on: context.eventLoop).flatMap { [weak self] in
68+
guard let self else {
69+
return context.eventLoop.makeSucceededFuture(())
70+
}
71+
let promise = context.eventLoop.makePromise(of: Void.self)
72+
promise.completeWithTask {
73+
try await self.awaitAll().get()
74+
}
75+
return promise.futureResult
76+
}
77+
}
78+
}
79+
}
80+
81+
extension DetachedTasksContainer {
82+
/// Lambda detached task registration key.
83+
struct RegistrationKey: Hashable, CustomStringConvertible, Sendable {
84+
var value: String
85+
86+
init() {
87+
// UUID basically
88+
self.value = UUID().uuidString
89+
}
90+
91+
var description: String {
92+
self.value
93+
}
94+
}
95+
}

Diff for: Sources/AWSLambdaRuntimeCore/LambdaContext.swift

+28-2
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ public struct LambdaContext: CustomDebugStringConvertible, Sendable {
8181
let logger: Logger
8282
let eventLoop: EventLoop
8383
let allocator: ByteBufferAllocator
84+
let tasks: DetachedTasksContainer
8485

8586
init(
8687
requestID: String,
@@ -91,7 +92,8 @@ public struct LambdaContext: CustomDebugStringConvertible, Sendable {
9192
clientContext: String?,
9293
logger: Logger,
9394
eventLoop: EventLoop,
94-
allocator: ByteBufferAllocator
95+
allocator: ByteBufferAllocator,
96+
tasks: DetachedTasksContainer
9597
) {
9698
self.requestID = requestID
9799
self.traceID = traceID
@@ -102,6 +104,7 @@ public struct LambdaContext: CustomDebugStringConvertible, Sendable {
102104
self.logger = logger
103105
self.eventLoop = eventLoop
104106
self.allocator = allocator
107+
self.tasks = tasks
105108
}
106109
}
107110

@@ -177,7 +180,13 @@ public struct LambdaContext: CustomDebugStringConvertible, Sendable {
177180
clientContext: clientContext,
178181
logger: logger,
179182
eventLoop: eventLoop,
180-
allocator: allocator
183+
allocator: allocator,
184+
tasks: DetachedTasksContainer(
185+
context: DetachedTasksContainer.Context(
186+
eventLoop: eventLoop,
187+
logger: logger
188+
)
189+
)
181190
)
182191
}
183192

@@ -188,6 +197,23 @@ public struct LambdaContext: CustomDebugStringConvertible, Sendable {
188197
let remaining = deadline - now
189198
return .milliseconds(remaining)
190199
}
200+
201+
var tasks: DetachedTasksContainer {
202+
self.storage.tasks
203+
}
204+
205+
206+
/// Registers a background task that continues running after the synchronous invocation has completed.
207+
/// This is useful for tasks like flushing metrics or performing clean-up operations without delaying the response.
208+
///
209+
/// - Parameter body: An asynchronous closure that performs the background task.
210+
/// - Warning: You will be billed for the milliseconds of Lambda execution time until the very last
211+
/// background task is finished.
212+
public func detachedBackgroundTask(_ body: @escaping @Sendable () async -> ()) {
213+
Task {
214+
await self.tasks.detached(task: body)
215+
}
216+
}
191217

192218
public var debugDescription: String {
193219
"\(Self.self)(requestID: \(self.requestID), traceID: \(self.traceID), invokedFunctionARN: \(self.invokedFunctionARN), cognitoIdentity: \(self.cognitoIdentity ?? "nil"), clientContext: \(self.clientContext ?? "nil"), deadline: \(self.deadline))"

Diff for: Sources/AWSLambdaRuntimeCore/LambdaRunner.swift

+16-2
Original file line numberDiff line numberDiff line change
@@ -95,13 +95,27 @@ internal final class LambdaRunner {
9595
if case .failure(let error) = result {
9696
logger.warning("lambda handler returned an error: \(error)")
9797
}
98-
return (invocation, result)
98+
return (invocation, result, context)
9999
}
100-
}.flatMap { invocation, result in
100+
}.flatMap { invocation, result, context in
101101
// 3. report results to runtime engine
102102
self.runtimeClient.reportResults(logger: logger, invocation: invocation, result: result).peekError { error in
103103
logger.error("could not report results to lambda runtime engine: \(error)")
104+
// To discuss:
105+
// Do we want to await the tasks in this case?
106+
let promise = context.eventLoop.makePromise(of: Void.self)
107+
promise.completeWithTask {
108+
return try await context.tasks.awaitAll().get()
109+
}
110+
return promise.futureResult
111+
}.map { _ in context }
112+
}
113+
.flatMap { (context: LambdaContext) -> EventLoopFuture<Void> in
114+
let promise = context.eventLoop.makePromise(of: Void.self)
115+
promise.completeWithTask {
116+
try await context.tasks.awaitAll().get()
104117
}
118+
return promise.futureResult
105119
}
106120
}
107121

+80
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the SwiftAWSLambdaRuntime open source project
4+
//
5+
// Copyright (c) 2017-2018 Apple Inc. and the SwiftAWSLambdaRuntime project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
@testable import AWSLambdaRuntimeCore
16+
import NIO
17+
import XCTest
18+
import Logging
19+
20+
class DetachedTasksTest: XCTestCase {
21+
22+
actor Expectation {
23+
var isFulfilled = false
24+
func fulfill() {
25+
isFulfilled = true
26+
}
27+
}
28+
29+
func testAwaitTasks() async throws {
30+
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
31+
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }
32+
33+
let context = DetachedTasksContainer.Context(
34+
eventLoop: eventLoopGroup.next(),
35+
logger: Logger(label: "test")
36+
)
37+
let expectation = Expectation()
38+
39+
let container = DetachedTasksContainer(context: context)
40+
await container.detached {
41+
try! await Task.sleep(for: .milliseconds(200))
42+
await expectation.fulfill()
43+
}
44+
45+
try await container.awaitAll().get()
46+
let isFulfilled = await expectation.isFulfilled
47+
XCTAssert(isFulfilled)
48+
}
49+
50+
func testAwaitChildrenTasks() async throws {
51+
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
52+
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }
53+
54+
let context = DetachedTasksContainer.Context(
55+
eventLoop: eventLoopGroup.next(),
56+
logger: Logger(label: "test")
57+
)
58+
let expectation1 = Expectation()
59+
let expectation2 = Expectation()
60+
61+
let container = DetachedTasksContainer(context: context)
62+
await container.detached {
63+
await container.detached {
64+
try! await Task.sleep(for: .milliseconds(300))
65+
await expectation1.fulfill()
66+
}
67+
try! await Task.sleep(for: .milliseconds(200))
68+
await container.detached {
69+
try! await Task.sleep(for: .milliseconds(100))
70+
await expectation2.fulfill()
71+
}
72+
}
73+
74+
try await container.awaitAll().get()
75+
let isFulfilled1 = await expectation1.isFulfilled
76+
let isFulfilled2 = await expectation2.isFulfilled
77+
XCTAssert(isFulfilled1)
78+
XCTAssert(isFulfilled2)
79+
}
80+
}

Diff for: readme.md

+20-1
Original file line numberDiff line numberDiff line change
@@ -474,7 +474,7 @@ public protocol SimpleLambdaHandler {
474474

475475
### Context
476476

477-
When calling the user provided Lambda function, the library provides a `LambdaContext` class that provides metadata about the execution context, as well as utilities for logging and allocating buffers.
477+
When calling the user provided Lambda function, the library provides a `LambdaContext` class that provides metadata about the execution context, as well as utilities for logging, allocating buffers and dispatch background tasks.
478478

479479
```swift
480480
public struct LambdaContext: CustomDebugStringConvertible, Sendable {
@@ -555,6 +555,25 @@ public struct LambdaInitializationContext: Sendable {
555555
}
556556
```
557557

558+
### Background tasks
559+
560+
The detachedBackgroundTask method allows you to register background tasks that continue running even after the Lambda runtime has reported the result of a synchronous invocation. This is particularly useful for integrations with services like API Gateway or CloudFront, where you can quickly return a response without waiting for non-essential tasks such as flushing metrics or performing non-critical clean-up operations.
561+
562+
```swift
563+
@main
564+
struct MyLambda: SimpleLambdaHandler {
565+
func handle(_ request: APIGatewayV2Request, context: LambdaContext) async throws -> APIGatewayV2Response {
566+
let response = makeResponse()
567+
context.detachedBackgroundTask {
568+
try? await Task.sleep(for: .seconds(3))
569+
print("Background task completed")
570+
}
571+
print("Returning response")
572+
return response
573+
}
574+
}
575+
```
576+
558577
### Configuration
559578

560579
The library’s behavior can be fine tuned using environment variables based configuration. The library supported the following environment variables:

0 commit comments

Comments
 (0)