Skip to content

Commit

Permalink
Merge branch 'main' into 2.5.0
Browse files Browse the repository at this point in the history
  • Loading branch information
marvin-j97 committed Dec 24, 2024
2 parents 6752afd + 675f184 commit 8beaa1e
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 68 deletions.
55 changes: 11 additions & 44 deletions src/batch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,70 +103,37 @@ impl Batch {
}
}

// NOTE: Fully (write) lock, so the batch can be committed atomically
log::trace!("batch: Acquiring partitions lock");
let partitions = self.keyspace.partitions.write().expect("lock is poisoned");

// IMPORTANT: Need to WRITE lock all affected partition's memtables
// Otherwise, there may be read skew
log::trace!("batch: Acquiring memtable locks");
let locked_memtables = {
let mut lock_map = HashMap::new();

for item in &self.data {
if lock_map.contains_key(&item.partition) {
continue;
}

let Some(partition) = partitions.get(&item.partition) else {
continue;
};

if partition.is_deleted.load(Ordering::Relaxed) {
return Err(crate::Error::PartitionDeleted);
}

lock_map.insert(
item.partition.clone(),
partition.tree.lock_active_memtable(),
);
}

lock_map
};

#[allow(clippy::mutable_key_type)]
let mut partitions_with_possible_stall = HashSet::new();
let partitions = self.keyspace.partitions.read().expect("lock is poisoned");

let mut batch_size = 0u64;

log::trace!("Applying {} batched items to memtable(s)", self.data.len());

for item in std::mem::take(&mut self.data) {
let Some(partition) = partitions.get(&item.partition) else {
continue;
};

let Some(active_memtable) = locked_memtables.get(&item.partition) else {
continue;
// TODO: need a better, generic write op
let (item_size, _) = match item.value_type {
ValueType::Value => partition.tree.insert(item.key, item.value, batch_seqno),
ValueType::Tombstone => partition.tree.remove(item.key, batch_seqno),
ValueType::WeakTombstone => partition.tree.remove_weak(item.key, batch_seqno),
};

let (item_size, _) = partition.tree.raw_insert_with_lock(
active_memtable,
item.key,
item.value,
batch_seqno,
item.value_type,
);

batch_size += u64::from(item_size);

// IMPORTANT: Clone the handle, because we don't want to keep the partitions lock open
partitions_with_possible_stall.insert(partition.clone());
}

drop(journal_writer);
self.keyspace
.visible_seqno
.fetch_max(batch_seqno + 1, Ordering::AcqRel);

drop(locked_memtables);
drop(journal_writer);
drop(partitions);

// IMPORTANT: Add batch size to current write buffer size
Expand Down
50 changes: 30 additions & 20 deletions src/keyspace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ pub struct KeyspaceInner {
/// Current sequence number
pub(crate) seqno: SequenceNumberCounter,

/// Current visible sequence number
pub(crate) visible_seqno: SequenceNumberCounter,

/// Caps write buffer size by flushing
/// memtables to disk segments
pub(crate) flush_manager: Arc<RwLock<FlushManager>>,
Expand Down Expand Up @@ -519,7 +522,7 @@ impl Keyspace {
/// ```
#[must_use]
pub fn instant(&self) -> crate::Instant {
self.seqno.get()
self.visible_seqno.get()
}

fn check_version<P: AsRef<Path>>(path: P) -> crate::Result<()> {
Expand Down Expand Up @@ -567,6 +570,7 @@ impl Keyspace {
xxhash_rust::xxh3::Xxh3Builder::new(),
))),
seqno: SequenceNumberCounter::default(),
visible_seqno: SequenceNumberCounter::default(),
flush_manager: Arc::new(RwLock::new(FlushManager::new())),
journal_manager: Arc::new(RwLock::new(journal_manager)),
flush_semaphore: Arc::new(Semaphore::new(0)),
Expand Down Expand Up @@ -661,6 +665,11 @@ impl Keyspace {
}
}

keyspace.visible_seqno.store(
keyspace.seqno.load(std::sync::atomic::Ordering::Acquire),
std::sync::atomic::Ordering::Release,
);

Ok(keyspace)
}

