From a54cf99adc09a58fa544c4e2d1ca2588ed8241c9 Mon Sep 17 00:00:00 2001 From: Chralu Date: Thu, 6 Feb 2025 12:27:33 +0100 Subject: [PATCH] feat(P2PView): :sparkles: Store P2PView historic data. --- lib/archethic/db/embedded_impl/p2p_view2.ex | 224 +++++++++++++++++ lib/archethic/p2p/mem_table.ex | 1 + .../db/embedded_impl/p2p_view_test.exs | 227 ++++++++++++++++++ 3 files changed, 452 insertions(+) create mode 100644 lib/archethic/db/embedded_impl/p2p_view2.ex create mode 100644 test/archethic/db/embedded_impl/p2p_view_test.exs diff --git a/lib/archethic/db/embedded_impl/p2p_view2.ex b/lib/archethic/db/embedded_impl/p2p_view2.ex new file mode 100644 index 000000000..47db835bc --- /dev/null +++ b/lib/archethic/db/embedded_impl/p2p_view2.ex @@ -0,0 +1,224 @@ +defmodule Archethic.DB.EmbeddedImpl.P2PViewTwo do + defstruct [ + :geo_patch, + :available?, + :avg_availability + ] + + @type t :: %{ + geo_patch: binary(), + available?: boolean(), + avg_availability: float() + } + + @archethic_db_p2pview :archethic_db_p2pview + + require Logger + + # TODO use GenServer pour s'assurer que les opérations de mutation sont atomiques. + + def init() do + setup_ets_table() + end + + defp setup_ets_table do + :ets.new(@archethic_db_p2pview, [:ordered_set, :named_table]) + end + + @spec set_summary(timestamp :: DateTime.t(), nodes :: list(t())) :: :ok + def set_summary(timestamp, nodes) do + write_nodes(nodes, DateTime.to_unix(timestamp)) + :ok + end + + @spec get_summary(timestamp :: DateTime.t()) :: list(t()) + def get_summary(timestamp) do + read_nodes(DateTime.to_unix(timestamp)) + end + + @spec update_node( + changes :: [], + start_timestamp :: DateTime.t(), + index_at_timestamp :: (DateTime.t() -> integer()) + ) :: :ok + def update_node( + changes, + start_timestamp, + index_at_timestamp + ) do + unix_start_timestamp = DateTime.to_unix(start_timestamp) + + stream_timestamps_from(unix_start_timestamp, :include_start_timestamp) + |> Stream.each(fn timestamp -> + index = index_at_timestamp.(timestamp) + + read_nodes(timestamp) + |> update_nodes(index, changes) + |> write_nodes(timestamp) + end) + |> Stream.run() + end + + @spec add_node( + node :: t(), + start_timestamp :: DateTime.t(), + index_at_timestamp :: (DateTime.t() -> integer()) + ) :: + :ok + def add_node(node, start_timestamp, index_at_timestamp) do + unix_start_timestamp = DateTime.to_unix(start_timestamp) + + stream_timestamps_from(unix_start_timestamp, :include_start_timestamp) + |> Stream.each(fn timestamp -> + index = index_at_timestamp.(timestamp) + + read_nodes(timestamp) + |> List.insert_at(index, node) + |> write_nodes(timestamp) + end) + |> Stream.run() + + :ok + end + + @spec update_nodes(nodes :: list(t()), index :: non_neg_integer(), changes :: []) :: list(t()) + defp update_nodes(nodes, index, changes) do + List.update_at( + nodes, + index, + fn node -> update_node(node, changes) end + ) + end + + @spec update_node(node :: t(), changes :: []) :: t() + defp update_node(node, changes) do + %__MODULE__{ + geo_patch: Keyword.get(changes, :geo_patch, node.geo_patch), + available?: Keyword.get(changes, :available?, node.available?), + avg_availability: Keyword.get(changes, :avg_availability, node.avg_availability) + } + end + + # TODO decliner avec enregistrement sur fichier + @spec write_nodes(nodes :: list(t()), unix_timestamp :: integer()) :: :ok + defp write_nodes(nodes, unix_timestamp) do + :ets.insert( + @archethic_db_p2pview, + {unix_timestamp, serialize(nodes)} + ) + + :ok + end + + @spec stream_timestamps_from( + start_unix_timestamp :: integer(), + opt :: atom() + ) :: Enumerable.t() + defp stream_timestamps_from(start_unix_timestamp, opt \\ nil) + + defp stream_timestamps_from(start_unix_timestamp, :include_start_timestamp) do + Stream.concat( + [start_unix_timestamp], + stream_timestamps_from(start_unix_timestamp + 1) + ) + end + + defp stream_timestamps_from(start_unix_timestamp, nil) do + Stream.resource( + fn -> %{start_timestamp: start_unix_timestamp, last_timestamp: nil} end, + &do_stream_timestamps_from/1, + fn _ -> nil end + ) + |> Stream.map(fn %{last_timestamp: timestamp} -> timestamp end) + end + + # TODO decliner avec enregistrement sur fichier + defp do_stream_timestamps_from(%{start_timestamp: start_timestamp, last_timestamp: nil}) do + case :ets.next(@archethic_db_p2pview, start_timestamp - 1) do + :"$end_of_table" -> + {:halt, nil} + + timestamp -> + acc = %{start_timestamp: start_timestamp, last_timestamp: timestamp} + {[acc], acc} + end + end + + # TODO decliner avec enregistrement sur fichier + defp do_stream_timestamps_from(%{ + start_timestamp: start_timestamp, + last_timestamp: previous_timestamp + }) do + case(:ets.next(@archethic_db_p2pview, previous_timestamp)) do + :"$end_of_table" -> + {:halt, %{start_timestamp: start_timestamp, last_timestamp: previous_timestamp}} + + next -> + acc = %{start_timestamp: start_timestamp, last_timestamp: next} + {[acc], acc} + end + end + + # TODO decliner avec enregistrement sur fichier + @spec read_nodes(unix_timestamp :: integer()) :: list(t()) + defp read_nodes(unix_timestamp) do + index = :ets.prev(@archethic_db_p2pview, unix_timestamp + 1) + + {_, data} = + :ets.lookup(@archethic_db_p2pview, index) + |> Enum.at(0) + + data + |> deserialize() + end + + @spec serialize(nodes :: list(t()), acc :: binary()) :: binary() + defp serialize(nodes, acc \\ <<>>) + + defp serialize([], acc) do + acc + end + + defp serialize(nodes, acc) do + [ + %__MODULE__{ + geo_patch: geo_patch, + available?: available?, + avg_availability: avg_availability + } + | rest + ] = nodes + + available_bin = if available?, do: 1, else: 0 + avg_availability_int = round(avg_availability * 100) + + serialize( + rest, + <> <> acc + ) + end + + @spec deserialize(rest :: binary(), acc :: list(t())) :: list(t()) + defp deserialize(rest, acc \\ []) + + defp deserialize(<<>>, acc) do + acc + end + + defp deserialize( + <>, + acc + ) do + available? = available_bin == 1 + avg_availability = avg_availability_int / 100 + + deserialize(rest, [ + %__MODULE__{ + geo_patch: geo_patch, + available?: available?, + avg_availability: avg_availability + } + | acc + ]) + end +end diff --git a/lib/archethic/p2p/mem_table.ex b/lib/archethic/p2p/mem_table.ex index 4b61706b7..cea91d3ec 100644 --- a/lib/archethic/p2p/mem_table.ex +++ b/lib/archethic/p2p/mem_table.ex @@ -229,6 +229,7 @@ defmodule Archethic.P2P.MemTable do @doc """ List the P2P nodes """ + # TODO add date en parametre. retourner tout les noeuds ou enrollment_date < date @spec list_nodes() :: list(Node.t()) def list_nodes do :ets.foldl( diff --git a/test/archethic/db/embedded_impl/p2p_view_test.exs b/test/archethic/db/embedded_impl/p2p_view_test.exs new file mode 100644 index 000000000..ae4ac92e3 --- /dev/null +++ b/test/archethic/db/embedded_impl/p2p_view_test.exs @@ -0,0 +1,227 @@ +defmodule Archethic.DB.EmbeddedImpl.P2PViewTest do + use ArchethicCase + + alias Archethic.DB.EmbeddedImpl.P2PViewTwo + + # import ArchethicCase + + # Doit stocker sur disque les années précédentes + + setup do + P2PViewTwo.init() + + :ok + end + + @doc """ + Creates an P2PView node object. + """ + @spec init_node(id :: non_neg_integer()) :: P2PViewTwo.t() + defp init_node(id) do + %P2PViewTwo{ + geo_patch: "AAA", + available?: true, + avg_availability: id / 10 + } + end + + @doc """ + Initializes P2PView with nodes. + """ + defp given_nodes(summaries) do + summaries + |> Enum.with_index() + |> Enum.each(fn + {nil, i} -> + nil + + {nodes, i} -> + P2PViewTwo.set_summary( + DateTime.from_unix!(i), + nodes + ) + end) + end + + @doc """ + Gets P2PView nodes from unix timestamp. + """ + def get_summary(timestamp) do + P2PViewTwo.get_summary(DateTime.from_unix!(timestamp)) + end + + describe "set_summary/1" do + test "should create summary for a timestamp" do + nodes_view = + 1..10 + |> Enum.map(fn i -> + init_node(i) + end) + + P2PViewTwo.set_summary( + DateTime.truncate(DateTime.utc_now(), :second), + nodes_view + ) + + assert nodes_view == P2PViewTwo.get_summary(DateTime.utc_now()) + end + end + + describe "add_node/3" do + test "should update first record and the following ones" do + node1 = init_node(1) + node2 = init_node(2) + node3 = init_node(3) + + # Given + # - one node on timestamps 0, 1 + # - two nodes on timestamps 3, 4 + given_nodes([ + [node1], + [node1], + nil, + [node1, node2], + [node1, node2] + ]) + + # When adding a new node from timestamp 1 + assert :ok == + P2PViewTwo.add_node( + node3, + DateTime.from_unix!(1), + fn + 1 -> 1 + _ -> 2 + end + ) + + assert [node1] == get_summary(0) + assert [node1, node3] == get_summary(1) + assert [node1, node2, node3] == get_summary(3) + assert [node1, node2, node3] == get_summary(4) + end + + test "should create first record and update the following ones" do + node1 = init_node(1) + node2 = init_node(2) + node3 = init_node(3) + + # Given + # - one node on timestamp 0 + # - two nodes on timestamps 2, 3 + given_nodes([ + [node1], + nil, + [node1, node2], + [node1, node2] + ]) + + # When adding a new node from timestamp 1 + assert :ok == + P2PViewTwo.add_node( + node3, + DateTime.from_unix!(1), + fn + 1 -> 0 + _ -> 1 + end + ) + + assert [node1] == get_summary(0) + assert [node3, node1] == get_summary(1) + assert [node1, node3, node2] == get_summary(2) + assert [node1, node3, node2] == get_summary(3) + end + end + + describe "update_node/2" do + test "should update first record and the following ones" do + node1 = init_node(1) + node2 = init_node(2) + + # Given + # - one node on timestamp 0 + # - two nodes on timestamps 1, 3, 5 + given_nodes([ + [node1], + nil, + [node1, node2], + [node1, node2] + ]) + + # When updating node 2 from timestamp 3 + assert :ok == + P2PViewTwo.update_node( + [avg_availability: 0.5], + DateTime.from_unix!(3), + fn _ -> 1 end + ) + + updated_node_2 = %{node2 | avg_availability: 0.5} + + assert [node1] == get_summary(0) + assert [node1, updated_node_2] == get_summary(3) + assert [node1, updated_node_2] == get_summary(5) + end + end + + test "should create first record and update the following ones" do + node1 = init_node(1) + node2 = init_node(2) + + # Given + # - one node on timestamp 0 + # - two nodes on timestamps 1, 3 + given_nodes([ + [node1], + [node1, node2], + nil, + [node1, node2] + ]) + + # When updating node 2 from timestamp 2 + assert :ok == + P2PViewTwo.update_node( + [avg_availability: 0.5], + DateTime.from_unix!(2), + fn _ -> 1 end + ) + + updated_node_2 = %{node2 | avg_availability: 0.5} + + assert [node1] == get_summary(0) + assert [node1, node2] == get_summary(1) + assert [node1, updated_node_2] == get_summary(2) + assert [node1, updated_node_2] == get_summary(3) + end + + test "should update nodes properties until another mutation is met" do + node1 = init_node(1) + + # Given node1 on timestamps 0, 2, 3 + given_nodes([ + [node1], + [node1], + [node1] + ]) + + # Given a mutation on timestamp 1 + P2PViewTwo.update_node( + [avg_availability: 0.5], + DateTime.from_unix!(1), + fn _ -> 1 end + ) + + # When updating node 1 from timestamp 0 + assert :ok == + P2PViewTwo.update_node( + [geo_patch: "BBB", avg_availability: 0.3], + DateTime.from_unix!(0), + fn _ -> 1 end + ) + + assert [%{node1 | avg_availability: 0.3, geo_patch: "BBB"}] == get_summary(0) + assert [%{node1 | avg_availability: 0.5, geo_patch: "BBB"}] == get_summary(1) + assert [%{node1 | avg_availability: 0.5, geo_patch: "BBB"}] == get_summary(2) + end +end