5
5
6
6
import croniter
7
7
from django .apps import apps
8
+ from django .conf import settings as django_settings
8
9
from django .contrib import admin
9
10
from django .contrib .contenttypes .fields import GenericRelation
10
11
from django .core .exceptions import ValidationError
12
+ from django .core .mail import mail_admins
11
13
from django .db import models
12
14
from django .templatetags .tz import utc
13
15
from django .urls import reverse
24
26
SCHEDULER_INTERVAL = settings .SCHEDULER_CONFIG ['SCHEDULER_INTERVAL' ]
25
27
26
28
27
- def callback_save_job (job , connection , result , * args , ** kwargs ):
29
+ def failure_callback (job , connection , result , * args , ** kwargs ):
30
+ model_name = job .meta .get ('task_type' , None )
31
+ scheduled_task_id = job .meta .get ('scheduled_task_id' , None )
32
+ if model_name is None or scheduled_task_id :
33
+ return
34
+ model = apps .get_model (app_label = 'scheduler' , model_name = model_name )
35
+ task = model .objects .filter (id = scheduled_task_id ).first ()
36
+ mail_admins (f'Task { task .id } /{ task .name } has failed' ,
37
+ 'See django-admin for logs' , )
38
+ pass
39
+
40
+
41
+ def success_callback (job , connection , result , * args , ** kwargs ):
28
42
model_name = job .meta .get ('task_type' , None )
29
43
if model_name is None :
30
44
return
31
45
model = apps .get_model (app_label = 'scheduler' , model_name = model_name )
32
46
task = model .objects .filter (job_id = job .id ).first ()
33
- if task is not None :
34
- task .force_schedule ()
47
+ if task is None :
48
+ return
49
+ task .schedule ()
35
50
36
51
37
52
class BaseTask (models .Model ):
@@ -107,12 +122,12 @@ def function_string(self) -> str:
107
122
return self .callable + f"({ ', ' .join (args_list + kwargs_list )} )"
108
123
109
124
def parse_args (self ):
110
- """Parse args for running job"""
125
+ """Parse args for running the job"""
111
126
args = self .callable_args .all ()
112
127
return [arg .value () for arg in args ]
113
128
114
129
def parse_kwargs (self ):
115
- """Parse kwargs for running job"""
130
+ """Parse kwargs for running the job"""
116
131
kwargs = self .callable_kwargs .all ()
117
132
return dict ([kwarg .value () for kwarg in kwargs ])
118
133
@@ -122,12 +137,12 @@ def _next_job_id(self):
122
137
return f'{ self .queue } :{ name } :{ addition } '
123
138
124
139
def _enqueue_args (self ) -> Dict :
125
- """args for DjangoQueue.enqueue.
140
+ """Args for DjangoQueue.enqueue.
126
141
Set all arguments for DjangoQueue.enqueue/enqueue_at.
127
142
Particularly:
128
143
- set job timeout and ttl
129
- - ensure a callback to reschedule job next iteration.
130
- - set job-id to proper format
144
+ - ensure a callback to reschedule the job next iteration.
145
+ - Set job-id to proper format
131
146
- set job meta
132
147
"""
133
148
res = dict (
@@ -136,8 +151,8 @@ def _enqueue_args(self) -> Dict:
136
151
task_type = self .TASK_TYPE ,
137
152
scheduled_task_id = self .id ,
138
153
),
139
- on_success = callback_save_job ,
140
- on_failure = callback_save_job ,
154
+ on_success = success_callback ,
155
+ on_failure = failure_callback ,
141
156
job_id = self ._next_job_id (),
142
157
)
143
158
if self .at_front :
@@ -150,37 +165,31 @@ def _enqueue_args(self) -> Dict:
150
165
151
166
@property
152
167
def rqueue (self ) -> DjangoQueue :
153
- """Returns django-queue for job
154
- """
168
+ """Returns redis-queue for job"""
155
169
return get_queue (self .queue )
156
170
157
171
def ready_for_schedule (self ) -> bool :
158
- """Is task ready to be scheduled?
172
+ """Is the task ready to be scheduled?
159
173
160
- If task is already scheduled or disabled, then it is not
174
+ If the task is already scheduled or disabled, then it is not
161
175
ready to be scheduled.
162
176
163
- :returns: True if task is ready to be scheduled.
177
+ :returns: True if the task is ready to be scheduled.
164
178
"""
165
179
if self .is_scheduled ():
166
- logger .debug (f'Job { self .name } already scheduled' )
180
+ logger .debug (f'Task { self .name } already scheduled' )
167
181
return False
168
182
if not self .enabled :
169
- logger .debug (f'Job { str (self )} disabled, enable job before scheduling' )
183
+ logger .debug (f'Task { str (self )} disabled, enable task before scheduling' )
170
184
return False
171
185
return True
172
186
173
187
def schedule (self ) -> bool :
174
- """Schedule job to run.
175
- :returns: True if job was scheduled, False otherwise.
188
+ """Schedule the next execution for the task to run.
189
+ :returns: True if a job was scheduled, False otherwise.
176
190
"""
177
191
if not self .ready_for_schedule ():
178
192
return False
179
- self .force_schedule ()
180
- return True
181
-
182
- def force_schedule (self ):
183
- """Schedule task regardless of its current status"""
184
193
schedule_time = self ._schedule_time ()
185
194
kwargs = self ._enqueue_args ()
186
195
job = self .rqueue .enqueue_at (
@@ -190,6 +199,7 @@ def force_schedule(self):
190
199
** kwargs , )
191
200
self .job_id = job .id
192
201
super (BaseTask , self ).save ()
202
+ return True
193
203
194
204
def enqueue_to_run (self ) -> bool :
195
205
"""Enqueue job to run now."""
@@ -218,7 +228,7 @@ def unschedule(self) -> bool:
218
228
return True
219
229
220
230
def _schedule_time (self ):
221
- return utc (self .scheduled_time )
231
+ return utc (self .scheduled_time ) if django_settings . USE_TZ else self . scheduled_time
222
232
223
233
def to_dict (self ) -> Dict :
224
234
"""Export model to dictionary, so it can be saved as external file backup"""
@@ -258,10 +268,10 @@ def save(self, **kwargs):
258
268
update_fields = kwargs .get ('update_fields' , None )
259
269
if update_fields :
260
270
kwargs ['update_fields' ] = set (update_fields ).union ({'modified' })
261
- super (BaseTask , self ).save (** kwargs )
262
271
if schedule_job :
263
- self .schedule ()
264
- super (BaseTask , self ).save ()
272
+ self .schedule () # schedule() already calls save()
273
+ else :
274
+ super (BaseTask , self ).save (** kwargs )
265
275
266
276
def delete (self , ** kwargs ):
267
277
self .unschedule ()
@@ -371,16 +381,20 @@ def _enqueue_args(self):
371
381
res ['meta' ]['interval' ] = self .interval_seconds ()
372
382
return res
373
383
384
+ def _schedule_time (self ):
385
+ _now = timezone .now ()
386
+ if self .scheduled_time >= _now :
387
+ return super ()._schedule_time ()
388
+ gap = math .ceil ((_now .timestamp () - self .scheduled_time .timestamp ()) / self .interval_seconds ())
389
+ if self .repeat is None or self .repeat >= gap :
390
+ self .scheduled_time += timedelta (seconds = self .interval_seconds () * gap )
391
+ self .repeat = (self .repeat - gap ) if self .repeat is not None else None
392
+ return super ()._schedule_time ()
393
+
374
394
def ready_for_schedule (self ):
375
395
if super (RepeatableTask , self ).ready_for_schedule () is False :
376
396
return False
377
- if self .scheduled_time < timezone .now ():
378
- gap = math .ceil ((timezone .now ().timestamp () - self .scheduled_time .timestamp ()) / self .interval_seconds ())
379
- if self .repeat is None or self .repeat >= gap :
380
- self .scheduled_time += timedelta (seconds = self .interval_seconds () * gap )
381
- self .repeat = (self .repeat - gap ) if self .repeat is not None else None
382
-
383
- if self .scheduled_time < timezone .now ():
397
+ if self ._schedule_time () < timezone .now ():
384
398
return False
385
399
return True
386
400
@@ -411,7 +425,8 @@ def clean_cron_string(self):
411
425
raise ValidationError ({'cron_string' : ValidationError (_ (str (e )), code = 'invalid' )})
412
426
413
427
def _schedule_time (self ):
414
- return tools .get_next_cron_time (self .cron_string )
428
+ self .scheduled_time = tools .get_next_cron_time (self .cron_string )
429
+ return super ()._schedule_time ()
415
430
416
431
class Meta :
417
432
verbose_name = _ ('Cron Task' )
0 commit comments