Skip to content

Commit 5fcdd42

Browse files
authored
Merge branch 'api-manage-tasks' of 'https://github.com/jjmerchante/grimoirelab-core'
Merges #35 Closes #35
2 parents 165a193 + b133d82 commit 5fcdd42

File tree

4 files changed

+240
-5
lines changed

4 files changed

+240
-5
lines changed

src/grimoirelab/core/scheduler/scheduler.py

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
from django.conf import settings
3131
from rq.registry import StartedJobRegistry
32+
from rq.command import send_stop_job_command
3233

3334
from grimoirelab_toolkit.datetime import datetime_utcnow
3435

@@ -104,16 +105,53 @@ def cancel_task(task_uuid: str) -> None:
104105
"""
105106
task = find_task(task_uuid)
106107

107-
_, job_class = get_registered_task_model(task.TASK_TYPE)
108-
109-
jobs = job_class.objects.filter(task=task).all()
108+
jobs = task.jobs.all()
110109
for job in jobs:
111-
job_rq = rq.job.Job.fetch(job.uuid, connection=django_rq.get_connection(task.default_job_queue))
110+
connection = django_rq.get_connection(task.default_job_queue)
111+
try:
112+
job_rq = rq.job.Job.fetch(job.uuid, connection=connection)
113+
except rq.exceptions.NoSuchJobError:
114+
continue
115+
if job_rq.get_status() == rq.job.JobStatus.STARTED:
116+
send_stop_job_command(connection, job_rq.id)
112117
job_rq.delete()
113118

114119
task.delete()
115120

116121

122+
def reschedule_task(task_uuid: str) -> None:
123+
"""Reschedule a task
124+
125+
The task will be rescheduled to be executed as soon
126+
as possible. If it is running, it will be cancelled and
127+
rescheduled.
128+
129+
:param task_uuid: uuid of the task to be rescheduled.
130+
131+
:raises NotFoundError: when the task is not found.
132+
"""
133+
task = find_task(task_uuid)
134+
135+
if task.status == SchedulerStatus.ENQUEUED:
136+
# Cancel the enqueued job and force the execution
137+
job = task.jobs.order_by('-scheduled_at').first()
138+
try:
139+
job_rq = rq.job.Job.fetch(job.uuid, connection=django_rq.get_connection(task.default_job_queue))
140+
job_rq.delete()
141+
except (rq.exceptions.NoSuchJobError, rq.exceptions.InvalidJobOperation):
142+
pass
143+
_schedule_job(task, job, datetime_utcnow(), job.job_args)
144+
145+
elif task.status == SchedulerStatus.RUNNING:
146+
# Make sure it is running
147+
job = task.jobs.order_by('-scheduled_at').first()
148+
if _is_job_removed_or_stopped(job, task.default_job_queue):
149+
_schedule_job(task, job, datetime_utcnow(), job.job_args)
150+
151+
else:
152+
_enqueue_task(task)
153+
154+
117155
def maintain_tasks() -> None:
118156
"""Maintain the tasks that are scheduled to be executed.
119157

src/grimoirelab/core/scheduler/urls.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424

