-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathutils.py
265 lines (230 loc) · 10.9 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
import os
import sys
import pdb
import psutil
import logging
import argparse
import threading
from collections import Counter
from subprocess import call, PIPE
from ratelimiter import RateLimiter
from ._version import __version__
if sys.version_info[0] < 3:
from Queue import Queue
else:
from queue import Queue
# Shell suffix appended to each job command line: echoes a timestamped
# SUCCESS or ERROR marker depending on the command's exit status.
RUNSTAT = " && echo [`date +'%F %T'`] SUCCESS || echo [`date +'%F %T'`] ERROR"
class QsubError(Exception):
    """Raised when a qsub job submission fails."""
    pass
class myQueue(object):
    """A ``Queue`` wrapper that also tracks queued items in a set.

    The set (exposed via the ``queue`` property) allows O(1) snapshots of
    what is currently queued, while the underlying ``Queue`` supplies the
    blocking semantics and the ``maxsize`` bound.
    """

    def __init__(self, maxsize=0):
        # set view of the queued items; guarded by ``self.lock``
        self._content = set()
        self._queue = Queue(maxsize=maxsize)
        # kept for interface compatibility; the commented-out
        # acquire/release in the original show it is currently unused
        self.sm = threading.Semaphore(maxsize)
        self.lock = threading.Lock()

    @property
    def length(self):
        """Number of items currently in the underlying queue."""
        return self._queue.qsize()

    def put(self, v, **kwargs):
        """Enqueue *v* (kwargs forwarded to ``Queue.put``) and track it."""
        self._queue.put(v, **kwargs)
        # BUGFIX: the original tested membership *outside* the lock and only
        # then added under it — a check-then-act race with concurrent
        # put/get.  ``set.add`` is idempotent, so an unconditional add under
        # the lock is both simpler and race-free.
        with self.lock:
            self._content.add(v)

    def get(self, v=None):
        """Dequeue one item; return *v* if given, else an arbitrary tracked item."""
        self._queue.get()
        if v is None:
            with self.lock:
                return self._content.pop()
        # BUGFIX: membership was also checked outside the lock here;
        # ``discard`` is a no-op for absent items, so no pre-check is needed.
        with self.lock:
            self._content.discard(v)
        return v

    @property
    def queue(self):
        # return a copy so callers can iterate without holding the lock
        return self._content.copy()

    def isEmpty(self):
        return self._queue.empty()

    def isFull(self):
        return self._queue.full()
def Mylog(logfile=None, level="info", name=None):
    """Create and configure a logger.

    Parameters:
        logfile: path for a ``FileHandler`` (mode 'w'); when None, log to
                 sys.stdout instead of the logging default sys.stderr.
        level:   "info" or "debug" (case-insensitive); selects both the
                 logger level and the message format.
        name:    logger name passed to ``logging.getLogger``.

    Returns the configured ``logging.Logger``.
    """
    logger = logging.getLogger(name)
    if level.lower() == "debug":
        logger.setLevel(logging.DEBUG)
        fmt = logging.Formatter(
            '[%(levelname)s %(threadName)s %(asctime)s %(funcName)s(%(lineno)d)] %(message)s')
    else:
        # BUGFIX: the original only handled "info" and "debug", leaving the
        # formatter unbound (UnboundLocalError) for any other level string.
        # Treat everything that is not "debug" as INFO.
        logger.setLevel(logging.INFO)
        fmt = logging.Formatter(
            '[%(levelname)s %(asctime)s] %(message)s')
    if logfile is None:
        handler = logging.StreamHandler(sys.stdout)  # default: sys.stderr
    else:
        handler = logging.FileHandler(logfile, mode='w')
    handler.setFormatter(fmt)
    logger.addHandler(handler)
    return logger
