Skip to content

Commit

Permalink
Leverage page in the summary cache to stream summaries
Browse files Browse the repository at this point in the history
  • Loading branch information
samuelmanzanera committed Dec 6, 2024
1 parent 2813bb7 commit 524f739
Show file tree
Hide file tree
Showing 20 changed files with 609 additions and 235 deletions.
6 changes: 3 additions & 3 deletions lib/archethic.ex
Original file line number Diff line number Diff line change
Expand Up @@ -494,11 +494,11 @@ defmodule Archethic do
TransactionChain.fetch_genesis_address(address, nodes)
end

defdelegate list_transactions_summaries_from_current_slot(),
defdelegate list_transactions_summaries_from_current_slot(opts),
to: BeaconChain

defdelegate list_transactions_summaries_from_current_slot(date),
to: BeaconChain
# defdelegate list_transactions_summaries_from_current_slot(date),
# to: BeaconChain

@doc """
Check if a transaction exists at address
Expand Down
148 changes: 95 additions & 53 deletions lib/archethic/beacon_chain.ex
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ defmodule Archethic.BeaconChain do
beacon_subset: Base.encode16(subset)
)

SummaryCache.add_slot(subset, slot, node_public_key)
SummaryCache.add_slot(slot, node_public_key)

{:error, reason} ->
Logger.error("Invalid beacon slot - #{inspect(reason)}")
Expand Down Expand Up @@ -193,34 +193,53 @@ defmodule Archethic.BeaconChain do
@doc """
Get all slots of a subset from summary cache and return unique transaction summaries
"""
@spec get_summary_slots(binary()) :: list(TransactionSummary.t())
def get_summary_slots(subset) when is_binary(subset) do
SummaryCache.stream_current_slots(subset)
|> Stream.map(fn {slot, _} -> slot end)
|> Stream.flat_map(fn %Slot{transaction_attestations: transaction_attestations} ->
transaction_summaries =
transaction_attestations
|> Enum.map(& &1.transaction_summary)

transaction_summaries
end)
|> Stream.uniq_by(fn %TransactionSummary{address: address} -> address end)
|> Enum.to_list()
@spec get_summary_slots(page :: pos_integer() | nil, order :: :asc | :desc) ::
{list(TransactionSummary.t()), next_page :: pos_integer()}
def get_summary_slots(page \\ nil, order \\ :asc) do
summary_time = DateTime.utc_now() |> SummaryTimer.next_summary()

page =
case page do
nil ->
case order do
:asc -> 0
:desc -> SummaryCache.last_page(summary_time)
end

page ->
page
end

summaries =
summary_time
|> SummaryCache.stream_current_summaries(page)
|> Enum.uniq_by(fn %TransactionSummary{address: address} -> address end)

{summaries, SummaryCache.next_page(summary_time, page, order)}
end

@doc """
Returns the current summary's replication attestations that current node have
"""
@spec get_current_summary_replication_attestations(subset :: binary()) ::
@spec get_current_summary_replication_attestations(subsets :: list(binary())) ::
Enumerable.t() | list(ReplicationAttestation.t())
def get_current_summary_replication_attestations(subset) do
%Slot{transaction_attestations: replication_attestations} = Subset.get_current_slot(subset)
def get_current_summary_replication_attestations(subsets) do
cache_replication_attestations =
DateTime.utc_now()
|> SummaryTimer.next_summary()
|> SummaryCache.stream_current_slots()
|> Stream.filter(fn {%Slot{subset: subset}, _} -> subset in subsets end)
|> Enum.flat_map(fn {%Slot{transaction_attestations: replication_attestations}, _} ->
replication_attestations
end)

SummaryCache.stream_current_slots(subset)
|> Stream.flat_map(fn {%Slot{transaction_attestations: replication_attestations}, _} ->
Task.async_stream(subsets, fn subset ->
%Slot{transaction_attestations: replication_attestations} = Subset.get_current_slot(subset)
replication_attestations
end)
|> Stream.concat(replication_attestations)
|> Stream.filter(&match?({:ok, _}, &1))
|> Stream.flat_map(fn {:ok, x} -> x end)
|> Enum.concat(cache_replication_attestations)
|> ReplicationAttestation.reduce_confirmations()
end

Expand Down Expand Up @@ -354,23 +373,52 @@ defmodule Archethic.BeaconChain do
Function only used by the explorer to retrieve current slot transactions.
We only ask 3 nodes because it's OK if it's not 100% accurate.
"""
@spec list_transactions_summaries_from_current_slot(DateTime.t()) ::
list(TransactionSummary.t())
def list_transactions_summaries_from_current_slot(datetime = %DateTime{} \\ DateTime.utc_now()) do
get_next_summary_elected_subsets_by_nodes(datetime)
|> Task.async_stream(
fn {node, subsets} ->
fetch_current_summaries(node, subsets)
end,
ordered: false,
max_concurrency: 256
)
|> Stream.filter(&match?({:ok, _}, &1))
|> Stream.flat_map(fn {:ok, summaries} -> summaries end)
@spec list_transactions_summaries_from_current_slot(
opts :: [datetime: DateTime.t(), page: pos_integer()]
) ::
{list(TransactionSummary.t()), next_page :: pos_integer()}
def list_transactions_summaries_from_current_slot(opts \\ []) do
datetime = Keyword.get(opts, :datetime, DateTime.utc_now())
page = Keyword.get(opts, :page)
order = Keyword.get(opts, :order, :asc)

