diff --git a/arq/worker.py b/arq/worker.py
index 398409b5..73a6158c 100644
--- a/arq/worker.py
+++ b/arq/worker.py
@@ -483,6 +483,7 @@ async def run_job(self, job_id: str, score: int) -> None:  # noqa: C901
         abort_job = False
 
         function_name, enqueue_time_ms = '', 0
+        function: Optional[Union[Function, CronJob]] = None
         args: Tuple[Any, ...] = ()
         kwargs: Dict[Any, Any] = {}
 
@@ -502,7 +503,7 @@ async def job_failed(exc: BaseException) -> None:
                 serializer=self.job_serializer,
                 queue_name=self.queue_name,
             )
-            await asyncio.shield(self.finish_failed_job(job_id, result_data_))
+            await asyncio.shield(self.finish_failed_job(job_id, result_data_, function))
 
         if not v:
             logger.warning('job %s expired', job_id)
@@ -516,14 +517,15 @@ async def job_failed(exc: BaseException) -> None:
             logger.exception('deserializing job %s failed', job_id)
             return await job_failed(e)
 
+        with contextlib.suppress(KeyError):
+            function = self.functions[function_name]
+
         if abort_job:
             t = (timestamp_ms() - enqueue_time_ms) / 1000
             logger.info('%6.2fs ⊘ %s:%s aborted before start', t, job_id, function_name)
             return await job_failed(asyncio.CancelledError())
 
-        try:
-            function: Union[Function, CronJob] = self.functions[function_name]
-        except KeyError:
+        if function is None:
             logger.warning('job %s, function %r not found', job_id, function_name)
             return await job_failed(JobExecutionFailed(f'function {function_name!r} not found'))
 
@@ -558,7 +560,7 @@ async def job_failed(exc: BaseException) -> None:
                 self.queue_name,
                 serializer=self.job_serializer,
             )
-            return await asyncio.shield(self.finish_failed_job(job_id, result_data))
+            return await asyncio.shield(self.finish_failed_job(job_id, result_data, function))
 
         result = no_result
         exc_extra = None
@@ -701,7 +703,9 @@ async def finish_job(
             tr.delete(*delete_keys)  # type: ignore[unused-coroutine]
             await tr.execute()
 
-    async def finish_failed_job(self, job_id: str, result_data: Optional[bytes]) -> None:
+    async def finish_failed_job(
+        self, job_id: str, result_data: Optional[bytes], function: Optional[Union[Function, CronJob]]
+    ) -> None:
         async with self.pool.pipeline(transaction=True) as tr:
             tr.delete(  # type: ignore[unused-coroutine]
                 retry_key_prefix + job_id,
@@ -710,8 +714,16 @@ async def finish_failed_job(self, job_id: str, result_data: Optional[bytes]) ->
             )
             tr.zrem(abort_jobs_ss, job_id)  # type: ignore[unused-coroutine]
             tr.zrem(self.queue_name, job_id)  # type: ignore[unused-coroutine]
+
+            keep_result_forever = self.keep_result_forever
+            keep_result_s = self.keep_result_s
+            if function is not None:
+                if function.keep_result_forever is not None:
+                    keep_result_forever = function.keep_result_forever
+                if function.keep_result_s is not None:
+                    keep_result_s = function.keep_result_s
+            keep_result = keep_result_forever or keep_result_s > 0
             # result_data would only be None if serializing the result fails
-            keep_result = self.keep_result_forever or self.keep_result_s > 0
             if result_data is not None and keep_result:  # pragma: no branch
-                expire = 0 if self.keep_result_forever else self.keep_result_s
+                expire = 0 if keep_result_forever else keep_result_s
                 tr.set(result_key_prefix + job_id, result_data, px=to_ms(expire))  # type: ignore[unused-coroutine]