Skip to content

Commit 2f2b2cc

Browse files
djugeidjc
authored andcommitted
Introduce de-jittering for seeks.
On the first seek it gets enabled, displaying the progress as the max of the last 10 updates. If there ever are more than 5 consecutive reads and writes without seek it gets disabled again, keeping the performance impact low.
1 parent fc4dddb commit 2f2b2cc

File tree

3 files changed

+204
-21
lines changed

3 files changed

+204
-21
lines changed

src/iter.rs

Lines changed: 191 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ where
6262
pub struct ProgressBarIter<T> {
6363
pub(crate) it: T,
6464
pub progress: ProgressBar,
65+
pub(crate) dejitter: MaxSeekHeuristic,
6566
}
6667

6768
impl<T> ProgressBarIter<T> {
@@ -155,25 +156,37 @@ impl<T: FusedIterator> FusedIterator for ProgressBarIter<T> {}
155156
impl<R: io::Read> io::Read for ProgressBarIter<R> {
156157
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
157158
let inc = self.it.read(buf)?;
158-
self.progress.inc(inc as u64);
159+
self.progress.set_position(
160+
self.dejitter
161+
.update_seq(self.progress.position(), inc as u64),
162+
);
159163
Ok(inc)
160164
}
161165

162166
fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
163167
let inc = self.it.read_vectored(bufs)?;
164-
self.progress.inc(inc as u64);
168+
self.progress.set_position(
169+
self.dejitter
170+
.update_seq(self.progress.position(), inc as u64),
171+
);
165172
Ok(inc)
166173
}
167174

168175
fn read_to_string(&mut self, buf: &mut String) -> io::Result<usize> {
169176
let inc = self.it.read_to_string(buf)?;
170-
self.progress.inc(inc as u64);
177+
self.progress.set_position(
178+
self.dejitter
179+
.update_seq(self.progress.position(), inc as u64),
180+
);
171181
Ok(inc)
172182
}
173183

174184
fn read_exact(&mut self, buf: &mut [u8]) -> io::Result<()> {
175185
self.it.read_exact(buf)?;
176-
self.progress.inc(buf.len() as u64);
186+
self.progress.set_position(
187+
self.dejitter
188+
.update_seq(self.progress.position(), buf.len() as u64),
189+
);
177190
Ok(())
178191
}
179192
}
@@ -185,15 +198,24 @@ impl<R: io::BufRead> io::BufRead for ProgressBarIter<R> {
185198

186199
fn consume(&mut self, amt: usize) {
187200
self.it.consume(amt);
188-
self.progress.inc(amt as u64);
201+
self.progress.set_position(
202+
self.dejitter
203+
.update_seq(self.progress.position(), amt.try_into().unwrap()),
204+
);
189205
}
190206
}
191207

192208
impl<S: io::Seek> io::Seek for ProgressBarIter<S> {
193209
fn seek(&mut self, f: io::SeekFrom) -> io::Result<u64> {
194210
self.it.seek(f).map(|pos| {
195-
self.progress.set_position(pos);
196-
pos
211+
// this kind of seek is used to find the current position, but does not alter it
212+
// generally equivalent to stream_position()
213+
if let io::SeekFrom::Current(0) = f {
214+
pos
215+
} else {
216+
self.progress.set_position(self.dejitter.update_seek(pos));
217+
pos
218+
}
197219
})
198220
}
199221
// Pass this through to preserve optimizations that the inner I/O object may use here
@@ -203,6 +225,123 @@ impl<S: io::Seek> io::Seek for ProgressBarIter<S> {
203225
}
204226
}
205227

