From d61c99883ad329dd9071829c3e01aa2619d7cf6d Mon Sep 17 00:00:00 2001 From: bchamagne Date: Mon, 8 Jan 2024 16:58:13 +0100 Subject: [PATCH] refactor the listerner protocol to catch err and close the connection of invalid messages instead of crashing --- lib/archethic/p2p/listener_protocol.ex | 179 +++++++++++++++---------- 1 file changed, 109 insertions(+), 70 deletions(-) diff --git a/lib/archethic/p2p/listener_protocol.ex b/lib/archethic/p2p/listener_protocol.ex index 35f1f56df7..f3dfc95b27 100644 --- a/lib/archethic/p2p/listener_protocol.ex +++ b/lib/archethic/p2p/listener_protocol.ex @@ -8,12 +8,9 @@ defmodule Archethic.P2P.ListenerProtocol do require Logger alias Archethic.Crypto - alias Archethic.P2P.Message alias Archethic.P2P.MessageEnvelop - alias Archethic.TaskSupervisor - alias Archethic.Utils @behaviour :ranch_protocol @@ -37,6 +34,19 @@ defmodule Archethic.P2P.ListenerProtocol do }) end + def handle_info( + {_transport, socket, err}, + state = %{transport: transport, ip: ip, port: port} + ) + when is_atom(err) do + Logger.warning( + "Received an error from tcp listener (ip: #{inspect(ip)}, port: #{port}): #{inspect(err)}" + ) + + transport.close(socket) + {:noreply, state} + end + def handle_info( {_transport, socket, msg}, state = %{transport: transport} @@ -44,73 +54,30 @@ defmodule Archethic.P2P.ListenerProtocol do :inet.setopts(socket, active: :once) Task.Supervisor.start_child(TaskSupervisor, fn -> - start_decode_time = System.monotonic_time() - - %MessageEnvelop{ - message_id: message_id, - message: message, - sender_public_key: sender_public_key, - signature: signature - } = MessageEnvelop.decode(msg) - - valid_signature? = - Crypto.verify?( - signature, - Message.encode(message) |> Utils.wrap_binary(), - sender_public_key - ) - - if valid_signature? do - :telemetry.execute( - [:archethic, :p2p, :decode_message], - %{duration: System.monotonic_time() - start_decode_time}, - %{message: Message.name(message)} - ) - - start_processing_time = System.monotonic_time() - response = Message.process(message, sender_public_key) - - :telemetry.execute( - [:archethic, :p2p, :handle_message], - %{ - duration: System.monotonic_time() - start_processing_time - }, - %{message: Message.name(message)} - ) - - start_encode_time = System.monotonic_time() - - response_signature = - response - |> Message.encode() - |> Utils.wrap_binary() - |> Crypto.sign_with_first_node_key() - - encoded_response = - %MessageEnvelop{ - message: response, - message_id: message_id, - sender_public_key: Crypto.first_node_public_key(), - signature: response_signature - } - |> MessageEnvelop.encode(sender_public_key) - - :telemetry.execute( - [:archethic, :p2p, :encode_message], - %{duration: System.monotonic_time() - start_encode_time}, - %{message: Archethic.P2P.Message.name(message)} - ) - - start_sending_time = System.monotonic_time() - transport.send(socket, encoded_response) - - :telemetry.execute( - [:archethic, :p2p, :transport_sending_message], - %{duration: System.monotonic_time() - start_sending_time}, - %{message: Archethic.P2P.Message.name(message)} - ) - else - transport.close(socket) + case do_decode_msg(msg) do + nil -> + transport.close(socket) + + %MessageEnvelop{ + message_id: message_id, + message: message, + sender_public_key: sender_pkey, + signature: signature + } -> + valid_signature? = + Crypto.verify?( + signature, + Message.encode(message) |> Utils.wrap_binary(), + sender_pkey + ) + + if valid_signature? do + do_process_msg(message, sender_pkey) + |> do_encode_response(message_id, sender_pkey) + |> do_reply(transport, socket, message) + else + transport.close(socket) + end end end) @@ -121,4 +88,76 @@ defmodule Archethic.P2P.ListenerProtocol do Logger.warning("Connection closed for #{:inet.ntoa(ip)}:#{port}") {:stop, :normal, state} end + + # msg is the bytes coming from TCP + # message is the struct + defp do_decode_msg(msg) do + start_decode_time = System.monotonic_time() + + MessageEnvelop.decode(msg) + |> tap(fn %MessageEnvelop{message: message} -> + :telemetry.execute( + [:archethic, :p2p, :decode_message], + %{duration: System.monotonic_time() - start_decode_time}, + %{message: Message.name(message)} + ) + end) + rescue + x -> + Logger.warning("Received an invalid message") + nil + end + + defp do_process_msg(message, sender_pkey) do + start_processing_time = System.monotonic_time() + + Message.process(message, sender_pkey) + |> tap(fn _ -> + :telemetry.execute( + [:archethic, :p2p, :handle_message], + %{ + duration: System.monotonic_time() - start_processing_time + }, + %{message: Message.name(message)} + ) + end) + end + + defp do_encode_response(response, message_id, sender_pkey) do + start_encode_time = System.monotonic_time() + + response_signature = + response + |> Message.encode() + |> Utils.wrap_binary() + |> Crypto.sign_with_first_node_key() + + %MessageEnvelop{ + message: response, + message_id: message_id, + sender_public_key: Crypto.first_node_public_key(), + signature: response_signature + } + |> MessageEnvelop.encode(sender_pkey) + |> tap(fn _ -> + :telemetry.execute( + [:archethic, :p2p, :encode_message], + %{duration: System.monotonic_time() - start_encode_time}, + %{message: Message.name(response)} + ) + end) + end + + defp do_reply(encoded_response, transport, socket, message) do + start_sending_time = System.monotonic_time() + + transport.send(socket, encoded_response) + |> tap(fn _ -> + :telemetry.execute( + [:archethic, :p2p, :transport_sending_message], + %{duration: System.monotonic_time() - start_sending_time}, + %{message: Message.name(message)} + ) + end) + end end