Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Store_past_p2p_view #1660

Draft
wants to merge 1 commit into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
224 changes: 224 additions & 0 deletions lib/archethic/db/embedded_impl/p2p_view2.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
defmodule Archethic.DB.EmbeddedImpl.P2PViewTwo do

Check warning on line 1 in lib/archethic/db/embedded_impl/p2p_view2.ex

View workflow job for this annotation

GitHub Actions / Build and test

Modules should have a @moduledoc tag.
defstruct [
:geo_patch,
:available?,
:avg_availability
]

@type t :: %{
geo_patch: binary(),
available?: boolean(),
avg_availability: float()
}

@archethic_db_p2pview :archethic_db_p2pview

require Logger

# TODO use GenServer pour s'assurer que les opérations de mutation sont atomiques.

def init() do
setup_ets_table()
end

defp setup_ets_table do
:ets.new(@archethic_db_p2pview, [:ordered_set, :named_table])
end

@spec set_summary(timestamp :: DateTime.t(), nodes :: list(t())) :: :ok
def set_summary(timestamp, nodes) do
write_nodes(nodes, DateTime.to_unix(timestamp))
:ok
end

@spec get_summary(timestamp :: DateTime.t()) :: list(t())
def get_summary(timestamp) do
read_nodes(DateTime.to_unix(timestamp))
end

@spec update_node(
changes :: [],
start_timestamp :: DateTime.t(),
index_at_timestamp :: (DateTime.t() -> integer())
) :: :ok
def update_node(
changes,
start_timestamp,
index_at_timestamp
) do
unix_start_timestamp = DateTime.to_unix(start_timestamp)

stream_timestamps_from(unix_start_timestamp, :include_start_timestamp)
|> Stream.each(fn timestamp ->
index = index_at_timestamp.(timestamp)

read_nodes(timestamp)
|> update_nodes(index, changes)
|> write_nodes(timestamp)
end)
|> Stream.run()
end

@spec add_node(
node :: t(),
start_timestamp :: DateTime.t(),
index_at_timestamp :: (DateTime.t() -> integer())
) ::
:ok
def add_node(node, start_timestamp, index_at_timestamp) do
unix_start_timestamp = DateTime.to_unix(start_timestamp)

stream_timestamps_from(unix_start_timestamp, :include_start_timestamp)
|> Stream.each(fn timestamp ->
index = index_at_timestamp.(timestamp)

read_nodes(timestamp)
|> List.insert_at(index, node)
|> write_nodes(timestamp)
end)
|> Stream.run()

:ok
end

@spec update_nodes(nodes :: list(t()), index :: non_neg_integer(), changes :: []) :: list(t())
defp update_nodes(nodes, index, changes) do
List.update_at(
nodes,
index,
fn node -> update_node(node, changes) end
)
end

@spec update_node(node :: t(), changes :: []) :: t()
defp update_node(node, changes) do
%__MODULE__{
geo_patch: Keyword.get(changes, :geo_patch, node.geo_patch),
available?: Keyword.get(changes, :available?, node.available?),
avg_availability: Keyword.get(changes, :avg_availability, node.avg_availability)
}
end

# TODO decliner avec enregistrement sur fichier
@spec write_nodes(nodes :: list(t()), unix_timestamp :: integer()) :: :ok
defp write_nodes(nodes, unix_timestamp) do
:ets.insert(
@archethic_db_p2pview,
{unix_timestamp, serialize(nodes)}
)

:ok
end

@spec stream_timestamps_from(
start_unix_timestamp :: integer(),
opt :: atom()
) :: Enumerable.t()
defp stream_timestamps_from(start_unix_timestamp, opt \\ nil)

defp stream_timestamps_from(start_unix_timestamp, :include_start_timestamp) do
Stream.concat(
[start_unix_timestamp],
stream_timestamps_from(start_unix_timestamp + 1)
)
end

defp stream_timestamps_from(start_unix_timestamp, nil) do
Stream.resource(
fn -> %{start_timestamp: start_unix_timestamp, last_timestamp: nil} end,
&do_stream_timestamps_from/1,
fn _ -> nil end
)
|> Stream.map(fn %{last_timestamp: timestamp} -> timestamp end)
end

# TODO decliner avec enregistrement sur fichier
defp do_stream_timestamps_from(%{start_timestamp: start_timestamp, last_timestamp: nil}) do
case :ets.next(@archethic_db_p2pview, start_timestamp - 1) do
:"$end_of_table" ->
{:halt, nil}

timestamp ->
acc = %{start_timestamp: start_timestamp, last_timestamp: timestamp}
{[acc], acc}
end
end

# TODO decliner avec enregistrement sur fichier
defp do_stream_timestamps_from(%{
start_timestamp: start_timestamp,
last_timestamp: previous_timestamp
}) do
case(:ets.next(@archethic_db_p2pview, previous_timestamp)) do
:"$end_of_table" ->
{:halt, %{start_timestamp: start_timestamp, last_timestamp: previous_timestamp}}

next ->
acc = %{start_timestamp: start_timestamp, last_timestamp: next}
{[acc], acc}
end
end

# TODO decliner avec enregistrement sur fichier
@spec read_nodes(unix_timestamp :: integer()) :: list(t())
defp read_nodes(unix_timestamp) do
index = :ets.prev(@archethic_db_p2pview, unix_timestamp + 1)

{_, data} =
:ets.lookup(@archethic_db_p2pview, index)
|> Enum.at(0)

data
|> deserialize()
end

@spec serialize(nodes :: list(t()), acc :: binary()) :: binary()
defp serialize(nodes, acc \\ <<>>)

defp serialize([], acc) do
acc
end

defp serialize(nodes, acc) do
[
%__MODULE__{
geo_patch: geo_patch,
available?: available?,
avg_availability: avg_availability
}
| rest
] = nodes

available_bin = if available?, do: 1, else: 0
avg_availability_int = round(avg_availability * 100)

serialize(
rest,
<<geo_patch::binary-size(3), available_bin, avg_availability_int::8>> <> acc
)
end

@spec deserialize(rest :: binary(), acc :: list(t())) :: list(t())
defp deserialize(rest, acc \\ [])

defp deserialize(<<>>, acc) do
acc
end

defp deserialize(
<<geo_patch::binary-size(3), available_bin, avg_availability_int::8, rest::binary>>,
acc
) do
available? = available_bin == 1
avg_availability = avg_availability_int / 100

deserialize(rest, [
%__MODULE__{
geo_patch: geo_patch,
available?: available?,
avg_availability: avg_availability
}
| acc
])
end
end
1 change: 1 addition & 0 deletions lib/archethic/p2p/mem_table.ex
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ defmodule Archethic.P2P.MemTable do
@doc """
List the P2P nodes
"""
# TODO add date en parametre. retourner tout les noeuds ou enrollment_date < date
@spec list_nodes() :: list(Node.t())
def list_nodes do
:ets.foldl(
Expand Down
Loading
Loading