Skip to content

Commit cbb0cfc

Browse files
committed
EncoderWriter: no longer adhere to ‘at most one write’
The wording around the Write::write method is changing: the requirement that a call maps to ‘at most one write’ is being removed¹. With that, change EncoderWriter::write so that it flushes the entire output buffer at the beginning and then proceeds to process new input. This eliminates returning Ok(0), which is effectively an error. Also, change the accounting for the occupied portion of the output buffer: rather than tracking just the occupied length, track the occupied range, which means moving data to the front of the buffer is no longer necessary. ¹ rust-lang/rust#107200
1 parent 92e94d2 commit cbb0cfc

File tree

2 files changed

+98
-124
lines changed

2 files changed

+98
-124
lines changed

src/write/encoder.rs

+76-110
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
11
use crate::engine::Engine;
2-
use std::{
3-
cmp, fmt, io,
4-
io::{ErrorKind, Result},
5-
};
2+
use std::{cmp, fmt, io};
63

74
pub(crate) const BUF_SIZE: usize = 1024;
85
/// The most bytes whose encoding will fit in `BUF_SIZE`
@@ -74,21 +71,24 @@ pub struct EncoderWriter<'e, E: Engine, W: io::Write> {
7471
/// Buffer to encode into. May hold leftover encoded bytes from a previous write call that the underlying writer
7572
/// did not write last time.
7673
output: [u8; BUF_SIZE],
77-
/// How much of `output` is occupied with encoded data that couldn't be written last time
78-
output_occupied_len: usize,
74+
/// Occupied portion of output.
75+
output_range: std::ops::Range<usize>,
7976
/// panic safety: don't write again in destructor if writer panicked while we were writing to it
8077
panicked: bool,
8178
}
8279

