Skip to content

Commit c334d0b

Browse files
committed
When using flock, downgrade exclusive file locks when committing
This allows readers to lock files while competing with a writer for the manifest conn
1 parent cb8faec commit c334d0b

File tree

3 files changed

+52
-14
lines changed

3 files changed

+52
-14
lines changed

src/exclusive_file.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,19 @@ use std::path::{Path, PathBuf};
66
use super::*;
77
use crate::FileId;
88

9+
#[derive(Debug)]
10+
enum LockLevel {
11+
Shared,
12+
Exclusive,
13+
}
14+
use LockLevel::*;
15+
916
#[derive(Debug)]
1017
pub(crate) struct ExclusiveFile {
1118
pub(crate) inner: File,
1219
pub(crate) id: FileId,
1320
last_committed_offset: u64,
21+
lock_level: LockLevel,
1422
}
1523

1624
impl ExclusiveFile {
@@ -68,6 +76,7 @@ impl ExclusiveFile {
6876
inner: file,
6977
id,
7078
last_committed_offset: end,
79+
lock_level: Exclusive,
7180
}))
7281
}
7382

@@ -90,6 +99,22 @@ impl ExclusiveFile {
9099
pub(crate) fn next_write_offset(&mut self) -> io::Result<u64> {
91100
self.inner.stream_position()
92101
}
102+
103+
pub(crate) fn downgrade_lock(&mut self) -> io::Result<bool> {
104+
assert!(flocking());
105+
assert!(matches!(self.lock_level, Exclusive));
106+
cfg_if! {
107+
if #[cfg(unix)] {
108+
if !self.inner.flock(LockSharedNonblock)? {
109+
return Ok(false);
110+
}
111+
self.lock_level = Shared;
112+
Ok(true)
113+
} else {
114+
unimplemented!()
115+
}
116+
}
117+
}
93118
}
94119

95120
impl Drop for ExclusiveFile {

src/lib.rs

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,11 @@ impl<'handle> BatchWriter<'handle> {
321321
}
322322

323323
fn commit_inner(mut self, before_write: impl Fn()) -> Result<WriteCommitResult> {
324+
if flocking() {
325+
for ef in &mut self.exclusive_files {
326+
assert!(ef.downgrade_lock()?);
327+
}
328+
}
324329
let mut transaction: OwnedTx = self.handle.start_immediate_transaction()?;
325330
let mut write_commit_res = WriteCommitResult { count: 0 };
326331
for pw in self.pending_writes.drain(..) {
@@ -343,28 +348,32 @@ impl<'handle> BatchWriter<'handle> {
343348

344349
/// Flush Writer's exclusive files and return them to the Handle pool.
345350
fn flush_exclusive_files(&mut self) {
346-
let mut handle_exclusive_files = self.handle.exclusive_files.lock().unwrap();
347-
for mut ef in self.exclusive_files.drain(..) {
351+
for ef in &mut self.exclusive_files {
348352
ef.committed().unwrap();
349-
// When we're flocking, we can't have writers and readers at the same time and still be
350-
// able to punch values asynchronously.
351-
if flocking() {
352-
debug!("returning exclusive file {} to handle", ef.id);
353-
assert!(handle_exclusive_files.insert(ef.id.clone(), ef).is_none());
354-
}
355353
}
354+
self.return_exclusive_files_to_handle()
356355
}
357-
}
358356

359-
impl Drop for BatchWriter<'_> {
360-
fn drop(&mut self) {
357+
fn return_exclusive_files_to_handle(&mut self) {
358+
// When we're flocking, we can't have writers and readers at the same time and still be
359+
// able to punch values asynchronously.
360+
if flocking() {
361+
return;
362+
}
361363
let mut handle_exclusive_files = self.handle.exclusive_files.lock().unwrap();
362364
for ef in self.exclusive_files.drain(..) {
365+
debug!("returning exclusive file {} to handle", ef.id);
363366
assert!(handle_exclusive_files.insert(ef.id.clone(), ef).is_none());
364367
}
365368
}
366369
}
367370

371+
impl Drop for BatchWriter<'_> {
372+
fn drop(&mut self) {
373+
self.return_exclusive_files_to_handle()
374+
}
375+
}
376+
368377
type ValueLength = u64;
369378

370379
#[derive(Debug, Clone, PartialEq)]
@@ -543,7 +552,7 @@ where
543552
let file_offset = file_offset + pos;
544553
// Getting lazy: Using positioned-io's ReadAt because it works on Windows.
545554
let res = file.read_at(file_offset, buf);
546-
debug!(?file, ?file_offset, len=?buf, ?res, "snapshot value read_at");
555+
debug!(?file, ?file_offset, len=buf.len(), ?res, "snapshot value read_at");
547556
res
548557
}
549558
}
@@ -593,7 +602,7 @@ where
593602
let file = &mut file_clone.file;
594603
file.seek(Start(file_offset))?;
595604
let res = file.read(buf);
596-
debug!(?file, ?file_offset, len=?buf, ?res, "snapshot value read");
605+
debug!(?file, ?file_offset, len=buf.len(), ?res, "snapshot value read");
597606
res.map_err(Into::into)
598607
}
599608
}

src/sys/flock/mod.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,11 @@ mod tests {
4545
// Trying to exclusively lock from another file handle fails immediately.
4646
assert!(!file_reopen.lock_segment(LockExclusiveNonblock, None, 2)?);
4747
let file_reader = OpenOptions::new().read(true).open(file1_named.path())?;
48-
assert!(file_reader.lock_segment(LockSharedNonblock, Some(1), 0,)?);
48+
// This won't work with flock, because the entire file is exclusively locked, not just a
49+
// different segment.
50+
if !flocking() {
51+
assert!(file_reader.lock_segment(LockSharedNonblock, Some(1), 0, )?);
52+
}
4953
Ok(())
5054
}
5155
}

0 commit comments

Comments
 (0)