Skip to content

Commit 54e529a

Browse files
authored
Fix ack failure on message listener in multi topics consumer (#447)
1 parent 8b2753a commit 54e529a

File tree

2 files changed

+14
-10
lines changed

2 files changed

+14
-10
lines changed

lib/ConsumerImpl.cc

+4-10
Original file line numberDiff line numberDiff line change
@@ -292,11 +292,7 @@ void ConsumerImpl::sendFlowPermitsToBroker(const ClientConnectionPtr& cnx, int n
292292
Result ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result result) {
293293
Result handleResult = ResultOk;
294294

295-
static bool firstTime = true;
296295
if (result == ResultOk) {
297-
if (firstTime) {
298-
firstTime = false;
299-
}
300296
LOG_INFO(getName() << "Created consumer on broker " << cnx->cnxString());
301297
{
302298
Lock lock(mutex_);
@@ -313,12 +309,10 @@ Result ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result
313309
}
314310

315311
LOG_DEBUG(getName() << "Send initial flow permits: " << config_.getReceiverQueueSize());
316-
if (consumerTopicType_ == NonPartitioned || !firstTime) {
317-
if (config_.getReceiverQueueSize() != 0) {
318-
sendFlowPermitsToBroker(cnx, config_.getReceiverQueueSize());
319-
} else if (messageListener_) {
320-
sendFlowPermitsToBroker(cnx, 1);
321-
}
312+
if (config_.getReceiverQueueSize() != 0) {
313+
sendFlowPermitsToBroker(cnx, config_.getReceiverQueueSize());
314+
} else if (messageListener_) {
315+
sendFlowPermitsToBroker(cnx, 1);
322316
}
323317
consumerCreatedPromise_.setValue(get_shared_this_ptr());
324318
} else {

lib/MultiTopicsConsumerImpl.cc

+10
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,11 @@ void MultiTopicsConsumerImpl::handleOneTopicSubscribed(Result result, Consumer c
149149
if (state_.compare_exchange_strong(state, Ready)) {
150150
LOG_INFO("Successfully Subscribed to Topics");
151151
multiTopicsConsumerCreatedPromise_.setValue(get_shared_this_ptr());
152+
// Now all child topics are successfully subscribed, start messageListeners
153+
if (messageListener_ && !conf_.isStartPaused()) {
154+
LOG_INFO("Start messageListeners");
155+
resumeMessageListener();
156+
}
152157
} else {
153158
LOG_ERROR("Unable to create Consumer - " << consumerStr_ << " Error - " << result);
154159
// unsubscribed all of the successfully subscribed partitioned consumers
@@ -205,6 +210,11 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(int numPartitions, TopicN
205210
ConsumerSubResultPromisePtr topicSubResultPromise) {
206211
std::shared_ptr<ConsumerImpl> consumer;
207212
ConsumerConfiguration config = conf_.clone();
213+
// Pause messageListener until all child topics are subscribed.
214+
// Otherwise messages may be acked before the parent consumer gets "Ready", causing ack failures.
215+
if (messageListener_) {
216+
config.setStartPaused(true);
217+
}
208218
auto client = client_.lock();
209219
if (!client) {
210220
topicSubResultPromise->setFailed(ResultAlreadyClosed);

0 commit comments

Comments
 (0)