Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Paramatrize requeue_failed.py script to change inteval and max_attempts #96

Open
wants to merge 1 commit into
base: api_v5
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 88 additions & 34 deletions oorq/bin/requeue_failed.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
#!/usr/bin/env python
#Example
# python requeue_failed.py redis://localhost/0 --max-attempts 20 --interval 3500

from __future__ import print_function
import sys
import argparse
import times
from redis import from_url
from rq import use_connection, requeue_job, Queue
Expand All @@ -11,36 +14,87 @@
MAX_ATTEMPTS = 5
PERMANENT_FAILED = 'permanent'

redis_conn = from_url(sys.argv[1])
use_connection(redis_conn)

all_queues = Queue().all()
pfq = FailedJobRegistry(PERMANENT_FAILED)
pq = Queue(name=PERMANENT_FAILED)

for queue in all_queues:
fq = FailedJobRegistry(queue.name)
for job_id in fq.get_job_ids():
job = Job.fetch(job_id)
if not job.meta.get('requeue', True):
continue
job.meta.setdefault('attempts', 0)
if job.meta['attempts'] > MAX_ATTEMPTS:
print("Job %s %s attempts. MAX ATTEMPTS %s limit exceeded on %s" % (
job.id, job.meta['attempts'], MAX_ATTEMPTS, job.origin
))
print(job.description)
print(job.exc_info)
print()
fq.remove(job)
pq.enqueue_job(job)
print("Moved to %s FailedJobRegistry" % PERMANENT_FAILED)
else:
ago = (times.now() - job.enqueued_at).seconds
if ago >= INTERVAL:
print("%s: attemps: %s enqueued: %ss ago on %s (Requeue)" % (
job.id, job.meta['attempts'], ago, job.origin
))
job.meta['attempts'] += 1
job.save()
requeue_job(job.id, connection=redis_conn)

def main(redis_conn, interval, max_attempts, permanent_failed):
use_connection(redis_conn)

all_queues = Queue().all()
pfq = FailedJobRegistry(permanent_failed)
pq = Queue(name=permanent_failed)
print("Try to requeu jobs")
for queue in all_queues:
if queue.name == 'jobspool-autoworker':
continue
fq = FailedJobRegistry(queue.name)
for job_id in fq.get_job_ids():
try:
job = Job.fetch(job_id)
except:
print("Job {} not exist anymore. We will delete from FailedJobRegistry".format(job_id))
try:
key_registry = fq.key
redis_conn.zrem(key_registry,job_id)
except Exception as e:
print("We cannot delete job in FailedJobRegistry")
print(job_id)
print(e)
if not job.meta.get('requeue', True):
continue
job.meta.setdefault('attempts', 0)
if job.meta['attempts'] > max_attempts:
print("Job %s %s attempts. MAX ATTEMPTS %s limit exceeded on %s" % (
job.id, job.meta['attempts'], max_attempts, job.origin
))
print(job.description)
print(job.exc_info)
print()
fq.remove(job)
pq.enqueue_job(job)
print("Moved to %s FailedJobRegistry" % permanent_failed)
else:
ago = (times.now() - job.enqueued_at).seconds
if ago >= interval:
print("%s: attemps: %s enqueued: %ss ago on %s (Requeue)" % (
job.id, job.meta['attempts'], ago, job.origin
))
job.meta['attempts'] += 1
job.save()
requeue_job(job.id, connection=redis_conn)

if __name__ == '__main__':

parser = argparse.ArgumentParser(
description="Requeue failed jobs"
)

parser.add_argument(
'redis_conn',
type=str,
help="Connection address to Redis",
)
parser.add_argument(
'--interval',
dest='interval',
default=INTERVAL,
type=int,
help="Interval before requeu (in seconds)",
)
parser.add_argument(
'--max-attempts',
dest='max_attempts',
default=MAX_ATTEMPTS,
type=int,
help="Max attemps before move to permanent failed",
)
parser.add_argument(
'--permanent',
dest='permanent',
default=PERMANENT_FAILED,
type=str,
help="Name of permanent failed queue",
)

args = parser.parse_args()
main(from_url(args.redis_conn), args.interval, args.max_attempts, args.permanent)

# vim: et ts=4 sw=4