Expand Down Expand Up @@ -692,6 +701,7 @@ impl Keyspace {
xxhash_rust::xxh3::Xxh3Builder::new(),
))),
seqno: SequenceNumberCounter::default(),
visible_seqno: SequenceNumberCounter::default(),
flush_manager: Arc::new(RwLock::new(FlushManager::new())),
journal_manager: Arc::new(RwLock::new(JournalManager::from_active(
active_journal_path,
Expand Down Expand Up @@ -753,28 +763,28 @@ impl Keyspace {
thread_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

std::thread::Builder::new()
.name("syncer".into())
.spawn(move || {
while !stop_signal.is_stopped() {
log::trace!("fsync thread: sleeping {ms}ms");
std::thread::sleep(std::time::Duration::from_millis(ms as u64));

log::trace!("fsync thread: fsyncing journal");
if let Err(e) = journal.persist(PersistMode::SyncAll) {
is_poisoned.store(true, std::sync::atomic::Ordering::Release);
log::error!(
"flush failed, which is a FATAL, and possibly hardware-related, failure: {e:?}"
);
return;
.name("syncer".into())
.spawn(move || {
while !stop_signal.is_stopped() {
log::trace!("fsync thread: sleeping {ms}ms");
std::thread::sleep(std::time::Duration::from_millis(ms as u64));

log::trace!("fsync thread: fsyncing journal");
if let Err(e) = journal.persist(PersistMode::SyncAll) {
is_poisoned.store(true, std::sync::atomic::Ordering::Release);
log::error!(
"flush failed, which is a FATAL, and possibly hardware-related, failure: {e:?}"
);
return;
}
}
}

log::trace!("fsync thread: exiting because keyspace is dropping");
log::trace!("fsync thread: exiting because keyspace is dropping");

thread_counter.fetch_sub(1, std::sync::atomic::Ordering::AcqRel);
})
.map(|_| ())
.map_err(Into::into)
thread_counter.fetch_sub(1, std::sync::atomic::Ordering::AcqRel);
})
.map(|_| ())
.map_err(Into::into)
}

fn spawn_compaction_worker(&self) -> crate::Result<()> {
Expand Down
10 changes: 10 additions & 0 deletions src/partition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ pub struct PartitionHandleInner {
#[doc(hidden)]
pub seqno: SequenceNumberCounter,

/// Visible sequence number of keyspace
#[doc(hidden)]
pub visible_seqno: SequenceNumberCounter,

/// Snapshot tracker
pub(crate) snapshot_tracker: SnapshotTracker,
}
Expand Down Expand Up @@ -210,6 +214,7 @@ impl PartitionHandle {
journal: keyspace.journal.clone(),
compaction_manager: keyspace.compaction_manager.clone(),
seqno: keyspace.seqno.clone(),
visible_seqno: keyspace.visible_seqno.clone(),
write_buffer_manager: keyspace.write_buffer_manager.clone(),
is_deleted: AtomicBool::default(),
is_poisoned: keyspace.is_poisoned.clone(),
Expand Down Expand Up @@ -279,6 +284,7 @@ impl PartitionHandle {
journal: keyspace.journal.clone(),
compaction_manager: keyspace.compaction_manager.clone(),
seqno: keyspace.seqno.clone(),
visible_seqno: keyspace.visible_seqno.clone(),
tree,
write_buffer_manager: keyspace.write_buffer_manager.clone(),
is_deleted: AtomicBool::default(),
Expand Down Expand Up @@ -899,6 +905,8 @@ impl PartitionHandle {

let (item_size, memtable_size) = self.tree.insert(key, value, seqno);

self.visible_seqno.fetch_max(seqno + 1, Ordering::AcqRel);

drop(journal_writer);

let write_buffer_size = self.write_buffer_manager.allocate(u64::from(item_size));
Expand Down Expand Up @@ -973,6 +981,8 @@ impl PartitionHandle {

let (item_size, memtable_size) = self.tree.remove(key, seqno);

self.visible_seqno.fetch_max(seqno + 1, Ordering::AcqRel);

drop(journal_writer);

let write_buffer_size = self.write_buffer_manager.allocate(u64::from(item_size));
Expand Down
8 changes: 4 additions & 4 deletions tests/partition_delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,11 @@ fn tx_partition_delete() -> fjall::Result<()> {
assert_eq!(keyspace.read_tx().len(&tree)?, ITEM_COUNT * 2);
assert_eq!(
keyspace.read_tx().iter(&tree).flatten().count(),
ITEM_COUNT * 2
ITEM_COUNT * 2,
);
assert_eq!(
keyspace.read_tx().iter(&tree).rev().flatten().count(),
ITEM_COUNT * 2
ITEM_COUNT * 2,
);
}

Expand All @@ -116,11 +116,11 @@ fn tx_partition_delete() -> fjall::Result<()> {
assert_eq!(keyspace.read_tx().len(&tree)?, ITEM_COUNT * 2);
assert_eq!(
keyspace.read_tx().iter(&tree).flatten().count(),
ITEM_COUNT * 2
ITEM_COUNT * 2,
);
assert_eq!(
keyspace.read_tx().iter(&tree).rev().flatten().count(),
ITEM_COUNT * 2
ITEM_COUNT * 2,
);

assert!(path.try_exists()?);
Expand Down
44 changes: 44 additions & 0 deletions tests/seqno_recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,47 @@ fn recover_seqno() -> fjall::Result<()> {

Ok(())
}

/// Checks that every tombstone write bumps the keyspace instant by exactly one,
/// and that the final instant is still reported after reopening the keyspace.
#[test]
fn recover_seqno_tombstone() -> fjall::Result<()> {
    let folder = tempfile::tempdir()?;

    // Number of removals issued so far; compared against the keyspace instant.
    let mut seqno = 0;

    // NOTE: clippy bug
    #[allow(unused_assignments)]
    {
        let keyspace = Config::new(&folder).open()?;

        let first = keyspace.open_partition("default1", PartitionCreateOptions::default())?;
        let second = keyspace.open_partition("default2", PartitionCreateOptions::default())?;
        let third = keyspace.open_partition("default3", PartitionCreateOptions::default())?;

        let item_count = ITEM_COUNT as u64;

        for tree in &[first, second, third] {
            // Two passes per partition: keys [0, ITEM_COUNT) first, then the
            // same range shifted up by ITEM_COUNT.
            for offset in [0, item_count] {
                for x in 0..item_count {
                    let key: [u8; 8] = (x + offset).to_be_bytes();
                    tree.remove(key)?;

                    seqno += 1;
                    assert_eq!(seqno, keyspace.instant());
                }
            }
        }
    }

    // Reopen the keyspace repeatedly; each time it must report the same instant.
    for _attempt in 0..10 {
        let reopened = Config::new(&folder).open()?;
        assert_eq!(seqno, reopened.instant());
    }

    Ok(())
}

0 comments on commit 8beaa1e

Please sign in to comment.