Skip to content

Commit d259f74

Browse files
committed
Get start_transaction working with spawned thread
Finish getting tests to work with shuttle
1 parent 1df00d7 commit d259f74

File tree

10 files changed

+86
-39
lines changed

10 files changed

+86
-39
lines changed

.cargo/config.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
[build]
22
# Setting cfg here means our IDE and CLI both use the same values.
3-
#rustflags = "--cfg shuttle"
3+
#rustflags = "--cfg loom"

Cargo.toml

+2-3
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ tracing = { version = "0.1.40", features = ["log"] }
4343
twox-hash = { version = "1.6.3", optional = true }
4444
once_cell = "1.19.0"
4545
ctx-thread = "0.1.1"
46+
shuttle = { version = "0.7.1", optional = true }
4647

4748
[target.'cfg(windows)'.dependencies.windows]
4849
version = "0.52.0"
@@ -66,12 +67,10 @@ test-log = "0.2.14"
6667
[target.'cfg(loom)'.dev-dependencies]
6768
loom = "0.7"
6869

69-
[target.'cfg(shuttle)'.dependencies]
70-
shuttle = "0.7.1"
71-
7270
[features]
7371
default = []
7472
testing = ["dep:fdlimit", "dep:rayon", "dep:twox-hash"]
73+
shuttle = ["dep:shuttle"]
7574

7675
[[bench]]
7776
name = "possum"

src/concurrency/mod.rs

+47-2
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,54 @@
11
pub(crate) mod sync;
22

3-
#[cfg(not(shuttle))]
3+
#[cfg(not(feature = "shuttle"))]
44
pub use std::thread;
55

66
// This isn't available in loom or shuttle yet. Unfortunately for shuttle it means threads are
77
// spawned outside its control, and it doesn't work.
8-
#[cfg(shuttle)]
8+
#[cfg(feature = "shuttle")]
99
pub use shuttle::thread;
10+
11+
#[cfg(not(feature = "shuttle"))]
12+
pub(crate) fn run_blocking<F, R>(f: F) -> R
13+
where
14+
F: FnOnce() -> R + Send,
15+
R: Send,
16+
{
17+
if false {
18+
let (sender, receiver) = std::sync::mpsc::channel();
19+
let tx_thread = std::thread::scope(|scope| {
20+
scope.spawn(|| {
21+
let res = f();
22+
sender.send(res).unwrap();
23+
});
24+
receiver.recv().unwrap()
25+
});
26+
tx_thread
27+
} else {
28+
f()
29+
}
30+
}
31+
32+
#[cfg(feature = "shuttle")]
33+
pub(crate) fn run_blocking<F, R>(f: F) -> R
34+
where
35+
F: FnOnce() -> R + Send,
36+
R: Send,
37+
{
38+
use std::sync::mpsc;
39+
let (sender, receiver) = mpsc::channel();
40+
let tx_thread = std::thread::scope(|scope| {
41+
scope.spawn(||{
42+
let res = f();
43+
sender.send(res).unwrap();
44+
});
45+
loop {
46+
shuttle::thread::yield_now();
47+
match receiver.try_recv() {
48+
Err(mpsc::TryRecvError::Empty) => continue,
49+
default => return default.unwrap()
50+
}
51+
}
52+
});
53+
tx_thread
54+
}

src/concurrency/sync.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
use crate::StableDeref;
22
use std::ops::{Deref, DerefMut};
33

4-
#[cfg(shuttle)]
4+
#[cfg(feature = "shuttle")]
55
use shuttle::sync;
6-
#[cfg(not(shuttle))]
6+
#[cfg(not(feature = "shuttle"))]
77
use std::sync;
88

99
use sync::Mutex as InnerMutex;

