Skip to content

Fixes for Local Lambda Server #486

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Feb 27, 2025
Merged

Fixes for Local Lambda Server #486

merged 5 commits into from
Feb 27, 2025

Conversation

fabianfett
Copy link
Member

No description provided.

@fabianfett fabianfett requested a review from sebsto February 27, 2025 10:49
@sebsto sebsto added the semver/none No version bump required. label Feb 27, 2025
Copy link
Contributor

@sebsto sebsto left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for having shared this update.
I left a few questions and suggestions to help make the code easier to read and understand

}

case .continuation:
fatalError("Concurrent invocations to next(). This is illigal.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo "illegal"

@@ -350,44 +404,67 @@ private struct LambdaHttpServer {
private final class Pool<T>: AsyncSequence, AsyncIteratorProtocol, Sendable where T: Sendable {
typealias Element = T

private let _buffer = Mutex<CircularBuffer<T>>(.init())
private let _continuation = Mutex<CheckedContinuation<T, any Error>?>(nil)
struct State {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need a struct here ? Can't we simplify and just define the enum ?

Mutex's T must be Copyable, therefore passing an enum is accepted.

        enum Test {
            case A
            case B
        }

        private let m = Mutex<Test>(.A)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reduced it down to an enum but marked the enum as ~Copyable to ensure we don't allocate when we hold the lock.

return result

case .serverReturned:
fatalError()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add a message to the fatalError : "Only one task is a server, and only one can return serverReturned"

Also what about renaming task1 and task2 with serverOrHandlerResult1 and serverOrHandlerResult2

@@ -13,6 +13,7 @@
//===----------------------------------------------------------------------===//

#if DEBUG
import DequeModule
import Dispatch
import Logging
import NIOConcurrencyHelpers
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still need this import now that we don't use CircularBuffer anymore ?

private let group: EventLoopGroup
private let host: String
private let port: Int
private struct LambdaHTTPServer {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we update the name of this struct ? This is not the HTTP Server anymore (all the NIO bootstrap and channel creation is done in the withLocalServer static function)
This struct now just provides the handleConnection() function

But I can't think about a descriptive name :-)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets keep it for now. We can always change it later. It's internal.

return .serverReturned(.failure(error))
}
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think 2 lines of comments to explain the logic below wouldn't hurt readability :-)

// Now that the local HTTP server and LambdaHandler tasks are started, wait for the first of the two that will terminate.
// When the first task terminates, cancel the group and collect the result of the second task

if let element = await self.popFirst() {
return element
} else {
// we can't return nil if there is nothing to dequeue otherwise the async for loop will stop
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would keep those two comments. They explain why this is not a regular async iterator. It blocks when the buffer is empty

group.cancelAll()
let task2 = await group.next()!

switch task1 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// collect and return the result of the LambdaHandler 

case .closureResult(let result):
return result

case .serverReturned:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we log.error() the fact that the server terminated before the Handler? It might be an error in the server implementation or an otherwise important information to show to the user

@sebsto
Copy link
Contributor

sebsto commented Feb 27, 2025

@fabianfett Over the next week-end, I will add a unit test suite for the pool implementtaion.
Do you want me to push the unit test to this PR or submit a separate PR for it (I think a separate PR is good)

@sebsto
Copy link
Contributor

sebsto commented Feb 27, 2025

@fabianfett there is also a minor format check on LambdaRuntime.swift

--- a/Sources/AWSLambdaRuntimeCore/LambdaRuntime.swift
+++ b/Sources/AWSLambdaRuntimeCore/LambdaRuntime.swift
@@ -85,7 +85,8 @@ public final class LambdaRuntime<Handler>: @unchecked Sendable where Handler: St
             #if DEBUG
             // we're not running on Lambda and we're compiled in DEBUG mode,
             // let's start a local server for testing
-            try await Lambda.withLocalServer(invocationEndpoint: Lambda.env("LOCAL_LAMBDA_SERVER_INVOCATION_ENDPOINT")) {
+            try await Lambda.withLocalServer(invocationEndpoint: Lambda.env("LOCAL_LAMBDA_SERVER_INVOCATION_ENDPOINT"))
+            {
 
                 try await LambdaRuntimeClient.withRuntimeClient(
                     configuration: .init(ip: "127.0.0.1", port: 7000),

@fabianfett fabianfett added 🔨 semver/patch No public API change. and removed semver/none No version bump required. labels Feb 27, 2025
@fabianfett fabianfett merged commit 5de00c9 into main Feb 27, 2025
31 checks passed
@fabianfett fabianfett deleted the ff-local-lambda-fixes branch February 27, 2025 15:08
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
🔨 semver/patch No public API change.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants