From f4395bfd0653e2b631d34fd8ba028e971ad05490 Mon Sep 17 00:00:00 2001 From: collinr3 Date: Sat, 13 Aug 2022 13:04:41 +0100 Subject: [PATCH 1/6] Add concept of shift duration for a worker --- django_dbq/management/commands/worker.py | 40 +++++++++++++++++++++--- 1 file changed, 35 insertions(+), 5 deletions(-) diff --git a/django_dbq/management/commands/worker.py b/django_dbq/management/commands/worker.py index 5f8d7b2..4e99322 100644 --- a/django_dbq/management/commands/worker.py +++ b/django_dbq/management/commands/worker.py @@ -15,9 +15,11 @@ class Worker: - def __init__(self, name, rate_limit_in_seconds): + def __init__(self, name, rate_limit_in_seconds, shift_limit_in_seconds): self.queue_name = name self.rate_limit_in_seconds = rate_limit_in_seconds + self.shift_limit_in_seconds = shift_limit_in_seconds + self.shift_start = timezone.now() self.alive = True self.last_job_finished = None self.current_job = None @@ -39,7 +41,7 @@ def shutdown(self, signum, frame): self.current_job.save(update_fields=["state"]) def run(self): - while self.alive: + while self.alive and self._shift_availability(): self.process_job() def process_job(self): @@ -111,6 +113,24 @@ def _process_job(self): self.current_job = None + def _shift_availability(self): + """ + Setting a value for shift_limit_in_seconds enables the worker to be run via a CRON Job for a period of time, + whereby worker will seek further jobs if time remains in the shift. If the shift_limit_in_seconds is + exceeded once a job is started it will still run to completion, regardless of the time remaining. + Consequently, the duration of the CRON Interval should be greater than the anticipated duration of the + longest Job. + If shift_limit_in_seconds is not supplied, the default of 0 will be used and the worker will continue to run + until shutdown. 
+ """ + if self.shift_limit_in_seconds <= 0: + return True + elif self.shift_limit_in_seconds > 0 and (timezone.now() - self.shift_start).total_seconds() < \ + self.shift_limit_in_seconds: + return True + else: + return False + class Command(BaseCommand): @@ -125,6 +145,15 @@ def add_arguments(self, parser): default=1, type=int, ) + parser.add_argument( + "--shift_limit", + help="The time limit in seconds within which the worker can process new jobs. The default rate " + "limit is 0 seconds, which disables this argument, allowing the worker to run indefinitely.", + nargs="?", + default=0, + type=int, + ) + parser.add_argument( "--dry-run", action="store_true", @@ -142,13 +171,14 @@ def handle(self, *args, **options): queue_name = options["queue_name"] rate_limit_in_seconds = options["rate_limit"] + shift_limit_in_seconds = options["shift_limit"] self.stdout.write( - 'Starting job worker for queue "%s" with rate limit %s/s' - % (queue_name, rate_limit_in_seconds) + 'Starting job worker for queue "%s" with rate limit %s/s and a shift constraint of %s seconds.' + % (queue_name, rate_limit_in_seconds, shift_limit_in_seconds) ) - worker = Worker(queue_name, rate_limit_in_seconds) + worker = Worker(queue_name, rate_limit_in_seconds, shift_limit_in_seconds) if options["dry_run"]: return From 2143669d0b377d66b9a4c651fb760e0d15fa6eaa Mon Sep 17 00:00:00 2001 From: collinr3 Date: Sat, 13 Aug 2022 13:20:10 +0100 Subject: [PATCH 2/6] Add Model Admin --- django_dbq/admin.py | 10 ++++++++++ 1 file changed, 10 insertions(+) create mode 100644 django_dbq/admin.py diff --git a/django_dbq/admin.py b/django_dbq/admin.py new file mode 100644 index 0000000..61e6670 --- /dev/null +++ b/django_dbq/admin.py @@ -0,0 +1,10 @@ +from django.contrib import admin + +# Register your models here. 
+ +from .models import Job + + +@admin.register(Job) +class JobAdmin(admin.ModelAdmin): + list_display = ('id', 'name', 'state', 'queue_name', 'priority', 'run_after') \ No newline at end of file From ba57ee26392cd84f516c5377f2fb9a0ef38aafa3 Mon Sep 17 00:00:00 2001 From: collinr3 Date: Sat, 13 Aug 2022 13:41:32 +0100 Subject: [PATCH 3/6] Updated README to add `--shift_limit` information --- README.md | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 1baaad3..42e1edd 100644 --- a/README.md +++ b/README.md @@ -274,11 +274,16 @@ to ensure the jobs table remains at a reasonable size. To start a worker: ``` -manage.py worker [queue_name] [--rate_limit] +manage.py worker [queue_name] [--rate_limit] [--shift_limit] ``` - `queue_name` is optional, and will default to `default` - The `--rate_limit` flag is optional, and will default to `1`. It is the minimum number of seconds that must have elapsed before a subsequent job can be run. +- The `--shift_limit` flag is optional, and will default to `0`. Setting a value for `--shift_limit` enables the worker to be run via a CRON Job for a limited period of time, whereby + the worker will seek further jobs if time remains in the shift. If the `--shift_limit` is exceeded once a job is + started, the job will still run to completion, regardless of the time remaining. Consequently, the duration of the + CRON Interval should be greater than the anticipated duration of the longest Job. If `--shift_limit` is + not supplied, the default of `0` will be used and the worker will continue to run until shutdown. 
##### manage.py queue_depth If you'd like to check your queue depth from the command line, you can run `manage.py queue_depth [queue_name [queue_name ...]]` and any From 0a96a24383583cae084b5fd0946d5f22e38c020e Mon Sep 17 00:00:00 2001 From: collinr3 Date: Mon, 15 Aug 2022 16:38:45 +0100 Subject: [PATCH 4/6] Added Tests and removed admin.py --- README.md | 12 +++-- django_dbq/admin.py | 10 ---- django_dbq/management/commands/worker.py | 14 ++++-- django_dbq/tests.py | 63 ++++++++++++++++++++++-- 4 files changed, 76 insertions(+), 23 deletions(-) delete mode 100644 django_dbq/admin.py diff --git a/README.md b/README.md index 42e1edd..3bfd6be 100644 --- a/README.md +++ b/README.md @@ -279,11 +279,13 @@ manage.py worker [queue_name] [--rate_limit] [--shift_limit] - `queue_name` is optional, and will default to `default` - The `--rate_limit` flag is optional, and will default to `1`. It is the minimum number of seconds that must have elapsed before a subsequent job can be run. -- The `--shift_limit` flag is optional, and will default to `0`. Setting a value for `--shift_limit` enables the worker to be run via a CRON Job for a limited period of time, whereby - the worker will seek further jobs if time remains in the shift. If the `--shift_limit` is exceeded once a job is - started, the job will still run to completion, regardless of the time remaining. Consequently, the duration of the - CRON Interval should be greater than the anticipated duration of the longest Job. If `--shift_limit` is - not supplied, the default of `0` will be used and the worker will continue to run until shutdown. +- The `--shift_limit` flag is optional, and will default to `0`. It is the maximum number of seconds that the worker + can seek new jobs to process. The worker will seek further jobs if time remains in the shift. If the + `--shift_limit` is exceeded once + a job is started, the job will still run to completion, regardless of the time remaining. 
+ One use case for `--shift_limit` is to run the worker process via a + CRON Job. See `tests.py` for illustrative effects of `[--rate_limit]` and `[--shift_limit]` combinations on processed + jobs. If `--shift_limit` is not supplied, the default of `0` will be used and the worker will continue to run until + the process is shut down. ##### manage.py queue_depth If you'd like to check your queue depth from the command line, you can run `manage.py queue_depth [queue_name [queue_name ...]]` and any diff --git a/django_dbq/admin.py b/django_dbq/admin.py deleted file mode 100644 index 61e6670..0000000 --- a/django_dbq/admin.py +++ /dev/null @@ -1,10 +0,0 @@ -from django.contrib import admin - -# Register your models here. - -from .models import Job - - -@admin.register(Job) -class JobAdmin(admin.ModelAdmin): - list_display = ('id', 'name', 'state', 'queue_name', 'priority', 'run_after') \ No newline at end of file diff --git a/django_dbq/management/commands/worker.py b/django_dbq/management/commands/worker.py index 4e99322..194ad4f 100644 --- a/django_dbq/management/commands/worker.py +++ b/django_dbq/management/commands/worker.py @@ -173,10 +173,16 @@ def handle(self, *args, **options): rate_limit_in_seconds = options["rate_limit"] shift_limit_in_seconds = options["shift_limit"] - self.stdout.write( - 'Starting job worker for queue "%s" with rate limit %s/s and a shift constraint of %s seconds.' - % (queue_name, rate_limit_in_seconds, shift_limit_in_seconds) - ) + if shift_limit_in_seconds: + self.stdout.write( + 'Starting job worker for queue "%s" with rate limit %s/s and a shift constraint of %s seconds.' + % (queue_name, rate_limit_in_seconds, shift_limit_in_seconds) + ) + else: + self.stdout.write( + 'Starting job worker for queue "%s" with rate limit %s/s.' 
+ % (queue_name, rate_limit_in_seconds) + ) worker = Worker(queue_name, rate_limit_in_seconds, shift_limit_in_seconds) diff --git a/django_dbq/tests.py b/django_dbq/tests.py index 3ae7ab9..743f3d5 100644 --- a/django_dbq/tests.py +++ b/django_dbq/tests.py @@ -26,6 +26,10 @@ def failing_task(job): raise Exception("uh oh") +def shift_task(job): + job.workspace["message"] = f"{job.id} ran" + + def failure_hook(job, exception): job.workspace["output"] = "failure hook ran" @@ -163,7 +167,7 @@ def test_process_job_previous_job_long_time_ago(self, mock_sleep): class ShutdownTestCase(TestCase): def test_shutdown_sets_state_to_stopping(self): job = Job.objects.create(name="testjob") - worker = Worker("default", 1) + worker = Worker("default", 1, 0) worker.current_job = job worker.shutdown(None, None) @@ -279,7 +283,7 @@ def test_task_sequence(self): class ProcessJobTestCase(TestCase): def test_process_job(self): job = Job.objects.create(name="testjob") - Worker("default", 1)._process_job() + Worker("default", 1, 0)._process_job() job = Job.objects.get() self.assertEqual(job.state, Job.STATES.COMPLETE) @@ -288,7 +292,7 @@ def test_process_job_wrong_queue(self): Processing a different queue shouldn't touch our other job """ job = Job.objects.create(name="testjob", queue_name="lol") - Worker("default", 1)._process_job() + Worker("default", 1, 0)._process_job() job = Job.objects.get() self.assertEqual(job.state, Job.STATES.NEW) @@ -327,7 +331,7 @@ def test_creation_hook_only_runs_on_create(self): class JobFailureHookTestCase(TestCase): def test_failure_hook(self): job = Job.objects.create(name="testjob") - Worker("default", 1)._process_job() + Worker("default", 1, 0)._process_job() job = Job.objects.get() self.assertEqual(job.state, Job.STATES.FAILED) self.assertEqual(job.workspace["output"], "failure hook ran") @@ -361,3 +365,54 @@ def test_delete_old_jobs(self): self.assertEqual(Job.objects.count(), 2) self.assertTrue(j4 in Job.objects.all()) self.assertTrue(j5 in 
Job.objects.all()) + + +@override_settings(JOBS={"testshift": {"tasks": ["django_dbq.tests.shift_task"]}}) +class ShiftTestCase(TestCase): + """ Tests various combinations of rate_limit and shift_limit in terms of their impact on processing jobs""" + def test_rate_with_shorter_shift_limit(self): + Job.objects.create(name="testshift") + Job.objects.create(name="testshift") + stdout = StringIO() + call_command("worker", rate_limit=2, shift_limit=1, stdout=stdout) + output = stdout.getvalue() + self.assertTrue("rate limit 2/s and a shift constraint of 1 seconds" in output) + self.assertEqual(Job.objects.filter(state=Job.STATES.NEW).count(), 1) + self.assertEqual(Job.objects.filter(state=Job.STATES.COMPLETE).count(), 1) + + def test_rate_with_equal_shift_limit(self): + Job.objects.create(name="testshift") + Job.objects.create(name="testshift") + stdout = StringIO() + call_command("worker", rate_limit=1, shift_limit=1, stdout=stdout) + output = stdout.getvalue() + self.assertTrue("rate limit 1/s and a shift constraint of 1 seconds" in output) + self.assertEqual(Job.objects.filter(state=Job.STATES.NEW).count(), 1) + self.assertEqual(Job.objects.filter(state=Job.STATES.COMPLETE).count(), 1) + + def test_rate_with_longer_shift_limit(self): + Job.objects.create(name="testshift") + Job.objects.create(name="testshift") + stdout = StringIO() + call_command("worker", rate_limit=1, shift_limit=2, stdout=stdout) + output = stdout.getvalue() + self.assertTrue("rate limit 1/s and a shift constraint of 2 seconds" in output) + self.assertEqual(Job.objects.filter(state=Job.STATES.NEW).count(), 0) + self.assertEqual(Job.objects.filter(state=Job.STATES.COMPLETE).count(), 2) + + def test_rate_with_two_workers(self): + Job.objects.create(name="testshift") + Job.objects.create(name="testshift") + Job.objects.create(name="testshift") + Job.objects.create(name="testshift") + stdout = StringIO() + call_command("worker", rate_limit=1, shift_limit=2, stdout=stdout) + output = stdout.getvalue() 
+ self.assertTrue("rate limit 1/s and a shift constraint of 2 seconds" in output) + call_command("worker", rate_limit=1, shift_limit=1, stdout=stdout) + output = stdout.getvalue() + self.assertTrue("rate limit 1/s and a shift constraint of 1 seconds" in output) + self.assertEqual(Job.objects.filter(state=Job.STATES.NEW).count(), 1) + self.assertEqual(Job.objects.filter(state=Job.STATES.COMPLETE).count(), 3) + + From d85abd5cb90978013bf58cf74eea666bfdb7f1c9 Mon Sep 17 00:00:00 2001 From: collinr3 Date: Tue, 16 Aug 2022 15:00:11 +0100 Subject: [PATCH 5/6] Updated to include resolution to issue #53 System check identified no issues (0 silenced). Ran 29 tests in 7.164s OK --- django_dbq/management/commands/worker.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/django_dbq/management/commands/worker.py b/django_dbq/management/commands/worker.py index 194ad4f..39b7938 100644 --- a/django_dbq/management/commands/worker.py +++ b/django_dbq/management/commands/worker.py @@ -175,12 +175,12 @@ def handle(self, *args, **options): if shift_limit_in_seconds: self.stdout.write( - 'Starting job worker for queue "%s" with rate limit %s/s and a shift constraint of %s seconds.' + 'Starting job worker for queue "%s" with rate limit of one job per %s second(s) and a shift constraint of %s seconds.' % (queue_name, rate_limit_in_seconds, shift_limit_in_seconds) ) else: self.stdout.write( - 'Starting job worker for queue "%s" with rate limit %s/s.' + 'Starting job worker for queue "%s" with rate limit of one job per %s second(s).' % (queue_name, rate_limit_in_seconds) ) From d269e80023e3e1af3934fd451b9b8ca3365cb86c Mon Sep 17 00:00:00 2001 From: collinr3 Date: Wed, 14 Sep 2022 18:50:25 +0100 Subject: [PATCH 6/6] Updated to correct tests relating to resolution to issue #53 System check identified no issues (0 silenced). ............................. 
---------------------------------------------------------------------- Ran 29 tests in 7.158s OK --- django_dbq/tests.py | 10 +++++----- test-requirements.txt | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/django_dbq/tests.py b/django_dbq/tests.py index 743f3d5..1affd47 100644 --- a/django_dbq/tests.py +++ b/django_dbq/tests.py @@ -376,7 +376,7 @@ def test_rate_with_shorter_shift_limit(self): stdout = StringIO() call_command("worker", rate_limit=2, shift_limit=1, stdout=stdout) output = stdout.getvalue() - self.assertTrue("rate limit 2/s and a shift constraint of 1 seconds" in output) + self.assertTrue("rate limit of one job per 2 second(s) and a shift constraint of 1 seconds" in output) self.assertEqual(Job.objects.filter(state=Job.STATES.NEW).count(), 1) self.assertEqual(Job.objects.filter(state=Job.STATES.COMPLETE).count(), 1) @@ -386,7 +386,7 @@ def test_rate_with_equal_shift_limit(self): stdout = StringIO() call_command("worker", rate_limit=1, shift_limit=1, stdout=stdout) output = stdout.getvalue() - self.assertTrue("rate limit 1/s and a shift constraint of 1 seconds" in output) + self.assertTrue("rate limit of one job per 1 second(s) and a shift constraint of 1 seconds" in output) self.assertEqual(Job.objects.filter(state=Job.STATES.NEW).count(), 1) self.assertEqual(Job.objects.filter(state=Job.STATES.COMPLETE).count(), 1) @@ -396,7 +396,7 @@ def test_rate_with_longer_shift_limit(self): stdout = StringIO() call_command("worker", rate_limit=1, shift_limit=2, stdout=stdout) output = stdout.getvalue() - self.assertTrue("rate limit 1/s and a shift constraint of 2 seconds" in output) + self.assertTrue("rate limit of one job per 1 second(s) and a shift constraint of 2 seconds" in output) self.assertEqual(Job.objects.filter(state=Job.STATES.NEW).count(), 0) self.assertEqual(Job.objects.filter(state=Job.STATES.COMPLETE).count(), 2) @@ -408,10 +408,10 @@ def test_rate_with_two_workers(self): stdout = StringIO() call_command("worker", 
rate_limit=1, shift_limit=2, stdout=stdout) output = stdout.getvalue() - self.assertTrue("rate limit 1/s and a shift constraint of 2 seconds" in output) + self.assertTrue("rate limit of one job per 1 second(s) and a shift constraint of 2 seconds" in output) call_command("worker", rate_limit=1, shift_limit=1, stdout=stdout) output = stdout.getvalue() - self.assertTrue("rate limit 1/s and a shift constraint of 1 seconds" in output) + self.assertTrue("rate limit of one job per 1 second(s) and a shift constraint of 1 seconds" in output) self.assertEqual(Job.objects.filter(state=Job.STATES.NEW).count(), 1) self.assertEqual(Job.objects.filter(state=Job.STATES.COMPLETE).count(), 3) diff --git a/test-requirements.txt b/test-requirements.txt index a1452df..a40e03b 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -3,4 +3,4 @@ freezegun==0.3.12 mock==3.0.5 dj-database-url==0.5.0 psycopg2==2.8.4 -black==21.12b0 +black==22.6.0