8380
impl<'e, E: Engine, W: io::Write> fmt::Debug for EncoderWriter<'e, E, W> {
8481
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
82+
let range = self.output_range.clone();
83+
let truncated_len = range.len().min(5);
84+
let truncated_range = range.start..range.start + truncated_len;
8585
write!(
8686
f,
87-
"extra_input: {:?} extra_input_occupied_len:{:?} output[..5]: {:?} output_occupied_len: {:?}",
88-
self.extra_input,
89-
self.extra_input_occupied_len,
90-
&self.output[0..5],
91-
self.output_occupied_len
87+
"extra_input: {:?} occupied output[..{}]: {:?} output_range: {:?}",
88+
&self.extra_input[..self.extra_input_occupied_len],
89+
truncated_len,
90+
&self.output[truncated_range],
91+
range,
9292
)
9393
}
9494
}
@@ -102,7 +102,7 @@ impl<'e, E: Engine, W: io::Write> EncoderWriter<'e, E, W> {
102102
extra_input: [0u8; MIN_ENCODE_CHUNK_SIZE],
103103
extra_input_occupied_len: 0,
104104
output: [0u8; BUF_SIZE],
105-
output_occupied_len: 0,
105+
output_range: 0..0,
106106
panicked: false,
107107
}
108108
}
@@ -123,7 +123,7 @@ impl<'e, E: Engine, W: io::Write> EncoderWriter<'e, E, W> {
123123
/// # Errors
124124
///
125125
/// Any error from the delegate writer is returned as-is, including `io::ErrorKind::Interrupted`; `finish()` may be retried after an error.
126-
pub fn finish(&mut self) -> Result<W> {
126+
pub fn finish(&mut self) -> io::Result<W> {
127127
// If we could consume self in finish(), we wouldn't have to worry about this case, but
128128
// finish() is retryable in the face of I/O errors, so we can't consume here.
129129
if self.delegate.is_none() {
@@ -138,91 +138,72 @@ impl<'e, E: Engine, W: io::Write> EncoderWriter<'e, E, W> {
138138
}
139139

140140
/// Write any remaining buffered data to the delegate writer.
141-
fn write_final_leftovers(&mut self) -> Result<()> {
141+
fn write_final_leftovers(&mut self) -> io::Result<()> {
142142
if self.delegate.is_none() {
143143
// finish() has already successfully called this, and we are now in drop() with a None
144144
// writer, so just no-op
145145
return Ok(());
146146
}
147147

148-
self.write_all_encoded_output()?;
149-
150148
if self.extra_input_occupied_len > 0 {
149+
// Make sure output isn’t full so we can append to it.
150+
if self.output_range.end == self.output.len() {
151+
self.flush_output()?;
152+
}
153+
151154
let encoded_len = self
152155
.engine
153156
.encode_slice(
154157
&self.extra_input[..self.extra_input_occupied_len],
155-
&mut self.output[..],
158+
&mut self.output[self.output_range.end..],
156159
)
157160
.expect("buffer is large enough");
158161

159-
self.output_occupied_len = encoded_len;
160-
161-
self.write_all_encoded_output()?;
162-
163-
// write succeeded, do not write the encoding of extra again if finish() is retried
162+
self.output_range.end += encoded_len;
164163
self.extra_input_occupied_len = 0;
165164
}
166165

167-
Ok(())
166+
self.flush_output()
168167
}
169168

170-
/// Write as much of the encoded output to the delegate writer as it will accept, and store the
171-
/// leftovers to be attempted at the next write() call. Updates `self.output_occupied_len`.
169+
/// Flushes output buffer to the delegate.
172170
///
173-
/// # Errors
174-
///
175-
/// Errors from the delegate writer are returned. In the case of an error,
176-
/// `self.output_occupied_len` will not be updated, as errors from `write` are specified to mean
177-
/// that no write took place.
178-
fn write_to_delegate(&mut self, current_output_len: usize) -> Result<()> {
171+
/// Loops writing data to the delegate until output buffer is empty or
172+
/// delegate returns an error. An `Ok(0)` return from the delegate is
173+
/// treated as an error. Updates `output_range` accordingly.
174+
fn flush_output(&mut self) -> io::Result<()> {
175+
if self.output_range.is_empty() {
176+
return Ok(());
177+
}
178+
loop {
179+
match self.write_to_delegate(self.output_range.clone()) {
180+
Ok(0) => {
181+
break Err(io::Error::new(
182+
io::ErrorKind::WriteZero,
183+
"failed to write whole buffer",
184+
))
185+
}
186+
Ok(n) if n == self.output_range.len() => {
187+
self.output_range = 0..0;
188+
break Ok(());
189+
}
190+
Ok(n) => self.output_range.start += n,
191+
Err(err) => break Err(err),
192+
}
193+
}
194+
}
195+
196+
/// Writes given range of output buffer to the delegate. Performs exactly
197+
/// one write. Sets `panicked` to `true` if delegate panics.
198+
fn write_to_delegate(&mut self, range: std::ops::Range<usize>) -> io::Result<usize> {
179199
self.panicked = true;
180200
let res = self
181201
.delegate
182202
.as_mut()
183-
.expect("Writer must be present")
184-
.write(&self.output[..current_output_len]);
203+
.expect("Encoder has already had finish() called")
204+
.write(&self.output[range]);
185205
self.panicked = false;
186-
187-
res.map(|consumed| {
188-
debug_assert!(consumed <= current_output_len);
189-
190-
if consumed < current_output_len {
191-
self.output_occupied_len = current_output_len.checked_sub(consumed).unwrap();
192-
// If we're blocking on I/O, the minor inefficiency of copying bytes to the
193-
// start of the buffer is the least of our concerns...
194-
// TODO Rotate moves more than we need to; copy_within now stable.
195-
self.output.rotate_left(consumed);
196-
} else {
197-
self.output_occupied_len = 0;
198-
}
199-
})
200-
}
201-
202-
/// Write all buffered encoded output. If this returns `Ok`, `self.output_occupied_len` is `0`.
203-
///
204-
/// This is basically write_all for the remaining buffered data but without the undesirable
205-
/// abort-on-`Ok(0)` behavior.
206-
///
207-
/// # Errors
208-
///
209-
/// Any error emitted by the delegate writer abort the write loop and is returned, unless it's
210-
/// `Interrupted`, in which case the error is ignored and writes will continue.
211-
fn write_all_encoded_output(&mut self) -> Result<()> {
212-
while self.output_occupied_len > 0 {
213-
let remaining_len = self.output_occupied_len;
214-
match self.write_to_delegate(remaining_len) {
215-
// try again on interrupts ala write_all
216-
Err(ref e) if e.kind() == ErrorKind::Interrupted => {}
217-
// other errors return
218-
Err(e) => return Err(e),
219-
// success no-ops because remaining length is already updated
220-
Ok(_) => {}
221-
};
222-
}
223-
224-
debug_assert_eq!(0, self.output_occupied_len);
225-
Ok(())
206+
res
226207
}
227208

228209
/// Unwraps this `EncoderWriter`, returning the base writer it writes base64 encoded output
@@ -262,38 +243,22 @@ impl<'e, E: Engine, W: io::Write> io::Write for EncoderWriter<'e, E, W> {
262243
/// # Errors
263244
///
264245
/// Any errors emitted by the delegate writer are returned.
265-
fn write(&mut self, input: &[u8]) -> Result<usize> {
246+
fn write(&mut self, input: &[u8]) -> io::Result<usize> {
266247
if self.delegate.is_none() {
267248
panic!("Cannot write more after calling finish()");
268249
}
269250

251+
self.flush_output()?;
252+
debug_assert_eq!(0, self.output_range.len());
253+
270254
if input.is_empty() {
271255
return Ok(0);
272256
}
273257

274-
// The contract of `Write::write` places some constraints on this implementation:
275-
// - a call to `write()` represents at most one call to a wrapped `Write`, so we can't
276-
// iterate over the input and encode multiple chunks.
277-
// - Errors mean that "no bytes were written to this writer", so we need to reset the
278-
// internal state to what it was before the error occurred
279-
280-
// before reading any input, write any leftover encoded output from last time
281-
if self.output_occupied_len > 0 {
282-
let current_len = self.output_occupied_len;
283-
return self
284-
.write_to_delegate(current_len)
285-
// did not read any input
286-
.map(|_| 0);
287-
}
288-
289-
debug_assert_eq!(0, self.output_occupied_len);
290-
291258
// how many bytes, if any, were read into `extra` to create a triple to encode
292259
let mut extra_input_read_len = 0;
293260
let mut input = input;
294261

295-
let orig_extra_len = self.extra_input_occupied_len;
296-
297262
let mut encoded_size = 0;
298263
// always a multiple of MIN_ENCODE_CHUNK_SIZE
299264
let mut max_input_len = MAX_INPUT_LEN;
@@ -322,8 +287,10 @@ impl<'e, E: Engine, W: io::Write> io::Write for EncoderWriter<'e, E, W> {
322287

323288
input = &input[extra_input_read_len..];
324289

325-
// consider extra to be used up, since we encoded it
326-
self.extra_input_occupied_len = 0;
290+
// Note: Not updating self.extra_input_occupied_len yet. It’s
291+
// going to be zeroed at the end of the function if we
292+
// successfully write some data to delegate.
293+
327294
// don't clobber where we just encoded to
328295
encoded_size = 4;
329296
// and don't read more than can be encoded
@@ -367,29 +334,28 @@ impl<'e, E: Engine, W: io::Write> io::Write for EncoderWriter<'e, E, W> {
367334
&mut self.output[encoded_size..],
368335
);
369336

370-
// not updating `self.output_occupied_len` here because if the below write fails, it should
371-
// "never take place" -- the buffer contents we encoded are ignored and perhaps retried
372-
// later, if the consumer chooses.
337+
// Not updating `self.output_range` here because if the write fails, it
338+
// should "never take place" -- the buffer contents we encoded are
339+
// ignored and perhaps retried later, if the consumer chooses.
373340

374-
self.write_to_delegate(encoded_size)
375-
// no matter whether we wrote the full encoded buffer or not, we consumed the same
376-
// input
377-
.map(|_| extra_input_read_len + input_chunks_to_encode_len)
378-
.map_err(|e| {
379-
// in case we filled and encoded `extra`, reset extra_len
380-
self.extra_input_occupied_len = orig_extra_len;
381-
382-
e
383-
})
341+
self.write_to_delegate(0..encoded_size).map(|written| {
342+
if written < encoded_size {
343+
self.output_range = written..encoded_size;
344+
} else {
345+
debug_assert_eq!(0, self.output_range.len());
346+
}
347+
self.extra_input_occupied_len = 0;
348+
extra_input_read_len + input_chunks_to_encode_len
349+
})
384350
}
385351

386352
/// Because this is usually treated as OK to call multiple times, it will *not* flush any
387353
/// incomplete chunks of input or write padding.
388354
/// # Errors
389355
///
390356
/// Any error from the delegate writer is returned as-is, including [`io::ErrorKind::Interrupted`].
391-
fn flush(&mut self) -> Result<()> {
392-
self.write_all_encoded_output()?;
357+
fn flush(&mut self) -> io::Result<()> {
358+
self.flush_output()?;
393359
self.delegate
394360
.as_mut()
395361
.expect("Writer must be present")

src/write/encoder_tests.rs

+22-14
Original file line numberDiff line numberDiff line change
@@ -412,17 +412,21 @@ fn writes_that_only_write_part_of_input_and_sometimes_interrupt_produce_correct_
412412

413413
// retry on interrupt
414414
match res {
415-
Ok(len) => bytes_consumed += len,
416-
Err(e) => match e.kind() {
417-
io::ErrorKind::Interrupted => continue,
418-
_ => {
419-
panic!("should not see other errors");
420-
}
421-
},
415+
Ok(0) => assert_eq!(0, input_len),
416+
Ok(len) => {
417+
assert!(len <= input_len);
418+
bytes_consumed += len;
419+
}
420+
Err(e) => assert_eq!(io::ErrorKind::Interrupted, e.kind()),
422421
}
423422
}
424423

425-
let _ = stream_encoder.finish().unwrap();
424+
loop {
425+
match stream_encoder.finish() {
426+
Ok(_) => break,
427+
Err(e) => assert_eq!(io::ErrorKind::Interrupted, e.kind()),
428+
}
429+
}
426430

427431
assert_eq!(orig_len, bytes_consumed);
428432
}
@@ -506,15 +510,15 @@ struct InterruptingWriter<'a, W: 'a + Write, R: 'a + Rng> {
506510

507511
impl<'a, W: Write, R: Rng> Write for InterruptingWriter<'a, W, R> {
508512
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
509-
if self.rng.gen_range(0.0..1.0) <= self.fraction {
513+
if self.rng.gen_bool(self.fraction) {
510514
return Err(io::Error::new(io::ErrorKind::Interrupted, "interrupted"));
511515
}
512516

513517
self.w.write(buf)
514518
}
515519

516520
fn flush(&mut self) -> io::Result<()> {
517-
if self.rng.gen_range(0.0..1.0) <= self.fraction {
521+
if self.rng.gen_bool(self.fraction) {
518522
return Err(io::Error::new(io::ErrorKind::Interrupted, "interrupted"));
519523
}
520524

@@ -534,17 +538,21 @@ struct PartialInterruptingWriter<'a, W: 'a + Write, R: 'a + Rng> {
534538

535539
impl<'a, W: Write, R: Rng> Write for PartialInterruptingWriter<'a, W, R> {
536540
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
537-
if self.rng.gen_range(0.0..1.0) > self.no_interrupt_fraction {
541+
if !self.rng.gen_bool(self.no_interrupt_fraction) {
538542
return Err(io::Error::new(io::ErrorKind::Interrupted, "interrupted"));
539543
}
540544

541-
if self.rng.gen_range(0.0..1.0) <= self.full_input_fraction || buf.is_empty() {
545+
if buf.len() <= 1 || self.rng.gen_bool(self.full_input_fraction) {
542546
// pass through the buf untouched
543547
self.w.write(buf)
544548
} else {
545549
// only use a prefix of it
546-
self.w
547-
.write(&buf[0..(self.rng.gen_range(0..(buf.len() - 1)))])
550+
let end = if buf.len() == 2 {
551+
1
552+
} else {
553+
self.rng.gen_range(1..(buf.len() - 1))
554+
};
555+
self.w.write(&buf[..end])
548556
}
549557
}
550558

0 commit comments

Comments
 (0)