Skip to content

Commit 2ab71c7

Browse files
committed
update jobcheck rate limiter
1 parent cd9ebde commit 2ab71c7

File tree

2 files changed

+38
-36
lines changed

2 files changed

+38
-36
lines changed

src/sge_run.py

+36-35
Original file line number | Diff line number | Diff line change
@@ -78,7 +78,7 @@ def __init__(self, config=None):
7878
self.conf.jobqueue = self.jobqueue
7979
self.conf.logger = self.logger
8080
self.conf.cloudjob = self.cloudjob
81-
self.ncall, self.period = config.rate or 3, 1
81+
self.rate = Fraction(config.rate or 3).limit_denominator()
8282
self.sge_jobid = {}
8383

8484
def depency_jobs(self):
@@ -288,45 +288,46 @@ def jobstatus(self, job):
288288
datetime.today().strftime("%F %X"), job.status.upper()))
289289
return status
290290

291-
def set_rate(self, ncall=3, period=1):
292-
if ncall and period:
293-
self.ncall = ncall
294-
self.period = period
291+
def set_rate(self, rate=3):
292+
if rate:
293+
self.rate = Fraction(rate).limit_denominator()
295294

296295
def jobcheck(self):
297296
if self.mode == "batchcompute":
298-
self.set_rate(1, 1)
299-
rate_limiter = RateLimiter(max_calls=self.ncall, period=self.period)
297+
self.set_rate(1)
298+
rate_limiter = RateLimiter(
299+
max_calls=self.rate.numerator, period=self.rate.denominator)
300300
while True:
301-
with rate_limiter:
302-
for jb in self.jobqueue.queue:
303-
with rate_limiter:
304-
try:
305-
js = self.jobstatus(jb)
306-
except:
307-
continue
308-
if js == "success":
309-
self.deletejob(jb)
301+
for jb in self.jobqueue.queue:
302+
with rate_limiter:
303+
try:
304+
js = self.jobstatus(jb)
305+
except:
306+
self.logger.error(
307+
"check job status error: %s", jb.name)
308+
continue
309+
if js == "success":
310+
self.deletejob(jb)
311+
self.jobqueue.get(jb)
312+
self.jobsgraph.delete_node_if_exists(jb.jobname)
313+
elif js == "error":
314+
self.deletejob(jb)
315+
if jb.subtimes >= self.times + 1:
316+
if self.strict:
317+
self.throw("Error jobs return (submit %d times), %s" % (
318+
jb.subtimes, jb.logfile))
310319
self.jobqueue.get(jb)
311-
self.jobsgraph.delete_node_if_exists(jb.jobname)
312-
elif js == "error":
313-
self.deletejob(jb)
314-
if jb.subtimes >= self.times + 1:
315-
if self.strict:
316-
self.throw("Error jobs return (submit %d times), %s" % (
317-
jb.subtimes, jb.logfile))
318-
self.jobqueue.get(jb)
319-
self.jobsgraph.delete_node_if_exists(
320-
jb.jobname)
321-
else:
322-
self.jobqueue.get(jb)
323-
self.submit(jb)
324-
elif js == "exit":
325-
self.deletejob(jb)
320+
self.jobsgraph.delete_node_if_exists(
321+
jb.jobname)
322+
else:
326323
self.jobqueue.get(jb)
327-
self.jobsgraph.delete_node_if_exists(jb.jobname)
328-
if self.strict:
329-
self.throw("Error job: %s, exit" % jb.jobname)
324+
self.submit(jb)
325+
elif js == "exit":
326+
self.deletejob(jb)
327+
self.jobqueue.get(jb)
328+
self.jobsgraph.delete_node_if_exists(jb.jobname)
329+
if self.strict:
330+
self.throw("Error job: %s, exit" % jb.jobname)
330331

331332
def qdel(self, name="", jobname=""):
332333
if name:
@@ -471,7 +472,7 @@ def run(self, sec=2, times=0, resubivs=2):
471472
break
472473
for j in sorted(subjobs):
473474
jb = self.totaljobdict[j]
474-
if jb in self.jobqueue.queue:
475+
if jb in self.jobqueue._queue:
475476
continue
476477
self.submit(jb)
477478
time.sleep(sec)

src/utils.py

+2-1
Original file line number | Diff line number | Diff line change
@@ -13,6 +13,7 @@
1313

1414
from threading import Thread
1515
from datetime import datetime
16+
from fractions import Fraction
1617
from collections import Counter
1718
from functools import total_ordering
1819
from subprocess import check_output, call, Popen, PIPE
@@ -216,7 +217,7 @@ def common_parser():
216217
common.add_argument('-ivs', '--resubivs', help="rebsub interval seconds, default: 2",
217218
type=int, default=2, metavar="<int>")
218219
common.add_argument('--rate', help="rate limite for job status checking per second, default: 3",
219-
type=int, default=3, metavar="<int>")
220+
type=float, default=3, metavar="<float>")
220221
common.add_argument("-f", "--force", default=False, action="store_true",
221222
help="force to submit jobs even if already successed")
222223
common.add_argument("--local", default=False, action="store_true",

0 commit comments

Comments
 (0)