Skip to content

Commit 55b4bc9

Browse files
[fix] Fix memory leak caused by incorrect close and destruction (#54)
Fixes #55 ### Motivation 1. When a producer or consumer is closed, the reference is still stored in `ClientImpl`. If a client kept creating producers or consumers, the memory usage would not reduce. 2. When the `HandlerBase::connection_` field is modified, the `removeProducer` or `removeConsumer` method is not called. Then these producers and consumers will be cached in the connection until the connection is closed. 3. The `PartitionedProducerImpl` and `MultiTopicsConsumerImpl` have cyclic references, when a `Producer` or `Consumer` instance goes out of the scope, the destructors are not called. When I used GDB to debug them, I found the reference counts were both greater than 1. ### Modifications Let's use "handlers" to represent "producers and consumers". 1. In `ClientImpl`, use `SynchronizedHashMap` to store references of handlers, as well as the `cleanupXXX` methods to remove a handler. 2. Add `HandlerBase::beforeConnectionChange` method, which is called before `connection_` is modified. Disallow the access to `connection_` from derived classes. 3. Avoid `shared_from_this()` being passed into callbacks in ASIO executors for `PartitionedProducerImpl` and `MultiTopicsConsumerImpl`. This PR also unifies the `shutdown` implementations for handlers and call `shutdown` in the destructors. 1. Cancel the timers 2. Unregister itself from `ClientImpl` and `ClientConnection` 3. Set the create future with `ResultAlreadyClosed` 4. Set the state to `Closed` It's called when: - the destructor is called - `closeAsync` is completed - `unsubscribeAsync` is completed with ResultOk ### Verifications `ShutdownTest` is added to verify the following cases: - a single topic - a partitioned topic (multiple topics) - a partitioned topic with regex subscription `testClose` verifies `shutdown` when `closeAsync` and `unsubscribeAsync` are called. `testDestructor` verifies `shutdown` when handlers go out of the scope and the destructors are called.
1 parent 7f7653b commit 55b4bc9

24 files changed

+610
-357
lines changed

.github/workflows/ci-pr-validation.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ jobs:
6868
run: ./pulsar-test-service-start.sh
6969

7070
- name: Run unit tests
71-
run: ./run-unit-tests.sh
71+
run: RETRY_FAILED=3 ./run-unit-tests.sh
7272

7373
- name: Stop Pulsar service
7474
run: ./pulsar-test-service-stop.sh

lib/ClientConnection.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
314314
typedef std::map<long, Promise<Result, NamespaceTopicsPtr>> PendingGetNamespaceTopicsMap;
315315
PendingGetNamespaceTopicsMap pendingGetNamespaceTopicsRequests_;
316316

317-
std::mutex mutex_;
317+
mutable std::mutex mutex_;
318318
typedef std::unique_lock<std::mutex> Lock;
319319

320320
// Pending buffers to write on the socket

lib/ClientImpl.cc

Lines changed: 51 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -189,9 +189,15 @@ void ClientImpl::handleCreateProducer(const Result result, const LookupDataResul
189189
void ClientImpl::handleProducerCreated(Result result, ProducerImplBaseWeakPtr producerBaseWeakPtr,
190190
CreateProducerCallback callback, ProducerImplBasePtr producer) {
191191
if (result == ResultOk) {
192-
Lock lock(mutex_);
193-
producers_.push_back(producer);
194-
lock.unlock();
192+
auto pair = producers_.emplace(producer.get(), producer);
193+
if (!pair.second) {
194+
auto existingProducer = pair.first->second.lock();
195+
LOG_ERROR("Unexpected existing producer at the same address: "
196+
<< pair.first->first << ", producer: "
197+
<< (existingProducer ? existingProducer->getProducerName() : "(null)"));
198+
callback(ResultUnknownError, {});
199+
return;
200+
}
195201
callback(result, Producer(producer));
196202
} else {
197203
callback(result, {});
@@ -241,9 +247,18 @@ void ClientImpl::handleReaderMetadataLookup(const Result result, const LookupDat
241247
ConsumerImplBasePtr consumer = reader->getConsumer().lock();
242248
auto self = shared_from_this();
243249
reader->start(startMessageId, [this, self](const ConsumerImplBaseWeakPtr& weakConsumerPtr) {
244-
Lock lock(mutex_);
245-
consumers_.push_back(weakConsumerPtr);
246-
lock.unlock();
250+
auto consumer = weakConsumerPtr.lock();
251+
if (consumer) {
252+
auto pair = consumers_.emplace(consumer.get(), consumer);
253+
if (!pair.second) {
254+
auto existingConsumer = pair.first->second.lock();
255+
LOG_ERROR("Unexpected existing consumer at the same address: "
256+
<< pair.first->first
257+
<< ", consumer: " << (existingConsumer ? existingConsumer->getName() : "(null)"));
258+
}
259+
} else {
260+
LOG_ERROR("Unexpected case: the consumer is somehow expired");
261+
}
247262
});
248263
}
249264

@@ -397,9 +412,15 @@ void ClientImpl::handleSubscribe(const Result result, const LookupDataResultPtr
397412
void ClientImpl::handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
398413
SubscribeCallback callback, ConsumerImplBasePtr consumer) {
399414
if (result == ResultOk) {
400-
Lock lock(mutex_);
401-
consumers_.push_back(consumer);
402-
lock.unlock();
415+
auto pair = consumers_.emplace(consumer.get(), consumer);
416+
if (!pair.second) {
417+
auto existingConsumer = pair.first->second.lock();
418+
LOG_ERROR("Unexpected existing consumer at the same address: "
419+
<< pair.first->first
420+
<< ", consumer: " << (existingConsumer ? existingConsumer->getName() : "(null)"));
421+
callback(ResultUnknownError, {});
422+
return;
423+
}
403424
callback(result, Consumer(consumer));
404425
} else {
405426
callback(result, {});
@@ -477,27 +498,26 @@ void ClientImpl::getPartitionsForTopicAsync(const std::string& topic, GetPartiti
477498
}
478499

479500
void ClientImpl::closeAsync(CloseCallback callback) {
480-
Lock lock(mutex_);
481-
ProducersList producers(producers_);
482-
ConsumersList consumers(consumers_);
483-
484-
if (state_ != Open && callback) {
485-
lock.unlock();
486-
callback(ResultAlreadyClosed);
501+
if (state_ != Open) {
502+
if (callback) {
503+
callback(ResultAlreadyClosed);
504+
}
487505
return;
488506
}
489507
// Set the state to Closing so that no producers could get added
490508
state_ = Closing;
491-
lock.unlock();
492509

493510
memoryLimitController_.close();
494511

512+
auto producers = producers_.move();
513+
auto consumers = consumers_.move();
514+
495515
SharedInt numberOfOpenHandlers = std::make_shared<int>(producers.size() + consumers.size());
496516
LOG_INFO("Closing Pulsar client with " << producers.size() << " producers and " << consumers.size()
497517
<< " consumers");
498518

499-
for (ProducersList::iterator it = producers.begin(); it != producers.end(); ++it) {
500-
ProducerImplBasePtr producer = it->lock();
519+
for (auto&& kv : producers) {
520+
ProducerImplBasePtr producer = kv.second.lock();
501521
if (producer && !producer->isClosed()) {
502522
producer->closeAsync(std::bind(&ClientImpl::handleClose, shared_from_this(),
503523
std::placeholders::_1, numberOfOpenHandlers, callback));
@@ -507,8 +527,8 @@ void ClientImpl::closeAsync(CloseCallback callback) {
507527
}
508528
}
509529

510-
for (ConsumersList::iterator it = consumers.begin(); it != consumers.end(); ++it) {
511-
ConsumerImplBasePtr consumer = it->lock();
530+
for (auto&& kv : consumers) {
531+
ConsumerImplBasePtr consumer = kv.second.lock();
512532
if (consumer && !consumer->isClosed()) {
513533
consumer->closeAsync(std::bind(&ClientImpl::handleClose, shared_from_this(),
514534
std::placeholders::_1, numberOfOpenHandlers, callback));
@@ -562,23 +582,18 @@ void ClientImpl::handleClose(Result result, SharedInt numberOfOpenHandlers, Resu
562582
}
563583

564584
void ClientImpl::shutdown() {
565-
Lock lock(mutex_);
566-
ProducersList producers;
567-
ConsumersList consumers;
585+
auto producers = producers_.move();
586+
auto consumers = consumers_.move();
568587

569-
producers.swap(producers_);
570-
consumers.swap(consumers_);
571-
lock.unlock();
572-
573-
for (ProducersList::iterator it = producers.begin(); it != producers.end(); ++it) {
574-
ProducerImplBasePtr producer = it->lock();
588+
for (auto&& kv : producers) {
589+
ProducerImplBasePtr producer = kv.second.lock();
575590
if (producer) {
576591
producer->shutdown();
577592
}
578593
}
579594

580-
for (ConsumersList::iterator it = consumers.begin(); it != consumers.end(); ++it) {
581-
ConsumerImplBasePtr consumer = it->lock();
595+
for (auto&& kv : consumers) {
596+
ConsumerImplBasePtr consumer = kv.second.lock();
582597
if (consumer) {
583598
consumer->shutdown();
584599
}
@@ -631,26 +646,24 @@ uint64_t ClientImpl::newRequestId() {
631646
}
632647

633648
uint64_t ClientImpl::getNumberOfProducers() {
634-
Lock lock(mutex_);
635649
uint64_t numberOfAliveProducers = 0;
636-
for (const auto& producer : producers_) {
650+
producers_.forEachValue([&numberOfAliveProducers](const ProducerImplBaseWeakPtr& producer) {
637651
const auto& producerImpl = producer.lock();
638652
if (producerImpl) {
639653
numberOfAliveProducers += producerImpl->getNumberOfConnectedProducer();
640654
}
641-
}
655+
});
642656
return numberOfAliveProducers;
643657
}
644658

645659
uint64_t ClientImpl::getNumberOfConsumers() {
646-
Lock lock(mutex_);
647660
uint64_t numberOfAliveConsumers = 0;
648-
for (const auto& consumer : consumers_) {
661+
consumers_.forEachValue([&numberOfAliveConsumers](const ConsumerImplBaseWeakPtr& consumer) {
649662
const auto consumerImpl = consumer.lock();
650663
if (consumerImpl) {
651664
numberOfAliveConsumers += consumerImpl->getNumberOfConnectedConsumer();
652665
}
653-
}
666+
});
654667
return numberOfAliveConsumers;
655668
}
656669

lib/ClientImpl.h

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
#include <atomic>
3232
#include <vector>
3333
#include "ServiceNameResolver.h"
34+
#include "SynchronizedHashMap.h"
3435

3536
namespace pulsar {
3637

@@ -91,6 +92,11 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
9192
ExecutorServiceProviderPtr getListenerExecutorProvider();
9293
ExecutorServiceProviderPtr getPartitionListenerExecutorProvider();
9394
LookupServicePtr getLookup();
95+
96+
void cleanupProducer(ProducerImplBase* address) { producers_.remove(address); }
97+
98+
void cleanupConsumer(ConsumerImplBase* address) { consumers_.remove(address); }
99+
94100
friend class PulsarFriend;
95101

96102
private:
@@ -147,11 +153,8 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
147153
uint64_t consumerIdGenerator_;
148154
uint64_t requestIdGenerator_;
149155

150-
typedef std::vector<ProducerImplBaseWeakPtr> ProducersList;
151-
ProducersList producers_;
152-
153-
typedef std::vector<ConsumerImplBaseWeakPtr> ConsumersList;
154-
ConsumersList consumers_;
156+
SynchronizedHashMap<ProducerImplBase*, ProducerImplBaseWeakPtr> producers_;
157+
SynchronizedHashMap<ConsumerImplBase*, ConsumerImplBaseWeakPtr> consumers_;
155158

156159
std::atomic<Result> closingError;
157160

lib/ConnectionPool.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,10 +74,10 @@ class PULSAR_PUBLIC ConnectionPool {
7474
typedef std::map<std::string, ClientConnectionWeakPtr> PoolMap;
7575
PoolMap pool_;
7676
bool poolConnections_;
77-
std::mutex mutex_;
77+
mutable std::mutex mutex_;
7878
std::atomic_bool closed_{false};
7979

80-
friend class ConnectionPoolTest;
80+
friend class PulsarFriend;
8181
};
8282
} // namespace pulsar
8383
#endif //_PULSAR_CONNECTION_POOL_HEADER_

0 commit comments

Comments
 (0)