def cleanAll(clear=False, qjobs=None, sumj=True):
    """Tag still-unfinished jobs on exit and optionally summarize the run.

    Parameters:
        clear: when True, mark unfinished jobs "killed" and qdel them by the
               "*_<pid>" job-name pattern; otherwise append "-but-exit" to
               their status.
        qjobs: the scheduler object (no-op when None); must expose
               ``jobqueue.queue`` and ``logger``.
        sumj:  when True, finish by calling ``sumJobs(qjobs)``.
    """
    if qjobs is None:
        return
    stillrunjob = qjobs.jobqueue.queue
    if clear:
        pid = os.getpid()
        gid = os.getpgid(pid)  # NOTE(review): computed but unused; kept for parity
        for jn in stillrunjob:
            if jn.status in ["error", "success"]:
                continue
            jn.status = "killed"
            qjobs.logger.info("job %s status killed", jn.name)
        # BUGFIX: the original passed the single malformed string
        # 'qdel, "*_%d"' as the whole argv; for shell=False execution the
        # command and its job-name glob must be separate list elements.
        call_cmd(["qdel", "*_%d" % pid])
    else:
        for jn in stillrunjob:
            if jn.status in ["error", "success"]:
                continue
            jn.status += "-but-exit"
            qjobs.logger.info("job %s status %s", jn.name, jn.status)
    if sumj:
        sumJobs(qjobs)
def sumJobs(qjobs):
    """Log a summary of the run and dump per-job states to a status file.

    Parameters:
        qjobs: scheduler object; must expose ``jobs`` (objects with a
               ``status`` attribute), ``has_success``, ``jfile``,
               ``logdir`` and ``writestates(path)``.

    Returns True when every actually-run job succeeded, else False.
    """
    run_jobs = qjobs.jobs
    has_success_jobs = qjobs.has_success
    error_jobs = [j for j in run_jobs if j.status == "error"]
    success_jobs = [j for j in run_jobs if j.status == 'success']
    logger = logging.getLogger()
    # BUGFIX: "tesks" typo in the original log messages corrected to "tasks"
    status = "All tasks(total(%d), actual(%d), actual_success(%d), actual_error(%d)) in file (%s) finished" % (len(
        run_jobs) + len(has_success_jobs), len(run_jobs), len(success_jobs), len(error_jobs), os.path.abspath(qjobs.jfile))
    SUCCESS = True
    if len(success_jobs) == len(run_jobs):
        status += " successfully."
    else:
        status += ", but there are Unsuccessful tasks."
        SUCCESS = False
    logger.info(status)
    qjobs.writestates(os.path.join(qjobs.logdir, "job.status.txt"))
    logger.info(str(dict(Counter([j.status for j in run_jobs]))))
    return SUCCESS
def style(string, mode='', fore='', back=''):
    """Wrap *string* in ANSI escape codes for terminal styling.

    Parameters:
        mode: text attribute ('normal', 'bold', 'underline', 'blink',
              'invert', 'hide'); unknown values are ignored.
        fore / back: color names (black/red/green/yellow/blue/purple/
              cyan/white); unknown values are ignored.

    Returns the styled string, or *string* unchanged when no valid style
    was requested.
    """
    STYLE = {
        'fore': {'black': 30, 'red': 31, 'green': 32, 'yellow': 33, 'blue': 34, 'purple': 35, 'cyan': 36, 'white': 37},
        'back': {'black': 40, 'red': 41, 'green': 42, 'yellow': 43, 'blue': 44, 'purple': 45, 'cyan': 46, 'white': 47},
        # BUGFIX: the original only had the misspelled 'mormal', so
        # mode="normal" was silently ignored; the legacy key is kept for
        # any caller that relied on the typo.
        'mode': {'normal': 0, 'mormal': 0, 'bold': 1, 'underline': 4, 'blink': 5, 'invert': 7, 'hide': 8},
        'default': {'end': 0},
    }
    mode = '%s' % STYLE["mode"].get(mode, "")
    fore = '%s' % STYLE['fore'].get(fore, "")
    back = '%s' % STYLE['back'].get(back, "")
    style = ';'.join([s for s in [mode, fore, back] if s])
    style = '\033[%sm' % style if style else ''
    end = '\033[%sm' % STYLE['default']['end'] if style else ''
    return '%s%s%s' % (style, string, end)
