diff --git a/chain/src/chain.rs b/chain/src/chain.rs index 7349cf3916..1970038b10 100644 --- a/chain/src/chain.rs +++ b/chain/src/chain.rs @@ -45,7 +45,7 @@ use std::sync::Arc; use std::time::Duration; use std::{cmp, thread}; -const ORPHAN_BLOCK_SIZE: usize = 100000; +const ORPHAN_BLOCK_SIZE: usize = (BLOCK_DOWNLOAD_WINDOW * 2) as usize; type ProcessBlockRequest = Request<(Arc, Switch), Result>; type TruncateRequest = Request>; @@ -251,7 +251,7 @@ pub struct ChainService { #[derive(Clone)] struct UnverifiedBlock { - block: BlockView, + block: Arc, parent_header: HeaderView, switch: Switch, } @@ -260,7 +260,7 @@ impl ChainService { /// Create a new ChainService instance with shared and initial proposal_table. pub fn new(shared: Shared, proposal_table: ProposalTable) -> ChainService { let (unverified_tx, unverified_rx) = - channel::bounded::(BLOCK_DOWNLOAD_WINDOW as usize * 11); + channel::bounded::(BLOCK_DOWNLOAD_WINDOW as usize * 3); let (new_block_tx, new_block_rx) = channel::bounded::(BLOCK_DOWNLOAD_WINDOW as usize); @@ -478,7 +478,7 @@ impl ChainService { } let mut accept_error_occurred = false; for descendant in &descendants { - match self.accept_block(descendant) { + match self.accept_block(descendant.to_owned()) { Err(err) => { accept_error_occurred = true; error!("accept block {} failed: {}", descendant.hash(), err); @@ -653,7 +653,7 @@ impl ChainService { self.non_contextual_verify(&block)?; } - self.orphan_blocks_broker.insert(block.as_ref().to_owned()); + self.orphan_blocks_broker.insert(block); match self.new_block_tx.send(switch) { Ok(_) => {} @@ -673,7 +673,7 @@ impl ChainService { Ok(false) } - fn accept_block(&self, block: &BlockView) -> Result, Error> { + fn accept_block(&self, block: Arc) -> Result, Error> { let (block_number, block_hash) = (block.number(), block.hash()); if self @@ -703,7 +703,7 @@ impl ChainService { let db_txn = Arc::new(self.shared.store().begin_transaction()); - db_txn.insert_block(block)?; + db_txn.insert_block(block.as_ref())?; // if parent_ext.verified == Some(false) { // return Err(InvalidParentError { diff --git a/chain/src/orphan_block_pool.rs b/chain/src/orphan_block_pool.rs index ead446d3ca..9459f4864b 100644 --- a/chain/src/orphan_block_pool.rs +++ b/chain/src/orphan_block_pool.rs @@ -3,6 +3,7 @@ use ckb_types::core::EpochNumber; use ckb_types::{core, packed}; use ckb_util::{parking_lot::RwLock, shrink_to_fit}; use std::collections::{HashMap, HashSet, VecDeque}; +use std::sync::Arc; pub type ParentHash = packed::Byte32; @@ -12,7 +13,7 @@ const EXPIRED_EPOCH: u64 = 6; #[derive(Default)] struct InnerPool { // Group by blocks in the pool by the parent hash. - blocks: HashMap>, + blocks: HashMap>>, // The map tells the parent hash when given the hash of a block in the pool. // // The block is in the orphan pool if and only if the block hash exists as a key in this map. @@ -30,7 +31,7 @@ impl InnerPool { } } - fn insert(&mut self, block: core::BlockView) { + fn insert(&mut self, block: Arc) { let hash = block.header().hash(); let parent_hash = block.data().header().raw().parent_hash(); self.blocks @@ -50,7 +51,10 @@ impl InnerPool { self.parents.insert(hash, parent_hash); } - pub fn remove_blocks_by_parent(&mut self, parent_hash: &ParentHash) -> Vec { + pub fn remove_blocks_by_parent( + &mut self, + parent_hash: &ParentHash, + ) -> Vec> { // try remove leaders first if !self.leaders.remove(parent_hash) { return Vec::new(); @@ -59,7 +63,7 @@ impl InnerPool { let mut queue: VecDeque = VecDeque::new(); queue.push_back(parent_hash.to_owned()); - let mut removed: Vec = Vec::new(); + let mut removed: Vec> = Vec::new(); while let Some(parent_hash) = queue.pop_front() { if let Some(orphaned) = self.blocks.remove(&parent_hash) { let (hashes, blocks): (Vec<_>, Vec<_>) = orphaned.into_iter().unzip(); @@ -84,7 +88,7 @@ impl InnerPool { removed } - pub fn get_block(&self, hash: &packed::Byte32) -> Option { + pub fn get_block(&self, hash: &packed::Byte32) -> Option> { self.parents.get(hash).and_then(|parent_hash| { self.blocks .get(parent_hash) @@ -135,15 +139,15 @@ impl OrphanBlockPool { } /// Insert orphaned block, for which we have already requested its parent block - pub fn insert(&self, block: core::BlockView) { + pub fn insert(&self, block: Arc) { self.inner.write().insert(block); } - pub fn remove_blocks_by_parent(&self, parent_hash: &ParentHash) -> Vec { + pub fn remove_blocks_by_parent(&self, parent_hash: &ParentHash) -> Vec> { self.inner.write().remove_blocks_by_parent(parent_hash) } - pub fn get_block(&self, hash: &packed::Byte32) -> Option { + pub fn get_block(&self, hash: &packed::Byte32) -> Option> { self.inner.read().get_block(hash) } diff --git a/rpc/src/module/net.rs b/rpc/src/module/net.rs index 4bce29dd32..ec2d48af10 100644 --- a/rpc/src/module/net.rs +++ b/rpc/src/module/net.rs @@ -1,4 +1,5 @@ use crate::error::RPCError; +use ckb_chain::chain::ChainController; use ckb_jsonrpc_types::{ BannedAddr, LocalNode, LocalNodeProtocol, NodeAddress, PeerSyncState, RemoteNode, RemoteNodeProtocol, SyncState, Timestamp, @@ -534,6 +535,7 @@ pub trait NetRpc { pub(crate) struct NetRpcImpl { pub network_controller: NetworkController, pub sync_shared: Arc, + pub chain_controller: Arc, } impl NetRpc for NetRpcImpl { @@ -711,7 +713,6 @@ impl NetRpc for NetRpcImpl { fn sync_state(&self) -> Result { let chain = self.sync_shared.active_chain(); - let shared = chain.shared(); let state = chain.shared().state(); let (fast_time, normal_time, low_time) = state.read_inflight_blocks().division_point(); let best_known = state.shared_best_header(); @@ -719,7 +720,7 @@ impl NetRpc for NetRpcImpl { ibd: chain.is_initial_block_download(), best_known_block_number: best_known.number().into(), best_known_block_timestamp: best_known.timestamp().into(), - orphan_blocks_count: (shared.shared().orphan_pool_count()).into(), + orphan_blocks_count: (self.chain_controller.orphan_blocks_len() as u64).into(), inflight_blocks_count: (state.read_inflight_blocks().total_inflight_count() as u64) .into(), fast_time: fast_time.into(), diff --git a/rpc/src/service_builder.rs b/rpc/src/service_builder.rs index 19372ce8a0..bd5ae2f9d2 100644 --- a/rpc/src/service_builder.rs +++ b/rpc/src/service_builder.rs @@ -96,10 +96,12 @@ impl<'a> ServiceBuilder<'a> { mut self, network_controller: NetworkController, sync_shared: Arc, + chain_controller: Arc, ) -> Self { let rpc_methods = NetRpcImpl { network_controller, sync_shared, + chain_controller, } .to_delegate(); if self.config.net_enable() { diff --git a/shared/src/shared.rs b/shared/src/shared.rs index b420f1ca1e..c41ec8265f 100644 --- a/shared/src/shared.rs +++ b/shared/src/shared.rs @@ -413,7 +413,6 @@ impl Shared { pub fn get_orphan_block(&self, block_hash: &Byte32) -> Option { todo!("get_orphan_block") - // self.orphan_block_pool.get_block(block_hash) } pub fn orphan_pool_count(&self) -> u64 { diff --git a/shared/src/types/header_map/memory.rs b/shared/src/types/header_map/memory.rs index 0411e8c671..0bf62d50f4 100644 --- a/shared/src/types/header_map/memory.rs +++ b/shared/src/types/header_map/memory.rs @@ -1,4 +1,4 @@ -use crate::types::{HeaderIndexView, SHRINK_THRESHOLD}; +use crate::types::HeaderIndexView; use ckb_types::{ core::{BlockNumber, EpochNumberWithFraction}, packed::Byte32, @@ -7,6 +7,8 @@ use ckb_types::{ use ckb_util::{shrink_to_fit, LinkedHashMap, RwLock}; use std::default; +const SHRINK_THRESHOLD: usize = 300; + #[derive(Clone, Debug, PartialEq, Eq)] struct HeaderIndexViewInner { number: BlockNumber, @@ -99,7 +101,7 @@ impl MemoryMap { pub(crate) fn remove(&self, key: &Byte32) -> Option { let mut guard = self.0.write(); let ret = guard.remove(key); - shrink_to_fit!(guard, SHRINK_THRESHOLD); + // shrink_to_fit!(guard, SHRINK_THRESHOLD); ret.map(|inner| (key.clone(), inner).into()) } diff --git a/shared/src/types/header_map/mod.rs b/shared/src/types/header_map/mod.rs index ee2907e8f9..61bd215b4b 100644 --- a/shared/src/types/header_map/mod.rs +++ b/shared/src/types/header_map/mod.rs @@ -24,7 +24,7 @@ pub struct HeaderMap { inner: Arc>, } -const INTERVAL: Duration = Duration::from_millis(500); +const INTERVAL: Duration = Duration::from_millis(5000); const ITEM_BYTES_SIZE: usize = size_of::(); const WARN_THRESHOLD: usize = ITEM_BYTES_SIZE * 100_000; @@ -53,7 +53,9 @@ impl HeaderMap { loop { tokio::select! { _ = interval.tick() => { + let now = std::time::Instant::now(); map.limit_memory(); + debug!("HeaderMap limit_memory cost: {:?}", now.elapsed()); } _ = stop_rx.cancelled() => { debug!("HeaderMap limit_memory received exit signal, exit now"); diff --git a/sync/src/synchronizer/block_fetcher.rs b/sync/src/synchronizer/block_fetcher.rs index c3a9f34ee0..ebe0bba240 100644 --- a/sync/src/synchronizer/block_fetcher.rs +++ b/sync/src/synchronizer/block_fetcher.rs @@ -147,6 +147,12 @@ impl<'a> BlockFetcher<'a> { ); let mut fetch = Vec::with_capacity(n_fetch); let now = unix_time_as_millis(); + debug!( + "finding which blocks to fetch, start: {}, end: {}, best_known: {}", + start, + end, + best_known.number(), + ); while fetch.len() < n_fetch && start <= end { let span = min(end - start + 1, (n_fetch - fetch.len()) as u64); @@ -155,14 +161,18 @@ impl<'a> BlockFetcher<'a> { let mut header = self .active_chain .get_ancestor(&best_known.hash(), start + span - 1)?; - let mut status = self.active_chain.get_block_status(&header.hash()); + let mut status = self + .synchronizer + .shared() + .shared() + .get_block_status(&header.hash()); // Judge whether we should fetch the target block, neither stored nor in-flighted for _ in 0..span { let parent_hash = header.parent_hash(); let hash = header.hash(); - if status.contains(BlockStatus::BLOCK_STORED) { + if status.contains(BlockStatus::BLOCK_PARTIAL_STORED) { // If the block is stored, its ancestor must on store // So we can skip the search of this space directly self.synchronizer @@ -205,7 +215,7 @@ impl<'a> BlockFetcher<'a> { if fetch.is_empty() { debug!( "[block fetch empty] peer-{}, fixed_last_common_header = {} \ - best_known_header = {}, tip = {}, unverified_tip = {}, inflight_len = {}, time_cost: {}ms", + best_known_header = {}, [tip/unverified_tip]: [{}/{}], inflight_len = {}, time_cost: {}ms", self.peer, last_common.number(), best_known.number(), @@ -225,11 +235,12 @@ impl<'a> BlockFetcher<'a> { let inflight_peer_count = inflight.peer_inflight_count(self.peer); let inflight_total_count = inflight.total_inflight_count(); debug!( - "request peer-{} for batch blocks: [{}-{}], batch len:{} , unverified_tip: {}, [peer/total inflight count]: [{} / {}], timecost: {}ms, blocks: {}", + "request peer-{} for batch blocks: [{}-{}], batch len:{}, [tip/unverified_tip]: [{}/{}], [peer/total inflight count]: [{} / {}], timecost: {}ms, blocks: {}", self.peer, fetch_head, fetch_last, fetch.len(), + tip, self.synchronizer.shared().shared().get_unverified_tip().number(), inflight_peer_count, inflight_total_count, diff --git a/sync/src/synchronizer/mod.rs b/sync/src/synchronizer/mod.rs index 664d748b2c..aebc364a0a 100644 --- a/sync/src/synchronizer/mod.rs +++ b/sync/src/synchronizer/mod.rs @@ -29,8 +29,8 @@ use ckb_chain::chain::ChainController; use ckb_channel as channel; use ckb_channel::{select, Receiver}; use ckb_constant::sync::{ - BAD_MESSAGE_BAN_TIME, CHAIN_SYNC_TIMEOUT, EVICTION_HEADERS_RESPONSE_TIME, - INIT_BLOCKS_IN_TRANSIT_PER_PEER, MAX_TIP_AGE, + BAD_MESSAGE_BAN_TIME, BLOCK_DOWNLOAD_WINDOW, CHAIN_SYNC_TIMEOUT, + EVICTION_HEADERS_RESPONSE_TIME, INIT_BLOCKS_IN_TRANSIT_PER_PEER, MAX_TIP_AGE, }; use ckb_error::Error as CKBError; use ckb_logger::{debug, error, info, trace, warn}; @@ -586,10 +586,14 @@ impl Synchronizer { } fn find_blocks_to_fetch(&mut self, nc: &dyn CKBProtocolContext, ibd: IBDState) { - let tip = self.shared.active_chain().tip_number(); + let unverified_tip = self.shared.active_chain().unverified_tip_number(); let disconnect_list = { - let mut list = self.shared().state().write_inflight_blocks().prune(tip); + let mut list = self + .shared() + .state() + .write_inflight_blocks() + .prune(unverified_tip); if let IBDState::In = ibd { // best known < tip and in IBD state, and unknown list is empty, // these node can be disconnect @@ -597,7 +601,7 @@ impl Synchronizer { self.shared .state() .peers() - .get_best_known_less_than_tip_and_unknown_empty(tip), + .get_best_known_less_than_tip_and_unknown_empty(unverified_tip), ) }; list diff --git a/util/launcher/src/lib.rs b/util/launcher/src/lib.rs index ad56947f35..54207d2119 100644 --- a/util/launcher/src/lib.rs +++ b/util/launcher/src/lib.rs @@ -406,7 +406,11 @@ impl Launcher { chain_controller.clone(), miner_enable, ) - .enable_net(network_controller.clone(), sync_shared) + .enable_net( + network_controller.clone(), + sync_shared, + Arc::new(chain_controller.clone()), + ) .enable_stats(shared.clone(), Arc::clone(&alert_notifier)) .enable_experiment(shared.clone()) .enable_integration_test(shared.clone(), network_controller.clone(), chain_controller)