Skip to content

Add new LambdaRuntime #353

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Sep 5, 2024
Merged
53 changes: 53 additions & 0 deletions Sources/AWSLambdaRuntime/Lambda+Codable.swift
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,56 @@ extension LambdaCodableAdapter {
)
}
}

extension NewLambdaRuntime {
/// Initialize an instance with a ``NewLambdaHandler`` defined in the form of a closure **with a non-`Void` return type**.
/// - Parameter body: The handler in the form of a closure.
/// - Parameter encoder: The encoder object that will be used to encode the generic ``Output`` into a ``ByteBuffer``. ``JSONEncoder()`` used as default.
/// - Parameter decoder: The decoder object that will be used to decode the incoming ``ByteBuffer`` event into the generic ``Event`` type. ``JSONDecoder()`` used as default.
package convenience init<Event: Decodable, Output>(
body: @escaping (Event, NewLambdaContext) async throws -> Output,
encoder: JSONEncoder = JSONEncoder(),
decoder: JSONDecoder = JSONDecoder()
)
where
Handler == LambdaCodableAdapter<
LambdaHandlerAdapter<Event, Output, ClosureHandler<Event, Output>>,
Event,
Output,
JSONDecoder,
LambdaJSONOutputEncoder<Output>
>
{
let handler = LambdaCodableAdapter(
encoder: encoder,
decoder: decoder,
handler: LambdaHandlerAdapter(handler: ClosureHandler(body: body))
)

self.init(handler: handler)
}

/// Initialize an instance with a ``NewLambdaHandler`` defined in the form of a closure **with a `Void` return type**.
/// - Parameter body: The handler in the form of a closure.
/// - Parameter decoder: The decoder object that will be used to decode the incoming ``ByteBuffer`` event into the generic ``Event`` type. ``JSONDecoder()`` used as default.
package convenience init<Event: Decodable>(
body: @escaping (Event, NewLambdaContext) async throws -> Void,
decoder: JSONDecoder = JSONDecoder()
)
where
Handler == LambdaCodableAdapter<
LambdaHandlerAdapter<Event, Void, ClosureHandler<Event, Void>>,
Event,
Void,
JSONDecoder,
VoidEncoder
>
{
let handler = LambdaCodableAdapter(
decoder: decoder,
handler: LambdaHandlerAdapter(handler: ClosureHandler(body: body))
)

self.init(handler: handler)
}
}
4 changes: 4 additions & 0 deletions Sources/AWSLambdaRuntimeCore/NewLambda.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import Dispatch
import Logging
import NIOCore

extension Lambda {
package static func runLoop<RuntimeClient: LambdaRuntimeClientProtocol, Handler>(
Expand Down Expand Up @@ -44,4 +45,7 @@ extension Lambda {
}
}
}

/// The default EventLoop the Lambda is scheduled on.
package static var defaultEventLoop: any EventLoop = NIOSingletons.posixEventLoopGroup.next()
}
67 changes: 67 additions & 0 deletions Sources/AWSLambdaRuntimeCore/NewLambdaHandlers.swift
Original file line number Diff line number Diff line change
Expand Up @@ -171,3 +171,70 @@ package struct ClosureHandler<Event: Decodable, Output>: NewLambdaHandler {
try await self.body(event, context)
}
}

