diff --git a/Sources/AWSLambdaRuntimeCore/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntimeCore/Lambda+LocalServer.swift index 64a9acb7..b0a7e162 100644 --- a/Sources/AWSLambdaRuntimeCore/Lambda+LocalServer.swift +++ b/Sources/AWSLambdaRuntimeCore/Lambda+LocalServer.swift @@ -350,23 +350,25 @@ private struct LambdaHttpServer { private final class Pool: AsyncSequence, AsyncIteratorProtocol, Sendable where T: Sendable { typealias Element = T - private let _buffer = Mutex>(.init()) - private let _continuation = Mutex?>(nil) + private let mutex = Mutex<(CircularBuffer, CheckedContinuation?)>((.init(), nil)) /// retrieve the first element from the buffer - public func popFirst() async -> T? { - self._buffer.withLock { $0.popFirst() } + public func popFirst() -> T? { + self.mutex.withLock { $0.0.popFirst() } } /// enqueue an element, or give it back immediately to the iterator if it is waiting for an element public func push(_ invocation: T) async { - // if the iterator is waiting for an element, give it to it - // otherwise, enqueue the element - if let continuation = self._continuation.withLock({ $0 }) { - self._continuation.withLock { $0 = nil } - continuation.resume(returning: invocation) - } else { - self._buffer.withLock { $0.append(invocation) } + self.mutex.withLock { mutexContent in + var (_buffer, _continuation) = mutexContent + // if the iterator is waiting for an element, give it to it + // otherwise, enqueue the element + if let continuation = _continuation { + continuation.resume(returning: invocation) + _continuation = nil + } else { + _buffer.append(invocation) + } } } @@ -377,15 +379,17 @@ private struct LambdaHttpServer { return nil } - if let element = await self.popFirst() { + if let element = self.popFirst() { + // if there is an element in the buffer, dequeue it return element } else { // we can't return nil if there is nothing to dequeue otherwise the async for loop will stop - // wait for an element to be enqueued - return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in - // store the continuation for later, when an element is enqueued - self._continuation.withLock { - $0 = continuation + // so, wait for an element to be enqueued + return try await withCheckedThrowingContinuation { + (continuation: CheckedContinuation) in + self.mutex.withLock { mutexContent in + // store the continuation for later, when an element is enqueued + mutexContent.1 = continuation } } }