Skip to content

Commit dfa854b

Browse files
massakam authored and BewareMyPower committed
Fix an issue where zero queue consumers are unable to receive messages after topic unloading (#473)
(cherry picked from commit afeac78)
1 parent 115d64a commit dfa854b

File tree

2 files changed

+154
-14
lines changed

2 files changed

+154
-14
lines changed

lib/ConsumerImpl.cc

+54 -14
Original file line number | Diff line number | Diff line change
@@ -310,15 +310,23 @@ Result ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result
310310
if (result == ResultOk) {
311311
LOG_INFO(getName() << "Created consumer on broker " << cnx->cnxString());
312312
{
313-
Lock lock(mutex_);
313+
Lock mutexLock(mutex_);
314314
setCnx(cnx);
315315
incomingMessages_.clear();
316316
possibleSendToDeadLetterTopicMessages_.clear();
317317
state_ = Ready;
318318
backoff_.reset();
319-
// Complicated logic since we don't have a isLocked() function for mutex
320-
if (waitingForZeroQueueSizeMessage) {
321-
sendFlowPermitsToBroker(cnx, 1);
319+
if (!messageListener_ && config_.getReceiverQueueSize() == 0) {
320+
// Complicated logic since we don't have a isLocked() function for mutex
321+
if (waitingForZeroQueueSizeMessage) {
322+
sendFlowPermitsToBroker(cnx, 1);
323+
}
324+
// Note that the order of lock acquisition must be mutex_ -> pendingReceiveMutex_,
325+
// otherwise a deadlock will occur.
326+
Lock pendingReceiveMutexLock(pendingReceiveMutex_);
327+
if (!pendingReceives_.empty()) {
328+
sendFlowPermitsToBroker(cnx, pendingReceives_.size());
329+
}
322330
}
323331
availablePermits_ = 0;
324332
}
@@ -915,7 +923,6 @@ Result ConsumerImpl::fetchSingleMessageFromBroker(Message& msg) {
915923
}
916924

917925
// Using RAII for locking
918-
ClientConnectionPtr currentCnx = getCnx().lock();
919926
Lock lock(mutexForReceiveWithZeroQueueSize);
920927

921928
// Just being cautious
@@ -924,9 +931,18 @@ Result ConsumerImpl::fetchSingleMessageFromBroker(Message& msg) {
924931
getName() << "The incoming message queue should never be greater than 0 when Queue size is 0");
925932
incomingMessages_.clear();
926933
}
927-
waitingForZeroQueueSizeMessage = true;
928934

929-
sendFlowPermitsToBroker(currentCnx, 1);
935+
{
936+
// Lock mutex_ to prevent a race condition with handleCreateConsumer.
937+
// If handleCreateConsumer is executed after setting waitingForZeroQueueSizeMessage to true and
938+
// before calling sendFlowPermitsToBroker, the result may be that a flow permit is sent twice.
939+
Lock lock(mutex_);
940+
waitingForZeroQueueSizeMessage = true;
941+
// If connection_ is nullptr, sendFlowPermitsToBroker does nothing.
942+
// In other words, a flow permit will not be sent until setCnx(cnx) is executed in
943+
// handleCreateConsumer.
944+
sendFlowPermitsToBroker(getCnx().lock(), 1);
945+
}
930946

931947
while (true) {
932948
if (!incomingMessages_.pop(msg)) {
@@ -939,6 +955,7 @@ Result ConsumerImpl::fetchSingleMessageFromBroker(Message& msg) {
939955
Lock localLock(mutex_);
940956
// if message received due to an old flow - discard it and wait for the message from the
941957
// latest flow command
958+
ClientConnectionPtr currentCnx = getCnx().lock();
942959
if (msg.impl_->cnx_ == currentCnx.get()) {
943960
waitingForZeroQueueSizeMessage = false;
944961
// Can't use break here else it may trigger a race with connection opened.
@@ -966,19 +983,42 @@ void ConsumerImpl::receiveAsync(ReceiveCallback callback) {
966983
return;
967984
}
968985

969-
Lock lock(pendingReceiveMutex_);
986+
if (messageListener_) {
987+
LOG_ERROR(getName() << "Can not receive when a listener has been set");
988+
callback(ResultInvalidConfiguration, msg);
989+
return;
990+
}
991+
992+
Lock mutexlock(mutex_, std::defer_lock);
993+
if (config_.getReceiverQueueSize() == 0) {
994+
// Lock mutex_ to prevent a race condition with handleCreateConsumer.
995+
// If handleCreateConsumer is executed after pushing the callback to pendingReceives_ and
996+
// before calling sendFlowPermitsToBroker, the result may be that a flow permit is sent twice.
997+
// Note that the order of lock acquisition must be mutex_ -> pendingReceiveMutex_,
998+
// otherwise a deadlock will occur.
999+
mutexlock.lock();
1000+
}
1001+
1002+
Lock pendingReceiveMutexLock(pendingReceiveMutex_);
9701003
if (incomingMessages_.pop(msg, std::chrono::milliseconds(0))) {
971-
lock.unlock();
1004+
pendingReceiveMutexLock.unlock();
1005+
if (config_.getReceiverQueueSize() == 0) {
1006+
mutexlock.unlock();
1007+
}
9721008
messageProcessed(msg);
9731009
msg = interceptors_->beforeConsume(Consumer(shared_from_this()), msg);
9741010
callback(ResultOk, msg);
1011+
} else if (config_.getReceiverQueueSize() == 0) {
1012+
pendingReceives_.push(callback);
1013+
// If connection_ is nullptr, sendFlowPermitsToBroker does nothing.
1014+
// In other words, a flow permit will not be sent until setCnx(cnx) is executed in
1015+
// handleCreateConsumer.
1016+
sendFlowPermitsToBroker(getCnx().lock(), 1);
1017+
pendingReceiveMutexLock.unlock();
1018+
mutexlock.unlock();
9751019
} else {
9761020
pendingReceives_.push(callback);
977-
lock.unlock();
978-
979-
if (config_.getReceiverQueueSize() == 0) {
980-
sendFlowPermitsToBroker(getCnx().lock(), 1);
981-
}
1021+
pendingReceiveMutexLock.unlock();
9821022
}
9831023
}
9841024

tests/ZeroQueueSizeTest.cc

+100
Original file line number | Diff line number | Diff line change
@@ -27,6 +27,7 @@
2727
#include <mutex>
2828

2929
#include "ConsumerTest.h"
30+
#include "HttpHelper.h"
3031
#include "lib/Latch.h"
3132
#include "lib/LogUtils.h"
3233

@@ -37,6 +38,7 @@ using namespace pulsar;
3738
static int totalMessages = 10;
3839
static int globalCount = 0;
3940
static std::string lookupUrl = "pulsar://localhost:6650";
41+
static std::string adminUrl = "http://localhost:8080";
4042
static std::string contentBase = "msg-";
4143

4244
static void messageListenerFunction(Consumer consumer, const Message& msg, Latch& latch) {
@@ -287,3 +289,101 @@ TEST(ZeroQueueSizeTest, testPauseResumeNoReconnection) {
287289

288290
client.close();
289291
}
292+
293+
class ZeroQueueSizeTest : public ::testing::TestWithParam<bool> {};
294+
295+
TEST_P(ZeroQueueSizeTest, testReceptionAfterUnloading) {
296+
Client client(lookupUrl);
297+
auto isAsync = GetParam();
298+
std::string topicName = "zero-queue-size-reception-after-unloading";
299+
if (isAsync) {
300+
topicName += "-async";
301+
}
302+
std::string subName = "my-sub";
303+
304+
Producer producer;
305+
Result result = client.createProducer(topicName, producer);
306+
ASSERT_EQ(ResultOk, result);
307+
308+
Consumer consumer;
309+
ConsumerConfiguration consConfig;
310+
consConfig.setReceiverQueueSize(0);
311+
result = client.subscribe(topicName, subName, consConfig, consumer);
312+
ASSERT_EQ(ResultOk, result);
313+
314+
for (int i = 0; i < totalMessages / 2; i++) {
315+
std::ostringstream ss;
316+
ss << contentBase << i;
317+
Message msg = MessageBuilder().setContent(ss.str()).build();
318+
result = producer.send(msg);
319+
ASSERT_EQ(ResultOk, result);
320+
}
321+
322+
for (int i = 0; i < totalMessages / 2; i++) {
323+
ASSERT_EQ(0, ConsumerTest::getNumOfMessagesInQueue(consumer));
324+
std::ostringstream ss;
325+
ss << contentBase << i;
326+
if (isAsync) {
327+
Latch latch(1);
328+
consumer.receiveAsync([&consumer, &ss, &latch](Result res, const Message& receivedMsg) {
329+
ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg));
330+
ASSERT_EQ(ss.str(), receivedMsg.getDataAsString());
331+
ASSERT_EQ(0, ConsumerTest::getNumOfMessagesInQueue(consumer));
332+
latch.countdown();
333+
});
334+
ASSERT_TRUE(latch.wait(std::chrono::seconds(10)));
335+
} else {
336+
Message receivedMsg;
337+
consumer.receive(receivedMsg);
338+
ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg));
339+
ASSERT_EQ(ss.str(), receivedMsg.getDataAsString());
340+
ASSERT_EQ(0, ConsumerTest::getNumOfMessagesInQueue(consumer));
341+
}
342+
}
343+
344+
// Wait for messages to be delivered while performing `receive` or `receiveAsync` in a separate thread.
345+
// At this time, the value of availablePermits should be 1.
346+
std::thread consumeThread([&consumer, &isAsync] {
347+
for (int i = totalMessages / 2; i < totalMessages; i++) {
348+
ASSERT_EQ(0, ConsumerTest::getNumOfMessagesInQueue(consumer));
349+
std::ostringstream ss;
350+
ss << contentBase << i;
351+
if (isAsync) {
352+
Latch latch(1);
353+
consumer.receiveAsync([&consumer, &ss, &latch](Result res, const Message& receivedMsg) {
354+
ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg));
355+
ASSERT_EQ(ss.str(), receivedMsg.getDataAsString());
356+
ASSERT_EQ(0, ConsumerTest::getNumOfMessagesInQueue(consumer));
357+
latch.countdown();
358+
});
359+
ASSERT_TRUE(latch.wait(std::chrono::seconds(10)));
360+
} else {
361+
Message receivedMsg;
362+
consumer.receive(receivedMsg);
363+
ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg));
364+
ASSERT_EQ(ss.str(), receivedMsg.getDataAsString());
365+
ASSERT_EQ(0, ConsumerTest::getNumOfMessagesInQueue(consumer));
366+
}
367+
}
368+
});
369+
std::this_thread::sleep_for(std::chrono::seconds(1));
370+
371+
int res = makePutRequest(adminUrl + "/admin/v2/persistent/public/default/" + topicName + "/unload", "");
372+
ASSERT_TRUE(res / 100 == 2) << "res: " << res;
373+
374+
for (int i = totalMessages / 2; i < totalMessages; i++) {
375+
std::ostringstream ss;
376+
ss << contentBase << i;
377+
Message msg = MessageBuilder().setContent(ss.str()).build();
378+
result = producer.send(msg);
379+
ASSERT_EQ(ResultOk, result);
380+
}
381+
382+
consumeThread.join();
383+
consumer.unsubscribe();
384+
consumer.close();
385+
producer.close();
386+
client.close();
387+
}
388+
389+
INSTANTIATE_TEST_CASE_P(Pulsar, ZeroQueueSizeTest, ::testing::Values(false, true));

0 commit comments

Comments
 (0)