Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ListenerProtocol catch errors #1370

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 133 additions & 72 deletions lib/archethic/p2p/listener_protocol.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,10 @@ defmodule Archethic.P2P.ListenerProtocol do
require Logger

alias Archethic.Crypto

alias Archethic.P2P
alias Archethic.P2P.Message
alias Archethic.P2P.MessageEnvelop

alias Archethic.TaskSupervisor

alias Archethic.Utils

@behaviour :ranch_protocol
Expand All @@ -37,81 +35,27 @@ defmodule Archethic.P2P.ListenerProtocol do
})
end

def handle_info(
{_transport, socket, err},
state = %{transport: transport, ip: ip}
)
when is_atom(err) do
if node_ip?(ip) do
Logger.error("Received an error from tcp listener (ip: #{:inet.ntoa(ip)}): #{inspect(err)}")
end

transport.close(socket)
{:noreply, state}
end

def handle_info(
{_transport, socket, msg},
state = %{transport: transport}
state = %{transport: transport, ip: ip}
) do
:inet.setopts(socket, active: :once)

Task.Supervisor.start_child(TaskSupervisor, fn ->
start_decode_time = System.monotonic_time()

%MessageEnvelop{
message_id: message_id,
message: message,
sender_public_key: sender_public_key,
signature: signature
} = MessageEnvelop.decode(msg)

valid_signature? =
Crypto.verify?(
signature,
Message.encode(message) |> Utils.wrap_binary(),
sender_public_key
)

if valid_signature? do
:telemetry.execute(
[:archethic, :p2p, :decode_message],
%{duration: System.monotonic_time() - start_decode_time},
%{message: Message.name(message)}
)

start_processing_time = System.monotonic_time()
response = Message.process(message, sender_public_key)

:telemetry.execute(
[:archethic, :p2p, :handle_message],
%{
duration: System.monotonic_time() - start_processing_time
},
%{message: Message.name(message)}
)

start_encode_time = System.monotonic_time()

response_signature =
response
|> Message.encode()
|> Utils.wrap_binary()
|> Crypto.sign_with_first_node_key()

encoded_response =
%MessageEnvelop{
message: response,
message_id: message_id,
sender_public_key: Crypto.first_node_public_key(),
signature: response_signature
}
|> MessageEnvelop.encode(sender_public_key)

:telemetry.execute(
[:archethic, :p2p, :encode_message],
%{duration: System.monotonic_time() - start_encode_time},
%{message: Archethic.P2P.Message.name(message)}
)

start_sending_time = System.monotonic_time()
transport.send(socket, encoded_response)

:telemetry.execute(
[:archethic, :p2p, :transport_sending_message],
%{duration: System.monotonic_time() - start_sending_time},
%{message: Archethic.P2P.Message.name(message)}
)
else
transport.close(socket)
end
handle_message(msg, transport, socket, ip)
end)

{:noreply, state}
Expand All @@ -121,4 +65,121 @@ defmodule Archethic.P2P.ListenerProtocol do
Logger.warning("Connection closed for #{:inet.ntoa(ip)}:#{port}")
{:stop, :normal, state}
end

defp handle_message(msg, transport, socket, ip) do
case decode_msg(msg) do
{:ok,
%MessageEnvelop{
message_id: message_id,
message: message,
sender_public_key: sender_pkey,
signature: signature
}} ->
valid_signature? =
Crypto.verify?(
signature,
message |> Message.encode() |> Utils.wrap_binary(),
sender_pkey
)

if valid_signature? do
message
|> process_msg(sender_pkey)
|> encode_response(message_id, sender_pkey)
|> reply(transport, socket, message)
else
if node_ip?(ip) do
Logger.error("Received a message with an invalid signature",
node: Base.encode16(sender_pkey)
)
end

transport.close(socket)
bchamagne marked this conversation as resolved.
Show resolved Hide resolved
end

{:error, reason} ->
if node_ip?(ip) do
Logger.error(reason)
end

transport.close(socket)
end
end

# msg is the bytes coming from TCP
# message is the struct
defp decode_msg(msg) do
start_decode_time = System.monotonic_time()

MessageEnvelop.decode(msg)
|> then(fn res = %MessageEnvelop{message: message} ->
:telemetry.execute(
[:archethic, :p2p, :decode_message],
%{duration: System.monotonic_time() - start_decode_time},
%{message: Message.name(message)}
)

{:ok, res}
end)
rescue
err ->
{:error, Exception.format(:error, err, __STACKTRACE__)}
end

defp process_msg(message, sender_pkey) do
start_processing_time = System.monotonic_time()

Message.process(message, sender_pkey)
|> tap(fn _ ->
:telemetry.execute(
[:archethic, :p2p, :handle_message],
%{
duration: System.monotonic_time() - start_processing_time
},
%{message: Message.name(message)}
)
end)
end

defp encode_response(response, message_id, sender_pkey) do
start_encode_time = System.monotonic_time()

response_signature =
response
|> Message.encode()
|> Utils.wrap_binary()
|> Crypto.sign_with_first_node_key()

%MessageEnvelop{
message: response,
message_id: message_id,
sender_public_key: Crypto.first_node_public_key(),
signature: response_signature
}
|> MessageEnvelop.encode(sender_pkey)
|> tap(fn _ ->
:telemetry.execute(
[:archethic, :p2p, :encode_message],
%{duration: System.monotonic_time() - start_encode_time},
%{message: Message.name(response)}
)
end)
end

defp reply(encoded_response, transport, socket, message) do
start_sending_time = System.monotonic_time()

transport.send(socket, encoded_response)
|> tap(fn _ ->
:telemetry.execute(
[:archethic, :p2p, :transport_sending_message],
%{duration: System.monotonic_time() - start_sending_time},
%{message: Message.name(message)}
)
end)
end

defp node_ip?(ip) do
P2P.list_nodes() |> Enum.map(& &1.ip) |> Enum.member?(ip)
end
end
Loading