Skip to content

Commit ee1d7b9

Browse files
Fix hasMessageAvailable might return true after seeking to latest (#409)
* ### Motivation

  After a seek operation is done, `startMessageId` will not be updated until the reconnection triggered by the seek is completed in `connectionOpened`. So before it is updated, `hasMessageAvailable` could compare against an outdated `startMessageId` and return a wrong value.

  ### Modifications

  Replace `duringSeek` with a `SeekStatus` field:

  - `NOT_STARTED`: the initial status, or the status after a seek operation is done. `seek` can only succeed in this status.
  - `IN_PROGRESS`: a seek operation has started, but the client has not yet received the response from the broker.
  - `COMPLETED`: the client has received the seek response, but the seek future is not done yet.

  After the status becomes `COMPLETED`, if the connection is not ready, then the next time the connection is established the status will change from `COMPLETED` to `NOT_STARTED`, and the seek future will then be completed in the internal executor.

  Add `testHasMessageAvailableAfterSeekToEnd` and `testSeekInProgress`.
1 parent 4360500 commit ee1d7b9

File tree

4 files changed

+139
-39
lines changed

4 files changed

+139
-39
lines changed

lib/ConsumerImpl.cc

+50-37
Original file line numberDiff line numberDiff line change
@@ -236,16 +236,14 @@ Future<Result, bool> ConsumerImpl::connectionOpened(const ClientConnectionPtr& c
236236
// sending the subscribe request.
237237
cnx->registerConsumer(consumerId_, get_shared_this_ptr());
238238

239-
if (duringSeek_) {
239+
if (duringSeek()) {
240240
ackGroupingTrackerPtr_->flushAndClean();
241241
}
242242

243243
Lock lockForMessageId(mutexForMessageId_);
244-
// Update startMessageId so that we can discard messages after delivery restarts
245-
const auto startMessageId = clearReceiveQueue();
244+
clearReceiveQueue();
246245
const auto subscribeMessageId =
247-
(subscriptionMode_ == Commands::SubscriptionModeNonDurable) ? startMessageId : boost::none;
248-
startMessageId_ = startMessageId;
246+
(subscriptionMode_ == Commands::SubscriptionModeNonDurable) ? startMessageId_.get() : boost::none;
249247
lockForMessageId.unlock();
250248

251249
unAckedMessageTrackerPtr_->clear();
@@ -1048,14 +1046,21 @@ void ConsumerImpl::messageProcessed(Message& msg, bool track) {
10481046
* Clear the internal receiver queue and returns the message id of what was the 1st message in the queue that
10491047
* was
10501048
* not seen by the application
1049+
* `startMessageId_` is updated so that we can discard messages after delivery restarts.
10511050
*/
1052-
boost::optional<MessageId> ConsumerImpl::clearReceiveQueue() {
1053-
bool expectedDuringSeek = true;
1054-
if (duringSeek_.compare_exchange_strong(expectedDuringSeek, false)) {
1055-
return seekMessageId_.get();
1051+
void ConsumerImpl::clearReceiveQueue() {
1052+
if (duringSeek()) {
1053+
startMessageId_ = seekMessageId_.get();
1054+
SeekStatus expected = SeekStatus::COMPLETED;
1055+
if (seekStatus_.compare_exchange_strong(expected, SeekStatus::NOT_STARTED)) {
1056+
auto seekCallback = seekCallback_.release();
1057+
executor_->postWork([seekCallback] { seekCallback(ResultOk); });
1058+
}
1059+
return;
10561060
} else if (subscriptionMode_ == Commands::SubscriptionModeDurable) {
1057-
return startMessageId_.get();
1061+
return;
10581062
}
1063+
10591064
Message nextMessageInQueue;
10601065
if (incomingMessages_.peekAndClear(nextMessageInQueue)) {
10611066
// There was at least one message pending in the queue
@@ -1071,16 +1076,12 @@ boost::optional<MessageId> ConsumerImpl::clearReceiveQueue() {
10711076
.ledgerId(nextMessageId.ledgerId())
10721077
.entryId(nextMessageId.entryId() - 1)
10731078
.build();
1074-
return previousMessageId;
1079+
startMessageId_ = previousMessageId;
10751080
} else if (lastDequedMessageId_ != MessageId::earliest()) {
10761081
// If the queue was empty we need to restart from the message just after the last one that has been
10771082
// dequeued
10781083
// in the past
1079-
return lastDequedMessageId_;
1080-
} else {
1081-
// No message was received or dequeued by this consumer. Next message would still be the
1082-
// startMessageId
1083-
return startMessageId_.get();
1084+
startMessageId_ = lastDequedMessageId_;
10841085
}
10851086
}
10861087

@@ -1500,18 +1501,15 @@ void ConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callback) {
15001501

15011502
bool ConsumerImpl::isReadCompacted() { return readCompacted_; }
15021503

1503-
inline bool hasMoreMessages(const MessageId& lastMessageIdInBroker, const MessageId& messageId) {
1504-
return lastMessageIdInBroker > messageId && lastMessageIdInBroker.entryId() != -1;
1505-
}
1506-
15071504
void ConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback) {
1508-
const auto startMessageId = startMessageId_.get();
1509-
Lock lock(mutexForMessageId_);
1510-
const auto messageId =
1511-
(lastDequedMessageId_ == MessageId::earliest()) ? startMessageId.value() : lastDequedMessageId_;
1512-
1513-
if (messageId == MessageId::latest()) {
1514-
lock.unlock();
1505+
bool compareMarkDeletePosition;
1506+
{
1507+
std::lock_guard<std::mutex> lock{mutexForMessageId_};
1508+
compareMarkDeletePosition =
1509+
(lastDequedMessageId_ == MessageId::earliest()) &&
1510+
(startMessageId_.get().value_or(MessageId::earliest()) == MessageId::latest());
1511+
}
1512+
if (compareMarkDeletePosition) {
15151513
auto self = get_shared_this_ptr();
15161514
getLastMessageIdAsync([self, callback](Result result, const GetLastMessageIdResponse& response) {
15171515
if (result != ResultOk) {
@@ -1543,16 +1541,15 @@ void ConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback
15431541
}
15441542
});
15451543
} else {
1546-
if (hasMoreMessages(lastMessageIdInBroker_, messageId)) {
1547-
lock.unlock();
1544+
if (hasMoreMessages()) {
15481545
callback(ResultOk, true);
15491546
return;
15501547
}
1551-
lock.unlock();
1552-
1553-
getLastMessageIdAsync([callback, messageId](Result result, const GetLastMessageIdResponse& response) {
1554-
callback(result, (result == ResultOk) && hasMoreMessages(response.getLastMessageId(), messageId));
1555-
});
1548+
auto self = get_shared_this_ptr();
1549+
getLastMessageIdAsync(
1550+
[this, self, callback](Result result, const GetLastMessageIdResponse& response) {
1551+
callback(result, (result == ResultOk) && hasMoreMessages());
1552+
});
15561553
}
15571554
}
15581555

@@ -1656,9 +1653,18 @@ void ConsumerImpl::seekAsyncInternal(long requestId, SharedBuffer seek, const Me
16561653
return;
16571654
}
16581655

1656+
auto expected = SeekStatus::NOT_STARTED;
1657+
if (!seekStatus_.compare_exchange_strong(expected, SeekStatus::IN_PROGRESS)) {
1658+
LOG_ERROR(getName() << " attempted to seek (" << seekId << ", " << timestamp << " when the status is "
1659+
<< static_cast<int>(expected));
1660+
callback(ResultNotAllowedError);
1661+
return;
1662+
}
1663+
16591664
const auto originalSeekMessageId = seekMessageId_.get();
16601665
seekMessageId_ = seekId;
1661-
duringSeek_ = true;
1666+
seekStatus_ = SeekStatus::IN_PROGRESS;
1667+
seekCallback_ = std::move(callback);
16621668
if (timestamp > 0) {
16631669
LOG_INFO(getName() << " Seeking subscription to " << timestamp);
16641670
} else {
@@ -1682,12 +1688,19 @@ void ConsumerImpl::seekAsyncInternal(long requestId, SharedBuffer seek, const Me
16821688
Lock lock(mutexForMessageId_);
16831689
lastDequedMessageId_ = MessageId::earliest();
16841690
lock.unlock();
1691+
if (getCnx().expired()) {
1692+
// It's during reconnection, complete the seek future after connection is established
1693+
seekStatus_ = SeekStatus::COMPLETED;
1694+
} else {
1695+
startMessageId_ = seekMessageId_.get();
1696+
seekCallback_.release()(result);
1697+
}
16851698
} else {
16861699
LOG_ERROR(getName() << "Failed to seek: " << result);
16871700
seekMessageId_ = originalSeekMessageId;
1688-
duringSeek_ = false;
1701+
seekStatus_ = SeekStatus::NOT_STARTED;
1702+
seekCallback_.release()(result);
16891703
}
1690-
callback(result);
16911704
});
16921705
}
16931706

lib/ConsumerImpl.h

+29-2
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,13 @@ const static std::string SYSTEM_PROPERTY_REAL_TOPIC = "REAL_TOPIC";
7575
const static std::string PROPERTY_ORIGIN_MESSAGE_ID = "ORIGIN_MESSAGE_ID";
7676
const static std::string DLQ_GROUP_TOPIC_SUFFIX = "-DLQ";
7777

78+
enum class SeekStatus : std::uint8_t
79+
{
80+
NOT_STARTED,
81+
IN_PROGRESS,
82+
COMPLETED
83+
};
84+
7885
class ConsumerImpl : public ConsumerImplBase {
7986
public:
8087
ConsumerImpl(const ClientImplPtr client, const std::string& topic, const std::string& subscriptionName,
@@ -193,7 +200,7 @@ class ConsumerImpl : public ConsumerImplBase {
193200
const DeadlineTimerPtr& timer,
194201
BrokerGetLastMessageIdCallback callback);
195202

196-
boost::optional<MessageId> clearReceiveQueue();
203+
void clearReceiveQueue();
197204
void seekAsyncInternal(long requestId, SharedBuffer seek, const MessageId& seekId, long timestamp,
198205
ResultCallback callback);
199206
void processPossibleToDLQ(const MessageId& messageId, ProcessDLQCallBack cb);
@@ -239,10 +246,13 @@ class ConsumerImpl : public ConsumerImplBase {
239246
MessageId lastDequedMessageId_{MessageId::earliest()};
240247
MessageId lastMessageIdInBroker_{MessageId::earliest()};
241248

242-
std::atomic_bool duringSeek_{false};
249+
std::atomic<SeekStatus> seekStatus_{SeekStatus::NOT_STARTED};
250+
Synchronized<ResultCallback> seekCallback_{[](Result) {}};
243251
Synchronized<boost::optional<MessageId>> startMessageId_;
244252
Synchronized<MessageId> seekMessageId_{MessageId::earliest()};
245253

254+
bool duringSeek() const { return seekStatus_ != SeekStatus::NOT_STARTED; }
255+
246256
class ChunkedMessageCtx {
247257
public:
248258
ChunkedMessageCtx() : totalChunks_(0) {}
@@ -332,6 +342,23 @@ class ConsumerImpl : public ConsumerImplBase {
332342
const proto::MessageIdData& messageIdData,
333343
const ClientConnectionPtr& cnx, MessageId& messageId);
334344

345+
bool hasMoreMessages() const {
346+
std::lock_guard<std::mutex> lock{mutexForMessageId_};
347+
if (lastMessageIdInBroker_.entryId() == -1L) {
348+
return false;
349+
}
350+
351+
const auto inclusive = config_.isStartMessageIdInclusive();
352+
if (lastDequedMessageId_ == MessageId::earliest()) {
353+
// If startMessageId_ is none, use latest so that this method will return false
354+
const auto startMessageId = startMessageId_.get().value_or(MessageId::latest());
355+
return inclusive ? (lastMessageIdInBroker_ >= startMessageId)
356+
: (lastMessageIdInBroker_ > startMessageId);
357+
} else {
358+
return lastMessageIdInBroker_ > lastDequedMessageId_;
359+
}
360+
}
361+
335362
friend class PulsarFriend;
336363
friend class MultiTopicsConsumerImpl;
337364

lib/Synchronized.h

+5
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,11 @@ class Synchronized {
3030
return value_;
3131
}
3232

33+
T&& release() {
34+
std::lock_guard<std::mutex> lock(mutex_);
35+
return std::move(value_);
36+
}
37+
3338
Synchronized& operator=(const T& value) {
3439
std::lock_guard<std::mutex> lock(mutex_);
3540
value_ = value;

tests/ReaderTest.cc

+55
Original file line numberDiff line numberDiff line change
@@ -752,6 +752,8 @@ TEST(ReaderSeekTest, testSeekForMessageId) {
752752
producer.close();
753753
}
754754

755+
class ReaderSeekTest : public ::testing::TestWithParam<bool> {};
756+
755757
TEST(ReaderSeekTest, testStartAtLatestMessageId) {
756758
Client client(serviceUrl);
757759

@@ -784,4 +786,57 @@ TEST(ReaderSeekTest, testStartAtLatestMessageId) {
784786
producer.close();
785787
}
786788

789+
TEST(ReaderTest, testSeekInProgress) {
790+
Client client(serviceUrl);
791+
const auto topic = "test-seek-in-progress-" + std::to_string(time(nullptr));
792+
Reader reader;
793+
ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::earliest(), {}, reader));
794+
795+
reader.seekAsync(MessageId::earliest(), [](Result) {});
796+
Promise<Result, Result> promise;
797+
reader.seekAsync(MessageId::earliest(), [promise](Result result) { promise.setValue(result); });
798+
Result result;
799+
promise.getFuture().get(result);
800+
ASSERT_EQ(result, ResultNotAllowedError);
801+
client.close();
802+
}
803+
804+
TEST_P(ReaderSeekTest, testHasMessageAvailableAfterSeekToEnd) {
805+
Client client(serviceUrl);
806+
const auto topic = "test-has-message-available-after-seek-to-end-" + std::to_string(time(nullptr));
807+
Producer producer;
808+
ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
809+
Reader reader;
810+
ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::earliest(), {}, reader));
811+
812+
producer.send(MessageBuilder().setContent("msg-0").build());
813+
producer.send(MessageBuilder().setContent("msg-1").build());
814+
815+
bool hasMessageAvailable;
816+
if (GetParam()) {
817+
// Test the case when `ConsumerImpl.lastMessageIdInBroker_` has been initialized
818+
ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
819+
}
820+
821+
ASSERT_EQ(ResultOk, reader.seek(MessageId::latest()));
822+
ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
823+
ASSERT_FALSE(hasMessageAvailable);
824+
825+
producer.send(MessageBuilder().setContent("msg-2").build());
826+
ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
827+
ASSERT_TRUE(hasMessageAvailable);
828+
829+
Message msg;
830+
ASSERT_EQ(ResultOk, reader.readNext(msg, 1000));
831+
ASSERT_EQ("msg-2", msg.getDataAsString());
832+
833+
// Test the 2nd seek
834+
ASSERT_EQ(ResultOk, reader.seek(MessageId::latest()));
835+
ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
836+
ASSERT_FALSE(hasMessageAvailable);
837+
838+
client.close();
839+
}
840+
787841
INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderTest, ::testing::Values(true, false));
842+
INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderSeekTest, ::testing::Values(true, false));

0 commit comments

Comments
 (0)