Skip to content

Commit 03b251b

Browse files
committed
fix connection closure
1 parent 22634b4 commit 03b251b

File tree

2 files changed

+39
-33
lines changed

2 files changed

+39
-33
lines changed

Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift

+31-3
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,8 @@ final actor NewLambdaRuntimeClient: LambdaRuntimeClientProtocol {
114114
}
115115

116116
private func close() async {
117+
self.logger.trace("Close lambda runtime client")
118+
117119
guard case .notClosing = self.closingState else {
118120
return
119121
}
@@ -229,10 +231,24 @@ final actor NewLambdaRuntimeClient: LambdaRuntimeClientProtocol {
229231

230232
private func channelClosed(_ channel: any Channel) {
231233
switch (self.connectionState, self.closingState) {
232-
case (.disconnected, _),
233-
(_, .closed):
234+
case (_, .closed):
234235
fatalError("Invalid state: \(self.connectionState), \(self.closingState)")
235236

237+
case (.disconnected, .notClosing):
238+
if let index = self.closingConnections.firstIndex(where: { $0 === channel }) {
239+
self.closingConnections.remove(at: index)
240+
}
241+
242+
case (.disconnected, .closing(let continuation)):
243+
if let index = self.closingConnections.firstIndex(where: { $0 === channel }) {
244+
self.closingConnections.remove(at: index)
245+
}
246+
247+
if self.closingConnections.isEmpty {
248+
self.closingState = .closed
249+
continuation.resume()
250+
}
251+
236252
case (.connecting(let array), .notClosing):
237253
self.connectionState = .disconnected
238254
for continuation in array {
@@ -303,8 +319,14 @@ final actor NewLambdaRuntimeClient: LambdaRuntimeClientProtocol {
303319
let handler = try channel.pipeline.syncOperations.handler(
304320
type: LambdaChannelHandler<NewLambdaRuntimeClient>.self
305321
)
322+
self.logger.trace(
323+
"Connection to control plane created",
324+
metadata: [
325+
"lambda_port": "\(self.configuration.port)",
326+
"lambda_ip": "\(self.configuration.ip)",
327+
]
328+
)
306329
channel.closeFuture.whenComplete { result in
307-
self.eventLoop.preconditionInEventLoop()
308330
self.assumeIsolated { runtimeClient in
309331
runtimeClient.channelClosed(channel)
310332
}
@@ -754,6 +776,12 @@ extension LambdaChannelHandler: ChannelInboundHandler {
754776
}
755777

756778
func errorCaught(context: ChannelHandlerContext, error: Error) {
779+
self.logger.trace(
780+
"Channel error caught",
781+
metadata: [
782+
"error": "\(error)"
783+
]
784+
)
757785
// pending responses will fail with lastError in channelInactive since we are calling context.close
758786
self.delegate.connectionErrorHappened(error, channel: context.channel)
759787

Tests/AWSLambdaRuntimeCoreTests/NewLambdaRuntimeClientTests.swift

+8-30
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,11 @@ import struct Foundation.UUID
2424
@Suite
2525
struct NewLambdaRuntimeClientTests {
2626

27-
let logger = Logger(label: "NewLambdaClientRuntimeTest")
28-
29-
init() {
30-
31-
}
27+
let logger = {
28+
var logger = Logger(label: "NewLambdaClientRuntimeTest")
29+
logger.logLevel = .trace
30+
return logger
31+
}()
3232

3333
@Test
3434
func testSimpleInvocations() async throws {
@@ -57,12 +57,12 @@ struct NewLambdaRuntimeClientTests {
5757
}
5858
}
5959

60-
try await self.withMockServer(behaviour: HappyBehavior()) { mockServer, eventLoopGroup in
61-
let configuration = NewLambdaRuntimeClient.Configuration(ip: "127.0.0.1", port: 7000)
60+
try await withMockServer(behaviour: HappyBehavior()) { port in
61+
let configuration = NewLambdaRuntimeClient.Configuration(ip: "127.0.0.1", port: port)
6262

6363
try await NewLambdaRuntimeClient.withRuntimeClient(
6464
configuration: configuration,
65-
eventLoop: eventLoopGroup.next(),
65+
eventLoop: NIOSingletons.posixEventLoopGroup.next(),
6666
logger: self.logger
6767
) { runtimeClient in
6868
do {
@@ -86,26 +86,4 @@ struct NewLambdaRuntimeClientTests {
8686
}
8787
}
8888
}
89-
90-
func withMockServer<Result>(
91-
behaviour: some LambdaServerBehavior,
92-
_ body: (MockLambdaServer, MultiThreadedEventLoopGroup) async throws -> Result
93-
) async throws -> Result {
94-
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
95-
let server = MockLambdaServer(behavior: behaviour)
96-
_ = try await server.start().get()
97-
98-
let result: Swift.Result<Result, any Error>
99-
do {
100-
result = .success(try await body(server, eventLoopGroup))
101-
} catch {
102-
result = .failure(error)
103-
}
104-
105-
try? await server.stop().get()
106-
try? await eventLoopGroup.shutdownGracefully()
107-
108-
return try result.get()
109-
}
110-
11189
}

0 commit comments

Comments
 (0)