Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
eval-exec committed Aug 4, 2023
1 parent 70663de commit b236f9e
Show file tree
Hide file tree
Showing 10 changed files with 60 additions and 31 deletions.
14 changes: 7 additions & 7 deletions chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use std::sync::Arc;
use std::time::Duration;
use std::{cmp, thread};

const ORPHAN_BLOCK_SIZE: usize = 100000;
const ORPHAN_BLOCK_SIZE: usize = (BLOCK_DOWNLOAD_WINDOW * 2) as usize;

type ProcessBlockRequest = Request<(Arc<BlockView>, Switch), Result<bool, Error>>;
type TruncateRequest = Request<Byte32, Result<(), Error>>;
Expand Down Expand Up @@ -251,7 +251,7 @@ pub struct ChainService {

#[derive(Clone)]
struct UnverifiedBlock {
block: BlockView,
block: Arc<BlockView>,
parent_header: HeaderView,
switch: Switch,
}
Expand All @@ -260,7 +260,7 @@ impl ChainService {
/// Create a new ChainService instance with shared and initial proposal_table.
pub fn new(shared: Shared, proposal_table: ProposalTable) -> ChainService {
let (unverified_tx, unverified_rx) =
channel::bounded::<UnverifiedBlock>(BLOCK_DOWNLOAD_WINDOW as usize * 11);
channel::bounded::<UnverifiedBlock>(BLOCK_DOWNLOAD_WINDOW as usize * 3);

let (new_block_tx, new_block_rx) =
channel::bounded::<Switch>(BLOCK_DOWNLOAD_WINDOW as usize);
Expand Down Expand Up @@ -478,7 +478,7 @@ impl ChainService {
}
let mut accept_error_occurred = false;
for descendant in &descendants {
match self.accept_block(descendant) {
match self.accept_block(descendant.to_owned()) {
Err(err) => {
accept_error_occurred = true;
error!("accept block {} failed: {}", descendant.hash(), err);
Expand Down Expand Up @@ -653,7 +653,7 @@ impl ChainService {
self.non_contextual_verify(&block)?;
}

self.orphan_blocks_broker.insert(block.as_ref().to_owned());
self.orphan_blocks_broker.insert(block);

match self.new_block_tx.send(switch) {
Ok(_) => {}
Expand All @@ -673,7 +673,7 @@ impl ChainService {
Ok(false)
}

fn accept_block(&self, block: &BlockView) -> Result<Option<(HeaderView, U256)>, Error> {
fn accept_block(&self, block: Arc<BlockView>) -> Result<Option<(HeaderView, U256)>, Error> {
let (block_number, block_hash) = (block.number(), block.hash());

if self
Expand Down Expand Up @@ -703,7 +703,7 @@ impl ChainService {

let db_txn = Arc::new(self.shared.store().begin_transaction());

db_txn.insert_block(block)?;
db_txn.insert_block(block.as_ref())?;

// if parent_ext.verified == Some(false) {
// return Err(InvalidParentError {
Expand Down
20 changes: 12 additions & 8 deletions chain/src/orphan_block_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use ckb_types::core::EpochNumber;
use ckb_types::{core, packed};
use ckb_util::{parking_lot::RwLock, shrink_to_fit};
use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::Arc;

pub type ParentHash = packed::Byte32;

Expand All @@ -12,7 +13,7 @@ const EXPIRED_EPOCH: u64 = 6;
#[derive(Default)]
struct InnerPool {
// Group by blocks in the pool by the parent hash.
blocks: HashMap<ParentHash, HashMap<packed::Byte32, core::BlockView>>,
blocks: HashMap<ParentHash, HashMap<packed::Byte32, Arc<core::BlockView>>>,
// The map tells the parent hash when given the hash of a block in the pool.
//
// The block is in the orphan pool if and only if the block hash exists as a key in this map.
Expand All @@ -30,7 +31,7 @@ impl InnerPool {
}
}

fn insert(&mut self, block: core::BlockView) {
fn insert(&mut self, block: Arc<core::BlockView>) {
let hash = block.header().hash();
let parent_hash = block.data().header().raw().parent_hash();
self.blocks
Expand All @@ -50,7 +51,10 @@ impl InnerPool {
self.parents.insert(hash, parent_hash);
}

pub fn remove_blocks_by_parent(&mut self, parent_hash: &ParentHash) -> Vec<core::BlockView> {
pub fn remove_blocks_by_parent(
&mut self,
parent_hash: &ParentHash,
) -> Vec<Arc<core::BlockView>> {
// try remove leaders first
if !self.leaders.remove(parent_hash) {
return Vec::new();
Expand All @@ -59,7 +63,7 @@ impl InnerPool {
let mut queue: VecDeque<packed::Byte32> = VecDeque::new();
queue.push_back(parent_hash.to_owned());

let mut removed: Vec<core::BlockView> = Vec::new();
let mut removed: Vec<Arc<core::BlockView>> = Vec::new();
while let Some(parent_hash) = queue.pop_front() {
if let Some(orphaned) = self.blocks.remove(&parent_hash) {
let (hashes, blocks): (Vec<_>, Vec<_>) = orphaned.into_iter().unzip();
Expand All @@ -84,7 +88,7 @@ impl InnerPool {
removed
}

pub fn get_block(&self, hash: &packed::Byte32) -> Option<core::BlockView> {
pub fn get_block(&self, hash: &packed::Byte32) -> Option<Arc<core::BlockView>> {
self.parents.get(hash).and_then(|parent_hash| {
self.blocks
.get(parent_hash)
Expand Down Expand Up @@ -135,15 +139,15 @@ impl OrphanBlockPool {
}

/// Insert orphaned block, for which we have already requested its parent block
pub fn insert(&self, block: core::BlockView) {
pub fn insert(&self, block: Arc<core::BlockView>) {
self.inner.write().insert(block);
}

pub fn remove_blocks_by_parent(&self, parent_hash: &ParentHash) -> Vec<core::BlockView> {
pub fn remove_blocks_by_parent(&self, parent_hash: &ParentHash) -> Vec<Arc<core::BlockView>> {
self.inner.write().remove_blocks_by_parent(parent_hash)
}

pub fn get_block(&self, hash: &packed::Byte32) -> Option<core::BlockView> {
pub fn get_block(&self, hash: &packed::Byte32) -> Option<Arc<core::BlockView>> {
self.inner.read().get_block(hash)
}

Expand Down
5 changes: 3 additions & 2 deletions rpc/src/module/net.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::error::RPCError;
use ckb_chain::chain::ChainController;
use ckb_jsonrpc_types::{
BannedAddr, LocalNode, LocalNodeProtocol, NodeAddress, PeerSyncState, RemoteNode,
RemoteNodeProtocol, SyncState, Timestamp,
Expand Down Expand Up @@ -534,6 +535,7 @@ pub trait NetRpc {
pub(crate) struct NetRpcImpl {
pub network_controller: NetworkController,
pub sync_shared: Arc<SyncShared>,
pub chain_controller: Arc<ChainController>,
}

impl NetRpc for NetRpcImpl {
Expand Down Expand Up @@ -711,15 +713,14 @@ impl NetRpc for NetRpcImpl {

fn sync_state(&self) -> Result<SyncState> {
let chain = self.sync_shared.active_chain();
let shared = chain.shared();
let state = chain.shared().state();
let (fast_time, normal_time, low_time) = state.read_inflight_blocks().division_point();
let best_known = state.shared_best_header();
let sync_state = SyncState {
ibd: chain.is_initial_block_download(),
best_known_block_number: best_known.number().into(),
best_known_block_timestamp: best_known.timestamp().into(),
orphan_blocks_count: (shared.shared().orphan_pool_count()).into(),
orphan_blocks_count: (self.chain_controller.orphan_blocks_len() as u64).into(),
inflight_blocks_count: (state.read_inflight_blocks().total_inflight_count() as u64)
.into(),
fast_time: fast_time.into(),
Expand Down
2 changes: 2 additions & 0 deletions rpc/src/service_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,12 @@ impl<'a> ServiceBuilder<'a> {
mut self,
network_controller: NetworkController,
sync_shared: Arc<SyncShared>,
chain_controller: Arc<ChainController>,
) -> Self {
let rpc_methods = NetRpcImpl {
network_controller,
sync_shared,
chain_controller,
}
.to_delegate();
if self.config.net_enable() {
Expand Down
1 change: 0 additions & 1 deletion shared/src/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,6 @@ impl Shared {

pub fn get_orphan_block(&self, block_hash: &Byte32) -> Option<core::BlockView> {
todo!("get_orphan_block")
// self.orphan_block_pool.get_block(block_hash)
}

pub fn orphan_pool_count(&self) -> u64 {
Expand Down
6 changes: 4 additions & 2 deletions shared/src/types/header_map/memory.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::types::{HeaderIndexView, SHRINK_THRESHOLD};
use crate::types::HeaderIndexView;
use ckb_types::{
core::{BlockNumber, EpochNumberWithFraction},
packed::Byte32,
Expand All @@ -7,6 +7,8 @@ use ckb_types::{
use ckb_util::{shrink_to_fit, LinkedHashMap, RwLock};
use std::default;

const SHRINK_THRESHOLD: usize = 300;

#[derive(Clone, Debug, PartialEq, Eq)]
struct HeaderIndexViewInner {
number: BlockNumber,
Expand Down Expand Up @@ -99,7 +101,7 @@ impl MemoryMap {
pub(crate) fn remove(&self, key: &Byte32) -> Option<HeaderIndexView> {
let mut guard = self.0.write();
let ret = guard.remove(key);
shrink_to_fit!(guard, SHRINK_THRESHOLD);
// shrink_to_fit!(guard, SHRINK_THRESHOLD);
ret.map(|inner| (key.clone(), inner).into())
}

Expand Down
4 changes: 3 additions & 1 deletion shared/src/types/header_map/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub struct HeaderMap {
inner: Arc<HeaderMapKernel<SledBackend>>,
}

const INTERVAL: Duration = Duration::from_millis(500);
const INTERVAL: Duration = Duration::from_millis(5000);
const ITEM_BYTES_SIZE: usize = size_of::<HeaderIndexView>();
const WARN_THRESHOLD: usize = ITEM_BYTES_SIZE * 100_000;

Expand Down Expand Up @@ -53,7 +53,9 @@ impl HeaderMap {
loop {
tokio::select! {
_ = interval.tick() => {
let now = std::time::Instant::now();
map.limit_memory();
debug!("HeaderMap limit_memory cost: {:?}", now.elapsed());
}
_ = stop_rx.cancelled() => {
debug!("HeaderMap limit_memory received exit signal, exit now");
Expand Down
19 changes: 15 additions & 4 deletions sync/src/synchronizer/block_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,12 @@ impl<'a> BlockFetcher<'a> {
);
let mut fetch = Vec::with_capacity(n_fetch);
let now = unix_time_as_millis();
debug!(
"finding which blocks to fetch, start: {}, end: {}, best_known: {}",
start,
end,
best_known.number(),
);

while fetch.len() < n_fetch && start <= end {
let span = min(end - start + 1, (n_fetch - fetch.len()) as u64);
Expand All @@ -155,14 +161,18 @@ impl<'a> BlockFetcher<'a> {
let mut header = self
.active_chain
.get_ancestor(&best_known.hash(), start + span - 1)?;
let mut status = self.active_chain.get_block_status(&header.hash());
let mut status = self
.synchronizer
.shared()
.shared()
.get_block_status(&header.hash());

// Judge whether we should fetch the target block, neither stored nor in-flighted
for _ in 0..span {
let parent_hash = header.parent_hash();
let hash = header.hash();

if status.contains(BlockStatus::BLOCK_STORED) {
if status.contains(BlockStatus::BLOCK_PARTIAL_STORED) {
// If the block is stored, its ancestor must on store
// So we can skip the search of this space directly
self.synchronizer
Expand Down Expand Up @@ -205,7 +215,7 @@ impl<'a> BlockFetcher<'a> {
if fetch.is_empty() {
debug!(
"[block fetch empty] peer-{}, fixed_last_common_header = {} \
best_known_header = {}, tip = {}, unverified_tip = {}, inflight_len = {}, time_cost: {}ms",
best_known_header = {}, [tip/unverified_tip]: [{}/{}], inflight_len = {}, time_cost: {}ms",
self.peer,
last_common.number(),
best_known.number(),
Expand All @@ -225,11 +235,12 @@ impl<'a> BlockFetcher<'a> {
let inflight_peer_count = inflight.peer_inflight_count(self.peer);
let inflight_total_count = inflight.total_inflight_count();
debug!(
"request peer-{} for batch blocks: [{}-{}], batch len:{} , unverified_tip: {}, [peer/total inflight count]: [{} / {}], timecost: {}ms, blocks: {}",
"request peer-{} for batch blocks: [{}-{}], batch len:{}, [tip/unverified_tip]: [{}/{}], [peer/total inflight count]: [{} / {}], timecost: {}ms, blocks: {}",
self.peer,
fetch_head,
fetch_last,
fetch.len(),
tip,
self.synchronizer.shared().shared().get_unverified_tip().number(),
inflight_peer_count,
inflight_total_count,
Expand Down
14 changes: 9 additions & 5 deletions sync/src/synchronizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ use ckb_chain::chain::ChainController;
use ckb_channel as channel;
use ckb_channel::{select, Receiver};
use ckb_constant::sync::{
BAD_MESSAGE_BAN_TIME, CHAIN_SYNC_TIMEOUT, EVICTION_HEADERS_RESPONSE_TIME,
INIT_BLOCKS_IN_TRANSIT_PER_PEER, MAX_TIP_AGE,
BAD_MESSAGE_BAN_TIME, BLOCK_DOWNLOAD_WINDOW, CHAIN_SYNC_TIMEOUT,
EVICTION_HEADERS_RESPONSE_TIME, INIT_BLOCKS_IN_TRANSIT_PER_PEER, MAX_TIP_AGE,
};
use ckb_error::Error as CKBError;
use ckb_logger::{debug, error, info, trace, warn};
Expand Down Expand Up @@ -586,18 +586,22 @@ impl Synchronizer {
}

fn find_blocks_to_fetch(&mut self, nc: &dyn CKBProtocolContext, ibd: IBDState) {
let tip = self.shared.active_chain().tip_number();
let unverified_tip = self.shared.active_chain().unverified_tip_number();

let disconnect_list = {
let mut list = self.shared().state().write_inflight_blocks().prune(tip);
let mut list = self
.shared()
.state()
.write_inflight_blocks()
.prune(unverified_tip);
if let IBDState::In = ibd {
// best known < tip and in IBD state, and unknown list is empty,
// these node can be disconnect
list.extend(
self.shared
.state()
.peers()
.get_best_known_less_than_tip_and_unknown_empty(tip),
.get_best_known_less_than_tip_and_unknown_empty(unverified_tip),
)
};
list
Expand Down
6 changes: 5 additions & 1 deletion util/launcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,11 @@ impl Launcher {
chain_controller.clone(),
miner_enable,
)
.enable_net(network_controller.clone(), sync_shared)
.enable_net(
network_controller.clone(),
sync_shared,
Arc::new(chain_controller.clone()),
)
.enable_stats(shared.clone(), Arc::clone(&alert_notifier))
.enable_experiment(shared.clone())
.enable_integration_test(shared.clone(), network_controller.clone(), chain_controller)
Expand Down

0 comments on commit b236f9e

Please sign in to comment.