diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 9c7c1c09d8..f076eefe56 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -424,6 +424,14 @@ func (p *partitionProducer) reconnectToBroker() { } p.log.Info("Reconnecting to broker in ", delayReconnectTime) time.Sleep(delayReconnectTime) + + // double check + if p.getProducerState() != producerReady { + // Producer is already closing + p.log.Info("producer state not ready, exit reconnect") + return + } + atomic.AddUint64(&p.epoch, 1) err := p.grabCnx() if err == nil {