Skip to content

Commit

Permalink
refactor the listerner protocol to catch err and close the connection…
Browse files Browse the repository at this point in the history
… of invalid messages instead of crashing
  • Loading branch information
bchamagne committed Jan 8, 2024
1 parent 01acedb commit d61c998
Showing 1 changed file with 109 additions and 70 deletions.
179 changes: 109 additions & 70 deletions lib/archethic/p2p/listener_protocol.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,9 @@ defmodule Archethic.P2P.ListenerProtocol do
require Logger

alias Archethic.Crypto

alias Archethic.P2P.Message
alias Archethic.P2P.MessageEnvelop

alias Archethic.TaskSupervisor

alias Archethic.Utils

@behaviour :ranch_protocol
Expand All @@ -37,80 +34,50 @@ defmodule Archethic.P2P.ListenerProtocol do
})
end

def handle_info(
{_transport, socket, err},
state = %{transport: transport, ip: ip, port: port}
)
when is_atom(err) do
Logger.warning(
"Received an error from tcp listener (ip: #{inspect(ip)}, port: #{port}): #{inspect(err)}"
)

transport.close(socket)
{:noreply, state}
end

def handle_info(
{_transport, socket, msg},
state = %{transport: transport}
) do
:inet.setopts(socket, active: :once)

Task.Supervisor.start_child(TaskSupervisor, fn ->
start_decode_time = System.monotonic_time()

%MessageEnvelop{
message_id: message_id,
message: message,
sender_public_key: sender_public_key,
signature: signature
} = MessageEnvelop.decode(msg)

valid_signature? =
Crypto.verify?(
signature,
Message.encode(message) |> Utils.wrap_binary(),
sender_public_key
)

if valid_signature? do
:telemetry.execute(
[:archethic, :p2p, :decode_message],
%{duration: System.monotonic_time() - start_decode_time},
%{message: Message.name(message)}
)

start_processing_time = System.monotonic_time()
response = Message.process(message, sender_public_key)

:telemetry.execute(
[:archethic, :p2p, :handle_message],
%{
duration: System.monotonic_time() - start_processing_time
},
%{message: Message.name(message)}
)

start_encode_time = System.monotonic_time()

response_signature =
response
|> Message.encode()
|> Utils.wrap_binary()
|> Crypto.sign_with_first_node_key()

encoded_response =
%MessageEnvelop{
message: response,
message_id: message_id,
sender_public_key: Crypto.first_node_public_key(),
signature: response_signature
}
|> MessageEnvelop.encode(sender_public_key)

:telemetry.execute(
[:archethic, :p2p, :encode_message],
%{duration: System.monotonic_time() - start_encode_time},
%{message: Archethic.P2P.Message.name(message)}
)

start_sending_time = System.monotonic_time()
transport.send(socket, encoded_response)

:telemetry.execute(
[:archethic, :p2p, :transport_sending_message],
%{duration: System.monotonic_time() - start_sending_time},
%{message: Archethic.P2P.Message.name(message)}
)
else
transport.close(socket)
case do_decode_msg(msg) do
nil ->
transport.close(socket)

%MessageEnvelop{
message_id: message_id,
message: message,
sender_public_key: sender_pkey,
signature: signature
} ->
valid_signature? =
Crypto.verify?(
signature,
Message.encode(message) |> Utils.wrap_binary(),
sender_pkey
)

if valid_signature? do
do_process_msg(message, sender_pkey)
|> do_encode_response(message_id, sender_pkey)
|> do_reply(transport, socket, message)
else
transport.close(socket)
end
end
end)

Expand All @@ -121,4 +88,76 @@ defmodule Archethic.P2P.ListenerProtocol do
Logger.warning("Connection closed for #{:inet.ntoa(ip)}:#{port}")
{:stop, :normal, state}
end

# msg is the bytes coming from TCP
# message is the struct
defp do_decode_msg(msg) do
start_decode_time = System.monotonic_time()

MessageEnvelop.decode(msg)
|> tap(fn %MessageEnvelop{message: message} ->
:telemetry.execute(
[:archethic, :p2p, :decode_message],
%{duration: System.monotonic_time() - start_decode_time},
%{message: Message.name(message)}
)
end)
rescue
x ->

Check warning on line 106 in lib/archethic/p2p/listener_protocol.ex

View workflow job for this annotation

GitHub Actions / Build and test

variable "x" is unused (if the variable is not meant to be used, prefix it with an underscore)
Logger.warning("Received an invalid message")
nil
end

defp do_process_msg(message, sender_pkey) do
start_processing_time = System.monotonic_time()

Message.process(message, sender_pkey)
|> tap(fn _ ->
:telemetry.execute(
[:archethic, :p2p, :handle_message],
%{
duration: System.monotonic_time() - start_processing_time
},
%{message: Message.name(message)}
)
end)
end

defp do_encode_response(response, message_id, sender_pkey) do
start_encode_time = System.monotonic_time()

response_signature =
response
|> Message.encode()
|> Utils.wrap_binary()
|> Crypto.sign_with_first_node_key()

%MessageEnvelop{
message: response,
message_id: message_id,
sender_public_key: Crypto.first_node_public_key(),
signature: response_signature
}
|> MessageEnvelop.encode(sender_pkey)
|> tap(fn _ ->
:telemetry.execute(
[:archethic, :p2p, :encode_message],
%{duration: System.monotonic_time() - start_encode_time},
%{message: Message.name(response)}
)
end)
end

defp do_reply(encoded_response, transport, socket, message) do
start_sending_time = System.monotonic_time()

transport.send(socket, encoded_response)
|> tap(fn _ ->
:telemetry.execute(
[:archethic, :p2p, :transport_sending_message],
%{duration: System.monotonic_time() - start_sending_time},
%{message: Message.name(message)}
)
end)
end
end

0 comments on commit d61c998

Please sign in to comment.