Skip to content

Commit 2fc1b10

Browse files
authored
Merge pull request #68 from George9Waller/feature-exclude-future-jobs-from-queue-depth
Feature to exclude future jobs from queue depth
2 parents 86bfc20 + 57ba3cd commit 2fc1b10

File tree

4 files changed

+74
-8
lines changed

4 files changed

+74
-8
lines changed

README.md

+7
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,11 @@ queue_depths = Job.get_queue_depths()
288288
print(queue_depths) # {"default": 1, "other_queue": 1}
289289
```
290290

291+
You can also exclude jobs which exist but are scheduled to be run in the future from the queue depths, where `run_after` is set to a future time from now. To do this set the `exclude_future_jobs` kwarg like so:
292+
```python
293+
queue_depths = Job.get_queue_depths(exclude_future_jobs=True)
294+
```
295+
291296
**Important:** When checking queue depths, do not assume that the key for your queue will always be available. Queue depths of zero won't be included
292297
in the dict returned by this method.
293298

@@ -312,6 +317,8 @@ manage.py worker [queue_name] [--rate_limit]
312317
If you'd like to check your queue depth from the command line, you can run `manage.py queue_depth [queue_name [queue_name ...]]` and any
313318
jobs in the "NEW" or "READY" states will be returned.
314319

320+
If you wish to exclude jobs which are scheduled to be run in the future you can add `--exclude_future_jobs` to the command.
321+
315322
**Important:** If you misspell or provide a queue name which does not have any jobs, a depth of 0 will always be returned.
316323

317324
### Gotcha: `bulk_create`

django_dbq/management/commands/queue_depth.py

+4-1
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,13 @@ class Command(BaseCommand):
88

99
def add_arguments(self, parser):
1010
parser.add_argument("queue_name", nargs="*", default=["default"], type=str)
11+
parser.add_argument("--exclude_future_jobs", default=False, type=bool)
1112

1213
def handle(self, *args, **options):
1314
queue_names = options["queue_name"]
14-
queue_depths = Job.get_queue_depths()
15+
queue_depths = Job.get_queue_depths(
16+
exclude_future_jobs=options["exclude_future_jobs"]
17+
)
1518

1619
queue_depths_string = " ".join(
1720
[

django_dbq/models.py

+11-4
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
get_failure_hook_name,
99
get_creation_hook_name,
1010
)
11-
from django.db.models import JSONField, UUIDField, Count, TextChoices
11+
from django.db.models import JSONField, UUIDField, Count, TextChoices, Q
1212
import datetime
1313
import logging
1414
import uuid
@@ -173,10 +173,17 @@ def run_creation_hook(self):
173173
creation_hook_function(self)
174174

175175
@staticmethod
176-
def get_queue_depths():
176+
def get_queue_depths(*, exclude_future_jobs=False):
177+
jobs_waiting_in_queue = Job.objects.filter(
178+
state__in=(Job.STATES.READY, Job.STATES.NEW)
179+
)
180+
if exclude_future_jobs:
181+
jobs_waiting_in_queue = jobs_waiting_in_queue.filter(
182+
Q(run_after__isnull=True) | Q(run_after__lte=timezone.now())
183+
)
184+
177185
annotation_dicts = (
178-
Job.objects.filter(state__in=(Job.STATES.READY, Job.STATES.NEW))
179-
.values("queue_name")
186+
jobs_waiting_in_queue.values("queue_name")
180187
.order_by("queue_name")
181188
.annotate(Count("queue_name"))
182189
)

django_dbq/tests.py

+52-3
Original file line numberDiff line numberDiff line change
@@ -63,12 +63,17 @@ def test_worker_with_queue_name(self):
6363
self.assertTrue("test_queue" in output)
6464

6565

66+
@freezegun.freeze_time("2025-01-01T12:00:00Z")
6667
@override_settings(JOBS={"testjob": {"tasks": ["a"]}})
6768
class JobModelMethodTestCase(TestCase):
6869
def test_get_queue_depths(self):
6970
Job.objects.create(name="testjob", queue_name="default")
7071
Job.objects.create(name="testjob", queue_name="testworker")
71-
Job.objects.create(name="testjob", queue_name="testworker")
72+
Job.objects.create(
73+
name="testjob",
74+
queue_name="testworker",
75+
run_after=timezone.make_aware(datetime(2025, 1, 1, 13, 0, 0)),
76+
)
7277
Job.objects.create(
7378
name="testjob", queue_name="testworker", state=Job.STATES.FAILED
7479
)
@@ -79,16 +84,38 @@ def test_get_queue_depths(self):
7984
queue_depths = Job.get_queue_depths()
8085
self.assertDictEqual(queue_depths, {"default": 1, "testworker": 2})
8186

87+
def test_get_queue_depths_exclude_future_jobs(self):
88+
Job.objects.create(name="testjob", queue_name="default")
89+
Job.objects.create(name="testjob", queue_name="testworker")
90+
Job.objects.create(
91+
name="testjob",
92+
queue_name="testworker",
93+
run_after=timezone.make_aware(datetime(2025, 1, 1, 13, 0, 0)),
94+
)
95+
Job.objects.create(
96+
name="testjob", queue_name="testworker", state=Job.STATES.FAILED
97+
)
98+
Job.objects.create(
99+
name="testjob", queue_name="testworker", state=Job.STATES.COMPLETE
100+
)
101+
102+
queue_depths = Job.get_queue_depths(exclude_future_jobs=True)
103+
self.assertDictEqual(queue_depths, {"default": 1, "testworker": 1})
104+
82105

106+
@freezegun.freeze_time("2025-01-01T12:00:00Z")
83107
@override_settings(JOBS={"testjob": {"tasks": ["a"]}})
84108
class QueueDepthTestCase(TestCase):
85109
def test_queue_depth(self):
86-
87110
Job.objects.create(name="testjob", state=Job.STATES.FAILED)
88111
Job.objects.create(name="testjob", state=Job.STATES.NEW)
89112
Job.objects.create(name="testjob", state=Job.STATES.FAILED)
90113
Job.objects.create(name="testjob", state=Job.STATES.COMPLETE)
91-
Job.objects.create(name="testjob", state=Job.STATES.READY)
114+
Job.objects.create(
115+
name="testjob",
116+
state=Job.STATES.READY,
117+
run_after=timezone.make_aware(datetime(2025, 1, 1, 13, 0, 0)),
118+
)
92119
Job.objects.create(
93120
name="testjob", queue_name="testqueue", state=Job.STATES.READY
94121
)
@@ -101,6 +128,28 @@ def test_queue_depth(self):
101128
output = stdout.getvalue()
102129
self.assertEqual(output.strip(), "event=queue_depths default=2")
103130

131+
def test_queue_depth_exclude_future_jobs(self):
132+
Job.objects.create(name="testjob", state=Job.STATES.FAILED)
133+
Job.objects.create(name="testjob", state=Job.STATES.NEW)
134+
Job.objects.create(name="testjob", state=Job.STATES.FAILED)
135+
Job.objects.create(name="testjob", state=Job.STATES.COMPLETE)
136+
Job.objects.create(
137+
name="testjob",
138+
state=Job.STATES.READY,
139+
run_after=timezone.make_aware(datetime(2025, 1, 1, 13, 0, 0)),
140+
)
141+
Job.objects.create(
142+
name="testjob", queue_name="testqueue", state=Job.STATES.READY
143+
)
144+
Job.objects.create(
145+
name="testjob", queue_name="testqueue", state=Job.STATES.READY
146+
)
147+
148+
stdout = StringIO()
149+
call_command("queue_depth", exclude_future_jobs=True, stdout=stdout)
150+
output = stdout.getvalue()
151+
self.assertEqual(output.strip(), "event=queue_depths default=1")
152+
104153
def test_queue_depth_multiple_queues(self):
105154

106155
Job.objects.create(name="testjob", state=Job.STATES.FAILED)

0 commit comments

Comments
 (0)