Skip to content

Commit 1c73f6e

Browse files
committed
Revert "vmm: bugfix of blocking task in spawn_blocking"
This reverts commit eda0c7c.
1 parent 0ecff42 commit 1c73f6e

File tree

3 files changed

+48
-38
lines changed

3 files changed

+48
-38
lines changed

vmm/sandbox/src/client.rs

+40-30
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,7 @@ limitations under the License.
1515
*/
1616

1717
use std::{
18-
io::{BufRead, BufReader, Write},
19-
os::unix::{
20-
io::{IntoRawFd, RawFd},
21-
net::UnixStream,
22-
},
18+
os::unix::io::{IntoRawFd, RawFd},
2319
time::Duration,
2420
};
2521

@@ -34,12 +30,17 @@ use nix::{
3430
time::{clock_gettime, ClockId},
3531
unistd::close,
3632
};
37-
use tokio::time::timeout;
33+
use tokio::{
34+
io::{AsyncReadExt, AsyncWriteExt},
35+
net::UnixStream,
36+
time::timeout,
37+
};
3838
use ttrpc::{context::with_timeout, r#async::Client};
3939
use vmm_common::api::{sandbox::*, sandbox_ttrpc::SandboxServiceClient};
4040

4141
use crate::network::{NetworkInterface, Route};
4242

43+
const HVSOCK_RETRY_TIMEOUT_IN_MS: u64 = 10;
4344
const TIME_SYNC_PERIOD: u64 = 60;
4445
const TIME_DIFF_TOLERANCE_IN_MS: u64 = 10;
4546

@@ -67,11 +68,7 @@ async fn new_ttrpc_client(address: &str) -> Result<Client> {
6768

6869
let client = timeout(Duration::from_secs(ctx_timeout), fut)
6970
.await
70-
.map_err(|_| {
71-
let e = anyhow!("{}s timeout connecting socket: {}", ctx_timeout, last_err);
72-
error!("{}", e);
73-
e
74-
})?;
71+
.map_err(|_| anyhow!("{}s timeout connecting socket: {}", ctx_timeout, last_err))?;
7572
Ok(client)
7673
}
7774

@@ -165,28 +162,41 @@ async fn connect_to_hvsocket(address: &str) -> Result<RawFd> {
165162
if v.len() < 2 {
166163
return Err(anyhow!("hvsock address {} should not less than 2", address).into());
167164
}
168-
(v[0].to_string(), v[1].to_string())
165+
(v[0], v[1])
169166
};
170167

171-
tokio::task::spawn_blocking(move || {
172-
let mut stream =
173-
UnixStream::connect(&addr).map_err(|e| anyhow!("failed to connect hvsock: {}", e))?;
174-
stream
175-
.write_all(format!("CONNECT {}\n", port).as_bytes())
176-
.map_err(|e| anyhow!("hvsock connected but failed to write CONNECT: {}", e))?;
177-
178-
let mut response = String::new();
179-
BufReader::new(&stream)
180-
.read_line(&mut response)
181-
.map_err(|e| anyhow!("CONNECT sent but failed to get response: {}", e))?;
182-
if response.starts_with("OK") {
183-
Ok(stream.into_raw_fd())
184-
} else {
185-
Err(anyhow!("CONNECT sent but response is not OK: {}", response).into())
168+
let fut = async {
169+
let mut stream = UnixStream::connect(addr).await?;
170+
171+
match stream.write(format!("CONNECT {}\n", port).as_bytes()).await {
172+
Ok(_) => {
173+
let mut buf = [0; 4096];
174+
match stream.read(&mut buf).await {
175+
Ok(0) => Err(anyhow!("stream closed")),
176+
Ok(n) => {
177+
if String::from_utf8(buf[..n].to_vec())
178+
.unwrap_or_default()
179+
.contains("OK")
180+
{
181+
return Ok(stream.into_std()?.into_raw_fd());
182+
}
183+
Err(anyhow!("failed to connect"))
184+
}
185+
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
186+
Err(anyhow!("{}", e))
187+
}
188+
Err(e) => Err(anyhow!("failed to read from hvsock: {}", e)),
189+
}
190+
}
191+
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => Err(anyhow!("{}", e)),
192+
Err(e) => Err(anyhow!("failed to write CONNECT to hvsock: {}", e)),
186193
}
187-
})
188-
.await
189-
.map_err(|e| anyhow!("failed to spawn blocking task: {}", e))?
194+
.map_err(Error::Other)
195+
};
196+
197+
timeout(Duration::from_millis(HVSOCK_RETRY_TIMEOUT_IN_MS), fut)
198+
.await
199+
.map_err(|_| anyhow!("hvsock retry {}ms timeout", HVSOCK_RETRY_TIMEOUT_IN_MS))?
190200
}
191201

192202
pub fn unix_sock(r#abstract: bool, socket_path: &str) -> Result<UnixAddr> {

vmm/sandbox/src/cloud_hypervisor/client.rs

+7-7
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,15 @@ limitations under the License.
1616

1717
use std::{
1818
os::unix::net::UnixStream,
19+
path::Path,
1920
thread::sleep,
2021
time::{Duration, SystemTime},
2122
};
2223

2324
use anyhow::anyhow;
2425
use api_client::{simple_api_command, simple_api_full_command_with_fds_and_response};
2526
use containerd_sandbox::error::Result;
26-
use log::error;
27+
use log::{error, trace};
2728

2829
use crate::{
2930
cloud_hypervisor::devices::{block::DiskConfig, AddDeviceResponse, RemoveDeviceRequest},
@@ -37,26 +38,25 @@ pub struct ChClient {
3738
}
3839

3940
impl ChClient {
40-
pub async fn new(socket_path: String) -> Result<Self> {
41+
pub fn new<P: AsRef<Path>>(socket_path: P) -> Result<Self> {
4142
let start_time = SystemTime::now();
42-
tokio::task::spawn_blocking(move || loop {
43+
loop {
4344
match UnixStream::connect(&socket_path) {
4445
Ok(socket) => {
4546
return Ok(Self { socket });
4647
}
4748
Err(e) => {
49+
trace!("failed to create client: {:?}", e);
4850
if start_time.elapsed().unwrap().as_secs()
4951
> CLOUD_HYPERVISOR_START_TIMEOUT_IN_SEC
5052
{
51-
error!("failed to connect api server: {:?}", e);
53+
error!("failed to create client: {:?}", e);
5254
return Err(anyhow!("timeout connect client, {}", e).into());
5355
}
5456
sleep(Duration::from_millis(10));
5557
}
5658
}
57-
})
58-
.await
59-
.map_err(|e| anyhow!("failed to spawn a task {}", e))?
59+
}
6060
}
6161

6262
pub fn hot_attach(&mut self, device_info: DeviceInfo) -> Result<String> {

vmm/sandbox/src/cloud_hypervisor/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ impl CloudHypervisorVM {
113113
}
114114

115115
async fn create_client(&self) -> Result<ChClient> {
116-
ChClient::new(self.config.api_socket.to_string()).await
116+
ChClient::new(&self.config.api_socket)
117117
}
118118

119119
fn get_client(&mut self) -> Result<&mut ChClient> {

0 commit comments

Comments
 (0)