Skip to content

Commit bb16f24

Browse files
Fix lazy partitioned producer might send duplicated messages (#342)
Fixes #341 ### Motivation When a lazy partitioned producer sends two messages, the flow is: 1. `start` is called to grab the connection via `grab()`. 2. 0 is generated as the sequence id of the 1st message. 3. The 1st message is added to the queue. 4. The connection is established and `msgSequenceGenerator_` is reset from 1 to 0. 5. When sending the 2nd message, 0 is generated as the sequence id again. As a result, the two messages have the same sequence id. ### Modifications For lazy partitioned producers, if the internal producer has not been started yet, send the message in the callback of its future. Add `ChunkDedupTest#testLazyPartitionedProducer` to verify the fix; the test lives there because only `tests/chunkdedup/docker-compose.yml` enables deduplication.
1 parent a8402da commit bb16f24

File tree

5 files changed

+53
-12
lines changed

5 files changed

+53
-12
lines changed

lib/PartitionedProducerImpl.cc

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,9 @@ void PartitionedProducerImpl::createLazyPartitionProducer(unsigned int partition
198198
// override
199199
void PartitionedProducerImpl::sendAsync(const Message& msg, SendCallback callback) {
200200
if (state_ != Ready) {
201-
callback(ResultAlreadyClosed, msg.getMessageId());
201+
if (callback) {
202+
callback(ResultAlreadyClosed, msg.getMessageId());
203+
}
202204
return;
203205
}
204206

@@ -209,7 +211,9 @@ void PartitionedProducerImpl::sendAsync(const Message& msg, SendCallback callbac
209211
LOG_ERROR("Got Invalid Partition for message from Router Policy, Partition - " << partition);
210212
// change me: abort or notify failure in callback?
211213
// change to appropriate error if callback
212-
callback(ResultUnknownError, msg.getMessageId());
214+
if (callback) {
215+
callback(ResultUnknownError, msg.getMessageId());
216+
}
213217
return;
214218
}
215219
// find a producer for that partition, index should start from 0
@@ -223,7 +227,19 @@ void PartitionedProducerImpl::sendAsync(const Message& msg, SendCallback callbac
223227
producersLock.unlock();
224228

225229
// send message on that partition
226-
producer->sendAsync(msg, callback);
230+
if (!conf_.getLazyStartPartitionedProducers() || producer->ready()) {
231+
producer->sendAsync(msg, std::move(callback));
232+
} else {
233+
// Wrapping the callback into a lambda has overhead, so we check if the producer is ready first
234+
producer->getProducerCreatedFuture().addListener(
235+
[msg, callback](Result result, ProducerImplBaseWeakPtr weakProducer) {
236+
if (result == ResultOk) {
237+
weakProducer.lock()->sendAsync(msg, std::move(callback));
238+
} else if (callback) {
239+
callback(result, {});
240+
}
241+
});
242+
}
227243
}
228244

229245
// override

lib/ProducerImpl.cc

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,20 +60,16 @@ ProducerImpl::ProducerImpl(ClientImplPtr client, const TopicName& topicName,
6060
userProvidedProducerName_(false),
6161
producerStr_("[" + topic() + ", " + producerName_ + "] "),
6262
producerId_(client->newProducerId()),
63-
msgSequenceGenerator_(0),
6463
batchTimer_(executor_->createDeadlineTimer()),
64+
lastSequenceIdPublished_(conf.getInitialSequenceId()),
65+
msgSequenceGenerator_(lastSequenceIdPublished_ + 1),
6566
sendTimer_(executor_->createDeadlineTimer()),
6667
dataKeyRefreshTask_(*executor_, 4 * 60 * 60 * 1000),
6768
memoryLimitController_(client->getMemoryLimitController()),
6869
chunkingEnabled_(conf_.isChunkingEnabled() && topicName.isPersistent() && !conf_.getBatchingEnabled()),
6970
interceptors_(interceptors) {
7071
LOG_DEBUG("ProducerName - " << producerName_ << " Created producer on topic " << topic()
7172
<< " id: " << producerId_);
72-
73-
int64_t initialSequenceId = conf.getInitialSequenceId();
74-
lastSequenceIdPublished_ = initialSequenceId;
75-
msgSequenceGenerator_ = initialSequenceId + 1;
76-
7773
if (!producerName_.empty()) {
7874
userProvidedProducerName_ = true;
7975
}

lib/ProducerImpl.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#ifndef LIB_PRODUCERIMPL_H_
2020
#define LIB_PRODUCERIMPL_H_
2121

22+
#include <atomic>
2223
#include <boost/optional.hpp>
2324
#include <list>
2425
#include <memory>
@@ -103,6 +104,8 @@ class ProducerImpl : public HandlerBase, public ProducerImplBase {
103104

104105
ProducerImplWeakPtr weak_from_this() noexcept { return shared_from_this(); }
105106

107+
bool ready() const { return producerCreatedPromise_.isComplete(); }
108+
106109
protected:
107110
ProducerStatsBasePtr producerStatsBasePtr_;
108111

@@ -169,13 +172,13 @@ class ProducerImpl : public HandlerBase, public ProducerImplBase {
169172
bool userProvidedProducerName_;
170173
std::string producerStr_;
171174
uint64_t producerId_;
172-
int64_t msgSequenceGenerator_;
173175

174176
std::unique_ptr<BatchMessageContainerBase> batchMessageContainer_;
175177
DeadlineTimerPtr batchTimer_;
176178
PendingFailures batchMessageAndSend(const FlushCallback& flushCallback = nullptr);
177179

178-
volatile int64_t lastSequenceIdPublished_;
180+
std::atomic<int64_t> lastSequenceIdPublished_;
181+
std::atomic<int64_t> msgSequenceGenerator_;
179182
std::string schemaVersion_;
180183

181184
DeadlineTimerPtr sendTimer_;

tests/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,5 +72,5 @@ add_executable(Oauth2Test oauth2/Oauth2Test.cc)
7272
target_compile_options(Oauth2Test PRIVATE -DTEST_CONF_DIR="${TEST_CONF_DIR}")
7373
target_link_libraries(Oauth2Test ${CLIENT_LIBS} pulsarStatic ${GTEST_LIBRARY_PATH})
7474

75-
add_executable(ChunkDedupTest chunkdedup/ChunkDedupTest.cc)
75+
add_executable(ChunkDedupTest chunkdedup/ChunkDedupTest.cc HttpHelper.cc)
7676
target_link_libraries(ChunkDedupTest ${CLIENT_LIBS} pulsarStatic ${GTEST_LIBRARY_PATH})

tests/chunkdedup/ChunkDedupTest.cc

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
*/
1919
#include <gtest/gtest.h>
2020
#include <pulsar/Client.h>
21+
#include <time.h>
2122

23+
#include "../HttpHelper.h"
2224
#include "lib/Latch.h"
2325
#include "lib/LogUtils.h"
2426

@@ -47,6 +49,30 @@ TEST(ChunkDedupTest, testSendChunks) {
4749
client.close();
4850
}
4951

52+
TEST(ChunkDedupTest, testLazyPartitionedProducer) {
53+
std::string topic = "test-lazy-partitioned-producer-" + std::to_string(time(nullptr));
54+
Client client{"pulsar://localhost:6650"};
55+
ProducerConfiguration conf;
56+
conf.setLazyStartPartitionedProducers(true);
57+
Producer producer;
58+
ASSERT_EQ(ResultOk, client.createProducer(topic, conf, producer));
59+
60+
constexpr int numPartitions = 3;
61+
int res =
62+
makePutRequest("http://localhost:8080/admin/v2/persistent/public/default/" + topic + "/partitions",
63+
std::to_string(numPartitions));
64+
ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
65+
66+
for (int i = 0; i < 10; i++) {
67+
const auto key = std::to_string(i % numPartitions);
68+
MessageId msgId;
69+
producer.send(MessageBuilder().setPartitionKey(key).setContent("msg-" + std::to_string(i)).build(),
70+
msgId);
71+
ASSERT_TRUE(msgId.ledgerId() >= 0 && msgId.entryId() >= 0) << "i: " << i << ", msgId: " << msgId;
72+
}
73+
client.close();
74+
}
75+
5076
int main(int argc, char* argv[]) {
5177
::testing::InitGoogleTest(&argc, argv);
5278
return RUN_ALL_TESTS();

0 commit comments

Comments
 (0)