
Commit 6f115e7

Fix ProducerBusy or ConsumerBusy error when configuring multiple connections per broker (#337)
### Motivation

This is a catch-up for apache/pulsar#21144.

When a producer or consumer reconnects, a random number is generated as the key suffix in `ConnectionPool` to create or get the `ClientConnection` object from the pool:
https://github.com/apache/pulsar-client-cpp/blob/81cc562f7b366fad97e1b80c07ef9334a808390d/lib/ConnectionPool.cc#L75

If a new connection is created to the broker with the same producer or consumer name, the broker responds with a `ProducerBusy` or `ConsumerBusy` error, so the reconnection never succeeds.

### Modifications

- Add an overload of `ConnectionPool::getConnectionAsync` that accepts an integer parameter as the key suffix. If it's not specified, generate a random number as the suffix. In this method, choose the executor by `key suffix % size`.
- Generate the random number and save it when creating the `HandlerBase` object. When connecting to the owner broker of its topic, pass that index so that the reconnection always reuses the same `ClientConnection` object.

### Verifying this change

`ProducerTest.testReconnectMultiConnectionsPerBroker` is added to protect this change.
1 parent 81cc562 commit 6f115e7

11 files changed (+58, -15 lines)
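
To illustrate the mechanism described in the commit message, here is a minimal standalone sketch (not code from this commit; `ToyConnectionPool` and everything in it is hypothetical) of the keying scheme: the pool key is `<logicalAddress>-<keySuffix>`, so drawing a fresh random suffix on every reconnect can resolve to a different pooled connection, which the broker then rejects with `ProducerBusy`/`ConsumerBusy`, while a suffix drawn once and reused always resolves to the same entry.

```cpp
// Standalone sketch, NOT the Pulsar client code: models only the pool keying
// scheme ("<logicalAddress>-<keySuffix>") described in the commit message.
#include <iostream>
#include <map>
#include <random>
#include <string>

class ToyConnectionPool {
   public:
    explicit ToyConnectionPool(size_t connectionsPerBroker)
        : distribution_(0, connectionsPerBroker - 1) {}

    // Returns an id for the pooled "connection" selected by the key suffix.
    // The same (address, suffix) pair always maps to the same entry.
    int getConnection(const std::string& logicalAddress, size_t keySuffix) {
        const std::string key = logicalAddress + '-' + std::to_string(keySuffix);
        auto it = pool_.find(key);
        if (it == pool_.end()) {
            it = pool_.emplace(key, nextId_++).first;
        }
        return it->second;
    }

    size_t generateRandomIndex() { return distribution_(engine_); }

   private:
    std::map<std::string, int> pool_;
    int nextId_ = 0;
    std::mt19937 engine_{std::random_device{}()};
    std::uniform_int_distribution<size_t> distribution_;
};

int main() {
    ToyConnectionPool pool(10);

    // Old behaviour: every (re)connect draws a fresh random suffix, so a reconnect
    // may resolve to a *different* pooled connection while the broker still has the
    // producer/consumer registered on the old one -> ProducerBusy / ConsumerBusy.
    int first = pool.getConnection("broker-0", pool.generateRandomIndex());
    int second = pool.getConnection("broker-0", pool.generateRandomIndex());
    std::cout << "random suffixes: " << first << " vs " << second << " (may differ)\n";

    // Fixed behaviour: draw the suffix once and reuse it, so every reconnect
    // resolves to the same pool entry.
    const size_t fixedSuffix = pool.generateRandomIndex();
    int again1 = pool.getConnection("broker-1", fixedSuffix);
    int again2 = pool.getConnection("broker-1", fixedSuffix);
    std::cout << "fixed suffix: " << again1 << " == " << again2 << '\n';
}
```

With this commit, `HandlerBase` draws the suffix once into `connectionKeySuffix_` and passes it through `ClientImpl::getConnection` and `ConnectionPool::getConnectionAsync` on every reconnect, which corresponds to the second pattern above.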

lib/ClientImpl.cc

Lines changed: 3 additions & 3 deletions
@@ -516,7 +516,7 @@ void ClientImpl::handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr co
     }
 }
 
-Future<Result, ClientConnectionPtr> ClientImpl::getConnection(const std::string& topic) {
+Future<Result, ClientConnectionPtr> ClientImpl::getConnection(const std::string& topic, size_t key) {
     Promise<Result, ClientConnectionPtr> promise;
 
     const auto topicNamePtr = TopicName::get(topic);
@@ -528,12 +528,12 @@ Future<Result, ClientConnectionPtr> ClientImpl::getConnection(const std::string&
 
     auto self = shared_from_this();
     lookupServicePtr_->getBroker(*topicNamePtr)
-        .addListener([this, self, promise](Result result, const LookupService::LookupResult& data) {
+        .addListener([this, self, promise, key](Result result, const LookupService::LookupResult& data) {
             if (result != ResultOk) {
                 promise.setFailed(result);
                 return;
            }
-            pool_.getConnectionAsync(data.logicalAddress, data.physicalAddress)
+            pool_.getConnectionAsync(data.logicalAddress, data.physicalAddress, key)
                .addListener([promise](Result result, const ClientConnectionWeakPtr& weakCnx) {
                    if (result == ResultOk) {
                        auto cnx = weakCnx.lock();

lib/ClientImpl.h

Lines changed: 3 additions & 1 deletion
@@ -95,7 +95,7 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
 
     void getPartitionsForTopicAsync(const std::string& topic, GetPartitionsCallback callback);
 
-    Future<Result, ClientConnectionPtr> getConnection(const std::string& topic);
+    Future<Result, ClientConnectionPtr> getConnection(const std::string& topic, size_t key);
 
     void closeAsync(CloseCallback callback);
     void shutdown();
@@ -123,6 +123,8 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
 
     std::shared_ptr<std::atomic<uint64_t>> getRequestIdGenerator() const { return requestIdGenerator_; }
 
+    ConnectionPool& getConnectionPool() noexcept { return pool_; }
+
     friend class PulsarFriend;
 
    private:

lib/ConnectionPool.cc

Lines changed: 5 additions & 4 deletions
@@ -61,8 +61,9 @@ bool ConnectionPool::close() {
     return true;
 }
 
-Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(
-    const std::string& logicalAddress, const std::string& physicalAddress) {
+Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(const std::string& logicalAddress,
+                                                                           const std::string& physicalAddress,
+                                                                           size_t keySuffix) {
     if (closed_) {
         Promise<Result, ClientConnectionWeakPtr> promise;
         promise.setFailed(ResultAlreadyClosed);
@@ -72,7 +73,7 @@ Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(
     std::unique_lock<std::recursive_mutex> lock(mutex_);
 
     std::stringstream ss;
-    ss << logicalAddress << '-' << randomDistribution_(randomEngine_);
+    ss << logicalAddress << '-' << keySuffix;
     const std::string key = ss.str();
 
     PoolMap::iterator cnxIt = pool_.find(key);
@@ -95,7 +96,7 @@ Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(
     // No valid or pending connection found in the pool, creating a new one
     ClientConnectionPtr cnx;
     try {
-        cnx.reset(new ClientConnection(logicalAddress, physicalAddress, executorProvider_->get(),
+        cnx.reset(new ClientConnection(logicalAddress, physicalAddress, executorProvider_->get(keySuffix),
                                        clientConfiguration_, authentication_, clientVersion_, *this));
     } catch (const std::runtime_error& e) {
         lock.unlock();

lib/ConnectionPool.h

Lines changed: 13 additions & 1 deletion
@@ -65,17 +65,29 @@ class PULSAR_PUBLIC ConnectionPool {
      * a proxy layer. Essentially, the pool is using the logical address as a way to
      * decide whether to reuse a particular connection.
      *
+     * There could be many connections to the same broker, so this pool uses an integer key as the suffix of
+     * the key that represents the connection.
+     *
      * @param logicalAddress the address to use as the broker tag
      * @param physicalAddress the real address where the TCP connection should be made
+     * @param keySuffix the key suffix to choose which connection on the same broker
      * @return a future that will produce the ClientCnx object
      */
     Future<Result, ClientConnectionWeakPtr> getConnectionAsync(const std::string& logicalAddress,
-                                                               const std::string& physicalAddress);
+                                                               const std::string& physicalAddress,
+                                                               size_t keySuffix);
+
+    Future<Result, ClientConnectionWeakPtr> getConnectionAsync(const std::string& logicalAddress,
+                                                               const std::string& physicalAddress) {
+        return getConnectionAsync(logicalAddress, physicalAddress, generateRandomIndex());
+    }
 
     Future<Result, ClientConnectionWeakPtr> getConnectionAsync(const std::string& address) {
         return getConnectionAsync(address, address);
     }
 
+    size_t generateRandomIndex() { return randomDistribution_(randomEngine_); }
+
    private:
     ClientConfiguration clientConfiguration_;
     ExecutorServiceProviderPtr executorProvider_;
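
As a hedged usage sketch against the interface shown in this header (illustrative only; `connectWithStableSuffix` and the include path are assumptions, not part of this commit): a caller that wants every reconnect to resolve to the same pooled `ClientConnection` draws one suffix via `generateRandomIndex()` and reuses it on each `getConnectionAsync()` call.

```cpp
// Illustrative sketch, not code from this commit. It assumes only the
// ConnectionPool interface shown in the diff above and the pulsar Result type.
#include <string>

#include "lib/ConnectionPool.h"

namespace pulsar {

void connectWithStableSuffix(ConnectionPool& pool, const std::string& logicalAddress,
                             const std::string& physicalAddress) {
    // Drawn once and reused; this mirrors what HandlerBase now stores in connectionKeySuffix_.
    const size_t keySuffix = pool.generateRandomIndex();

    // Both the initial connect and any later reconnect use the same suffix, so the
    // pool key "<logicalAddress>-<keySuffix>" maps to the same entry every time.
    pool.getConnectionAsync(logicalAddress, physicalAddress, keySuffix)
        .addListener([](Result result, const ClientConnectionWeakPtr& weakCnx) {
            if (result == ResultOk) {
                auto cnx = weakCnx.lock();
                (void)cnx;  // use the connection here
            }
        });
}

}  // namespace pulsar
```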

lib/ExecutorService.cc

Lines changed: 2 additions & 2 deletions
@@ -133,10 +133,10 @@ void ExecutorService::postWork(std::function<void(void)> task) { io_service_.pos
 ExecutorServiceProvider::ExecutorServiceProvider(int nthreads)
     : executors_(nthreads), executorIdx_(0), mutex_() {}
 
-ExecutorServicePtr ExecutorServiceProvider::get() {
+ExecutorServicePtr ExecutorServiceProvider::get(size_t idx) {
+    idx %= executors_.size();
     Lock lock(mutex_);
 
-    int idx = executorIdx_++ % executors_.size();
     if (!executors_[idx]) {
         executors_[idx] = ExecutorService::create();
     }

lib/ExecutorService.h

Lines changed: 4 additions & 2 deletions
@@ -88,15 +88,17 @@ class PULSAR_PUBLIC ExecutorServiceProvider {
    public:
     explicit ExecutorServiceProvider(int nthreads);
 
-    ExecutorServicePtr get();
+    ExecutorServicePtr get() { return get(executorIdx_++); }
+
+    ExecutorServicePtr get(size_t index);
 
     // See TimeoutProcessor for the semantics of the parameter.
     void close(long timeoutMs = 3000);
 
    private:
     typedef std::vector<ExecutorServicePtr> ExecutorList;
     ExecutorList executors_;
-    int executorIdx_;
+    std::atomic_size_t executorIdx_;
     std::mutex mutex_;
     typedef std::unique_lock<std::mutex> Lock;
 };
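
The selection logic above can be illustrated with a standalone toy sketch (hypothetical `ToyExecutorProvider`, not the library code): `get()` keeps the previous round-robin behaviour by delegating to the indexed overload, while `get(index)` pins the caller to `executors_[index % size]` and creates the executor lazily, so a connection keyed by a fixed suffix always lands on the same executor.

```cpp
// Standalone toy sketch, not the Pulsar library code.
#include <atomic>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

class ToyExecutorProvider {
   public:
    explicit ToyExecutorProvider(size_t nthreads) : executors_(nthreads) {}

    // Old behaviour: round-robin over the executor list.
    std::shared_ptr<std::string> get() { return get(executorIdx_++); }

    // New overload: the same index always resolves to the same executor slot.
    std::shared_ptr<std::string> get(size_t idx) {
        idx %= executors_.size();
        std::lock_guard<std::mutex> lock(mutex_);
        if (!executors_[idx]) {  // created lazily, as in the real provider
            executors_[idx] = std::make_shared<std::string>("executor-" + std::to_string(idx));
        }
        return executors_[idx];
    }

   private:
    std::vector<std::shared_ptr<std::string>> executors_;
    std::atomic<size_t> executorIdx_{0};
    std::mutex mutex_;
};

int main() {
    ToyExecutorProvider provider(3);
    std::cout << *provider.get() << '\n';   // round-robin: executor-0
    std::cout << *provider.get() << '\n';   // round-robin: executor-1
    std::cout << *provider.get(7) << '\n';  // pinned: 7 % 3 -> executor-1
    std::cout << *provider.get(7) << '\n';  // pinned again: executor-1
}
```

Note that the diff also changes `executorIdx_` to `std::atomic_size_t`, since the round-robin counter in the inline `get()` is now incremented before the mutex inside `get(index)` is taken.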

lib/HandlerBase.cc

Lines changed: 3 additions & 1 deletion
@@ -32,6 +32,7 @@ namespace pulsar {
 HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic, const Backoff& backoff)
     : topic_(std::make_shared<std::string>(topic)),
       client_(client),
+      connectionKeySuffix_(client->getConnectionPool().generateRandomIndex()),
       executor_(client->getIOExecutorProvider()->get()),
       mutex_(),
       creationTimestamp_(TimeUtils::now()),
@@ -88,7 +89,8 @@ void HandlerBase::grabCnx() {
         return;
     }
     auto self = shared_from_this();
-    client->getConnection(topic()).addListener([this, self](Result result, const ClientConnectionPtr& cnx) {
+    auto cnxFuture = client->getConnection(topic(), connectionKeySuffix_);
+    cnxFuture.addListener([this, self](Result result, const ClientConnectionPtr& cnx) {
         if (result == ResultOk) {
             LOG_DEBUG(getName() << "Connected to broker: " << cnx->cnxString());
             connectionOpened(cnx).addListener([this, self](Result result, bool) {

lib/HandlerBase.h

Lines changed: 1 addition & 0 deletions
@@ -99,6 +99,7 @@ class HandlerBase : public std::enable_shared_from_this<HandlerBase> {
 
    protected:
     ClientImplWeakPtr client_;
+    const size_t connectionKeySuffix_;
     ExecutorServicePtr executor_;
     mutable std::mutex mutex_;
     std::mutex pendingReceiveMutex_;

lib/ProducerImpl.cc

Lines changed: 1 addition & 1 deletion
@@ -966,7 +966,7 @@ bool ProducerImpl::encryptMessage(proto::MessageMetadata& metadata, SharedBuffer
 }
 
 void ProducerImpl::disconnectProducer() {
-    LOG_DEBUG("Broker notification of Closed producer: " << producerId_);
+    LOG_INFO("Broker notification of Closed producer: " << producerId_);
     resetCnx();
     scheduleReconnection();
 }

tests/ProducerTest.cc

Lines changed: 15 additions & 0 deletions
@@ -618,4 +618,19 @@ TEST(ProducerTest, testNoDeadlockWhenClosingPartitionedProducerAfterPartitionsUp
     client.close();
 }
 
+TEST(ProducerTest, testReconnectMultiConnectionsPerBroker) {
+    ClientConfiguration conf;
+    conf.setConnectionsPerBroker(10);
+
+    Client client(serviceUrl, conf);
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer("producer-test-reconnect-twice", producer));
+
+    for (int i = 0; i < 5; i++) {
+        ASSERT_TRUE(PulsarFriend::reconnect(producer)) << "i: " << i;
+    }
+
+    client.close();
+}
+
 INSTANTIATE_TEST_CASE_P(Pulsar, ProducerTest, ::testing::Values(true, false));

tests/PulsarFriend.h

Lines changed: 8 additions & 0 deletions
@@ -21,6 +21,7 @@
 
 #include <string>
 
+#include "WaitUtils.h"
 #include "lib/ClientConnection.h"
 #include "lib/ClientImpl.h"
 #include "lib/ConsumerConfigurationImpl.h"
@@ -197,6 +198,13 @@ class PulsarFriend {
         lookupData->setPartitions(newPartitions);
         partitionedProducer.handleGetPartitions(ResultOk, lookupData);
     }
+
+    static bool reconnect(Producer producer) {
+        auto producerImpl = std::dynamic_pointer_cast<ProducerImpl>(producer.impl_);
+        producerImpl->disconnectProducer();
+        return waitUntil(std::chrono::seconds(3),
+                         [producerImpl] { return !producerImpl->getCnx().expired(); });
+    }
 };
 }  // namespace pulsar