@@ -233,6 +233,7 @@ private struct LambdaHttpServer {
233
233
logger. trace ( " /invoke received invocation " , metadata: [ " requestId " : " \( requestId) " ] )
234
234
await self . invocationPool. push ( LocalServerInvocation ( requestId: requestId, request: body) )
235
235
236
+ // wait for the lambda function to process the request
236
237
for try await response in self . responsePool {
237
238
logger. trace (
238
239
" Received response to return to client " ,
@@ -245,6 +246,7 @@ private struct LambdaHttpServer {
245
246
" Received response for a different request id " ,
246
247
metadata: [ " response requestId " : " \( response. requestId ?? " " ) " , " requestId " : " \( requestId) " ]
247
248
)
249
+ // should we return an error here ? Or crash as this is probably a programming error?
248
250
}
249
251
}
250
252
// What todo when there is no more responses to process?
@@ -263,19 +265,18 @@ private struct LambdaHttpServer {
263
265
// this call only returns when there is a task to give to the lambda function
264
266
case ( . GET, let url) where url. hasSuffix ( Consts . getNextInvocationURLSuffix) :
265
267
266
- // pop the tasks from the queue, until there is no more to process
268
+ // pop the tasks from the queue
267
269
self . logger. trace ( " /next waiting for /invoke " )
268
270
for try await invocation in self . invocationPool {
269
271
self . logger. trace ( " /next retrieved invocation " , metadata: [ " requestId " : " \( invocation. requestId) " ] )
270
- // this stores the invocation request id into the response
272
+ // this call also stores the invocation requestId into the response
271
273
return invocation. makeResponse ( status: . accepted)
272
274
}
273
275
// What todo when there is no more tasks to process?
274
276
// This should not happen as the async iterator blocks until there is a task to process
275
277
fatalError ( " No more invocations to process - the async for loop should not return " )
276
278
277
279
// :requestID/response endpoint is called by the lambda posting the response
278
- // we accept all requestID and we do not handle the body
279
280
case ( . POST, let url) where url. hasSuffix ( Consts . postResponseURLSuffix) :
280
281
let parts = head. uri. split ( separator: " / " )
281
282
guard let requestId = parts. count > 2 ? String ( parts [ parts. count - 2 ] ) : nil else {
@@ -297,7 +298,7 @@ private struct LambdaHttpServer {
297
298
return . init( id: requestId, status: . accepted)
298
299
299
300
// :requestID/error endpoint is called by the lambda posting an error response
300
- // we accept all requestID and we do not handle the body
301
+ // we accept all requestID and we do not handle the body, we just acknowledge the request
301
302
case ( . POST, let url) where url. hasSuffix ( Consts . postErrorURLSuffix) :
302
303
let parts = head. uri. split ( separator: " / " )
303
304
guard let _ = parts. count > 2 ? String ( parts [ parts. count - 2 ] ) : nil else {
@@ -323,7 +324,7 @@ private struct LambdaHttpServer {
323
324
try await outbound. write (
324
325
HTTPServerResponsePart . head (
325
326
HTTPResponseHead (
326
- version: . init( major: 1 , minor: 1 ) , // use HTTP 1.1 it keeps connection alive between requests
327
+ version: . init( major: 1 , minor: 1 ) ,
327
328
status: response. status,
328
329
headers: headers
329
330
)
@@ -336,7 +337,7 @@ private struct LambdaHttpServer {
336
337
try await outbound. write ( HTTPServerResponsePart . end ( nil ) )
337
338
}
338
339
339
- /// A shared data structure to store the current invocation or response request and the continuation.
340
+ /// A shared data structure to store the current invocation or response requests and the continuation objects .
340
341
/// This data structure is shared between instances of the HTTPHandler
341
342
/// (one instance to serve requests from the Lambda function and one instance to serve requests from the client invoking the lambda function).
342
343
private final class Pool < T> : AsyncSequence , AsyncIteratorProtocol , Sendable where T: Sendable {
@@ -345,13 +346,15 @@ private struct LambdaHttpServer {
345
346
private let _buffer = Mutex < CircularBuffer < T > > ( . init( ) )
346
347
private let _continuation = Mutex < CheckedContinuation < T , any Error > ? > ( nil )
347
348
349
+ /// retrieve the first element from the buffer
348
350
public func popFirst( ) async -> T ? {
349
351
self . _buffer. withLock { $0. popFirst ( ) }
350
352
}
351
353
352
- // if the iterator is waiting for an element, give it to it
353
- // otherwise, enqueue the element
354
+ /// enqueue an element, or give it back immediately to the iterator if it is waiting for an element
354
355
public func push( _ invocation: T ) async {
356
+ // if the iterator is waiting for an element, give it to it
357
+ // otherwise, enqueue the element
355
358
if let continuation = self . _continuation. withLock ( { $0 } ) {
356
359
self . _continuation. withLock { $0 = nil }
357
360
continuation. resume ( returning: invocation)
0 commit comments