Skip to content

Commit 25501d4

Browse files
committed
import: Add --num-threads to control parallel threads number
Signed-off-by: Eval EXEC <[email protected]>
1 parent c321fbc commit 25501d4

File tree

8 files changed

+64
-57
lines changed

8 files changed

+64
-57
lines changed

Cargo.lock

Lines changed: 3 additions & 29 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,6 @@ quote = "1.0"
277277
rand = "0.8.5"
278278
rand_distr = "0.4"
279279
rayon = "1.0"
280-
rayon-progress-bar = "1.9"
281280
regex = "1.1.6"
282281
reqwest = { version = "0.12", features = ["blocking", "json"] }
283282
rhai = "1.16.0"

ckb-bin/src/cli.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ pub const ARG_FORMAT: &str = "format";
3535
pub const ARG_TARGET: &str = "target";
3636
/// Command line argument `--source`.
3737
pub const ARG_SOURCE: &str = "source";
38+
/// Command line argument `--num-threads`.
39+
pub const ARG_NUM_THREADS: &str = "num-threads";
3840
/// Command line flag: `--skip-script-verify`.
3941
pub const ARG_SKIP_SCRIPT_VERIFY: &str = "skip-script-verify";
4042
/// Command line flag: `--skip-all-verify`.
@@ -410,6 +412,14 @@ fn export() -> Command {
410412
fn import() -> Command {
411413
Command::new(CMD_IMPORT)
412414
.about("Import CKB data")
415+
.arg(
416+
Arg::new(ARG_NUM_THREADS)
417+
.long(ARG_NUM_THREADS)
418+
.value_name(ARG_NUM_THREADS)
419+
.value_parser(clap::value_parser!(usize))
420+
.required(false)
421+
.help("Specify the number of threads to use for parallel processing. If not specified, it will use the number of logical CPUs available on the system."),
422+
)
413423
.arg(
414424
Arg::new(ARG_SOURCE)
415425
.index(1)

ckb-bin/src/setup.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,10 @@ H256::from_str(&target[2..]).expect("default assume_valid_target for testnet mus
215215
pub fn import(self, matches: &ArgMatches) -> Result<ImportArgs, ExitCode> {
216216
let consensus = self.consensus()?;
217217
let config = self.config.into_ckb()?;
218+
let num_threads = matches
219+
.get_one::<usize>(cli::ARG_NUM_THREADS)
220+
.cloned()
221+
.unwrap_or(0);
218222
let source = {
219223
let source = matches
220224
.get_one::<String>(cli::ARG_SOURCE)
@@ -252,6 +256,7 @@ H256::from_str(&target[2..]).expect("default assume_valid_target for testnet mus
252256
consensus,
253257
source,
254258
switch,
259+
num_threads,
255260
})
256261
}
257262

ckb-bin/src/subcommand/import.rs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,16 @@ pub fn import(args: ImportArgs, async_handle: Handle) -> Result<(), ExitCode> {
2020
pack.take_tx_pool_builder();
2121
pack.take_relay_tx_receiver();
2222

23-
Import::new(chain_controller, shared, args.source, args.switch)
24-
.execute()
25-
.map_err(|err| {
26-
eprintln!("Import error: {err:?}");
27-
ExitCode::Failure
28-
})
23+
Import::new(
24+
chain_controller,
25+
shared,
26+
args.source,
27+
args.switch,
28+
args.num_threads,
29+
)
30+
.execute()
31+
.map_err(|err| {
32+
eprintln!("Import error: {err:?}");
33+
ExitCode::Failure
34+
})
2935
}

util/app-config/src/args.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,11 @@ use ckb_verification_traits::Switch;
88
use std::path::PathBuf;
99
use std::sync::Arc;
1010

11+
/// The target directory to save the exported file or stdout.
1112
pub enum ExportTarget {
13+
/// The path to the file to be exported.
1214
Path(PathBuf),
15+
/// Export to stdout.
1316
Stdout,
1417
}
1518

@@ -38,8 +41,11 @@ pub struct DaemonArgs {
3841
pub pid_file: PathBuf,
3942
}
4043

44+
/// The source of the file to be imported.
4145
pub enum ImportSource {
46+
/// The path to the file to be imported.
4247
Path(PathBuf),
48+
/// Import from stdin, the file content must be encoded by base64.
4349
Stdin,
4450
}
4551

@@ -53,6 +59,8 @@ pub struct ImportArgs {
5359
pub source: ImportSource,
5460
/// The switch to control the verification behavior.
5561
pub switch: Switch,
62+
/// The number of threads to use for parallel processing.
63+
pub num_threads: usize,
5664
}
5765

5866
/// Parsed command line arguments for `ckb run`.

util/instrument/Cargo.toml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,13 @@ repository = "https://github.com/nervosnetwork/ckb"
1212
ckb-types.workspace = true
1313
ckb-chain.workspace = true
1414
ckb-store.workspace = true
15-
ckb-chain-iter.workspace = true
1615
ckb-shared.workspace = true
1716
ckb-channel.workspace = true
1817
ckb-jsonrpc-types.workspace = true
1918
ckb-verification-traits.workspace = true
2019
serde_json.workspace = true
2120
indicatif = { workspace = true, optional = true }
2221
rayon.workspace = true
23-
rayon-progress-bar.workspace = true
2422
itertools.workspace = true
2523
ckb-app-config.workspace = true
2624

util/instrument/src/import.rs

Lines changed: 26 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ pub struct Import {
2121
shared: Shared,
2222
chain: ChainController,
2323
switch: Switch,
24+
num_threads: usize,
2425
}
2526

2627
impl Import {
@@ -30,12 +31,14 @@ impl Import {
3031
shared: Shared,
3132
source: ImportSource,
3233
switch: Switch,
34+
num_threads: usize,
3335
) -> Self {
3436
Import {
3537
chain,
3638
shared,
3739
source,
3840
switch,
41+
num_threads,
3942
}
4043
}
4144

@@ -78,7 +81,7 @@ impl Import {
7881
}
7982

8083
let f: Box<dyn Read + Send> = match &self.source {
81-
ImportSource::Path(source) => Box::new(fs::File::open(&source)?),
84+
ImportSource::Path(source) => Box::new(fs::File::open(source)?),
8285
ImportSource::Stdin => {
8386
// read from stdin
8487
Box::new(std::io::stdin())
@@ -131,10 +134,12 @@ impl Import {
131134
}
132135
}
133136

134-
#[cfg(feature = "progress_bar")]
135137
let progress_bar = {
136138
let bar = match &self.source {
137-
ImportSource::Path(source) => ProgressBar::new(fs::metadata(&source)?.len()),
139+
ImportSource::Path(source) => {
140+
let file_size = fs::metadata(source)?.len();
141+
ProgressBar::new(file_size)
142+
}
138143
ImportSource::Stdin => ProgressBar::new_spinner(),
139144
};
140145
let style = ProgressStyle::default_bar()
@@ -146,12 +151,13 @@ impl Import {
146151

147152
let mut largest_block_number = 0;
148153
const BLOCKS_COUNT_PER_CHUNK: usize = 1024 * 6;
149-
let (blocks_tx, blocks_rx) = ckb_channel::bounded::<Arc<BlockView>>(BLOCKS_COUNT_PER_CHUNK);
154+
let (blocks_tx, blocks_rx) =
155+
ckb_channel::bounded::<(Arc<BlockView>, usize)>(BLOCKS_COUNT_PER_CHUNK);
150156
std::thread::spawn({
151-
#[cfg(feature = "progress_bar")]
152-
let progress_bar = progress_bar.clone();
157+
let num_threads = self.num_threads;
153158
move || {
154159
let pool = rayon::ThreadPoolBuilder::new()
160+
.num_threads(num_threads)
155161
.build()
156162
.expect("rayon thread pool must build");
157163
pool.install(|| {
@@ -168,33 +174,35 @@ impl Import {
168174
let block: JsonBlock =
169175
serde_json::from_str(line).expect("parse block from json");
170176
let block: Arc<core::BlockView> = Arc::new(block.into());
171-
blocks_tx.send(block).expect("send block to channel");
172-
173-
#[cfg(feature = "progress_bar")]
174-
progress_bar.inc(line.len() as u64);
177+
blocks_tx
178+
.send((block, line.len()))
179+
.expect("send block to channel");
175180
});
176181
}
177182
drop(blocks_tx);
178183
});
179184
}
180185
});
181186

182-
let callback = |verify_result: VerifyResult| {
183-
if let Err(err) = verify_result {
184-
eprintln!("Error verifying block: {:?}", err);
185-
}
186-
};
187-
188-
for block in blocks_rx {
187+
for (block, block_size) in blocks_rx {
189188
if !block.is_genesis() {
190189
use ckb_chain::LonelyBlock;
191190

192191
largest_block_number = largest_block_number.max(block.number());
193192

193+
let progress_bar = progress_bar.clone();
194+
let callback = Box::new(move |verify_result: VerifyResult| {
195+
if let Err(err) = verify_result {
196+
eprintln!("Error verifying block: {:?}", err);
197+
} else {
198+
progress_bar.inc(block_size as u64);
199+
}
200+
});
201+
194202
let lonely_block = LonelyBlock {
195203
block,
196204
switch: Some(self.switch),
197-
verify_callback: Some(Box::new(callback)),
205+
verify_callback: Some(callback),
198206
};
199207
self.chain.asynchronous_process_lonely_block(lonely_block);
200208
}
@@ -209,7 +217,6 @@ impl Import {
209217
std::thread::sleep(std::time::Duration::from_secs(1));
210218
}
211219

212-
#[cfg(feature = "progress_bar")]
213220
progress_bar.finish_with_message("done!");
214221
Ok(())
215222
}

0 commit comments

Comments
 (0)