1616import DequeModule
1717import Dispatch
1818import Logging
19- import NIOConcurrencyHelpers
2019import NIOCore
2120import NIOHTTP1
2221import NIOPosix
@@ -183,21 +182,29 @@ private struct LambdaHTTPServer {
183182 }
184183 }
185184
186- let task1 = await group. next ( ) !
185+ // Now that the local HTTP server and LambdaHandler tasks are started, wait for the
186+ // first of the two that will terminate.
187+ // When the first task terminates, cancel the group and collect the result of the
188+ // second task.
189+
190+ // collect and return the result of the LambdaHandler
191+ let serverOrHandlerResult1 = await group. next ( ) !
187192 group. cancelAll ( )
188- let task2 = await group. next ( ) !
189193
190- switch task1 {
194+ switch serverOrHandlerResult1 {
191195 case . closureResult( let result) :
192196 return result
193197
194- case . serverReturned:
195- switch task2 {
198+ case . serverReturned( let result) :
199+ logger. error ( " Server shutdown before closure completed " , metadata: [
200+ " error " : " \( result. maybeError != nil ? " \( result. maybeError!) " : " none " ) "
201+ ] )
202+ switch await group. next ( ) ! {
196203 case . closureResult( let result) :
197204 return result
198205
199206 case . serverReturned:
200- fatalError ( )
207+ fatalError ( " Only one task is a server, and only one can return `serverReturned` " )
201208 }
202209 }
203210 }
@@ -404,34 +411,26 @@ private struct LambdaHTTPServer {
404411 private final class Pool < T> : AsyncSequence , AsyncIteratorProtocol , Sendable where T: Sendable {
405412 typealias Element = T
406413
407- struct State {
408- enum State {
409- case buffer( Deque < T > )
410- case continuation( CheckedContinuation < T , any Error > ? )
411- }
412-
413- var state : State
414-
415- init ( ) {
416- self . state = . buffer( [ ] )
417- }
414+ enum State : ~ Copyable {
415+ case buffer( Deque < T > )
416+ case continuation( CheckedContinuation < T , any Error > ? )
418417 }
419418
420- private let lock = Mutex < State > ( . init ( ) )
419+ private let lock = Mutex < State > ( . buffer ( [ ] ) )
421420
422421 /// enqueue an element, or give it back immediately to the iterator if it is waiting for an element
423422 public func push( _ invocation: T ) async {
424423 // if the iterator is waiting for an element, give it to it
425424 // otherwise, enqueue the element
426425 let maybeContinuation = self . lock. withLock { state -> CheckedContinuation < T , any Error > ? in
427- switch state . state {
426+ switch consume state {
428427 case . continuation( let continuation) :
429- state. state = . buffer( [ ] )
428+ state = . buffer( [ ] )
430429 return continuation
431430
432431 case . buffer( var buffer) :
433432 buffer. append ( invocation)
434- state. state = . buffer( buffer)
433+ state = . buffer( buffer)
435434 return nil
436435 }
437436 }
@@ -447,18 +446,18 @@ private struct LambdaHTTPServer {
447446
448447 return try await withCheckedThrowingContinuation { ( continuation: CheckedContinuation < T , any Error > ) in
449448 let nextAction = self . lock. withLock { state -> T ? in
450- switch state . state {
449+ switch consume state {
451450 case . buffer( var buffer) :
452451 if let first = buffer. popFirst ( ) {
453- state. state = . buffer( buffer)
452+ state = . buffer( buffer)
454453 return first
455454 } else {
456- state. state = . continuation( continuation)
455+ state = . continuation( continuation)
457456 return nil
458457 }
459458
460459 case . continuation:
461- fatalError ( " Concurrent invocations to next(). This is illigal . " )
460+ fatalError ( " Concurrent invocations to next(). This is illegal . " )
462461 }
463462 }
464463
@@ -509,3 +508,14 @@ private struct LambdaHTTPServer {
509508 }
510509}
511510#endif
511+
512+ extension Result {
513+ var maybeError : Failure ? {
514+ switch self {
515+ case . success:
516+ return nil
517+ case . failure( let error) :
518+ return error
519+ }
520+ }
521+ }
0 commit comments