Skip to content

Commit

Permalink
[dvsim] more robust local launcher process management
Browse files Browse the repository at this point in the history
Signed-off-by: James McCorrie <[email protected]>
  • Loading branch information
James McCorrie authored and rswarbrick committed Feb 12, 2025
1 parent 79fb85d commit c7d7697
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 52 deletions.
11 changes: 9 additions & 2 deletions util/dvsim/Launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import re
import sys
from pathlib import Path
from typing import Union

from utils import VERBOSE, clean_odirs, mk_symlink, rm_path

Expand All @@ -18,6 +19,11 @@ def __init__(self, msg) -> None:
self.msg = msg


class LauncherBusy(Exception):
    """Raised when a job cannot be launched right now.

    Carries a human-readable description of the failure in ``msg``.
    """

    def __init__(self, msg) -> None:
        # Hand the message to Exception explicitly so that str(err) and
        # err.args carry it.
        super().__init__(msg)
        self.msg = msg


class ErrorMessage(
collections.namedtuple(
"ErrorMessage",
Expand Down Expand Up @@ -219,7 +225,7 @@ def launch(self) -> None:
self._pre_launch()
self._do_launch()

def poll(self) -> None:
def poll(self) -> Union[str, None]:
"""Poll the launched job for completion.
Invokes _check_status() and _post_finish() when the job completes.
Expand Down Expand Up @@ -285,10 +291,11 @@ def _find_patterns(patterns, line):
if chk_failed and _find_patterns(self.deploy.fail_patterns, line):
# If failed, then nothing else to do. Just return.
# Provide some extra lines for context.
end = cnt + 5
return "F", ErrorMessage(
line_number=cnt + 1,
message=line.strip(),
context=lines[cnt : cnt + 5],
context=lines[cnt:end],
)

if chk_passed:
Expand Down
103 changes: 56 additions & 47 deletions util/dvsim/LocalLauncher.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
# Copyright lowRISC contributors (OpenTitan project).
# Licensed under the Apache License, Version 2.0, see LICENSE for details.
# SPDX-License-Identifier: Apache-2.0
"""Launcher implementation to run jobs as subprocesses on the local machine."""

import datetime
import os
import shlex
import subprocess
from pathlib import Path
from typing import Union

from Launcher import ErrorMessage, Launcher, LauncherError
from Launcher import ErrorMessage, Launcher, LauncherBusy, LauncherError


class LocalLauncher(Launcher):
Expand All @@ -18,7 +21,8 @@ def __init__(self, deploy):
super().__init__(deploy)

# Popen object when launching the job.
self.process = None
self._process = None
self._log_file = None

def _do_launch(self) -> None:
# Update the shell's env vars with self.exports. Values in exports must
Expand All @@ -37,34 +41,37 @@ def _do_launch(self) -> None:
self._dump_env_vars(exports)

if not self.deploy.sim_cfg.interactive:
log_path = Path(self.deploy.get_log_path())
timeout_mins = self.deploy.get_timeout_mins()

self.timeout_secs = timeout_mins * 60 if timeout_mins else None

try:
f = open(
self.deploy.get_log_path(),
self._log_file = log_path.open(
"w",
encoding="UTF-8",
errors="surrogateescape",
)
f.write("[Executing]:\n{}\n\n".format(self.deploy.cmd))
f.flush()
timeout_mins = self.deploy.get_timeout_mins()
if timeout_mins:
self.timeout_secs = timeout_mins * 60
else:
self.timeout_secs = None
self.process = subprocess.Popen(
self._log_file.write(f"[Executing]:\n{self.deploy.cmd}\n\n")
self._log_file.flush()

self._process = subprocess.Popen(
shlex.split(self.deploy.cmd),
bufsize=4096,
universal_newlines=True,
stdout=f,
stderr=f,
stdout=self._log_file,
stderr=self._log_file,
env=exports,
)

except BlockingIOError as e:
raise LauncherBusy(f"Failed to launch job: {e}") from e

except subprocess.SubprocessError as e:
raise LauncherError(
"IO Error: {}\nSee {}".format(e, self.deploy.get_log_path())
)
raise LauncherError(f"IO Error: {e}\nSee {log_path}") from e

finally:
self._close_process()
self._close_job_log_file()
else:
# Interactive: Set RUN_INTERACTIVE to 1
exports["RUN_INTERACTIVE"] = "1"
Expand All @@ -73,7 +80,7 @@ def _do_launch(self) -> None:
# no timeout and blocking op as user controls the flow
print("Interactive mode is not supported yet.")
print(f"Cmd : {self.deploy.cmd}")
self.process = subprocess.Popen(
self._process = subprocess.Popen(
shlex.split(self.deploy.cmd),
stdin=None,
stdout=None,
Expand All @@ -84,12 +91,12 @@ def _do_launch(self) -> None:
)

# Wait until the process exit
self.process.wait()
self._process.wait()

self._link_odir("D")

def poll(self):
"""Check status of the running process
def poll(self) -> Union[str, None]:
"""Check status of the running process.
This returns 'D', 'P', 'F', or 'K'. If 'D', the job is still running.
If 'P', the job finished successfully. If 'F', the job finished with
Expand All @@ -98,20 +105,20 @@ def poll(self):
This function must only be called after running self.dispatch_cmd() and
must not be called again once it has returned 'P' or 'F'.
"""
if self._process is None:
return "E"

assert self.process is not None
elapsed_time = datetime.datetime.now() - self.start_time
self.job_runtime_secs = elapsed_time.total_seconds()
if self.process.poll() is None:
if self._process.poll() is None:
if (
self.timeout_secs and
(self.job_runtime_secs > self.timeout_secs) and not
(self.deploy.gui)
self.timeout_secs
and (self.job_runtime_secs > self.timeout_secs) # noqa: W503
and not (self.deploy.gui) # noqa: W503
):
self._kill()
timeout_message = (
f"Job timed out after {self.deploy.get_timeout_mins()} minutes"
)
timeout_mins = self.deploy.get_timeout_mins()
timeout_message = f"Job timed out after {timeout_mins} minutes"
self._post_finish(
"K",
ErrorMessage(
Expand All @@ -124,44 +131,46 @@ def poll(self):

return "D"

self.exit_code = self.process.returncode
self.exit_code = self._process.returncode
status, err_msg = self._check_status()
self._post_finish(status, err_msg)

return self.status

def _kill(self):
def _kill(self) -> None:
"""Kill the running process.
Try to kill the running process. Send SIGTERM first, wait a bit,
and then send SIGKILL if it didn't work.
"""
assert self.process is not None
self.process.terminate()
if self._process is None:
# process already dead or didn't start
return

self._process.terminate()
try:
self.process.wait(timeout=2)
self._process.wait(timeout=2)
except subprocess.TimeoutExpired:
self.process.kill()
self._process.kill()

def kill(self) -> None:
    """Kill the running process and record the job as killed.

    This must be called between dispatching and reaping the process (the
    same window as poll()).
    """
    self._kill()
    # Report the job with status "K" and a fixed "Job killed!" message.
    self._post_finish(
        "K",
        ErrorMessage(line_number=None, message="Job killed!", context=[]),
    )

def _post_finish(self, status: str, err_msg: Union[ErrorMessage, None]) -> None:
    """Release local resources, then defer to the base class.

    Closes the job log file and drops the reference to the process object
    before calling the parent implementation with the same arguments.
    """
    self._close_job_log_file()
    self._process = None
    super()._post_finish(status, err_msg)

def _close_process(self):
def _close_job_log_file(self) -> None:
"""Close the file descriptors associated with the process."""

assert self.process
if self.process.stdout:
self.process.stdout.close()
if self._log_file:
self._log_file.close()
22 changes: 19 additions & 3 deletions util/dvsim/Scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import threading
from signal import SIGINT, SIGTERM, signal

from Launcher import LauncherError
from Launcher import LauncherBusy, LauncherError
from StatusPrinter import get_status_printer
from Timer import Timer
from utils import VERBOSE
Expand Down Expand Up @@ -376,10 +376,11 @@ def _poll(self, hms):
status = item.launcher.poll()
level = VERBOSE

assert status in ["D", "P", "F", "K"]
assert status in ["D", "P", "F", "E", "K"]
if status == "D":
continue
elif status == "P":

if status == "P":
self._passed[target].add(item)
elif status == "F":
self._failed[target].add(item)
Expand Down Expand Up @@ -481,10 +482,25 @@ def _dispatch(self, hms):
for item in to_dispatch:
try:
item.launcher.launch()

except LauncherError as err:
log.exception(err.msg)
self._kill_item(item)

except LauncherBusy as err:
log.error("Launcher busy: %s", err)

self._queued[target].push(item)

log.log(
VERBOSE,
"[%s]: [%s]: [reqeued]: %s",
hms,
target,
item.full_name,
)
continue

self._running[target].append(item)
self.item_to_status[item] = "D"

Expand Down

0 comments on commit c7d7697

Please sign in to comment.