Skip to content

Commit 475e2f3

Browse files
committed
Run value_puncher until all values are punched
1 parent c334d0b commit 475e2f3

File tree

6 files changed

+99
-35
lines changed

6 files changed

+99
-35
lines changed

src/dir.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,4 +60,9 @@ impl Dir {
6060
pub fn supports_file_cloning(&self) -> bool {
6161
self.supports_file_cloning
6262
}
63+
64+
/// Walks the underlying files in the possum directory.
65+
pub fn walk_dir(&self) -> Result<Vec<walk::Entry>> {
66+
crate::walk::walk_dir(self)
67+
}
6368
}

src/file_id.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ impl FileIdFancy {
1414
}
1515

1616
/// Value file identifier
17-
#[derive(Clone, Eq, PartialEq, Hash)]
17+
#[derive(Clone, Eq, PartialEq, Hash, Ord, PartialOrd)]
1818
pub struct FileId(OsString);
1919

2020
impl Deref for FileId {

src/handle.rs

Lines changed: 50 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ pub struct Handle {
2323
pub(crate) instance_limits: Limits,
2424
deleted_values: Option<DeletedValuesSender>,
2525
_value_puncher: Option<std::thread::JoinHandle<()>>,
26+
pub(crate) value_puncher_done: Arc<Mutex<std::sync::mpsc::Receiver<()>>>,
2627
}
2728

2829
/// 4 bytes stored in the database header https://sqlite.org/fileformat2.html#database_header.
@@ -106,18 +107,24 @@ impl Handle {
106107
let mut conn = Connection::open(dir.path().join(MANIFEST_DB_FILE_NAME))?;
107108
Self::init_sqlite_conn(&mut conn)?;
108109
let (deleted_values, receiver) = std::sync::mpsc::sync_channel(10);
110+
let (value_puncher_done_sender, value_puncher_done) = std::sync::mpsc::sync_channel(0);
111+
let value_puncher_done = Arc::new(Mutex::new(value_puncher_done));
109112
let handle = Self {
110113
conn: Mutex::new(conn),
111114
exclusive_files: Default::default(),
112115
dir: dir.clone(),
113116
clones: Default::default(),
114117
instance_limits: Default::default(),
115118
deleted_values: Some(deleted_values),
116-
_value_puncher: Some(std::thread::spawn(|| -> () {
119+
// Don't wait on this, at least in the Drop handler, because it stays alive until it
120+
// succeeds in punching everything.
121+
_value_puncher: Some(std::thread::spawn(move || -> () {
122+
let _value_puncher_done_sender = value_puncher_done_sender;
117123
if let Err(err) = Self::value_puncher(dir, receiver) {
118124
error!("value puncher thread failed with {err:?}");
119125
}
120126
})),
127+
value_puncher_done,
121128
};
122129
Ok(handle)
123130
}
@@ -289,15 +296,43 @@ impl Handle {
289296
| OpenFlags::SQLITE_OPEN_NO_MUTEX
290297
| OpenFlags::SQLITE_OPEN_URI,
291298
)?;
292-
while let Ok(mut values) = values_receiver.recv() {
293-
while let Ok(mut more_values) = values_receiver.try_recv() {
294-
values.append(&mut more_values);
299+
const RETRY_DURATION: Duration = Duration::from_secs(1);
300+
let mut pending_values: Vec<_> = Default::default();
301+
let mut values_receiver_opt = Some(values_receiver);
302+
while values_receiver_opt.is_some() || !pending_values.is_empty() {
303+
match &values_receiver_opt {
304+
Some(values_receiver) => {
305+
let timeout = if pending_values.is_empty() {
306+
Duration::MAX
307+
} else {
308+
RETRY_DURATION
309+
};
310+
let recv_result = values_receiver.recv_timeout(timeout);
311+
use std::sync::mpsc::RecvTimeoutError;
312+
match recv_result {
313+
Ok(mut values) => {
314+
pending_values.append(&mut values);
315+
// Drain the channel
316+
while let Ok(more_values) = values_receiver.try_recv() {
317+
pending_values.extend(more_values);
318+
}
319+
}
320+
Err(RecvTimeoutError::Timeout) => {}
321+
Err(RecvTimeoutError::Disconnected) => {
322+
// Don't try receiving again.
323+
values_receiver_opt = None;
324+
}
325+
}
326+
}
327+
None => {
328+
std::thread::sleep(RETRY_DURATION);
329+
}
295330
}
296331
let tx = conn.transaction_with_behavior(TransactionBehavior::Deferred)?;
297332
let tx = ReadTransaction {
298333
tx: ReadOnlyRusqliteTransaction { conn: tx },
299334
};
300-
Self::punch_values(&dir, &values, &tx)?;
335+
pending_values = Self::punch_values(&dir, pending_values, &tx)?;
301336
}
302337
Ok(())
303338
}
@@ -306,24 +341,25 @@ impl Handle {
306341
/// offsets above the targeted values, ongoing writes should not be affected.
307342
pub(crate) fn punch_values(
308343
dir: &Dir,
309-
values: &[NonzeroValueLocation],
344+
values: Vec<NonzeroValueLocation>,
310345
transaction: &ReadTransactionOwned,
311-
) -> PubResult<()> {
346+
) -> PubResult<Vec<NonzeroValueLocation>> {
347+
let mut failed = Vec::with_capacity(values.len());
312348
for v in values {
313349
let NonzeroValueLocation {
314350
file_id,
315351
file_offset,
316352
length,
317353
..
318-
} = v;
354+
} = &v;
319355
let value_length = length;
320356
let msg = format!(
321357
"deleting value at {:?} {} {}",
322358
file_id, file_offset, value_length
323359
);
324360
debug!("{}", msg);
325361
// self.handle.clones.lock().unwrap().remove(&file_id);
326-
punch_value(PunchValueOptions {
362+
if !punch_value(PunchValueOptions {
327363
dir: dir.path(),
328364
file_id,
329365
offset: *file_offset,
@@ -332,9 +368,12 @@ impl Handle {
332368
block_size: dir.block_size(),
333369
constraints: Default::default(),
334370
})
335-
.context(msg)?;
371+
.context(msg)?
372+
{
373+
failed.push(v);
374+
}
336375
}
337-
Ok(())
376+
Ok(failed)
338377
}
339378

340379
pub(crate) fn send_values_for_delete(&self, values: Vec<NonzeroValueLocation>) {

src/lib.rs

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -383,7 +383,7 @@ pub struct Value {
383383
}
384384

385385
/// Storage location info for a non-zero-length value.
386-
#[derive(Debug, Clone, PartialEq)]
386+
#[derive(Debug, Clone, PartialEq, Ord, PartialOrd, Eq)]
387387
pub struct NonzeroValueLocation {
388388
pub file_id: FileId,
389389
pub file_offset: u64,
@@ -552,7 +552,13 @@ where
552552
let file_offset = file_offset + pos;
553553
// Getting lazy: Using positioned-io's ReadAt because it works on Windows.
554554
let res = file.read_at(file_offset, buf);
555-
debug!(?file, ?file_offset, len=buf.len(), ?res, "snapshot value read_at");
555+
debug!(
556+
?file,
557+
?file_offset,
558+
len = buf.len(),
559+
?res,
560+
"snapshot value read_at"
561+
);
556562
res
557563
}
558564
}
@@ -602,7 +608,13 @@ where
602608
let file = &mut file_clone.file;
603609
file.seek(Start(file_offset))?;
604610
let res = file.read(buf);
605-
debug!(?file, ?file_offset, len=buf.len(), ?res, "snapshot value read");
611+
debug!(
612+
?file,
613+
?file_offset,
614+
len = buf.len(),
615+
?res,
616+
"snapshot value read"
617+
);
606618
res.map_err(Into::into)
607619
}
608620
}
@@ -915,7 +927,7 @@ struct PunchValueOptions<'a> {
915927
}
916928

917929
// Can't do this as &mut self for dumb Rust reasons.
918-
fn punch_value(opts: PunchValueOptions) -> Result<()> {
930+
fn punch_value(opts: PunchValueOptions) -> Result<bool> {
919931
let PunchValueOptions {
920932
dir,
921933
file_id,
@@ -941,7 +953,7 @@ fn punch_value(opts: PunchValueOptions) -> Result<()> {
941953
// Punching values probably requires write permission.
942954
let mut file = match OpenOptions::new().write(true).open(&file_path) {
943955
// The file could have already been deleted by a previous punch.
944-
Err(err) if err.kind() == ErrorKind::NotFound && allow_remove => return Ok(()),
956+
Err(err) if err.kind() == ErrorKind::NotFound && allow_remove => return Ok(true),
945957
Err(err) => return Err(err).context("opening value file"),
946958
Ok(ok) => ok,
947959
};
@@ -970,10 +982,10 @@ fn punch_value(opts: PunchValueOptions) -> Result<()> {
970982
// because there are no values in this file to clone.
971983
if offset == 0 && allow_remove {
972984
remove_file(file_path).context("removing value file")?;
973-
return Ok(());
985+
return Ok(true);
974986
} else if allow_truncate {
975987
file.set_len(offset as u64)?;
976-
return Ok(());
988+
return Ok(true);
977989
}
978990
file_end
979991
} else if cloning_lock_aware {
@@ -997,14 +1009,14 @@ fn punch_value(opts: PunchValueOptions) -> Result<()> {
9971009
// full block.
9981010
assert!(length >= -block_size);
9991011
if length <= 0 {
1000-
return Ok(());
1012+
return Ok(true);
10011013
}
10021014
assert_eq!(offset % block_size, 0);
10031015
if !file.lock_segment(LockExclusiveNonblock, Some(length as u64), offset as u64)? {
10041016
// TODO: If we can't delete immediately, we should schedule to try again later. Maybe
10051017
// spinning up a thread, or putting in a slow queue.
10061018
warn!(%file_id, %offset, %length, "can't punch, file segment locked");
1007-
return Ok(());
1019+
return Ok(false);
10081020
}
10091021
debug!(?file, %offset, %length, "punching");
10101022
punchfile(
@@ -1013,14 +1025,14 @@ fn punch_value(opts: PunchValueOptions) -> Result<()> {
10131025
length.try_into().unwrap(),
10141026
)
10151027
.with_context(|| format!("length {}", length))?;
1016-
// fcntl(file.as_raw_fd(), nix::fcntl::F_FULLFSYNC)?;
1028+
// nix::fcntl::fcntl(file.as_raw_fd(), nix::fcntl::F_FULLFSYNC)?;
10171029
// file.flush()?;
10181030
if check_holes {
10191031
if let Err(err) = check_hole(&mut file, offset as u64, length as u64) {
10201032
warn!("checking hole: {}", err);
10211033
}
10221034
}
1023-
Ok(())
1035+
Ok(true)
10241036
}
10251037

10261038
/// Checks that there's no data allocated in the region provided.

src/sys/flock/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ mod tests {
4848
// This won't work with flock, because the entire file is exclusively locked, not just a
4949
// different segment.
5050
if !flocking() {
51-
assert!(file_reader.lock_segment(LockSharedNonblock, Some(1), 0, )?);
51+
assert!(file_reader.lock_segment(LockSharedNonblock, Some(1), 0,)?);
5252
}
5353
Ok(())
5454
}

src/tests.rs

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -72,25 +72,33 @@ fn test_replace_keys() -> Result<()> {
7272
handle.read_single(&a).unwrap().unwrap().new_reader(),
7373
a_value.as_slice(),
7474
);
75-
let entries = handle.walk_dir()?;
75+
76+
let dir = handle.dir.clone();
77+
let values_punched = Arc::clone(&handle.value_puncher_done);
78+
drop(handle);
79+
// Wait for it to recv, which should be a disconnect when the value_puncher hangs up.
80+
values_punched.lock().unwrap().recv();
81+
82+
let entries = dir.walk_dir()?;
7683
let values_files: Vec<_> = entries
7784
.iter()
7885
.filter(|entry| entry.entry_type == walk::EntryType::ValuesFile)
7986
.collect();
80-
// Make sure there's only a single values file.
81-
assert_eq!(values_files.len(), 1);
82-
let value_file = values_files[0];
83-
let mut file = File::open(&value_file.path)?;
87+
8488
let mut allocated_space = 0;
85-
for region in seekhole::Iter::new(&mut file) {
86-
let region = region?;
87-
if matches!(region.region_type, seekhole::RegionType::Data) {
88-
allocated_space += region.length();
89+
// There can be multiple value files if the value puncher is holding onto a file when another
90+
// write occurs.
91+
for value_file in values_files {
92+
let mut file = File::open(&value_file.path)?;
93+
for region in seekhole::Iter::new(&mut file) {
94+
let region = region?;
95+
if matches!(region.region_type, seekhole::RegionType::Data) {
96+
allocated_space += region.length();
97+
}
8998
}
9099
}
91100
assert!(
92-
[2, 3]
93-
.map(|num_blocks| num_blocks * block_size as seekhole::RegionOffset)
101+
[2].map(|num_blocks| num_blocks * block_size as seekhole::RegionOffset)
94102
.contains(&allocated_space),
95103
"block_size={}, allocated_space={}",
96104
block_size,

0 commit comments

Comments
 (0)