Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.

Commit 8f4d1b3

Browse files
authored
Network sync refactoring (part 5) (#11825)
* Make `chain_sync` an explicit networking parameter instead of offering factory method * Derive `Copy` on `SyncMode` and remove cloning
1 parent 3e0554a commit 8f4d1b3

File tree

5 files changed

+69
-84
lines changed

5 files changed

+69
-84
lines changed

client/network/src/config.rs

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -96,14 +96,8 @@ where
9696
/// valid.
9797
pub import_queue: Box<dyn ImportQueue<B>>,
9898

99-
/// Factory function that creates a new instance of chain sync.
100-
pub create_chain_sync: Box<
101-
dyn FnOnce(
102-
sc_network_common::sync::SyncMode,
103-
Arc<Client>,
104-
Option<Arc<dyn WarpSyncProvider<B>>>,
105-
) -> crate::error::Result<Box<dyn ChainSync<B>>>,
106-
>,
99+
/// Instance of chain sync implementation.
100+
pub chain_sync: Box<dyn ChainSync<B>>,
107101

108102
/// Registry for recording prometheus metrics to.
109103
pub metrics_registry: Option<Registry>,
@@ -138,8 +132,8 @@ where
138132
/// both outgoing and incoming requests.
139133
pub state_request_protocol_config: RequestResponseConfig,
140134

141-
/// Optional warp sync protocol support. Include protocol config and sync provider.
142-
pub warp_sync: Option<(Arc<dyn WarpSyncProvider<B>>, RequestResponseConfig)>,
135+
/// Optional warp sync protocol config.
136+
pub warp_sync_protocol_config: Option<RequestResponseConfig>,
143137
}
144138

145139
/// Role of the local node.
@@ -352,7 +346,7 @@ impl From<multiaddr::Error> for ParseErr {
352346
}
353347

