1
1
import asyncio
2
- from contextlib import contextmanager
3
- import tempfile
2
+ import logging
4
3
import os
5
4
import signal
6
- from typing import List , Optional , Iterable
7
- import logging
5
+ import tempfile
6
+ from contextlib import contextmanager
7
+ from typing import Iterable , List , Optional , Union
8
8
9
9
import distributed
10
10
from distributed .diagnostics import SchedulerPlugin
11
11
12
+ from .prctl import allow_ptrace
13
+
12
14
logger = logging .getLogger (__name__ )
13
15
14
16
@@ -46,6 +48,7 @@ def __init__(
46
48
self .pyspy_args .extend (extra_pyspy_args )
47
49
self .proc = None
48
50
self ._tempfile = None
51
+ self ._run_failed_msg = None
49
52
50
53
def __repr__ (self ) -> str :
51
54
return f"<{ type (self ).__name__ } { self .pyspy_args } >"
@@ -69,6 +72,17 @@ async def start(self, scheduler):
69
72
scheduler .handlers [self ._HANDLER_NAME ] = self ._get_py_spy_profile
70
73
71
74
pid = os .getpid ()
75
+
76
+ try :
77
+ # Allow subprocesses of this process to ptrace it.
78
+ # Since we'll start py-spy as a subprocess, it will be below the current PID
79
+ # in the process tree, and therefore allowed to trace its parent.
80
+ allow_ptrace (pid )
81
+ except OSError as e :
82
+ self ._run_failed_msg = str (e )
83
+ else :
84
+ self ._run_failed_msg = None
85
+
72
86
self .proc = await asyncio .create_subprocess_exec (
73
87
"py-spy" ,
74
88
"record" ,
@@ -88,9 +102,10 @@ async def _stop(self) -> Optional[int]:
88
102
try :
89
103
self .proc .send_signal (signal .SIGINT )
90
104
except ProcessLookupError :
91
- logger .warning (
92
- f"py-spy subprocess { self .proc .pid } already terminated (it probably never ran?)."
93
- )
105
+ msg = f"py-spy subprocess { self .proc .pid } already terminated (it probably never ran?)."
106
+ if self ._run_failed_msg :
107
+ msg += "\n NOTE: " + self ._run_failed_msg
108
+ logger .warning (msg )
94
109
95
110
stdout , stderr = await self .proc .communicate () # TODO timeout
96
111
retcode = self .proc .returncode
@@ -147,7 +162,7 @@ def start_pyspy_on_scheduler(
147
162
"""
148
163
client = client or distributed .worker .get_client ()
149
164
150
- async def _inject (dask_scheduler : distributed .Scheduler ):
165
+ async def _inject_pyspy (dask_scheduler : distributed .Scheduler ):
151
166
plugin = PySpyScheduler (
152
167
output = output ,
153
168
format = format ,
@@ -164,11 +179,11 @@ async def _inject(dask_scheduler: distributed.Scheduler):
164
179
await plugin .start (dask_scheduler )
165
180
dask_scheduler .add_plugin (plugin )
166
181
167
- client .run_on_scheduler (_inject )
182
+ client .run_on_scheduler (_inject_pyspy )
168
183
169
184
170
185
def get_profile_from_scheduler (
171
- path : str , client : Optional [distributed .Client ] = None
186
+ path : Union [ str , os . PathLike ] , client : Optional [distributed .Client ] = None
172
187
) -> None :
173
188
"""
174
189
Stop the current `PySpyScheduler` plugin, send back its profile data, and write it to ``path``.
@@ -188,7 +203,7 @@ async def _get_profile():
188
203
189
204
@contextmanager
190
205
def pyspy_on_scheduler (
191
- output : str ,
206
+ output : Union [ str , os . PathLike ] ,
192
207
format : str = "speedscope" ,
193
208
rate : int = 100 ,
194
209
subprocesses : bool = True ,
0 commit comments