228+
/// Calculates a more stable visual position from jittery seeks to show to the user.
229+
///
230+
/// It does so by holding the maximum position encountered out of the last HISTORY read/write positions.
231+
/// As an optimization it deallocates the history when only sequential operations are performed RESET times in a row.
232+
#[derive(Debug, Default)]
233+
pub(crate) struct MaxSeekHeuristic<const RESET: u8 = 5, const HISTORY: usize = 10> {
234+
buf: Option<(Box<MaxRingBuf<HISTORY>>, u8)>,
235+
}
236+
237+
impl<const RESET: u8, const HISTORY: usize> MaxSeekHeuristic<RESET, HISTORY> {
238+
fn update_seq(&mut self, prev_pos: u64, delta: u64) -> u64 {
239+
let new_pos = prev_pos + delta;
240+
if let Some((buf, seq)) = &mut self.buf {
241+
*seq += 1;
242+
if *seq >= RESET {
243+
self.buf = None;
244+
return new_pos;
245+
}
246+
247+
buf.update(new_pos);
248+
buf.max()
249+
} else {
250+
new_pos
251+
}
252+
}
253+
254+
fn update_seek(&mut self, newpos: u64) -> u64 {
255+
let (b, seq) = self
256+
.buf
257+
.get_or_insert_with(|| (Box::new(MaxRingBuf::<HISTORY>::default()), 0));
258+
*seq = 0;
259+
b.update(newpos);
260+
b.max()
261+
}
262+
}
263+
264+
/// Ring buffer that remembers the maximum contained value.
265+
///
266+
/// can be used to quickly calculate the maximum value of a history of data points.
267+
#[derive(Debug)]
268+
struct MaxRingBuf<const HISTORY: usize = 10> {
269+
history: [u64; HISTORY],
270+
// invariant_h: always a valid index into history
271+
head: u8,
272+
// invariant_m: always a valid index into history
273+
max_pos: u8,
274+
}
275+
276+
impl<const HISTORY: usize> MaxRingBuf<HISTORY> {
277+
/// Adds a value to the history.
278+
/// Updates internal bookkeeping to remember the maximum value.
279+
///
280+
/// # Performance:
281+
/// amortized O(1):
282+
/// each regular update is O(1).
283+
/// Only updates that overwrite the position the maximum was stored in with a smaller number do a seek of the buffer,
284+
/// searching for the new maximum.
285+
/// This only happens on average each 1/HISTORY and has a cost of HISTORY,
286+
/// therefore amortizing to O(1).
287+
///
288+
/// In case there is some linear increase with jitter,
289+
/// as expected in this specific use-case,
290+
/// as long as there is one bigger update each HISTORY updates the scan is never triggered at all.
291+
///
292+
/// Worst case would be linearly decreasing values, which is still O(1).
293+
fn update(&mut self, new: u64) {
294+
// exploit invariant_h to eliminate bounds checks & panic code path
295+
let head = usize::from(self.head) % self.history.len();
296+
// exploit invariant_m to eliminate bounds checks & panic code path
297+
let max_pos = usize::from(self.max_pos) % self.history.len();
298+
299+
// save max now in case it gets overwritten in the next line
300+
let prev_max = self.history[max_pos];
301+
self.history[head] = new;
302+
303+
if new > prev_max {
304+
// This is now the new maximum
305+
self.max_pos = self.head;
306+
} else if self.max_pos == self.head && new < prev_max {
307+
// This was the maximum and may not be anymore
308+
// do a linear seek to find the new maximum
309+
let (idx, _val) = self
310+
.history
311+
.iter()
312+
.enumerate()
313+
.max_by_key(|(_, v)| *v)
314+
.expect("array has fixded size > 0");
315+
// invariant_m: idx is from an enumeration of history
316+
self.max_pos = idx.try_into().expect("history.len() <= u8::MAX");
317+
}
318+
319+
// invariant_h: head is kept in bounds by %-ing with history.len()
320+
// it is a ring buffer so wrapping around is expected behaviour.
321+
self.head = (self.head + 1) % (self.history.len() as u8);
322+
}
323+
324+
/// Returns the maximum value out of the memorized entries
325+
fn max(&self) -> u64 {
326+
// exploit invariant_m to eliminate bounds checks & panic code path
327+
self.history[self.max_pos as usize % self.history.len()]
328+
}
329+
}
330+
331+
impl<const HISTORY: usize> Default for MaxRingBuf<HISTORY> {
332+
fn default() -> Self {
333+
assert!(HISTORY <= u8::MAX.into());
334+
assert!(HISTORY > 0);
335+
Self {
336+
history: [0; HISTORY],
337+
// invariant_h: we asserted that history has at least one element, therefore index 0 is valid
338+
head: 0,
339+
// invariant_m: we asserted that history has at least one element, therefore index 0 is valid
340+
max_pos: 0,
341+
}
342+
}
343+
}
344+
206345
#[cfg(feature = "tokio")]
207346
#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
208347
impl<W: tokio::io::AsyncWrite + Unpin> tokio::io::AsyncWrite for ProgressBarIter<W> {
@@ -213,7 +352,9 @@ impl<W: tokio::io::AsyncWrite + Unpin> tokio::io::AsyncWrite for ProgressBarIter
213352
) -> Poll<io::Result<usize>> {
214353
Pin::new(&mut self.it).poll_write(cx, buf).map(|poll| {
215354
poll.map(|inc| {
216-
self.progress.inc(inc as u64);
355+
let oldprog = self.progress.position();
356+
let newprog = self.dejitter.update_seq(oldprog, inc.try_into().unwrap());
357+
self.progress.set_position(newprog);
217358
inc
218359
})
219360
})
@@ -237,12 +378,14 @@ impl<W: tokio::io::AsyncRead + Unpin> tokio::io::AsyncRead for ProgressBarIter<W
237378
buf: &mut ReadBuf<'_>,
238379
) -> Poll<io::Result<()>> {
239380
let prev_len = buf.filled().len() as u64;
240-
if let Poll::Ready(e) = Pin::new(&mut self.it).poll_read(cx, buf) {
241-
self.progress.inc(buf.filled().len() as u64 - prev_len);
242-
Poll::Ready(e)
243-
} else {
244-
Poll::Pending
381+
let poll = Pin::new(&mut self.it).poll_read(cx, buf);
382+
if let Poll::Ready(_e) = &poll {
383+
let inc = buf.filled().len() as u64 - prev_len;
384+
let oldprog = self.progress.position();
385+
let newprog = self.dejitter.update_seq(oldprog, inc);
386+
self.progress.set_position(newprog);
245387
}
388+
poll
246389
}
247390
}
248391

