|
| 1 | +use core::{future, pin::Pin}; |
| 2 | + |
1 | 3 | use crate::{ |
| 4 | + error::{FlushError, FullError}, |
2 | 5 | io::{Destination, Seek, Truncate}, |
3 | | - result::FlushResult, |
| 6 | + result::{FlushResult, LostResult}, |
4 | 7 | }; |
5 | 8 |
|
6 | 9 | /// [`slice`] buffered sender. |
@@ -38,13 +41,52 @@ where |
38 | 41 | /// |
39 | 42 | /// May not send the full amount of bytes until either the buffer is full or |
40 | 43 | /// [`flush()`](Self::flush()) is called. |
41 | | - pub async fn send(&mut self, bytes: &[u8]) -> FlushResult { |
42 | | - todo!("{bytes:?}") // FIXME |
| 44 | + pub async fn send(&mut self, bytes: &[u8]) -> LostResult<usize> { |
| 45 | + let mut total_sent = 0; |
| 46 | + |
| 47 | + for byte in bytes.iter().cloned() { |
| 48 | + if self.cursor == BUF { |
| 49 | + self.cursor = 0; |
| 50 | + |
| 51 | + let sent = future::poll_fn(|cx| { |
| 52 | + Pin::new(&mut self.destination) |
| 53 | + .poll_send(cx, self.buffer.as_ref()) |
| 54 | + }) |
| 55 | + .await?; |
| 56 | + |
| 57 | + total_sent += sent; |
| 58 | + |
| 59 | + if sent != BUF { |
| 60 | + return Ok(total_sent); |
| 61 | + } |
| 62 | + } |
| 63 | + |
| 64 | + self.buffer[self.cursor] = byte; |
| 65 | + self.cursor += 1; |
| 66 | + } |
| 67 | + |
| 68 | + Ok(total_sent) |
43 | 69 | } |
44 | 70 |
|
45 | 71 | /// Send buffered data with the destination. |
46 | 72 | pub async fn flush(&mut self) -> FlushResult { |
47 | | - todo!() // FIXME |
| 73 | + let old_cursor = self.cursor; |
| 74 | + let sent = future::poll_fn(|cx| { |
| 75 | + Pin::new(&mut self.destination) |
| 76 | + .poll_send(cx, &self.buffer[..self.cursor]) |
| 77 | + }) |
| 78 | + .await?; |
| 79 | + |
| 80 | + self.cursor -= sent; |
| 81 | + |
| 82 | + if self.cursor != 0 { |
| 83 | + self.buffer.copy_within(sent..old_cursor, 0); |
| 84 | + return Err(FlushError::Full(FullError::from_remaining( |
| 85 | + self.cursor, |
| 86 | + ))); |
| 87 | + } |
| 88 | + |
| 89 | + Ok(()) |
48 | 90 | } |
49 | 91 | } |
50 | 92 |
|
|
0 commit comments