Skip to content

Commit d216081

Browse files
authored
feat: add Finish stage (paradigmxyz#1279)
1 parent ce2354a commit d216081

File tree

21 files changed

+414
-134
lines changed

21 files changed

+414
-134
lines changed

bin/reth/src/args/network_args.rs

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,11 @@
22
33
use crate::dirs::{KnownPeersPath, PlatformPath};
44
use clap::Args;
5-
use reth_primitives::NodeRecord;
5+
use reth_discv4::bootnodes::mainnet_nodes;
6+
use reth_net_nat::NatResolver;
7+
use reth_network::NetworkConfigBuilder;
8+
use reth_primitives::{ChainSpec, NodeRecord};
9+
use reth_staged_sync::Config;
610
use std::path::PathBuf;
711

812
/// Parameters for configuring the network more granularity via CLI
@@ -22,7 +26,7 @@ pub struct NetworkArgs {
2226
#[arg(long)]
2327
pub trusted_only: bool,
2428

25-
/// Nodes to bootstrap network discovery.
29+
/// Bootnodes to connect to initially.
2630
///
2731
/// Will fall back to a network-specific default if not specified.
2832
#[arg(long, value_delimiter = ',')]
@@ -37,6 +41,23 @@ pub struct NetworkArgs {
3741
/// Do not persist peers. Cannot be used with --peers-file
3842
#[arg(long, verbatim_doc_comment, conflicts_with = "peers_file")]
3943
pub no_persist_peers: bool,
44+
45+
/// NAT resolution method.
46+
#[arg(long, default_value = "any")]
47+
pub nat: NatResolver,
48+
}
49+
50+
impl NetworkArgs {
51+
/// Build a [`NetworkConfigBuilder`] from a [`Config`] and a [`ChainSpec`], in addition to the
52+
/// values in this option struct.
53+
pub fn network_config(&self, config: &Config, chain_spec: ChainSpec) -> NetworkConfigBuilder {
54+
let peers_file = (!self.no_persist_peers).then_some(&self.peers_file);
55+
config
56+
.network_config(self.nat, peers_file.map(|f| f.as_ref().to_path_buf()))
57+
.boot_nodes(self.bootnodes.clone().unwrap_or_else(mainnet_nodes))
58+
.chain_spec(chain_spec)
59+
.set_discovery(self.disable_discovery)
60+
}
4061
}
4162

4263
// === impl NetworkArgs ===

bin/reth/src/chain/import.rs

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use reth_downloaders::{
1313
};
1414
use reth_interfaces::{
1515
consensus::{Consensus, ForkchoiceState},
16+
p2p::headers::client::NoopStatusUpdater,
1617
sync::SyncStateUpdater,
1718
};
1819
use reth_primitives::ChainSpec;
@@ -142,22 +143,23 @@ impl ImportCommand {
142143
let mut pipeline = Pipeline::builder()
143144
.with_sync_state_updater(file_client)
144145
.add_stages(
145-
OnlineStages::new(consensus.clone(), header_downloader, body_downloader).set(
146-
TotalDifficultyStage {
147-
chain_spec: self.chain.clone(),
148-
commit_threshold: config.stages.total_difficulty.commit_threshold,
149-
},
150-
),
151-
)
152-
.add_stages(
153-
OfflineStages::default()
154-
.set(SenderRecoveryStage {
155-
commit_threshold: config.stages.sender_recovery.commit_threshold,
156-
})
157-
.set(ExecutionStage {
158-
chain_spec: self.chain.clone(),
159-
commit_threshold: config.stages.execution.commit_threshold,
160-
}),
146+
DefaultStages::new(
147+
consensus.clone(),
148+
header_downloader,
149+
body_downloader,
150+
NoopStatusUpdater::default(),
151+
)
152+
.set(TotalDifficultyStage {
153+
chain_spec: self.chain.clone(),
154+
commit_threshold: config.stages.total_difficulty.commit_threshold,
155+
})
156+
.set(SenderRecoveryStage {
157+
commit_threshold: config.stages.sender_recovery.commit_threshold,
158+
})
159+
.set(ExecutionStage {
160+
chain_spec: self.chain.clone(),
161+
commit_threshold: config.stages.execution.commit_threshold,
162+
}),
161163
)
162164
.with_max_block(0)
163165
.build();

bin/reth/src/node/mod.rs

Lines changed: 50 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -12,22 +12,29 @@ use eyre::Context;
1212
use fdlimit::raise_fd_limit;
1313
use futures::{pin_mut, stream::select as stream_select, Stream, StreamExt};
1414
use reth_consensus::beacon::BeaconConsensus;
15-
use reth_db::mdbx::{Env, WriteMap};
15+
use reth_db::{
16+
database::Database,
17+
mdbx::{Env, WriteMap},
18+
tables,
19+
transaction::DbTx,
20+
};
1621
use reth_downloaders::{
1722
bodies::bodies::BodiesDownloaderBuilder,
1823
headers::reverse_headers::ReverseHeadersDownloaderBuilder,
1924
};
2025
use reth_interfaces::{
2126
consensus::{Consensus, ForkchoiceState},
22-
p2p::{bodies::downloader::BodyDownloader, headers::downloader::HeaderDownloader},
27+
p2p::{
28+
bodies::downloader::BodyDownloader,
29+
headers::{client::StatusUpdater, downloader::HeaderDownloader},
30+
},
2331
sync::SyncStateUpdater,
2432
};
25-
use reth_net_nat::NatResolver;
2633
use reth_network::{
2734
error::NetworkError, NetworkConfig, NetworkEvent, NetworkHandle, NetworkManager,
2835
};
2936
use reth_network_api::NetworkInfo;
30-
use reth_primitives::{BlockNumber, ChainSpec, H256};
37+
use reth_primitives::{BlockNumber, ChainSpec, Head, H256};
3138
use reth_provider::{BlockProvider, HeaderProvider, ShareableDatabase};
3239
use reth_rpc_builder::{RethRpcModule, RpcServerConfig, TransportRpcModuleConfig};
3340
use reth_staged_sync::{
@@ -40,7 +47,7 @@ use reth_staged_sync::{
4047
};
4148
use reth_stages::{
4249
prelude::*,
43-
stages::{ExecutionStage, SenderRecoveryStage, TotalDifficultyStage},
50+
stages::{ExecutionStage, SenderRecoveryStage, TotalDifficultyStage, FINISH},
4451
};
4552
use reth_tasks::TaskExecutor;
4653
use std::{net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};
@@ -89,9 +96,6 @@ pub struct Command {
8996
#[clap(flatten)]
9097
network: NetworkArgs,
9198

92-
#[arg(long, default_value = "any")]
93-
nat: NatResolver,
94-
9599
/// Set the chain tip manually for testing purposes.
96100
///
97101
/// NOTE: This is a temporary flag
@@ -135,8 +139,9 @@ impl Command {
135139
self.init_trusted_nodes(&mut config);
136140

137141
info!(target: "reth::cli", "Connecting to P2P network");
138-
let netconf = self.load_network_config(&config, Arc::clone(&db), ctx.task_executor.clone());
139-
let network = self.start_network(netconf, &ctx.task_executor, ()).await?;
142+
let network_config =
143+
self.load_network_config(&config, Arc::clone(&db), ctx.task_executor.clone());
144+
let network = self.start_network(network_config, &ctx.task_executor, ()).await?;
140145
info!(target: "reth::cli", peer_id = %network.peer_id(), local_addr = %network.local_addr(), "Connected to P2P network");
141146

142147
// TODO: Use the resolved secret to spawn the Engine API server
@@ -277,22 +282,42 @@ impl Command {
277282
Ok(handle)
278283
}
279284

285+
fn fetch_head(&self, db: Arc<Env<WriteMap>>) -> Result<Head, reth_interfaces::db::Error> {
286+
db.view(|tx| {
287+
let head = FINISH.get_progress(tx)?.unwrap_or_default();
288+
let header = tx
289+
.get::<tables::Headers>(head)?
290+
.expect("the header for the latest block is missing, database is corrupt");
291+
let total_difficulty = tx.get::<tables::HeaderTD>(head)?.expect(
292+
"the total difficulty for the latest block is missing, database is corrupt",
293+
);
294+
let hash = tx
295+
.get::<tables::CanonicalHeaders>(head)?
296+
.expect("the hash for the latest block is missing, database is corrupt");
297+
Ok::<Head, reth_interfaces::db::Error>(Head {
298+
number: head,
299+
hash,
300+
difficulty: header.difficulty,
301+
total_difficulty: total_difficulty.into(),
302+
timestamp: header.timestamp,
303+
})
304+
})?
305+
.map_err(Into::into)
306+
}
307+
280308
fn load_network_config(
281309
&self,
282310
config: &Config,
283311
db: Arc<Env<WriteMap>>,
284312
executor: TaskExecutor,
285313
) -> NetworkConfig<ShareableDatabase<Arc<Env<WriteMap>>>> {
286-
let peers_file = self.network.persistent_peers_file();
287-
config.network_config(
288-
db,
289-
self.chain.clone(),
290-
self.network.disable_discovery,
291-
self.network.bootnodes.clone(),
292-
self.nat,
293-
peers_file,
294-
Some(executor),
295-
)
314+
let head = self.fetch_head(Arc::clone(&db)).expect("the head block is missing");
315+
316+
self.network
317+
.network_config(config, self.chain.clone())
318+
.executor(Some(executor))
319+
.set_head(head)
320+
.build(Arc::new(ShareableDatabase::new(db)))
296321
}
297322

298323
async fn build_pipeline<H, B, U>(
@@ -306,7 +331,7 @@ impl Command {
306331
where
307332
H: HeaderDownloader + 'static,
308333
B: BodyDownloader + 'static,
309-
U: SyncStateUpdater,
334+
U: SyncStateUpdater + StatusUpdater + Clone + 'static,
310335
{
311336
let stage_conf = &config.stages;
312337

@@ -318,17 +343,13 @@ impl Command {
318343
}
319344

320345
let pipeline = builder
321-
.with_sync_state_updater(updater)
346+
.with_sync_state_updater(updater.clone())
322347
.add_stages(
323-
OnlineStages::new(consensus.clone(), header_downloader, body_downloader).set(
324-
TotalDifficultyStage {
348+
DefaultStages::new(consensus.clone(), header_downloader, body_downloader, updater)
349+
.set(TotalDifficultyStage {
325350
chain_spec: self.chain.clone(),
326351
commit_threshold: stage_conf.total_difficulty.commit_threshold,
327-
},
328-
),
329-
)
330-
.add_stages(
331-
OfflineStages::default()
352+
})
332353
.set(SenderRecoveryStage {
333354
commit_threshold: stage_conf.sender_recovery.commit_threshold,
334355
})

bin/reth/src/p2p/mod.rs

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use reth_interfaces::p2p::{
1010
};
1111
use reth_network::FetchClient;
1212
use reth_primitives::{BlockHashOrNumber, ChainSpec, NodeRecord, SealedHeader};
13+
use reth_provider::ShareableDatabase;
1314
use reth_staged_sync::{
1415
utils::{chainspec::chain_spec_value_parser, hash_or_num_value_parser},
1516
Config,
@@ -98,15 +99,10 @@ impl Command {
9899
config.peers.connect_trusted_nodes_only = self.trusted_only;
99100

100101
let network = config
101-
.network_config(
102-
noop_db,
103-
self.chain.clone(),
104-
self.disable_discovery,
105-
None,
106-
self.nat,
107-
None,
108-
None,
109-
)
102+
.network_config(self.nat, None)
103+
.set_discovery(self.disable_discovery)
104+
.chain_spec(self.chain.clone())
105+
.build(Arc::new(ShareableDatabase::new(noop_db)))
110106
.start_network()
111107
.await?;
112108

bin/reth/src/stage/mod.rs

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,8 @@ use crate::{
99
use clap::{Parser, ValueEnum};
1010
use reth_consensus::beacon::BeaconConsensus;
1111
use reth_downloaders::bodies::bodies::BodiesDownloaderBuilder;
12-
use reth_net_nat::NatResolver;
1312
use reth_primitives::ChainSpec;
14-
use reth_provider::Transaction;
13+
use reth_provider::{ShareableDatabase, Transaction};
1514
use reth_staged_sync::{
1615
utils::{chainspec::chain_spec_value_parser, init::init_db},
1716
Config,
@@ -85,9 +84,6 @@ pub struct Command {
8584

8685
#[clap(flatten)]
8786
network: NetworkArgs,
88-
89-
#[arg(long, default_value = "any")]
90-
nat: NatResolver,
9187
}
9288

9389
#[derive(Debug, Clone, Copy, Eq, PartialEq, PartialOrd, Ord, ValueEnum)]
@@ -137,16 +133,10 @@ impl Command {
137133
});
138134
}
139135

140-
let network = config
141-
.network_config(
142-
db.clone(),
143-
self.chain.clone(),
144-
self.network.disable_discovery,
145-
None,
146-
self.nat,
147-
None,
148-
None,
149-
)
136+
let network = self
137+
.network
138+
.network_config(&config, self.chain.clone())
139+
.build(Arc::new(ShareableDatabase::new(db.clone())))
150140
.start_network()
151141
.await?;
152142
let fetch_client = Arc::new(network.fetch_client().await?);

crates/interfaces/src/p2p/headers/client.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,3 +45,12 @@ pub trait StatusUpdater: Send + Sync {
4545
/// Updates the status of the p2p node
4646
fn update_status(&self, head: Head);
4747
}
48+
49+
/// A [StatusUpdater] implementation that does nothing.
50+
#[derive(Debug, Clone, Default)]
51+
#[non_exhaustive]
52+
pub struct NoopStatusUpdater;
53+
54+
impl StatusUpdater for NoopStatusUpdater {
55+
fn update_status(&self, _: Head) {}
56+
}

crates/interfaces/src/test_utils/headers.rs

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -298,12 +298,27 @@ impl TestConsensus {
298298
}
299299
}
300300

301-
/// Nil status updater for testing
302-
#[derive(Debug, Clone, Default)]
303-
pub struct TestStatusUpdater;
301+
/// Status updater for testing.
302+
///
303+
/// [`TestStatusUpdater::new()`] creates a new [`TestStatusUpdater`] that is **not** shareable. This
304+
/// struct wraps the sender side of a [`tokio::sync::watch`] channel. The receiving side of the
305+
/// channel (which is shareable by cloning it) is also returned.
306+
#[derive(Debug)]
307+
pub struct TestStatusUpdater(tokio::sync::watch::Sender<Head>);
308+
309+
impl TestStatusUpdater {
310+
/// Create a new test status updater and a receiver to listen to status updates on.
311+
pub fn new() -> (Self, tokio::sync::watch::Receiver<Head>) {
312+
let (tx, rx) = tokio::sync::watch::channel(Head::default());
313+
314+
(Self(tx), rx)
315+
}
316+
}
304317

305318
impl StatusUpdater for TestStatusUpdater {
306-
fn update_status(&self, _: Head) {}
319+
fn update_status(&self, head: Head) {
320+
self.0.send(head).expect("could not send status update");
321+
}
307322
}
308323

309324
#[async_trait::async_trait]

crates/net/network/src/config.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,16 @@ impl NetworkConfigBuilder {
185185
self
186186
}
187187

188+
/// Sets the highest synced block.
189+
///
190+
/// This is used to construct the appropriate [`ForkFilter`] and [`Status`] message.
191+
///
192+
/// If not set, this defaults to the genesis specified by the current chain specification.
193+
pub fn set_head(mut self, head: Head) -> Self {
194+
self.head = Some(head);
195+
self
196+
}
197+
188198
/// Sets the `HelloMessage` to send when connecting to peers.
189199
///
190200
/// ```
@@ -265,6 +275,7 @@ impl NetworkConfigBuilder {
265275
}
266276

267277
/// Sets the discovery service off on true.
278+
// TODO(onbjerg): This name does not imply `true` = disable
268279
pub fn set_discovery(mut self, disable_discovery: bool) -> Self {
269280
if disable_discovery {
270281
self.disable_discovery();
@@ -309,7 +320,7 @@ impl NetworkConfigBuilder {
309320
let head = head.unwrap_or(Head {
310321
hash: chain_spec.genesis_hash(),
311322
number: 0,
312-
timestamp: 0,
323+
timestamp: chain_spec.genesis.timestamp,
313324
difficulty: chain_spec.genesis.difficulty,
314325
total_difficulty: chain_spec.genesis.difficulty,
315326
});

0 commit comments

Comments
 (0)