19
19
)
20
20
from scheduler .redis_models import JobStatus , SchedulerLock , Result , ResultType , JobModel
21
21
from scheduler .settings import logger , SCHEDULER_CONFIG
22
- from scheduler .types import ConnectionType , FunctionReferenceType , Self
22
+ from scheduler .types import ConnectionType , FunctionReferenceType , Self , PipelineType
23
23
24
24
25
25
class InvalidJobOperation (Exception ):
@@ -30,6 +30,10 @@ class NoSuchJobError(Exception):
30
30
pass
31
31
32
32
33
+ class NoSuchRegistryError (Exception ):
34
+ pass
35
+
36
+
33
37
def perform_job (job_model : JobModel , connection : ConnectionType ) -> Any : # noqa
34
38
"""The main execution method. Invokes the job function with the job arguments.
35
39
@@ -45,17 +49,17 @@ def perform_job(job_model: JobModel, connection: ConnectionType) -> Any: # noqa
45
49
coro_result = loop .run_until_complete (result )
46
50
result = coro_result
47
51
if job_model .success_callback :
48
- job_model .success_callback (job_model , connection , result ) # type: ignore
52
+ job_model .success_callback (job_model , connection , result )
49
53
return result
50
54
except :
51
55
if job_model .failure_callback :
52
- job_model .failure_callback (job_model , connection , * sys .exc_info ()) # type: ignore
56
+ job_model .failure_callback (job_model , connection , * sys .exc_info ())
53
57
raise
54
58
finally :
55
59
assert job_model is _job_stack .pop ()
56
60
57
61
58
- _job_stack = []
62
+ _job_stack : List [ JobModel ] = []
59
63
60
64
61
65
class Queue :
@@ -68,14 +72,14 @@ class Queue:
68
72
queued = "queued_job_registry" ,
69
73
)
70
74
71
- def __init__ (self , connection : Optional [ ConnectionType ] , name : str , is_async : bool = True ) -> None :
75
+ def __init__ (self , connection : ConnectionType , name : str , is_async : bool = True ) -> None :
72
76
"""Initializes a Queue object.
73
77
74
78
:param name: The queue name
75
79
:param connection: Broker connection
76
80
:param is_async: Whether jobs should run "async" (using the worker).
77
81
"""
78
- self .connection = connection
82
+ self .connection : ConnectionType = connection
79
83
self .name = name
80
84
self ._is_async = is_async
81
85
self .queued_job_registry = QueuedJobRegistry (connection = self .connection , name = self .name )
@@ -85,11 +89,11 @@ def __init__(self, connection: Optional[ConnectionType], name: str, is_async: bo
85
89
self .scheduled_job_registry = ScheduledJobRegistry (connection = self .connection , name = self .name )
86
90
self .canceled_job_registry = CanceledJobRegistry (connection = self .connection , name = self .name )
87
91
88
- def __len__ (self ):
92
+ def __len__ (self ) -> int :
89
93
return self .count
90
94
91
95
@property
92
- def scheduler_pid (self ) -> int :
96
+ def scheduler_pid (self ) -> Optional [ int ] :
93
97
lock = SchedulerLock (self .name )
94
98
pid = lock .value (self .connection )
95
99
return int (pid .decode ()) if pid is not None else None
@@ -155,11 +159,11 @@ def count(self) -> int:
155
159
res += getattr (self , registry ).count (connection = self .connection )
156
160
return res
157
161
158
- def get_registry (self , name : str ) -> Union [ None , JobNamesRegistry ] :
162
+ def get_registry (self , name : str ) -> JobNamesRegistry :
159
163
name = name .lower ()
160
164
if name in Queue .REGISTRIES :
161
- return getattr (self , Queue .REGISTRIES [name ])
162
- return None
165
+ return getattr (self , Queue .REGISTRIES [name ]) # type: ignore
166
+ raise NoSuchRegistryError ( f"Unknown registry name { name } " )
163
167
164
168
def get_all_job_names (self ) -> List [str ]:
165
169
res = list ()
@@ -178,22 +182,21 @@ def get_all_jobs(self) -> List[JobModel]:
178
182
def create_and_enqueue_job (
179
183
self ,
180
184
func : FunctionReferenceType ,
181
- args : Union [Tuple , List , None ] = None ,
182
- kwargs : Optional [Dict ] = None ,
185
+ args : Union [Tuple [ Any , ...], List [ Any ] , None ] = None ,
186
+ kwargs : Optional [Dict [ str , Any ] ] = None ,
183
187
when : Optional [datetime ] = None ,
184
188
timeout : Optional [int ] = None ,
185
189
result_ttl : Optional [int ] = None ,
186
190
job_info_ttl : Optional [int ] = None ,
187
191
description : Optional [str ] = None ,
188
192
name : Optional [str ] = None ,
189
193
at_front : bool = False ,
190
- meta : Optional [Dict ] = None ,
194
+ meta : Optional [Dict [ str , Any ] ] = None ,
191
195
on_success : Optional [Callback ] = None ,
192
196
on_failure : Optional [Callback ] = None ,
193
197
on_stopped : Optional [Callback ] = None ,
194
198
task_type : Optional [str ] = None ,
195
199
scheduled_task_id : Optional [int ] = None ,
196
- pipeline : Optional [ConnectionType ] = None ,
197
200
) -> JobModel :
198
201
"""Creates a job to represent the delayed function call and enqueues it.
199
202
:param when: When to schedule the job (None to enqueue immediately)
@@ -212,7 +215,6 @@ def create_and_enqueue_job(
212
215
:param on_stopped: Callback for on stopped
213
216
:param task_type: The task type
214
217
:param scheduled_task_id: The scheduled task id
215
- :param pipeline: The Broker Pipeline
216
218
:returns: The enqueued Job
217
219
"""
218
220
status = JobStatus .QUEUED if when is None else JobStatus .SCHEDULED
@@ -236,7 +238,7 @@ def create_and_enqueue_job(
236
238
scheduled_task_id = scheduled_task_id ,
237
239
)
238
240
if when is None :
239
- job_model = self .enqueue_job (job_model , connection = pipeline , at_front = at_front )
241
+ job_model = self .enqueue_job (job_model , at_front = at_front )
240
242
elif isinstance (when , datetime ):
241
243
job_model .save (connection = self .connection )
242
244
self .scheduled_job_registry .schedule (self .connection , job_model .name , when )
@@ -246,7 +248,7 @@ def create_and_enqueue_job(
246
248
247
249
def job_handle_success (
248
250
self , job : JobModel , result : Any , job_info_ttl : int , result_ttl : int , connection : ConnectionType
249
- ):
251
+ ) -> None :
250
252
"""Saves and cleanup job after successful execution"""
251
253
job .after_execution (
252
254
job_info_ttl ,
@@ -264,7 +266,7 @@ def job_handle_success(
264
266
ttl = result_ttl ,
265
267
)
266
268
267
- def job_handle_failure (self , status : JobStatus , job : JobModel , exc_string : str , connection : ConnectionType ):
269
+ def job_handle_failure (self , status : JobStatus , job : JobModel , exc_string : str , connection : ConnectionType ) -> None :
268
270
# Does not set job status since the job might be stopped
269
271
job .after_execution (
270
272
SCHEDULER_CONFIG .DEFAULT_FAILURE_TTL ,
@@ -304,10 +306,7 @@ def run_sync(self, job: JobModel) -> JobModel:
304
306
305
307
@classmethod
306
308
def dequeue_any (
307
- cls ,
308
- queues : List [Self ],
309
- timeout : Optional [int ],
310
- connection : Optional [ConnectionType ] = None ,
309
+ cls , queues : List [Self ], timeout : Optional [int ], connection : ConnectionType
311
310
) -> Tuple [Optional [JobModel ], Optional [Self ]]:
312
311
"""Class method returning a Job instance at the front of the given set of Queues, where the order of the queues
313
312
is important.
@@ -410,19 +409,19 @@ def delete_job(self, job_name: str, expire_job_model: bool = True) -> None:
410
409
pass
411
410
412
411
def enqueue_job (
413
- self , job_model : JobModel , connection : Optional [ConnectionType ] = None , at_front : bool = False
412
+ self , job_model : JobModel , pipeline : Optional [PipelineType ] = None , at_front : bool = False
414
413
) -> JobModel :
415
414
"""Enqueues a job for delayed execution without checking dependencies.
416
415
417
416
If Queue is instantiated with is_async=False, job is executed immediately.
418
417
:param job_model: The job redis model
419
- :param connection : The Redis Pipeline
418
+ :param pipeline : The Broker Pipeline
420
419
:param at_front: Whether to enqueue the job at the front
421
420
422
421
:returns: The enqueued JobModel
423
422
"""
424
423
425
- pipe = connection if connection is not None else self .connection .pipeline ()
424
+ pipe : PipelineType = pipeline if pipeline is not None else self .connection .pipeline ()
426
425
job_model .started_at = None
427
426
job_model .ended_at = None
428
427
job_model .status = JobStatus .QUEUED
0 commit comments