Skip to content

Commit 85885b9

Browse files
committed
Remove Clone from Synchronizer
1 parent 5ba499a commit 85885b9

File tree

3 files changed

+47
-37
lines changed

3 files changed

+47
-37
lines changed

sync/src/relayer/mod.rs

-1
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,6 @@ pub enum ReconstructionResult {
6868
}
6969

7070
/// Relayer protocol handle
71-
#[derive(Clone)]
7271
pub struct Relayer {
7372
chain: ChainController,
7473
pub(crate) shared: Arc<SyncShared>,

sync/src/synchronizer/block_fetcher.rs

+36-26
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::block_status::BlockStatus;
2-
use crate::synchronizer::Synchronizer;
32
use crate::types::{ActiveChain, BlockNumberAndHash, HeaderIndex, HeaderIndexView, IBDState};
3+
use crate::SyncShared;
44
use ckb_constant::sync::{
55
BLOCK_DOWNLOAD_WINDOW, CHECK_POINT_WINDOW, INIT_BLOCKS_IN_TRANSIT_PER_PEER,
66
};
@@ -9,34 +9,38 @@ use ckb_network::PeerIndex;
99
use ckb_systemtime::unix_time_as_millis;
1010
use ckb_types::packed;
1111
use std::cmp::min;
12+
use std::sync::Arc;
1213

13-
pub struct BlockFetcher<'a> {
14-
synchronizer: &'a Synchronizer,
14+
pub struct BlockFetcher {
15+
sync_shared: Arc<SyncShared>,
1516
peer: PeerIndex,
1617
active_chain: ActiveChain,
1718
ibd: IBDState,
1819
}
1920

20-
impl<'a> BlockFetcher<'a> {
21-
pub fn new(synchronizer: &'a Synchronizer, peer: PeerIndex, ibd: IBDState) -> Self {
22-
let active_chain = synchronizer.shared.active_chain();
21+
impl BlockFetcher {
22+
pub fn new(sync_shared: Arc<SyncShared>, peer: PeerIndex, ibd: IBDState) -> Self {
23+
let active_chain = sync_shared.active_chain();
2324
BlockFetcher {
25+
sync_shared,
2426
peer,
25-
synchronizer,
2627
active_chain,
2728
ibd,
2829
}
2930
}
3031

3132
pub fn reached_inflight_limit(&self) -> bool {
32-
let inflight = self.synchronizer.shared().state().read_inflight_blocks();
33+
let inflight = self.sync_shared.state().read_inflight_blocks();
3334

3435
// Can't download any more from this peer
3536
inflight.peer_can_fetch_count(self.peer) == 0
3637
}
3738

3839
pub fn peer_best_known_header(&self) -> Option<HeaderIndex> {
39-
self.synchronizer.peers().get_best_known_header(self.peer)
40+
self.sync_shared
41+
.state()
42+
.peers()
43+
.get_best_known_header(self.peer)
4044
}
4145

4246
pub fn update_last_common_header(
@@ -45,23 +49,28 @@ impl<'a> BlockFetcher<'a> {
4549
) -> Option<BlockNumberAndHash> {
4650
// Bootstrap quickly by guessing an ancestor of our best tip is forking point.
4751
// Guessing wrong in either direction is not a problem.
48-
let mut last_common =
49-
if let Some(header) = self.synchronizer.peers().get_last_common_header(self.peer) {
50-
header
51-
} else {
52-
let tip_header = self.active_chain.tip_header();
53-
let guess_number = min(tip_header.number(), best_known.number());
54-
let guess_hash = self.active_chain.get_block_hash(guess_number)?;
55-
(guess_number, guess_hash).into()
56-
};
52+
let mut last_common = if let Some(header) = self
53+
.sync_shared
54+
.state()
55+
.peers()
56+
.get_last_common_header(self.peer)
57+
{
58+
header
59+
} else {
60+
let tip_header = self.active_chain.tip_header();
61+
let guess_number = min(tip_header.number(), best_known.number());
62+
let guess_hash = self.active_chain.get_block_hash(guess_number)?;
63+
(guess_number, guess_hash).into()
64+
};
5765

5866
// If the peer reorganized, our previous last_common_header may not be an ancestor
5967
// of its current tip anymore. Go back enough to fix that.
6068
last_common = self
6169
.active_chain
6270
.last_common_ancestor(&last_common, best_known)?;
6371

64-
self.synchronizer
72+
self.sync_shared
73+
.state()
6574
.peers()
6675
.set_last_common_header(self.peer, last_common.clone());
6776

@@ -80,13 +89,13 @@ impl<'a> BlockFetcher<'a> {
8089
// Update `best_known_header` based on `unknown_header_list`. It must be involved before
8190
// our acquiring the newest `best_known_header`.
8291
if let IBDState::In = self.ibd {
83-
let state = self.synchronizer.shared.state();
92+
let state = self.sync_shared.state();
8493
// unknown list is an ordered list, sorted from highest to lowest,
8594
// when header hash unknown, break loop is ok
8695
while let Some(hash) = state.peers().take_unknown_last(self.peer) {
8796
// Here we need to first try search from headermap, if not, fallback to search from the db.
8897
// if not search from db, it can stuck here when the headermap may have been removed just as the block was downloaded
89-
if let Some(header) = self.synchronizer.shared.get_header_index_view(&hash, false) {
98+
if let Some(header) = self.sync_shared.get_header_index_view(&hash, false) {
9099
state
91100
.peers()
92101
.may_set_best_known_header(self.peer, header.as_header_index());
@@ -114,7 +123,8 @@ impl<'a> BlockFetcher<'a> {
114123
// specially advance this peer's last_common_header at the case of both us on the same
115124
// active chain.
116125
if self.active_chain.is_main_chain(&best_known.hash()) {
117-
self.synchronizer
126+
self.sync_shared
127+
.state()
118128
.peers()
119129
.set_last_common_header(self.peer, best_known.number_and_hash());
120130
}
@@ -128,7 +138,7 @@ impl<'a> BlockFetcher<'a> {
128138
return None;
129139
}
130140

131-
let state = self.synchronizer.shared().state();
141+
let state = self.sync_shared.state();
132142
let mut inflight = state.write_inflight_blocks();
133143
let mut start = last_common.number() + 1;
134144
let mut end = min(best_known.number(), start + BLOCK_DOWNLOAD_WINDOW);
@@ -156,7 +166,8 @@ impl<'a> BlockFetcher<'a> {
156166
if status.contains(BlockStatus::BLOCK_STORED) {
157167
// If the block is stored, its ancestor must on store
158168
// So we can skip the search of this space directly
159-
self.synchronizer
169+
self.sync_shared
170+
.state()
160171
.peers()
161172
.set_last_common_header(self.peer, header.number_and_hash());
162173
end = min(best_known.number(), header.number() + BLOCK_DOWNLOAD_WINDOW);
@@ -172,8 +183,7 @@ impl<'a> BlockFetcher<'a> {
172183

173184
status = self.active_chain.get_block_status(&parent_hash);
174185
header = self
175-
.synchronizer
176-
.shared
186+
.sync_shared
177187
.get_header_index_view(&parent_hash, false)?;
178188
}
179189

sync/src/synchronizer/mod.rs

+11-10
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ enum FetchCMD {
8080
}
8181

8282
struct BlockFetchCMD {
83-
sync: Synchronizer,
83+
sync_shared: Arc<SyncShared>,
8484
p2p_control: ServiceControl,
8585
recv: channel::Receiver<FetchCMD>,
8686
can_start: CanStart,
@@ -93,15 +93,17 @@ impl BlockFetchCMD {
9393
FetchCMD::Fetch((peers, state)) => match self.can_start() {
9494
CanStart::Ready => {
9595
for peer in peers {
96-
if let Some(fetch) = BlockFetcher::new(&self.sync, peer, state).fetch() {
96+
if let Some(fetch) =
97+
BlockFetcher::new(Arc::clone(&self.sync_shared), peer, state).fetch()
98+
{
9799
for item in fetch {
98100
BlockFetchCMD::send_getblocks(item, &self.p2p_control, peer);
99101
}
100102
}
101103
}
102104
}
103105
CanStart::MinWorkNotReach => {
104-
let best_known = self.sync.shared.state().shared_best_header_ref();
106+
let best_known = self.sync_shared.state().shared_best_header_ref();
105107
let number = best_known.number();
106108
if number != self.number && (number - self.number) % 10000 == 0 {
107109
self.number = number;
@@ -111,12 +113,12 @@ impl BlockFetchCMD {
111113
then start to download block",
112114
number,
113115
best_known.total_difficulty(),
114-
self.sync.shared.state().min_chain_work()
116+
self.sync_shared.state().min_chain_work()
115117
);
116118
}
117119
}
118120
CanStart::AssumeValidNotFound => {
119-
let state = self.sync.shared.state();
121+
let state = self.sync_shared.state();
120122
let best_known = state.shared_best_header_ref();
121123
let number = best_known.number();
122124
let assume_valid_target: Byte32 = state
@@ -161,7 +163,7 @@ impl BlockFetchCMD {
161163
return self.can_start;
162164
}
163165

164-
let state = self.sync.shared.state();
166+
let state = self.sync_shared.state();
165167

166168
let min_work_reach = |flag: &mut CanStart| {
167169
if state.min_chain_work_ready() {
@@ -230,7 +232,6 @@ impl BlockFetchCMD {
230232
}
231233

232234
/// Sync protocol handle
233-
#[derive(Clone)]
234235
pub struct Synchronizer {
235236
pub(crate) chain: ChainController,
236237
/// Sync shared state
@@ -364,7 +365,7 @@ impl Synchronizer {
364365
peer: PeerIndex,
365366
ibd: IBDState,
366367
) -> Option<Vec<Vec<packed::Byte32>>> {
367-
BlockFetcher::new(self, peer, ibd).fetch()
368+
BlockFetcher::new(Arc::to_owned(self.shared()), peer, ibd).fetch()
368369
}
369370

370371
pub(crate) fn on_connected(&self, nc: &dyn CKBProtocolContext, peer: PeerIndex) {
@@ -632,20 +633,20 @@ impl Synchronizer {
632633
}
633634
None => {
634635
let p2p_control = raw.clone();
635-
let sync = self.clone();
636636
let (sender, recv) = channel::bounded(2);
637637
let peers = self.get_peers_to_fetch(ibd, &disconnect_list);
638638
sender.send(FetchCMD::Fetch((peers, ibd))).unwrap();
639639
self.fetch_channel = Some(sender);
640640
let thread = ::std::thread::Builder::new();
641641
let number = self.shared.state().shared_best_header_ref().number();
642642
const THREAD_NAME: &str = "BlockDownload";
643+
let sync_shared: Arc<SyncShared> = Arc::to_owned(self.shared());
643644
let blockdownload_jh = thread
644645
.name(THREAD_NAME.into())
645646
.spawn(move || {
646647
let stop_signal = new_crossbeam_exit_rx();
647648
BlockFetchCMD {
648-
sync,
649+
sync_shared,
649650
p2p_control,
650651
recv,
651652
number,

0 commit comments

Comments
 (0)