354348
/// Sync operation mode.
355-
#[derive(Clone, Debug, Eq, PartialEq)]
349+
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
356350
pub enum SyncMode {
357351
/// Full block download and verification.
358352
Full,

client/network/src/service.rs

Lines changed: 5 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
use crate::{
3131
behaviour::{self, Behaviour, BehaviourOut},
3232
bitswap::Bitswap,
33-
config::{self, parse_str_addr, Params, TransportConfig},
33+
config::{parse_str_addr, Params, TransportConfig},
3434
discovery::DiscoveryConfig,
3535
error::Error,
3636
network_state::{
@@ -60,7 +60,7 @@ use metrics::{Histogram, HistogramVec, MetricSources, Metrics};
6060
use parking_lot::Mutex;
6161
use sc_client_api::{BlockBackend, ProofProvider};
6262
use sc_consensus::{BlockImportError, BlockImportStatus, ImportQueue, Link};
63-
use sc_network_common::sync::{SyncMode, SyncState, SyncStatus};
63+
use sc_network_common::sync::{SyncState, SyncStatus};
6464
use sc_peerset::PeersetHandle;
6565
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
6666
use sp_blockchain::{HeaderBackend, HeaderMetadata};
@@ -239,21 +239,6 @@ where
239239

240240
let default_notif_handshake_message = Roles::from(&params.role).encode();
241241

242-
let (warp_sync_provider, warp_sync_protocol_config) = match params.warp_sync {
243-
Some((p, c)) => (Some(p), Some(c)),
244-
None => (None, None),
245-
};
246-
247-
let chain_sync = (params.create_chain_sync)(
248-
match params.network_config.sync_mode {
249-
config::SyncMode::Full => SyncMode::Full,
250-
config::SyncMode::Fast { skip_proofs, storage_chain_mode } =>
251-
SyncMode::LightState { skip_proofs, storage_chain_mode },
252-
config::SyncMode::Warp => SyncMode::Warp,
253-
},
254-
params.chain.clone(),
255-
warp_sync_provider,
256-
)?;
257242
let (protocol, peerset_handle, mut known_addresses) = Protocol::new(
258243
From::from(&params.role),
259244
params.chain.clone(),
@@ -266,7 +251,7 @@ where
266251
)
267252
.collect(),
268253
params.metrics_registry.as_ref(),
269-
chain_sync,
254+
params.chain_sync,
270255
)?;
271256

272257
// List of multiaddresses that we know in the network.
@@ -303,7 +288,6 @@ where
303288
let is_major_syncing = Arc::new(AtomicBool::new(false));
304289

305290
// Build the swarm.
306-
let client = params.chain.clone();
307291
let (mut swarm, bandwidth): (Swarm<Behaviour<B, Client>>, _) = {
308292
let user_agent = format!(
309293
"{} ({})",
@@ -389,15 +373,15 @@ where
389373
};
390374

391375
let behaviour = {
392-
let bitswap = params.network_config.ipfs_server.then(|| Bitswap::new(client));
376+
let bitswap = params.network_config.ipfs_server.then(|| Bitswap::new(params.chain));
393377
let result = Behaviour::new(
394378
protocol,
395379
user_agent,
396380
local_public,
397381
discovery_config,
398382
params.block_request_protocol_config,
399383
params.state_request_protocol_config,
400-
warp_sync_protocol_config,
384+
params.warp_sync_protocol_config,
401385
bitswap,
402386
params.light_client_request_protocol_config,
403387
params.network_config.request_response_protocols,

client/network/src/service/tests.rs

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ type TestNetworkService = NetworkService<
4242
/// > **Note**: We return the events stream in order to not possibly lose events between the
4343
/// > construction of the service and the moment the events stream is grabbed.
4444
fn build_test_full_node(
45-
config: config::NetworkConfiguration,
45+
network_config: config::NetworkConfiguration,
4646
) -> (Arc<TestNetworkService>, impl Stream<Item = Event>) {
4747
let client = Arc::new(TestClientBuilder::with_default_backend().build_with_longest_chain().0);
4848

@@ -111,35 +111,36 @@ fn build_test_full_node(
111111
protocol_config
112112
};
113113

114-
let max_parallel_downloads = config.max_parallel_downloads;
114+
let chain_sync = ChainSync::new(
115+
match network_config.sync_mode {
116+
config::SyncMode::Full => sc_network_common::sync::SyncMode::Full,
117+
config::SyncMode::Fast { skip_proofs, storage_chain_mode } =>
118+
sc_network_common::sync::SyncMode::LightState { skip_proofs, storage_chain_mode },
119+
config::SyncMode::Warp => sc_network_common::sync::SyncMode::Warp,
120+
},
121+
client.clone(),
122+
Box::new(DefaultBlockAnnounceValidator),
123+
network_config.max_parallel_downloads,
124+
None,
125+
)
126+
.unwrap();
115127
let worker = NetworkWorker::new(config::Params {
116128
role: config::Role::Full,
117129
executor: None,
118130
transactions_handler_executor: Box::new(|task| {
119131
async_std::task::spawn(task);
120132
}),
121-
network_config: config,
133+
network_config,
122134
chain: client.clone(),
123-
transaction_pool: Arc::new(crate::config::EmptyTransactionPool),
135+
transaction_pool: Arc::new(config::EmptyTransactionPool),
124136
protocol_id,
125137
import_queue,
126-
create_chain_sync: Box::new(
127-
move |sync_mode, chain, warp_sync_provider| match ChainSync::new(
128-
sync_mode,
129-
chain,
130-
Box::new(DefaultBlockAnnounceValidator),
131-
max_parallel_downloads,
132-
warp_sync_provider,
133-
) {
134-
Ok(chain_sync) => Ok(Box::new(chain_sync)),
135-
Err(error) => Err(Box::new(error).into()),
136-
},
137-
),
138+
chain_sync: Box::new(chain_sync),
138139
metrics_registry: None,
139140
block_request_protocol_config,
140141
state_request_protocol_config,
141142
light_client_request_protocol_config,
142-
warp_sync: None,
143+
warp_sync_protocol_config: None,
143144
})
144145
.unwrap();
145146

client/network/test/src/lib.rs

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -838,10 +838,25 @@ where
838838
protocol_config
839839
};
840840

841-
let max_parallel_downloads = network_config.max_parallel_downloads;
842841
let block_announce_validator = config
843842
.block_announce_validator
844843
.unwrap_or_else(|| Box::new(DefaultBlockAnnounceValidator));
844+
let chain_sync = ChainSync::new(
845+
match network_config.sync_mode {
846+
SyncMode::Full => sc_network_common::sync::SyncMode::Full,
847+
SyncMode::Fast { skip_proofs, storage_chain_mode } =>
848+
sc_network_common::sync::SyncMode::LightState {
849+
skip_proofs,
850+
storage_chain_mode,
851+
},
852+
SyncMode::Warp => sc_network_common::sync::SyncMode::Warp,
853+
},
854+
client.clone(),
855+
block_announce_validator,
856+
network_config.max_parallel_downloads,
857+
Some(warp_sync),
858+
)
859+
.unwrap();
845860
let network = NetworkWorker::new(sc_network::config::Params {
846861
role: if config.is_authority { Role::Authority } else { Role::Full },
847862
executor: None,
@@ -853,23 +868,12 @@ where
853868
transaction_pool: Arc::new(EmptyTransactionPool),
854869
protocol_id,
855870
import_queue,
856-
create_chain_sync: Box::new(move |sync_mode, chain, warp_sync_provider| {
857-
match ChainSync::new(
858-
sync_mode,
859-
chain,
860-
block_announce_validator,
861-
max_parallel_downloads,
862-
warp_sync_provider,
863-
) {
864-
Ok(chain_sync) => Ok(Box::new(chain_sync)),
865-
Err(error) => Err(Box::new(error).into()),
866-
}
867-
}),
871+
chain_sync: Box::new(chain_sync),
868872
metrics_registry: None,
869873
block_request_protocol_config,
870874
state_request_protocol_config,
871875
light_client_request_protocol_config,
872-
warp_sync: Some((warp_sync, warp_protocol_config)),
876+
warp_sync_protocol_config: Some(warp_protocol_config),
873877
})
874878
.unwrap();
875879

client/service/src/builder.rs

Lines changed: 23 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -760,13 +760,15 @@ where
760760
protocol_config
761761
};
762762

763-
let warp_sync_params = warp_sync.map(|provider| {
764-
// Allow both outgoing and incoming requests.
765-
let (handler, protocol_config) =
766-
WarpSyncRequestHandler::new(protocol_id.clone(), provider.clone());
767-
spawn_handle.spawn("warp-sync-request-handler", Some("networking"), handler.run());
768-
(provider, protocol_config)
769-
});
763+
let (warp_sync_provider, warp_sync_protocol_config) = warp_sync
764+
.map(|provider| {
765+
// Allow both outgoing and incoming requests.
766+
let (handler, protocol_config) =
767+
WarpSyncRequestHandler::new(protocol_id.clone(), provider.clone());
768+
spawn_handle.spawn("warp-sync-request-handler", Some("networking"), handler.run());
769+
(Some(provider), Some(protocol_config))
770+
})
771+
.unwrap_or_default();
770772

771773
let light_client_request_protocol_config = {
772774
// Allow both outgoing and incoming requests.
@@ -776,7 +778,18 @@ where
776778
protocol_config
777779
};
778780

779-
let max_parallel_downloads = config.network.max_parallel_downloads;
781+
let chain_sync = ChainSync::new(
782+
match config.network.sync_mode {
783+
SyncMode::Full => sc_network_common::sync::SyncMode::Full,
784+
SyncMode::Fast { skip_proofs, storage_chain_mode } =>
785+
sc_network_common::sync::SyncMode::LightState { skip_proofs, storage_chain_mode },
786+
SyncMode::Warp => sc_network_common::sync::SyncMode::Warp,
787+
},
788+
client.clone(),
789+
block_announce_validator,
790+
config.network.max_parallel_downloads,
791+
warp_sync_provider,
792+
)?;
780793
let network_params = sc_network::config::Params {
781794
role: config.role.clone(),
782795
executor: {
@@ -796,22 +809,11 @@ where
796809
transaction_pool: transaction_pool_adapter as _,
797810
protocol_id,
798811
import_queue: Box::new(import_queue),
799-
create_chain_sync: Box::new(
800-
move |sync_mode, chain, warp_sync_provider| match ChainSync::new(
801-
sync_mode,
802-
chain,
803-
block_announce_validator,
804-
max_parallel_downloads,
805-
warp_sync_provider,
806-
) {
807-
Ok(chain_sync) => Ok(Box::new(chain_sync)),
808-
Err(error) => Err(Box::new(error).into()),
809-
},
810-
),
812+
chain_sync: Box::new(chain_sync),
811813
metrics_registry: config.prometheus_config.as_ref().map(|config| config.registry.clone()),
812814
block_request_protocol_config,
813815
state_request_protocol_config,
814-
warp_sync: warp_sync_params,
816+
warp_sync_protocol_config,
815817
light_client_request_protocol_config,
816818
};
817819

0 commit comments

Comments
 (0)