From 2ecddcfd9c26bbce846ecfc6cd688240e21e50ba Mon Sep 17 00:00:00 2001
From: Fabian Fett
Date: Fri, 21 Feb 2025 17:42:31 +0100
Subject: [PATCH 1/5] Fixes to Local Lambda Server

---
 .../Lambda+LocalServer.swift                 | 214 ++++++++++++------
 .../AWSLambdaRuntimeCore/LambdaRuntime.swift |   3 +-
 2 files changed, 142 insertions(+), 75 deletions(-)

diff --git a/Sources/AWSLambdaRuntimeCore/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntimeCore/Lambda+LocalServer.swift
index 64a9acb7..0732ab28 100644
--- a/Sources/AWSLambdaRuntimeCore/Lambda+LocalServer.swift
+++ b/Sources/AWSLambdaRuntimeCore/Lambda+LocalServer.swift
@@ -13,6 +13,7 @@
 //===----------------------------------------------------------------------===//
 
 #if DEBUG
+import DequeModule
 import Dispatch
 import Logging
 import NIOConcurrencyHelpers
@@ -47,24 +48,15 @@ extension Lambda {
     /// - note: This API is designed strictly for local testing and is behind a DEBUG flag
     static func withLocalServer(
         invocationEndpoint: String? = nil,
-        _ body: @escaping () async throws -> Void
+        _ body: sending @escaping () async throws -> Void
     ) async throws {
+        var logger = Logger(label: "LocalServer")
+        logger.logLevel = Lambda.env("LOG_LEVEL").flatMap(Logger.Level.init) ?? .info
 
-        // launch the local server and wait for it to be started before running the body
-        try await withThrowingTaskGroup(of: Void.self) { group in
-            // this call will return when the server calls continuation.resume()
-            try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in
-                group.addTask {
-                    do {
-                        try await LambdaHttpServer(invocationEndpoint: invocationEndpoint).start(
-                            continuation: continuation
-                        )
-                    } catch {
-                        continuation.resume(throwing: error)
-                    }
-                }
-            }
-            // now that server is started, run the Lambda function itself
+        try await LambdaHTTPServer.withLocalServer(
+            invocationEndpoint: invocationEndpoint,
+            logger: logger
+        ) {
             try await body()
         }
     }
@@ -84,34 +76,38 @@
 /// 1. POST /invoke - the client posts the event to the lambda function
 ///
 /// This server passes the data received from /invoke POST request to the lambda function (GET /next) and then forwards the response back to the client.
-private struct LambdaHttpServer {
-    private let logger: Logger
-    private let group: EventLoopGroup
-    private let host: String
-    private let port: Int
+private struct LambdaHTTPServer {
     private let invocationEndpoint: String
 
     private let invocationPool = Pool()
     private let responsePool = Pool()
 
-    init(invocationEndpoint: String?) {
-        var logger = Logger(label: "LocalServer")
-        logger.logLevel = Lambda.env("LOG_LEVEL").flatMap(Logger.Level.init) ?? .info
-        self.logger = logger
-        self.group = MultiThreadedEventLoopGroup.singleton
-        self.host = "127.0.0.1"
-        self.port = 7000
+    private init(
+        invocationEndpoint: String?
+    ) {
         self.invocationEndpoint = invocationEndpoint ??
"/invoke" } - func start(continuation: CheckedContinuation) async throws { - let channel = try await ServerBootstrap(group: self.group) + private enum TaskResult: Sendable { + case closureResult(Swift.Result) + case serverReturned(Swift.Result) + } + + static func withLocalServer( + invocationEndpoint: String?, + host: String = "127.0.0.1", + port: Int = 7000, + eventLoopGroup: MultiThreadedEventLoopGroup = .singleton, + logger: Logger, + _ closure: sending @escaping () async throws -> Result + ) async throws -> Result { + let channel = try await ServerBootstrap(group: eventLoopGroup) .serverChannelOption(.backlog, value: 256) .serverChannelOption(.socketOption(.so_reuseaddr), value: 1) .childChannelOption(.maxMessagesPerRead, value: 1) .bind( - host: self.host, - port: self.port + host: host, + port: port ) { channel in channel.eventLoop.makeCompletedFuture { @@ -129,8 +125,6 @@ private struct LambdaHttpServer { } } - // notify the caller that the server is started - continuation.resume() logger.info( "Server started and listening", metadata: [ @@ -139,30 +133,76 @@ private struct LambdaHttpServer { ] ) + let server = LambdaHTTPServer(invocationEndpoint: invocationEndpoint) + // We are handling each incoming connection in a separate child task. It is important // to use a discarding task group here which automatically discards finished child tasks. // A normal task group retains all child tasks and their outputs in memory until they are // consumed by iterating the group or by exiting the group. Since, we are never consuming // the results of the group we need the group to automatically discard them; otherwise, this // would result in a memory leak over time. - try await withThrowingDiscardingTaskGroup { group in - try await channel.executeThenClose { inbound in - for try await connectionChannel in inbound { - - group.addTask { - logger.trace("Handling a new connection") - await self.handleConnection(channel: connectionChannel) - logger.trace("Done handling the connection") + let result = await withTaskGroup(of: TaskResult.self, returning: Swift.Result.self) { group in + + let c = closure + group.addTask { + do { + + let result = try await c() + return .closureResult(.success(result)) + } catch { + return .closureResult(.failure(error)) + } + } + + group.addTask { + do { + try await withThrowingDiscardingTaskGroup { taskGroup in + try await channel.executeThenClose { inbound in + for try await connectionChannel in inbound { + + taskGroup.addTask { + logger.trace("Handling a new connection") + await server.handleConnection(channel: connectionChannel, logger: logger) + logger.trace("Done handling the connection") + } + } + } } + return .serverReturned(.success(())) + } catch { + return .serverReturned(.failure(error)) + } + } + + let task1 = await group.next()! + group.cancelAll() + let task2 = await group.next()! + + switch task1 { + case .closureResult(let result): + return result + + case .serverReturned: + switch task2 { + case .closureResult(let result): + return result + + case .serverReturned: + fatalError() } } } + logger.info("Server shutting down") + return try result.get() } + + /// This method handles individual TCP connections private func handleConnection( - channel: NIOAsyncChannel + channel: NIOAsyncChannel, + logger: Logger ) async { var requestHead: HTTPRequestHead! 
@@ -186,12 +226,14 @@ private struct LambdaHttpServer { // process the request let response = try await self.processRequest( head: requestHead, - body: requestBody + body: requestBody, + logger: logger ) // send the responses try await self.sendResponse( response: response, - outbound: outbound + outbound: outbound, + logger: logger ) requestHead = nil @@ -214,15 +256,15 @@ private struct LambdaHttpServer { /// - body: the HTTP request body /// - Throws: /// - Returns: the response to send back to the client or the Lambda function - private func processRequest(head: HTTPRequestHead, body: ByteBuffer?) async throws -> LocalServerResponse { + private func processRequest(head: HTTPRequestHead, body: ByteBuffer?, logger: Logger) async throws -> LocalServerResponse { if let body { - self.logger.trace( + logger.trace( "Processing request", metadata: ["URI": "\(head.method) \(head.uri)", "Body": "\(String(buffer: body))"] ) } else { - self.logger.trace("Processing request", metadata: ["URI": "\(head.method) \(head.uri)"]) + logger.trace("Processing request", metadata: ["URI": "\(head.method) \(head.uri)"]) } switch (head.method, head.uri) { @@ -237,7 +279,9 @@ private struct LambdaHttpServer { } // we always accept the /invoke request and push them to the pool let requestId = "\(DispatchTime.now().uptimeNanoseconds)" - logger.trace("/invoke received invocation", metadata: ["requestId": "\(requestId)"]) + var logger = logger + logger[metadataKey: "requestID"] = "\(requestId)" + logger.trace("/invoke received invocation") await self.invocationPool.push(LocalServerInvocation(requestId: requestId, request: body)) // wait for the lambda function to process the request @@ -273,9 +317,9 @@ private struct LambdaHttpServer { case (.GET, let url) where url.hasSuffix(Consts.getNextInvocationURLSuffix): // pop the tasks from the queue - self.logger.trace("/next waiting for /invoke") + logger.trace("/next waiting for /invoke") for try await invocation in self.invocationPool { - self.logger.trace("/next retrieved invocation", metadata: ["requestId": "\(invocation.requestId)"]) + logger.trace("/next retrieved invocation", metadata: ["requestId": "\(invocation.requestId)"]) // this call also stores the invocation requestId into the response return invocation.makeResponse(status: .accepted) } @@ -322,12 +366,13 @@ private struct LambdaHttpServer { private func sendResponse( response: LocalServerResponse, - outbound: NIOAsyncChannelOutboundWriter + outbound: NIOAsyncChannelOutboundWriter, + logger: Logger ) async throws { var headers = HTTPHeaders(response.headers ?? []) headers.add(name: "Content-Length", value: "\(response.body?.readableBytes ?? 0)") - self.logger.trace("Writing response", metadata: ["requestId": "\(response.requestId ?? "")"]) + logger.trace("Writing response", metadata: ["requestId": "\(response.requestId ?? "")"]) try await outbound.write( HTTPServerResponsePart.head( HTTPResponseHead( @@ -350,44 +395,67 @@ private struct LambdaHttpServer { private final class Pool: AsyncSequence, AsyncIteratorProtocol, Sendable where T: Sendable { typealias Element = T - private let _buffer = Mutex>(.init()) - private let _continuation = Mutex?>(nil) + struct State { + enum State { + case buffer(Deque) + case continuation(CheckedContinuation?) + } - /// retrieve the first element from the buffer - public func popFirst() async -> T? 
{
-        self._buffer.withLock { $0.popFirst() }
+
+        var state: State
+
+        init() {
+            self.state = .buffer([])
+        }
     }
 
+    private let lock = Mutex(.init())
+
     /// enqueue an element, or give it back immediately to the iterator if it is waiting for an element
     public func push(_ invocation: T) async {
         // if the iterator is waiting for an element, give it to it
         // otherwise, enqueue the element
-        if let continuation = self._continuation.withLock({ $0 }) {
-            self._continuation.withLock { $0 = nil }
-            continuation.resume(returning: invocation)
-        } else {
-            self._buffer.withLock { $0.append(invocation) }
+        let maybeContinuation = self.lock.withLock { state -> CheckedContinuation? in
+            switch state.state {
+            case .continuation(let continuation):
+                state.state = .buffer([])
+                return continuation
+
+            case .buffer(var buffer):
+                buffer.append(invocation)
+                state.state = .buffer(buffer)
+                return nil
+            }
         }
+
+        maybeContinuation?.resume(returning: invocation)
     }
 
     func next() async throws -> T? {
-        // exit the async for loop if the task is cancelled
         guard !Task.isCancelled else {
             return nil
         }
 
-        if let element = await self.popFirst() {
-            return element
-        } else {
-            // we can't return nil if there is nothing to dequeue otherwise the async for loop will stop
-            // wait for an element to be enqueued
-            return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in
-                // store the continuation for later, when an element is enqueued
-                self._continuation.withLock {
-                    $0 = continuation
+        return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in
+            let nextAction = self.lock.withLock { state -> T? in
+                switch state.state {
+                case .buffer(var buffer):
+                    if let first = buffer.popFirst() {
+                        state.state = .buffer(buffer)
+                        return first
+                    } else {
+                        state.state = .continuation(continuation)
+                        return nil
+                    }
+
+                case .continuation:
+                    fatalError("Concurrent invocations to next(). This is illigal.")
                 }
             }
+
+            guard let nextAction else { return }
+
+            continuation.resume(returning: nextAction)
         }
     }
diff --git a/Sources/AWSLambdaRuntimeCore/LambdaRuntime.swift b/Sources/AWSLambdaRuntimeCore/LambdaRuntime.swift
index 317ee7ea..9549b6d0 100644
--- a/Sources/AWSLambdaRuntimeCore/LambdaRuntime.swift
+++ b/Sources/AWSLambdaRuntimeCore/LambdaRuntime.swift
@@ -85,8 +85,7 @@ public final class LambdaRuntime: @unchecked Sendable where Handler: St
 #if DEBUG
         // we're not running on Lambda and we're compiled in DEBUG mode,
         // let's start a local server for testing
-        try await Lambda.withLocalServer(invocationEndpoint: Lambda.env("LOCAL_LAMBDA_SERVER_INVOCATION_ENDPOINT"))
-        {
+        try await Lambda.withLocalServer(invocationEndpoint: Lambda.env("LOCAL_LAMBDA_SERVER_INVOCATION_ENDPOINT")) {
 
             try await LambdaRuntimeClient.withRuntimeClient(
                 configuration: .init(ip: "127.0.0.1", port: 7000),

From 8cf441634831d41c7443eb270afd80cdb5b540db Mon Sep 17 00:00:00 2001
From: Fabian Fett
Date: Thu, 27 Feb 2025 11:48:25 +0100
Subject: [PATCH 2/5] Fixes for Swift 6.

---
 .../Lambda+LocalServer.swift | 27 ++++++++++++-------
 1 file changed, 18 insertions(+), 9 deletions(-)

diff --git a/Sources/AWSLambdaRuntimeCore/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntimeCore/Lambda+LocalServer.swift
index 0732ab28..63539778 100644
--- a/Sources/AWSLambdaRuntimeCore/Lambda+LocalServer.swift
+++ b/Sources/AWSLambdaRuntimeCore/Lambda+LocalServer.swift
@@ -93,6 +93,14 @@ private struct LambdaHTTPServer {
         case serverReturned(Swift.Result)
     }
 
+    struct UnsafeTransferBox: @unchecked Sendable {
+        let value: Value
+
+        init(value: sending Value) {
+            self.value = value
+        }
+    }
+
     static func withLocalServer(
         invocationEndpoint: String?,
         host: String = "127.0.0.1",
@@ -135,18 +143,13 @@
 
         let server = LambdaHTTPServer(invocationEndpoint: invocationEndpoint)
 
-        // We are handling each incoming connection in a separate child task. It is important
-        // to use a discarding task group here which automatically discards finished child tasks.
-        // A normal task group retains all child tasks and their outputs in memory until they are
-        // consumed by iterating the group or by exiting the group. Since, we are never consuming
-        // the results of the group we need the group to automatically discard them; otherwise, this
-        // would result in a memory leak over time.
+        // Sadly the Swift compiler does not understand that the passed in closure will only be
+        // invoked once. Because of this we need an unsafe transfer box here. Buuuh!
+        let closureBox = UnsafeTransferBox(value: closure)
         let result = await withTaskGroup(of: TaskResult.self, returning: Swift.Result.self) { group in
-
-            let c = closure
             group.addTask {
+                let c = closureBox.value
                 do {
-
                     let result = try await c()
                     return .closureResult(.success(result))
                 } catch {
@@ -156,6 +159,12 @@
 
             group.addTask {
                 do {
+                    // We are handling each incoming connection in a separate child task. It is important
+                    // to use a discarding task group here which automatically discards finished child tasks.
+                    // A normal task group retains all child tasks and their outputs in memory until they are
+                    // consumed by iterating the group or by exiting the group. Since, we are never consuming
+                    // the results of the group we need the group to automatically discard them; otherwise, this
+                    // would result in a memory leak over time.
try await withThrowingDiscardingTaskGroup { taskGroup in try await channel.executeThenClose { inbound in for try await connectionChannel in inbound { From 59a9a188fc578e6dd109725fb348ca9dee46bb7e Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Thu, 27 Feb 2025 15:19:07 +0100 Subject: [PATCH 3/5] PR comments --- Package.swift | 3 +- .../Lambda+LocalServer.swift | 62 +++++++++++-------- 2 files changed, 38 insertions(+), 27 deletions(-) diff --git a/Package.swift b/Package.swift index bcc45444..9eac9771 100644 --- a/Package.swift +++ b/Package.swift @@ -19,6 +19,7 @@ let package = Package( dependencies: [ .package(url: "https://github.com/apple/swift-nio.git", from: "2.81.0"), .package(url: "https://github.com/apple/swift-log.git", from: "1.5.4"), + .package(url: "https://github.com/apple/swift-collections.git", from: "1.1.4"), ], targets: [ .target( @@ -31,10 +32,10 @@ let package = Package( .target( name: "AWSLambdaRuntimeCore", dependencies: [ + .product(name: "DequeModule", package: "swift-collections"), .product(name: "Logging", package: "swift-log"), .product(name: "NIOHTTP1", package: "swift-nio"), .product(name: "NIOCore", package: "swift-nio"), - .product(name: "NIOConcurrencyHelpers", package: "swift-nio"), .product(name: "NIOPosix", package: "swift-nio"), ] ), diff --git a/Sources/AWSLambdaRuntimeCore/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntimeCore/Lambda+LocalServer.swift index 63539778..9c58c698 100644 --- a/Sources/AWSLambdaRuntimeCore/Lambda+LocalServer.swift +++ b/Sources/AWSLambdaRuntimeCore/Lambda+LocalServer.swift @@ -16,7 +16,6 @@ import DequeModule import Dispatch import Logging -import NIOConcurrencyHelpers import NIOCore import NIOHTTP1 import NIOPosix @@ -183,21 +182,29 @@ private struct LambdaHTTPServer { } } - let task1 = await group.next()! + // Now that the local HTTP server and LambdaHandler tasks are started, wait for the + // first of the two that will terminate. + // When the first task terminates, cancel the group and collect the result of the + // second task. + + // collect and return the result of the LambdaHandler + let serverOrHandlerResult1 = await group.next()! group.cancelAll() - let task2 = await group.next()! - switch task1 { + switch serverOrHandlerResult1 { case .closureResult(let result): return result - case .serverReturned: - switch task2 { + case .serverReturned(let result): + logger.error("Server shutdown before closure completed", metadata: [ + "error": "\(result.maybeError != nil ? "\(result.maybeError!)" : "none")" + ]) + switch await group.next()! { case .closureResult(let result): return result case .serverReturned: - fatalError() + fatalError("Only one task is a server, and only one can return `serverReturned`") } } } @@ -404,34 +411,26 @@ private struct LambdaHTTPServer { private final class Pool: AsyncSequence, AsyncIteratorProtocol, Sendable where T: Sendable { typealias Element = T - struct State { - enum State { - case buffer(Deque) - case continuation(CheckedContinuation?) - } - - var state: State - - init() { - self.state = .buffer([]) - } + enum State: ~Copyable { + case buffer(Deque) + case continuation(CheckedContinuation?) } - private let lock = Mutex(.init()) + private let lock = Mutex(.buffer([])) /// enqueue an element, or give it back immediately to the iterator if it is waiting for an element public func push(_ invocation: T) async { // if the iterator is waiting for an element, give it to it // otherwise, enqueue the element let maybeContinuation = self.lock.withLock { state -> CheckedContinuation? 
in - switch state.state { + switch consume state { case .continuation(let continuation): - state.state = .buffer([]) + state = .buffer([]) return continuation case .buffer(var buffer): buffer.append(invocation) - state.state = .buffer(buffer) + state = .buffer(buffer) return nil } } @@ -447,18 +446,18 @@ private struct LambdaHTTPServer { return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in let nextAction = self.lock.withLock { state -> T? in - switch state.state { + switch consume state { case .buffer(var buffer): if let first = buffer.popFirst() { - state.state = .buffer(buffer) + state = .buffer(buffer) return first } else { - state.state = .continuation(continuation) + state = .continuation(continuation) return nil } case .continuation: - fatalError("Concurrent invocations to next(). This is illigal.") + fatalError("Concurrent invocations to next(). This is illegal.") } } @@ -509,3 +508,14 @@ private struct LambdaHTTPServer { } } #endif + +extension Result { + var maybeError: Failure? { + switch self { + case .success: + return nil + case .failure(let error): + return error + } + } +} From 207905ce81be03f126b7ba77e6f7f953ec773a65 Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Thu, 27 Feb 2025 15:44:45 +0100 Subject: [PATCH 4/5] Format fix --- Sources/AWSLambdaRuntimeCore/LambdaRuntime.swift | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Sources/AWSLambdaRuntimeCore/LambdaRuntime.swift b/Sources/AWSLambdaRuntimeCore/LambdaRuntime.swift index 9549b6d0..317ee7ea 100644 --- a/Sources/AWSLambdaRuntimeCore/LambdaRuntime.swift +++ b/Sources/AWSLambdaRuntimeCore/LambdaRuntime.swift @@ -85,7 +85,8 @@ public final class LambdaRuntime: @unchecked Sendable where Handler: St #if DEBUG // we're not running on Lambda and we're compiled in DEBUG mode, // let's start a local server for testing - try await Lambda.withLocalServer(invocationEndpoint: Lambda.env("LOCAL_LAMBDA_SERVER_INVOCATION_ENDPOINT")) { + try await Lambda.withLocalServer(invocationEndpoint: Lambda.env("LOCAL_LAMBDA_SERVER_INVOCATION_ENDPOINT")) + { try await LambdaRuntimeClient.withRuntimeClient( configuration: .init(ip: "127.0.0.1", port: 7000), From 86548c193b19f2b0a16aa543c56178cdfe59e53e Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Thu, 27 Feb 2025 15:59:46 +0100 Subject: [PATCH 5/5] swift-format --- .../Lambda+LocalServer.swift | 24 ++++++++++++------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/Sources/AWSLambdaRuntimeCore/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntimeCore/Lambda+LocalServer.swift index 9c58c698..4bb113a7 100644 --- a/Sources/AWSLambdaRuntimeCore/Lambda+LocalServer.swift +++ b/Sources/AWSLambdaRuntimeCore/Lambda+LocalServer.swift @@ -104,7 +104,7 @@ private struct LambdaHTTPServer { invocationEndpoint: String?, host: String = "127.0.0.1", port: Int = 7000, - eventLoopGroup: MultiThreadedEventLoopGroup = .singleton, + eventLoopGroup: MultiThreadedEventLoopGroup = .singleton, logger: Logger, _ closure: sending @escaping () async throws -> Result ) async throws -> Result { @@ -145,7 +145,8 @@ private struct LambdaHTTPServer { // Sadly the Swift compiler does not understand that the passed in closure will only be // invoked once. Because of this we need an unsafe transfer box here. Buuuh! 
let closureBox = UnsafeTransferBox(value: closure) - let result = await withTaskGroup(of: TaskResult.self, returning: Swift.Result.self) { group in + let result = await withTaskGroup(of: TaskResult.self, returning: Swift.Result.self) { + group in group.addTask { let c = closureBox.value do { @@ -196,9 +197,12 @@ private struct LambdaHTTPServer { return result case .serverReturned(let result): - logger.error("Server shutdown before closure completed", metadata: [ - "error": "\(result.maybeError != nil ? "\(result.maybeError!)" : "none")" - ]) + logger.error( + "Server shutdown before closure completed", + metadata: [ + "error": "\(result.maybeError != nil ? "\(result.maybeError!)" : "none")" + ] + ) switch await group.next()! { case .closureResult(let result): return result @@ -213,8 +217,6 @@ private struct LambdaHTTPServer { return try result.get() } - - /// This method handles individual TCP connections private func handleConnection( channel: NIOAsyncChannel, @@ -272,7 +274,11 @@ private struct LambdaHTTPServer { /// - body: the HTTP request body /// - Throws: /// - Returns: the response to send back to the client or the Lambda function - private func processRequest(head: HTTPRequestHead, body: ByteBuffer?, logger: Logger) async throws -> LocalServerResponse { + private func processRequest( + head: HTTPRequestHead, + body: ByteBuffer?, + logger: Logger + ) async throws -> LocalServerResponse { if let body { logger.trace( @@ -296,7 +302,7 @@ private struct LambdaHTTPServer { // we always accept the /invoke request and push them to the pool let requestId = "\(DispatchTime.now().uptimeNanoseconds)" var logger = logger - logger[metadataKey: "requestID"] = "\(requestId)" + logger[metadataKey: "requestID"] = "\(requestId)" logger.trace("/invoke received invocation") await self.invocationPool.push(LocalServerInvocation(requestId: requestId, request: body))