Commit 524ca99

Allow readers to read consistently
This commit exposes a `read_at` on the block cache, allowing callers to read the same data from offset X over and over again. This means that when we expose data from a block cache in the logrotate filesystem we are able to correctly run `wc -l` and similar tools.

Signed-off-by: Brian L. Troutwine <[email protected]>
1 parent 4206504 commit 524ca99
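
As a rough illustration of the property the commit message describes, the sketch below assumes a `lading_payload::block::Cache` has already been built with `Cache::fixed` (construction arguments elided, since construction is untouched by this commit) and checks that `read_at` is stable: the same offset and size always yield the same bytes, regardless of reads made in between.

// Sketch only: `cache` is an already-constructed block cache; how it was
// built is not shown here because construction is unchanged by this commit.
fn check_stable_reads(cache: &lading_payload::block::Cache) {
    // A reader such as `wc -l` may seek back and re-read a region it has
    // already seen; both reads must observe identical bytes.
    let first = cache.read_at(0, 4096);
    let second = cache.read_at(0, 4096);
    assert_eq!(first, second);

    // Reads elsewhere do not perturb what offset 0 returns, since `read_at`
    // takes `&self` and advances no internal index.
    let _elsewhere = cache.read_at(10_000, 512);
    assert_eq!(cache.read_at(0, 4096), first);
}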

3 files changed: +93 -49 lines changed

lading/src/bin/payloadtool.rs
+1 -1

@@ -46,7 +46,7 @@ fn generate_and_check(
     let start = Instant::now();
     let blocks =
         match block::Cache::fixed(&mut rng, total_bytes, max_block_size.get_bytes(), config)? {
-            block::Cache::Fixed { blocks, idx: _ } => blocks,
+            block::Cache::Fixed { blocks, .. } => blocks,
         };
     info!("Payload generation took {:?}", start.elapsed());
     debug!("Payload: {:#?}", blocks);

lading/src/generator/file_gen/model.rs
+15 -45

@@ -47,30 +47,6 @@ pub struct File {
 }
 
 impl File {
-    /// Returns the number of bytes available to be read at instance `now`.
-    ///
-    /// This function returns the number of bytes that have been "written" to
-    /// the `File` and are available to be read. For instance, `modified_tick`
-    /// may be in the past but sufficient bytes have accumulated in the file for
-    /// non-zero reads to remain possible. Bytes will not be noted as consumed
-    /// until the caller calls [`File::read`].
-    ///
-    /// Call to this file will advance `bytes_written` if `now` >
-    /// `modified_tick`.
-    ///
-    /// Returns 0 if `bytes_written` == `bytes_read`.
-    ///
-    /// # Panics
-    ///
-    /// Function will panic if `bytes_written` < `bytes_read`. This indicates a
-    /// catastrophic programming error.
-    pub fn available_to_read(&mut self, now: Tick) -> u64 {
-        self.advance_time(now);
-
-        assert!(self.bytes_written >= self.bytes_read);
-        self.bytes_written.saturating_sub(self.bytes_read)
-    }
-
     /// Register a read.
     ///
     /// This function is pair to [`File::available_to_read`]. It's possible that
@@ -89,16 +65,6 @@ impl File {
         self.status_tick = now;
     }
 
-    /// Register a read-only open.
-    ///
-    /// This function updates `access_time` to `now`. Time is advanced which may
-    /// result in more bytes being available in-file.
-    pub fn ro_open(&mut self, now: Tick) {
-        self.advance_time(now);
-
-        self.access_tick = now;
-    }
-
     /// Run the clock forward in the `File`.
     ///
     /// This function runs the clock forward to `now`, updating `modified_tick`
@@ -347,19 +313,23 @@ impl State {
                 name: _,
                 ref mut file,
             }) => {
-                let available = file.available_to_read(now);
-                if available == 0 {
-                    return None;
-                }
+                let bytes_written = usize::try_from(file.bytes_written)
+                    .expect("more bytes written than machine word");
 
-                let block_len = self.block_cache.peek_next().total_bytes.get() as usize;
-                if block_len <= size {
-                    let block = self.block_cache.next_block();
-                    file.read(block_len as u64, now);
-                    Some(block.bytes.clone())
-                } else {
-                    None
+                if offset >= bytes_written {
+                    // Offset beyond EOF
+                    return Some(Bytes::new());
                 }
+
+                let available = bytes_written.saturating_sub(offset);
+                let to_read = available.min(size);
+
+                // Get data from block_cache without worrying about blocks
+                let data = self.block_cache.read_at(offset as u64, to_read);
+
+                file.read(to_read as u64, now);
+
+                Some(data)
             }
             Some(Node::Directory { .. }) | None => None,
         }
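
The new read path above boils down to a small piece of arithmetic: clamp the request against the number of bytes the simulated file has "written" so far, then let the block cache supply that many bytes from the given offset. A standalone sketch of just that clamping (hypothetical helper, not part of the crate):

// Hypothetical helper mirroring the clamping in the hunk above: how many
// bytes may a read of `size` at `offset` return, given `bytes_written`?
fn clamp_read(bytes_written: usize, offset: usize, size: usize) -> usize {
    if offset >= bytes_written {
        // Offset at or beyond EOF: the read returns nothing.
        return 0;
    }
    // Otherwise return at most what remains between offset and EOF.
    bytes_written.saturating_sub(offset).min(size)
}

fn main() {
    assert_eq!(clamp_read(100, 0, 4096), 100); // request capped at EOF
    assert_eq!(clamp_read(100, 40, 16), 16); // fully satisfiable
    assert_eq!(clamp_read(100, 100, 16), 0); // read starting exactly at EOF
}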

lading_payload/src/block.rs
+77 -3

@@ -155,6 +155,8 @@ pub enum Cache {
         idx: usize,
         /// The store of blocks.
         blocks: Vec<Block>,
+        /// The amount of data stored in one cycle, or all blocks
+        total_cycle_size: u64,
     },
 }
 
@@ -323,7 +325,17 @@
                 construct_block_cache_inner(rng, &pyld, maximum_block_bytes, total_bytes.get())?
             }
         };
-        Ok(Self::Fixed { idx: 0, blocks })
+
+        let total_cycle_size = blocks
+            .iter()
+            .map(|block| u64::from(block.total_bytes.get()))
+            .sum();
+
+        Ok(Self::Fixed {
+            idx: 0,
+            blocks,
+            total_cycle_size,
+        })
     }
 
     /// Run `Cache` forward on the user-provided mpsc sender.
@@ -339,7 +351,9 @@
     #[allow(clippy::needless_pass_by_value)]
     pub fn spin(self, snd: Sender<Block>) -> Result<(), SpinError> {
         match self {
-            Self::Fixed { mut idx, blocks } => loop {
+            Self::Fixed {
+                mut idx, blocks, ..
+            } => loop {
                 snd.blocking_send(blocks[idx].clone())?;
                 idx = (idx + 1) % blocks.len();
             },
@@ -354,7 +368,7 @@
     #[must_use]
     pub fn peek_next(&self) -> &Block {
         match self {
-            Self::Fixed { idx, blocks } => &blocks[*idx],
+            Self::Fixed { idx, blocks, .. } => &blocks[*idx],
         }
     }
 
@@ -367,13 +381,73 @@
             Self::Fixed {
                 ref mut idx,
                 blocks,
+                ..
             } => {
                 let block = &blocks[*idx];
                 *idx = (*idx + 1) % blocks.len();
                 block
             }
         }
     }
+
+    /// Read data starting from a given offset and up to the specified size.
+    ///
+    /// # Panics
+    ///
+    /// Function will panic if reads are larger than machine word bytes wide.
+    pub fn read_at(&self, offset: u64, size: usize) -> Bytes {
+        let mut data = BytesMut::with_capacity(size);
+
+        let (blocks, total_cycle_size) = match self {
+            Cache::Fixed {
+                blocks,
+                total_cycle_size,
+                ..
+            } => (
+                blocks,
+                usize::try_from(*total_cycle_size)
+                    .expect("cycle size larger than machine word bytes"),
+            ),
+        };
+
+        let mut remaining = size;
+        let mut current_offset =
+            usize::try_from(offset).expect("offset larger than machine word bytes");
+
+        while remaining > 0 {
+            // Compute offset within the cycle
+            let offset_within_cycle = current_offset % total_cycle_size;
+
+            // Find which block this offset falls into
+            let mut block_start = 0;
+            for block in blocks {
+                let block_size = block.total_bytes.get() as usize;
+                if offset_within_cycle < block_start + block_size {
+                    // Offset is within this block
+                    let block_offset = offset_within_cycle - block_start;
+                    let bytes_in_block = (block_size - block_offset).min(remaining);
+
+                    data.extend_from_slice(
+                        &block.bytes[block_offset..block_offset + bytes_in_block],
+                    );
+
+                    remaining -= bytes_in_block;
+                    current_offset += bytes_in_block;
+                    break;
+                }
+                block_start += block_size;
+            }
+
+            // If we couldn't find a block this suggests something seriously
+            // wacky has happened.
+            if remaining > 0 && block_start >= total_cycle_size {
+                error!("Offset exceeds total cycle size");
+                break;
+            }
+        }

+        data.freeze()
+    }
 }
 
 /// Construct a new block cache of form defined by `serializer`.
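
The `read_at` added above treats the block list as a single byte stream of length `total_cycle_size` that repeats forever, so any offset can be reduced modulo the cycle and then located inside one block. A miniature of that mapping over plain byte vectors (hypothetical, not the crate's code; assumes at least one non-empty block) behaves the same way:

// Hypothetical miniature of the cycle walk in `Cache::read_at`: blocks are
// laid end to end and the sequence repeats, so an absolute offset is first
// reduced modulo the cycle size and then located within a single block.
fn cycle_read_at(blocks: &[Vec<u8>], offset: usize, size: usize) -> Vec<u8> {
    let total_cycle_size: usize = blocks.iter().map(Vec::len).sum();
    let mut out = Vec::with_capacity(size);
    let mut current_offset = offset;

    while out.len() < size {
        // Reduce the absolute offset into the repeating cycle.
        let mut within_cycle = current_offset % total_cycle_size;
        // Walk blocks until the one containing this cycle offset is found.
        for block in blocks {
            if within_cycle < block.len() {
                let take = (block.len() - within_cycle).min(size - out.len());
                out.extend_from_slice(&block[within_cycle..within_cycle + take]);
                current_offset += take;
                break;
            }
            within_cycle -= block.len();
        }
    }
    out
}

fn main() {
    let blocks = vec![b"abc".to_vec(), b"de".to_vec()];
    // The cycle is "abcde" repeated: 8 bytes from offset 4 spans the cycle
    // boundary, and re-reading the same offset yields the same bytes.
    assert_eq!(cycle_read_at(&blocks, 4, 8), b"eabcdeab".to_vec());
    assert_eq!(cycle_read_at(&blocks, 4, 8), cycle_read_at(&blocks, 4, 8));
}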
