Skip to content

Commit 756ccca

Browse files
authored
feat: make checkpoint sync asynchronous (#1219)
1 parent 8fbaa9e commit 756ccca

File tree

7 files changed

+221
-140
lines changed

7 files changed

+221
-140
lines changed

lib/lambda_ethereum_consensus/beacon/beacon_node.ex

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconNode do
4141
[
4242
{LambdaEthereumConsensus.Beacon.Clock, {store.genesis_time, time}},
4343
{LambdaEthereumConsensus.Libp2pPort, libp2p_args},
44-
LambdaEthereumConsensus.Beacon.SyncBlocks,
4544
{Task.Supervisor, name: PruneStatesSupervisor},
4645
{Task.Supervisor, name: PruneBlocksSupervisor},
4746
{Task.Supervisor, name: PruneBlobsSupervisor}

lib/lambda_ethereum_consensus/beacon/pending_blocks.ex

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
77
require Logger
88

99
alias LambdaEthereumConsensus.ForkChoice
10-
alias LambdaEthereumConsensus.Libp2pPort
1110
alias LambdaEthereumConsensus.P2P.BlockDownloader
1211

1312
alias LambdaEthereumConsensus.Metrics
@@ -140,7 +139,7 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
140139
end
141140

142141
defp process_downloaded_block({:ok, [block]}) do
143-
Libp2pPort.add_block(block)
142+
add_block(block)
144143
end
145144

146145
defp process_downloaded_block({:error, reason}) do

lib/lambda_ethereum_consensus/beacon/sync_blocks.ex

Lines changed: 39 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -3,128 +3,69 @@ defmodule LambdaEthereumConsensus.Beacon.SyncBlocks do
33
Performs an optimistic block sync from the finalized checkpoint to the current slot.
44
"""
55

6-
use Task
7-
86
require Logger
97

108
alias LambdaEthereumConsensus.ForkChoice
119
alias LambdaEthereumConsensus.Libp2pPort
1210
alias LambdaEthereumConsensus.P2P.BlockDownloader
13-
alias LambdaEthereumConsensus.P2P.Gossip
1411
alias LambdaEthereumConsensus.StateTransition.Misc
15-
alias Types.SignedBeaconBlock
1612

1713
@blocks_per_chunk 16
14+
@retries 50
1815

19-
@type chunk :: %{from: Types.slot(), count: integer()}
20-
21-
def start_link(opts) do
22-
Task.start_link(__MODULE__, :run, [opts])
23-
end
16+
@doc """
17+
Calculates which blocks need to be downloaded to be up to date, and launches the download
18+
requests. Returns the number of blocks that need to be downloaded.
2419
25-
def run(_opts) do
20+
If N blocks should be downloaded, N/16 range requests are performed. When each of those
21+
finishes, each block of those responses will be sent to the libp2p port module individually using
22+
Libp2pPort.add_block/1.
23+
"""
24+
@spec run() :: non_neg_integer()
25+
def run() do
2626
# Initial sleep for faster app start
27-
Process.sleep(1000)
2827
checkpoint = ForkChoice.get_finalized_checkpoint()
2928
initial_slot = Misc.compute_start_slot_at_epoch(checkpoint.epoch) + 1
3029
last_slot = ForkChoice.get_current_chain_slot()
3130

3231
# If we're around genesis, we consider ourselves synced
33-
if last_slot > 0 do
34-
perform_sync(initial_slot, last_slot)
32+
if last_slot <= 0 do
33+
Logger.info("[Optimistic sync] At genesis. No block sync will be needed.")
34+
0
3535
else
36-
start_subscriptions()
37-
end
38-
end
39-
40-
@spec perform_sync(integer(), integer()) :: :ok
41-
def perform_sync(initial_slot, last_slot) do
42-
Enum.chunk_every(initial_slot..last_slot, @blocks_per_chunk)
43-
|> Enum.map(fn chunk ->
44-
first_slot = List.first(chunk)
45-
last_slot = List.last(chunk)
46-
count = last_slot - first_slot + 1
47-
%{from: first_slot, count: count}
48-
end)
49-
|> perform_sync()
50-
end
51-
52-
@spec perform_sync([chunk()]) :: :ok
53-
def perform_sync(chunks) do
54-
remaining = chunks |> Stream.map(fn %{count: c} -> c end) |> Enum.sum()
55-
Logger.info("[Optimistic Sync] Blocks remaining: #{remaining}")
56-
57-
results =
58-
chunks
59-
|> Task.async_stream(
60-
fn chunk -> fetch_blocks_by_slot(chunk.from, chunk.count) end,
61-
max_concurrency: 4,
62-
timeout: 20_000,
63-
on_timeout: :kill_task
36+
Logger.info(
37+
"[Optimistic sync] Performing optimistic sync between slots #{initial_slot} and #{last_slot}, for a total of #{last_slot - initial_slot + 1} slots."
6438
)
65-
|> Enum.map(fn
66-
{:ok, result} -> result
67-
{:error, error} -> {:error, error}
68-
{:exit, :timeout} -> {:error, "timeout"}
69-
end)
70-
71-
results
72-
|> Enum.flat_map(fn
73-
{:ok, blocks} -> blocks
74-
_other -> []
75-
end)
76-
|> tap(fn blocks ->
77-
Logger.info("[Optimistic Sync] Downloaded #{length(blocks)} blocks successfully.")
78-
end)
79-
|> Enum.each(&Libp2pPort.add_block/1)
80-
81-
remaining_chunks =
82-
Enum.zip(chunks, results)
83-
|> Enum.flat_map(fn
84-
{chunk, {:error, reason}} ->
85-
if not String.contains?(inspect(reason), "failed to dial") do
86-
Logger.debug(
87-
"[Optimistic Sync] Failed downloading the chunk #{inspect(chunk)}. Reason: #{inspect(reason)}"
88-
)
89-
end
9039

91-
[chunk]
92-
93-
_other ->
94-
[]
40+
initial_slot..last_slot
41+
|> Enum.chunk_every(@blocks_per_chunk)
42+
|> Enum.map(fn chunk ->
43+
first_slot = List.first(chunk)
44+
last_slot = List.last(chunk)
45+
count = last_slot - first_slot + 1
46+
47+
Logger.info(
48+
"[Optimistic sync] Sending request for slots #{first_slot} to #{last_slot} (request size = #{count})."
49+
)
50+
51+
BlockDownloader.request_blocks_by_range(
52+
first_slot,
53+
count,
54+
&on_chunk_downloaded/1,
55+
@retries
56+
)
57+
58+
count
9559
end)
96-
97-
if Enum.empty?(chunks) do
98-
Logger.info("[Optimistic Sync] Sync completed")
99-
start_subscriptions()
100-
else
101-
Process.sleep(1000)
102-
perform_sync(remaining_chunks)
60+
|> Enum.sum()
10361
end
10462
end
10563

106-
# TODO: handle subscription failures.
107-
defp start_subscriptions() do
108-
Gossip.BeaconBlock.subscribe_to_topic()
109-
Gossip.BlobSideCar.subscribe_to_topics()
110-
Gossip.OperationsCollector.subscribe_to_topics()
64+
defp on_chunk_downloaded({:ok, range, blocks}) do
65+
Libp2pPort.notify_blocks_downloaded(range, blocks)
11166
end
11267

113-
@spec fetch_blocks_by_slot(Types.slot(), non_neg_integer()) ::
114-
{:ok, [SignedBeaconBlock.t()]} | {:error, String.t()}
115-
def fetch_blocks_by_slot(from, count) do
116-
case BlockDownloader.request_blocks_by_range_sync(from, count, 0) do
117-
{:ok, blocks} ->
118-
{:ok, blocks}
119-
120-
{:error, error} ->
121-
if not String.contains?(inspect(error), "failed to dial") do
122-
Logger.debug(
123-
"Blocks download failed for slot #{from} count #{count} Error: #{inspect(error)}"
124-
)
125-
end
126-
127-
{:error, error}
128-
end
68+
defp on_chunk_downloaded({:error, range, reason}) do
69+
Libp2pPort.notify_block_download_failed(range, reason)
12970
end
13071
end

lib/lambda_ethereum_consensus/p2p/block_downloader.ex

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@ defmodule LambdaEthereumConsensus.P2P.BlockDownloader do
1717
# so we want to try again with a different peer
1818
@default_retries 5
1919

20+
@type range :: {Types.slot(), Types.slot()}
2021
@type download_result :: {:ok, [SignedBeaconBlock.t()]} | {:error, any()}
21-
@type on_blocks :: (download_result() -> term())
22+
@type on_blocks ::
23+
({:ok, range(), [SignedBeaconBlock.t()]} | {:error, range(), any()} -> term())
2224

2325
@doc """
2426
Requests a series of blocks in batch, and synchronously (the caller will block waiting for the
@@ -73,7 +75,7 @@ defmodule LambdaEthereumConsensus.P2P.BlockDownloader do
7375
:ok <- verify_batch(blocks, slot, count) do
7476
tags = %{result: "success", type: "by_slot", reason: "success"}
7577
:telemetry.execute([:network, :request], %{blocks: count}, tags)
76-
on_blocks.({:ok, blocks})
78+
on_blocks.({:ok, {slot, slot + count - 1}, blocks})
7779
else
7880
{:error, reason} ->
7981
tags = %{type: "by_slot", reason: parse_reason(reason)}
@@ -85,7 +87,8 @@ defmodule LambdaEthereumConsensus.P2P.BlockDownloader do
8587
request_blocks_by_range(slot, count, on_blocks, retries - 1)
8688
else
8789
:telemetry.execute([:network, :request], %{blocks: 0}, Map.put(tags, :result, "error"))
88-
on_blocks.({:error, reason})
90+
# TODO: Add block range that failed in the reason
91+
on_blocks.({:error, {slot, slot + count - 1}, reason})
8992
{:error, reason}
9093
end
9194
end

lib/lambda_ethereum_consensus/p2p/gossip/beacon_block.ex

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.BeaconBlock do
5757
"/eth2/#{fork_context}/beacon_block/ssz_snappy"
5858
end
5959

60+
def topics(), do: [topic()]
61+
6062
##########################
6163
### Private functions
6264
##########################

lib/lambda_ethereum_consensus/p2p/gossip/operations_collector.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.OperationsCollector do
220220
end
221221
end
222222

223-
defp topics() do
223+
def topics() do
224224
fork_context = ForkChoice.get_fork_digest() |> Base.encode16(case: :lower)
225225

226226
topics =

0 commit comments

Comments
 (0)