@@ -255,39 +255,24 @@ process_request(?PUBLISH,
255255 packet_id = PacketId },
256256 payload = Payload },
257257 State0 = # state {unacked_client_pubs = U ,
258- cfg = # cfg {retainer_pid = RPid ,
259- proto_ver = ProtoVer }}) ->
258+ cfg = # cfg {proto_ver = ProtoVer }}) ->
260259 EffectiveQos = maybe_downgrade_qos (Qos ),
261260 rabbit_global_counters :messages_received (ProtoVer , 1 ),
262261 State = maybe_increment_publisher (State0 ),
263- Publish = fun () ->
264- Msg = # mqtt_msg {retain = Retain ,
265- qos = EffectiveQos ,
266- topic = Topic ,
267- dup = Dup ,
268- packet_id = PacketId ,
269- payload = Payload },
270- case publish_to_queues (Msg , State ) of
271- {ok , _ } = Ok ->
272- case Retain of
273- false ->
274- ok ;
275- true ->
276- hand_off_to_retainer (RPid , Topic , Msg )
277- end ,
278- Ok ;
279- Error ->
280- Error
281- end
282- end ,
262+ Msg = # mqtt_msg {retain = Retain ,
263+ qos = EffectiveQos ,
264+ topic = Topic ,
265+ dup = Dup ,
266+ packet_id = PacketId ,
267+ payload = Payload },
283268 case EffectiveQos of
284269 ? QOS_0 ->
285- publish_to_queues_with_checks (Topic , Publish , State );
270+ publish_to_queues_with_checks (Msg , State );
286271 ? QOS_1 ->
287272 rabbit_global_counters :messages_received_confirm (ProtoVer , 1 ),
288273 case rabbit_mqtt_confirms :contains (PacketId , U ) of
289274 false ->
290- publish_to_queues_with_checks (Topic , Publish , State );
275+ publish_to_queues_with_checks (Msg , State );
291276 true ->
292277 % % Client re-sent this PUBLISH packet.
293278 % % We already sent this message to target queues awaiting confirmations.
@@ -1226,25 +1211,16 @@ terminate(SendWill, ConnName, ProtoFamily,
12261211 maybe_decrement_publisher (State ),
12271212 maybe_delete_mqtt_qos0_queue (State ).
12281213
1229- maybe_send_will (
1230- true , ConnStr ,
1231- # state {cfg = # cfg {retainer_pid = RPid ,
1232- will_msg = WillMsg = # mqtt_msg {retain = Retain ,
1233- topic = Topic }}
1234- } = State ) ->
1235- ? LOG_DEBUG (" sending MQTT will message to topic ~s on connection ~s " ,
1236- [Topic , ConnStr ]),
1237- case check_topic_access (Topic , write , State ) of
1238- ok ->
1239- _ = publish_to_queues (WillMsg , State ),
1240- case Retain of
1241- false ->
1242- ok ;
1243- true ->
1244- hand_off_to_retainer (RPid , Topic , WillMsg )
1245- end ;
1246- {error , access_refused = Reason } ->
1247- ? LOG_ERROR (" failed to send will message: ~p " , [Reason ])
1214+ -spec maybe_send_will (boolean (), binary (), state ()) -> ok .
1215+ maybe_send_will (true , ConnStr ,
1216+ State = # state {cfg = # cfg {will_msg = WillMsg = # mqtt_msg {topic = Topic }}}) ->
1217+ case publish_to_queues_with_checks (WillMsg , State ) of
1218+ {ok , _ } ->
1219+ ? LOG_DEBUG (" sent MQTT will message to topic ~s on connection ~s " ,
1220+ [Topic , ConnStr ]);
1221+ {error , Reason , _ } ->
1222+ ? LOG_DEBUG (" failed to send MQTT will message to topic ~s on connection ~s : ~p " ,
1223+ [Topic , ConnStr , Reason ])
12481224 end ;
12491225maybe_send_will (_ , _ , _ ) ->
12501226 ok .
@@ -1546,17 +1522,31 @@ trace_tap_out(Msg0 = {?QUEUE_TYPE_QOS_0, _, _, _, _},
15461522 trace_tap_out (Msg , State )
15471523 end .
15481524
1525+ -spec publish_to_queues_with_checks (mqtt_msg (), state ()) ->
1526+ {ok , state ()} | {error , any (), state ()}.
15491527publish_to_queues_with_checks (
1550- TopicName , PublishFun ,
1551- # state {cfg = # cfg {exchange = Exchange },
1552- auth_state = # auth_state {user = User ,
1553- authz_ctx = AuthzCtx }
1554- } = State ) ->
1528+ Msg = # mqtt_msg {topic = Topic ,
1529+ retain = Retain },
1530+ State = # state {cfg = # cfg {exchange = Exchange ,
1531+ retainer_pid = RPid },
1532+ auth_state = # auth_state {user = User ,
1533+ authz_ctx = AuthzCtx }}) ->
15551534 case check_resource_access (User , Exchange , write , AuthzCtx ) of
15561535 ok ->
1557- case check_topic_access (TopicName , write , State ) of
1536+ case check_topic_access (Topic , write , State ) of
15581537 ok ->
1559- PublishFun ();
1538+ case publish_to_queues (Msg , State ) of
1539+ {ok , _ } = Ok ->
1540+ case Retain of
1541+ false ->
1542+ ok ;
1543+ true ->
1544+ hand_off_to_retainer (RPid , Topic , Msg )
1545+ end ,
1546+ Ok ;
1547+ Error ->
1548+ Error
1549+ end ;
15601550 {error , access_refused } ->
15611551 {error , unauthorized , State }
15621552 end ;
0 commit comments