2525
urlpatterns = [
2626
re_path(r'^add_task', views.add_task),
27+
re_path(r'^reschedule_task', views.reschedule_task),
28+
re_path(r'^remove_task', views.remove_task),
2729
path('tasks/', api.EventizerTaskList.as_view()),
2830
path('tasks/<str:uuid>/', api.EventizerTaskDetail.as_view()),
2931
path('tasks/<str:task_id>/jobs/', api.EventizerJobList.as_view()),

src/grimoirelab/core/scheduler/views.py

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@
2121
from django.conf import settings
2222

2323
from .scheduler import (
24-
schedule_task
24+
cancel_task,
25+
schedule_task,
26+
reschedule_task as scheduler_reschedule_task
2527
)
2628

2729

@@ -71,3 +73,45 @@ def add_task(request):
7173
'message': f"Task {task.id} added correctly"
7274
}
7375
return Response(response, status=200)
76+
77+
78+
@api_view(['POST'])
79+
def reschedule_task(request):
80+
"""Reschedule a Task
81+
82+
The body should contain the task id to reschedule:
83+
{
84+
'taskId': 'task_id'
85+
}
86+
"""
87+
data = request.data
88+
task_id = data['taskId']
89+
90+
scheduler_reschedule_task(task_id)
91+
92+
response = {
93+
'status': 'ok',
94+
'message': f"Task {task_id} rescheduled correctly"
95+
}
96+
return Response(response, status=200)
97+
98+
99+
@api_view(['POST'])
100+
def remove_task(request):
101+
"""Remove a Task
102+
103+
The body should contain the task id to remove:
104+
{
105+
'taskId': 'task_id'
106+
}
107+
"""
108+
data = request.data
109+
task_id = data['taskId']
110+
111+
cancel_task(task_id)
112+
113+
response = {
114+
'status': 'ok',
115+
'message': f"Task {task_id} removed correctly"
116+
}
117+
return Response(response, status=200)

tests/scheduler/test_scheduler.py

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
schedule_task,
3737
cancel_task,
3838
maintain_tasks,
39+
reschedule_task,
3940
_enqueue_task,
4041
_on_success_callback,
4142
_on_failure_callback
@@ -511,6 +512,156 @@ def test_no_task_found(self):
511512
cancel_task('non-existent-task-uuid')
512513

513514

