Skip to content

Commit

Permalink
Merge pull request #277 from korpling/feature/optimize-token-search
Browse files Browse the repository at this point in the history
Optimize speed of loading corpora into memory
  • Loading branch information
thomaskrause authored Jan 10, 2024
2 parents b1f1dca + cf7df84 commit fb32b9d
Show file tree
Hide file tree
Showing 13 changed files with 369 additions and 140 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- New `Graph::ensure_loaded_parallel` function to load needed graph storages in
parallel.

### Fixed

- Do not attempt to unload corpora that are not loaded when trying to free
memory.
- Improve performance of loading a main memory corpus by using the standard
`HashMap` for fields that are deserialized.

## [3.0.0] - 2023-11-28

### Added
Expand Down
14 changes: 7 additions & 7 deletions core/src/annostorage/inmemory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ use crate::annostorage::ValueSearch;
use crate::errors::Result;
use crate::graph::NODE_NAME_KEY;
use crate::types::{AnnoKey, Annotation, Edge, NodeID};
use crate::util;
use crate::util::{self};
use crate::{annostorage::symboltable::SymbolTable, errors::GraphAnnisCoreError};
use core::ops::Bound::*;
use itertools::Itertools;
use rustc_hash::{FxHashMap, FxHashSet};
use rustc_hash::FxHashSet;
use smartstring::alias::String;
use smartstring::{LazyCompact, SmartString};
use std::borrow::Cow;
Expand All @@ -22,13 +22,13 @@ struct SparseAnnotation {
val: usize,
}

type ValueItemMap<T> = FxHashMap<usize, Vec<T>>;
type ValueItemMap<T> = HashMap<usize, Vec<T>>;

