diff --git a/arq/connections.py b/arq/connections.py index 2e9d3db0..c74c6d3a 100644 --- a/arq/connections.py +++ b/arq/connections.py @@ -16,6 +16,9 @@ from .jobs import Deserializer, Job, JobDef, JobResult, Serializer, deserialize_job, serialize_job from .utils import timestamp_ms, to_ms, to_unix_ms +if TYPE_CHECKING: + from redis.asyncio.client import Pipeline + logger = logging.getLogger('arq.connections') @@ -145,6 +148,7 @@ async def enqueue_job( defer_by_ms = to_ms(_defer_by) expires_ms = to_ms(_expires) + pipe: 'Pipeline[bytes]' async with self.pipeline(transaction=True) as pipe: await pipe.watch(job_key) if await pipe.exists(job_key, result_key_prefix + job_id): @@ -163,8 +167,8 @@ async def enqueue_job( job = serialize_job(function, args, kwargs, _job_try, enqueue_time_ms, serializer=self.job_serializer) pipe.multi() - pipe.psetex(job_key, expires_ms, job) # type: ignore[no-untyped-call] - pipe.zadd(_queue_name, {job_id: score}) # type: ignore[unused-coroutine] + pipe.psetex(job_key, expires_ms, job) + pipe.zadd(_queue_name, {job_id: score}) try: await pipe.execute() except WatchError: @@ -287,11 +291,12 @@ def pool_factory(*args: Any, **kwargs: Any) -> ArqRedis: async def log_redis_info(redis: 'Redis[bytes]', log_func: Callable[[str], Any]) -> None: + pipe: 'Pipeline[bytes]' async with redis.pipeline(transaction=False) as pipe: - pipe.info(section='Server') # type: ignore[unused-coroutine] - pipe.info(section='Memory') # type: ignore[unused-coroutine] - pipe.info(section='Clients') # type: ignore[unused-coroutine] - pipe.dbsize() # type: ignore[unused-coroutine] + pipe.info(section='Server') + pipe.info(section='Memory') + pipe.info(section='Clients') + pipe.dbsize() info_server, info_memory, info_clients, key_count = await pipe.execute() redis_version = info_server.get('redis_version', '?') diff --git a/arq/jobs.py b/arq/jobs.py index d0c0a5ef..8ec7d0cb 100644 --- a/arq/jobs.py +++ b/arq/jobs.py @@ -5,13 +5,17 @@ from dataclasses import dataclass from datetime import datetime from enum import Enum -from typing import Any, Callable, Dict, Optional, Tuple +from typing import TYPE_CHECKING, Any, Callable, Dict, Optional, Tuple from redis.asyncio import Redis from .constants import abort_jobs_ss, default_queue_name, in_progress_key_prefix, job_key_prefix, result_key_prefix from .utils import ms_to_datetime, poll, timestamp_ms +if TYPE_CHECKING: + from redis.asyncio.client import Pipeline + + logger = logging.getLogger('arq.jobs') Serializer = Callable[[Dict[str, Any]], bytes] @@ -89,7 +93,7 @@ async def result( Get the result of the job or, if the job raised an exception, reraise it. This function waits for the result if it's not yet available and the job is - present in the queue. Otherwise ``ResultNotFound`` is raised. + present in the queue. Otherwise, ``ResultNotFound`` is raised. :param timeout: maximum time to wait for the job result before raising ``TimeoutError``, will wait forever :param poll_delay: how often to poll redis for the job result @@ -102,9 +106,10 @@ async def result( poll_delay = pole_delay async for delay in poll(poll_delay): + tr: 'Pipeline[bytes]' async with self._redis.pipeline(transaction=True) as tr: - tr.get(result_key_prefix + self.job_id) # type: ignore[unused-coroutine] - tr.zscore(self._queue_name, self.job_id) # type: ignore[unused-coroutine] + tr.get(result_key_prefix + self.job_id) + tr.zscore(self._queue_name, self.job_id) v, s = await tr.execute() if v: @@ -153,10 +158,11 @@ async def status(self) -> JobStatus: """ Status of the job. """ + tr: 'Pipeline[bytes]' async with self._redis.pipeline(transaction=True) as tr: - tr.exists(result_key_prefix + self.job_id) # type: ignore[unused-coroutine] - tr.exists(in_progress_key_prefix + self.job_id) # type: ignore[unused-coroutine] - tr.zscore(self._queue_name, self.job_id) # type: ignore[unused-coroutine] + tr.exists(result_key_prefix + self.job_id) + tr.exists(in_progress_key_prefix + self.job_id) + tr.zscore(self._queue_name, self.job_id) is_complete, is_in_progress, score = await tr.execute() if is_complete: @@ -179,9 +185,10 @@ async def abort(self, *, timeout: Optional[float] = None, poll_delay: float = 0. """ job_info = await self.info() if job_info and job_info.score and job_info.score > timestamp_ms(): + tr: 'Pipeline[bytes]' async with self._redis.pipeline(transaction=True) as tr: - tr.zrem(self._queue_name, self.job_id) # type: ignore[unused-coroutine] - tr.zadd(self._queue_name, {self.job_id: 1}) # type: ignore[unused-coroutine] + tr.zrem(self._queue_name, self.job_id) + tr.zadd(self._queue_name, {self.job_id: 1}) await tr.execute() await self._redis.zadd(abort_jobs_ss, {self.job_id: timestamp_ms()}) diff --git a/arq/worker.py b/arq/worker.py index 2bdab0f0..67a7b2e8 100644 --- a/arq/worker.py +++ b/arq/worker.py @@ -41,6 +41,8 @@ ) if TYPE_CHECKING: + from redis.asyncio.client import Pipeline + from .typing import SecondsTimedelta, StartupShutdown, WorkerCoroutine, WorkerSettingsType # noqa F401 logger = logging.getLogger('arq.worker') @@ -404,11 +406,10 @@ async def _cancel_aborted_jobs(self) -> None: """ Go through job_ids in the abort_jobs_ss sorted set and cancel those tasks. """ + pipe: 'Pipeline[bytes]' async with self.pool.pipeline(transaction=True) as pipe: - pipe.zrange(abort_jobs_ss, start=0, end=-1) # type: ignore[unused-coroutine] - pipe.zremrangebyscore( # type: ignore[unused-coroutine] - abort_jobs_ss, min=timestamp_ms() + abort_job_max_age, max=float('inf') - ) + pipe.zrange(abort_jobs_ss, start=0, end=-1) + pipe.zremrangebyscore(abort_jobs_ss, min=timestamp_ms() + abort_job_max_age, max=float('inf')) abort_job_ids, _ = await pipe.execute() aborted: Set[str] = set() @@ -445,6 +446,7 @@ async def start_jobs(self, job_ids: List[bytes]) -> None: job_id = job_id_b.decode() in_progress_key = in_progress_key_prefix + job_id + pipe: 'Pipeline[bytes]' async with self.pool.pipeline(transaction=True) as pipe: await pipe.watch(in_progress_key) ongoing_exists = await pipe.exists(in_progress_key) @@ -457,9 +459,7 @@ async def start_jobs(self, job_ids: List[bytes]) -> None: continue pipe.multi() - pipe.psetex( # type: ignore[no-untyped-call] - in_progress_key, int(self.in_progress_timeout_s * 1000), b'1' - ) + pipe.psetex(in_progress_key, int(self.in_progress_timeout_s * 1000), b'1') try: await pipe.execute() except (ResponseError, WatchError): @@ -474,12 +474,13 @@ async def start_jobs(self, job_ids: List[bytes]) -> None: async def run_job(self, job_id: str, score: int) -> None: # noqa: C901 start_ms = timestamp_ms() + pipe: 'Pipeline[bytes]' async with self.pool.pipeline(transaction=True) as pipe: - pipe.get(job_key_prefix + job_id) # type: ignore[unused-coroutine] - pipe.incr(retry_key_prefix + job_id) # type: ignore[unused-coroutine] - pipe.expire(retry_key_prefix + job_id, 88400) # type: ignore[unused-coroutine] + pipe.get(job_key_prefix + job_id) + pipe.incr(retry_key_prefix + job_id) + pipe.expire(retry_key_prefix + job_id, 88400) if self.allow_abort_jobs: - pipe.zrem(abort_jobs_ss, job_id) # type: ignore[unused-coroutine] + pipe.zrem(abort_jobs_ss, job_id) v, job_try, _, abort_job = await pipe.execute() else: v, job_try, _ = await pipe.execute() @@ -686,41 +687,44 @@ async def finish_job( incr_score: Optional[int], keep_in_progress: Optional[float], ) -> None: + + tr: 'Pipeline[bytes]' async with self.pool.pipeline(transaction=True) as tr: delete_keys = [] in_progress_key = in_progress_key_prefix + job_id if keep_in_progress is None: delete_keys += [in_progress_key] else: - tr.pexpire(in_progress_key, to_ms(keep_in_progress)) # type: ignore[unused-coroutine] + tr.pexpire(in_progress_key, to_ms(keep_in_progress)) if finish: if result_data: expire = None if keep_result_forever else result_timeout_s - tr.set(result_key_prefix + job_id, result_data, px=to_ms(expire)) # type: ignore[unused-coroutine] + tr.set(result_key_prefix + job_id, result_data, px=to_ms(expire)) delete_keys += [retry_key_prefix + job_id, job_key_prefix + job_id] - tr.zrem(abort_jobs_ss, job_id) # type: ignore[unused-coroutine] - tr.zrem(self.queue_name, job_id) # type: ignore[unused-coroutine] + tr.zrem(abort_jobs_ss, job_id) + tr.zrem(self.queue_name, job_id) elif incr_score: - tr.zincrby(self.queue_name, incr_score, job_id) # type: ignore[unused-coroutine] + tr.zincrby(self.queue_name, incr_score, job_id) if delete_keys: - tr.delete(*delete_keys) # type: ignore[unused-coroutine] + tr.delete(*delete_keys) await tr.execute() async def finish_failed_job(self, job_id: str, result_data: Optional[bytes]) -> None: + tr: 'Pipeline[bytes]' async with self.pool.pipeline(transaction=True) as tr: - tr.delete( # type: ignore[unused-coroutine] + tr.delete( retry_key_prefix + job_id, in_progress_key_prefix + job_id, job_key_prefix + job_id, ) - tr.zrem(abort_jobs_ss, job_id) # type: ignore[unused-coroutine] - tr.zrem(self.queue_name, job_id) # type: ignore[unused-coroutine] + tr.zrem(abort_jobs_ss, job_id) + tr.zrem(self.queue_name, job_id) # result_data would only be None if serializing the result fails keep_result = self.keep_result_forever or self.keep_result_s > 0 if result_data is not None and keep_result: # pragma: no branch expire = 0 if self.keep_result_forever else self.keep_result_s - tr.set(result_key_prefix + job_id, result_data, px=to_ms(expire)) # type: ignore[unused-coroutine] + tr.set(result_key_prefix + job_id, result_data, px=to_ms(expire)) await tr.execute() async def heart_beat(self) -> None: diff --git a/requirements/linting.in b/requirements/linting.in index ae67c86a..81dd3634 100644 --- a/requirements/linting.in +++ b/requirements/linting.in @@ -4,4 +4,4 @@ flake8-quotes>=3,<4 isort[colors]>=5,<6 mypy<1 types-pytz -types_redis>=4.2,<4.3 +types_redis>=4 diff --git a/requirements/linting.txt b/requirements/linting.txt index 57176e06..2b246b72 100644 --- a/requirements/linting.txt +++ b/requirements/linting.txt @@ -1,8 +1,8 @@ # -# This file is autogenerated by pip-compile with python 3.9 -# To update, run: +# This file is autogenerated by pip-compile with Python 3.10 +# by the following command: # -# pip-compile --output-file=requirements/linting.txt requirements/linting.in +# pip-compile requirements/linting.in # black==22.6.0 # via -r requirements/linting.in @@ -40,9 +40,7 @@ tomli==2.0.1 # mypy types-pytz==2022.2.1.0 # via -r requirements/linting.in -types-redis==4.2.8 +types-redis==4.5.5.2 # via -r requirements/linting.in typing-extensions==4.3.0 - # via - # black - # mypy + # via mypy