src/exclusive_file.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,6 @@ impl ExclusiveFile {
131131

132132
impl Drop for ExclusiveFile {
133133
fn drop(&mut self) {
134-
debug!("dropping exclusive file {}", self.id.deref());
134+
debug!("dropping exclusive file {}", self.id);
135135
}
136136
}

src/file_id.rs

-8
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,6 @@ impl Debug for FileId {
1515
}
1616
}
1717

18-
impl Deref for FileId {
19-
type Target = FileIdInner;
20-
21-
fn deref(&self) -> &Self::Target {
22-
&self.0
23-
}
24-
}
25-
2618
impl std::str::FromStr for FileId {
2719
type Err = std::num::ParseIntError;
2820

src/handle.rs

+15-12
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,15 @@ impl Handle {
226226
) -> rusqlite::Result<OwnedTx> {
227227
Ok(self
228228
.start_transaction(|conn, handle| {
229-
let rtx = conn.transaction_with_behavior(behaviour)?;
229+
let tx_res = run_blocking(|| {
230+
// We're holding the write lock around the Connection, I think we're safe to
231+
// pass it to another thread. For some reason the return type, the
232+
// rusqlite::Transaction is what spits the dummy. It doesn't implement Send,
233+
// even though it only has a reference to Connection internally. So we put it
234+
// inside CanSend to return it from the thread then pop it out.
235+
conn.transaction_with_behavior(behaviour).map(CanSend)
236+
});
237+
let rtx = tx_res?.0;
230238
Ok(Transaction::new(rtx, handle))
231239
})?
232240
.into())
@@ -261,15 +269,6 @@ impl Handle {
261269
Ok(reader)
262270
}
263271

264-
// pub(crate) fn associated_read<'h, H>(handle: H) -> rusqlite::Result<Reader<'h, H>> where H: WithHandle {
265-
// let reader = Reader {
266-
// owned_tx: handle.as_ref().start_deferred_transaction()?,
267-
// handle,
268-
// reads: Default::default(),
269-
// };
270-
// Ok(reader)
271-
// }
272-
273272
pub fn read_single(&self, key: &[u8]) -> Result<Option<SnapshotValue<Value>>> {
274273
let mut reader = self.read()?;
275274
let Some(value) = reader.add(key)? else {
@@ -454,9 +453,9 @@ impl Handle {
454453
Ok(())
455454
}
456455

457-
pub fn delete_prefix(&self, prefix: &[u8]) -> PubResult<()> {
456+
pub fn delete_prefix(&self, prefix: impl AsRef<[u8]>) -> PubResult<()> {
458457
let mut tx = self.start_deferred_transaction()?;
459-
for item in tx.list_items(prefix)? {
458+
for item in tx.list_items(prefix.as_ref())? {
460459
tx.delete_key(&item.key)?;
461460
}
462461
tx.commit()?.complete();
@@ -567,3 +566,7 @@ impl AsRef<Handle> for Rc<RwLockReadGuard<'_, Handle>> {
567566
self.deref()
568567
}
569568
}
569+
570+
struct CanSend<T>(T);
571+
572+
unsafe impl<T> Send for CanSend<T> {}

src/testing.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
pub mod torrent_storage;
22

33
use std::hash::Hasher;
4-
use std::io::{copy, SeekFrom, Write};
4+
use std::io::{BufReader, copy, SeekFrom, Write};
55

66
use anyhow::{ensure, Result};
77
use rand::Rng;
@@ -90,6 +90,7 @@ pub fn readable_repeated_bytes(byte: u8, limit: usize) -> Vec<u8> {
9090
pub fn condense_repeated_bytes(r: impl Read) -> (Option<u8>, u64) {
9191
let mut count = 0;
9292
let mut byte = None;
93+
let r = BufReader::new(r);
9394
for b in r.bytes() {
9495
let b = b.unwrap();
9596
match byte {
@@ -118,12 +119,12 @@ pub fn check_concurrency(
118119
loom::model(move || f().unwrap());
119120
Ok(())
120121
}
121-
#[cfg(shuttle)]
122+
#[cfg(feature = "shuttle")]
122123
{
123124
shuttle::check_random(move || f().unwrap(), iterations_hint);
124125
Ok(())
125126
}
126-
#[cfg(all(not(loom), not(shuttle)))]
127+
#[cfg(all(not(loom), not(feature = "shuttle")))]
127128
if false {
128129
for _ in 0..1000 {
129130
f()?

src/tests.rs

+8-2
Original file line numberDiff line numberDiff line change
@@ -53,14 +53,16 @@ fn test_inc_array() {
5353
/// Show that replacing keys doesn't cause a key earlier in the same values file to be punched. This
5454
/// occurred because there were file_id values in the manifest file that had the wrong type, and so
5555
/// the query that looked for the starting offset for hole punching would punch out the whole file
56-
/// thinking it was empty.
56+
/// thinking it was empty. Note sometimes this test fails and there's extra values files floating
57+
/// around. I haven't figured out why.
5758
#[test]
5859
#[cfg(not(miri))]
5960
fn test_replace_keys() -> Result<()> {
6061
check_concurrency(
6162
|| {
6263
let tempdir = test_tempdir("test_replace_keys")?;
6364
let handle = Handle::new(tempdir.path.clone())?;
65+
handle.delete_prefix("")?;
6466
let a = "a".as_bytes().to_vec();
6567
let b = "b".as_bytes().to_vec();
6668
let block_size: usize = handle.block_size().try_into()?;
@@ -92,9 +94,13 @@ fn test_replace_keys() -> Result<()> {
9294
// There can be multiple value files if the value puncher is holding onto a file when another
9395
// write occurs.
9496
for value_file in values_files {
95-
let mut file = File::open(&value_file.path)?;
97+
let path = &value_file.path;
98+
eprintln!("{:?}", path);
99+
let mut file = File::open(path)?;
100+
// file.sync_all()?;
96101
for region in seekhole::Iter::new(&mut file) {
97102
let region = region?;
103+
eprintln!("{:?}", region);
98104
if matches!(region.region_type, seekhole::RegionType::Data) {
99105
allocated_space += region.length();
100106
}

tests/simple_tests.rs

+6-5
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ fn torrent_storage_small() -> Result<()> {
228228
}
229229
Ok(())
230230
},
231-
100,
231+
1,
232232
)
233233
}
234234

@@ -244,7 +244,7 @@ fn torrent_storage_big() -> Result<()> {
244244
view_snapshot_values: true,
245245
})
246246
},
247-
100,
247+
1,
248248
)
249249
}
250250

@@ -288,7 +288,7 @@ fn torrent_storage_inner(opts: TorrentStorageOpts) -> Result<()> {
288288
let piece_data = Arc::clone(&piece_data);
289289
let start_delay = Duration::from_micros(1000 * (index / 2) as u64);
290290
let handle = Arc::clone(&handle);
291-
join_handles.push(std::thread::spawn(move || -> Result<()> {
291+
join_handles.push(thread::spawn(move || -> Result<()> {
292292
let key = offset_key(offset);
293293
sleep(start_delay);
294294
debug!("starting block write");
@@ -481,13 +481,14 @@ fn reads_update_last_used() -> Result<()> {
481481
let uniform = UniformDuration::new(Duration::from_nanos(0), LAST_USED_RESOLUTION);
482482
for _ in 0..100 {
483483
let dither = uniform.sample(&mut rng);
484-
sleep(LAST_USED_RESOLUTION + dither);
484+
// This needs to be a real sleep or the timestamps sqlite generates don't progress.
485+
std::thread::sleep(LAST_USED_RESOLUTION + dither);
485486
let new_read_ts = handle.read_single(&key)?.unwrap().last_used();
486487
assert!(new_read_ts > read_ts);
487488
}
488489
Ok(())
489490
},
490-
100,
491+
10,
491492
)
492493
}
493494

0 commit comments

Comments
 (0)