@@ -254,7 +397,13 @@ impl<W: tokio::io::AsyncSeek + Unpin> tokio::io::AsyncSeek for ProgressBarIter<W
254397
}
255398

256399
fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
257-
Pin::new(&mut self.it).poll_complete(cx)
400+
let poll = Pin::new(&mut self.it).poll_complete(cx);
401+
if let Poll::Ready(Ok(pos)) = &poll {
402+
let newpos = self.dejitter.update_seek(*pos);
403+
self.progress.set_position(newpos);
404+
}
405+
406+
poll
258407
}
259408
}
260409

@@ -270,7 +419,9 @@ impl<W: tokio::io::AsyncBufRead + Unpin + tokio::io::AsyncRead> tokio::io::Async
270419

271420
fn consume(mut self: Pin<&mut Self>, amt: usize) {
272421
Pin::new(&mut self.it).consume(amt);
273-
self.progress.inc(amt.try_into().unwrap());
422+
let oldprog = self.progress.position();
423+
let newprog = self.dejitter.update_seq(oldprog, amt.try_into().unwrap());
424+
self.progress.set_position(newprog);
274425
}
275426
}
276427

@@ -297,14 +448,20 @@ impl<S: futures_core::Stream + Unpin> futures_core::Stream for ProgressBarIter<S
297448
impl<W: io::Write> io::Write for ProgressBarIter<W> {
298449
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
299450
self.it.write(buf).map(|inc| {
300-
self.progress.inc(inc as u64);
451+
self.progress.set_position(
452+
self.dejitter
453+
.update_seq(self.progress.position(), inc as u64),
454+
);
301455
inc
302456
})
303457
}
304458

