diff --git a/oorq/bin/requeue_failed.py b/oorq/bin/requeue_failed.py
index c288706..fe58c8d 100755
--- a/oorq/bin/requeue_failed.py
+++ b/oorq/bin/requeue_failed.py
@@ -1,6 +1,9 @@
 #!/usr/bin/env python
+# Example:
+# python requeue_failed.py redis://localhost/0 --max-attempts 20 --interval 3500
+
 from __future__ import print_function
-import sys
+import argparse
 import times
 from redis import from_url
 from rq import use_connection, requeue_job, Queue
@@ -11,36 +14,90 @@
 MAX_ATTEMPTS = 5
 PERMANENT_FAILED = 'permanent'
 
-redis_conn = from_url(sys.argv[1])
-use_connection(redis_conn)
-
-all_queues = Queue().all()
-pfq = FailedJobRegistry(PERMANENT_FAILED)
-pq = Queue(name=PERMANENT_FAILED)
-
-for queue in all_queues:
-    fq = FailedJobRegistry(queue.name)
-    for job_id in fq.get_job_ids():
-        job = Job.fetch(job_id)
-        if not job.meta.get('requeue', True):
-            continue
-        job.meta.setdefault('attempts', 0)
-        if job.meta['attempts'] > MAX_ATTEMPTS:
-            print("Job %s %s attempts. MAX ATTEMPTS %s limit exceeded on %s" % (
-                job.id, job.meta['attempts'], MAX_ATTEMPTS, job.origin
-            ))
-            print(job.description)
-            print(job.exc_info)
-            print()
-            fq.remove(job)
-            pq.enqueue_job(job)
-            print("Moved to %s FailedJobRegistry" % PERMANENT_FAILED)
-        else:
-            ago = (times.now() - job.enqueued_at).seconds
-            if ago >= INTERVAL:
-                print("%s: attemps: %s enqueued: %ss ago on %s (Requeue)" % (
-                    job.id, job.meta['attempts'], ago, job.origin
-                ))
-                job.meta['attempts'] += 1
-                job.save()
-                requeue_job(job.id, connection=redis_conn)
+
+def main(redis_conn, interval, max_attempts, permanent_failed):
+    use_connection(redis_conn)
+
+    all_queues = Queue().all()
+    pfq = FailedJobRegistry(permanent_failed)
+    pq = Queue(name=permanent_failed)
+    print("Trying to requeue jobs")
+    for queue in all_queues:
+        if queue.name == 'jobspool-autoworker':
+            continue
+        fq = FailedJobRegistry(queue.name)
+        for job_id in fq.get_job_ids():
+            try:
+                job = Job.fetch(job_id)
+            except Exception:
+                print("Job {} does not exist anymore. Removing it from the FailedJobRegistry".format(job_id))
+                try:
+                    key_registry = fq.key
+                    redis_conn.zrem(key_registry, job_id)
+                except Exception as e:
+                    print("Could not remove job from the FailedJobRegistry")
+                    print(job_id)
+                    print(e)
+                continue  # nothing left to requeue for this id
+            if not job.meta.get('requeue', True):
+                continue
+            job.meta.setdefault('attempts', 0)
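+            # Jobs past max_attempts are parked on the permanent failed queue;
+            # anything else is requeued once interval seconds have passed since it was enqueued.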
+            if job.meta['attempts'] > max_attempts:
+                print("Job %s has %s attempts. MAX ATTEMPTS limit of %s exceeded on %s" % (
+                    job.id, job.meta['attempts'], max_attempts, job.origin
+                ))
+                print(job.description)
+                print(job.exc_info)
+                print()
+                fq.remove(job)
+                pq.enqueue_job(job)
+                print("Moved to the '%s' queue" % permanent_failed)
+            else:
+                ago = (times.now() - job.enqueued_at).seconds
+                if ago >= interval:
+                    print("%s: attempts: %s enqueued: %ss ago on %s (Requeue)" % (
+                        job.id, job.meta['attempts'], ago, job.origin
+                    ))
+                    job.meta['attempts'] += 1
+                    job.save()
+                    requeue_job(job.id, connection=redis_conn)
+
+if __name__ == '__main__':
+
+    parser = argparse.ArgumentParser(
+        description="Requeue failed jobs"
+    )
+
+    parser.add_argument(
+        'redis_conn',
+        type=str,
+        help="Redis connection URL (e.g. redis://localhost/0)",
+    )
+    parser.add_argument(
+        '--interval',
+        dest='interval',
+        default=INTERVAL,
+        type=int,
+        help="Seconds a failed job must wait before it is requeued",
+    )
+    parser.add_argument(
+        '--max-attempts',
+        dest='max_attempts',
+        default=MAX_ATTEMPTS,
+        type=int,
+        help="Max attempts before a job is moved to the permanent failed queue",
+    )
+    parser.add_argument(
+        '--permanent',
+        dest='permanent',
+        default=PERMANENT_FAILED,
+        type=str,
+        help="Name of the permanent failed queue",
+    )
+
+    args = parser.parse_args()
+    main(from_url(args.redis_conn), args.interval, args.max_attempts, args.permanent)
+
+# vim: et ts=4 sw=4
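Note: the retry loop only touches failed jobs whose meta allows it (job.meta.get('requeue', True)), so a producer can opt a job out before it ever fails. A minimal sketch of that, assuming a hypothetical queue name 'poweremail' and task path 'tasks.send_mail' that are not part of this patch:

    from redis import from_url
    from rq import Queue

    redis_conn = from_url('redis://localhost:6379/0')
    queue = Queue('poweremail', connection=redis_conn)

    # requeue_failed.py skips failed jobs whose meta has requeue set to False:
    # they are neither retried nor moved to the permanent failed queue.
    job = queue.enqueue('tasks.send_mail', 42)
    job.meta['requeue'] = False
    job.save()

Since only jobs enqueued at least --interval seconds ago are requeued, the script lends itself to periodic execution, for example from cron; the exact schedule is deployment-specific.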