Skip to content

Commit da26e5d

Browse files
committed
Tidy up iostream handling
1 parent 2f20ec9 commit da26e5d

File tree

1 file changed

+38
-67
lines changed

1 file changed

+38
-67
lines changed

Diff for: src/docker/iostream.rs

+38-67
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,11 @@ use async_stream::try_stream;
33
use bollard::container::LogOutput;
44
use bollard::errors::Error;
55
use bytes::Bytes;
6-
use std::io;
7-
use std::pin::Pin;
6+
use std::pin::{pin, Pin};
87
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
98
use tokio::signal::unix::{signal, SignalKind};
109
use tokio::task::JoinHandle;
11-
use tokio_stream::{empty, Stream, StreamExt};
10+
use tokio_stream::{Stream, StreamExt};
1211
use tokio_util::io::ReaderStream;
1312

1413
pub(super) enum IoStreamSource {
@@ -28,7 +27,6 @@ enum StreamData {
2827
StdIn(Bytes),
2928
StdOut(Bytes),
3029
StdErr(Bytes),
31-
Stop,
3230
}
3331

3432
impl IoStream {
@@ -40,19 +38,16 @@ impl IoStream {
4038
Ok(result)
4139
}
4240

43-
pub fn pipe_std(self) -> JoinHandle<()> {
44-
let stdin = Box::pin(crate::util::tty_mode_guard::TtyModeGuard::new(
45-
tokio::io::stdin(),
46-
|mode| {
47-
// Switch input to raw mode, but don't touch output modes (as it can also be connected
48-
// to stdout and stderr).
49-
let outmode = mode.output_modes;
50-
mode.make_raw();
51-
mode.output_modes = outmode;
52-
},
53-
));
54-
let stdout = Box::pin(tokio::io::stdout());
55-
let stderr = Box::pin(tokio::io::stderr());
41+
pub fn pipe_std(self) -> JoinHandle<Result<()>> {
42+
let stdin = crate::util::tty_mode_guard::TtyModeGuard::new(tokio::io::stdin(), |mode| {
43+
// Switch input to raw mode, but don't touch output modes (as it can also be connected
44+
// to stdout and stderr).
45+
let outmode = mode.output_modes;
46+
mode.make_raw();
47+
mode.output_modes = outmode;
48+
});
49+
let stdout = tokio::io::stdout();
50+
let stderr = tokio::io::stderr();
5651

5752
let resize_stream = try_stream! {
5853
let mut stream = signal(SignalKind::window_change())?;
@@ -70,56 +65,39 @@ impl IoStream {
7065

7166
pub fn pipe(
7267
self,
73-
stdin: Pin<Box<dyn AsyncRead + Send + 'static>>,
74-
mut stdout: Pin<Box<dyn AsyncWrite + Send + 'static>>,
75-
mut stderr: Pin<Box<dyn AsyncWrite + Send + 'static>>,
76-
resize_stream: impl Stream<Item = io::Result<(u16, u16)>> + Send + 'static,
77-
) -> JoinHandle<()> {
68+
stdin: impl AsyncRead + Send + 'static,
69+
stdout: impl AsyncWrite + Send + 'static,
70+
stderr: impl AsyncWrite + Send + 'static,
71+
resize_stream: impl Stream<Item = std::io::Result<(u16, u16)>> + Send + 'static,
72+
) -> JoinHandle<Result<()>> {
7873
let mut input = self.input;
79-
let mut output = self.output;
8074
let docker = self.docker;
8175
let source = self.source;
8276

83-
let resize_stream = async_stream::stream! {
84-
tokio::pin!(resize_stream);
85-
while let Some(data) = resize_stream.next().await {
86-
yield match data {
87-
Ok((rows, cols)) => Ok(StreamData::Resize(rows, cols)),
88-
Err(err) => Err(err).context("Listening for tty resize"),
89-
};
90-
}
91-
};
77+
let resize_stream = resize_stream.map(|data| {
78+
let (rows, cols) = data.context("Listening for tty resize")?;
79+
Ok(StreamData::Resize(rows, cols))
80+
});
9281

93-
let input_stream = async_stream::stream! {
94-
let mut stdin = ReaderStream::new(stdin);
95-
while let Some(data) = stdin.next().await {
96-
yield match data {
97-
Ok(buf) => Ok(StreamData::StdIn(buf)),
98-
Err(err) => Err(err).context("Reading container input stream"),
99-
};
100-
}
101-
};
82+
let input_stream = ReaderStream::new(stdin).map(|data| {
83+
Ok(StreamData::StdIn(
84+
data.context("Reading container input stream")?,
85+
))
86+
});
10287

103-
let output_stream = async_stream::stream! {
104-
while let Some(output) = output.next().await {
105-
yield match output {
106-
Ok(LogOutput::Console{message}) => Ok(StreamData::StdOut(message)),
107-
Ok(LogOutput::StdOut{message}) => Ok(StreamData::StdOut(message)),
108-
Ok(LogOutput::StdErr{message}) => Ok(StreamData::StdErr(message)),
109-
Err(err) => Err(err).context("Reading container output stream"),
110-
_ => continue,
111-
};
112-
}
113-
yield Ok(StreamData::Stop);
114-
};
88+
let output_stream = self.output.filter_map(|output| match output {
89+
Ok(LogOutput::Console { message }) => Some(Ok(StreamData::StdOut(message))),
90+
Ok(LogOutput::StdOut { message }) => Some(Ok(StreamData::StdOut(message))),
91+
Ok(LogOutput::StdErr { message }) => Some(Ok(StreamData::StdErr(message))),
92+
Err(err) => Some(Err(err).context("Reading container output stream")),
93+
_ => None,
94+
});
11595

116-
let streams = empty()
117-
.merge(resize_stream)
118-
.merge(input_stream)
119-
.merge(output_stream);
96+
tokio::spawn(async move {
97+
let mut streams = pin!(resize_stream.merge(input_stream).merge(output_stream));
98+
let mut stdout = pin!(stdout);
99+
let mut stderr = pin!(stderr);
120100

121-
let stream = async_stream::try_stream! {
122-
tokio::pin!(streams);
123101
while let Some(data) = streams.next().await {
124102
match data? {
125103
StreamData::Resize(rows, cols) => {
@@ -137,17 +115,10 @@ impl IoStream {
137115
stderr.write_all_buf(&mut buf).await?;
138116
stdout.flush().await?;
139117
}
140-
StreamData::Stop => {
141-
break
142-
}
143118
};
144-
yield ();
145119
}
146-
};
147120

148-
tokio::spawn(async move {
149-
tokio::pin!(stream);
150-
let _ = stream.all(|_: Result<()>| true).await;
121+
Ok(())
151122
})
152123
}
153124
}

0 commit comments

Comments
 (0)