Skip to content

Commit c10364a

Browse files
Merge pull request #11683 from rabbitmq/mergify/bp/v3.13.x/pr-11682
Catch abrupt TCP closure when processing `queue_event` (backport #11676) (backport #11682)
2 parents 0e8ea17 + 6fb22af commit c10364a

File tree

1 file changed

+22
-18
lines changed

1 file changed

+22
-18
lines changed

deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -140,11 +140,15 @@ handle_cast({close_connection, Reason},
140140

141141
handle_cast(QueueEvent = {queue_event, _, _},
142142
State = #state{proc_state = PState0}) ->
143-
case rabbit_mqtt_processor:handle_queue_event(QueueEvent, PState0) of
144-
{ok, PState} ->
145-
maybe_process_deferred_recv(control_throttle(pstate(State, PState)));
146-
{error, Reason, PState} ->
147-
{stop, Reason, pstate(State, PState)}
143+
try
144+
case rabbit_mqtt_processor:handle_queue_event(QueueEvent, PState0) of
145+
{ok, PState} ->
146+
maybe_process_deferred_recv(control_throttle(pstate(State, PState)));
147+
{error, Reason0, PState} ->
148+
{stop, Reason0, pstate(State, PState)}
149+
end
150+
catch throw:{send_failed, Reason1} ->
151+
network_error(Reason1, State)
148152
end;
149153

150154
handle_cast({force_event_refresh, Ref}, State0) ->
@@ -328,17 +332,17 @@ process_received_bytes(Bytes, State = #state{socket = Socket,
328332
{ok, Packet, Rest, ParseState1} ->
329333
case ProcState of
330334
connect_packet_unprocessed ->
331-
Send = fun(Data) ->
332-
case rabbit_net:send(Socket, Data) of
333-
ok ->
334-
ok;
335-
{error, Reason} ->
336-
?LOG_ERROR("writing to MQTT socket ~p failed: ~p",
337-
[Socket, Reason]),
338-
exit({send_failed, Reason})
339-
end
340-
end,
341-
try rabbit_mqtt_processor:init(Packet, Socket, ConnName, Send) of
335+
SendFun = fun(Data) ->
336+
case rabbit_net:send(Socket, Data) of
337+
ok ->
338+
ok;
339+
{error, Reason} ->
340+
?LOG_ERROR("writing to MQTT socket ~p failed: ~p",
341+
[Socket, Reason]),
342+
throw({send_failed, Reason})
343+
end
344+
end,
345+
try rabbit_mqtt_processor:init(Packet, Socket, ConnName, SendFun) of
342346
{ok, ProcState1} ->
343347
?LOG_INFO("Accepted MQTT connection ~ts for client ID ~ts",
344348
[ConnName, rabbit_mqtt_processor:info(client_id, ProcState1)]),
@@ -354,7 +358,7 @@ process_received_bytes(Bytes, State = #state{socket = Socket,
354358
?LOG_ERROR("Rejected MQTT connection ~ts with Connect Reason Code ~p",
355359
[ConnName, ConnectReasonCode]),
356360
{stop, shutdown, {_SendWill = false, State}}
357-
catch exit:{send_failed, Reason} ->
361+
catch throw:{send_failed, Reason} ->
358362
network_error(Reason, State)
359363
end;
360364
_ ->
@@ -375,7 +379,7 @@ process_received_bytes(Bytes, State = #state{socket = Socket,
375379
{stop, {shutdown, Reason}, pstate(State, ProcState1)};
376380
{stop, {disconnect, {client_initiated, SendWill}}, ProcState1} ->
377381
{stop, normal, {SendWill, pstate(State, ProcState1)}}
378-
catch exit:{send_failed, Reason} ->
382+
catch throw:{send_failed, Reason} ->
379383
network_error(Reason, State)
380384
end
381385
end;

0 commit comments

Comments
 (0)