def get_job_state(state):
    """Return *state* colorized for terminal display via ``style``.

    Color mapping: running=cyan, finished=green, failed=red,
    stopped=yellow; anything else (including waiting) is white.
    """
    color_by_state = {
        'running': 'cyan',
        'finished': 'green',
        'waiting': 'white',
        'failed': 'red',
        'stopped': 'yellow',
    }
    key = state.lower() if state else state
    return style(state, fore=color_by_state.get(key, 'white'))
def terminate_process(pid):
    """Forcefully kill process *pid* and all of its descendants.

    Children are killed first (recursively) so they cannot be reparented
    and escape; SIGKILL is used rather than SIGTERM.  This is best-effort:
    processes that have already exited or are inaccessible are ignored.
    """
    try:
        parent = psutil.Process(pid)
        for child in parent.children(recursive=True):
            # child.terminate()  # SIGTERM
            child.kill()  # SIGKILL
        # parent.terminate()
        parent.kill()
    except psutil.Error:
        # BUGFIX: narrowed from a bare `except:` which also swallowed
        # KeyboardInterrupt/SystemExit; psutil.Error covers NoSuchProcess,
        # AccessDenied and friends.
        pass
def call_cmd(cmd, verbose=False):
    """Run *cmd*, discarding all of its output.

    Parameters:
        cmd: a list (run with shell=False) or a string (run with shell=True).
        verbose: when True, print the command before running it.

    Returns None; the child's exit status is intentionally ignored.
    """
    shell = not isinstance(cmd, list)
    if verbose:
        print(cmd)
    # BUGFIX: the verbose path originally used stdout=PIPE/stderr=PIPE
    # without ever reading the pipes; a chatty child can fill the OS pipe
    # buffer and deadlock.  Route both paths to /dev/null instead.
    with open(os.devnull, "w") as devnull:
        call(cmd, shell=shell, stdout=devnull, stderr=devnull)
