Skip to content

Commit 234a55d

Browse files
authored
[feat] Add startPaused setting to consumer (#416)
### Motivation The Java client consumer has a setting called `startPaused`. If this is set to true, the created consumer will not fetch messages from the broker until resume is called. Currently, this setting does not seem to exist in the C++ client consumer, so I will add it.
1 parent ee1d7b9 commit 234a55d

7 files changed

+93
-3
lines changed

include/pulsar/ConsumerConfiguration.h

+15
Original file line numberDiff line numberDiff line change
@@ -659,6 +659,21 @@ class PULSAR_PUBLIC ConsumerConfiguration {
659659
*/
660660
bool isAckReceiptEnabled() const;
661661

662+
/**
663+
* Starts the consumer in a paused state.
664+
*
665+
* When enabled, the consumer does not immediately fetch messages when the consumer is created.
666+
* Instead, the consumer waits to fetch messages until Consumer::resumeMessageListener is called.
667+
*
668+
* Default: false
669+
*/
670+
ConsumerConfiguration& setStartPaused(bool startPaused);
671+
672+
/**
673+
* The associated getter of setStartPaused.
674+
*/
675+
bool isStartPaused() const;
676+
662677
friend class PulsarWrapper;
663678
friend class PulsarFriend;
664679

include/pulsar/c/consumer_configuration.h

+6
Original file line numberDiff line numberDiff line change
@@ -353,6 +353,12 @@ PULSAR_PUBLIC pulsar_consumer_regex_subscription_mode
353353
pulsar_consumer_configuration_get_regex_subscription_mode(
354354
pulsar_consumer_configuration_t *consumer_configuration);
355355

356+
PULSAR_PUBLIC void pulsar_consumer_configuration_set_start_paused(
357+
pulsar_consumer_configuration_t *consumer_configuration, int start_paused);
358+
359+
PULSAR_PUBLIC int pulsar_consumer_configuration_is_start_paused(
360+
pulsar_consumer_configuration_t *consumer_configuration);
361+
356362
/**
357363
* Set batch receive policy.
358364
*

lib/ConsumerConfiguration.cc

+7
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,13 @@ ConsumerConfiguration& ConsumerConfiguration::setAckReceiptEnabled(bool ackRecei
317317

318318
bool ConsumerConfiguration::isAckReceiptEnabled() const { return impl_->ackReceiptEnabled; }
319319

320+
ConsumerConfiguration& ConsumerConfiguration::setStartPaused(bool startPaused) {
321+
impl_->startPaused = startPaused;
322+
return *this;
323+
}
324+
325+
bool ConsumerConfiguration::isStartPaused() const { return impl_->startPaused; }
326+
320327
ConsumerConfiguration& ConsumerConfiguration::setRegexSubscriptionMode(
321328
RegexSubscriptionMode regexSubscriptionMode) {
322329
impl_->regexSubscriptionMode = regexSubscriptionMode;

lib/ConsumerConfigurationImpl.h

+1
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ struct ConsumerConfigurationImpl {
6262
bool batchIndexAckEnabled{false};
6363
std::vector<ConsumerInterceptorPtr> interceptors;
6464
bool ackReceiptEnabled{false};
65+
bool startPaused{false};
6566
};
6667
} // namespace pulsar
6768
#endif /* LIB_CONSUMERCONFIGURATIONIMPL_H_ */

lib/ConsumerImpl.cc

+1-1
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
8888
receiverQueueRefillThreshold_(config_.getReceiverQueueSize() / 2),
8989
consumerId_(client->newConsumerId()),
9090
consumerStr_("[" + topic + ", " + subscriptionName + ", " + std::to_string(consumerId_) + "] "),
91-
messageListenerRunning_(true),
91+
messageListenerRunning_(!conf.isStartPaused()),
9292
negativeAcksTracker_(std::make_shared<NegativeAcksTracker>(client, *this, conf)),
9393
readCompacted_(conf.isReadCompacted()),
9494
startMessageId_(startMessageId),

lib/c/c_ConsumerConfiguration.cc

+9
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,15 @@ pulsar_consumer_regex_subscription_mode pulsar_consumer_configuration_get_regex_
254254
consumer_configuration->consumerConfiguration.getRegexSubscriptionMode();
255255
}
256256

257+
void pulsar_consumer_configuration_set_start_paused(pulsar_consumer_configuration_t *consumer_configuration,
258+
int start_paused) {
259+
consumer_configuration->consumerConfiguration.setStartPaused(start_paused);
260+
}
261+
262+
int pulsar_consumer_configuration_is_start_paused(pulsar_consumer_configuration_t *consumer_configuration) {
263+
return consumer_configuration->consumerConfiguration.isStartPaused();
264+
}
265+
257266
int pulsar_consumer_configuration_set_batch_receive_policy(
258267
pulsar_consumer_configuration_t *consumer_configuration,
259268
const pulsar_consumer_batch_receive_policy_t *batch_receive_policy_t) {

tests/BasicEndToEndTest.cc

+54-2
Original file line numberDiff line numberDiff line change
@@ -917,8 +917,7 @@ TEST(BasicEndToEndTest, testMessageListenerPause) {
917917
std::string topicName = "partition-testMessageListenerPause";
918918

919919
// call admin api to make it partitioned
920-
std::string url =
921-
adminUrl + "admin/v2/persistent/public/default/partition-testMessageListener-pauses/partitions";
920+
std::string url = adminUrl + "admin/v2/persistent/public/default/" + topicName + "/partitions";
922921
int res = makePutRequest(url, "5");
923922

924923
LOG_INFO("res = " << res);
@@ -968,6 +967,59 @@ TEST(BasicEndToEndTest, testMessageListenerPause) {
968967
client.close();
969968
}
970969

970+
void testStartPaused(bool isPartitioned) {
971+
Client client(lookupUrl);
972+
std::string topicName =
973+
isPartitioned ? "testStartPausedWithPartitionedTopic" : "testStartPausedWithNonPartitionedTopic";
974+
std::string subName = "sub";
975+
976+
if (isPartitioned) {
977+
// Call admin api to make it partitioned
978+
std::string url = adminUrl + "admin/v2/persistent/public/default/" + topicName + "/partitions";
979+
int res = makePutRequest(url, "5");
980+
LOG_INFO("res = " << res);
981+
ASSERT_FALSE(res != 204 && res != 409);
982+
}
983+
984+
Producer producer;
985+
Result result = client.createProducer(topicName, producer);
986+
987+
// Initializing global Count
988+
globalCount = 0;
989+
990+
ConsumerConfiguration consumerConfig;
991+
consumerConfig.setMessageListener(
992+
std::bind(messageListenerFunction, std::placeholders::_1, std::placeholders::_2));
993+
consumerConfig.setStartPaused(true);
994+
Consumer consumer;
995+
// Removing dangling subscription from previous test failures
996+
result = client.subscribe(topicName, subName, consumerConfig, consumer);
997+
consumer.unsubscribe();
998+
999+
result = client.subscribe(topicName, subName, consumerConfig, consumer);
1000+
ASSERT_EQ(ResultOk, result);
1001+
1002+
int numOfMessages = 50;
1003+
for (int i = 0; i < numOfMessages; i++) {
1004+
std::string messageContent = "msg-" + std::to_string(i);
1005+
Message msg = MessageBuilder().setContent(messageContent).build();
1006+
ASSERT_EQ(ResultOk, producer.send(msg));
1007+
}
1008+
1009+
std::this_thread::sleep_for(std::chrono::microseconds(2 * 1000 * 1000));
1010+
ASSERT_EQ(globalCount, 0);
1011+
consumer.resumeMessageListener();
1012+
ASSERT_TRUE(waitUntil(std::chrono::seconds(5), [&]() -> bool { return globalCount >= numOfMessages; }));
1013+
1014+
consumer.unsubscribe();
1015+
producer.close();
1016+
client.close();
1017+
}
1018+
1019+
TEST(BasicEndToEndTest, testStartPausedWithNonPartitionedTopic) { testStartPaused(false); }
1020+
1021+
TEST(BasicEndToEndTest, testStartPausedWithPartitionedTopic) { testStartPaused(true); }
1022+
9711023
TEST(BasicEndToEndTest, testResendViaSendCallback) {
9721024
ClientConfiguration clientConfiguration;
9731025
clientConfiguration.setIOThreads(1);

0 commit comments

Comments
 (0)