Skip to content

Commit 322f111

Browse files
authored
Merge pull request #43 from dabapps/run-after
Add support for scheduling jobs to run in the future
2 parents 0254ac1 + d161028 commit 322f111

File tree

4 files changed

+50
-1
lines changed

4 files changed

+50
-1
lines changed

README.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,17 @@ Job.objects.create(name='critical_job', priority=2)
129129

130130
Jobs will be ordered by their `priority` (highest to lowest) and then the time which they were created (oldest to newest) and processed in that order.
131131

132+
### Scheduling jobs
133+
If you'd like to create a job but have it run at some time in the future, you can use the `run_after` field on the Job model:
134+
135+
```python
136+
Job.objects.create(name='scheduled_job', run_after=timezone.now() + timedelta(minutes=10))
137+
```
138+
139+
Of course, the scheduled job will only be run if your `python manage.py worker` process is running at the time when the job is scheduled to run. Otherwise, it will run the next time you start your worker process after that time has passed.
140+
141+
It's also worth noting that, by default, scheduled jobs run as part of the same queue as all other jobs, and so if a job is already being processed at the time when your scheduled job is due to run, it won't run until that job has finished. If increased precision is important, you might consider using the `queue_name` feature to run a separate worker dedicated to only running scheduled jobs.
142+
132143
## Terminology
133144

134145
### Job
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# Generated by Django 3.2rc1 on 2021-11-04 03:32
2+
3+
from django.db import migrations, models
4+
5+
6+
class Migration(migrations.Migration):
7+
8+
dependencies = [
9+
("django_dbq", "0004_auto_20210818_0247"),
10+
]
11+
12+
operations = [
13+
migrations.AddField(
14+
model_name="job",
15+
name="run_after",
16+
field=models.DateTimeField(db_index=True, null=True),
17+
),
18+
]

django_dbq/models.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,12 @@ def delete_old(self):
6868

6969
def to_process(self, queue_name):
7070
return self.select_for_update().filter(
71-
queue_name=queue_name, state__in=(Job.STATES.READY, Job.STATES.NEW)
71+
models.Q(queue_name=queue_name)
72+
& models.Q(state__in=(Job.STATES.READY, Job.STATES.NEW))
73+
& models.Q(
74+
models.Q(run_after__isnull=True)
75+
| models.Q(run_after__lte=timezone.now())
76+
)
7277
)
7378

7479

@@ -91,6 +96,7 @@ class STATES(TextChoices):
9196
workspace = JSONField(null=True)
9297
queue_name = models.CharField(max_length=20, default="default", db_index=True)
9398
priority = models.SmallIntegerField(default=0, db_index=True)
99+
run_after = models.DateTimeField(null=True, db_index=True)
94100

95101
class Meta:
96102
ordering = ["-priority", "created"]

django_dbq/tests.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,20 @@ def test_gets_jobs_in_priority_and_date_order(self):
210210
self.assertEqual(Job.objects.get_ready_or_none("default"), job_1)
211211
self.assertFalse(Job.objects.to_process("default").filter(id=job_2.id).exists())
212212

213+
def test_ignores_jobs_until_run_after_is_in_the_past(self):
214+
job_1 = Job.objects.create(name="testjob")
215+
job_2 = Job.objects.create(name="testjob", run_after=datetime(2021, 11, 4, 8))
216+
217+
with freezegun.freeze_time(datetime(2021, 11, 4, 7)):
218+
self.assertEqual(
219+
{job for job in Job.objects.to_process("default")}, {job_1}
220+
)
221+
222+
with freezegun.freeze_time(datetime(2021, 11, 4, 9)):
223+
self.assertEqual(
224+
{job for job in Job.objects.to_process("default")}, {job_1, job_2}
225+
)
226+
213227
def test_get_next_ready_job_created(self):
214228
"""
215229
Created jobs should be picked too.

0 commit comments

Comments
 (0)