@@ -14,12 +14,13 @@ See the License for the specific language governing permissions and
14
14
limitations under the License.
15
15
*/
16
16
17
- use std:: { os:: unix:: io:: RawFd , process:: Stdio } ;
17
+ use std:: { os:: unix:: io:: RawFd , process:: Stdio , time :: Duration } ;
18
18
19
19
use anyhow:: anyhow;
20
20
use async_trait:: async_trait;
21
21
use containerd_sandbox:: error:: { Error , Result } ;
22
- use log:: { debug, error, info} ;
22
+ use log:: { debug, error, info, warn} ;
23
+ use nix:: { errno:: Errno :: ESRCH , sys:: signal, unistd:: Pid } ;
23
24
use serde:: { Deserialize , Serialize } ;
24
25
use time:: OffsetDateTime ;
25
26
use tokio:: {
@@ -41,10 +42,10 @@ use crate::{
41
42
} ,
42
43
} ,
43
44
device:: { BusType , DeviceInfo } ,
44
- impl_recoverable , load_config,
45
+ load_config,
45
46
param:: ToCmdLineParams ,
46
47
sandbox:: KuasarSandboxer ,
47
- utils:: { read_std, set_cmd_fd, set_cmd_netns, wait_pid, write_file_atomic} ,
48
+ utils:: { read_std, set_cmd_fd, set_cmd_netns, wait_channel , wait_pid, write_file_atomic} ,
48
49
vm:: { Pids , VcpuThreads , VM } ,
49
50
} ;
50
51
@@ -145,13 +146,25 @@ impl CloudHypervisorVM {
145
146
self . fds . push ( fd) ;
146
147
self . fds . len ( ) - 1 + 3
147
148
}
149
+
150
+ async fn wait_stop ( & mut self , t : Duration ) -> Result < ( ) > {
151
+ if let Some ( rx) = self . wait_channel ( ) . await {
152
+ let ( _, ts) = * rx. borrow ( ) ;
153
+ if ts == 0 {
154
+ wait_channel ( t, rx) . await ?;
155
+ }
156
+ }
157
+ Ok ( ( ) )
158
+ }
148
159
}
149
160
150
161
#[ async_trait]
151
162
impl VM for CloudHypervisorVM {
152
163
async fn start ( & mut self ) -> Result < u32 > {
153
164
create_dir_all ( & self . base_dir ) . await ?;
154
165
let virtiofsd_pid = self . start_virtiofsd ( ) . await ?;
166
+ // TODO: add child virtiofsd process
167
+ self . pids . affiliated_pids . push ( virtiofsd_pid) ;
155
168
let mut params = self . config . to_cmdline_params ( "--" ) ;
156
169
for d in self . devices . iter ( ) {
157
170
params. extend ( d. to_cmdline_params ( "--" ) ) ;
@@ -174,31 +187,57 @@ impl VM for CloudHypervisorVM {
174
187
. spawn ( )
175
188
. map_err ( |e| anyhow ! ( "failed to spawn cloud hypervisor command: {}" , e) ) ?;
176
189
let pid = child. id ( ) ;
190
+ self . pids . vmm_pid = pid;
177
191
let pid_file = format ! ( "{}/pid" , self . base_dir) ;
178
- let ( tx, rx) = tokio:: sync:: watch:: channel ( ( 0u32 , 0i128 ) ) ;
192
+ let ( tx, rx) = channel ( ( 0u32 , 0i128 ) ) ;
193
+ self . wait_chan = Some ( rx) ;
179
194
spawn_wait (
180
195
child,
181
196
format ! ( "cloud-hypervisor {}" , self . id) ,
182
197
Some ( pid_file) ,
183
198
Some ( tx) ,
184
199
) ;
185
- self . client = Some ( self . create_client ( ) . await ?) ;
186
- self . wait_chan = Some ( rx) ;
187
200
188
- // update vmm related pids
189
- self . pids . vmm_pid = pid;
190
- self . pids . affiliated_pids . push ( virtiofsd_pid) ;
191
- // TODO: add child virtiofsd process
201
+ match self . create_client ( ) . await {
202
+ Ok ( client) => self . client = Some ( client) ,
203
+ Err ( e) => {
204
+ if let Err ( re) = self . stop ( true ) . await {
205
+ warn ! ( "roll back in create clh api client: {}" , re) ;
206
+ return Err ( e) ;
207
+ }
208
+ return Err ( e) ;
209
+ }
210
+ } ;
192
211
Ok ( pid. unwrap_or_default ( ) )
193
212
}
194
213
195
214
async fn stop ( & mut self , force : bool ) -> Result < ( ) > {
196
- let pid = self . pid ( ) ?;
197
- if pid == 0 {
198
- return Ok ( ( ) ) ;
215
+ let signal = if force {
216
+ signal:: SIGKILL
217
+ } else {
218
+ signal:: SIGTERM
219
+ } ;
220
+
221
+ let pids = self . pids ( ) ;
222
+ if let Some ( vmm_pid) = pids. vmm_pid {
223
+ if vmm_pid > 0 {
224
+ // TODO: Consider pid reused
225
+ match signal:: kill ( Pid :: from_raw ( vmm_pid as i32 ) , signal) {
226
+ Err ( e) => {
227
+ if e != ESRCH {
228
+ return Err ( anyhow ! ( "kill vmm process {}: {}" , vmm_pid, e) . into ( ) ) ;
229
+ }
230
+ }
231
+ Ok ( _) => self . wait_stop ( Duration :: from_secs ( 10 ) ) . await ?,
232
+ }
233
+ }
234
+ }
235
+ for affiliated_pid in pids. affiliated_pids {
236
+ if affiliated_pid > 0 {
237
+ // affiliated process may exits automatically, so it's ok not handle error
238
+ signal:: kill ( Pid :: from_raw ( affiliated_pid as i32 ) , signal) . unwrap_or_default ( ) ;
239
+ }
199
240
}
200
- let signal = if force { 9 } else { 15 } ;
201
- unsafe { nix:: libc:: kill ( pid as i32 , signal) } ;
202
241
203
242
Ok ( ( ) )
204
243
}
@@ -288,7 +327,20 @@ impl VM for CloudHypervisorVM {
288
327
}
289
328
}
290
329
291
- impl_recoverable ! ( CloudHypervisorVM ) ;
330
+ #[ async_trait]
331
+ impl crate :: vm:: Recoverable for CloudHypervisorVM {
332
+ async fn recover ( & mut self ) -> Result < ( ) > {
333
+ self . client = Some ( self . create_client ( ) . await ?) ;
334
+ let pid = self . pid ( ) ?;
335
+ let ( tx, rx) = channel ( ( 0u32 , 0i128 ) ) ;
336
+ tokio:: spawn ( async move {
337
+ let wait_result = wait_pid ( pid as i32 ) . await ;
338
+ tx. send ( wait_result) . unwrap_or_default ( ) ;
339
+ } ) ;
340
+ self . wait_chan = Some ( rx) ;
341
+ Ok ( ( ) )
342
+ }
343
+ }
292
344
293
345
macro_rules! read_stdio {
294
346
( $stdio: expr, $cmd_name: ident) => {
0 commit comments