Skip to content

Commit ec2fc2d

Browse files
committed
Allow readers to read consistently
This commit exposes a `read_at` on the block cache allowing callers to read the same data from offset X over and over again. This means that when we expose data from a block cache in the logrotate filesystem we are able to correctly do `wc -l` and similar.

Signed-off-by: Brian L. Troutwine <[email protected]>
1 parent 8c02a22 commit ec2fc2d

File tree

3 files changed

+93
-49
lines changed

3 files changed

+93
-49
lines changed

Diff for: lading/src/bin/payloadtool.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ fn generate_and_check(
4646
let start = Instant::now();
4747
let blocks =
4848
match block::Cache::fixed(&mut rng, total_bytes, max_block_size.get_bytes(), config)? {
49-
block::Cache::Fixed { blocks, idx: _ } => blocks,
49+
block::Cache::Fixed { blocks, .. } => blocks,
5050
};
5151
info!("Payload generation took {:?}", start.elapsed());
5252
debug!("Payload: {:#?}", blocks);

Diff for: lading/src/generator/file_gen/model.rs

+15-45
Original file line numberDiff line numberDiff line change
@@ -47,30 +47,6 @@ pub struct File {
4747
}
4848

4949
impl File {
50-
/// Returns the number of bytes available to be read at instance `now`.
51-
///
52-
/// This function returns the number of bytes that have been "written" to
53-
/// the `File` and are available to be read. For instance, `modified_tick`
54-
/// may be in the past but sufficient bytes have accumulated in the file for
55-
/// non-zero reads to remain possible. Bytes will not be noted as consumed
56-
/// until the caller calls [`File::read`].
57-
///
58-
/// Call to this file will advance `bytes_written` if `now` >
59-
/// `modified_tick`.
60-
///
61-
/// Returns 0 if `bytes_written` == `bytes_read`.
62-
///
63-
/// # Panics
64-
///
65-
/// Function will panic if `bytes_written` < `bytes_read`. This indicates a
66-
/// catastrophic programming error.
67-
pub fn available_to_read(&mut self, now: Tick) -> u64 {
68-
self.advance_time(now);
69-
70-
assert!(self.bytes_written >= self.bytes_read);
71-
self.bytes_written.saturating_sub(self.bytes_read)
72-
}
73-
7450
/// Register a read.
7551
///
7652
/// This function is pair to [`File::available_to_read`]. It's possible that
@@ -89,16 +65,6 @@ impl File {
8965
self.status_tick = now;
9066
}
9167

92-
/// Register a read-only open.
93-
///
94-
/// This function updates `access_time` to `now`. Time is advanced which may
95-
/// result in more bytes being available in-file.
96-
pub fn ro_open(&mut self, now: Tick) {
97-
self.advance_time(now);
98-
99-
self.access_tick = now;
100-
}
101-
10268
/// Run the clock forward in the `File`.
10369
///
10470
/// This function runs the clock forward to `now`, updating `modified_tick`
@@ -347,19 +313,23 @@ impl State {
347313
name: _,
348314
ref mut file,
349315
}) => {
350-
let available = file.available_to_read(now);
351-
if available == 0 {
352-
return None;
353-
}
316+
let bytes_written = usize::try_from(file.bytes_written)
317+
.expect("more bytes written than machine word");
354318

355-
let block_len = self.block_cache.peek_next().total_bytes.get() as usize;
356-
if block_len <= size {
357-
let block = self.block_cache.next_block();
358-
file.read(block_len as u64, now);
359-
Some(block.bytes.clone())
360-
} else {
361-
None
319+
if offset >= bytes_written {
320+
// Offset beyond EOF
321+
return Some(Bytes::new());
362322
}
323+
324+
let available = bytes_written.saturating_sub(offset);
325+
let to_read = available.min(size);
326+
327+
// Get data from block_cache without worrying about blocks
328+
let data = self.block_cache.read_at(offset as u64, to_read);
329+
330+
file.read(to_read as u64, now);
331+
332+
Some(data)
363333
}
364334
Some(Node::Directory { .. }) | None => None,
365335
}

Diff for: lading_payload/src/block.rs

+77-3
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,8 @@ pub enum Cache {
152152
idx: usize,
153153
/// The store of blocks.
154154
blocks: Vec<Block>,
155+
/// The amount of data stored in one cycle, or all blocks
156+
total_cycle_size: u64,
155157
},
156158
}
157159

