@@ -104,6 +104,10 @@ pub struct MqttState {
104104 pub manual_acks : bool ,
105105 /// Map of alias_id->topic
106106 topic_alises : HashMap < u16 , Bytes > ,
107+ /// Client_topic_alias
108+ client_topic_alias : HashMap < Bytes , u16 > ,
109+ /// current topic alias
110+ current_topic_alias : u16 ,
107111 /// `topic_alias_maximum` RECEIVED via connack packet
108112 pub broker_topic_alias_max : u16 ,
109113 /// Maximum number of allowed inflight QoS1 & QoS2 requests
@@ -133,6 +137,8 @@ impl MqttState {
133137 events : VecDeque :: with_capacity ( 100 ) ,
134138 manual_acks,
135139 topic_alises : HashMap :: new ( ) ,
140+ client_topic_alias : HashMap :: new ( ) ,
141+ current_topic_alias : 0 ,
136142 // Set via CONNACK
137143 broker_topic_alias_max : 0 ,
138144 max_outgoing_inflight : max_inflight,
@@ -506,18 +512,22 @@ impl MqttState {
506512
507513 let pkid = publish. pkid ;
508514
509- if let Some ( props) = & publish. properties {
510- if let Some ( alias) = props. topic_alias {
511- if alias > self . broker_topic_alias_max {
512- // We MUST NOT send a Topic Alias that is greater than the
513- // broker's Topic Alias Maximum.
514- return Err ( StateError :: InvalidAlias {
515- alias,
516- max : self . broker_topic_alias_max ,
517- } ) ;
518- }
515+ let topic = publish. topic . clone ( ) ;
516+ if let Some ( alias) = self . client_topic_alias . get ( & topic) {
517+ // If the Topic Alias is already in use, the Client MUST use the same Topic Alias.
518+ publish
519+ . properties
520+ . get_or_insert_with ( Default :: default)
521+ . topic_alias = Some ( * alias) ;
522+ } else {
523+ self . current_topic_alias += 1 ;
524+ if self . current_topic_alias > self . broker_topic_alias_max {
525+ self . current_topic_alias = 1 ;
519526 }
520- } ;
527+
528+ self . client_topic_alias
529+ . insert ( topic, self . current_topic_alias ) ;
530+ }
521531
522532 let event = Event :: Outgoing ( Outgoing :: Publish ( pkid) ) ;
523533 self . events . push_back ( event) ;
0 commit comments