Recovery from network failures #288

Closed
LukeMathWalker opened this issue Jun 16, 2020 · 11 comments

@LukeMathWalker
Contributor

I was browsing the crate to understand if there is any form of automatic recovery when a network failure occurs (similar to what is provided by the .NET client here).

If not, is this something you'd be interested to integrate into lapin or do you want to keep lapin as thin of a client as possible?

@LukeMathWalker
Contributor Author

LukeMathWalker commented Jun 16, 2020

From my experiments it seems I can detect when the connection with the broker is lost because my consumers will receive a

ProtocolError(AMQPError { kind: Hard(CONNECTIONFORCED), message: ShortString("CONNECTION_FORCED - broker forced connection closure with reason \'shutdown\'") })

but I would then have to figure out how to re-establish that connection afterwards (in particular if using set_delegate instead of the iterator interface).

Another possible hook seems to be on_error on the Connection object.

@Keruspe
Collaborator

Keruspe commented Jun 16, 2020

There is currently no auto-reconnection feature in lapin, indeed.

Splitting the AMQP stuff into its own async fn and using the on_error hook to re-spawn it is the best way to handle this atm, I think. As it's usually quite easy to do just that, and adding automatic recovery can add a lot of complexity, there hasn't been any plan to implement such a thing yet, but we could explore this idea at some point.
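
A minimal sketch of that pattern, using only the standard library: the error hook does nothing but send a signal, and a supervisor loop re-spawns the session when the signal arrives. `FakeConnection`, `run_session` and `supervise` are hypothetical names made up for illustration; lapin's real `on_error` signature and connection API are not reproduced here.

```rust
use std::sync::mpsc::{channel, Sender};

/// Hypothetical stand-in for a client connection that exposes an error hook.
pub struct FakeConnection {
    on_error: Option<Box<dyn Fn(String) + Send>>,
}

impl FakeConnection {
    pub fn connect() -> Self {
        FakeConnection { on_error: None }
    }

    /// Registers the callback to run when the connection fails.
    pub fn on_error<F>(&mut self, hook: F)
    where
        F: Fn(String) + Send + 'static,
    {
        self.on_error = Some(Box::new(hook));
    }

    /// Simulates the broker closing the connection.
    pub fn fail(&self, reason: &str) {
        if let Some(hook) = &self.on_error {
            hook(reason.to_string());
        }
    }
}

/// One "session": connect, register the hook, do the AMQP work.
/// The work is simulated: an unhealthy session fails immediately.
fn run_session(errors: Sender<String>, healthy: bool) -> FakeConnection {
    let mut conn = FakeConnection::connect();
    // The hook only forwards the error; the supervisor decides what to do.
    conn.on_error(move |reason| {
        let _ = errors.send(reason);
    });
    if !healthy {
        conn.fail("CONNECTION_FORCED");
    }
    conn
}

/// Re-spawns the session each time the hook reports an error.
/// Returns how many restarts were needed before a session stayed up.
pub fn supervise(failures_before_healthy: u32) -> u32 {
    let (tx, rx) = channel();
    let mut restarts = 0;
    loop {
        let healthy = restarts >= failures_before_healthy;
        let _conn = run_session(tx.clone(), healthy);
        // In this synchronous model the hook has already fired, so try_recv
        // is enough; a real supervisor would block or await on the receiver.
        match rx.try_recv() {
            Ok(_reason) => restarts += 1,
            Err(_) => return restarts,
        }
    }
}
```

Calling `supervise(3)` simulates three failed sessions followed by one that stays up. Keeping the hook this small avoids doing reconnection work on whatever thread invokes the callback.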

@LukeMathWalker
Contributor Author

Thanks a lot for the advice!
Somewhat related: what is the story when it comes to resource management/clean up of resources?

I noticed that dropping a connection does not affect the behavior of channels - they keep working fine, including their consumers.
Dropping channels instead cancels the consumers (they receive None).

When is the connection closed? If all channels on top of it are dropped? Does it always need to be closed explicitly?
Trying to understand how to implement fault-tolerance without leaking resources.

@Keruspe
Collaborator

Keruspe commented Jun 16, 2020

Since 1.0, channels are closed once their last reference is dropped, unless they have been manually closed before.
Connection is closed once the connection and all channels have been dropped, unless it has been manually closed before.
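
The lifetime rule described above can be modelled with reference counting. This is a sketch of the observable behaviour only, not of lapin's internals; `Shared`, `Connection` and `Channel` here are hypothetical types, and the `closed` flag exists just so the effect can be observed from outside.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// State shared by the connection handle and every channel created from it.
struct Shared {
    closed: Arc<AtomicBool>,
}

impl Drop for Shared {
    /// Runs once, when the last handle (connection or channel) is dropped.
    fn drop(&mut self) {
        self.closed.store(true, Ordering::SeqCst);
    }
}

pub struct Connection {
    shared: Arc<Shared>,
}

pub struct Channel {
    // Held only to keep the shared state alive.
    _shared: Arc<Shared>,
}

impl Connection {
    pub fn open(closed: Arc<AtomicBool>) -> Self {
        Connection {
            shared: Arc::new(Shared { closed }),
        }
    }

    /// Each channel keeps its own reference to the shared state.
    pub fn create_channel(&self) -> Channel {
        Channel {
            _shared: Arc::clone(&self.shared),
        }
    }
}
```

In this model, dropping the `Connection` while a `Channel` is still alive leaves the flag unset; the flag flips only when the channel is dropped too.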

@LukeMathWalker
Contributor Author

Amazing, thank you 👍

@emacsist

There is currently no auto-reconnection feature in lapin, indeed.

Splitting the AMQP stuff into its own async fn and using the on_error hook to re-spawn it is the best way to handle this atm, I think. As it's usually quite easy to do just that, and adding automatic recovery can add a lot of complexity, there hasn't been any plan to implement such a thing yet, but we could explore this idea at some point.

Is there a demo?
For example, for a method that publishes a message to the RabbitMQ broker, how would one recover using the on_error hook?

@Keruspe
Collaborator

Keruspe commented Jul 21, 2020

Well, it's not trivial, and the problem is inherent to the AMQ protocol.

WRT basic_publish:

  • In "standard" mode, there is no way to know whether a message was successfully published; you only know when you have finished writing all the bytes to the socket. In case of a network error, you can just intercept the error, reconnect and retry sending.
  • In "publisher-confirms" mode (enabled with confirm_select), you get an ack once the message has finished processing on the RabbitMQ side. You can then track a list of "in-flight" messages in your application and drop each one once you get its ack, or retry them after a network failure and reconnection. But if the network fails just after the RabbitMQ server is done dealing with the message, right when it tries to send you the ack, it won't try to send the ack again once the reconnection is done, so you'll publish the message twice.

There is no proper way of "resuming" an AMQP session.
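
The in-flight bookkeeping for publisher-confirms mode can be sketched as follows. `InFlight` is a hypothetical application-side helper, not part of lapin, and the sequence numbering (assumed here to restart at 1 on a fresh channel) is an assumption of this sketch.

```rust
use std::collections::BTreeMap;

/// Application-side record of messages published but not yet confirmed.
pub struct InFlight {
    next_seq: u64,
    pending: BTreeMap<u64, Vec<u8>>,
}

impl InFlight {
    pub fn new() -> Self {
        InFlight {
            next_seq: 1,
            pending: BTreeMap::new(),
        }
    }

    /// Records a message that was just handed to the client.
    /// Returns the sequence number the confirmation is expected to carry.
    pub fn published(&mut self, payload: Vec<u8>) -> u64 {
        let seq = self.next_seq;
        self.next_seq += 1;
        self.pending.insert(seq, payload);
        seq
    }

    /// The broker confirmed `seq`, so the message can be forgotten.
    pub fn acked(&mut self, seq: u64) {
        self.pending.remove(&seq);
    }

    /// After a reconnect: returns every unconfirmed payload, in publish
    /// order, so the caller can publish them again on the new channel.
    /// Numbering restarts because old sequence numbers mean nothing there.
    pub fn take_for_retry(&mut self) -> Vec<Vec<u8>> {
        self.next_seq = 1;
        std::mem::take(&mut self.pending).into_values().collect()
    }
}
```

A payload returned by `take_for_retry` may already have been processed by the broker if only its ack was lost, so this gives at-least-once publishing; duplicates have to be tolerated or detected downstream.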

Other caveat, basic_consume:
You got a message, you finished dealing with it, and just when you want to ack it: network failure. The server requeues the message and, unless you have a proper way to roll back what you've done with it, you'll apply your actions twice.
Even trickier: you get a message and start a rather long computation. Meanwhile, the network is lost, you reconnect, and you do other stuff. Once the computation is done, you ack => unrecoverable error. The "delivery_tag" used for ack is incremental, per channel, per connection, and just goes back to 0 on reconnect. It's not some kind of UUID or such.
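
One way an application might guard against acking with a stale delivery tag is to remember which connection "generation" each delivery arrived on. This is a sketch under that assumption; `AckGuard`, `TaggedDelivery` and `AckDecision` are hypothetical names, and nothing here is provided by lapin.

```rust
/// A delivery tag remembered together with the connection generation
/// on which the delivery arrived.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TaggedDelivery {
    pub generation: u64,
    pub delivery_tag: u64,
}

/// What the application should do when a computation finishes.
#[derive(Debug, PartialEq, Eq)]
pub enum AckDecision {
    /// Same connection as the delivery: safe to ack this tag.
    Ack(u64),
    /// The delivery came from an earlier connection: the tag is meaningless
    /// now, and the broker will have requeued the message.
    SkipStale,
}

/// Tracks the current connection generation.
pub struct AckGuard {
    generation: u64,
}

impl AckGuard {
    pub fn new() -> Self {
        AckGuard { generation: 0 }
    }

    /// Call after every successful reconnection.
    pub fn reconnected(&mut self) {
        self.generation += 1;
    }

    /// Stamps an incoming delivery tag with the current generation.
    pub fn tag(&self, delivery_tag: u64) -> TaggedDelivery {
        TaggedDelivery {
            generation: self.generation,
            delivery_tag,
        }
    }

    /// Decides whether a finished delivery may still be acked.
    pub fn decide(&self, delivery: TaggedDelivery) -> AckDecision {
        if delivery.generation == self.generation {
            AckDecision::Ack(delivery.delivery_tag)
        } else {
            AckDecision::SkipStale
        }
    }
}
```

Skipping the stale ack avoids the unrecoverable error, but the message will be delivered again, so the work itself still needs to be idempotent or reversible.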

What most clients do is basically keep some kind of topology descriptor around and just redeclare the queues and the consumers, leaving all the rest to you. That can lead to very awkward situations like these, and it has bitten me several times in the past. That's why, as of now, I haven't wanted to implement something that people will expect to "just work" when things are really not that simple and some things just aren't possible to handle automatically.
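
For illustration, the "topology descriptor" approach can be sketched as a plain data structure that is replayed onto a fresh channel after reconnecting. All names here (`Topology`, `QueueSpec`, `ConsumerSpec`, `Declarer`, `Recorder`) are hypothetical and do not correspond to any real client's API; the sketch deliberately restores nothing beyond declarations, which is exactly the limitation described above.

```rust
/// A queue the application had declared.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct QueueSpec {
    pub name: String,
    pub durable: bool,
}

/// A consumer the application had started.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ConsumerSpec {
    pub queue: String,
    pub consumer_tag: String,
}

/// Everything that should be redeclared after a reconnect.
#[derive(Debug, Default, Clone)]
pub struct Topology {
    pub queues: Vec<QueueSpec>,
    pub consumers: Vec<ConsumerSpec>,
}

/// Hypothetical abstraction over a freshly opened channel.
pub trait Declarer {
    fn declare_queue(&mut self, queue: &QueueSpec) -> Result<(), String>;
    fn start_consumer(&mut self, consumer: &ConsumerSpec) -> Result<(), String>;
}

impl Topology {
    /// Redeclares queues first, then consumers, stopping at the first error.
    pub fn replay<D: Declarer>(&self, target: &mut D) -> Result<(), String> {
        for queue in &self.queues {
            target.declare_queue(queue)?;
        }
        for consumer in &self.consumers {
            target.start_consumer(consumer)?;
        }
        Ok(())
    }
}

/// Test double that records the operations it was asked to perform.
#[derive(Default)]
pub struct Recorder {
    pub log: Vec<String>,
}

impl Declarer for Recorder {
    fn declare_queue(&mut self, queue: &QueueSpec) -> Result<(), String> {
        self.log.push(format!("declare {}", queue.name));
        Ok(())
    }

    fn start_consumer(&mut self, consumer: &ConsumerSpec) -> Result<(), String> {
        self.log
            .push(format!("consume {} as {}", consumer.queue, consumer.consumer_tag));
        Ok(())
    }
}
```

Unacked deliveries, unconfirmed publishes and stale delivery tags are not part of the descriptor, so they remain the application's problem.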

All of these examples, and more, should definitely be somewhere in the documentation.

@vipera

vipera commented Jul 22, 2020

I agree that examples showing how to register an on_error hook would greatly help. I'm trying to figure out how to implement some kind of limited client recovery as described here. An opinionated way of "resuming" a session might benefit most casual users, but I agree that allowing you to do it yourself is probably the best bet.

That's what I'm trying, but I'm stumped right at the first step - on_error is supposed to call my async function to reconnect which Tokio is supposed to execute - am I supposed to start a Tokio task inside it? How do I know it's ready? A minimal working example that shows how to achieve this would help, as I'll probably end up overcomplicating it somehow.

@Keruspe
Collaborator

Keruspe commented Jul 22, 2020

I know that people use on_error for this but haven't actually used it this way myself (I use it to just crash the program and have systemd restart it).

Here is one way to handle reconnections: https://github.com/CleverCloud/lapin/blob/lapin-1.x/examples/reconnect.rs

Sample output with example started without rabbitmq running, then starting rabbitmq, then restarting rabbitmq:

[2020-07-22T08:05:30Z ERROR reconnect] Error: IO error: Connection refused (os error 111)
[2020-07-22T08:05:32Z ERROR reconnect] Error: IO error: Connection refused (os error 111)
[2020-07-22T08:05:34Z INFO  reconnect] CONNECTED
[2020-07-22T08:05:34Z INFO  reconnect] Declared queue Queue { name: ShortString("hello"), message_count: 0, consumer_count: 0 }
[2020-07-22T08:05:34Z INFO  reconnect] will consume
[2020-07-22T08:06:00Z ERROR lapin::channel] Connection closed on channel 0 by 0:0 => AMQPError { kind: Hard(CONNECTIONFORCED), message: ShortString("CONNECTION_FORCED - broker forced connection closure with reason \'shutdown\'") } => CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'
[2020-07-22T08:06:00Z ERROR lapin::channels] Connection error
thread '<unnamed>' panicked at 'error in consumer: ProtocolError(AMQPError { kind: Hard(CONNECTIONFORCED), message: ShortString("CONNECTION_FORCED - broker forced connection closure with reason \'shutdown\'") })', examples/reconnect.rs:57:39
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
[2020-07-22T08:06:00Z ERROR reconnect] Error: protocol error: AMQP hard error: CONNECTION-FORCED: CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'
[2020-07-22T08:06:02Z ERROR reconnect] Error: IO error: Connection refused (os error 111)
[2020-07-22T08:06:04Z ERROR reconnect] Error: IO error: Connection refused (os error 111)
[2020-07-22T08:06:06Z ERROR reconnect] Error: IO error: Connection refused (os error 111)
[2020-07-22T08:06:08Z ERROR reconnect] Error: IO error: Connection refused (os error 111)
[2020-07-22T08:06:10Z ERROR reconnect] Error: IO error: Connection refused (os error 111)
[2020-07-22T08:06:12Z ERROR reconnect] Error: IO error: Connection refused (os error 111)
[2020-07-22T08:06:14Z INFO  reconnect] CONNECTED
[2020-07-22T08:06:15Z INFO  reconnect] Declared queue Queue { name: ShortString("hello"), message_count: 0, consumer_count: 0 }
[2020-07-22T08:06:15Z INFO  reconnect] will consume
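
The retry behaviour visible in this log (one failed attempt roughly every two seconds until the broker is back) can be sketched with a fixed-delay loop. This is not a copy of the linked example; `connect_with_retry` and its parameters are hypothetical, and the connection attempt is passed in as a closure so that the sketch stays independent of any client library.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Calls `try_connect` until it succeeds, sleeping `delay` after each failure.
/// `on_failure` is invoked with every error, for example to log it.
pub fn connect_with_retry<T, E, F, G>(
    mut try_connect: F,
    delay: Duration,
    mut on_failure: G,
) -> T
where
    F: FnMut() -> Result<T, E>,
    G: FnMut(&E),
{
    loop {
        match try_connect() {
            Ok(connection) => return connection,
            Err(error) => {
                on_failure(&error);
                // Fixed delay between attempts; a backoff policy could
                // replace this without changing the callers.
                sleep(delay);
            }
        }
    }
}
```

With `delay` set to `Duration::from_secs(2)` and an `on_failure` closure that logs the error, the output would resemble the repeated "Connection refused" lines above. The loop never gives up, which suits a long-running service but would need an attempt limit elsewhere.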

@Keruspe
Collaborator

Keruspe commented Nov 11, 2020

FYI, I just landed #311, which basically allows you to save your current topology (exchanges, queues, consumers) and load it back from JSON or similar (documentation incoming).

With this in place, we just need a way for a failed connection to "absorb" a new one (so that streams etc stay the same as before) to get proper automatic reconnection working.

@Keruspe
Collaborator

Keruspe commented Nov 13, 2020

Now tracked in #312
