Skip to content

Commit 37ea769

Browse files
Fix the flush callback might be called repeatedly (#353)
Fixes #352 ### Motivation #303 adds the flush callback to the last `OpSendMsg` instead of adding to the batch message container. However, `batchMessageAndSend` will create an `OpSendMsg` and add it to the `pendingMessagesQueue_`. https://github.com/apache/pulsar-client-cpp/blob/7bb94f45b917ed30b5302ac93ffa1f1942fc6313/lib/ProducerImpl.cc#L384-L389 In the code above, `pendingMessagesQueue_` could never be empty and the callback will be added again by `opSendMsg->addTrackerCallback`. The 1st time it's added in `createOpSendMsg` or `createOpSendMsgs` called by `batchMessageAndSend`. ### Motivation Add the callback to the last `OpSendMsg only when the batch message container is empty. In `testFlushBatch`, replace the `flush` call with the `flushAsync` call and verify the callback is only called once after it's completed.
1 parent 7bb94f4 commit 37ea769

File tree

3 files changed

+48
-19
lines changed

3 files changed

+48
-19
lines changed

lib/BatchMessageKeyBasedContainer.cc

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,11 +77,16 @@ std::vector<std::unique_ptr<OpSendMsg>> BatchMessageKeyBasedContainer::createOpS
7777
// Store raw pointers to use std::sort
7878
std::vector<OpSendMsg*> rawOpSendMsgs;
7979
for (auto& kv : batches_) {
80-
rawOpSendMsgs.emplace_back(createOpSendMsgHelper(kv.second).release());
80+
if (!kv.second.empty()) {
81+
rawOpSendMsgs.emplace_back(createOpSendMsgHelper(kv.second).release());
82+
}
8183
}
8284
std::sort(rawOpSendMsgs.begin(), rawOpSendMsgs.end(), [](const OpSendMsg* lhs, const OpSendMsg* rhs) {
8385
return lhs->sendArgs->sequenceId < rhs->sendArgs->sequenceId;
8486
});
87+
if (rawOpSendMsgs.empty()) {
88+
return {};
89+
}
8590
rawOpSendMsgs.back()->addTrackerCallback(flushCallback);
8691

8792
std::vector<std::unique_ptr<OpSendMsg>> opSendMsgs{rawOpSendMsgs.size()};

lib/ProducerImpl.cc

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -376,29 +376,37 @@ void ProducerImpl::setMessageMetadata(const Message& msg, const uint64_t& sequen
376376

377377
void ProducerImpl::flushAsync(FlushCallback callback) {
378378
if (state_ != Ready) {
379-
callback(ResultAlreadyClosed);
379+
if (callback) {
380+
callback(ResultAlreadyClosed);
381+
}
380382
return;
381383
}
384+
385+
auto addCallbackToLastOp = [this, &callback] {
386+
if (pendingMessagesQueue_.empty()) {
387+
return false;
388+
}
389+
pendingMessagesQueue_.back()->addTrackerCallback(callback);
390+
return true;
391+
};
392+
382393
if (batchMessageContainer_) {
383394
Lock lock(mutex_);
384-
auto failures = batchMessageAndSend(callback);
385-
if (!pendingMessagesQueue_.empty()) {
386-
auto& opSendMsg = pendingMessagesQueue_.back();
387-
lock.unlock();
388-
failures.complete();
389-
opSendMsg->addTrackerCallback(callback);
390-
} else {
391-
lock.unlock();
392-
failures.complete();
393-
callback(ResultOk);
395+
396+
if (batchMessageContainer_->isEmpty()) {
397+
if (!addCallbackToLastOp() && callback) {
398+
lock.unlock();
399+
callback(ResultOk);
400+
}
401+
return;
394402
}
403+
404+
auto failures = batchMessageAndSend(callback);
405+
lock.unlock();
406+
failures.complete();
395407
} else {
396408
Lock lock(mutex_);
397-
if (!pendingMessagesQueue_.empty()) {
398-
auto& opSendMsg = pendingMessagesQueue_.back();
399-
lock.unlock();
400-
opSendMsg->addTrackerCallback(callback);
401-
} else {
409+
if (!addCallbackToLastOp() && callback) {
402410
lock.unlock();
403411
callback(ResultOk);
404412
}

tests/ProducerTest.cc

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -476,7 +476,23 @@ TEST_P(ProducerTest, testFlushBatch) {
476476
producer.sendAsync(msg, cb);
477477
}
478478

479-
producer.flush();
479+
auto assertFlushCallbackOnce = [&producer] {
480+
Latch latch{1};
481+
std::mutex mutex;
482+
std::vector<Result> results;
483+
producer.flushAsync([&](Result result) {
484+
{
485+
std::lock_guard<std::mutex> lock{mutex};
486+
results.emplace_back(result);
487+
}
488+
latch.countdown();
489+
});
490+
latch.wait();
491+
std::lock_guard<std::mutex> lock{mutex};
492+
ASSERT_EQ(results, (std::vector<Result>{ResultOk}));
493+
};
494+
495+
assertFlushCallbackOnce();
480496
ASSERT_EQ(needCallBack.load(), 0);
481497
producer.close();
482498

@@ -494,7 +510,7 @@ TEST_P(ProducerTest, testFlushBatch) {
494510
producer.sendAsync(msg, cb2);
495511
}
496512

497-
producer.flush();
513+
assertFlushCallbackOnce();
498514
ASSERT_EQ(needCallBack2.load(), 0);
499515
producer.close();
500516

0 commit comments

Comments
 (0)