@@ -55,7 +55,8 @@ struct HttpServer {
55
55
try await server. run ( )
56
56
}
57
57
58
- /// This method starts the server and handles incoming connections.
58
+ /// This method starts the server and handles one unique incoming connections
59
+ /// The Lambda function will send two HTTP requests over this connection: one for the next invocation and one for the response.
59
60
private func run( ) async throws {
60
61
let channel = try await ServerBootstrap ( group: self . eventLoopGroup)
61
62
. serverChannelOption ( . backlog, value: 256 )
@@ -86,9 +87,14 @@ struct HttpServer {
86
87
metadata: [
87
88
" host " : " \( channel. channel. localAddress? . ipAddress? . debugDescription ?? " " ) " ,
88
89
" port " : " \( channel. channel. localAddress? . port ?? 0 ) " ,
90
+ " maxInvocations " : " \( self . maxInvocations) " ,
89
91
]
90
92
)
91
93
94
+ // This counter is used to track the number of incoming connections.
95
+ // This mock servers accepts n TCP connection then shutdowns
96
+ let connectionCounter = SharedCounter ( maxValue: self . maxInvocations)
97
+
92
98
// We are handling each incoming connection in a separate child task. It is important
93
99
// to use a discarding task group here which automatically discards finished child tasks.
94
100
// A normal task group retains all child tasks and their outputs in memory until they are
@@ -98,22 +104,31 @@ struct HttpServer {
98
104
try await withThrowingDiscardingTaskGroup { group in
99
105
try await channel. executeThenClose { inbound in
100
106
for try await connectionChannel in inbound {
101
- logger . trace ( " Handling new connection " )
102
- logger . info (
103
- " This mock server accepts only one connection, it will shutdown the server after handling the current connection. "
104
- )
107
+
108
+ let counter = connectionCounter . current ( )
109
+ logger . trace ( " Handling new connection" , metadata : [ " connectionNumber " : " \( counter ) " ] )
110
+
105
111
group. addTask {
106
112
await self . handleConnection ( channel: connectionChannel)
107
- logger. trace ( " Done handling connection " )
113
+ logger. trace ( " Done handling connection " , metadata: [ " connectionNumber " : " \( counter) " ] )
114
+ }
115
+
116
+ if connectionCounter. increment ( ) {
117
+ logger. info (
118
+ " Maximum number of connections reached, shutting down after current connection " ,
119
+ metadata: [ " maxConnections " : " \( self . maxInvocations) " ]
120
+ )
121
+ break // this causes the server to shutdown after handling the connection
108
122
}
109
- break
110
123
}
111
124
}
112
125
}
113
126
logger. info ( " Server shutting down " )
114
127
}
115
128
116
- /// This method handles a single connection by echoing back all inbound data.
129
+ /// This method handles a single connection by responsing hard coded value to a Lambda function request.
130
+ /// It handles two requests: one for the next invocation and one for the response.
131
+ /// when the maximum number of requests is reached, it closes the connection.
117
132
private func handleConnection(
118
133
channel: NIOAsyncChannel < HTTPServerRequestPart , HTTPServerResponsePart >
119
134
) async {
@@ -122,7 +137,7 @@ struct HttpServer {
122
137
var requestBody : ByteBuffer ?
123
138
124
139
// each Lambda invocation results in TWO HTTP requests (next and response)
125
- let requestCount = RequestCounter ( maxRequest : self . maxInvocations * 2 )
140
+ let requestCount = SharedCounter ( maxValue : 2 )
126
141
127
142
// Note that this method is non-throwing and we are catching any error.
128
143
// We do this since we don't want to tear down the whole server when a single connection
@@ -161,10 +176,10 @@ struct HttpServer {
161
176
162
177
if requestCount. increment ( ) {
163
178
logger. info (
164
- " Maximum number of invocations reached, closing this connection " ,
165
- metadata: [ " maxInvocations " : " \( self . maxInvocations ) " ]
179
+ " Maximum number of requests reached, closing this connection " ,
180
+ metadata: [ " maxRequest " : " 2 " ]
166
181
)
167
- break
182
+ break // this finishes handiling request on this connection
168
183
}
169
184
}
170
185
}
@@ -224,12 +239,13 @@ struct HttpServer {
224
239
) async throws {
225
240
var headers = HTTPHeaders ( responseHeaders)
226
241
headers. add ( name: " Content-Length " , value: " \( responseBody. utf8. count) " )
242
+ headers. add ( name: " KeepAlive " , value: " timeout=1, max=2 " )
227
243
228
244
logger. trace ( " Writing response head " )
229
245
try await outbound. write (
230
246
HTTPServerResponsePart . head (
231
247
HTTPResponseHead (
232
- version: . init( major: 1 , minor: 1 ) ,
248
+ version: . init( major: 1 , minor: 1 ) , // use HTTP 1.1 it keeps connection alive between requests
233
249
status: responseStatus,
234
250
headers: headers
235
251
)
@@ -267,20 +283,20 @@ struct HttpServer {
267
283
static let invokedFunctionARN = " Lambda-Runtime-Invoked-Function-Arn "
268
284
}
269
285
270
- private final class RequestCounter : Sendable {
286
+ private final class SharedCounter : Sendable {
271
287
private let counterMutex = Mutex < Int > ( 0 )
272
- private let maxRequest : Int
288
+ private let maxValue : Int
273
289
274
- init ( maxRequest : Int ) {
275
- self . maxRequest = maxRequest
290
+ init ( maxValue : Int ) {
291
+ self . maxValue = maxValue
276
292
}
277
293
func current( ) -> Int {
278
294
counterMutex. withLock { $0 }
279
295
}
280
296
func increment( ) -> Bool {
281
297
counterMutex. withLock {
282
298
$0 += 1
283
- return $0 >= maxRequest
299
+ return $0 >= maxValue
284
300
}
285
301
}
286
302
}
0 commit comments