-
Notifications
You must be signed in to change notification settings - Fork 113
Fix data race in LocalServer's invocation pool #479
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -350,23 +350,25 @@ private struct LambdaHttpServer { | |
private final class Pool<T>: AsyncSequence, AsyncIteratorProtocol, Sendable where T: Sendable { | ||
typealias Element = T | ||
|
||
private let _buffer = Mutex<CircularBuffer<T>>(.init()) | ||
private let _continuation = Mutex<CheckedContinuation<T, any Error>?>(nil) | ||
private let mutex = Mutex<(CircularBuffer<T>, CheckedContinuation<T, any Error>?)>((.init(), nil)) | ||
|
||
/// retrieve the first element from the buffer | ||
public func popFirst() async -> T? { | ||
self._buffer.withLock { $0.popFirst() } | ||
public func popFirst() -> T? { | ||
self.mutex.withLock { $0.0.popFirst() } | ||
} | ||
|
||
/// enqueue an element, or give it back immediately to the iterator if it is waiting for an element | ||
public func push(_ invocation: T) async { | ||
// if the iterator is waiting for an element, give it to it | ||
// otherwise, enqueue the element | ||
if let continuation = self._continuation.withLock({ $0 }) { | ||
self._continuation.withLock { $0 = nil } | ||
continuation.resume(returning: invocation) | ||
} else { | ||
self._buffer.withLock { $0.append(invocation) } | ||
self.mutex.withLock { mutexContent in | ||
var (_buffer, _continuation) = mutexContent | ||
// if the iterator is waiting for an element, give it to it | ||
// otherwise, enqueue the element | ||
if let continuation = _continuation { | ||
continuation.resume(returning: invocation) | ||
_continuation = nil | ||
} else { | ||
_buffer.append(invocation) | ||
} | ||
} | ||
} | ||
|
||
|
@@ -377,15 +379,17 @@ private struct LambdaHttpServer { | |
return nil | ||
} | ||
|
||
if let element = await self.popFirst() { | ||
if let element = self.popFirst() { | ||
// if there is an element in the buffer, dequeue it | ||
return element | ||
} else { | ||
// we can't return nil if there is nothing to dequeue otherwise the async for loop will stop | ||
// wait for an element to be enqueued | ||
return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<T, any Error>) in | ||
// store the continuation for later, when an element is enqueued | ||
self._continuation.withLock { | ||
$0 = continuation | ||
// so, wait for an element to be enqueued | ||
return try await withCheckedThrowingContinuation { | ||
(continuation: CheckedContinuation<T, any Error>) in | ||
self.mutex.withLock { mutexContent in | ||
// store the continuation for later, when an element is enqueued | ||
mutexContent.1 = continuation | ||
Comment on lines
+382
to
+392
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is still racy: between "popFirst" returning nil and storing the continuation, somebody could have already put sth in the buffer. Really, to implement this correctly, I think you need to acquire the lock, check your invariants, apply the necessary state change, release the lock and then run side-effects. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The invariant here is: There can only be a stored continuation if the buffer is empty. If the buffer is non-empty, there cannot be a stored continuation. |
||
} | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if it is a good idea to resume a continuation while holding a lock... usually I would try to keep the
withLock
body free of side-effects.