%{next_page: next_page, summaries: summaries} =
datetime
|> get_next_summary_elected_subsets_by_nodes()
|> Task.async_stream(
fn {node, subsets} ->
fetch_current_summaries(node, subsets, page, order)
end,
ordered: false,
max_concurrency: 256
)
|> Stream.filter(&match?({:ok, _}, &1))
|> Stream.map(fn {:ok, {summaries, next_page}} ->
{List.flatten(summaries), next_page}
end)
|> Enum.reduce(
%{more_in_cache?: false, summaries: [], addresses: MapSet.new()},
fn {summaries, next_page}, acc ->
acc =
unless Map.get(acc, :next_page?) do
Map.put(acc, :next_page, next_page)
end

addresses = summaries |> Enum.map(& &1.address) |> MapSet.new()

case MapSet.difference(addresses, acc.addresses) |> MapSet.size() do
0 ->
Map.update!(acc, :summaries, &(&1 ++ summaries))

_ ->
acc
|> Map.update!(:summaries, &(&1 ++ summaries))
|> Map.update!(:addresses, &MapSet.union(&1, addresses))
end
end
)

# remove duplicates & sort
|> Stream.uniq_by(& &1.address)
|> Enum.sort_by(& &1.timestamp, {:desc, DateTime})
{Enum.sort_by(summaries, & &1.timestamp, {order, DateTime}), next_page}
end

