# worker.py
from django.db import transaction
from django.core.management.base import BaseCommand, CommandError
from django.utils import timezone
from django.utils.module_loading import import_string
from django_dbq.models import Job
from time import sleep
import logging
import signal


logger = logging.getLogger(__name__)

DEFAULT_QUEUE_NAME = "default"


class Worker:
    def __init__(self, name, rate_limit_in_seconds, shift_limit_in_seconds):
        self.queue_name = name
        self.rate_limit_in_seconds = rate_limit_in_seconds
        self.shift_limit_in_seconds = shift_limit_in_seconds
        self.shift_start = timezone.now()
        self.alive = True
        self.last_job_finished = None
        self.current_job = None
        self.init_signals()

    def init_signals(self):
        signal.signal(signal.SIGINT, self.shutdown)
        # SIGQUIT is not available on Windows, so only register it where it exists
        if hasattr(signal, "SIGQUIT"):
            signal.signal(signal.SIGQUIT, self.shutdown)
        signal.signal(signal.SIGTERM, self.shutdown)

    def shutdown(self, signum, frame):
        self.alive = False
        if self.current_job:
            self.current_job.state = Job.STATES.STOPPING
            self.current_job.save(update_fields=["state"])

    def run(self):
        while self.alive and self._shift_availability():
            self.process_job()

    def process_job(self):
        sleep(1)
        if (
            self.last_job_finished
            and (timezone.now() - self.last_job_finished).total_seconds()
            < self.rate_limit_in_seconds
        ):
            return
        self._process_job()
        self.last_job_finished = timezone.now()

    def _process_job(self):
        # Claim the next ready job inside a transaction; the task itself runs
        # after the claim has been committed.
        with transaction.atomic():
            job = Job.objects.get_ready_or_none(self.queue_name)
            if not job:
                return

            logger.info(
                'Processing job: name="%s" queue="%s" id=%s state=%s next_task=%s',
                job.name,
                self.queue_name,
                job.pk,
                job.state,
                job.next_task,
            )
            job.state = Job.STATES.PROCESSING
            job.save()

        self.current_job = job

        try:
            task_function = import_string(job.next_task)
            task_function(job)
            job.update_next_task()
            if not job.next_task:
                job.state = Job.STATES.COMPLETE
            else:
                job.state = Job.STATES.READY
        except Exception as exception:
            logger.exception("Job id=%s failed", job.pk)
            job.state = Job.STATES.FAILED
            failure_hook_name = job.get_failure_hook_name()
            if failure_hook_name:
                logger.info(
                    "Running failure hook %s for job id=%s", failure_hook_name, job.pk
                )
                failure_hook_function = import_string(failure_hook_name)
                failure_hook_function(job, exception)
            else:
                logger.info("No failure hook for job id=%s", job.pk)

        logger.info(
            'Updating job: name="%s" id=%s state=%s next_task=%s',
            job.name,
            job.pk,
            job.state,
            job.next_task or "none",
        )

        try:
            job.save()
        except:
            logger.exception("Failed to save job: id=%s", job.pk)
            raise

        self.current_job = None

    def _shift_availability(self):
        """
        Setting shift_limit_in_seconds allows the worker to be run from a cron
        job for a fixed period of time: the worker keeps seeking further jobs
        while time remains in its shift. If the shift limit is exceeded while a
        job is running, that job still runs to completion, so the cron interval
        should be longer than the anticipated duration of the longest job (see
        the illustrative example after this class).

        If shift_limit_in_seconds is not supplied, the default of 0 is used and
        the worker continues to run until shutdown.
        """
        if self.shift_limit_in_seconds <= 0:
            return True
        return (
            timezone.now() - self.shift_start
        ).total_seconds() < self.shift_limit_in_seconds
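
# Illustrative sketch (not part of the original source): how the shift limit
# above is typically paired with cron. The schedule and paths below are
# assumptions; "worker" refers to the management command defined at the bottom
# of this file.
#
#   # crontab entry: start a worker every 5 minutes; each worker picks up new
#   # jobs for up to 300 seconds, finishes whatever job it is running, and exits
#   */5 * * * * /path/to/venv/bin/python /path/to/manage.py worker default --shift_limit 300
#
# Because a job that starts near the end of a shift still runs to completion,
# the cron interval should exceed the longest expected job duration.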


class Command(BaseCommand):

    help = "Run a queue worker process"

    def add_arguments(self, parser):
        parser.add_argument("queue_name", nargs="?", default="default", type=str)
        parser.add_argument(
            "--rate_limit",
            help="The rate limit in seconds. The default rate limit is 1 job per second.",
            nargs="?",
            default=1,
            type=int,
        )
        parser.add_argument(
            "--shift_limit",
            help="The time limit in seconds within which the worker can pick up new jobs. The default "
            "shift limit is 0 seconds, which disables this limit and allows the worker to run indefinitely.",
            nargs="?",
            default=0,
            type=int,
        )
        parser.add_argument(
            "--dry-run",
            action="store_true",
            dest="dry_run",
            default=False,
            help="Don't actually start the worker. Used for testing.",
        )

    def handle(self, *args, **options):
        if not args:
            args = (DEFAULT_QUEUE_NAME,)
        if len(args) != 1:
            raise CommandError("Please supply a single queue job name")

        queue_name = options["queue_name"]
        rate_limit_in_seconds = options["rate_limit"]
        shift_limit_in_seconds = options["shift_limit"]

        if shift_limit_in_seconds:
            self.stdout.write(
                'Starting job worker for queue "%s" with rate limit of one job per %s second(s) and a shift constraint of %s seconds.'
                % (queue_name, rate_limit_in_seconds, shift_limit_in_seconds)
            )
        else:
            self.stdout.write(
                'Starting job worker for queue "%s" with rate limit of one job per %s second(s).'
                % (queue_name, rate_limit_in_seconds)
            )

        worker = Worker(queue_name, rate_limit_in_seconds, shift_limit_in_seconds)

        if options["dry_run"]:
            return

        worker.run()
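
# Illustrative usage (not part of the original source). Assuming this file lives
# at <app>/management/commands/worker.py, the command name is "worker":
#
#   # run indefinitely against the "default" queue, at most one job per second
#   python manage.py worker
#
#   # run against a named queue, at most one job every 5 seconds
#   python manage.py worker emails --rate_limit 5
#
#   # cron-style shift: stop picking up new jobs after 300 seconds, then exit
#   python manage.py worker emails --shift_limit 300
#
# A task referenced by job.next_task is imported with import_string and called
# as task_function(job); a failure hook is called as failure_hook(job, exception).
# A minimal (hypothetical) pair might look like:
#
#   def process_order(job):
#       ...  # do the work; data is conventionally passed via the job's workspace
#
#   def process_order_failed(job, exception):
#       ...  # clean up or notify after a failure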