#[derive(Serialize, Deserialize, Clone, Default)]
pub struct AnnoStorageImpl<T: Ord + Hash + Default> {
by_container: FxHashMap<T, Vec<SparseAnnotation>>,
by_container: HashMap<T, Vec<SparseAnnotation>>,
/// A map from an annotation key symbol to a map of all its values to the items having this value for the annotation key
by_anno: FxHashMap<usize, ValueItemMap<T>>,
by_anno: HashMap<usize, ValueItemMap<T>>,
/// Maps a distinct annotation key to the number of elements having this annotation key.
anno_key_sizes: BTreeMap<AnnoKey, usize>,
anno_keys: SymbolTable<AnnoKey>,
Expand All @@ -45,8 +45,8 @@ impl<T: Ord + Hash + Clone + serde::Serialize + serde::de::DeserializeOwned + De
{
pub fn new() -> AnnoStorageImpl<T> {
AnnoStorageImpl {
by_container: FxHashMap::default(),
by_anno: FxHashMap::default(),
by_container: HashMap::default(),
by_anno: HashMap::default(),
anno_keys: SymbolTable::new(),
anno_values: SymbolTable::new(),
anno_key_sizes: BTreeMap::new(),
Expand Down
5 changes: 3 additions & 2 deletions core/src/annostorage/symboltable.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::errors::{GraphAnnisCoreError, Result};
use rustc_hash::FxHashMap;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::hash::Hash;
use std::sync::Arc;

Expand All @@ -23,14 +24,14 @@ where
let by_id = Vec::default();
SymbolTable {
by_id,
by_value: FxHashMap::default(),
by_value: HashMap::default(),
empty_slots: Vec::default(),
}
}

pub fn after_deserialization(&mut self) {
// restore the by_value map and make sure the smart pointers point to the same instance
//self.by_value.reserve(self.by_id.len());
self.by_value.reserve(self.by_id.len());
for i in 0..self.by_id.len() {
if let Some(ref existing) = self.by_id[i] {
self.by_value.insert(existing.clone(), i);
Expand Down
47 changes: 30 additions & 17 deletions core/src/graph/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -793,23 +793,7 @@ impl<CT: ComponentType> Graph<CT> {
}
}

// load missing components in parallel
let loaded_components: Vec<(_, Result<Arc<dyn GraphStorage>>)> = components_to_load
.into_par_iter()
.map(|c| match component_path(&self.location, &c) {
Some(cpath) => {
debug!("loading component {} from {}", c, &cpath.to_string_lossy());
(c, load_component_from_disk(&cpath))
}
None => (c, Err(GraphAnnisCoreError::EmptyComponentPath)),
})
.collect();

// insert all the loaded components
for (c, gs) in loaded_components {
let gs = gs?;
self.components.insert(c, Some(gs));
}
self.ensure_loaded_parallel(&components_to_load)?;
Ok(())
}

Expand All @@ -833,6 +817,35 @@ impl<CT: ComponentType> Graph<CT> {
Ok(())
}

/// Ensure that the graph storages for the given components are loaded and
/// ready to use. Loading is done in parallel.
///
/// Components that are not part of this graph (unknown keys in the component
/// map) are silently skipped. Returns an error if any component's on-disk
/// path is missing or loading a graph storage from disk fails.
// NOTE(review): this does not check whether a component is *already* loaded,
// so known components passed in are (re-)loaded from disk — callers such as
// `ensure_loaded_all` pass only missing components; confirm that other
// callers do the same.
pub fn ensure_loaded_parallel(&mut self, components_to_load: &[Component<CT>]) -> Result<()> {
    // We only load known components, so check the map if the entry exists
    let components_to_load: Vec<_> = components_to_load
        .iter()
        .filter(|c| self.components.contains_key(c))
        .collect();

    // load missing components in parallel (rayon parallel iterator); errors
    // are collected per component and only surfaced after the parallel phase
    let loaded_components: Vec<(_, Result<Arc<dyn GraphStorage>>)> = components_to_load
        .into_par_iter()
        .map(|c| match component_path(&self.location, c) {
            Some(cpath) => {
                debug!("loading component {} from {}", c, &cpath.to_string_lossy());
                (c, load_component_from_disk(&cpath))
            }
            None => (c, Err(GraphAnnisCoreError::EmptyComponentPath)),
        })
        .collect();

    // insert all the loaded components; the first load error aborts via `?`
    for (c, gs) in loaded_components {
        let gs = gs?;
        self.components.insert(c.clone(), Some(gs));
    }
    Ok(())
}

pub fn optimize_impl(&mut self, disk_based: bool) -> Result<()> {
self.ensure_loaded_all()?;

Expand Down
14 changes: 7 additions & 7 deletions core/src/graph/storage/adjacencylist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,20 @@ use crate::{

use super::{EdgeContainer, GraphStatistic, GraphStorage, WriteableGraphStorage};
use itertools::Itertools;
use rustc_hash::{FxHashMap, FxHashSet};
use rustc_hash::FxHashSet;
use serde::Deserialize;
use std::collections::BTreeSet;
use std::collections::{BTreeSet, HashMap};
use std::{ops::Bound, path::Path};

#[derive(Serialize, Deserialize, Clone)]
pub struct AdjacencyListStorage {
edges: FxHashMap<NodeID, Vec<NodeID>>,
inverse_edges: FxHashMap<NodeID, Vec<NodeID>>,
edges: HashMap<NodeID, Vec<NodeID>>,
inverse_edges: HashMap<NodeID, Vec<NodeID>>,
annos: AnnoStorageImpl<Edge>,
stats: Option<GraphStatistic>,
}

fn get_fan_outs(edges: &FxHashMap<NodeID, Vec<NodeID>>) -> Vec<usize> {
fn get_fan_outs(edges: &HashMap<NodeID, Vec<NodeID>>) -> Vec<usize> {
let mut fan_outs: Vec<usize> = Vec::new();
if !edges.is_empty() {
for outgoing in edges.values() {
Expand All @@ -44,8 +44,8 @@ impl Default for AdjacencyListStorage {
impl AdjacencyListStorage {
pub fn new() -> AdjacencyListStorage {
AdjacencyListStorage {
edges: FxHashMap::default(),
inverse_edges: FxHashMap::default(),
edges: HashMap::default(),
inverse_edges: HashMap::default(),
annos: AnnoStorageImpl::new(),
stats: None,
}
Expand Down
8 changes: 4 additions & 4 deletions core/src/graph/storage/dense_adjacency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ use crate::{
};
use itertools::Itertools;
use num_traits::ToPrimitive;
use rustc_hash::{FxHashMap, FxHashSet};
use rustc_hash::FxHashSet;
use serde::Deserialize;
use std::{ops::Bound, path::Path};
use std::{collections::HashMap, ops::Bound, path::Path};

#[derive(Serialize, Deserialize, Clone)]
pub struct DenseAdjacencyListStorage {
edges: Vec<Option<NodeID>>,
inverse_edges: FxHashMap<NodeID, Vec<NodeID>>,
inverse_edges: HashMap<NodeID, Vec<NodeID>>,
annos: AnnoStorageImpl<Edge>,
stats: Option<GraphStatistic>,
}
Expand All @@ -31,7 +31,7 @@ impl DenseAdjacencyListStorage {
pub fn new() -> DenseAdjacencyListStorage {
DenseAdjacencyListStorage {
edges: Vec::default(),
inverse_edges: FxHashMap::default(),
inverse_edges: HashMap::default(),
annos: AnnoStorageImpl::new(),
stats: None,
}
Expand Down
20 changes: 14 additions & 6 deletions core/src/graph/storage/linear.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@ use crate::{
graph::NODE_NAME_KEY,
types::{Edge, NodeID, NumValue},
};
use rustc_hash::FxHashMap;
use rustc_hash::FxHashSet;
use serde::{Deserialize, Serialize};
use std::{clone::Clone, path::Path};
use std::{clone::Clone, collections::HashMap, path::Path};

#[derive(Serialize, Deserialize, Clone)]
struct RelativePosition<PosT> {
Expand All @@ -21,8 +20,8 @@ struct RelativePosition<PosT> {

#[derive(Serialize, Deserialize, Clone)]
pub struct LinearGraphStorage<PosT: NumValue> {
node_to_pos: FxHashMap<NodeID, RelativePosition<PosT>>,
node_chains: FxHashMap<NodeID, Vec<NodeID>>,
node_to_pos: HashMap<NodeID, RelativePosition<PosT>>,
node_chains: HashMap<NodeID, Vec<NodeID>>,
annos: AnnoStorageImpl<Edge>,
stats: Option<GraphStatistic>,
}
Expand All @@ -33,8 +32,8 @@ where
{
pub fn new() -> LinearGraphStorage<PosT> {
LinearGraphStorage {
node_to_pos: FxHashMap::default(),
node_chains: FxHashMap::default(),
node_to_pos: HashMap::default(),
node_chains: HashMap::default(),
annos: AnnoStorageImpl::new(),
stats: None,
}
Expand Down Expand Up @@ -97,6 +96,15 @@ where
Box::from(std::iter::empty())
}

/// A node of a linear (chain) storage has an incoming edge exactly when it
/// belongs to a chain and is not the chain head (relative position 0).
fn has_ingoing_edges(&self, node: NodeID) -> Result<bool> {
    match self.node_to_pos.get(&node) {
        Some(position) => Ok(!position.pos.is_zero()),
        None => Ok(false),
    }
}

fn source_nodes<'a>(&'a self) -> Box<dyn Iterator<Item = Result<NodeID>> + 'a> {
// use the node chains to find source nodes, but always skip the last element
// because the last element is only a target node, not a source node
Expand Down
10 changes: 10 additions & 0 deletions core/src/graph/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,16 @@ pub trait EdgeContainer: Sync + Send {
node: NodeID,
) -> Box<dyn Iterator<Item = Result<NodeID>> + 'a>;

/// Return `true` if the given node has any incoming edges.
///
/// The default implementation probes the ingoing-edge iterator for a first
/// element; implementations that can answer more cheaply should override it.
fn has_ingoing_edges(&self, node: NodeID) -> Result<bool> {
    // `transpose` turns the iterator's Option<Result<_>> into
    // Result<Option<_>>, so an error from the first edge lookup is
    // propagated instead of being counted as an existing edge.
    Ok(self.get_ingoing_edges(node).next().transpose()?.is_some())
}

fn get_statistics(&self) -> Option<&GraphStatistic> {
None
}
Expand Down
1 change: 1 addition & 0 deletions core/src/util/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use percent_encoding::{utf8_percent_encode, AsciiSet, CONTROLS};

use std::borrow::Cow;

pub mod disk_collections;
Expand Down
13 changes: 8 additions & 5 deletions graphannis/src/annis/db/corpusstorage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -954,7 +954,7 @@ impl CorpusStorage {
// make it known to the cache
cache.insert(
corpus_name.clone(),
Arc::new(RwLock::new(CacheEntry::Loaded(graph))),
Arc::new(RwLock::new(CacheEntry::NotLoaded)),
);
check_cache_size_and_remove_with_cache(
cache,
Expand Down Expand Up @@ -1389,9 +1389,7 @@ impl CorpusStorage {
{
let mut lock = db_entry.write()?;
let db = get_write_or_error(&mut lock)?;
for c in missing_components {
db.ensure_loaded(&c)?;
}
db.ensure_loaded_parallel(&missing_components)?;
}
self.check_cache_size_and_remove(vec![corpus_name], true)?;
};
Expand Down Expand Up @@ -2502,8 +2500,13 @@ fn check_cache_size_and_remove_with_cache(
// but never remove the last loaded entry
let all_corpus_names: Vec<String> = cache.keys().cloned().collect();
for corpus_name in all_corpus_names {
let corpus_is_loaded = if let Some(cache_entry) = cache.get(&corpus_name) {
matches!(*cache_entry.read()?, CacheEntry::Loaded(_))
} else {
false
};
if size_sum > max_cache_size {
if !keep.contains(corpus_name.as_str()) {
if corpus_is_loaded && !keep.contains(corpus_name.as_str()) {
cache.remove(&corpus_name);
// Re-measure the currently used memory size for this process
size_sum = memory_stats().map(|s| s.physical_mem).unwrap_or(usize::MAX);
Expand Down
2 changes: 1 addition & 1 deletion graphannis/src/annis/db/exec/tokensearch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ impl<'a> AnyTokenSearch<'a> {
let n = tok_candidate?.node;
let mut is_root_tok = true;
if let Some(order_gs) = self.order_gs {
is_root_tok = is_root_tok && order_gs.get_ingoing_edges(n).next().is_none();
is_root_tok = !order_gs.has_ingoing_edges(n)?;
}
if let Some(ref token_helper) = self.token_helper {
if is_root_tok {
Expand Down
2 changes: 0 additions & 2 deletions graphannis/src/annis/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,3 @@ mod plan;
pub mod relannis;
pub mod sort_matches;
pub mod token_helper;

pub use graphannis_core::annostorage::AnnotationStorage;
Loading

0 comments on commit fb32b9d

Please sign in to comment.