Skip to content

Commit 6d47e94

Browse files
Fix crash when removing connection from the pool (#347)
Fixes #346 ### Motivation #336 changed the key of the `ClientConnection` in `ConnectionPool`, but `ClientConnection::close` still passes the old key (the logical address) to `ConnectionPool::remove`. As a result, the connection is never removed from the pool and is not destroyed until it is deleted as a stale connection. What's worse, when the key does not exist, the iterator returned by `std::map::find` is still dereferenced, which might cause a crash on some platforms. See https://github.com/apache/pulsar-client-cpp/blob/8d32fd254e294d1fabba73aed70115a434b341ef/lib/ConnectionPool.cc#L122-L123 ### Modifications - Avoid dereferencing the iterator in `ConnectionPool::remove` if it is invalid. - Store the key suffix in `ClientConnection` and pass the correct key to `ConnectionPool::remove` in `ClientConnection::close`. - Add `ClientTest.testConnectionClose` to verify that `ClientConnection::close` removes the connection from the pool and that the connection is eventually destroyed.
1 parent 8d32fd2 commit 6d47e94

File tree

5 files changed

+55
-8
lines changed

5 files changed

+55
-8
lines changed

lib/ClientConnection.cc

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
160160
ExecutorServicePtr executor,
161161
const ClientConfiguration& clientConfiguration,
162162
const AuthenticationPtr& authentication, const std::string& clientVersion,
163-
ConnectionPool& pool)
163+
ConnectionPool& pool, size_t poolIndex)
164164
: operationsTimeout_(seconds(clientConfiguration.getOperationTimeoutSeconds())),
165165
authentication_(authentication),
166166
serverProtocolVersion_(proto::ProtocolVersion_MIN),
@@ -184,7 +184,8 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
184184
consumerStatsRequestTimer_(executor_->createDeadlineTimer()),
185185
maxPendingLookupRequest_(clientConfiguration.getConcurrentLookupRequest()),
186186
clientVersion_(clientVersion),
187-
pool_(pool) {
187+
pool_(pool),
188+
poolIndex_(poolIndex) {
188189
LOG_INFO(cnxString_ << "Create ClientConnection, timeout=" << clientConfiguration.getConnectionTimeout());
189190
if (clientConfiguration.isUseTls()) {
190191
#if BOOST_VERSION >= 105400
@@ -265,7 +266,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
265266
}
266267

267268
ClientConnection::~ClientConnection() {
268-
LOG_INFO(cnxString_ << "Destroyed connection to " << logicalAddress_);
269+
LOG_INFO(cnxString_ << "Destroyed connection to " << logicalAddress_ << "-" << poolIndex_);
269270
}
270271

271272
void ClientConnection::handlePulsarConnected(const proto::CommandConnected& cmdConnected) {
@@ -1320,7 +1321,7 @@ void ClientConnection::close(Result result, bool detach) {
13201321
}
13211322
// Remove the connection from the pool before completing any promise
13221323
if (detach) {
1323-
pool_.remove(logicalAddress_, this); // trigger the destructor
1324+
pool_.remove(logicalAddress_ + "-" + std::to_string(poolIndex_), this);
13241325
}
13251326

13261327
auto self = shared_from_this();

lib/ClientConnection.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
129129
ClientConnection(const std::string& logicalAddress, const std::string& physicalAddress,
130130
ExecutorServicePtr executor, const ClientConfiguration& clientConfiguration,
131131
const AuthenticationPtr& authentication, const std::string& clientVersion,
132-
ConnectionPool& pool);
132+
ConnectionPool& pool, size_t poolIndex);
133133
~ClientConnection();
134134

135135
#if __cplusplus < 201703L
@@ -400,6 +400,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
400400

401401
const std::string clientVersion_;
402402
ConnectionPool& pool_;
403+
const size_t poolIndex_;
404+
403405
friend class PulsarFriend;
404406

405407
void checkServerError(ServerError error);

lib/ConnectionPool.cc

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,8 @@ Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(const
9797
ClientConnectionPtr cnx;
9898
try {
9999
cnx.reset(new ClientConnection(logicalAddress, physicalAddress, executorProvider_->get(keySuffix),
100-
clientConfiguration_, authentication_, clientVersion_, *this));
100+
clientConfiguration_, authentication_, clientVersion_, *this,
101+
keySuffix));
101102
} catch (const std::runtime_error& e) {
102103
lock.unlock();
103104
LOG_ERROR("Failed to create connection: " << e.what())
@@ -106,7 +107,7 @@ Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(const
106107
return promise.getFuture();
107108
}
108109

109-
LOG_INFO("Created connection for " << logicalAddress);
110+
LOG_INFO("Created connection for " << key);
110111

111112
Future<Result, ClientConnectionWeakPtr> future = cnx->getConnectFuture();
112113
pool_.insert(std::make_pair(key, cnx));
@@ -120,7 +121,7 @@ Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(const
120121
void ConnectionPool::remove(const std::string& key, ClientConnection* value) {
121122
std::lock_guard<std::recursive_mutex> lock(mutex_);
122123
auto it = pool_.find(key);
123-
if (it->second.get() == value) {
124+
if (it != pool_.end() && it->second.get() == value) {
124125
LOG_INFO("Remove connection for " << key);
125126
pool_.erase(it);
126127
}

tests/ClientTest.cc

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -400,3 +400,44 @@ TEST(ClientTest, testClientVersion) {
400400

401401
client.close();
402402
}
403+
404+
TEST(ClientTest, testConnectionClose) {
405+
std::vector<Client> clients;
406+
clients.emplace_back(lookupUrl);
407+
clients.emplace_back(lookupUrl, ClientConfiguration().setConnectionsPerBroker(5));
408+
409+
const auto topic = "client-test-connection-close";
410+
for (auto &client : clients) {
411+
auto testClose = [&client](ClientConnectionWeakPtr weakCnx) {
412+
auto cnx = weakCnx.lock();
413+
ASSERT_TRUE(cnx);
414+
415+
auto numConnections = PulsarFriend::getConnections(client).size();
416+
LOG_INFO("Connection refcnt: " << cnx.use_count() << " before close");
417+
auto executor = PulsarFriend::getExecutor(*cnx);
418+
// Simulate the close() happens in the event loop
419+
executor->postWork([cnx, &client, numConnections] {
420+
cnx->close();
421+
ASSERT_EQ(PulsarFriend::getConnections(client).size(), numConnections - 1);
422+
LOG_INFO("Connection refcnt: " << cnx.use_count() << " after close");
423+
});
424+
cnx.reset();
425+
426+
// The ClientConnection could still be referred in a socket callback, wait until all these
427+
// callbacks being cancelled due to the socket close.
428+
ASSERT_TRUE(waitUntil(
429+
std::chrono::seconds(1), [weakCnx] { return weakCnx.expired(); }, 1));
430+
};
431+
Producer producer;
432+
ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
433+
testClose(PulsarFriend::getProducerImpl(producer).getCnx());
434+
producer.close();
435+
436+
Consumer consumer;
437+
ASSERT_EQ(ResultOk, client.subscribe("client-test-connection-close", "sub", consumer));
438+
testClose(PulsarFriend::getConsumerImpl(consumer).getCnx());
439+
consumer.close();
440+
441+
client.close();
442+
}
443+
}

tests/PulsarFriend.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,8 @@ class PulsarFriend {
140140
return connections;
141141
}
142142

143+
static ExecutorServicePtr getExecutor(const ClientConnection& cnx) { return cnx.executor_; }
144+
143145
static std::vector<ProducerImplPtr> getProducers(const ClientConnection& cnx) {
144146
std::vector<ProducerImplPtr> producers;
145147
std::lock_guard<std::mutex> lock(cnx.mutex_);

0 commit comments

Comments
 (0)