Skip to content

Commit 7b01f6c

Browse files
committed
Job scheduler: Filter finished scheduled jobs on health endpoint query
The housekeeping of finished jobs is done once a minute. In case a job finished, the health endpoint still reported the job as running until the next housekeeping cycle. This change adds a filter to the health endpoint query to exclude finished jobs on query. Change-Id: I36fcde536727c95236df0f3c668e61622a973ec6
1 parent d7333ad commit 7b01f6c

File tree

3 files changed

+28
-4
lines changed

3 files changed

+28
-4
lines changed

cmk/gui/job_scheduler/_background_jobs/_app.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
StartResponse,
3030
TerminateRequest,
3131
)
32-
from cmk.gui.job_scheduler._scheduler import SchedulerState
32+
from cmk.gui.job_scheduler._scheduler import filter_running_jobs, SchedulerState
3333

3434

3535
def get_application(
@@ -105,7 +105,7 @@ async def check_health(request: Request) -> HealthResponse:
105105
job_executions=executor.job_executions(),
106106
),
107107
scheduled_jobs=ScheduledJobsHealth(
108-
running_jobs=list(scheduler_state.running_jobs.keys()),
108+
running_jobs=list(filter_running_jobs(scheduler_state.running_jobs).keys()),
109109
job_executions=dict(scheduler_state.job_executions),
110110
),
111111
)

cmk/gui/job_scheduler/_scheduler.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,18 @@ def _collect_finished_threads(running_jobs: dict[str, threading.Thread]) -> None
188188
del running_jobs[job_name]
189189

190190

191+
def filter_running_jobs(
192+
running_jobs: Mapping[str, threading.Thread],
193+
) -> dict[str, threading.Thread]:
194+
"""Provide an up-to-date list of running jobs.
195+
196+
collect_finished_threads might have not been executed since a job finished, which
197+
causes some lag in the update of scheduler_state.running_jobs. This function
198+
does some ad-hoc filtering to get the correct list of running jobs.
199+
"""
200+
return {job_id: thread for job_id, thread in running_jobs.items() if thread.is_alive()}
201+
202+
191203
@dataclass
192204
class SchedulerState:
193205
running_jobs: dict[str, threading.Thread] = field(default_factory=dict)

tests/unit/cmk/gui/job_scheduler/background_jobs/test_app.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,15 @@ def on_scheduler_start(cls, executor: JobExecutor) -> None:
8181
HelloJob.on_scheduler_start_called = True
8282

8383

84+
class DummyThread(threading.Thread):
85+
def __init__(self, is_stopped: bool) -> None:
86+
super().__init__()
87+
self._is_stopped = is_stopped
88+
89+
def is_alive(self) -> bool:
90+
return not self._is_stopped
91+
92+
8493
def _get_test_client(loaded_at: int) -> TestClient:
8594
return TestClient(
8695
get_application(
@@ -96,7 +105,10 @@ def _get_test_client(loaded_at: int) -> TestClient:
96105
registered_jobs={"hello_job": HelloJob},
97106
executor=DummyExecutor(logger),
98107
scheduler_state=SchedulerState(
99-
running_jobs={"scheduled_1": threading.Thread()},
108+
running_jobs={
109+
"scheduled_1_running": DummyThread(is_stopped=False),
110+
"scheduled_2_finished": DummyThread(is_stopped=True),
111+
},
100112
job_executions=Counter({"scheduled_1": 1, "scheduled_2": 2}),
101113
),
102114
)
@@ -172,7 +184,7 @@ def test_health_check() -> None:
172184
)
173185
assert response.background_jobs.running_jobs == {"job_id": 42}
174186
assert response.background_jobs.job_executions == {"job_1": 1, "job_2": 2}
175-
assert response.scheduled_jobs.running_jobs == ["scheduled_1"]
187+
assert response.scheduled_jobs.running_jobs == ["scheduled_1_running"]
176188
assert response.scheduled_jobs.job_executions == {"scheduled_1": 1, "scheduled_2": 2}
177189

178190

0 commit comments

Comments
 (0)