diff --git a/Examples/ServiceLifeCycle/.gitignore b/Examples/ServiceLifeCycle/.gitignore new file mode 100644 index 00000000..0023a534 --- /dev/null +++ b/Examples/ServiceLifeCycle/.gitignore @@ -0,0 +1,8 @@ +.DS_Store +/.build +/Packages +xcuserdata/ +DerivedData/ +.swiftpm/configuration/registries.json +.swiftpm/xcode/package.xcworkspace/contents.xcworkspacedata +.netrc diff --git a/Examples/ServiceLifeCycle/Package.swift b/Examples/ServiceLifeCycle/Package.swift new file mode 100644 index 00000000..c6aacd74 --- /dev/null +++ b/Examples/ServiceLifeCycle/Package.swift @@ -0,0 +1,56 @@ +// swift-tools-version: 6.0 +// The swift-tools-version declares the minimum version of Swift required to build this package. + +import PackageDescription + +// needed for CI to test the local version of the library +import struct Foundation.URL + +let package = Package( + name: "LambdaWithServiceLifecycle", + platforms: [ + .macOS(.v15) + ], + dependencies: [ + .package(url: "https://github.com/vapor/postgres-nio.git", from: "1.23.0"), + .package(url: "https://github.com/swift-server/swift-aws-lambda-runtime.git", branch: "main"), + .package(url: "https://github.com/swift-server/swift-service-lifecycle.git", from: "2.6.3"), + ], + targets: [ + .executableTarget( + name: "LambdaWithServiceLifecycle", + dependencies: [ + .product(name: "PostgresNIO", package: "postgres-nio"), + .product(name: "AWSLambdaRuntimeService", package: "swift-aws-lambda-runtime"), + .product(name: "ServiceLifecycle", package: "swift-service-lifecycle"), + ] + ) + ] +) + +if let localDepsPath = Context.environment["LAMBDA_USE_LOCAL_DEPS"], + localDepsPath != "", + let v = try? URL(fileURLWithPath: localDepsPath).resourceValues(forKeys: [.isDirectoryKey]), + v.isDirectory == true +{ + // when we use the local runtime as deps, let's remove the dependency added above + let indexToRemove = package.dependencies.firstIndex { dependency in + if case .sourceControl( + name: _, + location: "https://github.com/swift-server/swift-aws-lambda-runtime.git", + requirement: _ + ) = dependency.kind { + return true + } + return false + } + if let indexToRemove { + package.dependencies.remove(at: indexToRemove) + } + + // then we add the dependency on LAMBDA_USE_LOCAL_DEPS' path (typically ../..) + print("[INFO] Compiling against swift-aws-lambda-runtime located at \(localDepsPath)") + package.dependencies += [ + .package(name: "swift-aws-lambda-runtime", path: localDepsPath) + ] +} diff --git a/Examples/ServiceLifeCycle/Sources/Lambda.swift b/Examples/ServiceLifeCycle/Sources/Lambda.swift new file mode 100644 index 00000000..f199cf99 --- /dev/null +++ b/Examples/ServiceLifeCycle/Sources/Lambda.swift @@ -0,0 +1,92 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftAWSLambdaRuntime open source project +// +// Copyright (c) 2025 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import AWSLambdaRuntimeService +import Logging +import PostgresNIO +import ServiceLifecycle + +@main +struct LambdaFunction { + + static func main() async throws { + + var logger = Logger(label: "ServiceLifecycleExample") + logger.logLevel = .trace + + let pgClient = try preparePostgresClient( + host: Lambda.env("DB_HOST") ?? "localhost", + user: Lambda.env("DB_USER") ?? "postgres", + password: Lambda.env("DB_PASSWORD") ?? "secret", + dbName: Lambda.env("DB_NAME") ?? "test" + ) + + /// Instantiate LambdaRuntime with a closure handler implementing the business logic of the Lambda function + let runtime = LambdaRuntimeService(logger: logger) { (event: String, context: LambdaContext) in + + do { + // Use initialized service within the handler + // IMPORTANT - CURRENTLY WHEN THERE IS AN ERROR, THIS CALL HANGS WHEN DB IS NOT REACHABLE + // https://github.com/vapor/postgres-nio/issues/489 + let rows = try await pgClient.query("SELECT id, username FROM users") + for try await (id, username) in rows.decode((Int, String).self) { + logger.debug("\(id) : \(username)") + } + } catch { + logger.error("PG Error: \(error)") + } + } + + /// Use ServiceLifecycle to manage the initialization and termination + /// of the PGClient together with the LambdaRuntime + let serviceGroup = ServiceGroup( + services: [pgClient, runtime], + gracefulShutdownSignals: [.sigterm, .sigint], // add SIGINT for CTRL+C in local testing + // cancellationSignals: [.sigint], + logger: logger + ) + try await serviceGroup.run() + + // perform any cleanup here + } + + private static func preparePostgresClient( + host: String, + user: String, + password: String, + dbName: String + ) throws -> PostgresClient { + + var tlsConfig = TLSConfiguration.makeClientConfiguration() + // Load the root certificate + let rootCert = try NIOSSLCertificate.fromPEMBytes(Array(eu_central_1_bundle_pem.utf8)) + + // Add the root certificate to the TLS configuration + tlsConfig.trustRoots = .certificates(rootCert) + + // Enable full verification + tlsConfig.certificateVerification = .fullVerification + + let config = PostgresClient.Configuration( + host: host, + port: 5432, + username: user, + password: password, + database: dbName, + tls: .prefer(tlsConfig) + ) + + return PostgresClient(configuration: config) + } +} diff --git a/Examples/ServiceLifeCycle/Sources/RootRDSCert.swift b/Examples/ServiceLifeCycle/Sources/RootRDSCert.swift new file mode 100644 index 00000000..23cab9f3 --- /dev/null +++ b/Examples/ServiceLifeCycle/Sources/RootRDSCert.swift @@ -0,0 +1,95 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftAWSLambdaRuntime open source project +// +// Copyright (c) 2025 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +// you can download the root certificate for your RDS instance region from the following link: +// https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/UsingWithRDS.SSL.html + +let eu_central_1_bundle_pem = """ + -----BEGIN CERTIFICATE----- + MIICtDCCAjmgAwIBAgIQenQbcP/Zbj9JxvZ+jXbRnTAKBggqhkjOPQQDAzCBmTEL + MAkGA1UEBhMCVVMxIjAgBgNVBAoMGUFtYXpvbiBXZWIgU2VydmljZXMsIEluYy4x + EzARBgNVBAsMCkFtYXpvbiBSRFMxCzAJBgNVBAgMAldBMTIwMAYDVQQDDClBbWF6 + b24gUkRTIGV1LWNlbnRyYWwtMSBSb290IENBIEVDQzM4NCBHMTEQMA4GA1UEBwwH + U2VhdHRsZTAgFw0yMTA1MjEyMjMzMjRaGA8yMTIxMDUyMTIzMzMyNFowgZkxCzAJ + BgNVBAYTAlVTMSIwIAYDVQQKDBlBbWF6b24gV2ViIFNlcnZpY2VzLCBJbmMuMRMw + EQYDVQQLDApBbWF6b24gUkRTMQswCQYDVQQIDAJXQTEyMDAGA1UEAwwpQW1hem9u + IFJEUyBldS1jZW50cmFsLTEgUm9vdCBDQSBFQ0MzODQgRzExEDAOBgNVBAcMB1Nl + YXR0bGUwdjAQBgcqhkjOPQIBBgUrgQQAIgNiAATlBHiEM9LoEb1Hdnd5j2VpCDOU + 5nGuFoBD8ROUCkFLFh5mHrHfPXwBc63heW9WrP3qnDEm+UZEUvW7ROvtWCTPZdLz + Z4XaqgAlSqeE2VfUyZOZzBSgUUJk7OlznXfkCMOjQjBAMA8GA1UdEwEB/wQFMAMB + Af8wHQYDVR0OBBYEFDT/ThjQZl42Nv/4Z/7JYaPNMly2MA4GA1UdDwEB/wQEAwIB + hjAKBggqhkjOPQQDAwNpADBmAjEAnZWmSgpEbmq+oiCa13l5aGmxSlfp9h12Orvw + Dq/W5cENJz891QD0ufOsic5oGq1JAjEAp5kSJj0MxJBTHQze1Aa9gG4sjHBxXn98 + 4MP1VGsQuhfndNHQb4V0Au7OWnOeiobq + -----END CERTIFICATE----- + -----BEGIN CERTIFICATE----- + MIIEBTCCAu2gAwIBAgIRAO8bekN7rUReuNPG8pSTKtEwDQYJKoZIhvcNAQELBQAw + gZoxCzAJBgNVBAYTAlVTMSIwIAYDVQQKDBlBbWF6b24gV2ViIFNlcnZpY2VzLCBJ + bmMuMRMwEQYDVQQLDApBbWF6b24gUkRTMQswCQYDVQQIDAJXQTEzMDEGA1UEAwwq + QW1hem9uIFJEUyBldS1jZW50cmFsLTEgUm9vdCBDQSBSU0EyMDQ4IEcxMRAwDgYD + VQQHDAdTZWF0dGxlMCAXDTIxMDUyMTIyMjM0N1oYDzIwNjEwNTIxMjMyMzQ3WjCB + mjELMAkGA1UEBhMCVVMxIjAgBgNVBAoMGUFtYXpvbiBXZWIgU2VydmljZXMsIElu + Yy4xEzARBgNVBAsMCkFtYXpvbiBSRFMxCzAJBgNVBAgMAldBMTMwMQYDVQQDDCpB + bWF6b24gUkRTIGV1LWNlbnRyYWwtMSBSb290IENBIFJTQTIwNDggRzExEDAOBgNV + BAcMB1NlYXR0bGUwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCTTYds + Tray+Q9VA5j5jTh5TunHKFQzn68ZbOzdqaoi/Rq4ohfC0xdLrxCpfqn2TGDHN6Zi + 2qGK1tWJZEd1H0trhzd9d1CtGK+3cjabUmz/TjSW/qBar7e9MA67/iJ74Gc+Ww43 + A0xPNIWcL4aLrHaLm7sHgAO2UCKsrBUpxErOAACERScVYwPAfu79xeFcX7DmcX+e + lIqY16pQAvK2RIzrekSYfLFxwFq2hnlgKHaVgZ3keKP+nmXcXmRSHQYUUr72oYNZ + HcNYl2+gxCc9ccPEHM7xncVEKmb5cWEWvVoaysgQ+osi5f5aQdzgC2X2g2daKbyA + XL/z5FM9GHpS5BJjAgMBAAGjQjBAMA8GA1UdEwEB/wQFMAMBAf8wHQYDVR0OBBYE + FBDAiJ7Py9/A9etNa/ebOnx5l5MGMA4GA1UdDwEB/wQEAwIBhjANBgkqhkiG9w0B + AQsFAAOCAQEALMh/+81fFPdJV/RrJUeoUvFCGMp8iaANu97NpeJyKitNOv7RoeVP + WjivS0KcCqZaDBs+p6IZ0sLI5ZH098LDzzytcfZg0PsGqUAb8a0MiU/LfgDCI9Ee + jsOiwaFB8k0tfUJK32NPcIoQYApTMT2e26lPzYORSkfuntme2PTHUnuC7ikiQrZk + P+SZjWgRuMcp09JfRXyAYWIuix4Gy0eZ4rpRuaTK6mjAb1/LYoNK/iZ/gTeIqrNt + l70OWRsWW8jEmSyNTIubGK/gGGyfuZGSyqoRX6OKHESkP6SSulbIZHyJ5VZkgtXo + 2XvyRyJ7w5pFyoofrL3Wv0UF8yt/GDszmg== + -----END CERTIFICATE----- + -----BEGIN CERTIFICATE----- + MIIGBDCCA+ygAwIBAgIQM4C8g5iFRucSWdC8EdqHeDANBgkqhkiG9w0BAQwFADCB + mjELMAkGA1UEBhMCVVMxIjAgBgNVBAoMGUFtYXpvbiBXZWIgU2VydmljZXMsIElu + Yy4xEzARBgNVBAsMCkFtYXpvbiBSRFMxCzAJBgNVBAgMAldBMTMwMQYDVQQDDCpB + bWF6b24gUkRTIGV1LWNlbnRyYWwtMSBSb290IENBIFJTQTQwOTYgRzExEDAOBgNV + BAcMB1NlYXR0bGUwIBcNMjEwNTIxMjIyODI2WhgPMjEyMTA1MjEyMzI4MjZaMIGa + MQswCQYDVQQGEwJVUzEiMCAGA1UECgwZQW1hem9uIFdlYiBTZXJ2aWNlcywgSW5j + LjETMBEGA1UECwwKQW1hem9uIFJEUzELMAkGA1UECAwCV0ExMzAxBgNVBAMMKkFt + YXpvbiBSRFMgZXUtY2VudHJhbC0xIFJvb3QgQ0EgUlNBNDA5NiBHMTEQMA4GA1UE + BwwHU2VhdHRsZTCCAiIwDQYJKoZIhvcNAQEBBQADggIPADCCAgoCggIBANeTsD/u + 6saPiY4Sg0GlJlMXMBltnrcGAEkwq34OKQ0bCXqcoNJ2rcAMmuFC5x9Ho1Y3YzB7 + NO2GpIh6bZaO76GzSv4cnimcv9n/sQSYXsGbPD+bAtnN/RvNW1avt4C0q0/ghgF1 + VFS8JihIrgPYIArAmDtGNEdl5PUrdi9y6QGggbRfidMDdxlRdZBe1C18ZdgERSEv + UgSTPRlVczONG5qcQkUGCH83MMqL5MKQiby/Br5ZyPq6rxQMwRnQ7tROuElzyYzL + 7d6kke+PNzG1mYy4cbYdjebwANCtZ2qYRSUHAQsOgybRcSoarv2xqcjO9cEsDiRU + l97ToadGYa4VVERuTaNZxQwrld4mvzpyKuirqZltOqg0eoy8VUsaRPL3dc5aChR0 + dSrBgRYmSAClcR2/2ZCWpXemikwgt031Dsc0A/+TmVurrsqszwbr0e5xqMow9LzO + MI/JtLd0VFtoOkL/7GG2tN8a+7gnLFxpv+AQ0DH5n4k/BY/IyS+H1erqSJhOTQ11 + vDOFTM5YplB9hWV9fp5PRs54ILlHTlZLpWGs3I2BrJwzRtg/rOlvsosqcge9ryai + AKm2j+JBg5wJ19R8oxRy8cfrNTftZePpISaLTyV2B16w/GsSjqixjTQe9LRN2DHk + cC+HPqYyzW2a3pUVyTGHhW6a7YsPBs9yzt6hAgMBAAGjQjBAMA8GA1UdEwEB/wQF + MAMBAf8wHQYDVR0OBBYEFIqA8QkOs2cSirOpCuKuOh9VDfJfMA4GA1UdDwEB/wQE + AwIBhjANBgkqhkiG9w0BAQwFAAOCAgEAOUI90mEIsa+vNJku0iUwdBMnHiO4gm7E + 5JloP7JG0xUr7d0hypDorMM3zVDAL+aZRHsq8n934Cywj7qEp1304UF6538ByGdz + tkfacJsUSYfdlNJE9KbA4T+U+7SNhj9jvePpVjdQbhgzxITE9f8CxY/eM40yluJJ + PhbaWvOiRagzo74wttlcDerzLT6Y/JrVpWhnB7IY8HvzK+BwAdaCsBUPC3HF+kth + CIqLq7J3YArTToejWZAp5OOI6DLPM1MEudyoejL02w0jq0CChmZ5i55ElEMnapRX + 7GQTARHmjgAOqa95FjbHEZzRPqZ72AtZAWKFcYFNk+grXSeWiDgPFOsq6mDg8DDB + 0kfbYwKLFFCC9YFmYzR2YrWw2NxAScccUc2chOWAoSNHiqBbHR8ofrlJSWrtmKqd + YRCXzn8wqXnTS3NNHNccqJ6dN+iMr9NGnytw8zwwSchiev53Fpc1mGrJ7BKTWH0t + ZrA6m32wzpMymtKozlOPYoE5mtZEzrzHEXfa44Rns7XIHxVQSXVWyBHLtIsZOrvW + U5F41rQaFEpEeUQ7sQvqUoISfTUVRNDn6GK6YaccEhCji14APLFIvhRQUDyYMIiM + 4vll0F/xgVRHTgDVQ8b8sxdhSYlqB4Wc2Ym41YRz+X2yPqk3typEZBpc4P5Tt1/N + 89cEIGdbjsA= + -----END CERTIFICATE----- + """ diff --git a/Package.swift b/Package.swift index 96068884..bf8ed462 100644 --- a/Package.swift +++ b/Package.swift @@ -8,19 +8,33 @@ let package = Package( products: [ // this library exports `AWSLambdaRuntimeCore` and adds Foundation convenience methods .library(name: "AWSLambdaRuntime", targets: ["AWSLambdaRuntime"]), + + // this library exports `AWSLambdaRuntime` and adds conformances to `Service` from Swift Service Lifecycle + .library(name: "AWSLambdaRuntimeService", targets: ["AWSLambdaRuntimeService"]), + // this has all the main functionality for lambda and it does not link Foundation .library(name: "AWSLambdaRuntimeCore", targets: ["AWSLambdaRuntimeCore"]), + // plugin to package the lambda, creating an archive that can be uploaded to AWS // requires Linux or at least macOS v15 .plugin(name: "AWSLambdaPackager", targets: ["AWSLambdaPackager"]), + // for testing only .library(name: "AWSLambdaTesting", targets: ["AWSLambdaTesting"]), ], dependencies: [ .package(url: "https://github.com/apple/swift-nio.git", from: "2.77.0"), .package(url: "https://github.com/apple/swift-log.git", from: "1.5.4"), + .package(url: "https://github.com/swift-server/swift-service-lifecycle.git", from: "2.6.3"), ], targets: [ + .target( + name: "AWSLambdaRuntimeService", + dependencies: [ + .byName(name: "AWSLambdaRuntime"), + .product(name: "ServiceLifecycle", package: "swift-service-lifecycle"), + ] + ), .target( name: "AWSLambdaRuntime", dependencies: [ diff --git a/Sources/AWSLambdaRuntimeCore/Lambda.swift b/Sources/AWSLambdaRuntimeCore/Lambda.swift index 3ba90e9c..e03fcd9d 100644 --- a/Sources/AWSLambdaRuntimeCore/Lambda.swift +++ b/Sources/AWSLambdaRuntimeCore/Lambda.swift @@ -16,6 +16,7 @@ import Dispatch import Logging import NIOCore import NIOPosix +import Synchronization #if os(macOS) import Darwin.C @@ -30,33 +31,55 @@ import ucrt #endif public enum Lambda { + + // allow to gracefully shitdown the runtime client loop + // this supports gracefull shutdown of the Lambda runtime when integarted with Swift ServiceLifeCycle + private static let gracefulShutdown: Mutex = Mutex(false) + public static func shutdown() { + Lambda.gracefulShutdown.withLock { + $0 = true + } + } package static func runLoop( runtimeClient: RuntimeClient, handler: Handler, logger: Logger ) async throws where Handler: StreamingLambdaHandler { var handler = handler + var gracefulShutdown: Bool = Lambda.gracefulShutdown.withLock { $0 } + do { + while !Task.isCancelled && !gracefulShutdown { + logger.trace("Waiting for next invocation") + let (invocation, writer) = try await runtimeClient.nextInvocation() - while !Task.isCancelled { - let (invocation, writer) = try await runtimeClient.nextInvocation() - - do { - try await handler.handle( - invocation.event, - responseWriter: writer, - context: LambdaContext( - requestID: invocation.metadata.requestID, - traceID: invocation.metadata.traceID, - invokedFunctionARN: invocation.metadata.invokedFunctionARN, - deadline: DispatchWallTime(millisSinceEpoch: invocation.metadata.deadlineInMillisSinceEpoch), - logger: logger + logger.trace("Received invocation : \(invocation.metadata.requestID)") + do { + try await handler.handle( + invocation.event, + responseWriter: writer, + context: LambdaContext( + requestID: invocation.metadata.requestID, + traceID: invocation.metadata.traceID, + invokedFunctionARN: invocation.metadata.invokedFunctionARN, + deadline: DispatchWallTime( + millisSinceEpoch: invocation.metadata.deadlineInMillisSinceEpoch + ), + logger: logger + ) ) - ) - } catch { - try await writer.reportError(error) - continue + } catch { + try await writer.reportError(error) + continue + } + logger.trace("Completed invocation : \(invocation.metadata.requestID)") + gracefulShutdown = Lambda.gracefulShutdown.withLock { $0 } } + + } catch is CancellationError { + // don't allow cancellation error to propagate further + logger.trace("Lambda runLoop() task has been cancelled") } + logger.trace("Lambda runLoop() terminated \(gracefulShutdown ? "with gracefull shutdown" : "")") } /// The default EventLoop the Lambda is scheduled on. diff --git a/Sources/AWSLambdaRuntimeCore/LambdaRuntime.swift b/Sources/AWSLambdaRuntimeCore/LambdaRuntime.swift index 317ee7ea..e59f9e6d 100644 --- a/Sources/AWSLambdaRuntimeCore/LambdaRuntime.swift +++ b/Sources/AWSLambdaRuntimeCore/LambdaRuntime.swift @@ -13,8 +13,8 @@ //===----------------------------------------------------------------------===// import Logging -import NIOConcurrencyHelpers import NIOCore +import Synchronization #if canImport(FoundationEssentials) import FoundationEssentials @@ -22,12 +22,10 @@ import FoundationEssentials import Foundation #endif -// We need `@unchecked` Sendable here, as `NIOLockedValueBox` does not understand `sending` today. -// We don't want to use `NIOLockedValueBox` here anyway. We would love to use Mutex here, but this -// sadly crashes the compiler today. +// We need `@unchecked` Sendable here until we can make `Handler` `Sendable`. public final class LambdaRuntime: @unchecked Sendable where Handler: StreamingLambdaHandler { - // TODO: We want to change this to Mutex as soon as this doesn't crash the Swift compiler on Linux anymore - let handlerMutex: NIOLockedValueBox + let handlerMutex: Mutex = Mutex(nil) + let logger: Logger let eventLoop: EventLoop @@ -36,7 +34,8 @@ public final class LambdaRuntime: @unchecked Sendable where Handler: St eventLoop: EventLoop = Lambda.defaultEventLoop, logger: Logger = Logger(label: "LambdaRuntime") ) { - self.handlerMutex = NIOLockedValueBox(handler) + + handlerMutex.withLock { $0 = handler } self.eventLoop = eventLoop // by setting the log level here, we understand it can not be changed dynamically at runtime @@ -49,11 +48,7 @@ public final class LambdaRuntime: @unchecked Sendable where Handler: St } public func run() async throws { - let handler = self.handlerMutex.withLockedValue { handler in - let result = handler - handler = nil - return result - } + let handler = self.handlerMutex.withLock { $0 } guard let handler else { throw LambdaRuntimeError(code: .runtimeCanOnlyBeStartedOnce) @@ -106,4 +101,9 @@ public final class LambdaRuntime: @unchecked Sendable where Handler: St #endif } } + + /// Gracefully shutdown the runtime client loop. + public func shutdown() { + Lambda.shutdown() + } } diff --git a/Sources/AWSLambdaRuntimeCore/LambdaRuntimeClient.swift b/Sources/AWSLambdaRuntimeCore/LambdaRuntimeClient.swift index bbd16efa..5fafa939 100644 --- a/Sources/AWSLambdaRuntimeCore/LambdaRuntimeClient.swift +++ b/Sources/AWSLambdaRuntimeCore/LambdaRuntimeClient.swift @@ -406,6 +406,17 @@ private protocol LambdaChannelHandlerDelegate { func connectionErrorHappened(_ error: any Error, channel: any Channel) } +struct UnsafeContext: @unchecked Sendable { + private let _context: ChannelHandlerContext + var context: ChannelHandlerContext { + self._context.eventLoop.preconditionInEventLoop() + return _context + } + init(_ context: ChannelHandlerContext) { + self._context = context + } +} + private final class LambdaChannelHandler { let nextInvocationPath = Consts.invocationURLPrefix + Consts.getNextInvocationURLSuffix @@ -465,10 +476,37 @@ private final class LambdaChannelHandler func nextInvocation(isolation: isolated (any Actor)? = #isolation) async throws -> Invocation { switch self.state { case .connected(let context, .idle): - return try await withCheckedThrowingContinuation { - (continuation: CheckedContinuation) in - self.state = .connected(context, .waitingForNextInvocation(continuation)) - self.sendNextRequest(context: context) + return try await withTaskCancellationHandler { + try Task.checkCancellation() + return try await withCheckedThrowingContinuation { + (continuation: CheckedContinuation) in + self.state = .connected(context, .waitingForNextInvocation(continuation)) + + let unsafeContext = UnsafeContext(context) + context.eventLoop.execute { [nextInvocationPath, defaultHeaders] in + // Send next request. The function `sendNextRequest` requires `self` which is not + // Sendable so just inlined the code instead + let httpRequest = HTTPRequestHead( + version: .http1_1, + method: .GET, + uri: nextInvocationPath, + headers: defaultHeaders + ) + let context = unsafeContext.context + context.write(Self.wrapOutboundOut(.head(httpRequest)), promise: nil) + context.write(Self.wrapOutboundOut(.end(nil)), promise: nil) + context.flush() + } + } + } onCancel: { + switch self.state { + case .connected(_, .waitingForNextInvocation(let continuation)): + continuation.resume(throwing: CancellationError()) + case .connected(_, .idle): + break + default: + fatalError("Invalid state: \(self.state)") + } } case .connected(_, .sendingResponse), diff --git a/Sources/AWSLambdaRuntimeService/LambdaRuntimeService+init.swift b/Sources/AWSLambdaRuntimeService/LambdaRuntimeService+init.swift new file mode 100644 index 00000000..b7ec569d --- /dev/null +++ b/Sources/AWSLambdaRuntimeService/LambdaRuntimeService+init.swift @@ -0,0 +1,84 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftAWSLambdaRuntime open source project +// +// Copyright (c) 2025 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import AWSLambdaRuntime +import Logging + +#if canImport(FoundationEssentials) +import FoundationEssentials +#else +import struct Foundation.Data +import class Foundation.JSONDecoder +import class Foundation.JSONEncoder +#endif + +extension LambdaRuntimeService { + + /// Initialize an instance with a `LambdaHandler` defined in the form of a closure **with a non-`Void` return type**. + /// - Parameters: + /// - logger: The logger object that will be used to log messages. `Logger(label: "LambdaRuntimeService")` used as default. + /// - decoder: The decoder object that will be used to decode the incoming `ByteBuffer` event into the generic `Event` type. `JSONDecoder()` used as default. + /// - encoder: The encoder object that will be used to encode the generic `Output` into a `ByteBuffer`. `JSONEncoder()` used as default. + /// - body: The handler in the form of a closure. + public convenience init( + logger: Logger = Logger(label: "LambdaRuntimeService"), + decoder: JSONDecoder = JSONDecoder(), + encoder: JSONEncoder = JSONEncoder(), + body: sending @escaping (Event, LambdaContext) async throws -> Output + ) + where + Handler == LambdaCodableAdapter< + LambdaHandlerAdapter>, + Event, + Output, + LambdaJSONEventDecoder, + LambdaJSONOutputEncoder + > + { + let handler = LambdaCodableAdapter( + encoder: encoder, + decoder: decoder, + handler: LambdaHandlerAdapter(handler: ClosureHandler(body: body)) + ) + + self.init(handler: handler, logger: logger) + } + + /// Initialize an instance with a `LambdaHandler` defined in the form of a closure **with a `Void` return type**. + /// - Parameters: + /// - logger: The logger object that will be used to log messages. `Logger(label: "LambdaRuntimeService")` used as default. + /// - decoder: The decoder object that will be used to decode the incoming `ByteBuffer` event into the generic `Event` type. `JSONDecoder()` used as default. + /// - body: The handler in the form of a closure. + public convenience init( + logger: Logger = Logger(label: "LambdaRuntimeService"), + decoder: JSONDecoder = JSONDecoder(), + body: sending @escaping (Event, LambdaContext) async throws -> Void + ) + where + Handler == LambdaCodableAdapter< + LambdaHandlerAdapter>, + Event, + Void, + LambdaJSONEventDecoder, + VoidEncoder + > + { + let handler = LambdaCodableAdapter( + decoder: LambdaJSONEventDecoder(decoder), + handler: LambdaHandlerAdapter(handler: ClosureHandler(body: body)) + ) + + self.init(handler: handler, logger: logger) + } +} diff --git a/Sources/AWSLambdaRuntimeService/LambdaRuntimeService.swift b/Sources/AWSLambdaRuntimeService/LambdaRuntimeService.swift new file mode 100644 index 00000000..a626c7bb --- /dev/null +++ b/Sources/AWSLambdaRuntimeService/LambdaRuntimeService.swift @@ -0,0 +1,42 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftAWSLambdaRuntime open source project +// +// Copyright (c) 2025 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +@_exported import AWSLambdaRuntime +import Logging +import ServiceLifecycle + +/// +/// Encapsulate a LambdaRuntime+Codable to offer the same API but this time exposed as a Swift Service +/// This allows to avoid the Service extra payload for Lambda functions that doesn't need it +/// +public class LambdaRuntimeService: Service, @unchecked Sendable where Handler: StreamingLambdaHandler { + + private let logger: Logger + private let runtime: LambdaRuntime + + public func run() async throws { + try await withTaskCancellationOrGracefulShutdownHandler { + self.logger.debug("LambdaRuntime will start") + try await runtime.run() + } onCancelOrGracefulShutdown: { + self.logger.debug("LambdaRuntime will be cancelled or gracefully shutdown") + self.runtime.shutdown() + } + } + + init(handler: sending Handler, logger: Logger) { + self.logger = logger + self.runtime = LambdaRuntime(handler: handler, logger: logger) + } +} diff --git a/Tests/AWSLambdaRuntimeCoreTests/LambdaRuntimeClientTests.swift b/Tests/AWSLambdaRuntimeCoreTests/LambdaRuntimeClientTests.swift index e779b931..c9679c6e 100644 --- a/Tests/AWSLambdaRuntimeCoreTests/LambdaRuntimeClientTests.swift +++ b/Tests/AWSLambdaRuntimeCoreTests/LambdaRuntimeClientTests.swift @@ -86,4 +86,24 @@ struct LambdaRuntimeClientTests { } } } + + @Test + func testCancellation() async throws { + try await LambdaRuntimeClient.withRuntimeClient( + configuration: .init(ip: "127.0.0.1", port: 7000), + eventLoop: NIOSingletons.posixEventLoopGroup.next(), + logger: self.logger + ) { runtimeClient in + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + while true { + _ = try await runtimeClient.nextInvocation() + } + } + // wait a small amount to ensure we are waiting for continuation + try await Task.sleep(for: .milliseconds(100)) + group.cancelAll() + } + } + } }