Skip to content

Commit f28e85e

Browse files
committed
Add exclude_future_jobs arg to get_queue_depths
New arg exclude_future_jobs filters the READY and NEW jobs to only be ones without a run_after or the run_after is now/in the past
1 parent 01a506c commit f28e85e

File tree

2 files changed

+35
-5
lines changed

2 files changed

+35
-5
lines changed

django_dbq/models.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
get_failure_hook_name,
99
get_creation_hook_name,
1010
)
11-
from django.db.models import JSONField, UUIDField, Count, TextChoices
11+
from django.db.models import JSONField, UUIDField, Count, TextChoices, Q
1212
import datetime
1313
import logging
1414
import uuid
@@ -173,10 +173,17 @@ def run_creation_hook(self):
173173
creation_hook_function(self)
174174

175175
@staticmethod
176-
def get_queue_depths():
176+
def get_queue_depths(*, exclude_future_jobs=False):
177+
jobs_waiting_in_queue = Job.objects.filter(
178+
state__in=(Job.STATES.READY, Job.STATES.NEW)
179+
)
180+
if exclude_future_jobs:
181+
jobs_waiting_in_queue = jobs_waiting_in_queue.filter(
182+
Q(run_after__isnull=True) | Q(run_after__lte=timezone.now())
183+
)
184+
177185
annotation_dicts = (
178-
Job.objects.filter(state__in=(Job.STATES.READY, Job.STATES.NEW))
179-
.values("queue_name")
186+
jobs_waiting_in_queue.values("queue_name")
180187
.order_by("queue_name")
181188
.annotate(Count("queue_name"))
182189
)

django_dbq/tests.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,12 +71,17 @@ def test_worker_with_queue_name(self):
7171
self.assertTrue("test_queue" in output)
7272

7373

74+
@freezegun.freeze_time("2025-01-01T12:00:00Z")
7475
@override_settings(JOBS={"testjob": {"tasks": ["a"]}})
7576
class JobModelMethodTestCase(TestCase):
7677
def test_get_queue_depths(self):
7778
Job.objects.create(name="testjob", queue_name="default")
7879
Job.objects.create(name="testjob", queue_name="testworker")
79-
Job.objects.create(name="testjob", queue_name="testworker")
80+
Job.objects.create(
81+
name="testjob",
82+
queue_name="testworker",
83+
run_after=timezone.make_aware(datetime(2025, 1, 1, 13, 0, 0)),
84+
)
8085
Job.objects.create(
8186
name="testjob", queue_name="testworker", state=Job.STATES.FAILED
8287
)
@@ -87,6 +92,24 @@ def test_get_queue_depths(self):
8792
queue_depths = Job.get_queue_depths()
8893
self.assertDictEqual(queue_depths, {"default": 1, "testworker": 2})
8994

95+
def test_get_queue_depths_exclude_future_jobs(self):
96+
Job.objects.create(name="testjob", queue_name="default")
97+
Job.objects.create(name="testjob", queue_name="testworker")
98+
Job.objects.create(
99+
name="testjob",
100+
queue_name="testworker",
101+
run_after=timezone.make_aware(datetime(2025, 1, 1, 13, 0, 0)),
102+
)
103+
Job.objects.create(
104+
name="testjob", queue_name="testworker", state=Job.STATES.FAILED
105+
)
106+
Job.objects.create(
107+
name="testjob", queue_name="testworker", state=Job.STATES.COMPLETE
108+
)
109+
110+
queue_depths = Job.get_queue_depths(exclude_future_jobs=True)
111+
self.assertDictEqual(queue_depths, {"default": 1, "testworker": 1})
112+
90113

91114
@override_settings(JOBS={"testjob": {"tasks": ["a"]}})
92115
class QueueDepthTestCase(TestCase):

0 commit comments

Comments
 (0)