Skip to content

Commit 0bc3066

Browse files
committed
fix:update os.wait4 to os.waitpid
1 parent be6fdae commit 0bc3066

File tree

3 files changed

+16
-15
lines changed

3 files changed

+16
-15
lines changed

scheduler/models/task.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -187,9 +187,9 @@ def is_scheduled(self) -> bool:
187187
return False
188188
# check whether job_id is in scheduled/queued/active jobs
189189
res = (
190-
(self.job_name in self.rqueue.scheduled_job_registry.all())
191-
or (self.job_name in self.rqueue.queued_job_registry.all())
192-
or (self.job_name in self.rqueue.active_job_registry.all())
190+
(self.job_name in self.rqueue.scheduled_job_registry.all())
191+
or (self.job_name in self.rqueue.queued_job_registry.all())
192+
or (self.job_name in self.rqueue.active_job_registry.all())
193193
)
194194
# If the job_id is not scheduled/queued/started,
195195
# update the job_id to None. (The job_id belongs to a previous run which is completed)

scheduler/tests/test_settings.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,23 +9,24 @@
99

1010

1111
class TestWorkerAdmin(SchedulerBaseCase):
12-
1312
def setUp(self):
1413
from scheduler.settings import SCHEDULER_CONFIG
14+
1515
self.old_settings = SCHEDULER_CONFIG
1616

1717
def tearDown(self):
1818
from scheduler import settings as scheduler_settings
19+
1920
scheduler_settings.SCHEDULER_CONFIG = self.old_settings
2021

2122
def test_scheduler_config_as_dict(self):
2223
from scheduler.settings import SCHEDULER_CONFIG
24+
2325
settings.SCHEDULER_CONFIG = dict(
2426
EXECUTIONS_IN_PAGE=SCHEDULER_CONFIG.EXECUTIONS_IN_PAGE + 1,
2527
SCHEDULER_INTERVAL=SCHEDULER_CONFIG.SCHEDULER_INTERVAL + 1,
2628
BROKER=Broker.REDIS,
2729
CALLBACK_TIMEOUT=SCHEDULER_CONFIG.SCHEDULER_INTERVAL + 1,
28-
2930
DEFAULT_SUCCESS_TTL=SCHEDULER_CONFIG.DEFAULT_SUCCESS_TTL + 1,
3031
DEFAULT_FAILURE_TTL=SCHEDULER_CONFIG.DEFAULT_FAILURE_TTL + 1,
3132
DEFAULT_JOB_TTL=SCHEDULER_CONFIG.DEFAULT_JOB_TTL + 1,
@@ -38,18 +39,19 @@ def test_scheduler_config_as_dict(self):
3839
)
3940
conf_settings()
4041
from scheduler.settings import SCHEDULER_CONFIG
42+
4143
for key, value in settings.SCHEDULER_CONFIG.items():
4244
self.assertEqual(getattr(SCHEDULER_CONFIG, key), value)
4345

4446
def test_scheduler_config_as_data_class(self):
4547
from scheduler.settings import SCHEDULER_CONFIG
48+
4649
self.assertEqual(SCHEDULER_CONFIG.EXECUTIONS_IN_PAGE, 20)
4750
settings.SCHEDULER_CONFIG = SchedulerConfiguration(
4851
EXECUTIONS_IN_PAGE=1,
4952
SCHEDULER_INTERVAL=60,
5053
BROKER=Broker.REDIS,
5154
CALLBACK_TIMEOUT=1111,
52-
5355
DEFAULT_SUCCESS_TTL=1111,
5456
DEFAULT_FAILURE_TTL=111111,
5557
DEFAULT_JOB_TTL=1111,
@@ -62,6 +64,7 @@ def test_scheduler_config_as_data_class(self):
6264
)
6365
conf_settings()
6466
from scheduler.settings import SCHEDULER_CONFIG
67+
6568
for key, value in dataclasses.asdict(settings.SCHEDULER_CONFIG).items():
6669
self.assertEqual(getattr(SCHEDULER_CONFIG, key), value)
6770

@@ -71,7 +74,6 @@ def test_scheduler_config_as_dict_bad_param(self):
7174
SCHEDULER_INTERVAL=60,
7275
BROKER=Broker.REDIS,
7376
CALLBACK_TIMEOUT=1111,
74-
7577
DEFAULT_SUCCESS_TTL=1111,
7678
DEFAULT_FAILURE_TTL=111111,
7779
DEFAULT_JOB_TTL=1111,
@@ -81,6 +83,6 @@ def test_scheduler_config_as_dict_bad_param(self):
8183
DEFAULT_MAINTENANCE_TASK_INTERVAL=111,
8284
DEFAULT_JOB_MONITORING_INTERVAL=1111,
8385
SCHEDULER_FALLBACK_PERIOD_SECS=1111,
84-
BAD_PARAM='bad_value', # This should raise an error
86+
BAD_PARAM="bad_value", # This should raise an error
8587
)
8688
self.assertRaises(ImproperlyConfigured, conf_settings)

scheduler/worker/worker.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
from datetime import timedelta
1414
from enum import Enum
1515
from random import shuffle
16-
from resource import struct_rusage
1716
from types import FrameType
1817
from typing import List, Optional, Tuple, Any, Iterable
1918

@@ -475,14 +474,14 @@ def kill_job_execution_process(self, sig: signal.Signals = SIGKILL):
475474
else:
476475
raise
477476

478-
def wait_for_job_execution_process(self) -> Tuple[Optional[int], Optional[int], Optional[struct_rusage]]:
477+
def wait_for_job_execution_process(self) -> Tuple[Optional[int], Optional[int]]:
479478
"""Waits for the job execution process to complete.
480479
Uses `0` as argument as to include "any child in the process group of the current process".
481480
"""
482-
pid = stat = rusage = None
481+
pid = stat = None
483482
with contextlib.suppress(ChildProcessError): # ChildProcessError: [Errno 10] No child processes
484-
pid, stat, rusage = os.wait4(self._model.job_execution_process_pid, 0)
485-
return pid, stat, rusage
483+
pid, stat = os.waitpid(self._model.job_execution_process_pid, 0)
484+
return pid, stat
486485

487486
def request_force_stop(self, signum: int, frame: Optional[FrameType]):
488487
"""Terminates the application (cold shutdown).
@@ -634,14 +633,14 @@ def monitor_job_execution_process(self, job: JobModel, queue: Queue) -> None:
634633
:param job: The Job
635634
:param queue: The Queue
636635
"""
637-
retpid = ret_val = rusage = None
636+
retpid = ret_val = None
638637
job.started_at = utcnow()
639638
while True:
640639
try:
641640
with SCHEDULER_CONFIG.DEATH_PENALTY_CLASS(
642641
self.job_monitoring_interval, JobExecutionMonitorTimeoutException
643642
):
644-
retpid, ret_val, rusage = self.wait_for_job_execution_process()
643+
retpid, ret_val = self.wait_for_job_execution_process()
645644
break
646645
except JobExecutionMonitorTimeoutException:
647646
# job execution process has not exited yet and is still running. Send a heartbeat to keep the worker alive.

0 commit comments

Comments
 (0)