Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rumqttc: resume session only if CONNACK with session present 1 #864

Merged
merged 13 commits into from
May 21, 2024
1 change: 1 addition & 0 deletions rumqttc/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
* Validate filters while creating subscription requests.
* Make v4::Connect::write return correct value
* Ordering of `State.events` related to `QoS > 0` publishes
* Resume v5 session only if broker CONNACK with Session Present 1.

### Security

Expand Down
17 changes: 12 additions & 5 deletions rumqttc/src/v5/eventloop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ impl EventLoop {
if self.network.is_none() {
let (network, connack) = time::timeout(
Duration::from_secs(self.options.connection_timeout()),
connect(&mut self.options),
connect(&mut self.pending, &mut self.options),
)
.await??;
self.network = Some(network);
Expand Down Expand Up @@ -263,15 +263,15 @@ impl EventLoop {
/// the stream.
/// This function (for convenience) includes internal delays for users to perform internal sleeps
/// between re-connections so that cancel semantics can be used during this sleep
async fn connect(options: &mut MqttOptions) -> Result<(Network, Incoming), ConnectionError> {
async fn connect(pending: &mut VecDeque<Request>, options: &mut MqttOptions) -> Result<(Network, Incoming), ConnectionError> {
// connect to the broker
let mut network = network_connect(options).await?;

// make MQTT connection request (which internally awaits for ack)
let packet = mqtt_connect(options, &mut network).await?;
let packet = mqtt_connect(pending, options, &mut network).await?;

// Last session might contain packets which aren't acked. MQTT says these packets should be
// republished in the next session
// republished in the next session if session is resumed
// move pending messages from state to eventloop
// let pending = self.state.clean();
// self.pending = pending.into_iter();
Expand Down Expand Up @@ -385,6 +385,7 @@ async fn network_connect(options: &MqttOptions) -> Result<Network, ConnectionErr
}

async fn mqtt_connect(
pending: &mut VecDeque<Request>,
options: &mut MqttOptions,
network: &mut Network,
) -> Result<Incoming, ConnectionError> {
Expand All @@ -406,8 +407,14 @@ async fn mqtt_connect(
// validate connack
match network.read().await? {
Incoming::ConnAck(connack) if connack.code == ConnectReturnCode::Success => {
// Override local keep_alive value if set by server.
// Check if it's a new session
if !connack.session_present {
// If it's a new session, clear the pendings
pending.clear();
}
// Override local settings if set by server.
if let Some(props) = &connack.properties {
// Override local keep_alive value if set by server.
if let Some(keep_alive) = props.server_keep_alive {
options.keep_alive = Duration::from_secs(keep_alive as u64);
}
Expand Down