Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Graceful shutdown #878

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 30 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rumqttd/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added
- Assign random identifier to clients connecting with empty client id.
- Ability to gracefully shut down the broker.

### Changed
- Public re-export `Strategy` for shared subscriptions
Expand Down
1 change: 1 addition & 0 deletions rumqttd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ tokio = { version = "1.36", features = ["rt", "time", "net", "io-util", "macros"
serde = { version = "1.0.196", features = ["derive"] }
serde_json = "1.0.113"
bytes = { version = "1", features = ["serde"] }
ctrlc = "3.4"
flume = { version = "0.11.0", default-features = false, features = ["async"]}
slab = "0.4.9"
thiserror = "1.0.57"
Expand Down
10 changes: 8 additions & 2 deletions rumqttd/examples/external_auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,15 @@ fn main() {
// true
// });

let mut broker = Broker::new(config);
let _guard = Broker::new(config).start().unwrap().drop_guard();

broker.start().unwrap();
let (tx, rx) = flume::bounded::<()>(1);
ctrlc::set_handler(move || {
let _ = tx.send(());
})
.expect("Error setting Ctrl-C handler");

rx.recv().expect("Could not receive Ctrl-C signal");
}

async fn auth(_client_id: String, _username: String, _password: String) -> bool {
Expand Down
36 changes: 36 additions & 0 deletions rumqttd/examples/graceful_shutdown.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use rumqttd::{Broker, Config};
use tracing::Level;

use core::time::Duration;
use std::thread;

fn main() {
let builder = tracing_subscriber::fmt()
.pretty()
.with_max_level(Level::DEBUG)
.with_line_number(false)
.with_file(false)
.with_thread_ids(false)
.with_thread_names(false);

builder
.try_init()
.expect("initialized subscriber succesfully");

// As examples are compiled as seperate binary so this config is current path dependent. Run it
// from root of this crate
let config = config::Config::builder()
.add_source(config::File::with_name("rumqttd.toml"))
.build()
.unwrap();

let config: Config = config.try_deserialize().unwrap();

dbg!(&config);

let handler = Broker::new(config).start().unwrap();

thread::sleep(Duration::from_secs(1));
handler.shutdown();
thread::sleep(Duration::from_secs(1));
}
8 changes: 2 additions & 6 deletions rumqttd/examples/meters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ fn main() {

dbg!(&config);

let mut broker = Broker::new(config);
let broker = Broker::new(config);
let meters = broker.meters().unwrap();

let (mut link_tx, mut link_rx) = broker.link("consumer").unwrap();
Expand Down Expand Up @@ -59,11 +59,7 @@ fn main() {
});
}

thread::spawn(move || {
if let Err(e) = broker.start() {
println!("Broker stopped: {e}");
}
});
let _guard = broker.start().unwrap().drop_guard();
thread::sleep(Duration::from_secs(2));

loop {
Expand Down
8 changes: 2 additions & 6 deletions rumqttd/examples/singlenode.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use rumqttd::{Broker, Config, Notification};

use std::thread;

fn main() {
let builder = tracing_subscriber::fmt()
.pretty()
Expand All @@ -25,11 +23,9 @@ fn main() {

dbg!(&config);

let mut broker = Broker::new(config);
let broker = Broker::new(config);
let (mut link_tx, mut link_rx) = broker.link("singlenode").unwrap();
thread::spawn(move || {
broker.start().unwrap();
});
let _guard = broker.start().unwrap().drop_guard();

link_tx.subscribe("#").unwrap();

Expand Down
2 changes: 1 addition & 1 deletion rumqttd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub use link::local;
pub use link::meters;
pub use router::{Alert, IncomingMeter, Meter, Notification, OutgoingMeter};
use segments::Storage;
pub use server::Broker;
pub use server::{Broker, BrokerHandler, ShutdownDropGuard};

pub use self::router::shared_subs::Strategy;

Expand Down
6 changes: 6 additions & 0 deletions rumqttd/src/link/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use std::{io, net::AddrParseError, time::Duration};

use tokio::{
net::TcpStream,
sync::watch,
time::{sleep, sleep_until, Instant},
};

Expand Down Expand Up @@ -48,6 +49,7 @@ pub async fn start<P>(
config: BridgeConfig,
router_tx: Sender<(ConnectionId, Event)>,
protocol: P,
mut shutdown_rx: watch::Receiver<()>,
) -> Result<(), BridgeError>
where
P: Protocol + Clone + Send + 'static,
Expand Down Expand Up @@ -154,6 +156,10 @@ where
// resetting timeout because tokio::select! consumes the old timeout future
timeout = sleep_until(ping_time + Duration::from_secs(config.ping_delay));
}
_ = shutdown_rx.changed() => {
debug!("Shutting down bridge");
break 'outer Ok(());
}
}
}
}
Expand Down
14 changes: 10 additions & 4 deletions rumqttd/src/link/console.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use axum::Json;
use axum::{routing::get, Router};
use flume::Sender;
use std::sync::Arc;
use tokio::net::TcpListener;
use tracing::info;
use tokio::{net::TcpListener, sync::watch};
use tracing::{debug, info};

#[derive(Debug)]
pub struct ConsoleLink {
Expand Down Expand Up @@ -39,7 +39,7 @@ impl ConsoleLink {
}

#[tracing::instrument]
pub async fn start(console: Arc<ConsoleLink>) {
pub async fn start(console: Arc<ConsoleLink>, mut shutdown_rx: watch::Receiver<()>) {
let listener = TcpListener::bind(console.config.listen.clone())
.await
.unwrap();
Expand All @@ -56,7 +56,13 @@ pub async fn start(console: Arc<ConsoleLink>) {
.route("/logs", post(logs))
.with_state(console);

axum::serve(listener, app).await.unwrap();
axum::serve(listener, app)
.with_graceful_shutdown(async move {
debug!("Shutting down console");
let _ = shutdown_rx.changed().await;
})
.await
.unwrap();
}

async fn root(State(console): State<Arc<ConsoleLink>>) -> impl IntoResponse {
Expand Down
8 changes: 7 additions & 1 deletion rumqttd/src/link/timer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ use crate::{router::Event, MetricType};
use crate::{ConnectionId, MetricSettings};
use flume::{SendError, Sender};
use tokio::select;
use tracing::error;
use tokio::sync::watch;
use tracing::{debug, error};

#[derive(Debug, thiserror::Error)]
pub enum Error {
Expand All @@ -18,6 +19,7 @@ pub enum Error {
pub async fn start(
config: HashMap<MetricType, MetricSettings>,
router_tx: Sender<(ConnectionId, Event)>,
mut shutdown_rx: watch::Receiver<()>,
) {
let span = tracing::info_span!("metrics_timer");
let _guard = span.enter();
Expand All @@ -42,6 +44,10 @@ pub async fn start(
error!("Failed to push alerts: {e}");
}
}
_ = shutdown_rx.changed() => {
debug!("Shutting down metrics timer");
break;
}
}
}
}
11 changes: 8 additions & 3 deletions rumqttd/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,15 @@ fn main() {

validate_config(&configs);

// println!("{:#?}", configs);
let _guard = Broker::new(configs).start().unwrap().drop_guard();

let mut broker = Broker::new(configs);
broker.start().unwrap();
let (tx, rx) = flume::bounded::<()>(1);
ctrlc::set_handler(move || {
let _ = tx.send(());
})
.expect("Error setting Ctrl-C handler");

rx.recv().expect("Could not receive Ctrl-C signal");
}

// Do any extra validation that needs to be done before starting the broker here.
Expand Down
Loading