@@ -15,11 +15,7 @@ limitations under the License.
15
15
*/
16
16
17
17
use std:: {
18
- io:: { BufRead , BufReader , Write } ,
19
- os:: unix:: {
20
- io:: { IntoRawFd , RawFd } ,
21
- net:: UnixStream ,
22
- } ,
18
+ os:: unix:: io:: { IntoRawFd , RawFd } ,
23
19
time:: Duration ,
24
20
} ;
25
21
@@ -34,23 +30,28 @@ use nix::{
34
30
time:: { clock_gettime, ClockId } ,
35
31
unistd:: close,
36
32
} ;
37
- use tokio:: time:: timeout;
33
+ use tokio:: {
34
+ io:: { AsyncBufReadExt , AsyncWriteExt , BufReader } ,
35
+ net:: UnixStream ,
36
+ time:: timeout,
37
+ } ;
38
38
use ttrpc:: { context:: with_timeout, r#async:: Client } ;
39
39
use vmm_common:: api:: { sandbox:: * , sandbox_ttrpc:: SandboxServiceClient } ;
40
40
41
41
use crate :: network:: { NetworkInterface , Route } ;
42
42
43
+ const HVSOCK_RETRY_TIMEOUT_IN_MS : u64 = 10 ;
44
+ // TODO: reduce to 10s
45
+ const NEW_TTRPC_CLIENT_TIMEOUT : u64 = 45 ;
43
46
const TIME_SYNC_PERIOD : u64 = 60 ;
44
47
const TIME_DIFF_TOLERANCE_IN_MS : u64 = 10 ;
45
48
46
49
pub ( crate ) async fn new_sandbox_client ( address : & str ) -> Result < SandboxServiceClient > {
47
- let client = new_ttrpc_client ( address) . await ?;
50
+ let client = new_ttrpc_client_with_timeout ( address, NEW_TTRPC_CLIENT_TIMEOUT ) . await ?;
48
51
Ok ( SandboxServiceClient :: new ( client) )
49
52
}
50
53
51
- async fn new_ttrpc_client ( address : & str ) -> Result < Client > {
52
- let ctx_timeout = 10 ;
53
-
54
+ async fn new_ttrpc_client_with_timeout ( address : & str , t : u64 ) -> Result < Client > {
54
55
let mut last_err = Error :: Other ( anyhow ! ( "" ) ) ;
55
56
56
57
let fut = async {
@@ -62,16 +63,17 @@ async fn new_ttrpc_client(address: &str) -> Result<Client> {
62
63
}
63
64
Err ( e) => last_err = e,
64
65
}
66
+ // In case that the address doesn't exist, the executed function in this loop are all
67
+ // sync, making the first time of future poll in timeout hang forever. As a result, the
68
+ // timeout will hang too. To solve this, add a async function in this loop or call
69
+ // `tokio::task::yield_now()` to give up current cpu time slice.
70
+ tokio:: time:: sleep ( Duration :: from_millis ( 10 ) ) . await ;
65
71
}
66
72
} ;
67
73
68
- let client = timeout ( Duration :: from_secs ( ctx_timeout ) , fut)
74
+ let client = timeout ( Duration :: from_secs ( t ) , fut)
69
75
. await
70
- . map_err ( |_| {
71
- let e = anyhow ! ( "{}s timeout connecting socket: {}" , ctx_timeout, last_err) ;
72
- error ! ( "{}" , e) ;
73
- e
74
- } ) ?;
76
+ . map_err ( |_| anyhow ! ( "{}s timeout connecting socket: {}" , t, last_err) ) ?;
75
77
Ok ( client)
76
78
}
77
79
@@ -165,28 +167,31 @@ async fn connect_to_hvsocket(address: &str) -> Result<RawFd> {
165
167
if v. len ( ) < 2 {
166
168
return Err ( anyhow ! ( "hvsock address {} should not less than 2" , address) . into ( ) ) ;
167
169
}
168
- ( v[ 0 ] . to_string ( ) , v[ 1 ] . to_string ( ) )
170
+ ( v[ 0 ] , v[ 1 ] )
169
171
} ;
170
172
171
- tokio:: task:: spawn_blocking ( move || {
172
- let mut stream =
173
- UnixStream :: connect ( & addr) . map_err ( |e| anyhow ! ( "failed to connect hvsock: {}" , e) ) ?;
173
+ let fut = async {
174
+ let mut stream = UnixStream :: connect ( addr) . await ?;
174
175
stream
175
176
. write_all ( format ! ( "CONNECT {}\n " , port) . as_bytes ( ) )
177
+ . await
176
178
. map_err ( |e| anyhow ! ( "hvsock connected but failed to write CONNECT: {}" , e) ) ?;
177
179
178
180
let mut response = String :: new ( ) ;
179
- BufReader :: new ( & stream)
181
+ BufReader :: new ( & mut stream)
180
182
. read_line ( & mut response)
183
+ . await
181
184
. map_err ( |e| anyhow ! ( "CONNECT sent but failed to get response: {}" , e) ) ?;
182
185
if response. starts_with ( "OK" ) {
183
- Ok ( stream. into_raw_fd ( ) )
186
+ Ok ( stream. into_std ( ) ? . into_raw_fd ( ) )
184
187
} else {
185
188
Err ( anyhow ! ( "CONNECT sent but response is not OK: {}" , response) . into ( ) )
186
189
}
187
- } )
188
- . await
189
- . map_err ( |e| anyhow ! ( "failed to spawn blocking task: {}" , e) ) ?
190
+ } ;
191
+
192
+ timeout ( Duration :: from_millis ( HVSOCK_RETRY_TIMEOUT_IN_MS ) , fut)
193
+ . await
194
+ . map_err ( |_| anyhow ! ( "hvsock retry {}ms timeout" , HVSOCK_RETRY_TIMEOUT_IN_MS ) ) ?
190
195
}
191
196
192
197
pub fn unix_sock ( r#abstract : bool , socket_path : & str ) -> Result < UnixAddr > {
@@ -302,3 +307,16 @@ pub(crate) async fn client_sync_clock(client: &SandboxServiceClient, id: &str) {
302
307
}
303
308
} ) ;
304
309
}
310
+
311
+ #[ cfg( test) ]
312
+ mod tests {
313
+ use crate :: client:: new_ttrpc_client_with_timeout;
314
+
315
+ #[ tokio:: test]
316
+ async fn test_new_ttrpc_client_timeout ( ) {
317
+ // Expect new_ttrpc_client would return timeout error, instead of blocking.
318
+ assert ! ( new_ttrpc_client_with_timeout( "hvsock://fake.sock:1024" , 1 )
319
+ . await
320
+ . is_err( ) ) ;
321
+ }
322
+ }
0 commit comments