extension NewLambdaRuntime {
/// Initialize an instance with a ``StreamingLambdaHandler`` in the form of a closure.
/// - Parameter body: The handler in the form of a closure.
package convenience init(
body: @Sendable @escaping (ByteBuffer, LambdaResponseStreamWriter, NewLambdaContext) async throws -> Void
) where Handler == StreamingClosureHandler {
self.init(handler: StreamingClosureHandler(body: body))
}

/// Initialize an instance with a ``NewLambdaHandler`` defined in the form of a closure **with a non-`Void` return type**, an encoder, and a decoder.
/// - Parameter body: The handler in the form of a closure.
/// - Parameter encoder: The encoder object that will be used to encode the generic ``Output`` into a ``ByteBuffer``.
/// - Parameter decoder: The decoder object that will be used to decode the incoming ``ByteBuffer`` event into the generic ``Event`` type.
package convenience init<
Event: Decodable,
Output: Encodable,
Encoder: LambdaOutputEncoder,
Decoder: LambdaEventDecoder
>(
encoder: Encoder,
decoder: Decoder,
body: @escaping (Event, NewLambdaContext) async throws -> Output
)
where
Handler == LambdaCodableAdapter<
LambdaHandlerAdapter<Event, Output, ClosureHandler<Event, Output>>,
Event,
Output,
Decoder,
Encoder
>
{
let handler = LambdaCodableAdapter(
encoder: encoder,
decoder: decoder,
handler: LambdaHandlerAdapter(handler: ClosureHandler(body: body))
)

self.init(handler: handler)
}

/// Initialize an instance with a ``NewLambdaHandler`` defined in the form of a closure **with a `Void` return type**, an encoder, and a decoder.
/// - Parameter body: The handler in the form of a closure.
/// - Parameter encoder: The encoder object that will be used to encode the generic ``Output`` into a ``ByteBuffer``.
/// - Parameter decoder: The decoder object that will be used to decode the incoming ``ByteBuffer`` event into the generic ``Event`` type.
package convenience init<Event: Decodable, Decoder: LambdaEventDecoder>(
decoder: Decoder,
body: @escaping (Event, NewLambdaContext) async throws -> Void
)
where
Handler == LambdaCodableAdapter<
LambdaHandlerAdapter<Event, Void, ClosureHandler<Event, Void>>,
Event,
Void,
Decoder,
VoidEncoder
>
{
let handler = LambdaCodableAdapter(
decoder: decoder,
handler: LambdaHandlerAdapter(handler: ClosureHandler(body: body))
)

self.init(handler: handler)
}
}
70 changes: 70 additions & 0 deletions Sources/AWSLambdaRuntimeCore/NewLambdaRuntime.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftAWSLambdaRuntime open source project
//
// Copyright (c) 2024 Apple Inc. and the SwiftAWSLambdaRuntime project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import Foundation
import Logging
import NIOCore
import NIOConcurrencyHelpers