305459
fn write_vectored(&mut self, bufs: &[io::IoSlice]) -> io::Result<usize> {
306460
self.it.write_vectored(bufs).map(|inc| {
307-
self.progress.inc(inc as u64);
461+
self.progress.set_position(
462+
self.dejitter
463+
.update_seq(self.progress.position(), inc as u64),
464+
);
308465
inc
309466
})
310467
}
@@ -320,7 +477,11 @@ impl<W: io::Write> io::Write for ProgressBarIter<W> {
320477

321478
impl<S, T: Iterator<Item = S>> ProgressIterator for T {
322479
fn progress_with(self, progress: ProgressBar) -> ProgressBarIter<Self> {
323-
ProgressBarIter { it: self, progress }
480+
ProgressBarIter {
481+
it: self,
482+
progress,
483+
dejitter: MaxSeekHeuristic::default(),
484+
}
324485
}
325486
}
326487

@@ -350,4 +511,16 @@ mod test {
350511
v.iter().progress_with_style(style)
351512
});
352513
}
514+
515+
#[test]
516+
fn test_max_ring_buf() {
517+
use crate::iter::MaxRingBuf;
518+
let mut max = MaxRingBuf::<10>::default();
519+
max.update(100);
520+
assert_eq!(max.max(), 100);
521+
for i in 0..10 {
522+
max.update(99 - i);
523+
}
524+
assert_eq!(max.max(), 99);
525+
}
353526
}

src/progress_bar.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use web_time::Instant;
1515
use crate::draw_target::ProgressDrawTarget;
1616
use crate::state::{AtomicPosition, BarState, ProgressFinish, Reset, TabExpandedString};
1717
use crate::style::ProgressStyle;
18-
use crate::{ProgressBarIter, ProgressIterator, ProgressState};
18+
use crate::{iter, ProgressBarIter, ProgressIterator, ProgressState};
1919

2020
/// A progress bar or spinner
2121
///
@@ -498,6 +498,7 @@ impl ProgressBar {
498498
ProgressBarIter {
499499
progress: self.clone(),
500500
it: read,
501+
dejitter: iter::MaxSeekHeuristic::default(),
501502
}
502503
}
503504

@@ -519,6 +520,7 @@ impl ProgressBar {
519520
ProgressBarIter {
520521
progress: self.clone(),
521522
it: write,
523+
dejitter: iter::MaxSeekHeuristic::default(),
522524
}
523525
}
524526

@@ -545,6 +547,7 @@ impl ProgressBar {
545547
ProgressBarIter {
546548
progress: self.clone(),
547549
it: write,
550+
dejitter: iter::MaxSeekHeuristic::default(),
548551
}
549552
}
550553

@@ -568,6 +571,7 @@ impl ProgressBar {
568571
ProgressBarIter {
569572
progress: self.clone(),
570573
it: read,
574+
dejitter: iter::MaxSeekHeuristic::default(),
571575
}
572576
}
573577

@@ -590,6 +594,7 @@ impl ProgressBar {
590594
ProgressBarIter {
591595
progress: self.clone(),
592596
it: stream,
597+
dejitter: iter::MaxSeekHeuristic::default(),
593598
}
594599
}
595600

src/rayon.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use rayon::iter::plumbing::{Consumer, Folder, Producer, ProducerCallback, UnindexedConsumer};
22
use rayon::iter::{IndexedParallelIterator, ParallelIterator};
33

4-
use crate::{ProgressBar, ProgressBarIter};
4+
use crate::{iter::MaxSeekHeuristic, ProgressBar, ProgressBarIter};
55

66
/// Wraps a Rayon parallel iterator.
77
///
@@ -41,7 +41,11 @@ where
4141

4242
impl<S: Send, T: ParallelIterator<Item = S>> ParallelProgressIterator for T {
4343
fn progress_with(self, progress: ProgressBar) -> ProgressBarIter<Self> {
44-
ProgressBarIter { it: self, progress }
44+
ProgressBarIter {
45+
it: self,
46+
progress,
47+
dejitter: MaxSeekHeuristic::default(),
48+
}
4549
}
4650
}
4751

@@ -99,6 +103,7 @@ impl<T, P: Producer<Item = T>> Producer for ProgressProducer<P> {
99103
ProgressBarIter {
100104
it: self.base.into_iter(),
101105
progress: self.progress,
106+
dejitter: MaxSeekHeuristic::default(),
102107
}
103108
}
104109

0 commit comments

Comments
 (0)