Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix WAL persistence #311

Merged
merged 9 commits into from
Dec 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/release_capi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ jobs:
omitPrereleaseDuringUpdate: true
deploy_macos_binaries:
if: ${{ github.event.action == 'completed' || github.event.label.name == 'test-release-process' || (github.event_name == 'release' && github.event.action == 'published') }}
runs-on: macos-12
runs-on: macos-14
steps:
- id: latest-release
uses: pozetroninc/[email protected]
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ jobs:
RUST_LOG: debug
test_mac:
name: Execute automated tests on OSX
runs-on: macos-12
runs-on: macos-14
steps:
- uses: actions/checkout@v2
- uses: actions-rust-lang/[email protected]
Expand Down
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- `UpdateEvent` now implements `PartialEq` to make it possible to compare changes.

### Fixed

- Deserializing a write-ahead log failed because it was located in the wrong
sub-directory and the deserialization routine for the map had a bug.

## [3.5.1] - 2024-09-25

### Fixed
Expand Down
2 changes: 1 addition & 1 deletion capi/src/cerror.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ struct CauseIterator<'a> {
current: Option<&'a dyn StdError>,
}

impl<'a> std::iter::Iterator for CauseIterator<'a> {
impl std::iter::Iterator for CauseIterator<'_> {
type Item = Error;

fn next(&mut self) -> std::option::Option<Error> {
Expand Down
14 changes: 8 additions & 6 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,34 +16,36 @@ bincode = "1.2"
clru = "0.6.1"
itertools = "0.10"
lazy_static = "1.4"
toml = "0.8"
log = "0.4"
memmap2 = "0.9"
normpath = "1.1.1"
num-traits = "0.2"
percent-encoding = "2.1"
quick-xml = "0.28"
rand = { version = "0.8", features = ["small_rng"] }
rayon = { version = "1.3", default-features = false }
rand = {version = "0.8", features = ["small_rng"]}
rayon = {version = "1.3", default-features = false}
regex = "1"
regex-syntax = "0.8"
rustc-hash = "1.0"
serde = { version = "1.0", features = ["rc"] }
serde = {version = "1.0", features = ["rc"]}
serde_bytes = "0.11"
serde_derive = "1.0"
smallvec = "1.6"
smartstring = { version = "1", features = ["serde"] }
smartstring = {version = "1", features = ["serde"]}
sstable = "0.11"
strum = "0.21"
strum_macros = "0.21"
tempfile = "3.1"
thiserror = "1"
toml = "0.8"
transient-btree-index = "0.5"

[target.'cfg(windows)'.dependencies]
winapi = { version = "0.3", features = ["heapapi"] }
winapi = {version = "0.3", features = ["heapapi"]}

[dev-dependencies]
env_logger = "0.9"
fake = "2.2"
insta = {version = "1.38.0", features = ["json"]}
pretty_assertions = "1.3"
serde_json = "1.0"
2 changes: 1 addition & 1 deletion core/src/dfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ impl<'a> CycleSafeDFS<'a> {
}
}

impl<'a> Iterator for CycleSafeDFS<'a> {
impl Iterator for CycleSafeDFS<'_> {
type Item = Result<DFSStep>;

fn next(&mut self) -> Option<Self::Item> {
Expand Down
60 changes: 59 additions & 1 deletion core/src/graph/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,7 @@ impl<CT: ComponentType> Graph<CT> {
std::fs::create_dir_all(&current_path)?;

// If successful, write the log
let log_path = location.join("update_log.bin");
let log_path = current_path.join("update_log.bin");

// Create a temporary directory in the same file system as the output
let temporary_dir = tempfile::tempdir_in(&current_path)?;
Expand Down Expand Up @@ -1155,4 +1155,62 @@ mod tests {
db.ensure_loaded_parallel(&[component]).unwrap();
assert_eq!(0, db.components.len());
}

// Checks that an update applied after `persist_to` is still present when the
// graph is loaded again from the same location.
#[test]
fn load_with_wal_file() {
let mut db = Graph::<DefaultComponentType>::new(false).unwrap();
// Give node 0 a node type ("corpus") and a node name ("root"); the update
// event below addresses the node by the name "root".
let example_node = 0;
db.node_annos
.insert(
example_node,
Annotation {
key: NODE_TYPE_KEY.as_ref().clone(),
val: "corpus".into(),
},
)
.unwrap();
db.node_annos
.insert(
example_node,
Annotation {
key: NODE_NAME_KEY.as_ref().clone(),
val: "root".into(),
},
)
.unwrap();

let tmp = tempfile::tempdir().unwrap();
// Save and remember the location, so that updates are recorded in a WAL
// file
db.persist_to(tmp.path()).unwrap();

// Add a node annotation with apply_update
let mut u = GraphUpdate::new();
u.add_event(UpdateEvent::AddNodeLabel {
node_name: "root".into(),
anno_ns: "example".into(),
anno_name: "anno-name".into(),
anno_value: "anno-value".into(),
})
.unwrap();
// The progress callback is a no-op.
db.apply_update(&mut u, |_| {}).unwrap();

// Discard the in-memory graph before loading from disk again.
std::mem::drop(db);

// Check that loading the database again contains the changes
let mut db = Graph::<DefaultComponentType>::new(false).unwrap();
// NOTE(review): the second argument is presumably the "preload" flag —
// confirm against the signature of `load_from`.
db.load_from(tmp.path(), true).unwrap();
let anno_value = db
.node_annos
.get_value_for_item(
&example_node,
&AnnoKey {
name: "anno-name".into(),
ns: "example".into(),
},
)
.unwrap()
.unwrap();
// The label added via `apply_update` must be visible on the reloaded graph.
assert_eq!("anno-value", anno_value);
}
}
2 changes: 1 addition & 1 deletion core/src/graph/storage/union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ impl<'a> UnionEdgeContainer<'a> {
}
}

impl<'a> EdgeContainer for UnionEdgeContainer<'a> {
impl EdgeContainer for UnionEdgeContainer<'_> {
fn get_outgoing_edges<'b>(
&'b self,
node: NodeID,
Expand Down
10 changes: 5 additions & 5 deletions core/src/graph/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use sstable::{SSIterator, Table, TableBuilder, TableIterator};
use tempfile::NamedTempFile;

/// Describes a single update on the graph.
#[derive(Serialize, Deserialize, Clone, Debug)]
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub enum UpdateEvent {
/// Add a node with a name and type.
AddNode {
Expand Down Expand Up @@ -304,10 +304,7 @@ impl<'de> Visitor<'de> for GraphUpdateVisitor {

let mut event_counter = 0;

while let Some((id, event)) = access
.next_entry::<u64, GraphUpdate>()
.map_err(M::Error::custom)?
{
while let Some((id, event)) = access.next_entry::<u64, UpdateEvent>()? {
event_counter = id;
let key = id.create_key();
let value = serialization.serialize(&event).map_err(M::Error::custom)?;
Expand Down Expand Up @@ -338,3 +335,6 @@ impl<'de> Deserialize<'de> for GraphUpdate {
deserializer.deserialize_map(GraphUpdateVisitor {})
}
}

#[cfg(test)]
mod tests;
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
---
source: core/src/graph/update/tests.rs
expression: seralized_string
---
{
"1": {
"AddNode": {
"node_name": "parent",
"node_type": "corpus"
}
},
"2": {
"AddNode": {
"node_name": "child",
"node_type": "corpus"
}
},
"3": {
"AddEdge": {
"source_node": "child",
"target_node": "parent",
"layer": "annis",
"component_type": "PartOf",
"component_name": ""
}
}
}
122 changes: 122 additions & 0 deletions core/src/graph/update/tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
use insta::assert_snapshot;

use super::*;

/// Round-trips a `GraphUpdate` holding three events through bincode and
/// checks that the same events come back in the same order.
#[test]
fn serialize_deserialize_bincode() {
    let add_parent = UpdateEvent::AddNode {
        node_name: "parent".into(),
        node_type: "corpus".into(),
    };
    let add_child = UpdateEvent::AddNode {
        node_name: "child".into(),
        node_type: "corpus".into(),
    };
    let add_part_of_edge = UpdateEvent::AddEdge {
        source_node: "child".into(),
        target_node: "parent".into(),
        layer: "annis".into(),
        component_type: "PartOf".into(),
        component_name: "".into(),
    };
    let expected_events = vec![add_parent, add_child, add_part_of_edge];

    // Fill the update with copies so the expected events stay available for
    // the final comparison.
    let mut original = GraphUpdate::new();
    for event in expected_events.iter().cloned() {
        original.add_event(event).unwrap();
    }

    let encoded: Vec<u8> = bincode::serialize(&original).unwrap();
    let decoded: GraphUpdate = bincode::deserialize(&encoded).unwrap();

    assert_eq!(3, decoded.len().unwrap());
    // Each iterator item carries the event in its second tuple field.
    let actual_events: Vec<UpdateEvent> = decoded
        .iter()
        .unwrap()
        .map(|entry| entry.unwrap().1)
        .collect();
    assert_eq!(expected_events, actual_events);
}

/// Round-trips a `GraphUpdate` without any events through bincode and checks
/// that the deserialized update is still empty.
#[test]
fn serialize_deserialize_bincode_empty() {
    // No events are added: the update is serialized exactly as created.
    let updates = GraphUpdate::new();

    let serialized_bytes: Vec<u8> = bincode::serialize(&updates).unwrap();
    let deserialized_update: GraphUpdate = bincode::deserialize(&serialized_bytes).unwrap();

    assert_eq!(0, deserialized_update.len().unwrap());
    // `assert!` instead of `assert_eq!(true, ..)` (clippy::bool_assert_comparison).
    assert!(deserialized_update.is_empty().unwrap());
}

/// Serializes a `GraphUpdate` holding three events to pretty-printed JSON and
/// compares the output with the stored snapshot.
#[test]
fn serialize_json() {
    let add_parent = UpdateEvent::AddNode {
        node_name: "parent".into(),
        node_type: "corpus".into(),
    };
    let add_child = UpdateEvent::AddNode {
        node_name: "child".into(),
        node_type: "corpus".into(),
    };
    let add_part_of_edge = UpdateEvent::AddEdge {
        source_node: "child".into(),
        target_node: "parent".into(),
        layer: "annis".into(),
        component_type: "PartOf".into(),
        component_name: "".into(),
    };
    let events = vec![add_parent, add_child, add_part_of_edge];

    let mut update = GraphUpdate::new();
    for event in events.iter().cloned() {
        update.add_event(event).unwrap();
    }

    // The misspelled binding name is kept on purpose: the stored snapshot
    // records `expression: seralized_string` in its header.
    let seralized_string = serde_json::to_string_pretty(&update).unwrap();
    assert_snapshot!(seralized_string);
}

/// Round-trips a `GraphUpdate` holding three events through pretty-printed
/// JSON and checks that the same events come back in the same order.
#[test]
fn serialize_deserialize_json() {
    let add_parent = UpdateEvent::AddNode {
        node_name: "parent".into(),
        node_type: "corpus".into(),
    };
    let add_child = UpdateEvent::AddNode {
        node_name: "child".into(),
        node_type: "corpus".into(),
    };
    let add_part_of_edge = UpdateEvent::AddEdge {
        source_node: "child".into(),
        target_node: "parent".into(),
        layer: "annis".into(),
        component_type: "PartOf".into(),
        component_name: "".into(),
    };
    let expected_events = vec![add_parent, add_child, add_part_of_edge];

    // Fill the update with copies so the expected events stay available for
    // the final comparison.
    let mut original = GraphUpdate::new();
    for event in expected_events.iter().cloned() {
        original.add_event(event).unwrap();
    }

    let json_text = serde_json::to_string_pretty(&original).unwrap();
    let decoded: GraphUpdate = serde_json::from_str(&json_text).unwrap();

    assert_eq!(3, decoded.len().unwrap());
    // Each iterator item carries the event in its second tuple field.
    let actual_events: Vec<UpdateEvent> = decoded
        .iter()
        .unwrap()
        .map(|entry| entry.unwrap().1)
        .collect();
    assert_eq!(expected_events, actual_events);
}
4 changes: 2 additions & 2 deletions core/src/util/disk_collections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ where
}
}

impl<'a, K, V> Iterator for CombinedRange<'a, K, V>
impl<K, V> Iterator for CombinedRange<'_, K, V>
where
K: Ord,
for<'de> K: 'static + Clone + KeySerializer + Send,
Expand Down Expand Up @@ -556,7 +556,7 @@ where
}
}

impl<'a, K, V> FusedIterator for CombinedRange<'a, K, V>
impl<K, V> FusedIterator for CombinedRange<'_, K, V>
where
K: 'static + Ord + Clone + KeySerializer + Serialize + DeserializeOwned + Send,
for<'de> V: 'static + Clone + Serialize + Deserialize<'de> + Send,
Expand Down
Loading
Loading