Skip to content

Commit a8402da

Browse files
Fix close() returns ResultAlreadyClosed after unsubscribe or close (#338)
Fixes #88 ### Motivation When `close` is called if the consumer has already called `unsubscribe` or `close`, it should not fail. See https://github.com/apache/pulsar/blob/428c18c8d0c3d135189920740192982e11ffb2bf/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1034 ### Modifications Use the same close logic with Java client. Add `testCloseAgainBeforeCloseDone` and `testCloseAfterUnsubscribe` to verify the new behaviors of `Consumer::close`.
1 parent 47fb809 commit a8402da

File tree

5 files changed

+41
-13
lines changed

5 files changed

+41
-13
lines changed

lib/ConsumerImpl.cc

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1238,10 +1238,12 @@ void ConsumerImpl::disconnectConsumer() {
12381238
}
12391239

12401240
void ConsumerImpl::closeAsync(ResultCallback originalCallback) {
1241-
auto callback = [this, originalCallback](Result result) {
1241+
auto callback = [this, originalCallback](Result result, bool alreadyClosed = false) {
12421242
shutdown();
12431243
if (result == ResultOk) {
1244-
LOG_INFO(getName() << "Closed consumer " << consumerId_);
1244+
if (!alreadyClosed) {
1245+
LOG_INFO(getName() << "Closed consumer " << consumerId_);
1246+
}
12451247
} else {
12461248
LOG_WARN(getName() << "Failed to close consumer: " << result);
12471249
}
@@ -1250,8 +1252,9 @@ void ConsumerImpl::closeAsync(ResultCallback originalCallback) {
12501252
}
12511253
};
12521254

1253-
if (state_ != Ready) {
1254-
callback(ResultAlreadyClosed);
1255+
auto state = state_.load();
1256+
if (state == Closing || state == Closed) {
1257+
callback(ResultOk, true);
12551258
return;
12561259
}
12571260

tests/BasicEndToEndTest.cc

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,7 @@ TEST(BasicEndToEndTest, testProduceConsume) {
244244
consumer.receive(receivedMsg);
245245
ASSERT_EQ(content, receivedMsg.getDataAsString());
246246
ASSERT_EQ(ResultOk, consumer.unsubscribe());
247-
ASSERT_EQ(ResultAlreadyClosed, consumer.close());
247+
ASSERT_EQ(ResultOk, consumer.close());
248248
ASSERT_EQ(ResultOk, producer.close());
249249
ASSERT_EQ(ResultOk, client.close());
250250
}
@@ -405,7 +405,7 @@ TEST(BasicEndToEndTest, testMultipleClientsMultipleSubscriptions) {
405405

406406
ASSERT_EQ(ResultOk, producer1.close());
407407
ASSERT_EQ(ResultOk, consumer1.close());
408-
ASSERT_EQ(ResultAlreadyClosed, consumer1.close());
408+
ASSERT_EQ(ResultOk, consumer1.close());
409409
ASSERT_EQ(ResultConsumerNotInitialized, consumer2.close());
410410
ASSERT_EQ(ResultOk, client1.close());
411411

@@ -637,7 +637,7 @@ TEST(BasicEndToEndTest, testCompressionLZ4) {
637637
ASSERT_EQ(content2, receivedMsg.getDataAsString());
638638

639639
ASSERT_EQ(ResultOk, consumer.unsubscribe());
640-
ASSERT_EQ(ResultAlreadyClosed, consumer.close());
640+
ASSERT_EQ(ResultOk, consumer.close());
641641
ASSERT_EQ(ResultOk, producer.close());
642642
ASSERT_EQ(ResultOk, client.close());
643643
}
@@ -675,7 +675,7 @@ TEST(BasicEndToEndTest, testCompressionZLib) {
675675
ASSERT_EQ(content2, receivedMsg.getDataAsString());
676676

677677
ASSERT_EQ(ResultOk, consumer.unsubscribe());
678-
ASSERT_EQ(ResultAlreadyClosed, consumer.close());
678+
ASSERT_EQ(ResultOk, consumer.close());
679679
ASSERT_EQ(ResultOk, producer.close());
680680
ASSERT_EQ(ResultOk, client.close());
681681
}
@@ -750,7 +750,7 @@ TEST(BasicEndToEndTest, testConsumerClose) {
750750
Consumer consumer;
751751
ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumer));
752752
ASSERT_EQ(consumer.close(), ResultOk);
753-
ASSERT_EQ(consumer.close(), ResultAlreadyClosed);
753+
ASSERT_EQ(consumer.close(), ResultOk);
754754
}
755755

756756
TEST(BasicEndToEndTest, testDuplicateConsumerCreationOnPartitionedTopic) {
@@ -1398,7 +1398,7 @@ TEST(BasicEndToEndTest, testRSAEncryption) {
13981398
}
13991399

14001400
ASSERT_EQ(ResultOk, consumer.unsubscribe());
1401-
ASSERT_EQ(ResultAlreadyClosed, consumer.close());
1401+
ASSERT_EQ(ResultOk, consumer.close());
14021402
ASSERT_EQ(ResultOk, producer.close());
14031403
}
14041404
ASSERT_EQ(ResultOk, client.close());
@@ -1617,7 +1617,7 @@ TEST(BasicEndToEndTest, testSeek) {
16171617
ASSERT_EQ(expected.str(), msgReceived.getDataAsString());
16181618
ASSERT_EQ(ResultOk, consumer.acknowledge(msgReceived));
16191619
ASSERT_EQ(ResultOk, consumer.unsubscribe());
1620-
ASSERT_EQ(ResultAlreadyClosed, consumer.close());
1620+
ASSERT_EQ(ResultOk, consumer.close());
16211621
ASSERT_EQ(ResultOk, producer.close());
16221622
ASSERT_EQ(ResultOk, client.close());
16231623
}

tests/ConsumerConfigurationTest.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,7 @@ TEST(ConsumerConfigurationTest, testSubscriptionInitialPosition) {
314314
ASSERT_EQ(content1, receivedMsg.getDataAsString());
315315

316316
ASSERT_EQ(ResultOk, consumer.unsubscribe());
317-
ASSERT_EQ(ResultAlreadyClosed, consumer.close());
317+
ASSERT_EQ(ResultOk, consumer.close());
318318
ASSERT_EQ(ResultOk, producer.close());
319319
ASSERT_EQ(ResultOk, client.close());
320320
}

tests/ConsumerTest.cc

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1405,4 +1405,29 @@ TEST(ConsumerTest, testNoListenerThreadBlocking) {
14051405
client.close();
14061406
}
14071407

1408+
TEST(ConsumerTest, testCloseAfterUnsubscribe) {
1409+
Client client{lookupUrl};
1410+
Consumer consumer;
1411+
ASSERT_EQ(ResultOk, client.subscribe("test-close-after-unsubscribe", "sub", consumer));
1412+
ASSERT_EQ(ResultOk, consumer.unsubscribe());
1413+
ASSERT_EQ(ResultOk, consumer.close());
1414+
}
1415+
1416+
TEST(ConsumerTest, testCloseAgainBeforeCloseDone) {
1417+
Client client{lookupUrl};
1418+
Consumer consumer;
1419+
ASSERT_EQ(ResultOk, client.subscribe("test-close-again-before-close-done", "sub", consumer));
1420+
auto done = std::make_shared<std::atomic_bool>(false);
1421+
auto result = std::make_shared<std::atomic<Result>>(ResultOk);
1422+
consumer.closeAsync([done, result](Result innerResult) {
1423+
result->store(innerResult);
1424+
done->store(true);
1425+
});
1426+
ASSERT_EQ(ResultOk, consumer.close());
1427+
ASSERT_FALSE(*done);
1428+
waitUntil(std::chrono::seconds(3), [done] { return done->load(); });
1429+
ASSERT_EQ(ResultOk, *result);
1430+
ASSERT_TRUE(*done);
1431+
}
1432+
14081433
} // namespace pulsar

tests/c/c_BasicEndToEndTest.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ TEST(c_BasicEndToEndTest, testAsyncProduceConsume) {
109109
delete receive_ctx.data;
110110

111111
ASSERT_EQ(pulsar_result_Ok, pulsar_consumer_unsubscribe(consumer));
112-
ASSERT_EQ(pulsar_result_AlreadyClosed, pulsar_consumer_close(consumer));
112+
ASSERT_EQ(pulsar_result_Ok, pulsar_consumer_close(consumer));
113113
ASSERT_EQ(pulsar_result_Ok, pulsar_producer_close(producer));
114114
ASSERT_EQ(pulsar_result_Ok, pulsar_client_close(client));
115115

0 commit comments

Comments
 (0)