Skip to content

Commit b11407d

Browse files
committed
messages
1 parent 3777139 commit b11407d

File tree

2 files changed

+20
-0
lines changed

2 files changed

+20
-0
lines changed

src/datatrove/pipeline/dedup/fast_mh3/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ version = "0.1.0"
44
edition = "2021"
55

66
[dependencies]
7+
indicatif = "0.17.7"
78
# AWS SDK
89
aws-config = { version = "1.1.1", features = ["behavior-version-latest"] }
910
aws-sdk-s3 = "1.1.1"

src/datatrove/pipeline/dedup/fast_mh3/src/main.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
88
use clap::Parser;
99
use tokio::task;
1010
use std::sync::{Arc, Mutex};
11+
use indicatif::{ProgressBar, ProgressStyle};
1112

1213
#[derive(Parser, Debug)]
1314
#[command(version, about, long_about = None)]
@@ -277,6 +278,12 @@ async fn process_post_union(
277278
// Create sorted list of nodes
278279
let mut nodes: Vec<(u32, u32)> = union_find.union_set.keys().cloned().collect();
279280
nodes.sort();
281+
println!("Writing output files...");
282+
let pb = ProgressBar::new(nodes.len() as u64);
283+
pb.set_style(ProgressStyle::default_bar()
284+
.template("{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} ({eta})")
285+
.unwrap()
286+
.progress_chars("#>-"));
280287

281288
for node in nodes {
282289
let (file, doc) = node;
@@ -345,7 +352,9 @@ async fn process_post_union(
345352
if node == p {
346353
clusters += 1;
347354
}
355+
pb.inc(1);
348356
}
357+
pb.finish_with_message("Output writing complete");
349358

350359
// Finalize all writers
351360
for (_, writer) in writers {
@@ -371,16 +380,25 @@ async fn main() -> Result<()> {
371380

372381
let mut handles = Vec::new();
373382

383+
println!("Processing {} input files...", files.len());
384+
let pb = ProgressBar::new(files.len() as u64);
385+
pb.set_style(ProgressStyle::default_bar()
386+
.template("{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} ({eta})")
387+
.unwrap()
388+
.progress_chars("#>-"));
389+
374390
for file_path in files {
375391
let client = client.clone();
376392
let union_find = Arc::clone(&union_find);
393+
let pb = pb.clone();
377394

378395
let handle = task::spawn(async move {
379396
let tuples = download_and_parse_file(&client, &file_path).await?;
380397
let mut uf = union_find.lock().unwrap();
381398
for (f1, d1, f2, d2) in tuples {
382399
uf.union((f1, d1), (f2, d2));
383400
}
401+
pb.inc(1);
384402
Ok::<(), anyhow::Error>(())
385403
});
386404

@@ -390,6 +408,7 @@ async fn main() -> Result<()> {
390408
for handle in handles {
391409
handle.await??;
392410
}
411+
pb.finish_with_message("File processing complete");
393412

394413
let union_find = match Arc::try_unwrap(union_find) {
395414
Ok(mutex) => mutex.into_inner().unwrap(),

0 commit comments

Comments
 (0)