Skip to content

Commit 7a1a02b

Browse files
committed
Job scheduler: Add start time to scheduled job health info
Change-Id: I04dd4eb2e1214eed5cd997a1afa9a82d697e2454
1 parent 7b01f6c commit 7a1a02b

File tree

5 files changed

+45
-27
lines changed

5 files changed

+45
-27
lines changed

cmk/gui/background_job/_models.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
# This file is part of Checkmk (https://checkmk.com). It is subject to the terms and
44
# conditions defined in the file COPYING, which is part of this source code package.
55

6-
from collections.abc import Mapping, Sequence
6+
from collections.abc import Mapping
77

88
from pydantic import BaseModel
99

@@ -56,7 +56,7 @@ class BackgroundJobsHealth(BaseModel, frozen=True):
5656

5757

5858
class ScheduledJobsHealth(BaseModel, frozen=True):
59-
running_jobs: Sequence[str]
59+
running_jobs: Mapping[str, int]
6060
job_executions: Mapping[str, int]
6161

6262

cmk/gui/job_scheduler/_background_jobs/_app.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,10 @@ async def check_health(request: Request) -> HealthResponse:
105105
job_executions=executor.job_executions(),
106106
),
107107
scheduled_jobs=ScheduledJobsHealth(
108-
running_jobs=list(filter_running_jobs(scheduler_state.running_jobs).keys()),
108+
running_jobs={
109+
name: job.started_at
110+
for name, job in filter_running_jobs(scheduler_state.running_jobs).items()
111+
},
109112
job_executions=dict(scheduler_state.job_executions),
110113
),
111114
)

cmk/gui/job_scheduler/_scheduler.py

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -122,14 +122,19 @@ def run_scheduled_jobs(
122122
state.job_executions[job.name] += 1
123123
if job.run_in_thread:
124124
logger.debug("Starting [%s] in thread", job.name)
125-
state.running_jobs[job.name] = thread = threading.Thread(
126-
target=job_thread_main,
127-
args=(
128-
job,
129-
trace.Link(span.get_span_context()),
130-
crash_report_callback,
125+
state.running_jobs[job.name] = ScheduledJob(
126+
started_at=int(time.time()),
127+
thread=(
128+
thread := threading.Thread(
129+
target=job_thread_main,
130+
args=(
131+
job,
132+
trace.Link(span.get_span_context()),
133+
crash_report_callback,
134+
),
135+
name=f"scheduled-{job.name}",
136+
)
131137
),
132-
name=f"scheduled-{job.name}",
133138
)
134139
thread.start()
135140
logger.debug("Started [%s]", job.name)
@@ -174,33 +179,37 @@ def job_thread_main(
174179

175180

176181
@tracer.start_as_current_span("wait_for_job_threads")
177-
def _wait_for_job_threads(running_jobs: dict[str, threading.Thread]) -> None:
182+
def _wait_for_job_threads(running_jobs: dict[str, ScheduledJob]) -> None:
178183
logger.debug("Waiting for threads to terminate")
179-
for job_name, thread in list(running_jobs.items()):
180-
thread.join()
184+
for job_name, job in list(running_jobs.items()):
185+
job.thread.join()
181186
del running_jobs[job_name]
182187

183188

184-
def _collect_finished_threads(running_jobs: dict[str, threading.Thread]) -> None:
185-
for job_name, thread in list(running_jobs.items()):
186-
if not thread.is_alive():
189+
def _collect_finished_threads(running_jobs: dict[str, ScheduledJob]) -> None:
190+
for job_name, job in list(running_jobs.items()):
191+
if not job.thread.is_alive():
187192
logger.debug("Removing finished thread [%s]", job_name)
188193
del running_jobs[job_name]
189194

190195

191-
def filter_running_jobs(
192-
running_jobs: Mapping[str, threading.Thread],
193-
) -> dict[str, threading.Thread]:
196+
def filter_running_jobs(running_jobs: Mapping[str, ScheduledJob]) -> dict[str, ScheduledJob]:
194197
"""Provide an up-to-date list of running jobs.
195198
196199
collect_finished_threads might have not been executed since a job finished, which
197200
causes some lag in the update of scheduler_state.running_jobs. This function
198201
does some ad-hoc filtering to get the correct list of running jobs.
199202
"""
200-
return {job_id: thread for job_id, thread in running_jobs.items() if thread.is_alive()}
203+
return {job_id: job for job_id, job in running_jobs.items() if job.thread.is_alive()}
204+
205+
206+
@dataclass
207+
class ScheduledJob:
208+
started_at: int
209+
thread: threading.Thread
201210

202211

203212
@dataclass
204213
class SchedulerState:
205-
running_jobs: dict[str, threading.Thread] = field(default_factory=dict)
214+
running_jobs: dict[str, ScheduledJob] = field(default_factory=dict)
206215
job_executions: Counter[str] = field(default_factory=Counter)

tests/unit/cmk/gui/job_scheduler/background_jobs/test_app.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
TerminateRequest,
2929
)
3030
from cmk.gui.job_scheduler._background_jobs._app import get_application
31-
from cmk.gui.job_scheduler._scheduler import SchedulerState
31+
from cmk.gui.job_scheduler._scheduler import ScheduledJob, SchedulerState
3232

3333
logger = logging.getLogger(__name__)
3434

@@ -106,8 +106,14 @@ def _get_test_client(loaded_at: int) -> TestClient:
106106
executor=DummyExecutor(logger),
107107
scheduler_state=SchedulerState(
108108
running_jobs={
109-
"scheduled_1_running": DummyThread(is_stopped=False),
110-
"scheduled_2_finished": DummyThread(is_stopped=True),
109+
"scheduled_1_running": ScheduledJob(
110+
started_at=123,
111+
thread=DummyThread(is_stopped=False),
112+
),
113+
"scheduled_2_finished": ScheduledJob(
114+
started_at=111,
115+
thread=DummyThread(is_stopped=True),
116+
),
111117
},
112118
job_executions=Counter({"scheduled_1": 1, "scheduled_2": 2}),
113119
),
@@ -184,7 +190,7 @@ def test_health_check() -> None:
184190
)
185191
assert response.background_jobs.running_jobs == {"job_id": 42}
186192
assert response.background_jobs.job_executions == {"job_1": 1, "job_2": 2}
187-
assert response.scheduled_jobs.running_jobs == ["scheduled_1_running"]
193+
assert response.scheduled_jobs.running_jobs == {"scheduled_1_running": 123}
188194
assert response.scheduled_jobs.job_executions == {"scheduled_1": 1, "scheduled_2": 2}
189195

190196

tests/unit/cmk/gui/job_scheduler/test_job_scheduler.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ def test_run_scheduled_jobs_in_thread() -> None:
7676
run_scheduled_jobs(jobs, state, crash_report_callback=reraise_exception)
7777

7878
assert "threaded_job" in state.running_jobs
79-
state.running_jobs["threaded_job"].join()
79+
state.running_jobs["threaded_job"].thread.join()
8080
assert called.is_set()
8181
assert state.job_executions == {"threaded_job": 1}
8282

@@ -111,5 +111,5 @@ def test_run_scheduled_jobs_in_thread_does_not_start_twice(
111111
finally:
112112
shall_terminate.set()
113113
assert "threaded_job" in state.running_jobs
114-
state.running_jobs["threaded_job"].join()
114+
state.running_jobs["threaded_job"].thread.join()
115115
assert state.job_executions == {"threaded_job": 1}

0 commit comments

Comments
 (0)