Skip to content

Commit

Permalink
feat(P2PView): ✨ Store P2PView historic data.
Browse files Browse the repository at this point in the history
  • Loading branch information
Chralu committed Feb 6, 2025
1 parent 311ea40 commit a54cf99
Show file tree
Hide file tree
Showing 3 changed files with 452 additions and 0 deletions.
224 changes: 224 additions & 0 deletions lib/archethic/db/embedded_impl/p2p_view2.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
defmodule Archethic.DB.EmbeddedImpl.P2PViewTwo do

Check warning on line 1 in lib/archethic/db/embedded_impl/p2p_view2.ex

View workflow job for this annotation

GitHub Actions / Build and test

Modules should have a @moduledoc tag.
defstruct [
:geo_patch,
:available?,
:avg_availability
]

@type t :: %{
geo_patch: binary(),
available?: boolean(),
avg_availability: float()
}

@archethic_db_p2pview :archethic_db_p2pview

require Logger

# TODO use GenServer pour s'assurer que les opérations de mutation sont atomiques.

def init() do
setup_ets_table()
end

defp setup_ets_table do
:ets.new(@archethic_db_p2pview, [:ordered_set, :named_table])
end

@spec set_summary(timestamp :: DateTime.t(), nodes :: list(t())) :: :ok
def set_summary(timestamp, nodes) do
write_nodes(nodes, DateTime.to_unix(timestamp))
:ok
end

@spec get_summary(timestamp :: DateTime.t()) :: list(t())
def get_summary(timestamp) do
read_nodes(DateTime.to_unix(timestamp))
end

@spec update_node(
changes :: [],
start_timestamp :: DateTime.t(),
index_at_timestamp :: (DateTime.t() -> integer())
) :: :ok
def update_node(
changes,
start_timestamp,
index_at_timestamp
) do
unix_start_timestamp = DateTime.to_unix(start_timestamp)

stream_timestamps_from(unix_start_timestamp, :include_start_timestamp)
|> Stream.each(fn timestamp ->
index = index_at_timestamp.(timestamp)

read_nodes(timestamp)
|> update_nodes(index, changes)
|> write_nodes(timestamp)
end)
|> Stream.run()
end

@spec add_node(
node :: t(),
start_timestamp :: DateTime.t(),
index_at_timestamp :: (DateTime.t() -> integer())
) ::
:ok
def add_node(node, start_timestamp, index_at_timestamp) do
unix_start_timestamp = DateTime.to_unix(start_timestamp)

stream_timestamps_from(unix_start_timestamp, :include_start_timestamp)
|> Stream.each(fn timestamp ->
index = index_at_timestamp.(timestamp)

read_nodes(timestamp)
|> List.insert_at(index, node)
|> write_nodes(timestamp)
end)
|> Stream.run()

:ok
end

@spec update_nodes(nodes :: list(t()), index :: non_neg_integer(), changes :: []) :: list(t())
defp update_nodes(nodes, index, changes) do
List.update_at(
nodes,
index,
fn node -> update_node(node, changes) end
)
end

@spec update_node(node :: t(), changes :: []) :: t()
defp update_node(node, changes) do
%__MODULE__{
geo_patch: Keyword.get(changes, :geo_patch, node.geo_patch),
available?: Keyword.get(changes, :available?, node.available?),
avg_availability: Keyword.get(changes, :avg_availability, node.avg_availability)
}
end

# TODO decliner avec enregistrement sur fichier
@spec write_nodes(nodes :: list(t()), unix_timestamp :: integer()) :: :ok
defp write_nodes(nodes, unix_timestamp) do
:ets.insert(
@archethic_db_p2pview,
{unix_timestamp, serialize(nodes)}
)

:ok
end

@spec stream_timestamps_from(
start_unix_timestamp :: integer(),
opt :: atom()
) :: Enumerable.t()
defp stream_timestamps_from(start_unix_timestamp, opt \\ nil)

defp stream_timestamps_from(start_unix_timestamp, :include_start_timestamp) do
Stream.concat(
[start_unix_timestamp],
stream_timestamps_from(start_unix_timestamp + 1)
)
end

defp stream_timestamps_from(start_unix_timestamp, nil) do
Stream.resource(
fn -> %{start_timestamp: start_unix_timestamp, last_timestamp: nil} end,
&do_stream_timestamps_from/1,
fn _ -> nil end
)
|> Stream.map(fn %{last_timestamp: timestamp} -> timestamp end)
end

# TODO decliner avec enregistrement sur fichier
defp do_stream_timestamps_from(%{start_timestamp: start_timestamp, last_timestamp: nil}) do
case :ets.next(@archethic_db_p2pview, start_timestamp - 1) do
:"$end_of_table" ->
{:halt, nil}

timestamp ->
acc = %{start_timestamp: start_timestamp, last_timestamp: timestamp}
{[acc], acc}
end
end

# TODO decliner avec enregistrement sur fichier
defp do_stream_timestamps_from(%{
start_timestamp: start_timestamp,
last_timestamp: previous_timestamp
}) do
case(:ets.next(@archethic_db_p2pview, previous_timestamp)) do
:"$end_of_table" ->
{:halt, %{start_timestamp: start_timestamp, last_timestamp: previous_timestamp}}

next ->
acc = %{start_timestamp: start_timestamp, last_timestamp: next}
{[acc], acc}
end
end

# TODO decliner avec enregistrement sur fichier
@spec read_nodes(unix_timestamp :: integer()) :: list(t())
defp read_nodes(unix_timestamp) do
index = :ets.prev(@archethic_db_p2pview, unix_timestamp + 1)

{_, data} =
:ets.lookup(@archethic_db_p2pview, index)
|> Enum.at(0)

data
|> deserialize()
end

@spec serialize(nodes :: list(t()), acc :: binary()) :: binary()
defp serialize(nodes, acc \\ <<>>)

defp serialize([], acc) do
acc
end

defp serialize(nodes, acc) do
[
%__MODULE__{
geo_patch: geo_patch,
available?: available?,
avg_availability: avg_availability
}
| rest
] = nodes

available_bin = if available?, do: 1, else: 0
avg_availability_int = round(avg_availability * 100)

serialize(
rest,
<<geo_patch::binary-size(3), available_bin, avg_availability_int::8>> <> acc
)
end

@spec deserialize(rest :: binary(), acc :: list(t())) :: list(t())
defp deserialize(rest, acc \\ [])

defp deserialize(<<>>, acc) do
acc
end

defp deserialize(
<<geo_patch::binary-size(3), available_bin, avg_availability_int::8, rest::binary>>,
acc
) do
available? = available_bin == 1
avg_availability = avg_availability_int / 100

deserialize(rest, [
%__MODULE__{
geo_patch: geo_patch,
available?: available?,
avg_availability: avg_availability
}
| acc
])
end
end
1 change: 1 addition & 0 deletions lib/archethic/p2p/mem_table.ex
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ defmodule Archethic.P2P.MemTable do
@doc """
List the P2P nodes
"""
# TODO add date en parametre. retourner tout les noeuds ou enrollment_date < date
@spec list_nodes() :: list(Node.t())
def list_nodes do
:ets.foldl(
Expand Down
Loading

0 comments on commit a54cf99

Please sign in to comment.