
[Issue 1128][Producer] Regard ProducerBlockedQuotaExceededException as a retryable exception for reconnecting #1457

Open

geniusjoe wants to merge 1 commit into apache:master from geniusjoe:dev/ProducerBlockedQuotaExceededException

Conversation

geniusjoe (Contributor) commented Jan 20, 2026

Fixes #1134
Master Issue: #1128

Motivation

Currently, the Go client handles connections in two ways:

  1. If the current connection is the initial connection, i.e. the producer is being initialized for the first time, the grabCnx() method does not trigger retry logic; any error is returned directly to the caller.
  2. If the current connection is an already active connection, the runEventsLoop() method detects the disconnection and calls reconnectToBroker() to re-establish it. During reconnection, the method checks whether the current error is retryable. If it is, internal.Retry() inside reconnectToBroker() retries indefinitely according to the backoff strategy. If it is not, as with the current errMsgProducerBlockedQuotaExceededException, the partition producer is closed, causing every subsequent message sent to that partition to fail with a SendTimeoutError.

Since the backlog-quota-exceeded condition can be resolved by modifying the TTL or increasing consumption speed, it is likely to recover after some retry attempts. I believe the handling of errMsgProducerBlockedQuotaExceededException in the reconnectToBroker() method should align as closely as possible with the Java implementation. Java's current connection-retry logic is as follows:

  1. If an error is received after sending a message or a heartbeat times out, the callback function handleSendError() is triggered to close the connection.
  2. When the connection is closed, the connectionClosed method is called, which in turn invokes the grabCnx() method, and subsequently grabCnx() calls the connectionOpened() method to reconnect:
// Schedule a reconnection task
state.client.timer().newTimeout(timeout -> {
    log.info("[{}] [{}] Reconnecting after timeout", state.topic, state.getHandlerName());
    grabCnx();  // Re-establish the connection
}, delayMs, TimeUnit.MILLISECONDS);
  3. If the connection still fails in the connectionOpened() method, the client executes different logic depending on the current exception. The Java client does not treat ProducerBlockedQuotaExceededException as an isUnrecoverableError; instead, it first cleans up the pending messages and then attempts to reconnect.

Modifications

  1. To maintain consistency with the Java client, I suggest that when errMsgProducerBlockedQuotaExceededException is encountered in reconnectToBroker(), only failPendingMessages() should be executed, but the connection should not be closed. Specifically, the code should be modified as follows:
if strings.Contains(errMsg, errMsgProducerBlockedQuotaExceededException) {
    p.log.Warn("Producer was blocked by quota exceed exception, failing pending messages, stop reconnecting")
    p.failPendingMessages(errors.Join(ErrProducerBlockedQuotaExceeded, err))
    // Do not close the connection here
    // return struct{}{}, err
}
  2. Update the golangci-lint version in the Makefile from v1.61.0 to v1.64.2, which meets the minimum requirement for Go 1.24.

Verifying this change

  • Make sure that the change passes the CI checks.

This change added tests and can be verified as follows:

  • TestProducerReconnectWhenBacklogQuotaExceed

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API: (no)
  • The schema: (no)
  • The default values of configurations: (no)
  • The wire protocol: (no)

Documentation

  • Does this pull request introduce a new feature? (no)

@geniusjoe geniusjoe force-pushed the dev/ProducerBlockedQuotaExceededException branch 2 times, most recently from 4de6a9f to 2c6d942 Compare January 20, 2026 15:29
@geniusjoe geniusjoe force-pushed the dev/ProducerBlockedQuotaExceededException branch from 2c6d942 to c5c3ffd Compare January 29, 2026 15:33
@RobertIndie RobertIndie requested a review from Copilot January 30, 2026 07:46
@RobertIndie RobertIndie added this to the v0.19.0 milestone Jan 30, 2026
Copilot AI left a comment (Contributor)
Pull request overview

This pull request modifies the producer's reconnection behavior when encountering a ProducerBlockedQuotaExceededException. The change aligns the Go client with the Java client by treating this exception as retryable, allowing the producer to continue reconnecting after failing pending messages rather than closing the producer entirely.

Changes:

  • Modified producer_partition.go to remove early return when ProducerBlockedQuotaExceededException is encountered, allowing reconnection to continue
  • Added test TestProducerReconnectWhenBacklogQuotaExceed to verify the reconnection behavior
  • Updated golangci-lint version from v1.61.0 to v1.64.2

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 6 comments.

File descriptions:

  • pulsar/producer_partition.go — Removed early return for ProducerBlockedQuotaExceededException to allow reconnection retry
  • pulsar/producer_test.go — Added test and custom backoff policy implementation to verify quota-exceeded reconnection behavior
  • Makefile — Updated golangci-lint version to v1.64.2


@RobertIndie left a comment (Member)
PTAL comments from copilot. Thanks.

@geniusjoe geniusjoe force-pushed the dev/ProducerBlockedQuotaExceededException branch 2 times, most recently from de206e1 to 70d6b0b Compare January 30, 2026 13:05
@RobertIndie RobertIndie requested a review from Copilot February 6, 2026 09:40
Copilot AI left a comment (Contributor)

Pull request overview

Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.



Comment on lines +3060 to +3063
// Step 4: Send 512KB messages and monitor statistics
// Limit to 10 iterations to avoid infinite loop in test
isReachMaxBackoff := false
for i := 0; i < 10; i++ {
RobertIndie left a comment (Member)

Why do we need 10 iterations? This seems to be consistently reproducible. Perhaps we don’t need the for loop and time.Sleep.

geniusjoe left a comment (Contributor, Author)

@RobertIndie
The broker updates the backlog quota based on metrics calculations. Since metrics are collected approximately every 30 seconds, sending a single message larger than 10KB will not immediately trigger backlog quota throttling. Therefore, a for loop is needed to send multiple messages repeatedly, waiting for the backlog to reach the quota limit.

@geniusjoe geniusjoe force-pushed the dev/ProducerBlockedQuotaExceededException branch from 70d6b0b to 1c5c317 Compare February 6, 2026 10:16