@@ -84,6 +84,7 @@ def _init(self):
84
84
self .reseted = False
85
85
self .localprocess = {}
86
86
self .cloudjob = {}
87
+ self .sge_jobid = {}
87
88
self .jobsgraph = dag .DAG ()
88
89
self .has_success = []
89
90
self .__add_depency_for_wait ()
@@ -97,7 +98,6 @@ def _init(self):
97
98
self .conf .max_check or 3 ).limit_denominator ()
98
99
self .sub_rate = Fraction (
99
100
self .conf .max_submit or 30 ).limit_denominator ()
100
- self .sge_jobid = {}
101
101
self .maxjob = int (self .maxjob or len (self .jobs ))
102
102
self .jobqueue = JobQueue (maxsize = min (max (self .maxjob , 1 ), 1000 ))
103
103
self .init_time_stamp = now ()
@@ -232,11 +232,15 @@ def add_callback(self, callback=""):
232
232
def log_status (self , job ):
233
233
name = job .jobname
234
234
if name in self .cloudjob :
235
- name = self .cloudjob [name ]
235
+ name += " (task-id: {})" .format (self .cloudjob [name ])
236
+ elif name in self .sge_jobid :
237
+ name += " (job-id: {})" .format (self .sge_jobid [name ])
238
+ elif name in self .localprocess :
239
+ name += " (pid: {})" .format (self .localprocess [name ].pid )
236
240
if job .is_fail :
237
241
level = "error"
238
242
elif job .status == "resubmit" :
239
- level = "warn "
243
+ level = "warning "
240
244
else :
241
245
level = "info"
242
246
if not job .is_wait :
@@ -470,7 +474,6 @@ def submit(self, job):
470
474
logcmd .write (style ("\n -------- retry --------\n " ,
471
475
fore = "red" , mode = "bold" ) + job .rawstring + "\n " )
472
476
job .set_status ("resubmit" )
473
- self .log_status (job )
474
477
logcmd .write ("[%s] " % datetime .today ().strftime ("%F %X" ))
475
478
logcmd .flush ()
476
479
if job .host is not None and job .host in ["localhost" , "local" ]:
@@ -490,7 +493,7 @@ def submit(self, job):
490
493
self .localprocess [job .name ] = p
491
494
elif job .host == "sge" :
492
495
job .raw2cmd (job .subtimes and abs (self .retry_ivs ) or 0 )
493
- call_cmd ([ " touch" , job .stat_file + ".submit" ] )
496
+ touch ( job .stat_file + ".submit" )
494
497
jobcpu = job .cpu or self .cpu
495
498
jobmem = job .mem or self .mem
496
499
job .update_queue (self .queue )
@@ -517,6 +520,7 @@ def submit(self, job):
517
520
task .name , task .id , job .subtimes + 1 )
518
521
logcmd .write (info )
519
522
self .cloudjob [task .name ] = task .id
523
+ self .log_status (job )
520
524
self .logger .debug ("%s job submit %s times" , job .name , job .subtimes )
521
525
job .subtimes += 1
522
526
self .submited = True
0 commit comments