Skip to content

Commit e63925a

Browse files
authored
fix(rumqttd): fix session present flag in connack (#792)
1 parent 09a6153 commit e63925a

File tree

3 files changed

+62
-39
lines changed

3 files changed

+62
-39
lines changed

rumqttd/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
2020
### Fixed
2121
- MQTT keep alive interval
2222
- record client id for remote link's span
23+
- session present flag in connack
2324

2425
### Security
2526

rumqttd/src/router/graveyard.rs

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ impl Graveyard {
2323
}
2424

2525
/// Save connection tracker
26-
pub fn save(
26+
pub fn save_state(
2727
&mut self,
2828
mut tracker: Tracker,
2929
subscriptions: HashSet<String>,
@@ -33,34 +33,43 @@ impl Graveyard {
3333
tracker.pause(PauseReason::Busy);
3434
let id = tracker.id.clone();
3535

36+
let session_state = SessionState {
37+
tracker,
38+
subscriptions,
39+
unacked_pubrels,
40+
};
41+
42+
self.connections.insert(
43+
id,
44+
SavedState {
45+
session_state: Some(session_state),
46+
metrics,
47+
},
48+
);
49+
}
50+
51+
/// Save only metrics for connection
52+
pub fn save_metrics(&mut self, id: String, metrics: ConnectionEvents) {
3653
self.connections.insert(
3754
id,
3855
SavedState {
39-
tracker,
40-
subscriptions,
56+
session_state: None,
4157
metrics,
42-
unacked_pubrels,
4358
},
4459
);
4560
}
4661
}
4762

4863
#[derive(Debug)]
4964
pub struct SavedState {
65+
pub session_state: Option<SessionState>,
66+
pub metrics: ConnectionEvents,
67+
}
68+
69+
#[derive(Debug)]
70+
pub struct SessionState {
5071
pub tracker: Tracker,
5172
pub subscriptions: HashSet<String>,
52-
pub metrics: ConnectionEvents,
5373
// used for pubrel in qos2
5474
pub unacked_pubrels: VecDeque<u16>,
5575
}
56-
57-
impl SavedState {
58-
pub fn new(client_id: String) -> SavedState {
59-
SavedState {
60-
tracker: Tracker::new(client_id),
61-
subscriptions: HashSet::new(),
62-
metrics: ConnectionEvents::default(),
63-
unacked_pubrels: VecDeque::new(),
64-
}
65-
}
66-
}

rumqttd/src/router/routing.rs

Lines changed: 36 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,8 @@ use crate::protocol::{
55
SubscribeReasonCode, UnsubAck, UnsubAckReason,
66
};
77
use crate::router::alertlog::alert;
8-
use crate::router::graveyard::SavedState;
98
use crate::router::scheduler::{PauseReason, Tracker};
10-
use crate::router::Forward;
9+
use crate::router::{ConnectionEvents, Forward};
1110
use crate::segments::Position;
1211
use crate::*;
1312
use flume::{bounded, Receiver, RecvError, Sender, TryRecvError};
@@ -304,23 +303,36 @@ impl Router {
304303
// Retrieve previous connection state from graveyard
305304
let saved = self.graveyard.retrieve(&client_id);
306305
let clean_session = connection.clean;
307-
let previous_session = saved.is_some();
306+
let previous_session = saved.as_ref().is_some_and(|s| s.session_state.is_some());
308307
// for qos2 pending pubrels
309308
let mut pending_acks = VecDeque::new();
309+
310310
let tracker = if !clean_session {
311-
let saved = saved.map_or(SavedState::new(client_id.clone()), |s| s);
312-
connection.subscriptions = saved.subscriptions;
313-
connection.events = saved.metrics;
314-
// for using in acklog
315-
pending_acks = saved.unacked_pubrels.clone();
316-
outgoing.unacked_pubrels = saved.unacked_pubrels;
317-
saved.tracker
311+
// if there was some saved state, restore the metrics
312+
// and get the session's state if present
313+
let saved_state = saved.and_then(|saved| {
314+
connection.events = saved.metrics;
315+
saved.session_state
316+
});
317+
318+
// if session's state is present, restore that session
319+
// otherwise, just start new one
320+
saved_state.map_or_else(
321+
|| Tracker::new(client_id.clone()),
322+
|session_state| {
323+
connection.subscriptions = session_state.subscriptions;
324+
// for using in acklog
325+
pending_acks = session_state.unacked_pubrels.clone();
326+
outgoing.unacked_pubrels = session_state.unacked_pubrels;
327+
session_state.tracker
328+
},
329+
)
318330
} else {
319331
// Only retrieve metrics in clean session
320-
let saved = saved.map_or(SavedState::new(client_id.clone()), |s| s);
321-
connection.events = saved.metrics;
332+
connection.events = saved.map_or_else(ConnectionEvents::default, |s| s.metrics);
322333
Tracker::new(client_id.clone())
323334
};
335+
324336
let ackslog = AckLog::new();
325337

326338
let time = match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
@@ -503,20 +515,17 @@ impl Router {
503515
}
504516
}
505517

506-
self.graveyard.save(
518+
self.graveyard.save_state(
507519
tracker,
508520
connection.subscriptions,
509521
connection.events,
510522
outgoing.unacked_pubrels,
511523
);
512524
} else {
525+
tracker.pause(PauseReason::Busy);
526+
let id = tracker.id.clone();
513527
// Only save metrics in clean session
514-
self.graveyard.save(
515-
Tracker::new(client_id),
516-
HashSet::new(),
517-
connection.events,
518-
VecDeque::new(),
519-
);
528+
self.graveyard.save_metrics(id, connection.events);
520529
}
521530
self.router_meters.total_connections -= 1;
522531
}
@@ -1657,10 +1666,14 @@ fn print_status(router: &mut Router, metrics: Print) {
16571666

16581667
let metrics = match metrics {
16591668
Some(v) => Some(v),
1660-
None => router
1661-
.graveyard
1662-
.retrieve(&id)
1663-
.map(|v| (v.metrics, v.tracker)),
1669+
None => router.graveyard.retrieve(&id).map(|v| {
1670+
(
1671+
v.metrics,
1672+
v.session_state
1673+
.map(|s| s.tracker)
1674+
.unwrap_or(Tracker::new(id)),
1675+
)
1676+
}),
16641677
};
16651678

16661679
println!("{metrics:#?}");

0 commit comments

Comments
 (0)