@@ -31,7 +31,7 @@ use nix::{
31
31
unistd:: close,
32
32
} ;
33
33
use tokio:: {
34
- io:: { AsyncReadExt , AsyncWriteExt } ,
34
+ io:: { AsyncBufReadExt , AsyncWriteExt , BufReader } ,
35
35
net:: UnixStream ,
36
36
time:: timeout,
37
37
} ;
@@ -41,17 +41,17 @@ use vmm_common::api::{sandbox::*, sandbox_ttrpc::SandboxServiceClient};
41
41
use crate :: network:: { NetworkInterface , Route } ;
42
42
43
43
const HVSOCK_RETRY_TIMEOUT_IN_MS : u64 = 10 ;
44
+ // TODO: reduce to 10s
45
+ const NEW_TTRPC_CLIENT_TIMEOUT : u64 = 45 ;
44
46
const TIME_SYNC_PERIOD : u64 = 60 ;
45
47
const TIME_DIFF_TOLERANCE_IN_MS : u64 = 10 ;
46
48
47
49
pub ( crate ) async fn new_sandbox_client ( address : & str ) -> Result < SandboxServiceClient > {
48
- let client = new_ttrpc_client ( address) . await ?;
50
+ let client = new_ttrpc_client_with_timeout ( address, NEW_TTRPC_CLIENT_TIMEOUT ) . await ?;
49
51
Ok ( SandboxServiceClient :: new ( client) )
50
52
}
51
53
52
- async fn new_ttrpc_client ( address : & str ) -> Result < Client > {
53
- let ctx_timeout = 10 ;
54
-
54
+ async fn new_ttrpc_client_with_timeout ( address : & str , t : u64 ) -> Result < Client > {
55
55
let mut last_err = Error :: Other ( anyhow ! ( "" ) ) ;
56
56
57
57
let fut = async {
@@ -63,12 +63,17 @@ async fn new_ttrpc_client(address: &str) -> Result<Client> {
63
63
}
64
64
Err ( e) => last_err = e,
65
65
}
66
+ // In case that the address doesn't exist, the executed function in this loop are all
67
+ // sync, making the first time of future poll in timeout hang forever. As a result, the
68
+ // timeout will hang too. To solve this, add a async function in this loop or call
69
+ // `tokio::task::yield_now()` to give up current cpu time slice.
70
+ tokio:: time:: sleep ( Duration :: from_millis ( 10 ) ) . await ;
66
71
}
67
72
} ;
68
73
69
- let client = timeout ( Duration :: from_secs ( ctx_timeout ) , fut)
74
+ let client = timeout ( Duration :: from_secs ( t ) , fut)
70
75
. await
71
- . map_err ( |_| anyhow ! ( "{}s timeout connecting socket: {}" , ctx_timeout , last_err) ) ?;
76
+ . map_err ( |_| anyhow ! ( "{}s timeout connecting socket: {}" , t , last_err) ) ?;
72
77
Ok ( client)
73
78
}
74
79
@@ -167,31 +172,21 @@ async fn connect_to_hvsocket(address: &str) -> Result<RawFd> {
167
172
168
173
let fut = async {
169
174
let mut stream = UnixStream :: connect ( addr) . await ?;
170
-
171
- match stream. write ( format ! ( "CONNECT {}\n " , port) . as_bytes ( ) ) . await {
172
- Ok ( _) => {
173
- let mut buf = [ 0 ; 4096 ] ;
174
- match stream. read ( & mut buf) . await {
175
- Ok ( 0 ) => Err ( anyhow ! ( "stream closed" ) ) ,
176
- Ok ( n) => {
177
- if String :: from_utf8 ( buf[ ..n] . to_vec ( ) )
178
- . unwrap_or_default ( )
179
- . contains ( "OK" )
180
- {
181
- return Ok ( stream. into_std ( ) ?. into_raw_fd ( ) ) ;
182
- }
183
- Err ( anyhow ! ( "failed to connect" ) )
184
- }
185
- Err ( ref e) if e. kind ( ) == std:: io:: ErrorKind :: WouldBlock => {
186
- Err ( anyhow ! ( "{}" , e) )
187
- }
188
- Err ( e) => Err ( anyhow ! ( "failed to read from hvsock: {}" , e) ) ,
189
- }
190
- }
191
- Err ( ref e) if e. kind ( ) == std:: io:: ErrorKind :: WouldBlock => Err ( anyhow ! ( "{}" , e) ) ,
192
- Err ( e) => Err ( anyhow ! ( "failed to write CONNECT to hvsock: {}" , e) ) ,
175
+ stream
176
+ . write_all ( format ! ( "CONNECT {}\n " , port) . as_bytes ( ) )
177
+ . await
178
+ . map_err ( |e| anyhow ! ( "hvsock connected but failed to write CONNECT: {}" , e) ) ?;
179
+
180
+ let mut response = String :: new ( ) ;
181
+ BufReader :: new ( & mut stream)
182
+ . read_line ( & mut response)
183
+ . await
184
+ . map_err ( |e| anyhow ! ( "CONNECT sent but failed to get response: {}" , e) ) ?;
185
+ if response. starts_with ( "OK" ) {
186
+ Ok ( stream. into_std ( ) ?. into_raw_fd ( ) )
187
+ } else {
188
+ Err ( anyhow ! ( "CONNECT sent but response is not OK: {}" , response) . into ( ) )
193
189
}
194
- . map_err ( Error :: Other )
195
190
} ;
196
191
197
192
timeout ( Duration :: from_millis ( HVSOCK_RETRY_TIMEOUT_IN_MS ) , fut)
@@ -312,3 +307,16 @@ pub(crate) async fn client_sync_clock(client: &SandboxServiceClient, id: &str) {
312
307
}
313
308
} ) ;
314
309
}
310
+
311
+ #[ cfg( test) ]
312
+ mod tests {
313
+ use crate :: client:: new_ttrpc_client_with_timeout;
314
+
315
+ #[ tokio:: test]
316
+ async fn test_new_ttrpc_client_timeout ( ) {
317
+ // Expect new_ttrpc_client would return timeout error, instead of blocking.
318
+ assert ! ( new_ttrpc_client_with_timeout( "hvsock://fake.sock:1024" , 1 )
319
+ . await
320
+ . is_err( ) ) ;
321
+ }
322
+ }
0 commit comments