// We need `@unchecked` Sendable here, as `NIOLockedValueBox` does not understand `sending` today.
// We don't want to use `NIOLockedValueBox` here anyway. We would love to use Mutex here, but this
// sadly crashes the compiler today.
package final class NewLambdaRuntime<Handler>: @unchecked Sendable where Handler: StreamingLambdaHandler {
// TODO: We want to change this to Mutex as soon as this doesn't crash the Swift compiler on Linux anymore
let handlerMutex: NIOLockedValueBox<Optional<Handler>>
let logger: Logger
let eventLoop: EventLoop

package init(
handler: sending Handler,
eventLoop: EventLoop = Lambda.defaultEventLoop,
logger: Logger = Logger(label: "LambdaRuntime")
) {
self.handlerMutex = NIOLockedValueBox(handler)
self.eventLoop = eventLoop
self.logger = logger
}

package func run() async throws {
guard let runtimeEndpoint = Lambda.env("AWS_LAMBDA_RUNTIME_API") else {
throw NewLambdaRuntimeError(code: .missingLambdaRuntimeAPIEnvironmentVariable)
}

let ipAndPort = runtimeEndpoint.split(separator: ":", maxSplits: 1)
let ip = String(ipAndPort[0])
guard let port = Int(ipAndPort[1]) else { throw NewLambdaRuntimeError(code: .invalidPort) }

let handler = self.handlerMutex.withLockedValue { handler in
let result = handler
handler = nil
return result
}

guard let handler else {
throw NewLambdaRuntimeError(code: .runtimeCanOnlyBeStartedOnce)
}

try await NewLambdaRuntimeClient.withRuntimeClient(
configuration: .init(ip: ip, port: port),
eventLoop: self.eventLoop,
logger: self.logger
) { runtimeClient in
try await Lambda.runLoop(
runtimeClient: runtimeClient,
handler: handler,
logger: self.logger
)
}
}
}
55 changes: 30 additions & 25 deletions Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ final actor NewLambdaRuntimeClient: LambdaRuntimeClientProtocol {
NIOHTTPClientResponseAggregator(maxContentLength: 6 * 1024 * 1024)
)
try channel.pipeline.syncOperations.addHandler(
LambdaChannelHandler(delegate: self, logger: self.logger)
LambdaChannelHandler(delegate: self, logger: self.logger, configuration: self.configuration)
)
return channel.eventLoop.makeSucceededFuture(())
} catch {
Expand Down Expand Up @@ -433,10 +433,33 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
private var reusableErrorBuffer: ByteBuffer?
private let logger: Logger
private let delegate: Delegate
private let configuration: NewLambdaRuntimeClient.Configuration

init(delegate: Delegate, logger: Logger) {
/// These are the default headers that must be sent along an invocation
let defaultHeaders: HTTPHeaders
/// These headers must be sent along an invocation or initialization error report
let errorHeaders: HTTPHeaders
/// These headers must be sent when streaming a response
let streamingHeaders: HTTPHeaders

init(delegate: Delegate, logger: Logger, configuration: NewLambdaRuntimeClient.Configuration) {
self.delegate = delegate
self.logger = logger
self.configuration = configuration
self.defaultHeaders = [
"host": "\(self.configuration.ip):\(self.configuration.port)",
"user-agent": "Swift-Lambda/Unknown",
]
self.errorHeaders = [
"host": "\(self.configuration.ip):\(self.configuration.port)",
"user-agent": "Swift-Lambda/Unknown",
"lambda-runtime-function-error-type": "Unhandled",
]
self.streamingHeaders = [
"host": "\(self.configuration.ip):\(self.configuration.port)",
"user-agent": "Swift-Lambda/Unknown",
"transfer-encoding": "chunked",
]
}

func nextInvocation(isolation: isolated (any Actor)? = #isolation) async throws -> Invocation {
Expand Down Expand Up @@ -578,7 +601,7 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
version: .http1_1,
method: .POST,
uri: url,
headers: NewLambdaRuntimeClient.streamingHeaders
headers: self.streamingHeaders
)

context.write(self.wrapOutboundOut(.head(httpRequest)), promise: nil)
Expand All @@ -604,11 +627,12 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
let headers: HTTPHeaders =
if byteBuffer?.readableBytes ?? 0 < 6_000_000 {
[
"host": "\(self.configuration.ip):\(self.configuration.port)",
"user-agent": "Swift-Lambda/Unknown",
"content-length": "\(byteBuffer?.readableBytes ?? 0)",
]
} else {
NewLambdaRuntimeClient.streamingHeaders
self.streamingHeaders
}

let httpRequest = HTTPRequestHead(
Expand All @@ -634,7 +658,7 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
version: .http1_1,
method: .GET,
uri: self.nextInvocationPath,
headers: NewLambdaRuntimeClient.defaultHeaders
headers: self.defaultHeaders
)

context.write(self.wrapOutboundOut(.head(httpRequest)), promise: nil)
Expand All @@ -650,7 +674,7 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
version: .http1_1,
method: .POST,
uri: url,
headers: NewLambdaRuntimeClient.errorHeaders
headers: self.errorHeaders
)

if self.reusableErrorBuffer == nil {
Expand Down Expand Up @@ -797,22 +821,3 @@ extension LambdaChannelHandler: ChannelInboundHandler {
context.fireChannelInactive()
}
}

extension NewLambdaRuntimeClient {
static let defaultHeaders: HTTPHeaders = [
"user-agent": "Swift-Lambda/Unknown"
]

/// These headers must be sent along an invocation or initialization error report
static let errorHeaders: HTTPHeaders = [
"user-agent": "Swift-Lambda/Unknown",
"lambda-runtime-function-error-type": "Unhandled",
]

/// These headers must be sent along an invocation or initialization error report
static let streamingHeaders: HTTPHeaders = [
"user-agent": "Swift-Lambda/Unknown",
"transfer-encoding": "streaming",
]

}
3 changes: 3 additions & 0 deletions Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeError.swift
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ package struct NewLambdaRuntimeError: Error {
case nextInvocationMissingHeaderDeadline
case nextInvocationMissingHeaderInvokeFuctionARN

case missingLambdaRuntimeAPIEnvironmentVariable
case runtimeCanOnlyBeStartedOnce
case invalidPort
}

package init(code: Code, underlying: (any Error)? = nil) {
Expand Down
Loading