Skip to content

Commit aeff955

Browse files
Fix startMessageInclusive does not work if the 1st message is a chunked message (#462)
1 parent 6f7c37a commit aeff955

File tree

2 files changed

+54
-2
lines changed

2 files changed

+54
-2
lines changed

lib/ConsumerImpl.cc

+25-2
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,21 @@ DECLARE_LOG_OBJECT()
5959
using std::chrono::milliseconds;
6060
using std::chrono::seconds;
6161

62+
static boost::optional<MessageId> getStartMessageId(const boost::optional<MessageId>& startMessageId,
63+
bool inclusive) {
64+
if (!inclusive || !startMessageId) {
65+
return startMessageId;
66+
}
67+
// The default ledger id and entry id of a chunked message refer the fields of the last chunk. When the
68+
// start message id is inclusive, we need to start from the first chunk.
69+
auto chunkMsgIdImpl =
70+
dynamic_cast<const ChunkMessageIdImpl*>(Commands::getMessageIdImpl(startMessageId.value()).get());
71+
if (chunkMsgIdImpl) {
72+
return boost::optional<MessageId>{chunkMsgIdImpl->getChunkedMessageIds().front()};
73+
}
74+
return startMessageId;
75+
}
76+
6277
ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
6378
const std::string& subscriptionName, const ConsumerConfiguration& conf,
6479
bool isPersistent, const ConsumerInterceptorsPtr& interceptors,
@@ -91,7 +106,7 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
91106
messageListenerRunning_(!conf.isStartPaused()),
92107
negativeAcksTracker_(std::make_shared<NegativeAcksTracker>(client, *this, conf)),
93108
readCompacted_(conf.isReadCompacted()),
94-
startMessageId_(startMessageId),
109+
startMessageId_(getStartMessageId(startMessageId, conf.isStartMessageIdInclusive())),
95110
maxPendingChunkedMessage_(conf.getMaxPendingChunkedMessage()),
96111
autoAckOldestChunkedMessageOnQueueFull_(conf.isAutoAckOldestChunkedMessageOnQueueFull()),
97112
expireTimeOfIncompleteChunkedMessageMs_(conf.getExpireTimeOfIncompleteChunkedMessageMs()),
@@ -469,7 +484,15 @@ boost::optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuff
469484

470485
auto& chunkedMsgCtx = it->second;
471486
if (it == chunkedMessageCache_.end() || !chunkedMsgCtx.validateChunkId(chunkId)) {
472-
if (it == chunkedMessageCache_.end()) {
487+
auto startMessageId = startMessageId_.get().value_or(MessageId::earliest());
488+
if (!config_.isStartMessageIdInclusive() && startMessageId.ledgerId() == messageId.ledgerId() &&
489+
startMessageId.entryId() == messageId.entryId()) {
490+
// When the start message id is not inclusive, the last chunk of the previous chunked message will
491+
// be delivered, which is expected and we only need to filter it out.
492+
chunkedMessageCache_.remove(uuid);
493+
LOG_INFO("Filtered the chunked message before the start message id (uuid: "
494+
<< uuid << " chunkId: " << chunkId << ", messageId: " << messageId << ")");
495+
} else if (it == chunkedMessageCache_.end()) {
473496
LOG_ERROR("Received an uncached chunk (uuid: " << uuid << " chunkId: " << chunkId
474497
<< ", messageId: " << messageId << ")");
475498
} else {

tests/ReaderTest.cc

+29
Original file line numberDiff line numberDiff line change
@@ -888,5 +888,34 @@ TEST_P(ReaderSeekTest, testHasMessageAvailableAfterSeekTimestamp) {
888888
}
889889
}
890890

891+
TEST_F(ReaderSeekTest, testSeekInclusiveChunkMessage) {
892+
const auto topic = "test-seek-inclusive-chunk-message-" + std::to_string(time(nullptr));
893+
894+
Producer producer;
895+
ProducerConfiguration producerConf;
896+
producerConf.setBatchingEnabled(false);
897+
producerConf.setChunkingEnabled(true);
898+
ASSERT_EQ(ResultOk, client.createProducer(topic, producerConf, producer));
899+
900+
std::string largeValue(1024 * 1024 * 6, 'a');
901+
MessageId firstMsgId;
902+
ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(largeValue).build(), firstMsgId));
903+
MessageId secondMsgId;
904+
ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(largeValue).build(), secondMsgId));
905+
906+
auto assertStartMessageId = [&](bool inclusive, MessageId expectedMsgId) {
907+
Reader reader;
908+
ReaderConfiguration readerConf;
909+
readerConf.setStartMessageIdInclusive(inclusive);
910+
ASSERT_EQ(ResultOk, client.createReader(topic, firstMsgId, readerConf, reader));
911+
Message msg;
912+
ASSERT_EQ(ResultOk, reader.readNext(msg, 3000));
913+
ASSERT_EQ(expectedMsgId, msg.getMessageId());
914+
ASSERT_EQ(ResultOk, reader.close());
915+
};
916+
assertStartMessageId(true, firstMsgId);
917+
assertStartMessageId(false, secondMsgId);
918+
}
919+
891920
INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderTest, ::testing::Values(true, false));
892921
INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderSeekTest, ::testing::Values(true, false));

0 commit comments

Comments
 (0)