Skip to content

Commit 9f96eb9

Browse files
Fix blue-green migration might be stuck due to an existing reconnection (#406)
Fixes #405

### Motivation

After a blue-green migration is triggered, the socket is disconnected and a reconnection to the blue cluster is scheduled. However, the blue cluster might never respond to the Producer or Subscribe commands. Taking the producer as an example, this means `connectionOpened` will not complete and `reconnectionPending_` will not become false. Then, after a `CommandProducerClose` command is received from the blue cluster, a new reconnection to the green cluster is scheduled, but it is skipped because `reconnectionPending_` is still true; the previous `connectionOpened` future is not completed until the 30s timeout is reached.

```
2024-02-26 06:09:30.251 INFO [139737465607744] HandlerBase:101 | [persistent://public/unload-test/topic-1708927732, sub, 0] Ignoring reconnection attempt since there's already a pending reconnection
2024-02-26 06:10:00.035 WARN [139737859880512] ProducerImpl:291 | [persistent://public/unload-test/topic-1708927732, cluster-a-0-0] Failed to reconnect producer: TimeOut
```

### Modifications

When receiving the `TOPIC_MIGRATED` command, cancel the pending `Producer` and `Subscribe` commands so that `connectionOpened` fails with a retryable error. On the next reconnection attempt, the client will connect to the green cluster.

Fix the `ExtensibleLoadManagerTest` with a stricter timeout check. After this change, the test passes in about 3 seconds locally, whereas before it took about 70 seconds in CI even when it passed.

Besides, fix the possible crash on macOS when closing the client, see #405 (comment)
1 parent 543e51c commit 9f96eb9

9 files changed

+60
-22
lines changed

lib/ClientConnection.cc

+12
Original file line numberDiff line numberDiff line change
@@ -1800,6 +1800,7 @@ void ClientConnection::handleTopicMigrated(const proto::CommandTopicMigrated& co
18001800
if (it != producers_.end()) {
18011801
auto producer = it->second.lock();
18021802
producer->setRedirectedClusterURI(migratedBrokerServiceUrl);
1803+
unsafeRemovePendingRequest(producer->firstRequestIdAfterConnect());
18031804
LOG_INFO("Producer id:" << resourceId << " is migrated to " << migratedBrokerServiceUrl);
18041805
} else {
18051806
LOG_WARN("Got invalid producer Id in topicMigrated command: " << resourceId);
@@ -1809,6 +1810,7 @@ void ClientConnection::handleTopicMigrated(const proto::CommandTopicMigrated& co
18091810
if (it != consumers_.end()) {
18101811
auto consumer = it->second.lock();
18111812
consumer->setRedirectedClusterURI(migratedBrokerServiceUrl);
1813+
unsafeRemovePendingRequest(consumer->firstRequestIdAfterConnect());
18121814
LOG_INFO("Consumer id:" << resourceId << " is migrated to " << migratedBrokerServiceUrl);
18131815
} else {
18141816
LOG_WARN("Got invalid consumer Id in topicMigrated command: " << resourceId);
@@ -2027,4 +2029,14 @@ void ClientConnection::handleAckResponse(const proto::CommandAckResponse& respon
20272029
}
20282030
}
20292031

2032+
void ClientConnection::unsafeRemovePendingRequest(long requestId) {
2033+
auto it = pendingRequests_.find(requestId);
2034+
if (it != pendingRequests_.end()) {
2035+
it->second.promise.setFailed(ResultDisconnected);
2036+
ASIO_ERROR ec;
2037+
it->second.timer->cancel(ec);
2038+
pendingRequests_.erase(it);
2039+
}
2040+
}
2041+
20302042
} // namespace pulsar

lib/ClientConnection.h

+2
Original file line numberDiff line numberDiff line change
@@ -426,6 +426,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
426426
boost::optional<std::string> getAssignedBrokerServiceUrl(const proto::CommandCloseProducer&);
427427
boost::optional<std::string> getAssignedBrokerServiceUrl(const proto::CommandCloseConsumer&);
428428
std::string getMigratedBrokerServiceUrl(const proto::CommandTopicMigrated&);
429+
// This method must be called when `mutex_` is held
430+
void unsafeRemovePendingRequest(long requestId);
429431
};
430432
} // namespace pulsar
431433

lib/ConsumerImpl.cc

+3-1
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,7 @@ Future<Result, bool> ConsumerImpl::connectionOpened(const ClientConnectionPtr& c
251251
unAckedMessageTrackerPtr_->clear();
252252

253253
ClientImplPtr client = client_.lock();
254-
uint64_t requestId = client->newRequestId();
254+
long requestId = client->newRequestId();
255255
SharedBuffer cmd = Commands::newSubscribe(
256256
topic(), subscription_, consumerId_, requestId, getSubType(), getConsumerName(), subscriptionMode_,
257257
subscribeMessageId, readCompacted_, config_.getProperties(), config_.getSubscriptionProperties(),
@@ -260,6 +260,7 @@ Future<Result, bool> ConsumerImpl::connectionOpened(const ClientConnectionPtr& c
260260

261261
// Keep a reference to ensure object is kept alive.
262262
auto self = get_shared_this_ptr();
263+
setFirstRequestIdAfterConnect(requestId);
263264
cnx->sendRequestWithId(cmd, requestId)
264265
.addListener([this, self, cnx, promise](Result result, const ResponseData& responseData) {
265266
Result handleResult = handleCreateConsumer(cnx, result);
@@ -1719,6 +1720,7 @@ void ConsumerImpl::cancelTimers() noexcept {
17191720
batchReceiveTimer_->cancel(ec);
17201721
checkExpiredChunkedTimer_->cancel(ec);
17211722
unAckedMessageTrackerPtr_->stop();
1723+
consumerStatsBasePtr_->stop();
17221724
}
17231725

17241726
void ConsumerImpl::processPossibleToDLQ(const MessageId& messageId, ProcessDLQCallBack cb) {

lib/HandlerBase.h

+9
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,10 @@ class HandlerBase : public std::enable_shared_from_this<HandlerBase> {
100100
const std::string& topic() const { return *topic_; }
101101
const std::shared_ptr<std::string>& getTopicPtr() const { return topic_; }
102102

103+
long firstRequestIdAfterConnect() const {
104+
return firstRequestIdAfterConnect_.load(std::memory_order_acquire);
105+
}
106+
103107
private:
104108
const std::shared_ptr<std::string> topic_;
105109

@@ -140,6 +144,10 @@ class HandlerBase : public std::enable_shared_from_this<HandlerBase> {
140144

141145
Result convertToTimeoutIfNecessary(Result result, ptime startTimestamp) const;
142146

147+
void setFirstRequestIdAfterConnect(long requestId) {
148+
firstRequestIdAfterConnect_.store(requestId, std::memory_order_release);
149+
}
150+
143151
private:
144152
DeadlineTimerPtr timer_;
145153
DeadlineTimerPtr creationTimer_;
@@ -148,6 +156,7 @@ class HandlerBase : public std::enable_shared_from_this<HandlerBase> {
148156
std::atomic<bool> reconnectionPending_;
149157
ClientConnectionWeakPtr connection_;
150158
std::string redirectedClusterURI_;
159+
std::atomic<long> firstRequestIdAfterConnect_{-1L};
151160

152161
friend class ClientConnection;
153162
friend class PulsarFriend;

lib/ProducerImpl.cc

+2-1
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ Future<Result, bool> ProducerImpl::connectionOpened(const ClientConnectionPtr& c
149149
ClientImplPtr client = client_.lock();
150150
cnx->registerProducer(producerId_, shared_from_this());
151151

152-
int requestId = client->newRequestId();
152+
long requestId = client->newRequestId();
153153

154154
SharedBuffer cmd = Commands::newProducer(topic(), producerId_, producerName_, requestId,
155155
conf_.getProperties(), conf_.getSchema(), epoch_,
@@ -159,6 +159,7 @@ Future<Result, bool> ProducerImpl::connectionOpened(const ClientConnectionPtr& c
159159

160160
// Keep a reference to ensure object is kept alive.
161161
auto self = shared_from_this();
162+
setFirstRequestIdAfterConnect(requestId);
162163
cnx->sendRequestWithId(cmd, requestId)
163164
.addListener([this, self, cnx, promise](Result result, const ResponseData& responseData) {
164165
Result handleResult = handleCreateProducer(cnx, result, responseData);

lib/stats/ConsumerStatsBase.h

+1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ namespace pulsar {
2828
class ConsumerStatsBase {
2929
public:
3030
virtual void start() {}
31+
virtual void stop() {}
3132
virtual void receivedMessage(Message&, Result) = 0;
3233
virtual void messageAcknowledged(Result, CommandAck_AckType, uint32_t ackNums = 1) = 0;
3334
virtual ~ConsumerStatsBase() {}

lib/stats/ConsumerStatsImpl.h

+4
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,10 @@ class ConsumerStatsImpl : public std::enable_shared_from_this<ConsumerStatsImpl>
5959
ConsumerStatsImpl(const ConsumerStatsImpl& stats);
6060
void flushAndReset(const ASIO_ERROR&);
6161
void start() override;
62+
void stop() override {
63+
ASIO_ERROR error;
64+
timer_->cancel(error);
65+
}
6266
void receivedMessage(Message&, Result) override;
6367
void messageAcknowledged(Result, CommandAck_AckType, uint32_t ackNums) override;
6468
virtual ~ConsumerStatsImpl();

tests/PulsarFriend.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ class PulsarFriend {
169169
static ClientConnectionWeakPtr getClientConnection(HandlerBase& handler) { return handler.connection_; }
170170

171171
static std::string getConnectionPhysicalAddress(HandlerBase& handler) {
172-
auto cnx = handler.connection_.lock();
172+
auto cnx = handler.getCnx().lock();
173173
if (cnx) {
174174
return cnx->physicalAddress_;
175175
}

tests/extensibleLM/ExtensibleLoadManagerTest.cc

+26-19
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include "include/pulsar/Client.h"
2626
#include "lib/LogUtils.h"
2727
#include "lib/Semaphore.h"
28+
#include "lib/TimeUtils.h"
2829
#include "tests/HttpHelper.h"
2930
#include "tests/PulsarFriend.h"
3031

@@ -40,6 +41,9 @@ bool checkTime() {
4041
}
4142

4243
TEST(ExtensibleLoadManagerTest, testPubSubWhileUnloading) {
44+
constexpr auto maxWaitTime = std::chrono::seconds(5);
45+
constexpr long maxWaitTimeMs = maxWaitTime.count() * 1000L;
46+
4347
const static std::string blueAdminUrl = "http://localhost:8080/";
4448
const static std::string greenAdminUrl = "http://localhost:8081/";
4549
const static std::string topicNameSuffix = std::to_string(time(NULL));
@@ -105,12 +109,13 @@ TEST(ExtensibleLoadManagerTest, testPubSubWhileUnloading) {
105109
std::string content = std::to_string(i);
106110
const auto msg = MessageBuilder().setContent(content).build();
107111

108-
ASSERT_TRUE(waitUntil(std::chrono::seconds(60), [&] {
109-
Result sendResult = producer.send(msg);
110-
return sendResult == ResultOk;
111-
}));
112+
auto start = TimeUtils::currentTimeMillis();
113+
Result sendResult = producer.send(msg);
114+
auto elapsed = TimeUtils::currentTimeMillis() - start;
115+
LOG_INFO("produce i: " << i << " " << elapsed << " ms");
116+
ASSERT_EQ(sendResult, ResultOk);
117+
ASSERT_TRUE(elapsed < maxWaitTimeMs);
112118

113-
LOG_INFO("produced i:" << i);
114119
producedMsgs.emplace(i, i);
115120
i++;
116121
}
@@ -124,18 +129,20 @@ TEST(ExtensibleLoadManagerTest, testPubSubWhileUnloading) {
124129
if (stopConsumer && producedMsgs.size() == msgCount && consumedMsgs.size() == msgCount) {
125130
break;
126131
}
127-
Result receiveResult =
128-
consumer.receive(receivedMsg, 1000); // Assumed that we wait 1000 ms for each message
129-
if (receiveResult != ResultOk) {
130-
continue;
131-
}
132+
auto start = TimeUtils::currentTimeMillis();
133+
Result receiveResult = consumer.receive(receivedMsg, maxWaitTimeMs);
134+
auto elapsed = TimeUtils::currentTimeMillis() - start;
132135
int i = std::stoi(receivedMsg.getDataAsString());
133-
LOG_INFO("received i:" << i);
134-
ASSERT_TRUE(waitUntil(std::chrono::seconds(60), [&] {
135-
Result ackResult = consumer.acknowledge(receivedMsg);
136-
return ackResult == ResultOk;
137-
}));
138-
LOG_INFO("acked i:" << i);
136+
LOG_INFO("receive i: " << i << " " << elapsed << " ms");
137+
ASSERT_EQ(receiveResult, ResultOk);
138+
ASSERT_TRUE(elapsed < maxWaitTimeMs);
139+
140+
start = TimeUtils::currentTimeMillis();
141+
Result ackResult = consumer.acknowledge(receivedMsg);
142+
elapsed = TimeUtils::currentTimeMillis() - start;
143+
LOG_INFO("acked i:" << i << " " << elapsed << " ms");
144+
ASSERT_TRUE(elapsed < maxWaitTimeMs);
145+
ASSERT_EQ(ackResult, ResultOk);
139146
consumedMsgs.emplace(i, i);
140147
}
141148
LOG_INFO("consumer finished");
@@ -153,7 +160,7 @@ TEST(ExtensibleLoadManagerTest, testPubSubWhileUnloading) {
153160
std::string destinationBroker;
154161
while (checkTime()) {
155162
// make sure producers and consumers are ready
156-
ASSERT_TRUE(waitUntil(std::chrono::seconds(30),
163+
ASSERT_TRUE(waitUntil(maxWaitTime,
157164
[&] { return consumerImpl.isConnected() && producerImpl.isConnected(); }));
158165

159166
std::string url =
@@ -182,7 +189,7 @@ TEST(ExtensibleLoadManagerTest, testPubSubWhileUnloading) {
182189
}
183190

184191
// make sure producers and consumers are ready
185-
ASSERT_TRUE(waitUntil(std::chrono::seconds(30),
192+
ASSERT_TRUE(waitUntil(maxWaitTime,
186193
[&] { return consumerImpl.isConnected() && producerImpl.isConnected(); }));
187194
std::string responseDataAfterUnload;
188195
ASSERT_TRUE(waitUntil(std::chrono::seconds(60), [&] {
@@ -220,7 +227,7 @@ TEST(ExtensibleLoadManagerTest, testPubSubWhileUnloading) {
220227
LOG_INFO("res:" << res);
221228
return res == 200;
222229
}));
223-
ASSERT_TRUE(waitUntil(std::chrono::seconds(130), [&] {
230+
ASSERT_TRUE(waitUntil(maxWaitTime, [&] {
224231
auto &consumerImpl = PulsarFriend::getConsumerImpl(consumer);
225232
auto &producerImpl = PulsarFriend::getProducerImpl(producer);
226233
auto consumerConnAddress = PulsarFriend::getConnectionPhysicalAddress(consumerImpl);

0 commit comments

Comments
 (0)