Skip to content

Commit 165a193

Browse files
authored
Merge branch 'sched-corner-cases' of 'https://github.com/jjmerchante/grimoirelab-core'
Merges #36 Closes #36
2 parents 868028d + d8c4e00 commit 165a193

File tree

2 files changed

+17
-4
lines changed

2 files changed

+17
-4
lines changed

src/grimoirelab/core/scheduler/scheduler.py

+11-1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import rq.job
2929

3030
from django.conf import settings
31+
from rq.registry import StartedJobRegistry
3132

3233
from grimoirelab_toolkit.datetime import datetime_utcnow
3334

@@ -154,7 +155,15 @@ def _is_job_removed_or_stopped(job: Job, queue: str) -> bool:
154155
try:
155156
connection = django_rq.get_connection(queue)
156157
job_rq = rq.job.Job.fetch(job.uuid, connection=connection)
157-
return job_rq.get_status() in RQ_JOB_STOPPED_STATUS
158+
status = job_rq.get_status()
159+
if status == rq.job.JobStatus.STARTED:
160+
# Sometimes, the worker may be forcibly stopped, leaving the job
161+
# in the STARTED status. We need to check if the job has expired
162+
# due to a missing heartbeat.
163+
expiration_date = StartedJobRegistry(queue, connection).get_expiration_time(job_rq)
164+
return expiration_date < datetime.datetime.now()
165+
else:
166+
return status in RQ_JOB_STOPPED_STATUS
158167
except rq.exceptions.NoSuchJobError:
159168
return True
160169

@@ -228,6 +237,7 @@ def _schedule_job(
228237

229238
job.status = SchedulerStatus.ENQUEUED
230239
task.status = SchedulerStatus.ENQUEUED
240+
job.scheduled_at = scheduled_at
231241
task.scheduled_at = scheduled_at
232242
except Exception as e:
233243
logger.error(f"Error enqueuing job of task {task.task_id}. Not scheduled. Error: {e}")

src/grimoirelab/core/scheduler/tasks/models.py

+6-3
Original file line numberDiff line numberDiff line change
@@ -135,10 +135,13 @@ def prepare_job_parameters(self):
135135
job_args = args_gen.resuming_args(job.job_args['job_args'], progress)
136136
elif self.status == SchedulerStatus.RECOVERY:
137137
job = self.jobs.all().order_by('-job_num').first()
138-
progress = ChroniclerProgress.from_dict(job.progress)
139-
job_args = args_gen.recovery_args(job.job_args['job_args'], progress)
138+
if job and job.progress:
139+
progress = ChroniclerProgress.from_dict(job.progress)
140+
job_args = args_gen.recovery_args(job.job_args['job_args'], progress)
141+
else:
142+
job_args = args_gen.initial_args(self.task_args)
140143
else:
141-
job_args = self.task_args
144+
job_args = args_gen.initial_args(self.task_args)
142145

143146
task_args['job_args'] = job_args
144147

0 commit comments

Comments
 (0)