import asyncio
from contextlib import contextmanager
import tempfile
import os
import signal
from typing import List, Optional, Iterable
import logging

import distributed
from distributed.diagnostics import SchedulerPlugin

logger = logging.getLogger(__name__)


class PySpyScheduler(SchedulerPlugin):
    _HANDLER_NAME = "get_py_spy_profile"

    def __init__(
        self,
        output: Optional[str] = None,
        format: str = "speedscope",
        rate: int = 100,
        subprocesses: bool = True,
        function: bool = False,
        gil: bool = False,
        threads: bool = False,
        idle: bool = True,
        nonblocking: bool = False,
        native: bool = False,
        extra_pyspy_args: Iterable[str] = (),
    ) -> None:
        self.output = output
        self.pyspy_args: List[str] = ["--format", format, "--rate", str(rate)] + [
            flag
            for flag, active in {
                "--subprocesses": subprocesses,
                "--function": function,
                "--gil": gil,
                "--threads": threads,
                "--idle": idle,
                "--nonblocking": nonblocking,
                "--native": native,
            }.items()
            if active
        ]
        self.pyspy_args.extend(extra_pyspy_args)
        self.proc = None
        self._tempfile = None

    def __repr__(self) -> str:
        return f"<{type(self).__name__} {self.pyspy_args}>"

    async def start(self, scheduler):
        if self.output is None:
            self._tempfile = tempfile.NamedTemporaryFile(suffix="pyspy.json")
            self.output = self._tempfile.name

        # HACK: inject a `get_py_spy_profile` handler into the scheduler,
        # so we can retrieve the data more easily. Until we can stream back files,
        # there's probably not any advantage to this over an async
        # `run_on_scheduler` to retrieve the data.
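        # Once the handler is registered, a client can invoke it over the
        # scheduler's RPC, e.g. ``await client.scheduler.get_py_spy_profile()``
        # (this is what `get_profile_from_scheduler` below does).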
        self.scheduler = scheduler
        if self._HANDLER_NAME in scheduler.handlers:
            raise RuntimeError(
                "A py-spy plugin is already registered: "
                f"{scheduler.handlers[self._HANDLER_NAME]} vs {self._get_py_spy_profile}!"
            )
        scheduler.handlers[self._HANDLER_NAME] = self._get_py_spy_profile

        pid = os.getpid()
        self.proc = await asyncio.create_subprocess_exec(
            "py-spy",
            "record",
            "--pid",
            str(pid),
            "--output",
            self.output,
            *self.pyspy_args,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )

    async def _stop(self) -> Optional[int]:
        if self.proc is None:
            return None

        try:
            self.proc.send_signal(signal.SIGINT)
        except ProcessLookupError:
            logger.warning(
                f"py-spy subprocess {self.proc.pid} already terminated (it probably never ran?)."
            )

        stdout, stderr = await self.proc.communicate()  # TODO timeout
        retcode = self.proc.returncode
        if retcode != 0:
            logger.warning(f"py-spy exited with code {retcode}")
            logger.warning(f"py-spy stderr:\n{stderr.decode()}")
            logger.warning(f"py-spy stdout:\n{stdout.decode()}")

        self.proc = None
        # Remove our injected handler
        del self.scheduler.handlers[self._HANDLER_NAME]
        # TODO should we remove the plugin as well?
        # At this point, there's not much reason to be using a plugin...
        return retcode
    def _maybe_close_tempfile(self):
        if self._tempfile is not None:
            self._tempfile.close()
            self._tempfile = None

    # This handler gets injected into the scheduler
    async def _get_py_spy_profile(self, comm=None) -> Optional[bytes]:
        retcode = await self._stop()
        if retcode == 0:
            with open(self.output, "rb") as f:
                data = f.read()  # TODO streaming!
        else:
            data = None

        self._maybe_close_tempfile()
        return data

    async def close(self):
        await self._stop()
        self._maybe_close_tempfile()


def start_pyspy_on_scheduler(
    output: Optional[str] = None,
    format: str = "speedscope",
    rate: int = 100,
    subprocesses: bool = True,
    function: bool = False,
    gil: bool = False,
    threads: bool = False,
    idle: bool = True,
    nonblocking: bool = False,
    native: bool = False,
    extra_pyspy_args: Iterable[str] = (),
    client: Optional[distributed.Client] = None,
) -> None:
    """
    Add a `PySpyScheduler` plugin to the Scheduler, and start it.
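
    Profiling begins as soon as the plugin starts; call `get_profile_from_scheduler`
    later to stop py-spy and retrieve the results. A hypothetical sketch (the
    scheduler address is made up)::

        import distributed

        client = distributed.Client("tcp://scheduler-address:8786")
        start_pyspy_on_scheduler(rate=50, client=client)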
    """
    client = client or distributed.worker.get_client()

    async def _inject(dask_scheduler: distributed.Scheduler):
        plugin = PySpyScheduler(
            output=output,
            format=format,
            rate=rate,
            subprocesses=subprocesses,
            function=function,
            gil=gil,
            threads=threads,
            idle=idle,
            nonblocking=nonblocking,
            native=native,
            extra_pyspy_args=extra_pyspy_args,
        )
        await plugin.start(dask_scheduler)
        dask_scheduler.add_plugin(plugin)

    client.run_on_scheduler(_inject)


def get_profile_from_scheduler(
    path: str, client: Optional[distributed.Client] = None
) -> None:
    """
    Stop the current `PySpyScheduler` plugin, send back its profile data, and write it to ``path``.
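
    A hypothetical continuation of the `start_pyspy_on_scheduler` sketch above,
    writing the profile to a local file::

        # ... run the workload you want profiled ...
        get_profile_from_scheduler("profile.json", client=client)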
    """
    client = client or distributed.worker.get_client()

    async def _get_profile():
        return await getattr(client.scheduler, PySpyScheduler._HANDLER_NAME)()

    data = client.sync(_get_profile)
    if data:
        with open(path, "wb") as f:
            f.write(data)
    else:
        logger.warning("No data from py-spy profile!")


@contextmanager
def pyspy_on_scheduler(
    output: str,
    format: str = "speedscope",
    rate: int = 100,
    subprocesses: bool = True,
    function: bool = False,
    gil: bool = False,
    threads: bool = False,
    idle: bool = True,
    nonblocking: bool = False,
    native: bool = False,
    extra_pyspy_args: Iterable[str] = (),
    client: Optional[distributed.Client] = None,
):
    """
    Spy on the Scheduler with py-spy.

    Use as a context manager (similar to `distributed.performance_report`) to record a py-spy
    profile of the scheduler.

    When the context manager exits, the profile is sent back to the client and saved to
    the ``output`` path.

    Parameters
    ----------
    output:
        *Local* path to save the profile to, once it's sent back from the scheduler.
    format:
        Output file format [default: speedscope] [possible values: flamegraph, raw, speedscope]
    rate:
        The number of samples to collect per second [default: 100]
    subprocesses:
        Profile subprocesses of the original process
    function:
        Aggregate samples by function name instead of by line number
    gil:
        Only include traces that are holding on to the GIL
    threads:
        Show thread ids in the output
    idle:
        Include stack traces for idle threads
    nonblocking:
        Don't pause the python process when collecting samples. Setting this option
        will reduce the performance impact of sampling, but may lead to inaccurate results
    native:
        Collect stack traces from native extensions written in Cython, C or C++
    extra_pyspy_args:
        Iterable of any extra arguments to pass to ``py-spy``.
    client:
        The distributed Client to use. If None (default), the default client is used.
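
    Examples
    --------
    A hypothetical sketch (the scheduler address is made up)::

        import distributed

        client = distributed.Client("tcp://scheduler-address:8786")
        with pyspy_on_scheduler("profile.json", client=client):
            ...  # run the workload you want profiled

    Since the default format is speedscope, the resulting ``profile.json``
    can then be loaded at https://www.speedscope.app.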
    """
    client = client or distributed.worker.get_client()

    start_pyspy_on_scheduler(
        output=None,
        format=format,
        rate=rate,
        subprocesses=subprocesses,
        function=function,
        gil=gil,
        threads=threads,
        idle=idle,
        nonblocking=nonblocking,
        native=native,
        extra_pyspy_args=extra_pyspy_args,
        client=client,
    )
    try:
        yield
    finally:
        get_profile_from_scheduler(output, client=client)