Skip to content

Commit ce6c4bc

Browse files
authored
Expose keep alive interval for c and c++ client (#457)
* Expose keep alive interval for c and c++ client * Optimize
1 parent 3efa80a commit ce6c4bc

8 files changed

+47
-4
lines changed

include/pulsar/ClientConfiguration.h

+13
Original file line numberDiff line numberDiff line change
@@ -356,6 +356,19 @@ class PULSAR_PUBLIC ClientConfiguration {
356356
*/
357357
int getConnectionTimeout() const;
358358

359+
/**
360+
* Set keep alive interval for each client-broker-connection. <i>(default: 30 seconds)</i>.
361+
*
362+
* @param keepAliveIntervalInSeconds
363+
* @return
364+
*/
365+
ClientConfiguration& setKeepAliveIntervalInSeconds(unsigned int keepAliveIntervalInSeconds);
366+
367+
/**
368+
* The getter associated with setKeepAliveIntervalInSeconds().
369+
*/
370+
unsigned int getKeepAliveIntervalInSeconds() const;
371+
359372
friend class ClientImpl;
360373
friend class PulsarWrapper;
361374

include/pulsar/c/client_configuration.h

+6
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,12 @@ PULSAR_PUBLIC const unsigned int pulsar_client_configuration_get_partitions_upda
204204
PULSAR_PUBLIC const unsigned int pulsar_client_configuration_get_stats_interval_in_seconds(
205205
pulsar_client_configuration_t *conf);
206206

207+
PULSAR_PUBLIC void pulsar_client_configuration_set_keep_alive_interval_in_seconds(
208+
pulsar_client_configuration_t *conf, unsigned int keepAliveIntervalInSeconds);
209+
210+
PULSAR_PUBLIC unsigned int pulsar_client_configuration_get_keep_alive_interval_in_seconds(
211+
pulsar_client_configuration_t *conf);
212+
207213
#ifdef __cplusplus
208214
}
209215
#endif

lib/ClientConfiguration.cc

+10
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,16 @@ ClientConfiguration& ClientConfiguration::setConnectionTimeout(int timeoutMs) {
214214

215215
int ClientConfiguration::getConnectionTimeout() const { return impl_->connectionTimeoutMs; }
216216

217+
ClientConfiguration& ClientConfiguration::setKeepAliveIntervalInSeconds(
218+
unsigned int keepAliveIntervalInSeconds) {
219+
impl_->keepAliveIntervalInSeconds = keepAliveIntervalInSeconds;
220+
return *this;
221+
}
222+
223+
unsigned int ClientConfiguration::getKeepAliveIntervalInSeconds() const {
224+
return impl_->keepAliveIntervalInSeconds;
225+
}
226+
217227
ClientConfiguration& ClientConfiguration::setDescription(const std::string& description) {
218228
if (description.length() > 64) {
219229
throw std::invalid_argument("The description length exceeds 64");

lib/ClientConfigurationImpl.h

+1
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ struct ClientConfigurationImpl {
4747
unsigned int partitionsUpdateInterval{60}; // 1 minute
4848
std::string listenerName;
4949
int connectionTimeoutMs{10000}; // 10 seconds
50+
unsigned int keepAliveIntervalInSeconds{30};
5051
std::string description;
5152
std::string proxyServiceUrl;
5253
ClientConfiguration::ProxyProtocol proxyProtocol;

lib/ClientConnection.cc

+3-4
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,6 @@ using proto::BaseCommand;
5151

5252
static const uint32_t DefaultBufferSize = 64 * 1024;
5353

54-
static const int KeepAliveIntervalInSeconds = 30;
55-
5654
static MessageId toMessageId(const proto::MessageIdData& messageIdData) {
5755
return MessageIdBuilder::from(messageIdData).build();
5856
}
@@ -186,6 +184,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
186184
connectTimeoutTask_(
187185
std::make_shared<PeriodicTask>(*executor_, clientConfiguration.getConnectionTimeout())),
188186
outgoingBuffer_(SharedBuffer::allocate(DefaultBufferSize)),
187+
keepAliveIntervalInSeconds_(clientConfiguration.getKeepAliveIntervalInSeconds()),
189188
consumerStatsRequestTimer_(executor_->createDeadlineTimer()),
190189
maxPendingLookupRequest_(clientConfiguration.getConcurrentLookupRequest()),
191190
clientVersion_(clientVersion),
@@ -310,7 +309,7 @@ void ClientConnection::handlePulsarConnected(const proto::CommandConnected& cmdC
310309
// Only send keep-alive probes if the broker supports it
311310
keepAliveTimer_ = executor_->createDeadlineTimer();
312311
if (keepAliveTimer_) {
313-
keepAliveTimer_->expires_from_now(std::chrono::seconds(KeepAliveIntervalInSeconds));
312+
keepAliveTimer_->expires_from_now(std::chrono::seconds(keepAliveIntervalInSeconds_));
314313
auto weakSelf = weak_from_this();
315314
keepAliveTimer_->async_wait([weakSelf](const ASIO_ERROR&) {
316315
auto self = weakSelf.lock();
@@ -1245,7 +1244,7 @@ void ClientConnection::handleKeepAliveTimeout() {
12451244
// be zero And we do not attempt to dereference the pointer.
12461245
Lock lock(mutex_);
12471246
if (keepAliveTimer_) {
1248-
keepAliveTimer_->expires_from_now(std::chrono::seconds(KeepAliveIntervalInSeconds));
1247+
keepAliveTimer_->expires_from_now(std::chrono::seconds(keepAliveIntervalInSeconds_));
12491248
auto weakSelf = weak_from_this();
12501249
keepAliveTimer_->async_wait([weakSelf](const ASIO_ERROR&) {
12511250
auto self = weakSelf.lock();

lib/ClientConnection.h

+1
Original file line numberDiff line numberDiff line change
@@ -388,6 +388,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
388388
// Signals whether we're waiting for a response from broker
389389
bool havePendingPingRequest_ = false;
390390
bool isSniProxy_ = false;
391+
unsigned int keepAliveIntervalInSeconds_;
391392
DeadlineTimerPtr keepAliveTimer_;
392393
DeadlineTimerPtr consumerStatsRequestTimer_;
393394

lib/c/c_ClientConfiguration.cc

+10
Original file line numberDiff line numberDiff line change
@@ -208,3 +208,13 @@ const unsigned int pulsar_client_configuration_get_partitions_update_interval(
208208
pulsar_client_configuration_t *conf) {
209209
return conf->conf.getPartitionsUpdateInterval();
210210
}
211+
212+
void pulsar_client_configuration_set_keep_alive_interval_in_seconds(pulsar_client_configuration_t *conf,
213+
unsigned int keepAliveIntervalInSeconds) {
214+
conf->conf.setKeepAliveIntervalInSeconds(keepAliveIntervalInSeconds);
215+
}
216+
217+
unsigned int pulsar_client_configuration_get_keep_alive_interval_in_seconds(
218+
pulsar_client_configuration_t *conf) {
219+
return conf->conf.getKeepAliveIntervalInSeconds();
220+
}

tests/c/c_ClientConfigurationTest.cc

+3
Original file line numberDiff line numberDiff line change
@@ -34,4 +34,7 @@ TEST(C_ClientConfigurationTest, testCApiConfig) {
3434

3535
pulsar_client_configuration_set_partitions_update_interval(conf, 10);
3636
ASSERT_EQ(pulsar_client_configuration_get_partitions_update_interval(conf), 10);
37+
38+
pulsar_client_configuration_set_keep_alive_interval_in_seconds(conf, 60);
39+
ASSERT_EQ(pulsar_client_configuration_get_keep_alive_interval_in_seconds(conf), 60);
3740
}

0 commit comments

Comments
 (0)