@doc """
Expand Down Expand Up @@ -423,21 +471,15 @@ defmodule Archethic.BeaconChain do
end)
end

defp fetch_current_summaries(node, subsets) do
subsets
|> Stream.chunk_every(10)
|> Task.async_stream(fn subsets ->
case P2P.send_message(node, %GetCurrentSummaries{subsets: subsets}) do
{:ok, %TransactionSummaryList{transaction_summaries: transaction_summaries}} ->
transaction_summaries
defp fetch_current_summaries(node, subsets, page, order) do
case P2P.send_message(node, %GetCurrentSummaries{subsets: subsets, page: page, order: order}) do
{:ok,
%TransactionSummaryList{transaction_summaries: transaction_summaries, next_page: next_page}} ->
{transaction_summaries, next_page}

_ ->
[]
end
end)
|> Stream.filter(&match?({:ok, _}, &1))
|> Stream.flat_map(&elem(&1, 1))
|> Enum.to_list()
_ ->
[]
end
end

defp fetch_current_summary_replication_attestations_from_node(node, subsets) do
Expand Down Expand Up @@ -531,8 +573,8 @@ defmodule Archethic.BeaconChain do
@doc """
Retrieve the network stats for a given subset from the cached slots
"""
@spec get_network_stats(binary()) :: %{Crypto.key() => Slot.net_stats()}
def get_network_stats(subset) when is_binary(subset) do
NetworkCoordinates.aggregate_network_stats(subset)
@spec get_network_stats(binary(), DateTime.t()) :: %{Crypto.key() => Slot.net_stats()}
def get_network_stats(subset, summary_time) when is_binary(subset) do
NetworkCoordinates.aggregate_network_stats(subset, summary_time)
end
end
6 changes: 3 additions & 3 deletions lib/archethic/beacon_chain/network_coordinates.ex
Original file line number Diff line number Diff line change
Expand Up @@ -386,9 +386,9 @@ defmodule Archethic.BeaconChain.NetworkCoordinates do
The aggregation is using some weighted logistic regression.
"""
@spec aggregate_network_stats(binary()) :: %{Crypto.key() => Slot.net_stats()}
def aggregate_network_stats(subset) when is_binary(subset) do
subset
@spec aggregate_network_stats(binary(), DateTime.t()) :: %{Crypto.key() => Slot.net_stats()}
def aggregate_network_stats(subset, summary_time = %DateTime{}) when is_binary(subset) do
summary_time
|> SummaryCache.stream_current_slots()
|> Stream.filter(&match?({%Slot{p2p_view: %{network_stats: [_ | _]}}, _}, &1))
|> Stream.map(fn
Expand Down
5 changes: 3 additions & 2 deletions lib/archethic/beacon_chain/subset.ex
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ defmodule Archethic.BeaconChain.Subset do
# Avoid to store or dispatch an empty beacon's slot
unless Slot.empty?(current_slot) do
if summary_time?(time) do
SummaryCache.add_slot(subset, current_slot, Crypto.first_node_public_key())
SummaryCache.add_slot(current_slot, Crypto.first_node_public_key())
else
next_summary_time = SummaryTimer.next_summary(time)
broadcast_beacon_slot(subset, next_summary_time, current_slot)
Expand Down Expand Up @@ -376,8 +376,9 @@ defmodule Archethic.BeaconChain.Subset do

defp handle_summary(time, subset) do
beacon_slots =
subset
time
|> SummaryCache.stream_current_slots()
|> Stream.filter(&match?({%Slot{subset: ^subset}, _}, &1))
|> Stream.map(fn {slot, _} -> slot end)

if Enum.empty?(beacon_slots) do
Expand Down
12 changes: 6 additions & 6 deletions lib/archethic/beacon_chain/subset/stats_collector.ex
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,9 @@ defmodule Archethic.BeaconChain.Subset.StatsCollector do
JobCache.get!(
{:get, summary_time},
function: fn ->
get_current_node_subsets(summary_time)
|> do_get_stats(timeout)
summary_time
|> get_current_node_subsets()
|> do_get_stats(summary_time, timeout)
end,
timeout: timeout
)
Expand Down Expand Up @@ -119,7 +120,7 @@ defmodule Archethic.BeaconChain.Subset.StatsCollector do
function: fn ->
case action do
:get ->
do_get_stats(subsets, NetworkCoordinates.timeout())
do_get_stats(subsets, summary_time, NetworkCoordinates.timeout())

:fetch ->
do_fetch_stats(summary_time, NetworkCoordinates.timeout())
Expand All @@ -133,12 +134,11 @@ defmodule Archethic.BeaconChain.Subset.StatsCollector do
JobCache.stop(key)
end

defp do_get_stats(subsets, timeout) do
defp do_get_stats(subsets, summary_time, timeout) do
subsets
|> Task.async_stream(
fn subset ->
stats = BeaconChain.get_network_stats(subset)

stats = BeaconChain.get_network_stats(subset, summary_time)
{subset, stats}
end,
timeout: timeout,
Expand Down
Loading

0 comments on commit 524f739

Please sign in to comment.