Skip to content

Commit

Permalink
reconnect on queue close
Browse files Browse the repository at this point in the history
  • Loading branch information
cyberhorsey committed Mar 2, 2024
1 parent ae26d8a commit 2b17d99
Showing 1 changed file with 12 additions and 2 deletions.
14 changes: 12 additions & 2 deletions packages/relayer/pkg/queue/rabbitmq/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,6 @@ func (r *RabbitMQ) Ack(ctx context.Context, msg queue.Message) error {

err := rmqMsg.Ack(false)

slog.Info("attempted acknowledge rabbitmq message")

if err != nil {
slog.Error("error acknowledging rabbitmq message", "err", err.Error())
return err
Expand Down Expand Up @@ -214,6 +212,12 @@ func (r *RabbitMQ) Notify(ctx context.Context, wg *sync.WaitGroup) error {
slog.Error("rabbitmq notify close connection")
}

r.Close(ctx)

if err := r.connect(); err != nil {
return err
}

return queue.ErrClosed
case err := <-r.chErrCh:
if err != nil {
Expand All @@ -222,6 +226,12 @@ func (r *RabbitMQ) Notify(ctx context.Context, wg *sync.WaitGroup) error {
slog.Error("rabbitmq notify close channel")
}

r.Close(ctx)

if err := r.connect(); err != nil {
return err
}

return queue.ErrClosed
case returnMsg := <-r.notifyReturnCh:
slog.Error("rabbitmq notify return", "id", returnMsg.MessageId, "err", returnMsg.ReplyText)
Expand Down

0 comments on commit 2b17d99

Please sign in to comment.