Skip to content

Commit 81cc562

Browse files
authored
Added support for multiple connections to each broker (#336)
1 parent 77e2d63 commit 81cc562

12 files changed

+82
-76
lines changed

include/pulsar/Client.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -415,8 +415,6 @@ class PULSAR_PUBLIC Client {
415415
std::function<void(Result, const SchemaInfo&)> callback);
416416

417417
private:
418-
Client(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration,
419-
bool poolConnections);
420418
Client(const std::shared_ptr<ClientImpl>);
421419

422420
friend class PulsarFriend;

include/pulsar/ClientConfiguration.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,21 @@ class PULSAR_PUBLIC ClientConfiguration {
4646
*/
4747
uint64_t getMemoryLimit() const;
4848

49+
/**
50+
* Sets the max number of connection that the client library will open to a single broker.
51+
* By default, the connection pool will use a single connection for all the producers and consumers.
52+
* Increasing this parameter may improve throughput when using many producers over a high latency
53+
* connection.
54+
*
55+
* @param connectionsPerBroker max number of connections per broker (needs to be greater than 0)
56+
*/
57+
ClientConfiguration& setConnectionsPerBroker(int connectionsPerBroker);
58+
59+
/**
60+
* @return the max number of connection that the client library will open to a single broker
61+
*/
62+
int getConnectionsPerBroker() const;
63+
4964
/**
5065
* Set the authentication method to be used with the broker
5166
*

lib/Client.cc

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,14 +36,10 @@ namespace pulsar {
3636
Client::Client(const std::shared_ptr<ClientImpl> impl) : impl_(impl) {}
3737

3838
Client::Client(const std::string& serviceUrl)
39-
: impl_(std::make_shared<ClientImpl>(serviceUrl, ClientConfiguration(), true)) {}
39+
: impl_(std::make_shared<ClientImpl>(serviceUrl, ClientConfiguration())) {}
4040

4141
Client::Client(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration)
42-
: impl_(std::make_shared<ClientImpl>(serviceUrl, clientConfiguration, true)) {}
43-
44-
Client::Client(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration,
45-
bool poolConnections)
46-
: impl_(std::make_shared<ClientImpl>(serviceUrl, clientConfiguration, poolConnections)) {}
42+
: impl_(std::make_shared<ClientImpl>(serviceUrl, clientConfiguration)) {}
4743

4844
Result Client::createProducer(const std::string& topic, Producer& producer) {
4945
return createProducer(topic, ProducerConfiguration(), producer);

lib/ClientConfiguration.cc

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,16 @@ ClientConfiguration& ClientConfiguration::setMemoryLimit(uint64_t memoryLimitByt
4040

4141
uint64_t ClientConfiguration::getMemoryLimit() const { return impl_->memoryLimit; }
4242

43+
ClientConfiguration& ClientConfiguration::setConnectionsPerBroker(int connectionsPerBroker) {
44+
if (connectionsPerBroker <= 0) {
45+
throw std::invalid_argument("connectionsPerBroker should be greater than 0");
46+
}
47+
impl_->connectionsPerBroker = connectionsPerBroker;
48+
return *this;
49+
}
50+
51+
int ClientConfiguration::getConnectionsPerBroker() const { return impl_->connectionsPerBroker; }
52+
4353
ClientConfiguration& ClientConfiguration::setAuth(const AuthenticationPtr& authentication) {
4454
impl_->authenticationPtr = authentication;
4555
return *this;

lib/ClientConfigurationImpl.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ struct ClientConfigurationImpl {
2727
AuthenticationPtr authenticationPtr{AuthFactory::Disabled()};
2828
uint64_t memoryLimit{0ull};
2929
int ioThreads{1};
30+
int connectionsPerBroker{1};
3031
int operationTimeoutSeconds{30};
3132
int messageListenerThreads{1};
3233
int concurrentLookupRequest{50000};

lib/ClientImpl.cc

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,7 @@ typedef std::unique_lock<std::mutex> Lock;
7575

7676
typedef std::vector<std::string> StringList;
7777

78-
ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration,
79-
bool poolConnections)
78+
ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration)
8079
: mutex_(),
8180
state_(Open),
8281
serviceNameResolver_(serviceUrl),
@@ -87,7 +86,7 @@ ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration&
8786
std::make_shared<ExecutorServiceProvider>(clientConfiguration_.getMessageListenerThreads())),
8887
partitionListenerExecutorProvider_(
8988
std::make_shared<ExecutorServiceProvider>(clientConfiguration_.getMessageListenerThreads())),
90-
pool_(clientConfiguration_, ioExecutorProvider_, clientConfiguration_.getAuthPtr(), poolConnections,
89+
pool_(clientConfiguration_, ioExecutorProvider_, clientConfiguration_.getAuthPtr(),
9190
ClientImpl::getClientVersion(clientConfiguration)),
9291
producerIdGenerator_(0),
9392
consumerIdGenerator_(0),

lib/ClientImpl.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,7 @@ std::string generateRandomName();
6868

6969
class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
7070
public:
71-
ClientImpl(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration,
72-
bool poolConnections);
71+
ClientImpl(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration);
7372
~ClientImpl();
7473

7574
/**

lib/ConnectionPool.cc

Lines changed: 30 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,13 @@ DECLARE_LOG_OBJECT()
3434
namespace pulsar {
3535

3636
ConnectionPool::ConnectionPool(const ClientConfiguration& conf, ExecutorServiceProviderPtr executorProvider,
37-
const AuthenticationPtr& authentication, bool poolConnections,
38-
const std::string& clientVersion)
37+
const AuthenticationPtr& authentication, const std::string& clientVersion)
3938
: clientConfiguration_(conf),
4039
executorProvider_(executorProvider),
4140
authentication_(authentication),
42-
poolConnections_(poolConnections),
43-
clientVersion_(clientVersion) {}
41+
clientVersion_(clientVersion),
42+
randomDistribution_(0, conf.getConnectionsPerBroker() - 1),
43+
randomEngine_(std::chrono::high_resolution_clock::now().time_since_epoch().count()) {}
4444

4545
bool ConnectionPool::close() {
4646
bool expectedState = false;
@@ -49,16 +49,15 @@ bool ConnectionPool::close() {
4949
}
5050

5151
std::unique_lock<std::recursive_mutex> lock(mutex_);
52-
if (poolConnections_) {
53-
for (auto cnxIt = pool_.begin(); cnxIt != pool_.end(); cnxIt++) {
54-
auto& cnx = cnxIt->second;
55-
if (cnx) {
56-
// The 2nd argument is false because removing a value during the iteration will cause segfault
57-
cnx->close(ResultDisconnected, false);
58-
}
52+
53+
for (auto cnxIt = pool_.begin(); cnxIt != pool_.end(); cnxIt++) {
54+
auto& cnx = cnxIt->second;
55+
if (cnx) {
56+
// The 2nd argument is false because removing a value during the iteration will cause segfault
57+
cnx->close(ResultDisconnected, false);
5958
}
60-
pool_.clear();
6159
}
60+
pool_.clear();
6261
return true;
6362
}
6463

@@ -72,22 +71,24 @@ Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(
7271

7372
std::unique_lock<std::recursive_mutex> lock(mutex_);
7473

75-
if (poolConnections_) {
76-
PoolMap::iterator cnxIt = pool_.find(logicalAddress);
77-
if (cnxIt != pool_.end()) {
78-
auto& cnx = cnxIt->second;
79-
80-
if (!cnx->isClosed()) {
81-
// Found a valid or pending connection in the pool
82-
LOG_DEBUG("Got connection from pool for " << logicalAddress << " use_count: " //
83-
<< (cnx.use_count()) << " @ " << cnx.get());
84-
return cnx->getConnectFuture();
85-
} else {
86-
// The closed connection should have been removed from the pool in ClientConnection::close
87-
LOG_WARN("Deleting stale connection from pool for "
88-
<< logicalAddress << " use_count: " << (cnx.use_count()) << " @ " << cnx.get());
89-
pool_.erase(logicalAddress);
90-
}
74+
std::stringstream ss;
75+
ss << logicalAddress << '-' << randomDistribution_(randomEngine_);
76+
const std::string key = ss.str();
77+
78+
PoolMap::iterator cnxIt = pool_.find(key);
79+
if (cnxIt != pool_.end()) {
80+
auto& cnx = cnxIt->second;
81+
82+
if (!cnx->isClosed()) {
83+
// Found a valid or pending connection in the pool
84+
LOG_DEBUG("Got connection from pool for " << key << " use_count: " //
85+
<< (cnx.use_count()) << " @ " << cnx.get());
86+
return cnx->getConnectFuture();
87+
} else {
88+
// The closed connection should have been removed from the pool in ClientConnection::close
89+
LOG_WARN("Deleting stale connection from pool for " << key << " use_count: " << (cnx.use_count())
90+
<< " @ " << cnx.get());
91+
pool_.erase(key);
9192
}
9293
}
9394

@@ -107,7 +108,7 @@ Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(
107108
LOG_INFO("Created connection for " << logicalAddress);
108109

109110
Future<Result, ClientConnectionWeakPtr> future = cnx->getConnectFuture();
110-
pool_.insert(std::make_pair(logicalAddress, cnx));
111+
pool_.insert(std::make_pair(key, cnx));
111112

112113
lock.unlock();
113114

lib/ConnectionPool.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#include <map>
2828
#include <memory>
2929
#include <mutex>
30+
#include <random>
3031
#include <string>
3132

3233
#include "Future.h"
@@ -41,8 +42,7 @@ using ExecutorServiceProviderPtr = std::shared_ptr<ExecutorServiceProvider>;
4142
class PULSAR_PUBLIC ConnectionPool {
4243
public:
4344
ConnectionPool(const ClientConfiguration& conf, ExecutorServiceProviderPtr executorProvider,
44-
const AuthenticationPtr& authentication, bool poolConnections,
45-
const std::string& clientVersion);
45+
const AuthenticationPtr& authentication, const std::string& clientVersion);
4646

4747
/**
4848
* Close the connection pool.
@@ -82,11 +82,13 @@ class PULSAR_PUBLIC ConnectionPool {
8282
AuthenticationPtr authentication_;
8383
typedef std::map<std::string, std::shared_ptr<ClientConnection>> PoolMap;
8484
PoolMap pool_;
85-
bool poolConnections_;
8685
const std::string clientVersion_;
8786
mutable std::recursive_mutex mutex_;
8887
std::atomic_bool closed_{false};
8988

89+
std::uniform_int_distribution<> randomDistribution_;
90+
std::mt19937 randomEngine_;
91+
9092
friend class PulsarFriend;
9193
};
9294
} // namespace pulsar

perf/PerfConsumer.cc

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -67,20 +67,12 @@ struct Arguments {
6767
int receiverQueueSize;
6868
int ioThreads;
6969
int listenerThreads;
70-
bool poolConnections;
70+
int connectionsPerBroker;
71+
7172
std::string encKeyName;
7273
std::string encKeyValueFile;
7374
};
7475

75-
namespace pulsar {
76-
class PulsarFriend {
77-
public:
78-
static Client getClient(const std::string& url, const ClientConfiguration conf, bool poolConnections) {
79-
return Client(url, conf, poolConnections);
80-
}
81-
};
82-
} // namespace pulsar
83-
8476
#if __GNUC__ == 4 && __GNUC_MINOR__ == 4
8577
// Used for gcc-4.4.7 with boost-1.41
8678
#include <cstdatomic>
@@ -167,14 +159,15 @@ void startPerfConsumer(const Arguments& args) {
167159
std::string tlsTrustCertsFilePath(args.tlsTrustCertsFilePath);
168160
conf.setTlsTrustCertsFilePath(tlsTrustCertsFilePath);
169161
}
162+
conf.setConnectionsPerBroker(args.connectionsPerBroker);
170163
conf.setIOThreads(args.ioThreads);
171164
conf.setMessageListenerThreads(args.listenerThreads);
172165
if (!args.authPlugin.empty()) {
173166
AuthenticationPtr auth = AuthFactory::create(args.authPlugin, args.authParams);
174167
conf.setAuth(auth);
175168
}
176169

177-
Client client(pulsar::PulsarFriend::getClient(args.serviceURL, conf, args.poolConnections));
170+
Client client(args.serviceURL, conf);
178171

179172
ConsumerConfiguration consumerConf;
180173
consumerConf.setMessageListener(messageListener);
@@ -299,8 +292,8 @@ int main(int argc, char** argv) {
299292
("listener-threads,l", po::value<int>(&args.listenerThreads)->default_value(1),
300293
"Number of listener threads") //
301294

302-
("pool-connections", po::value<bool>(&args.poolConnections)->default_value(false),
303-
"whether pool connections used") //
295+
("connections-per-broker", po::value<int>(&args.connectionsPerBroker)->default_value(1),
296+
"Number of connections per each broker") //
304297

305298
("encryption-key-name,k", po::value<std::string>(&args.encKeyName)->default_value(""),
306299
"The private key name to decrypt payload") //

perf/PerfProducer.cc

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -64,21 +64,12 @@ struct Arguments {
6464
unsigned int batchingMaxMessages;
6565
long batchingMaxAllowedSizeInBytes;
6666
long batchingMaxPublishDelayMs;
67-
bool poolConnections;
67+
int connectionsPerBroker;
6868
std::string encKeyName;
6969
std::string encKeyValueFile;
7070
std::string compression;
7171
};
7272

73-
namespace pulsar {
74-
class PulsarFriend {
75-
public:
76-
static Client getClient(const std::string& url, const ClientConfiguration conf, bool poolConnections) {
77-
return Client(url, conf, poolConnections);
78-
}
79-
};
80-
} // namespace pulsar
81-
8273
unsigned long messagesProduced;
8374
unsigned long bytesProduced;
8475
using namespace boost::accumulators;
@@ -283,8 +274,8 @@ int main(int argc, char** argv) {
283274
po::value<long>(&args.batchingMaxPublishDelayMs)->default_value(3000),
284275
"Use only is batch-size > 1, Default is 3 seconds") //
285276

286-
("pool-connections", po::value<bool>(&args.poolConnections)->default_value(false),
287-
"whether pool connections used") //
277+
("connections-per-broker", po::value<int>(&args.connectionsPerBroker)->default_value(1),
278+
"Number of connections per each broker") //
288279

289280
("encryption-key-name,k", po::value<std::string>(&args.encKeyName)->default_value(""),
290281
"The public key name to encrypt payload") //
@@ -371,6 +362,7 @@ int main(int argc, char** argv) {
371362
producerConf.setPartitionsRoutingMode(ProducerConfiguration::RoundRobinDistribution);
372363

373364
pulsar::ClientConfiguration conf;
365+
conf.setConnectionsPerBroker(args.connectionsPerBroker);
374366
conf.setMemoryLimit(args.memoryLimitMb * 1024 * 1024);
375367
conf.setUseTls(args.isUseTls);
376368
conf.setTlsAllowInsecureConnection(args.isTlsAllowInsecureConnection);
@@ -385,7 +377,7 @@ int main(int argc, char** argv) {
385377
conf.setAuth(auth);
386378
}
387379

388-
pulsar::Client client(pulsar::PulsarFriend::getClient(args.serviceURL, conf, args.poolConnections));
380+
pulsar::Client client(args.serviceURL, conf);
389381

390382
std::atomic<bool> exitCondition(false);
391383
startPerfProducer(args, producerConf, client, exitCondition);

tests/LookupServiceTest.cc

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ TEST(LookupServiceTest, basicLookup) {
7676
std::string url = "pulsar://localhost:6650";
7777
ClientConfiguration conf;
7878
ExecutorServiceProviderPtr ioExecutorProvider_(std::make_shared<ExecutorServiceProvider>(1));
79-
ConnectionPool pool_(conf, ioExecutorProvider_, authData, true, "");
79+
ConnectionPool pool_(conf, ioExecutorProvider_, authData, "");
8080
ServiceNameResolver serviceNameResolver(url);
8181
BinaryProtoLookupService lookupService(serviceNameResolver, pool_, conf);
8282

@@ -140,7 +140,7 @@ static void testMultiAddresses(LookupService& lookupService) {
140140
}
141141

142142
TEST(LookupServiceTest, testMultiAddresses) {
143-
ConnectionPool pool({}, std::make_shared<ExecutorServiceProvider>(1), AuthFactory::Disabled(), true, "");
143+
ConnectionPool pool({}, std::make_shared<ExecutorServiceProvider>(1), AuthFactory::Disabled(), "");
144144
ServiceNameResolver serviceNameResolver("pulsar://localhost,localhost:9999");
145145
ClientConfiguration conf;
146146
BinaryProtoLookupService binaryLookupService(serviceNameResolver, pool, conf);
@@ -154,7 +154,7 @@ TEST(LookupServiceTest, testMultiAddresses) {
154154
}
155155
TEST(LookupServiceTest, testRetry) {
156156
auto executorProvider = std::make_shared<ExecutorServiceProvider>(1);
157-
ConnectionPool pool({}, executorProvider, AuthFactory::Disabled(), true, "");
157+
ConnectionPool pool({}, executorProvider, AuthFactory::Disabled(), "");
158158
ServiceNameResolver serviceNameResolver("pulsar://localhost:9999,localhost");
159159
ClientConfiguration conf;
160160

@@ -188,7 +188,7 @@ TEST(LookupServiceTest, testRetry) {
188188

189189
TEST(LookupServiceTest, testTimeout) {
190190
auto executorProvider = std::make_shared<ExecutorServiceProvider>(1);
191-
ConnectionPool pool({}, executorProvider, AuthFactory::Disabled(), true, "");
191+
ConnectionPool pool({}, executorProvider, AuthFactory::Disabled(), "");
192192
ServiceNameResolver serviceNameResolver("pulsar://localhost:9990,localhost:9902,localhost:9904");
193193
ClientConfiguration conf;
194194

@@ -451,7 +451,7 @@ TEST(LookupServiceTest, testRedirectionLimit) {
451451
ClientConfiguration conf;
452452
conf.setMaxLookupRedirects(redirect_limit);
453453
ExecutorServiceProviderPtr ioExecutorProvider_(std::make_shared<ExecutorServiceProvider>(1));
454-
ConnectionPool pool_(conf, ioExecutorProvider_, authData, true, "");
454+
ConnectionPool pool_(conf, ioExecutorProvider_, authData, "");
455455
std::string url = "pulsar://localhost:6650";
456456
ServiceNameResolver serviceNameResolver(url);
457457
BinaryProtoLookupServiceRedirectTestHelper lookupService(serviceNameResolver, pool_, conf);

0 commit comments

Comments
 (0)