Commit 44551d3

Merge pull request #3642 from mauriliogenovese/enh/cuda_support

support for gpu queue

2 parents: 0c67bb3 + 59862c8

File tree: 5 files changed, +87 -7 lines

nipype/info.py (+1)

@@ -149,6 +149,7 @@ def get_nipype_gitversion():
     "acres",
     "etelemetry>=0.3.1",
     "looseversion!=1.2",
+    "gputil>=1.4.0",
     "puremagic",
 ]

nipype/pipeline/engine/nodes.py (+5)

@@ -820,6 +820,11 @@ def update(self, **opts):
         """Update inputs"""
         self.inputs.update(**opts)
 
+    def is_gpu_node(self):
+        return bool(getattr(self.inputs, 'use_cuda', False)) or bool(
+            getattr(self.inputs, 'use_gpu', False)
+        )
+
 
 class JoinNode(Node):
     """Wraps interface objects that join inputs into a list.

nipype/pipeline/plugins/multiproc.py (+52 -7)

@@ -21,6 +21,7 @@
 from ...utils.profiler import get_system_total_memory_gb
 from ..engine import MapNode
 from .base import DistributedPluginBase
+from .tools import gpu_count
 
 try:
     from textwrap import indent
@@ -100,6 +101,7 @@ class MultiProcPlugin(DistributedPluginBase):
 
     - non_daemon: boolean flag to execute as non-daemon processes
     - n_procs: maximum number of threads to be executed in parallel
+    - n_gpu_procs: maximum number of GPU threads to be executed in parallel
     - memory_gb: maximum memory (in GB) that can be used at once.
     - raise_insufficient: raise error if the requested resources for
      a node over the maximum `n_procs` and/or `memory_gb`
@@ -130,10 +132,24 @@ def __init__(self, plugin_args=None):
         )
         self.raise_insufficient = self.plugin_args.get("raise_insufficient", True)
 
+        # GPUs found on the system
+        self.n_gpus_visible = gpu_count()
+        # procs per GPU set by the user
+        self.n_gpu_procs = self.plugin_args.get('n_gpu_procs', self.n_gpus_visible)
+
+        # total number of processes allowed on all GPUs
+        if self.n_gpu_procs > self.n_gpus_visible:
+            logger.info(
+                'Total number of GPU procs requested (%d) exceeds the available number of GPUs (%d) on the system. Using requested GPU slots at your own risk!',
+                self.n_gpu_procs,
+                self.n_gpus_visible,
+            )
+
         # Instantiate different thread pools for non-daemon processes
         logger.debug(
-            "[MultiProc] Starting (n_procs=%d, mem_gb=%0.2f, cwd=%s)",
+            "[MultiProc] Starting (n_procs=%d, n_gpu_procs=%d, mem_gb=%0.2f, cwd=%s)",
             self.processors,
+            self.n_gpu_procs,
             self.memory_gb,
             self._cwd,
         )
@@ -184,9 +200,12 @@ def _prerun_check(self, graph):
         """Check if any node exceeds the available resources"""
         tasks_mem_gb = []
         tasks_num_th = []
+        tasks_gpu_th = []
         for node in graph.nodes():
             tasks_mem_gb.append(node.mem_gb)
             tasks_num_th.append(node.n_procs)
+            if node.is_gpu_node():
+                tasks_gpu_th.append(node.n_procs)
 
         if np.any(np.array(tasks_mem_gb) > self.memory_gb):
             logger.warning(
@@ -203,6 +222,10 @@ def _prerun_check(self, graph):
             )
             if self.raise_insufficient:
                 raise RuntimeError("Insufficient resources available for job")
+        if np.any(np.array(tasks_gpu_th) > self.n_gpu_procs):
+            logger.warning('Nodes demand more GPU slots than allowed (%d).', self.n_gpu_procs)
+            if self.raise_insufficient:
+                raise RuntimeError('Insufficient GPU resources available for job')
 
     def _postrun_check(self):
         self.pool.shutdown()
@@ -213,11 +236,14 @@ def _check_resources(self, running_tasks):
         """
         free_memory_gb = self.memory_gb
         free_processors = self.processors
+        free_gpu_slots = self.n_gpu_procs
         for _, jobid in running_tasks:
             free_memory_gb -= min(self.procs[jobid].mem_gb, free_memory_gb)
             free_processors -= min(self.procs[jobid].n_procs, free_processors)
+            if self.procs[jobid].is_gpu_node():
+                free_gpu_slots -= min(self.procs[jobid].n_procs, free_gpu_slots)
 
-        return free_memory_gb, free_processors
+        return free_memory_gb, free_processors, free_gpu_slots
 
     def _send_procs_to_workers(self, updatehash=False, graph=None):
         """
@@ -232,7 +258,9 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
         )
 
         # Check available resources by summing all threads and memory used
-        free_memory_gb, free_processors = self._check_resources(self.pending_tasks)
+        free_memory_gb, free_processors, free_gpu_slots = self._check_resources(
+            self.pending_tasks
+        )
 
         stats = (
             len(self.pending_tasks),
@@ -241,6 +269,8 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
             self.memory_gb,
             free_processors,
             self.processors,
+            free_gpu_slots,
+            self.n_gpu_procs,
         )
         if self._stats != stats:
             tasks_list_msg = ""
@@ -256,13 +286,15 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
                 tasks_list_msg = indent(tasks_list_msg, " " * 21)
             logger.info(
                 "[MultiProc] Running %d tasks, and %d jobs ready. Free "
-                "memory (GB): %0.2f/%0.2f, Free processors: %d/%d.%s",
+                "memory (GB): %0.2f/%0.2f, Free processors: %d/%d, Free GPU slots: %d/%d.%s",
                 len(self.pending_tasks),
                 len(jobids),
                 free_memory_gb,
                 self.memory_gb,
                 free_processors,
                 self.processors,
+                free_gpu_slots,
+                self.n_gpu_procs,
                 tasks_list_msg,
             )
             self._stats = stats
@@ -304,28 +336,39 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
             # Check requirements of this job
             next_job_gb = min(self.procs[jobid].mem_gb, self.memory_gb)
             next_job_th = min(self.procs[jobid].n_procs, self.processors)
+            next_job_gpu_th = min(self.procs[jobid].n_procs, self.n_gpu_procs)
+
+            is_gpu_node = self.procs[jobid].is_gpu_node()
 
             # If node does not fit, skip at this moment
-            if next_job_th > free_processors or next_job_gb > free_memory_gb:
+            if (
+                next_job_th > free_processors
+                or next_job_gb > free_memory_gb
+                or (is_gpu_node and next_job_gpu_th > free_gpu_slots)
+            ):
                 logger.debug(
-                    "Cannot allocate job %d (%0.2fGB, %d threads).",
+                    "Cannot allocate job %d (%0.2fGB, %d threads, %d GPU slots).",
                     jobid,
                     next_job_gb,
                     next_job_th,
+                    next_job_gpu_th,
                 )
                 continue
 
             free_memory_gb -= next_job_gb
             free_processors -= next_job_th
+            if is_gpu_node:
+                free_gpu_slots -= next_job_gpu_th
             logger.debug(
                 "Allocating %s ID=%d (%0.2fGB, %d threads). Free: "
-                "%0.2fGB, %d threads.",
+                "%0.2fGB, %d threads, %d GPU slots.",
                 self.procs[jobid].fullname,
                 jobid,
                 next_job_gb,
                 next_job_th,
                 free_memory_gb,
                 free_processors,
+                free_gpu_slots,
             )
 
             # change job status in appropriate queues
@@ -355,6 +398,8 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
                     self._remove_node_dirs()
                 free_memory_gb += next_job_gb
                 free_processors += next_job_th
+                if is_gpu_node:
+                    free_gpu_slots += next_job_gpu_th
                 # Display stats next loop
                 self._stats = None
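
Taken together, the scheduler now tracks a pool of GPU slots alongside free memory and free processors, and GPU-flagged jobs are only dispatched while slots remain. A minimal usage sketch (assumes an existing workflow object wf whose GPU-bound nodes set use_gpu or use_cuda on their inputs; the numbers are placeholders):

    # n_gpu_procs caps how many GPU-flagged jobs run concurrently;
    # it defaults to the number of GPUs reported by gpu_count().
    wf.run(
        plugin="MultiProc",
        plugin_args={
            "n_procs": 8,       # CPU process cap, unchanged behaviour
            "memory_gb": 16,    # memory cap, unchanged behaviour
            "n_gpu_procs": 2,   # new: at most 2 GPU jobs at a time
        },
    )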

nipype/pipeline/plugins/tests/test_multiproc.py (+19)

@@ -56,6 +56,7 @@ def test_run_multiproc(tmpdir):
 class InputSpecSingleNode(nib.TraitedSpec):
     input1 = nib.traits.Int(desc="a random int")
     input2 = nib.traits.Int(desc="a random int")
+    use_gpu = nib.traits.Bool(False, mandatory=False, desc="boolean for GPU nodes")
 
 
 class OutputSpecSingleNode(nib.TraitedSpec):
@@ -117,6 +118,24 @@ def test_no_more_threads_than_specified(tmpdir):
     pipe.run(plugin="MultiProc", plugin_args={"n_procs": max_threads})
 
 
+def test_no_more_gpu_threads_than_specified(tmpdir):
+    tmpdir.chdir()
+
+    pipe = pe.Workflow(name="pipe")
+    n1 = pe.Node(SingleNodeTestInterface(), name="n1", n_procs=2)
+    n1.inputs.use_gpu = True
+    n1.inputs.input1 = 4
+    pipe.add_nodes([n1])
+
+    max_threads = 2
+    max_gpu = 1
+    with pytest.raises(RuntimeError):
+        pipe.run(
+            plugin="MultiProc",
+            plugin_args={"n_procs": max_threads, 'n_gpu_procs': max_gpu},
+        )
+
+
 @pytest.mark.skipif(
     sys.version_info >= (3, 8), reason="multiprocessing issues in Python 3.8"
 )

nipype/pipeline/plugins/tools.py (+10)

@@ -175,3 +175,13 @@ def create_pyscript(node, updatehash=False, store_exception=True):
     with open(pyscript, "w") as fp:
         fp.writelines(cmdstr)
     return pyscript
+
+
+def gpu_count():
+    n_gpus = 1
+    try:
+        import GPUtil
+    except ImportError:
+        return 1
+    else:
+        return len(GPUtil.getGPUs())
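
gpu_count() degrades gracefully: when GPUtil cannot be imported it falls back to reporting a single GPU, which in turn becomes the default n_gpu_procs in MultiProcPlugin. A quick check of that behaviour (sketch, not part of the commit):

    from nipype.pipeline.plugins.tools import gpu_count

    # Number of GPUs reported by GPUtil, or 1 when GPUtil is unavailable.
    print(gpu_count())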
