Skip to content

Commit 77e2d63

Browse files
Fix possible deadlock of Future when adding a listener after completed (#334)
* Fix possible deadlock of Future when adding a listener after completed ### Motivation There is a case that deadlock could happen for a `Future`. Assume there is a `Promise` and its `Future`. 1. Call `Future::addListener` to add a listener that tries to acquire a user-provided mutex (`lock`). 2. Thread 1: Acquire `lock` first. 3. Thread 2: Call `Promise::setValue`, the listener will be triggered first before completed. Since `lock` is held by Thread 1, the listener will be blocked. 4. Thread 1: Call `Future::addListener`, since it detects the `InternalState::completed_` is true, it will call `get` to retrieve the result and value. Then, deadlock happens: - Thread 1 waits for `lock` is released, and then complete `InternalState::future_`. - Thread 2 holds `lock` but wait for `InternalState::future_` is completed. In a real world case, if we acquire a lock before `ProducerImpl::closeAsync`, then another thread call `setValue` in `ClientConnection::handleSuccess` and the callback of `createProducerAsync` tries to acquire the lock, `handleSuccess` will be blocked. Then in `closeAsync`, the current thread will be blocked in: ```c++ cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId) .addListener([self, callback](Result result, const ResponseData&) { callback(result); }); ``` The stacks: ``` Thread 1: #11 0x00007fab80da2173 in pulsar::InternalState<...>::complete (this=0x3d53e7a10, result=..., value=...) at lib/Futre.h:61 #13 pulsar::ClientConnection::handleSuccess (this=this@entry=0x2214bc000, success=...) at lib/ClientConnection.cc:1552 Thread 2: #8 get (result=..., this=0x3d53e7a10) at lib/Future.h:69 #9 pulsar::InternalState<...>::addListener (this=this@entry=0x3d53e7a10, listener=...) at lib/Future.h:51 #11 0x00007fab80e8dc4e in pulsar::ProducerImpl::closeAsync at lib/ProducerImpl.cc:794 ``` There are two points that make the deadlock: 1. We use `completed_` to represent if the future is completed. However, after it's true, the future might not be completed because the value is not set and the listeners are not completed. 2. If `addListener` is called after it's completed, we still push the listener to `listeners_` so that previous listeners could be executed before the new listener. This guarantee is unnecessarily strong. ### Modifications First, complete the future before calling the listeners. Then, use an enum to represent the status: - INITIAL: `complete` has not been called - COMPLETING: when the 1st time `complete` is called, the status will change from INITIAL to COMPLETING - COMPLETED: the future is completed. Besides, implementation of `Future` is simplified. #299 fixes a possible mutex crash by introducing the `std::future`. However, the root cause is the conditional variable is not used correctly: > Even if the shared variable is atomic, it must be modified while owning the mutex to correctly publish the modification to the waiting thread. See https://en.cppreference.com/w/cpp/thread/condition_variable The simplest way to fix #298 is just adding `lock.lock()` before `state->condition.notify_all();`. * Acquire lock again * Add initial value
1 parent a01c16e commit 77e2d63

File tree

5 files changed

+84
-70
lines changed

5 files changed

+84
-70
lines changed

lib/ConsumerImpl.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
#include <boost/optional.hpp>
2525
#include <functional>
26+
#include <list>
2627
#include <memory>
2728
#include <utility>
2829

lib/Future.h

Lines changed: 45 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,11 @@
2020
#define LIB_FUTURE_H_
2121

2222
#include <atomic>
23-
#include <chrono>
23+
#include <condition_variable>
24+
#include <forward_list>
2425
#include <functional>
25-
#include <future>
26-
#include <list>
2726
#include <memory>
2827
#include <mutex>
29-
#include <thread>
30-
#include <utility>
3128

3229
namespace pulsar {
3330

@@ -38,71 +35,70 @@ class InternalState {
3835
using Pair = std::pair<Result, Type>;
3936
using Lock = std::unique_lock<std::mutex>;
4037

38+
enum Status : uint8_t
39+
{
40+
INITIAL,
41+
COMPLETING,
42+
COMPLETED
43+
};
44+
4145
// NOTE: Add the constructor explicitly just to be compatible with GCC 4.8
4246
InternalState() {}
4347

4448
void addListener(Listener listener) {
4549
Lock lock{mutex_};
46-
listeners_.emplace_back(listener);
47-
lock.unlock();
48-
4950
if (completed()) {
50-
Type value;
51-
Result result = get(value);
52-
triggerListeners(result, value);
51+
auto result = result_;
52+
auto value = value_;
53+
lock.unlock();
54+
listener(result, value);
55+
} else {
56+
tailListener_ = listeners_.emplace_after(tailListener_, std::move(listener));
5357
}
5458
}
5559

5660
bool complete(Result result, const Type &value) {
57-
bool expected = false;
58-
if (!completed_.compare_exchange_strong(expected, true)) {
61+
Status expected = Status::INITIAL;
62+
if (!status_.compare_exchange_strong(expected, Status::COMPLETING)) {
5963
return false;
6064
}
61-
triggerListeners(result, value);
62-
promise_.set_value(std::make_pair(result, value));
63-
return true;
64-
}
65-
66-
bool completed() const noexcept { return completed_; }
6765

68-
Result get(Type &result) {
69-
const auto &pair = future_.get();
70-
result = pair.second;
71-
return pair.first;
72-
}
66+
// Ensure if another thread calls `addListener` at the same time, that thread can get the value by
67+
// `get` before the existing listeners are executed
68+
Lock lock{mutex_};
69+
result_ = result;
70+
value_ = value;
71+
status_ = COMPLETED;
72+
cond_.notify_all();
7373

74-
// Only public for test
75-
void triggerListeners(Result result, const Type &value) {
76-
while (true) {
77-
Lock lock{mutex_};
78-
if (listeners_.empty()) {
79-
return;
74+
if (!listeners_.empty()) {
75+
auto listeners = std::move(listeners_);
76+
lock.unlock();
77+
for (auto &&listener : listeners) {
78+
listener(result, value);
8079
}
80+
}
8181

82-
bool expected = false;
83-
if (!listenerRunning_.compare_exchange_strong(expected, true)) {
84-
// There is another thread that polled a listener that is running, skip polling and release
85-
// the lock. Here we wait for some time to avoid busy waiting.
86-
std::this_thread::sleep_for(std::chrono::milliseconds(1));
87-
continue;
88-
}
89-
auto listener = std::move(listeners_.front());
90-
listeners_.pop_front();
91-
lock.unlock();
82+
return true;
83+
}
9284

93-
listener(result, value);
94-
listenerRunning_ = false;
95-
}
85+
bool completed() const noexcept { return status_.load() == COMPLETED; }
86+
87+
Result get(Type &value) const {
88+
Lock lock{mutex_};
89+
cond_.wait(lock, [this] { return completed(); });
90+
value = value_;
91+
return result_;
9692
}
9793

9894
private:
99-
std::atomic_bool completed_{false};
100-
std::promise<Pair> promise_;
101-
std::shared_future<Pair> future_{promise_.get_future()};
102-
103-
std::list<Listener> listeners_;
10495
mutable std::mutex mutex_;
105-
std::atomic_bool listenerRunning_{false};
96+
mutable std::condition_variable cond_;
97+
std::forward_list<Listener> listeners_;
98+
decltype(listeners_.before_begin()) tailListener_{listeners_.before_begin()};
99+
Result result_;
100+
Type value_;
101+
std::atomic<Status> status_{INITIAL};
106102
};
107103

108104
template <typename Result, typename Type>

lib/ProducerImpl.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#define LIB_PRODUCERIMPL_H_
2121

2222
#include <boost/optional.hpp>
23+
#include <list>
2324
#include <memory>
2425

2526
#include "Future.h"

tests/PromiseTest.cc

Lines changed: 34 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,13 @@
1919
#include <gtest/gtest.h>
2020

2121
#include <chrono>
22+
#include <memory>
23+
#include <mutex>
2224
#include <string>
2325
#include <thread>
2426
#include <vector>
2527

28+
#include "WaitUtils.h"
2629
#include "lib/Future.h"
2730
#include "lib/LogUtils.h"
2831

@@ -88,26 +91,38 @@ TEST(PromiseTest, testListeners) {
8891
ASSERT_EQ(values, (std::vector<std::string>(2, "hello")));
8992
}
9093

91-
TEST(PromiseTest, testTriggerListeners) {
92-
InternalState<int, int> state;
93-
state.addListener([](int, const int&) {
94-
LOG_INFO("Start task 1...");
95-
std::this_thread::sleep_for(std::chrono::seconds(1));
96-
LOG_INFO("Finish task 1...");
94+
TEST(PromiseTest, testListenerDeadlock) {
95+
Promise<int, int> promise;
96+
auto future = promise.getFuture();
97+
auto mutex = std::make_shared<std::mutex>();
98+
auto done = std::make_shared<std::atomic_bool>(false);
99+
100+
future.addListener([mutex, done](int, int) {
101+
LOG_INFO("Listener-1 before acquiring the lock");
102+
std::lock_guard<std::mutex> lock{*mutex};
103+
LOG_INFO("Listener-1 after acquiring the lock");
104+
done->store(true);
97105
});
98-
state.addListener([](int, const int&) {
99-
LOG_INFO("Start task 2...");
106+
107+
std::thread t1{[mutex, &future] {
108+
std::lock_guard<std::mutex> lock{*mutex};
109+
// Make it a great chance that `t2` executes `promise.setValue` first
110+
std::this_thread::sleep_for(std::chrono::seconds(2));
111+
112+
// Since the future is completed, `Future::get` will be called in `addListener` to get the result
113+
LOG_INFO("Before adding Listener-2 (acquired the mutex)")
114+
future.addListener([](int, int) { LOG_INFO("Listener-2 is triggered"); });
115+
LOG_INFO("After adding Listener-2 (releasing the mutex)");
116+
}};
117+
t1.detach();
118+
std::thread t2{[mutex, promise] {
119+
// Make there a great chance that `t1` acquires `mutex` first
100120
std::this_thread::sleep_for(std::chrono::seconds(1));
101-
LOG_INFO("Finish task 2...");
102-
});
121+
LOG_INFO("Before setting value");
122+
promise.setValue(0); // the 1st listener is called, which is blocked at acquiring `mutex`
123+
LOG_INFO("After setting value");
124+
}};
125+
t2.detach();
103126

104-
auto start = std::chrono::high_resolution_clock::now();
105-
auto future1 = std::async(std::launch::async, [&state] { state.triggerListeners(0, 0); });
106-
auto future2 = std::async(std::launch::async, [&state] { state.triggerListeners(0, 0); });
107-
future1.wait();
108-
future2.wait();
109-
auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
110-
std::chrono::high_resolution_clock::now() - start)
111-
.count();
112-
ASSERT_TRUE(elapsed > 2000) << "elapsed: " << elapsed << "ms";
127+
ASSERT_TRUE(waitUntil(std::chrono::seconds(5000), [done] { return done->load(); }));
113128
}

tests/WaitUtils.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,20 +25,21 @@
2525
namespace pulsar {
2626

2727
template <typename Rep, typename Period>
28-
inline void waitUntil(std::chrono::duration<Rep, Period> timeout, const std::function<bool()>& condition,
28+
inline bool waitUntil(std::chrono::duration<Rep, Period> timeout, const std::function<bool()>& condition,
2929
long durationMs = 10) {
3030
auto timeoutMs = std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count();
3131
while (timeoutMs > 0) {
3232
auto now = std::chrono::high_resolution_clock::now();
3333
if (condition()) {
34-
break;
34+
return true;
3535
}
3636
std::this_thread::sleep_for(std::chrono::milliseconds(durationMs));
3737
auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
3838
std::chrono::high_resolution_clock::now() - now)
3939
.count();
4040
timeoutMs -= elapsed;
4141
}
42+
return false;
4243
}
4344

4445
} // namespace pulsar

0 commit comments

Comments
 (0)