Skip to content

Commit 916b37d

Browse files
committed
// fix bug where awaitTermination won't return even if all worker quit.
// https://en.cppreference.com/w/cpp/thread/condition_variable // follow what the STD told us to // "Even if the shared variable is atomic, it must be modified while owning the mutex to // correctly publish the modification to the waiting thread." // DEEP EXPLAINATION: // https://stackoverflow.com/questions/38147825/shared-atomic-variable-is-not-properly-published-if-it-is-not-modified-under-mut
1 parent 144e1fc commit 916b37d

File tree

2 files changed

+20
-6
lines changed

2 files changed

+20
-6
lines changed

src/utils/MessageQueue.cc

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,12 @@ class LoopQueueGuard {
3333

3434
public:
3535
explicit LoopQueueGuard(MessageQueue* queue) : queue_(queue) {
36-
queue_->workerCount_++;
3736
getRunningQueue()[queue]++;
37+
38+
{
39+
std::unique_lock<std::mutex> lk(queue_->queueMutex_);
40+
++queue_->workerCount_;
41+
}
3842
}
3943

4044
SCRIPTX_DISALLOW_COPY_AND_MOVE(LoopQueueGuard);
@@ -45,7 +49,17 @@ class LoopQueueGuard {
4549
q.erase(queue_);
4650
}
4751

48-
queue_->workerCount_--;
52+
// bugfix: awaitTermination won't return even if all worker quit.
53+
// https://en.cppreference.com/w/cpp/thread/condition_variable
54+
// follow what the STD told us to
55+
// "Even if the shared variable is atomic, it must be modified while owning the mutex to
56+
// correctly publish the modification to the waiting thread."
57+
// DEEP EXPLAINATION:
58+
// https://stackoverflow.com/questions/38147825/shared-atomic-variable-is-not-properly-published-if-it-is-not-modified-under-mut
59+
{
60+
std::unique_lock<std::mutex> lk(queue_->queueMutex_);
61+
--queue_->workerCount_;
62+
}
4963
queue_->workerQuitCondition_.notify_all();
5064
}
5165

@@ -180,7 +194,7 @@ void MessageQueue::awaitTermination() {
180194
workerQuitCondition_.wait(lk, [this] { return workerCount_ == 0; });
181195
}
182196

183-
bool MessageQueue::isShutdown() {
197+
bool MessageQueue::isShutdown() const {
184198
std::unique_lock<std::mutex> lk(queueMutex_);
185199
return shutdown_ != ShutdownType::kNone;
186200
}
@@ -386,7 +400,7 @@ MessageQueue::LoopReturnType MessageQueue::loopQueue(MessageQueue::LoopType loop
386400
if (loopType == LoopType::kLoopOnce) {
387401
onceMessageCount = dueMessageCount();
388402
}
389-
MessageQueue::LoopReturnType returnType = LoopReturnType::kRunOnce;
403+
LoopReturnType returnType = LoopReturnType::kRunOnce;
390404

391405
while (true) {
392406
Message* message = awaitDueMessage(loopType, onceMessageCount, returnType);

src/utils/MessageQueue.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ class MessageQueue {
226226
std::condition_variable queueNotFullCondition_;
227227
std::deque<Message*> queue_;
228228
std::atomic_int32_t messageIdCounter_;
229-
std::atomic_uint32_t workerCount_;
229+
std::uint32_t workerCount_; // guard by queueMutex_
230230
std::condition_variable workerQuitCondition_;
231231

232232
std::shared_ptr<Supervisor> supervisor_;
@@ -321,7 +321,7 @@ class MessageQueue {
321321
/**
322322
* @return if the MessageQueue is shut down or shutting down.
323323
*/
324-
bool isShutdown();
324+
bool isShutdown() const;
325325

326326
/**
327327
* causing current loopQueue() call return immediately.

0 commit comments

Comments
 (0)