Skip to content

Commit 2f20ec9

Browse files
committed
Cleanup iostream handling a bit
1 parent 3362205 commit 2f20ec9

File tree

2 files changed

+50
-56
lines changed

2 files changed

+50
-56
lines changed

src/docker/iostream.rs

+17-48
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,13 @@ use async_stream::try_stream;
33
use bollard::container::LogOutput;
44
use bollard::errors::Error;
55
use bytes::Bytes;
6-
use std::future::pending;
76
use std::io;
87
use std::pin::Pin;
9-
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
8+
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
109
use tokio::signal::unix::{signal, SignalKind};
1110
use tokio::task::JoinHandle;
1211
use tokio_stream::{empty, Stream, StreamExt};
13-
use tokio_util::io::{ReaderStream, StreamReader};
12+
use tokio_util::io::ReaderStream;
1413

1514
pub(super) enum IoStreamSource {
1615
Container(String),
@@ -38,13 +37,22 @@ impl IoStream {
3837
while let Some(output) = self.output.next().await {
3938
result.push_str(&output?.to_string());
4039
}
41-
return Ok(result);
40+
Ok(result)
4241
}
4342

4443
pub fn pipe_std(self) -> JoinHandle<()> {
45-
let stdin = stdin();
46-
let stdout = stdout();
47-
let stderr = stderr();
44+
let stdin = Box::pin(crate::util::tty_mode_guard::TtyModeGuard::new(
45+
tokio::io::stdin(),
46+
|mode| {
47+
// Switch input to raw mode, but don't touch output modes (as it can also be connected
48+
// to stdout and stderr).
49+
let outmode = mode.output_modes;
50+
mode.make_raw();
51+
mode.output_modes = outmode;
52+
},
53+
));
54+
let stdout = Box::pin(tokio::io::stdout());
55+
let stderr = Box::pin(tokio::io::stderr());
4856

4957
let resize_stream = try_stream! {
5058
let mut stream = signal(SignalKind::window_change())?;
@@ -144,45 +152,6 @@ impl IoStream {
144152
}
145153
}
146154

147-
fn stdin() -> Pin<Box<dyn tokio::io::AsyncRead + Send>> {
148-
if let Ok(stdin) = try_stdin() {
149-
stdin
150-
} else {
151-
let stream = async_stream::stream! {
152-
yield pending::<std::io::Result<bytes::BytesMut>>().await;
153-
};
154-
Box::pin(StreamReader::new(stream))
155-
}
156-
}
157-
158-
fn try_stdin() -> Result<Pin<Box<dyn AsyncRead + Send>>> {
159-
let mut stdin = crate::util::tty_mode_guard::TtyModeGuard::new(tokio::io::stdin(), |mode| {
160-
// Switch input to raw mode, but don't touch output modes (as it can also be connected
161-
// to stdout and stderr).
162-
let outmode = mode.output_modes;
163-
mode.make_raw();
164-
mode.output_modes = outmode;
165-
})?;
166-
let stream = async_stream::stream! {
167-
loop {
168-
let mut buf = bytes::BytesMut::with_capacity(1024);
169-
match stdin.read_buf(&mut buf).await {
170-
Ok(_) => yield Ok(buf),
171-
Err(err) => yield Err(err),
172-
}
173-
}
174-
};
175-
Ok(Box::pin(StreamReader::new(stream)))
176-
}
177-
178-
fn stdout() -> Pin<Box<dyn tokio::io::AsyncWrite + Send>> {
179-
Box::pin(tokio::io::stdout())
180-
}
181-
182-
fn stderr() -> Pin<Box<dyn tokio::io::AsyncWrite + Send>> {
183-
Box::pin(tokio::io::stderr())
184-
}
185-
186155
async fn resize_tty(
187156
docker: &bollard::Docker,
188157
source: &IoStreamSource,
@@ -194,14 +163,14 @@ async fn resize_tty(
194163
height: rows,
195164
width: cols,
196165
};
197-
docker.resize_container_tty(&id, options).await?;
166+
docker.resize_container_tty(id, options).await?;
198167
}
199168
IoStreamSource::Exec(id) => {
200169
let options = bollard::exec::ResizeExecOptions {
201170
height: rows,
202171
width: cols,
203172
};
204-
docker.resize_exec(&id, options).await?;
173+
docker.resize_exec(id, options).await?;
205174
}
206175
};
207176
Ok(())

src/util/tty_mode_guard.rs

+33-8
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@
55
use std::io::{IsTerminal, Result};
66
use std::ops::{Deref, DerefMut};
77
use std::os::fd::{AsFd, BorrowedFd};
8+
use std::pin::Pin;
89

910
use rustix::termios::{self, Termios};
11+
use tokio::io::AsyncRead;
1012

1113
/// TTY mode guard over any file descriptor.
1214
///
@@ -37,28 +39,51 @@ impl<T: AsFd> AsFd for TtyModeGuard<T> {
3739
}
3840
}
3941

42+
impl<T: AsFd + AsyncRead> AsyncRead for TtyModeGuard<T> {
43+
fn poll_read(
44+
self: Pin<&mut Self>,
45+
cx: &mut std::task::Context<'_>,
46+
buf: &mut tokio::io::ReadBuf<'_>,
47+
) -> std::task::Poll<std::io::Result<()>> {
48+
// SAFETY: structural projection
49+
unsafe { self.map_unchecked_mut(|x| &mut x.fd) }.poll_read(cx, buf)
50+
}
51+
}
52+
4053
impl<T: AsFd> TtyModeGuard<T> {
41-
pub fn new(fd: T, mode: impl FnOnce(&mut Termios)) -> Result<Self> {
54+
pub fn new(fd: T, mode: impl FnOnce(&mut Termios)) -> Self {
4255
let termios = if fd.as_fd().is_terminal() {
43-
let termios = termios::tcgetattr(&fd)?;
56+
let termios: Result<_> = (|| {
57+
let termios = termios::tcgetattr(&fd)?;
58+
59+
let mut new_termios = termios.clone();
60+
mode(&mut new_termios);
61+
termios::tcsetattr(&fd, termios::OptionalActions::Now, &new_termios)?;
4462

45-
let mut new_termios = termios.clone();
46-
mode(&mut new_termios);
47-
termios::tcsetattr(&fd, termios::OptionalActions::Now, &new_termios)?;
63+
Ok(termios)
64+
})();
4865

49-
Some(termios)
66+
match termios {
67+
Ok(termios) => Some(termios),
68+
Err(err) => {
69+
log::warn!("Failed to set terminal mode: {}", err);
70+
None
71+
}
72+
}
5073
} else {
5174
None
5275
};
5376

54-
Ok(Self { termios, fd })
77+
Self { termios, fd }
5578
}
5679
}
5780

5881
impl<T: AsFd> Drop for TtyModeGuard<T> {
5982
fn drop(&mut self) {
6083
if let Some(termios) = self.termios.as_ref() {
61-
termios::tcsetattr(&self.fd, termios::OptionalActions::Now, termios).unwrap();
84+
if let Err(err) = termios::tcsetattr(&self.fd, termios::OptionalActions::Now, termios) {
85+
log::warn!("Failed to restore terminal mode: {}", err);
86+
}
6287
}
6388
}
6489
}

0 commit comments

Comments
 (0)