EXP: Show multisig loading issue #202

Open, wants to merge 47 commits into base: main

Commits (47):
f1edc2c: init changes (bluegenes, Sep 27, 2023)
cb702b3: compiling code using newer mastiff branch (bluegenes, Sep 27, 2023)
db318ba: use selection (bluegenes, Oct 2, 2023)
d4393b1: Merge branch 'main' into upd-smash-core (bluegenes, Nov 5, 2023)
07c8362: rustfmt (bluegenes, Nov 5, 2023)
6975a72: Merge branch 'main' into upd-smash-core (bluegenes, Nov 17, 2023)
ff48469: update deps (luizirber, Nov 22, 2023)
ea88f20: Merge branch 'main' into upd-smash-core (bluegenes, Nov 22, 2023)
ddf6c8c: update to sourmash 0.12.0 (luizirber, Dec 1, 2023)
b48ac88: fix index (luizirber, Dec 1, 2023)
f1145a1: rm reporting line checks not in smash core idx (bluegenes, Dec 2, 2023)
a6785dc: use selection instead of template (bluegenes, Dec 7, 2023)
5dd52f2: Merge branch 'main' into upd-smash-core (bluegenes, Jan 23, 2024)
21f20ca: rustfmt (bluegenes, Jan 23, 2024)
45b598f: fix query file no exist errs (bluegenes, Jan 23, 2024)
34ed1bb: update mastiff_manygather (bluegenes, Jan 23, 2024)
243d106: rustfmt (bluegenes, Jan 23, 2024)
cee5597: Merge branch 'main' into upd-smash-core (bluegenes, Jan 23, 2024)
87219aa: add cargo lock (bluegenes, Jan 23, 2024)
2fcf684: switch to commit in latest br (bluegenes, Jan 24, 2024)
86d6c16: cleanup unused imports and code (bluegenes, Jan 24, 2024)
13940cd: init use collection for query loading (bluegenes, Jan 25, 2024)
cd8be99: ...collection loading in progress (bluegenes, Jan 26, 2024)
32fc2d5: fix fastgather (bluegenes, Jan 26, 2024)
39fb7dc: re-enable more permissive pathlist loading (bluegenes, Jan 26, 2024)
15f7dba: clean up ms (bluegenes, Jan 26, 2024)
4e3b7ee: harmonize errors (bluegenes, Jan 26, 2024)
14ee1bd: harmonize error text and output filenames (bluegenes, Jan 27, 2024)
b5a175e: Merge branch 'main' into upd-smash-core (bluegenes, Jan 30, 2024)
363b90d: re-allow load from sig; upd manysearch (bluegenes, Jan 30, 2024)
0ea39b5: fix all except moltype selection (bluegenes, Jan 31, 2024)
912f717: update fastgather and multisearch (bluegenes, Jan 31, 2024)
c518e83: Merge branch 'upd-smash-core' into use-core-more-broadly (bluegenes, Jan 31, 2024)
f5216f8: update pairwise (bluegenes, Jan 31, 2024)
893e0a7: clean up a little (bluegenes, Jan 31, 2024)
dbdff4a: clean up; unify sketch loading for pairwise/multisearch (bluegenes, Feb 1, 2024)
ab339ba: ...cleaner (bluegenes, Feb 2, 2024)
f769aee: unify more code (bluegenes, Feb 2, 2024)
8d7781c: rm unused save_paths option (bluegenes, Feb 2, 2024)
b6ebc7a: use updated mh loading (bluegenes, Feb 2, 2024)
a463ac8: standardize indexed writing using local struct for now (bluegenes, Feb 2, 2024)
c7b865b: clean up sketch loading and file opening/writing (bluegenes, Feb 2, 2024)
14af130: apply clippy suggestions (bluegenes, Feb 2, 2024)
13c96d1: add back SmallSignature and use (bluegenes, Feb 2, 2024)
2453c9b: rename fn back to load_sketches (bluegenes, Feb 2, 2024)
0086b3a: use serde serialize for writing instead of custom traits (bluegenes, Feb 2, 2024)
2237619: narrow down multisketch issue (bluegenes, Feb 2, 2024)
748 changes: 245 additions & 503 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 4 additions & 2 deletions Cargo.toml
@@ -12,17 +12,19 @@ crate-type = ["cdylib"]
pyo3 = { version = "0.20.2", features = ["extension-module", "anyhow"] }
rayon = "1.8.1"
serde = { version = "1.0.196", features = ["derive"] }
sourmash = { git = "https://github.com/sourmash-bio/sourmash", "rev" = "ff1092f8f366339caa59d7203f623813228f4356" }
sourmash = { git = "https://github.com/sourmash-bio/sourmash", rev= "409aeb415ba8b04b9c09f203817d67791afa96da", features = ["branchwater"] }
#sourmash = { version = "0.12.1", features = ["branchwater"] }
serde_json = "1.0.113"
niffler = "2.4.0"
log = "0.4.14"
env_logger = "0.10.2"
simple-error = "0.3.0"
anyhow = "1.0.79"
zip = "0.6"
zip = { version = "0.6", default-features = false, features = ["deflate"] }
tempfile = "3.9"
needletail = "0.5.1"
csv = "1.3.0"
camino = "1.1.6"

[dev-dependencies]
assert_cmd = "2.0.13"
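One dependency change above is load-bearing rather than cosmetic: sourmash is now pinned to a git revision and built with the branchwater feature, and zip drops its default features to keep only deflate. The snippet below is a minimal compile-time sketch of why the sourmash feature flag matters; the assumption (based on the imports in src/check.rs further down) is that the RocksDB-backed RevIndex and the RevIndexOps trait only resolve when that feature is enabled.

// Sketch: this import is assumed to resolve only when the sourmash
// dependency is compiled with the "branchwater" feature, as configured
// in the Cargo.toml hunk above.
#[allow(unused_imports)]
use sourmash::index::revindex::{RevIndex, RevIndexOps};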
15 changes: 5 additions & 10 deletions src/check.rs
@@ -1,19 +1,14 @@
use std::path::Path;

use crate::utils::is_revindex_database;

use sourmash::index::revindex::RevIndex;
use sourmash::index::revindex::{RevIndex, RevIndexOps};

pub fn check<P: AsRef<Path>>(index: P, quick: bool) -> Result<(), Box<dyn std::error::Error>> {
if !is_revindex_database(index.as_ref()) {
bail!(
"'{}' is not a valid RevIndex database",
index.as_ref().display()
);
pub fn check(index: camino::Utf8PathBuf, quick: bool) -> Result<(), Box<dyn std::error::Error>> {
if !is_revindex_database(&index) {
bail!("'{}' is not a valid RevIndex database", index);
}

println!("Opening DB");
let db = RevIndex::open(index.as_ref(), true);
let db = RevIndex::open(index, true)?;

println!("Starting check");
db.check(quick);
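Assembled from the new lines of the hunk above, the updated open-and-check flow looks roughly like the sketch below. This is a reconstruction, not the exact file contents (the rest of the function is hidden behind the truncated diff); open_and_check is a hypothetical wrapper name, the added RevIndexOps import suggests check() now resolves through that trait, and RevIndex::open now returns a Result so failures propagate with ?.

use anyhow::Result;
use sourmash::index::revindex::{RevIndex, RevIndexOps};

// Minimal sketch of the new code path; not a function added by the PR.
fn open_and_check(index: camino::Utf8PathBuf, quick: bool) -> Result<()> {
    println!("Opening DB");
    let db = RevIndex::open(index, true)?; // open() now returns a Result, handled with ?
    println!("Starting check");
    db.check(quick);
    Ok(())
}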
85 changes: 41 additions & 44 deletions src/fastgather.rs
@@ -1,49 +1,52 @@
/// fastgather: Run gather with a query against a list of files.
use anyhow::Result;

use sourmash::signature::Signature;
use sourmash::sketch::Sketch;
use std::path::Path;
use sourmash::prelude::Select;
use sourmash::selection::Selection;

use crate::utils::{
consume_query_by_gather, load_sigpaths_from_zip_or_pathlist, load_sketches_above_threshold,
prepare_query, write_prefetch, ReportType,
consume_query_by_gather, load_collection, load_sketches_above_threshold, write_prefetch,
ReportType,
};

pub fn fastgather<P: AsRef<Path> + std::fmt::Debug + std::fmt::Display + Clone>(
query_filename: P,
matchlist_filename: P,
pub fn fastgather(
query_filepath: String,
against_filepath: String,
threshold_bp: usize,
ksize: u8,
scaled: usize,
template: Sketch,
gather_output: Option<P>,
prefetch_output: Option<P>,
selection: &Selection,
gather_output: Option<String>,
prefetch_output: Option<String>,
allow_failed_sigpaths: bool,
) -> Result<()> {
let location = query_filename.to_string();
eprintln!("Loading query from '{}'", location);
let query = {
let sigs = Signature::from_path(query_filename)?;
let query_collection = load_collection(
&query_filepath,
selection,
ReportType::Query,
allow_failed_sigpaths,
)?;

prepare_query(&sigs, &template, &location)
};
// did we find anything matching the desired template?
let query = match query {
Some(query) => query,
None => bail!("No sketch found with scaled={}, k={}", scaled, ksize),
if query_collection.len() != 1 {
bail!(
"Fastgather requires a single query sketch. Check input: '{:?}'",
&query_filepath
)
}
// get single query sig and minhash
let query_sig = query_collection.sig_for_dataset(0)?; // need this for original md5sum
let query_sig_ds = query_sig.clone().select(selection)?; // downsample
let query_mh = match query_sig_ds.minhash() {
Some(query_mh) => query_mh,
None => {
bail!("No query sketch matching selection parameters.");
}
};

// build the list of paths to match against.
eprintln!(
"Loading matchlist from '{}'",
matchlist_filename.as_ref().display()
);

let matchlist_filename = matchlist_filename.as_ref().to_string_lossy().to_string();
let (matchlist_paths, _temp_dir) =
load_sigpaths_from_zip_or_pathlist(matchlist_filename, &template, ReportType::Against)?;

eprintln!("Loaded {} sig paths in matchlist", matchlist_paths.len());
// load collection to match against.
let against_collection = load_collection(
&against_filepath,
selection,
ReportType::Against,
allow_failed_sigpaths,
)?;

// calculate the minimum number of hashes based on desired threshold
let threshold_hashes: u64 = {
@@ -62,16 +65,10 @@ pub fn fastgather<P: AsRef<Path> + std::fmt::Debug + std::fmt::Display + Clone>(
);

// load a set of sketches, filtering for those with overlaps > threshold
let result = load_sketches_above_threshold(
matchlist_paths,
&template,
&query.minhash,
threshold_hashes,
)?;
let result = load_sketches_above_threshold(against_collection, query_mh, threshold_hashes)?;
let matchlist = result.0;
let skipped_paths = result.1;
let failed_paths = result.2;

if skipped_paths > 0 {
eprintln!(
"WARNING: skipped {} search paths - no compatible signatures.",
@@ -91,10 +88,10 @@ pub fn fastgather<P: AsRef<Path> + std::fmt::Debug + std::fmt::Display + Clone>(
}

if prefetch_output.is_some() {
write_prefetch(&query, prefetch_output, &matchlist).ok();
write_prefetch(&query_sig, prefetch_output, &matchlist).ok();
}

// run the gather!
consume_query_by_gather(query, matchlist, threshold_hashes, gather_output).ok();
consume_query_by_gather(query_sig, matchlist, threshold_hashes, gather_output).ok();
Ok(())
}
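The structural change in fastgather is that the query no longer comes from a bare Signature::from_path matched against a template Sketch: it is loaded as a collection, required to contain exactly one record, then downsampled through select() before its minhash is pulled out. The sketch below condenses that pattern from the interleaved hunk above into one compilable unit; load_single_query is a hypothetical helper name, while load_collection and ReportType are this crate's own utilities from utils.rs.

use anyhow::{anyhow, bail, Result};
use sourmash::prelude::Select;
use sourmash::selection::Selection;

use crate::utils::{load_collection, ReportType};

// Hypothetical condensation of the new query-loading pattern; not a function
// added by this PR.
fn load_single_query(query_filepath: String, selection: &Selection) -> Result<()> {
    // Load whatever the path points to (zip, pathlist, or single sig) as a collection.
    let query_collection =
        load_collection(&query_filepath, selection, ReportType::Query, false)?;

    // fastgather only makes sense with exactly one query sketch.
    if query_collection.len() != 1 {
        bail!("fastgather requires a single query sketch: '{}'", query_filepath);
    }

    // Keep the un-downsampled signature around for its original md5sum...
    let query_sig = query_collection.sig_for_dataset(0)?;
    // ...then downsample a clone to the Selection and take its minhash.
    let query_sig_ds = query_sig.clone().select(selection)?;
    let _query_mh = query_sig_ds
        .minhash()
        .ok_or_else(|| anyhow!("no query sketch matching selection parameters"))?;

    Ok(())
}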
155 changes: 80 additions & 75 deletions src/fastmultigather.rs
@@ -2,32 +2,35 @@
use anyhow::Result;
use rayon::prelude::*;

use sourmash::signature::Signature;
use sourmash::sketch::Sketch;
use std::path::Path;
use sourmash::selection::Selection;

use std::sync::atomic;
use std::sync::atomic::AtomicUsize;

use std::collections::BinaryHeap;

use camino::Utf8Path as PathBuf;

use crate::utils::{
consume_query_by_gather, load_sigpaths_from_zip_or_pathlist,
load_sketches_from_zip_or_pathlist, prepare_query, write_prefetch, PrefetchResult, ReportType,
consume_query_by_gather, load_collection, load_sketches, write_prefetch, PrefetchResult,
ReportType,
};

pub fn fastmultigather<P: AsRef<Path> + std::fmt::Debug + Clone>(
query_filenames: P,
matchlist_filename: P,
pub fn fastmultigather(
query_filepath: String,
against_filepath: String,
threshold_bp: usize,
scaled: usize,
template: Sketch,
selection: &Selection,
allow_failed_sigpaths: bool,
) -> Result<()> {
// load the list of query paths
let queryfile_name = query_filenames.as_ref().to_string_lossy().to_string();
let (querylist_paths, _temp_dir) =
load_sigpaths_from_zip_or_pathlist(&query_filenames, &template, ReportType::Query)?;
println!("Loaded {} sig paths in querylist", querylist_paths.len());
// load query collection
let query_collection = load_collection(
&query_filepath,
selection,
ReportType::Query,
allow_failed_sigpaths,
)?;

let threshold_hashes: u64 = {
let x = threshold_bp / scaled;
@@ -42,80 +45,82 @@ pub fn fastmultigather<P: AsRef<Path> + std::fmt::Debug + Clone>(

println!("threshold overlap: {} {}", threshold_hashes, threshold_bp);

// Load all the against sketches
let sketchlist =
load_sketches_from_zip_or_pathlist(&matchlist_filename, &template, ReportType::Against)?;
// load against collection
let against_collection = load_collection(
&against_filepath,
selection,
ReportType::Against,
allow_failed_sigpaths,
)?;
// load against sketches into memory, downsampling on the way
let against = load_sketches(against_collection, selection, ReportType::Against).unwrap();

// Iterate over all queries => do prefetch and gather!
let processed_queries = AtomicUsize::new(0);
let skipped_paths = AtomicUsize::new(0);
let failed_paths = AtomicUsize::new(0);

querylist_paths.par_iter().for_each(|q| {
// increment counter of # of queries
query_collection.par_iter().for_each(|(_idx, record)| {
// increment counter of # of queries. q: could we instead use the _idx from par_iter(), or will it vary based on thread?
let _i = processed_queries.fetch_add(1, atomic::Ordering::SeqCst);

// set query_label to the last path element.
let location = q.clone().into_os_string().into_string().unwrap();
let location = location.split('/').last().unwrap().to_string();

let query = match Signature::from_path(dbg!(q)) {
Ok(sigs) => {
let mm = prepare_query(&sigs, &template, &location);

if mm.is_none() {
if !queryfile_name.ends_with(".zip") {
eprintln!("WARNING: no compatible sketches in path '{}'", q.display());
// Load query sig (downsampling happens here)
match query_collection.sig_from_record(record) {
Ok(query_sig) => {
let prefix = query_sig.name();
let location = PathBuf::new(&prefix).file_name().unwrap();
if let Some(query_mh) = query_sig.minhash() {
let matchlist: BinaryHeap<PrefetchResult> = against
.iter()
.filter_map(|against| {
let mut mm: Option<PrefetchResult> = None;
if let Ok(overlap) = against.minhash.count_common(query_mh, false) {
if overlap >= threshold_hashes {
let result = PrefetchResult {
name: against.name.clone(),
md5sum: against.md5sum.clone(),
minhash: against.minhash.clone(),
overlap,
};
mm = Some(result);
}
}
mm
})
.collect();
if !matchlist.is_empty() {
let prefetch_output = format!("{}.prefetch.csv", location);
let gather_output = format!("{}.gather.csv", location);

// Save initial list of matches to prefetch output
write_prefetch(&query_sig, Some(prefetch_output), &matchlist).ok();

// Now, do the gather!
consume_query_by_gather(
query_sig.clone(),
matchlist,
threshold_hashes,
Some(gather_output),
)
.ok();
} else {
println!("No matches to '{}'", location);
}
} else {
// different warning here? Could not load sig from record??
eprintln!(
"WARNING: no compatible sketches in path '{}'",
record.internal_location()
);
let _ = skipped_paths.fetch_add(1, atomic::Ordering::SeqCst);
}
mm
}
Err(err) => {
eprintln!("Sketch loading error: {}", err);
Err(_) => {
// different warning here? Could not load sig from record??
eprintln!(
"WARNING: could not load sketches from path '{}'",
q.display()
"WARNING: no compatible sketches in path '{}'",
record.internal_location()
);
let _ = failed_paths.fetch_add(1, atomic::Ordering::SeqCst);
None
}
};

if let Some(query) = query {
// filter first set of matches out of sketchlist
let matchlist: BinaryHeap<PrefetchResult> = sketchlist
.par_iter()
.filter_map(|sm| {
let mut mm = None;

if let Ok(overlap) = sm.minhash.count_common(&query.minhash, false) {
if overlap >= threshold_hashes {
let result = PrefetchResult {
name: sm.name.clone(),
md5sum: sm.md5sum.clone(),
minhash: sm.minhash.clone(),
overlap,
};
mm = Some(result);
}
}
mm
})
.collect();

if !matchlist.is_empty() {
let prefetch_output = format!("{location}.prefetch.csv");
let gather_output = format!("{location}.gather.csv");

// save initial list of matches to prefetch output
write_prefetch(&query, Some(prefetch_output), &matchlist).ok();

// now, do the gather!
consume_query_by_gather(query, matchlist, threshold_hashes, Some(gather_output))
.ok();
} else {
println!("No matches to '{}'", location);
let _ = skipped_paths.fetch_add(1, atomic::Ordering::SeqCst);
}
}
});
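One detail worth making explicit from the hunks above: the prefetch threshold is given in base pairs on the command line but compared against hash counts, so it is divided by the scaled value before use, and each query then writes its own {location}.prefetch.csv and {location}.gather.csv, where location is the final path component of the query signature's name. A tiny worked example of the conversion (the numbers are illustrative, not taken from the PR, and the real code also guards the zero case in the lines hidden by the truncated hunk):

// threshold_bp is a number of base pairs; scaled is the sampling rate of the
// FracMinHash sketches, so threshold_bp / scaled approximates the minimum
// number of shared hashes a match needs to enter the prefetch list.
let threshold_bp: usize = 50_000; // illustrative value
let scaled: usize = 1_000;        // illustrative value
let threshold_hashes: u64 = (threshold_bp / scaled) as u64;
assert_eq!(threshold_hashes, 50); // matches sharing fewer than 50 hashes are skipped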