515+
class TestRescheduleTask(GrimoireLabTestCase):
516+
def setUp(self):
517+
GRIMOIRELAB_TASK_MODELS.clear()
518+
task_class, job_class = register_task_model('test_task', SchedulerTestTask)
519+
520+
def cleanup_test_model():
521+
with django.db.connection.schema_editor() as schema_editor:
522+
schema_editor.delete_model(job_class)
523+
schema_editor.delete_model(task_class)
524+
525+
with django.db.connection.schema_editor() as schema_editor:
526+
schema_editor.create_model(task_class)
527+
schema_editor.create_model(job_class)
528+
529+
self.addCleanup(cleanup_test_model)
530+
super().setUp()
531+
532+
def test_reschedule_task_completed(self):
533+
"""Test a task is rescheduled correctly"""
534+
535+
task_args = {
536+
'a': 1,
537+
'b': 2,
538+
}
539+
540+
# Enqueue the task
541+
enqueued_at = grimoirelab_toolkit.datetime.datetime_utcnow()
542+
task = schedule_task('test_task', task_args)
543+
544+
# Check if a new job is created and the task is enqueued
545+
self.assertEqual(task.status, SchedulerStatus.ENQUEUED)
546+
self.assertGreaterEqual(task.scheduled_at, enqueued_at)
547+
548+
# Run the job
549+
before_run_call_dt = grimoirelab_toolkit.datetime.datetime_utcnow()
550+
worker = django_rq.workers.get_worker('testing')
551+
processed = worker.work(burst=True, with_scheduler=True)
552+
after_run_call_dt = grimoirelab_toolkit.datetime.datetime_utcnow()
553+
554+
self.assertEqual(processed, True)
555+
556+
# Check task and task state after execution
557+
task.refresh_from_db()
558+
self.assertEqual(task.status, SchedulerStatus.COMPLETED)
559+
self.assertEqual(task.runs, 1)
560+
self.assertEqual(task.failures, 0)
561+
self.assertGreater(task.last_run, before_run_call_dt)
562+
self.assertLess(task.last_run, after_run_call_dt)
563+
564+
# Reschedule the completed task
565+
enqueued_at = grimoirelab_toolkit.datetime.datetime_utcnow()
566+
reschedule_task(task.uuid)
567+
568+
# Check if the task is rescheduled
569+
task.refresh_from_db()
570+
self.assertEqual(task.status, SchedulerStatus.ENQUEUED)
571+
self.assertGreater(task.scheduled_at, enqueued_at)
572+
573+
# Run the job
574+
before_run_call_dt = grimoirelab_toolkit.datetime.datetime_utcnow()
575+
worker = django_rq.workers.get_worker('testing')
576+
processed = worker.work(burst=True, with_scheduler=True)
577+
after_run_call_dt = grimoirelab_toolkit.datetime.datetime_utcnow()
578+
579+
self.assertEqual(processed, True)
580+
581+
# Check task and task state after execution
582+
task.refresh_from_db()
583+
self.assertEqual(task.status, SchedulerStatus.COMPLETED)
584+
self.assertEqual(task.runs, 2)
585+
self.assertEqual(task.failures, 0)
586+
self.assertGreater(task.last_run, before_run_call_dt)
587+
self.assertLess(task.last_run, after_run_call_dt)
588+
589+
def test_reschedule_task_enqueued(self):
590+
"""Test if rescheduling a task enqueued change the scheduled time"""
591+
592+
# Enqueue the task
593+
task_args = {
594+
'a': 1,
595+
'b': 2,
596+
}
597+
first_schedule_time = datetime.datetime(2100, 1, 1, tzinfo=datetime.timezone.utc)
598+
599+
task = SchedulerTestTask.create_task(task_args, 360, 10)
600+
job_db = _enqueue_task(task, scheduled_at=first_schedule_time)
601+
602+
# Check initial state of the task
603+
self.assertEqual(task.task_id, f'grimoire:task:{task.uuid}')
604+
self.assertEqual(task.status, SchedulerStatus.ENQUEUED)
605+
self.assertEqual(task.scheduled_at, first_schedule_time)
606+
607+
# Reschedule the task
608+
reschedule_task(task.uuid)
609+
second_schedule_time = grimoirelab_toolkit.datetime.datetime_utcnow()
610+
611+
# Check if the task is rescheduled
612+
task.refresh_from_db()
613+
self.assertEqual(task.status, SchedulerStatus.ENQUEUED)
614+
self.assertLess(task.scheduled_at, second_schedule_time)
615+
616+
def test_reschedule_inconsistent_task(self):
617+
"""Test if reschedules a task with an inconsistent state"""
618+
619+
# Enqueue the task
620+
task_args = {
621+
'a': 1,
622+
'b': 2,
623+
}
624+
625+
task = SchedulerTestTask.create_task(task_args, 360, 10)
626+
job_db = _enqueue_task(task)
627+
628+
# Check initial state of the task
629+
self.assertEqual(task.status, SchedulerStatus.ENQUEUED)
630+
631+
# Delete job manually to create the inconsistent state
632+
job_rq = rq.job.Job.fetch(job_db.uuid, connection=django_rq.get_connection())
633+
job_rq.delete()
634+
635+
# Make sure the job was deleted
636+
with self.assertRaises(rq.exceptions.NoSuchJobError):
637+
rq.job.Job.fetch(job_db.uuid, connection=django_rq.get_connection())
638+
639+
# Reschedule the task
640+
scheduled_date = grimoirelab_toolkit.datetime.datetime_utcnow()
641+
reschedule_task(task.uuid)
642+
643+
# Check if the task is rescheduled
644+
task.refresh_from_db()
645+
job_db = task.jobs.order_by('-scheduled_at').first()
646+
647+
self.assertEqual(task.status, SchedulerStatus.ENQUEUED)
648+
self.assertEqual(job_db.status, SchedulerStatus.ENQUEUED)
649+
self.assertGreater(task.scheduled_at, scheduled_date)
650+
651+
# Run the job
652+
worker = django_rq.workers.get_worker('testing')
653+
processed = worker.work(burst=True, with_scheduler=True)
654+
655+
self.assertEqual(processed, True)
656+
657+
# Check task and task state after execution
658+
task.refresh_from_db()
659+
self.assertEqual(task.status, SchedulerStatus.COMPLETED)
660+
self.assertEqual(task.runs, 1)
661+
self.assertEqual(task.failures, 0)
662+
self.assertGreater(task.last_run, scheduled_date)
663+
664+
514665
class TestOnSuccessCallback(GrimoireLabTestCase):
515666
"""Unit tests for the default on_success_callback function"""
516667

0 commit comments

Comments
 (0)