Skip to content

Commit f391d70

Browse files
authored
fix:signal on non-main thread #284 (#287)
1 parent 24a34ae commit f391d70

File tree

7 files changed

+48
-19
lines changed

7 files changed

+48
-19
lines changed

scheduler/decorators.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@ def __init__(
1818
at_front: bool = False,
1919
meta: Optional[Dict[Any, Any]] = None,
2020
description: Optional[str] = None,
21-
on_failure: Optional[Union["Callback", Callable[..., Any]]] = None,
22-
on_success: Optional[Union["Callback", Callable[..., Any]]] = None,
23-
on_stopped: Optional[Union["Callback", Callable[..., Any]]] = None,
21+
on_failure: Optional[Union[Callback, Callable[..., Any]]] = None,
22+
on_success: Optional[Union[Callback, Callable[..., Any]]] = None,
23+
on_stopped: Optional[Union[Callback, Callable[..., Any]]] = None,
2424
):
2525
"""A decorator that adds a ``delay`` method to the decorated function, which in turn creates a RQ job when
2626
called. Accepts a required ``queue`` argument that can be either a ``Queue`` instance or a string

scheduler/helpers/callback.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
from typing import Union, Callable, Any, Optional
33

44
from scheduler.helpers.utils import callable_func
5-
from scheduler.timeouts import JobTimeoutException
5+
from scheduler.helpers.timeouts import JobTimeoutException
66

77

88
class CallbackSetupError(Exception):

scheduler/timeouts.py renamed to scheduler/helpers/timeouts.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
import ctypes
2+
import logging
23
import signal
34
import threading
45

6+
logger = logging.getLogger("scheduler")
7+
58

69
class BaseTimeoutException(Exception):
710
"""Base exception for timeouts."""
@@ -59,13 +62,19 @@ def handle_death_penalty(self, signum, frame) -> None:
5962
def setup_death_penalty(self) -> None:
6063
"""Sets up an alarm signal and a signal handler that raises an exception after the timeout amount
6164
(expressed in seconds)."""
62-
signal.signal(signal.SIGALRM, self.handle_death_penalty)
63-
signal.alarm(self._timeout)
65+
if threading.current_thread() is threading.main_thread():
66+
signal.signal(signal.SIGALRM, self.handle_death_penalty)
67+
signal.alarm(self._timeout)
68+
else:
69+
logger.warning(f"Ignoring death penalty setup in non-main thread `{threading.current_thread().name}`.")
6470

6571
def cancel_death_penalty(self) -> None:
6672
"""Removes the death penalty alarm and puts back the system into default signal handling."""
67-
signal.alarm(0)
68-
signal.signal(signal.SIGALRM, signal.SIG_DFL)
73+
if threading.current_thread() is threading.main_thread():
74+
signal.alarm(0)
75+
signal.signal(signal.SIGALRM, signal.SIG_DFL)
76+
else:
77+
logger.warning(f"Ignoring death penalty cancel in non-main thread `{threading.current_thread().name}`.")
6978

7079

7180
class TimerDeathPenalty(BaseDeathPenalty):

scheduler/settings.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
from scheduler.types import SchedulerConfiguration, QueueConfiguration
88

99
logger = logging.getLogger("scheduler")
10-
logging.basicConfig(level=logging.DEBUG)
1110

1211
_QUEUES: Dict[str, QueueConfiguration] = dict()
1312
SCHEDULER_CONFIG: SchedulerConfiguration = SchedulerConfiguration()

scheduler/tests/test_job_decorator.py

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import threading
12
import time
23

34
from django.test import TestCase
@@ -45,16 +46,21 @@ def __eq__(self, other):
4546

4647

4748
@job()
48-
def long_running_func(x):
49+
def func_with_param(x):
4950
x.run()
5051

5152

53+
@job(timeout=1)
54+
def long_running_func():
55+
time.sleep(1000)
56+
57+
5258
class JobDecoratorTest(TestCase):
5359
def setUp(self) -> None:
5460
get_queue("default").connection.flushall()
5561

5662
def test_all_job_methods_registered(self):
57-
self.assertEqual(5, len(JOB_METHODS_LIST))
63+
self.assertEqual(6, len(JOB_METHODS_LIST))
5864

5965
def test_job_decorator_no_params(self):
6066
test_job.delay()
@@ -104,23 +110,38 @@ def _assert_job_with_func_and_props(self, queue_name, expected_func, expected_re
104110

105111
def test_job_decorator_bad_queue(self):
106112
with self.assertRaises(settings.QueueNotFoundError):
107-
108113
@job("bad-queue")
109114
def test_job_bad_queue():
110-
time.sleep(1)
111115
return 1 + 1
112116

113117
def test_job_decorator_delay_with_param(self):
114118
queue_name = "default"
115-
long_running_func.delay(MyClass())
119+
func_with_param.delay(MyClass())
116120

117121
worker = create_worker(queue_name, burst=True)
118122
worker.work()
119123

120124
jobs_list = worker.queues[0].get_all_jobs()
121125
self.assertEqual(1, len(jobs_list))
122126
job = jobs_list[0]
123-
self.assertEqual(job.func, long_running_func)
127+
self.assertEqual(job.func, func_with_param)
124128
self.assertEqual(job.kwargs, {})
125129
self.assertEqual(job.status, JobStatus.FINISHED)
126130
self.assertEqual(job.args, (MyClass(),))
131+
132+
def test_job_decorator_delay_with_param_worker_thread(self):
133+
queue_name = "default"
134+
135+
long_running_func.delay()
136+
137+
worker = create_worker(queue_name, burst=True)
138+
t = threading.Thread(target=worker.work)
139+
t.start()
140+
t.join()
141+
142+
jobs_list = get_queue(queue_name).get_all_jobs()
143+
self.assertEqual(1, len(jobs_list))
144+
j = jobs_list[0]
145+
self.assertEqual(j.func, long_running_func)
146+
self.assertEqual(j.kwargs, {})
147+
self.assertEqual(j.status, JobStatus.FAILED)

scheduler/types/settings_types.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
import sys
22
from dataclasses import dataclass
33
from enum import Enum
4-
from typing import Callable, Dict, Optional, List, Tuple, Any, Type
4+
from typing import Callable, Dict, Optional, List, Tuple, Any, Type, ClassVar
55

6-
from scheduler.timeouts import BaseDeathPenalty, UnixSignalDeathPenalty
6+
from scheduler.helpers.timeouts import BaseDeathPenalty, UnixSignalDeathPenalty
77

88
if sys.version_info >= (3, 11):
99
from typing import Self
@@ -45,7 +45,7 @@ class SchedulerConfiguration:
4545

4646
@dataclass(slots=True, frozen=True, kw_only=True)
4747
class QueueConfiguration:
48-
__CONNECTION_FIELDS__ = {
48+
__CONNECTION_FIELDS__: ClassVar[Dict] = {
4949
"URL",
5050
"DB",
5151
"UNIX_SOCKET_PATH",

scheduler/worker/worker.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
from contextlib import suppress
3636

3737
from scheduler.helpers.queues import Queue, perform_job
38-
from scheduler.timeouts import JobExecutionMonitorTimeoutException, JobTimeoutException
38+
from scheduler.helpers.timeouts import JobExecutionMonitorTimeoutException, JobTimeoutException
3939
from scheduler.helpers.utils import utcnow, current_timestamp
4040

4141
try:

0 commit comments

Comments
 (0)