Skip to content

Commit 77e0c22

Browse files
authored
Fix an unbalanced release of the producer's pending semaphore (#392)
### Motivation Current code releases the producer's pending semaphore twice when batch is off and message is too big. The unbalanced release overflows the semaphore, and subsequent sends will fail with ProducerQueueIsFull. ### Modifications Remove the redundant semaphore release as the necessary release will be done in `handleFailedResult`.
1 parent c7e53ac commit 77e0c22

File tree

2 files changed

+2
-1
lines changed

2 files changed

+2
-1
lines changed

lib/ProducerImpl.cc

-1
Original file line numberDiff line numberDiff line change
@@ -630,7 +630,6 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, SendCallback&& c
630630
const uint32_t msgHeadersAndPayloadSize = msgMetadataSize + payloadSize;
631631
if (msgHeadersAndPayloadSize > maxMessageSize) {
632632
lock.unlock();
633-
releaseSemaphoreForSendOp(*op);
634633
LOG_WARN(getName()
635634
<< " - compressed Message size " << msgHeadersAndPayloadSize << " cannot exceed "
636635
<< maxMessageSize << " bytes unless chunking is enabled");

tests/ProducerTest.cc

+2
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,8 @@ TEST_P(ProducerTest, testMaxMessageSize) {
243243
ASSERT_EQ(ResultMessageTooBig,
244244
producer.send(MessageBuilder().setContent(std::string(maxMessageSize, 'b')).build()));
245245

246+
ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(msg).build()));
247+
246248
client.close();
247249
}
248250

0 commit comments

Comments
 (0)