From 7b6fe175684a783dd6a434de5f887071f4419d70 Mon Sep 17 00:00:00 2001 From: Sebastien Stormacq Date: Sun, 16 Feb 2025 13:33:36 +0100 Subject: [PATCH] fix possible data race --- .../Lambda+LocalServer.swift | 38 ++++++++++--------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/Sources/AWSLambdaRuntimeCore/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntimeCore/Lambda+LocalServer.swift index fa907def..b88ab5fd 100644 --- a/Sources/AWSLambdaRuntimeCore/Lambda+LocalServer.swift +++ b/Sources/AWSLambdaRuntimeCore/Lambda+LocalServer.swift @@ -349,23 +349,25 @@ private struct LambdaHttpServer { private final class Pool: AsyncSequence, AsyncIteratorProtocol, Sendable where T: Sendable { typealias Element = T - private let _buffer = Mutex>(.init()) - private let _continuation = Mutex?>(nil) + private let mutex = Mutex<(CircularBuffer, CheckedContinuation?)>((.init(), nil)) /// retrieve the first element from the buffer - public func popFirst() async -> T? { - self._buffer.withLock { $0.popFirst() } + public func popFirst() -> T? { + self.mutex.withLock { $0.0.popFirst() } } /// enqueue an element, or give it back immediately to the iterator if it is waiting for an element public func push(_ invocation: T) async { - // if the iterator is waiting for an element, give it to it - // otherwise, enqueue the element - if let continuation = self._continuation.withLock({ $0 }) { - self._continuation.withLock { $0 = nil } - continuation.resume(returning: invocation) - } else { - self._buffer.withLock { $0.append(invocation) } + self.mutex.withLock { mutexContent in + var (_buffer, _continuation) = mutexContent + // if the iterator is waiting for an element, give it to it + // otherwise, enqueue the element + if let continuation = _continuation { + continuation.resume(returning: invocation) + _continuation = nil + } else { + _buffer.append(invocation) + } } } @@ -376,15 +378,17 @@ private struct LambdaHttpServer { return nil } - if let element = await self.popFirst() { + if let element = self.popFirst() { + // if there is an element in the buffer, dequeue it return element } else { // we can't return nil if there is nothing to dequeue otherwise the async for loop will stop - // wait for an element to be enqueued - return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in - // store the continuation for later, when an element is enqueued - self._continuation.withLock { - $0 = continuation + // so, wait for an element to be enqueued + return try await withCheckedThrowingContinuation { + (continuation: CheckedContinuation) in + self.mutex.withLock { mutexContent in + // store the continuation for later, when an element is enqueued + mutexContent.1 = continuation } } }