fix: use rocksdb instead of in-memory map to reduce memory footprint #638

Closed · wants to merge 1 commit.
25 changes: 14 additions & 11 deletions Cargo.lock

(Cargo.lock is a generated file; its diff is not rendered by default.)

1 change: 1 addition & 0 deletions Cargo.toml
@@ -55,6 +55,7 @@ tracing = "0.1"
tracing-subscriber = "0.3"
utoipa-swagger-ui = { version = "8.0", features = ["actix-web"] }
utoipa = { version = "5.2", features = ["actix_extras", "chrono", "indexmap", "preserve_order", "yaml"] }
tempfile = "3.15.0"

[dependencies.noodles]
version = "0.77.0"
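The new `tempfile` dependency backs the change below: instead of accumulating everything in an in-memory `IndexMap`, the importer creates a temporary directory inside the output path and opens a scratch RocksDB there, which is removed again when the guard is dropped. A minimal sketch of that pattern; the function name and the `create_if_missing` choice are illustrative, not part of the diff:

```rust
use std::path::Path;

use rocksdb::{DBWithThreadMode, MultiThreaded, Options};

/// Open a scratch RocksDB in a temporary directory below `parent`.
///
/// Returning the `TempDir` guard together with the DB keeps the directory
/// alive for as long as the database is used; dropping the guard deletes it.
fn open_scratch_db(
    parent: &Path,
) -> anyhow::Result<(tempfile::TempDir, DBWithThreadMode<MultiThreaded>)> {
    // Place the temporary directory next to the final output so both sit on
    // the same filesystem (this mirrors `TempDir::new_in` in the diff).
    let tempdir = tempfile::TempDir::new_in(parent)?;
    let mut options = Options::default();
    options.create_if_missing(true);
    let db = DBWithThreadMode::<MultiThreaded>::open(&options, tempdir.path())?;
    Ok((tempdir, db))
}
```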
144 changes: 90 additions & 54 deletions src/clinvar_genes/cli/import.rs
@@ -1,15 +1,15 @@
//! Import of minimal ClinVar data.

use std::{collections::HashSet, io::BufRead, sync::Arc};

use clap::Parser;
use prost::Message;

use crate::common;
use crate::pbs::clinvar::per_gene::{ClinvarPerGeneRecord, ExtractedVariantsPerRelease};
use crate::pbs::clinvar_data::class_by_freq::GeneCoarseClinsigFrequencyCounts;
use crate::pbs::clinvar_data::extracted_vars::ExtractedVcvRecord;
use crate::pbs::clinvar_data::gene_impact::GeneImpactCounts;
use clap::Parser;
use prost::Message;
use rocksdb::{DBWithThreadMode, MultiThreaded};
use std::path::Path;
use std::{collections::HashSet, io::BufRead, sync::Arc};

/// Command line arguments for `tsv import` sub command.
#[derive(Parser, Debug, Clone)]
@@ -86,17 +86,23 @@
Ok(result)
}

type PerRelease = indexmap::IndexMap<String, Vec<ExtractedVcvRecord>>;
type PerGene = indexmap::IndexMap<String, PerRelease>;
type VariantsPerGeneDb = DBWithThreadMode<MultiThreaded>;
type Releases = HashSet<String>;

/// Load per-gene sequence variants.

Check warning on line 93 in src/clinvar_genes/cli/import.rs (GitHub Actions / clippy): empty line after doc comment

warning: empty line after doc comment
  --> src/clinvar_genes/cli/import.rs:92:1
   |
92 | / /// Load per-gene sequence variants.
93 | |
   | |_^
94 | / fn load_variants_jsonl(
95 | |     variant_jsonls: &[String],
96 | |     db_path: impl AsRef<Path>,
97 | |     options: &rocksdb::Options,
98 | | ) -> Result<(VariantsPerGeneDb, Releases), anyhow::Error> {
   | |_________________________________________________________- the comment documents this function
   |
   = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#empty_line_after_doc_comments
   = note: `#[warn(clippy::empty_line_after_doc_comments)]` on by default
   = help: if the empty line is unintentional remove it
fn load_variants_jsonl(
variant_jsonls: &[String],
) -> Result<indexmap::IndexMap<String, Vec<ExtractedVariantsPerRelease>>, anyhow::Error> {
// Build intermediate data structure using nested maps.
let mut per_gene: PerGene = Default::default();
db_path: impl AsRef<Path>,
options: &rocksdb::Options,
) -> Result<(VariantsPerGeneDb, Releases), anyhow::Error> {
tracing::info!("creating temporary RocksDB at {:?}", db_path.as_ref());

let db: VariantsPerGeneDb = rocksdb::DB::open(options, db_path)?;
let mut releases = Releases::default();

for path_jsonl in variant_jsonls {
tracing::info!("loading variants from {}", path_jsonl);
let reader: Box<dyn std::io::Read> = if path_jsonl.ends_with(".gz") {
Box::new(flate2::read::GzDecoder::new(std::fs::File::open(
path_jsonl,
@@ -109,51 +115,51 @@

for line in reader.lines() {
let line = line?;
let input_record = serde_json::from_str::<ExtractedVcvRecord>(&line);
match input_record {
Err(e) => {
tracing::warn!("skipping line because of error: {}", e);
continue;
}
Ok(input_record) => {
for hgnc_id in &input_record.hgnc_ids {
let per_gene_entry = per_gene.entry(hgnc_id.clone()).or_default();
let release = input_record
.sequence_location
.as_ref()
.expect("missing sequence_location")
.assembly
.clone();
let per_release_entry = per_gene_entry.entry(release.clone()).or_default();
per_release_entry.push(input_record.clone());
}
let input_record: ExtractedVcvRecord = serde_json::from_str(&line).map_err(|e| {
tracing::warn!("skipping line because of error: {}", e);
e
})?;

for hgnc_id in &input_record.hgnc_ids {
let release = input_record
.sequence_location
.as_ref()
.expect("missing sequence_location")
.assembly
.clone();

// Add the release to the set, so we can later iterate over it
releases.insert(release.clone());

// Create a key for RocksDB
let key = format!("{}:{}", hgnc_id, release);
let mut variants: Vec<ExtractedVcvRecord> = vec![];

// Retrieve existing data and deserialize it
if let Some(existing_data) = db.get(&key)? {
variants = serde_json::from_slice(&existing_data)?;
}
}
}
}

// Convert into final data structure that uses lists of entry records rather than nested maps.
let mut result = indexmap::IndexMap::new();
for (hgnc_id, per_release) in per_gene {
let mut per_gene_out = Vec::new();
for (release, extracted_vars) in per_release {
let mut variants = extracted_vars;
variants.sort_by(|a, b| {
a.accession
.as_ref()
.expect("no accession")
.accession
.cmp(&b.accession.as_ref().expect("no accession").accession)
});
per_gene_out.push(ExtractedVariantsPerRelease {
release: Some(release),
variants,
});
// Add the new variant
variants.push(input_record.clone());

// Sort the variants by accession
variants.sort_by(|a, b| {
a.accession
.as_ref()
.expect("no accession")
.accession
.cmp(&b.accession.as_ref().expect("no accession").accession)
});

// Serialize and store back in RocksDB
let serialized_data = serde_json::to_vec(&variants)?;
db.put(key, serialized_data)?;
}
}
result.insert(hgnc_id, per_gene_out);
}

Ok(result)
Ok((db, releases))
}

/// Perform import of the JSONL files.
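The rewritten `load_variants_jsonl` replaces the nested `IndexMap` with a read-modify-write loop against the scratch RocksDB: for each variant it fetches the JSON-encoded list stored under the `hgnc_id:release` key, appends the new record, re-sorts, and writes the list back. A self-contained sketch of that access pattern, reduced to plain `String` payloads instead of `ExtractedVcvRecord`; the helper name is illustrative:

```rust
use rocksdb::{DBWithThreadMode, MultiThreaded};

/// Append `record` to the JSON-encoded list stored under `hgnc_id:release`.
fn append_record(
    db: &DBWithThreadMode<MultiThreaded>,
    hgnc_id: &str,
    release: &str,
    record: String,
) -> anyhow::Result<()> {
    let key = format!("{}:{}", hgnc_id, release);
    // Read back whatever has been accumulated for this gene/release so far.
    let mut records: Vec<String> = match db.get(&key)? {
        Some(buf) => serde_json::from_slice(&buf)?,
        None => Vec::new(),
    };
    // Append, keep the list ordered (the PR orders by VCV accession), and
    // write the whole list back under the same key.
    records.push(record);
    records.sort();
    db.put(&key, serde_json::to_vec(&records)?)?;
    Ok(())
}
```

The memory win comes at a cost: every insert re-reads, re-sorts, and re-serializes the full list for that key, so the approach trades CPU and write amplification for a flat memory footprint.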
@@ -179,34 +185,64 @@
);
tracing::info!("Loading variants per gene ...");
let before_vars = std::time::Instant::now();
let vars_per_gene = load_variants_jsonl(&args.paths_variant_jsonl)?;
let options = rocksdb_utils_lookup::tune_options(
rocksdb::Options::default(),
args.path_wal_dir.as_ref().map(|s| s.as_ref()),
);
let tempdir = tempfile::TempDir::new_in(&args.path_out_rocksdb)?;
let (vars_per_gene_db, releases) =
load_variants_jsonl(&args.paths_variant_jsonl, &tempdir, &options)?;
tracing::info!(
"... done loading variants per gene in {:?}",
&before_vars.elapsed()
);

tracing::info!("Writing to database ...");
let before_write_to_db = std::time::Instant::now();
let db_key_iter = vars_per_gene_db.iterator(rocksdb::IteratorMode::Start);
let mut hgnc_ids = counts_per_impact
.keys()
.cloned()
.chain(counts_per_freq.keys().cloned())
.chain(vars_per_gene.keys().cloned())
.chain(db_key_iter.filter_map(|item| {
if let Ok((key, _value)) = item {
Some(
String::from_utf8(Vec::from(key))
.expect("Failed to convert to string")
.split_once(":")
.expect("Expected hgnc_id:release key format")
.0
.to_string(),
)
} else {
None
}
}))
.collect::<HashSet<_>>()
.into_iter()
.collect::<Vec<_>>();
hgnc_ids.sort();

// Read through all records and insert each into the database.
for hgnc_id in hgnc_ids.iter() {
let per_release_vars = vars_per_gene.get(hgnc_id).cloned().unwrap_or_default();
let mut per_release_vars = vec![];
for release in &releases {
let key = format!("{}:{}", hgnc_id, release);
if let Some(buf) = vars_per_gene_db.get(key)? {
let extracted_vcvs: Vec<ExtractedVcvRecord> = serde_json::from_slice(&buf)?;
let per_release_var = ExtractedVariantsPerRelease {
release: Some(release.clone()),
variants: extracted_vcvs,
};
per_release_vars.push(per_release_var);
}
}
let record = ClinvarPerGeneRecord {
per_impact_counts: Some(counts_per_impact.get(hgnc_id).cloned().unwrap_or_default()),
per_freq_counts: Some(counts_per_freq.get(hgnc_id).cloned().unwrap_or_default()),
per_release_vars,
};
let buf = record.encode_to_vec();

db.put_cf(&cf_data, hgnc_id, buf)?;
}
tracing::info!(
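Because the per-gene data no longer lives in a map, the write phase above recovers the set of HGNC IDs by scanning the scratch DB's keys and splitting off the part before the `:` separator, then reads each `hgnc_id:release` entry back while assembling the final `ClinvarPerGeneRecord`. A self-contained sketch of the key-scanning step; the helper name is illustrative:

```rust
use std::collections::HashSet;

use rocksdb::{DBWithThreadMode, IteratorMode, MultiThreaded};

/// Collect the distinct HGNC IDs from keys of the form `hgnc_id:release`.
fn collect_hgnc_ids(db: &DBWithThreadMode<MultiThreaded>) -> anyhow::Result<Vec<String>> {
    let mut ids = HashSet::new();
    for item in db.iterator(IteratorMode::Start) {
        let (key, _value) = item?;
        let key = String::from_utf8(key.to_vec())?;
        // Keys are written as "hgnc_id:release"; keep only the gene part.
        if let Some((hgnc_id, _release)) = key.split_once(':') {
            ids.insert(hgnc_id.to_string());
        }
    }
    let mut ids: Vec<_> = ids.into_iter().collect();
    ids.sort();
    Ok(ids)
}
```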