Skip to content

Commit 2996109

Browse files
committed
move shared_best_header from SyncState to Shared
Signed-off-by: Eval EXEC <[email protected]>
1 parent 7c5f6d5 commit 2996109

File tree

7 files changed

+152
-50
lines changed

7 files changed

+152
-50
lines changed

rpc/src/module/net.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -731,7 +731,7 @@ impl NetRpc for NetRpcImpl {
731731
let shared = chain.shared();
732732
let state = chain.state();
733733
let (fast_time, normal_time, low_time) = state.read_inflight_blocks().division_point();
734-
let best_known = state.shared_best_header();
734+
let best_known = self.sync_shared.shared_best_header();
735735
let min_chain_work = {
736736
let mut min_chain_work_500k_u128: [u8; 16] = [0; 16];
737737
min_chain_work_500k_u128
@@ -751,7 +751,7 @@ impl NetRpc for NetRpcImpl {
751751
)
752752
.into(),
753753
min_chain_work: min_chain_work.into(),
754-
min_chain_work_reached: state.min_chain_work_ready(),
754+
min_chain_work_reached: self.sync_shared.min_chain_work_ready(),
755755
best_known_block_number: best_known.number().into(),
756756
best_known_block_timestamp: best_known.timestamp().into(),
757757
orphan_blocks_count: (self.chain_controller.orphan_blocks_len() as u64).into(),

shared/src/types/header_map/backend_sled.rs

Lines changed: 46 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
#![allow(unused)]
22

3+
use crate::shared;
34
use crate::types::HeaderIndexView;
5+
use ckb_logger::info;
46
use ckb_types::{packed::Byte32, prelude::*};
57
use sled::{Config, Db, Mode};
68
use std::path;
@@ -9,6 +11,8 @@ use std::sync::atomic::AtomicUsize;
911
use std::sync::atomic::Ordering;
1012
use tempfile::TempDir;
1113

14+
const SHARED_BEST_HEADER_KEY: &[u8] = b"__ckb_shared_best_header__";
15+
1216
pub(crate) struct SledBackend {
1317
count: AtomicUsize,
1418
db: Db,
@@ -33,6 +37,7 @@ impl SledBackend {
3337
});
3438
let header_map_path = header_map_base_path.join("header_map");
3539

40+
info!("header_map_path: {}", header_map_path.display());
3641
// use a smaller system page cache here since we are using sled as a temporary storage,
3742
// most of the time we will only read header from memory.
3843
let db: Db = Config::new()
@@ -42,14 +47,21 @@ impl SledBackend {
4247
.open()
4348
.expect("failed to open a key-value database to save header map into disk");
4449

45-
Self {
46-
db,
47-
_tmpdir,
48-
count: AtomicUsize::new(0),
50+
let count = AtomicUsize::new(db.len());
51+
let header_map = Self { db, _tmpdir, count };
52+
if let Some(shared_best_header) = header_map.load_shared_best_header() {
53+
info!(
54+
"found shared_best_header in SledBackend: {:?}",
55+
shared_best_header.number_and_hash()
56+
);
57+
header_map.count.fetch_sub(1, Ordering::SeqCst);
4958
}
59+
info!("SledBackend have {} items", header_map.len());
60+
61+
header_map
5062
}
5163

52-
fn len(&self) -> usize {
64+
pub fn len(&self) -> usize {
5365
self.count.load(Ordering::SeqCst)
5466
}
5567

@@ -118,4 +130,33 @@ impl SledBackend {
118130
self.count.fetch_sub(1, Ordering::SeqCst);
119131
}
120132
}
133+
134+
pub fn load_shared_best_header(&self) -> Option<HeaderIndexView> {
135+
self.db
136+
.get(SHARED_BEST_HEADER_KEY)
137+
.unwrap_or_else(|err| {
138+
panic!("read shared best header from disk should be ok, but {err}")
139+
})
140+
.map(|slice| {
141+
if slice.len() < 32 {
142+
panic!(
143+
"stored shared best header should contain hash and payload, len {}",
144+
slice.len()
145+
);
146+
}
147+
let (hash, payload) = slice.split_at(32);
148+
HeaderIndexView::from_slice_should_be_ok(hash, payload)
149+
})
150+
}
151+
152+
pub fn store_shared_best_header(&self, header: &HeaderIndexView) {
153+
let hash = header.hash();
154+
let payload = header.to_vec();
155+
let mut buf = Vec::with_capacity(32 + payload.len());
156+
buf.extend_from_slice(hash.as_slice());
157+
buf.extend_from_slice(&payload);
158+
self.db
159+
.insert(SHARED_BEST_HEADER_KEY, buf)
160+
.expect("failed to persist shared best header to sled");
161+
}
121162
}

shared/src/types/header_map/kernel_lru.rs

Lines changed: 49 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,13 @@ use std::path;
22
use std::sync::Arc;
33
use std::sync::atomic::{AtomicBool, Ordering};
44

5-
use ckb_logger::info;
6-
#[cfg(feature = "stats")]
75
use ckb_logger::info;
86
use ckb_metrics::HistogramTimer;
97
#[cfg(feature = "stats")]
108
use ckb_util::{Mutex, MutexGuard};
9+
use ckb_util::{RwLock, RwLockReadGuard};
1110

12-
use ckb_types::packed::Byte32;
11+
use ckb_types::{U256, core::EpochNumberWithFraction, packed::Byte32};
1312

1413
use super::{MemoryMap, SledBackend};
1514
use crate::types::HeaderIndexView;
@@ -21,6 +20,7 @@ pub(crate) struct HeaderMapKernel {
2120
memory_limit: usize,
2221
// if ckb is in IBD mode, don't shrink memory map
2322
ibd_finished: Arc<AtomicBool>,
23+
shared_best_header: RwLock<HeaderIndexView>,
2424
// Statistics
2525
#[cfg(feature = "stats")]
2626
stats: Mutex<HeaderMapKernelStats>,
@@ -54,6 +54,8 @@ impl Drop for HeaderMapKernel {
5454
self.memory
5555
.remove_batch(items.iter().map(|item| item.hash()), false);
5656
}
57+
let best_header = self.shared_best_header.read().clone();
58+
self.backend.store_shared_best_header(&best_header);
5759
info!("HeaderMap persisted all items to backend");
5860
}
5961
}
@@ -69,6 +71,11 @@ impl HeaderMapKernel {
6971
{
7072
let memory = Default::default();
7173
let backend = SledBackend::new(tmpdir);
74+
info!("backend is empty: {}", backend.is_empty());
75+
let shared_best_header_value = backend
76+
.load_shared_best_header()
77+
.unwrap_or_else(Self::default_shared_best_header);
78+
let shared_best_header = RwLock::new(shared_best_header_value);
7279

7380
#[cfg(not(feature = "stats"))]
7481
{
@@ -77,6 +84,7 @@ impl HeaderMapKernel {
7784
backend,
7885
memory_limit,
7986
ibd_finished,
87+
shared_best_header,
8088
}
8189
}
8290

@@ -87,6 +95,7 @@ impl HeaderMapKernel {
8795
backend,
8896
memory_limit,
8997
ibd_finished,
98+
shared_best_header,
9099
stats: Mutex::new(HeaderMapKernelStats::new(50_000)),
91100
}
92101
}
@@ -192,6 +201,43 @@ impl HeaderMapKernel {
192201
}
193202
}
194203

204+
pub(crate) fn shared_best_header(&self) -> HeaderIndexView {
205+
self.shared_best_header.read().clone()
206+
}
207+
208+
pub(crate) fn shared_best_header_ref(&self) -> RwLockReadGuard<HeaderIndexView> {
209+
self.shared_best_header.read()
210+
}
211+
212+
pub(crate) fn set_shared_best_header(&self, header: HeaderIndexView) {
213+
if let Some(metrics) = ckb_metrics::handle() {
214+
metrics.ckb_shared_best_number.set(header.number() as i64);
215+
}
216+
*self.shared_best_header.write() = header;
217+
}
218+
219+
pub(crate) fn may_set_shared_best_header(&self, header: HeaderIndexView) {
220+
{
221+
let current = self.shared_best_header.read();
222+
if !header.is_better_than(current.total_difficulty()) {
223+
return;
224+
}
225+
}
226+
227+
self.set_shared_best_header(header);
228+
}
229+
230+
fn default_shared_best_header() -> HeaderIndexView {
231+
HeaderIndexView::new(
232+
Byte32::zero(),
233+
0,
234+
EpochNumberWithFraction::from_full_value(0),
235+
0,
236+
Byte32::default(),
237+
U256::zero(),
238+
)
239+
}
240+
195241
#[cfg(feature = "stats")]
196242
fn trace(&self) {
197243
let mut stats = self.stats();

shared/src/types/header_map/mod.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use std::time::Duration;
88
use std::{mem::size_of, path};
99

1010
use ckb_metrics::HistogramTimer;
11+
use ckb_util::RwLockReadGuard;
1112
use tokio::time::MissedTickBehavior;
1213

1314
mod backend_sled;
@@ -111,4 +112,20 @@ impl HeaderMap {
111112

112113
self.inner.remove(hash)
113114
}
115+
116+
pub fn shared_best_header(&self) -> HeaderIndexView {
117+
self.inner.shared_best_header()
118+
}
119+
120+
pub fn shared_best_header_ref(&self) -> RwLockReadGuard<HeaderIndexView> {
121+
self.inner.shared_best_header_ref()
122+
}
123+
124+
pub fn set_shared_best_header(&self, header: HeaderIndexView) {
125+
self.inner.set_shared_best_header(header)
126+
}
127+
128+
pub fn may_set_shared_best_header(&self, header: HeaderIndexView) {
129+
self.inner.may_set_shared_best_header(header)
130+
}
114131
}

sync/src/synchronizer/headers_process.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ impl<'a> HeadersProcess<'a> {
6262
fn debug(&self) {
6363
if log_enabled!(Level::Debug) {
6464
// Regain the updated best known
65-
let shared_best_known = self.synchronizer.shared.state().shared_best_header();
65+
let shared_best_known = self.synchronizer.shared.shared_best_header();
6666
let peer_best_known = self.synchronizer.peers().get_best_known_header(self.peer);
6767
debug!(
6868
"chain: num={}, diff={:#x};",

sync/src/synchronizer/mod.rs

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ impl BlockFetchCMD {
120120
CanStart::FetchToTarget(assume_target) => fetch_blocks_fn(self, assume_target),
121121
CanStart::Ready => fetch_blocks_fn(self, BlockNumber::MAX),
122122
CanStart::MinWorkNotReach => {
123-
let best_known = self.sync_shared.state().shared_best_header_ref();
123+
let best_known = self.sync_shared.shared_best_header_ref();
124124
let number = best_known.number();
125125
if number != self.number && (number - self.number) % 10000 == 0 {
126126
self.number = number;
@@ -134,9 +134,8 @@ impl BlockFetchCMD {
134134
}
135135
}
136136
CanStart::AssumeValidNotFound => {
137-
let state = self.sync_shared.state();
138137
let shared = self.sync_shared.shared();
139-
let best_known = state.shared_best_header_ref();
138+
let best_known = self.sync_shared.shared_best_header_ref();
140139
let number = best_known.number();
141140
let assume_valid_target: Byte32 = shared
142141
.assume_valid_targets()
@@ -192,7 +191,7 @@ impl BlockFetchCMD {
192191
.genesis_block()
193192
.header()
194193
.timestamp();
195-
let shared_best_timestamp = self.sync_shared.state().shared_best_header().timestamp();
194+
let shared_best_timestamp = self.sync_shared.shared_best_header().timestamp();
196195

197196
let ckb_process_start_timestamp = self.start_timestamp;
198197

@@ -236,10 +235,9 @@ impl BlockFetchCMD {
236235
}
237236

238237
let shared = self.sync_shared.shared();
239-
let state = self.sync_shared.state();
240238

241239
let min_work_reach = |flag: &mut CanStart| {
242-
if state.min_chain_work_ready() {
240+
if self.sync_shared.min_chain_work_ready() {
243241
*flag = CanStart::AssumeValidNotFound;
244242
}
245243
};
@@ -278,7 +276,7 @@ impl BlockFetchCMD {
278276
None => {
279277
// Best known already not in the scope of ibd, it means target is invalid
280278
if unix_time_as_millis()
281-
.saturating_sub(state.shared_best_header_ref().timestamp())
279+
.saturating_sub(self.sync_shared.shared_best_header_ref().timestamp())
282280
< MAX_TIP_AGE
283281
{
284282
warn!(
@@ -428,7 +426,7 @@ impl Synchronizer {
428426
active_chain.total_difficulty().to_owned(),
429427
)
430428
};
431-
let best_known = self.shared.state().shared_best_header();
429+
let best_known = self.shared.shared_best_header();
432430
// is_better_chain
433431
if total_difficulty > *best_known.total_difficulty() {
434432
(header, total_difficulty).into()
@@ -782,7 +780,7 @@ impl Synchronizer {
782780
.unwrap();
783781
self.fetch_channel = Some(sender);
784782
let thread = ::std::thread::Builder::new();
785-
let number = self.shared.state().shared_best_header_ref().number();
783+
let number = self.shared.shared_best_header_ref().number();
786784
const THREAD_NAME: &str = "BlockDownload";
787785
let sync_shared: Arc<SyncShared> = Arc::to_owned(self.shared());
788786
let blockdownload_jh = thread

0 commit comments

Comments
 (0)