@@ -320,7 +322,17 @@ impl Cache {
320322
construct_block_cache_inner(rng, &pyld, maximum_block_bytes, total_bytes.get())?
321323
}
322324
};
323-
Ok(Self::Fixed { idx: 0, blocks })
325+
326+
let total_cycle_size = blocks
327+
.iter()
328+
.map(|block| u64::from(block.total_bytes.get()))
329+
.sum();
330+
331+
Ok(Self::Fixed {
332+
idx: 0,
333+
blocks,
334+
total_cycle_size,
335+
})
324336
}
325337

326338
/// Run `Cache` forward on the user-provided mpsc sender.
@@ -336,7 +348,9 @@ impl Cache {
336348
#[allow(clippy::needless_pass_by_value)]
337349
pub fn spin(self, snd: Sender<Block>) -> Result<(), SpinError> {
338350
match self {
339-
Self::Fixed { mut idx, blocks } => loop {
351+
Self::Fixed {
352+
mut idx, blocks, ..
353+
} => loop {
340354
snd.blocking_send(blocks[idx].clone())?;
341355
idx = (idx + 1) % blocks.len();
342356
},
@@ -351,7 +365,7 @@ impl Cache {
351365
#[must_use]
352366
pub fn peek_next(&self) -> &Block {
353367
match self {
354-
Self::Fixed { idx, blocks } => &blocks[*idx],
368+
Self::Fixed { idx, blocks, .. } => &blocks[*idx],
355369
}
356370
}
357371

@@ -364,13 +378,73 @@ impl Cache {
364378
Self::Fixed {
365379
ref mut idx,
366380
blocks,
381+
..
367382
} => {
368383
let block = &blocks[*idx];
369384
*idx = (*idx + 1) % blocks.len();
370385
block
371386
}
372387
}
373388
}
389+
390+
/// Read data starting from a given offset and up to the specified size.
391+
///
392+
/// # Panics
393+
///
394+
/// Function will panic if reads are larger than machine word bytes wide.
395+
pub fn read_at(&self, offset: u64, size: usize) -> Bytes {
396+
let mut data = BytesMut::with_capacity(size);
397+
398+
let (blocks, total_cycle_size) = match self {
399+
Cache::Fixed {
400+
blocks,
401+
total_cycle_size,
402+
..
403+
} => (
404+
blocks,
405+
usize::try_from(*total_cycle_size)
406+
.expect("cycle size larger than machine word bytes"),
407+
),
408+
};
409+
410+
let mut remaining = size;
411+
let mut current_offset =
412+
usize::try_from(offset).expect("offset larger than machine word bytes");
413+
414+
while remaining > 0 {
415+
// Compute offset within the cycle
416+
let offset_within_cycle = current_offset % total_cycle_size;
417+
418+
// Find which block this offset falls into
419+
let mut block_start = 0;
420+
for block in blocks {
421+
let block_size = block.total_bytes.get() as usize;
422+
if offset_within_cycle < block_start + block_size {
423+
// Offset is within this block
424+
let block_offset = offset_within_cycle - block_start;
425+
let bytes_in_block = (block_size - block_offset).min(remaining);
426+
427+
data.extend_from_slice(
428+
&block.bytes[block_offset..block_offset + bytes_in_block],
429+
);
430+
431+
remaining -= bytes_in_block;
432+
current_offset += bytes_in_block;
433+
break;
434+
}
435+
block_start += block_size;
436+
}
437+
438+
// If we couldn't find a block this suggests something seriously
439+
// wacky has happened.
440+
if remaining > 0 && block_start >= total_cycle_size {
441+
error!("Offset exceeds total cycle size");
442+
break;
443+
}
444+
}
445+
446+
data.freeze()
447+
}
374448
}
375449

376450
/// Construct a new block cache of form defined by `serializer`.

0 commit comments

Comments
 (0)