def runsgeArgparser():
    """Build and return the command-line parser for the runjob tool.

    Defines the common options (work dir, job file slicing, logging,
    resubmission, mode selection) plus two option groups: SGE-specific
    arguments (queue/memory/cpu) and BatchCompute-specific arguments
    (OSS mapping, access keys, region).  Returns the configured
    ``argparse.ArgumentParser``; parsing is left to the caller.
    """
    parser = argparse.ArgumentParser(
        description="For multi-run your shell scripts localhost, qsub or BatchCompute.")
    parser.add_argument("-wd", "--workdir", type=str, help="work dir, default: %s" %
                        os.path.abspath(os.getcwd()), default=os.path.abspath(os.getcwd()), metavar="<workdir>")
    parser.add_argument("-N", "--jobname", type=str,
                        help="job name", metavar="<jobname>")
    parser.add_argument("-lg", "--logdir", type=str,
                        help='the output log dir, default: "%s/runjob_*_log_dir"' % os.getcwd(), metavar="<logdir>")
    parser.add_argument("-n", "--num", type=int,
                        help="the max job number runing at the same time. default: all in your job file, max 1000", metavar="<int>")
    # -s/-e select a 1-based, inclusive line range of the job file
    parser.add_argument("-s", "--startline", type=int,
                        help="which line number(1-base) be used for the first job tesk. default: 1", metavar="<int>", default=1)
    parser.add_argument("-e", "--endline", type=int,
                        help="which line number (include) be used for the last job tesk. default: all in your job file", metavar="<int>")
    parser.add_argument("-g", "--groups", type=int, default=1,
                        help="groups number of lines to a new jobs", metavar="<int>")
    parser.add_argument('-d', '--debug', action='store_true',
                        help='log debug info', default=False)
    parser.add_argument("-l", "--log", type=str,
                        help='append log info to file, sys.stdout by default', metavar="<file>")
    parser.add_argument('-r', '--resub', help="rebsub you job when error, 0 or minus means do not re-submit, 0 by default",
                        type=int, default=0, metavar="<int>")
    # --init/--call-back run on the local host before/after the task batch
    parser.add_argument('--init', help="initial command before all task if set, will be running in localhost",
                        type=str, metavar="<cmd>")
    parser.add_argument('--call-back', help="callback command if set, will be running in localhost",
                        type=str, metavar="<cmd>")
    parser.add_argument('--mode', type=str, default="sge", choices=[
                        "sge", "local", "localhost", "batchcompute"], help="the mode to submit your jobs, 'sge' by default")
    parser.add_argument('-ivs', '--resubivs', help="rebsub interval seconds, 2 by default",
                        type=int, default=2, metavar="<int>")
    parser.add_argument('-ini', '--ini',
                        help="input configfile for configurations search.", metavar="<configfile>")
    parser.add_argument("-config", '--config', action='store_true',
                        help="show configurations and exit.", default=False)
    parser.add_argument("-f", "--force", default=False, action="store_true",
                        help="force to submit jobs ingore already successed jobs, skip by default")
    parser.add_argument("--local", default=False, action="store_true",
                        help="submit your jobs in localhost, same as '--mode local'")
    parser.add_argument("--strict", action="store_true", default=False,
                        help="use strict to run. Means if any errors occur, clean all jobs and exit programe. off by default")
    parser.add_argument('-v', '--version',
                        action='version', version="v" + __version__)
    # options only meaningful when --mode sge
    sge = parser.add_argument_group("sge arguments")
    sge.add_argument("-q", "--queue", type=str, help="the queue your job running, multi queue can be sepreated by whitespace, all access queue by default",
                     nargs="*", metavar="<queue>")
    sge.add_argument("-m", "--memory", type=int,
                     help="the memory used per command (GB), default: 1", default=1, metavar="<int>")
    sge.add_argument("-c", "--cpu", type=int,
                     help="the cpu numbers you job used, default: 1", default=1, metavar="<int>")
    # options only meaningful when --mode batchcompute
    batchcmp = parser.add_argument_group("batchcompute arguments")
    batchcmp.add_argument("-om", "--out-maping", type=str,
                          help='the oss output directory if your mode is "batchcompute", all output file will be mapping to you OSS://BUCKET-NAME. if not set, any output will be reserved', metavar="<dir>")
    batchcmp.add_argument('--access-key-id', type=str,
                          help="AccessKeyID while access oss", metavar="<str>")
    batchcmp.add_argument('--access-key-secret', type=str,
                          help="AccessKeySecret while access oss", metavar="<str>")
    batchcmp.add_argument('--regin', type=str, default="BEIJING", choices=['BEIJING', 'HANGZHOU', 'HUHEHAOTE', 'SHANGHAI',
                                                                           'ZHANGJIAKOU', 'CHENGDU', 'HONGKONG', 'QINGDAO', 'SHENZHEN'], help="batch compute regin, BEIJING by default")
    parser.add_argument("-j", "--jobfile", type=str,
                        help="the input jobfile", metavar="<jobfile>")
    return parser
def shellJobArgparser(arglist):
    """Parse scheduler-related options embedded in a shell job line.

    Unknown arguments in *arglist* are silently ignored
    (``parse_known_args``); returns the parsed ``Namespace``.
    """
    job_parser = argparse.ArgumentParser(add_help=False)
    # table-driven registration: (option flags, add_argument kwargs)
    option_table = (
        (("-q", "--queue"), {"type": str, "nargs": "*"}),
        (("-m", "--memory"), {"type": int}),
        (("-c", "--cpu"), {"type": int}),
        (("-g", "--groups"), {"type": int}),
        (("-n", "--jobname"), {"type": str}),
        (("-om", "--out-maping"), {"type": str}),
        (("-wd", "--workdir"), {"type": str}),
        (("--mode",), {"type": str}),
        (("--local",), {"default": False, "action": "store_true"}),
    )
    for flags, kwargs in option_table:
        job_parser.add_argument(*flags, **kwargs)
    known, _unknown = job_parser.parse_known_args(arglist)
    return known