@@ -167,37 +167,38 @@ def get_registry(self, name: str) -> JobNamesRegistry:
167
167
raise NoSuchRegistryError (f"Unknown registry name { name } " )
168
168
169
169
def get_all_job_names (self ) -> List [str ]:
170
- res = list ()
171
- res .extend (self .queued_job_registry .all ())
172
- res .extend (self .finished_job_registry .all ())
173
- res .extend (self .active_job_registry .all ())
174
- res .extend (self .failed_job_registry .all ())
175
- res .extend (self .scheduled_job_registry .all ())
176
- res .extend (self .canceled_job_registry .all ())
170
+ all_job_names = list ()
171
+ all_job_names .extend (self .queued_job_registry .all ())
172
+ all_job_names .extend (self .finished_job_registry .all ())
173
+ all_job_names .extend (self .active_job_registry .all ())
174
+ all_job_names .extend (self .failed_job_registry .all ())
175
+ all_job_names .extend (self .scheduled_job_registry .all ())
176
+ all_job_names .extend (self .canceled_job_registry .all ())
177
+ res = list (filter (lambda job_name : JobModel .exists (job_name , self .connection ), all_job_names ))
177
178
return res
178
179
179
180
def get_all_jobs (self ) -> List [JobModel ]:
180
181
job_names = self .get_all_job_names ()
181
182
return JobModel .get_many (job_names , connection = self .connection )
182
183
183
184
def create_and_enqueue_job (
184
- self ,
185
- func : FunctionReferenceType ,
186
- args : Union [Tuple [Any , ...], List [Any ], None ] = None ,
187
- kwargs : Optional [Dict [str , Any ]] = None ,
188
- when : Optional [datetime ] = None ,
189
- timeout : Optional [int ] = None ,
190
- result_ttl : Optional [int ] = None ,
191
- job_info_ttl : Optional [int ] = None ,
192
- description : Optional [str ] = None ,
193
- name : Optional [str ] = None ,
194
- at_front : bool = False ,
195
- meta : Optional [Dict [str , Any ]] = None ,
196
- on_success : Optional [Callback ] = None ,
197
- on_failure : Optional [Callback ] = None ,
198
- on_stopped : Optional [Callback ] = None ,
199
- task_type : Optional [str ] = None ,
200
- scheduled_task_id : Optional [int ] = None ,
185
+ self ,
186
+ func : FunctionReferenceType ,
187
+ args : Union [Tuple [Any , ...], List [Any ], None ] = None ,
188
+ kwargs : Optional [Dict [str , Any ]] = None ,
189
+ when : Optional [datetime ] = None ,
190
+ timeout : Optional [int ] = None ,
191
+ result_ttl : Optional [int ] = None ,
192
+ job_info_ttl : Optional [int ] = None ,
193
+ description : Optional [str ] = None ,
194
+ name : Optional [str ] = None ,
195
+ at_front : bool = False ,
196
+ meta : Optional [Dict [str , Any ]] = None ,
197
+ on_success : Optional [Callback ] = None ,
198
+ on_failure : Optional [Callback ] = None ,
199
+ on_stopped : Optional [Callback ] = None ,
200
+ task_type : Optional [str ] = None ,
201
+ scheduled_task_id : Optional [int ] = None ,
201
202
) -> JobModel :
202
203
"""Creates a job to represent the delayed function call and enqueues it.
203
204
:param when: When to schedule the job (None to enqueue immediately)
@@ -248,7 +249,7 @@ def create_and_enqueue_job(
248
249
return job_model
249
250
250
251
def job_handle_success (
251
- self , job : JobModel , result : Any , job_info_ttl : int , result_ttl : int , connection : ConnectionType
252
+ self , job : JobModel , result : Any , job_info_ttl : int , result_ttl : int , connection : ConnectionType
252
253
) -> None :
253
254
"""Saves and cleanup job after successful execution"""
254
255
job .after_execution (
@@ -307,7 +308,7 @@ def run_sync(self, job: JobModel) -> JobModel:
307
308
308
309
@classmethod
309
310
def dequeue_any (
310
- cls , queues : List [Self ], timeout : Optional [int ], connection : ConnectionType
311
+ cls , queues : List [Self ], timeout : Optional [int ], connection : ConnectionType
311
312
) -> Tuple [Optional [JobModel ], Optional [Self ]]:
312
313
"""Class method returning a Job instance at the front of the given set of Queues, where the order of the queues
313
314
is important.
@@ -410,7 +411,7 @@ def delete_job(self, job_name: str, expire_job_model: bool = True) -> None:
410
411
pass
411
412
412
413
def enqueue_job (
413
- self , job_model : JobModel , pipeline : Optional [PipelineType ] = None , at_front : bool = False
414
+ self , job_model : JobModel , pipeline : Optional [PipelineType ] = None , at_front : bool = False
414
415
) -> JobModel :
415
416
"""Enqueues a job for delayed execution without checking dependencies.
416
417
0 commit comments