From ef0ba67444d6e92d41703af37981d075b4eb982c Mon Sep 17 00:00:00 2001 From: gunli <24350715@qq.com> Date: Wed, 15 Nov 2023 20:00:08 +0800 Subject: [PATCH] fix: double check before producer reconnect (#1131) Co-authored-by: gunli --- pulsar/producer_partition.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 9c7c1c09d8..f076eefe56 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -424,6 +424,14 @@ func (p *partitionProducer) reconnectToBroker() { } p.log.Info("Reconnecting to broker in ", delayReconnectTime) time.Sleep(delayReconnectTime) + + // double check + if p.getProducerState() != producerReady { + // Producer is already closing + p.log.Info("producer state not ready, exit reconnect") + return + } + atomic.AddUint64(&p.epoch, 1) err := p.grabCnx() if err == nil {