
Commit 0720aa1

support for gpu queue
1 parent a17de8e commit 0720aa1

File tree: 3 files changed, +74 −7 lines changed

nipype/pipeline/engine/nodes.py

Lines changed: 4 additions & 0 deletions
@@ -821,6 +821,10 @@ def update(self, **opts):
         """Update inputs"""
         self.inputs.update(**opts)
 
+    def is_gpu_node(self):
+        return ((hasattr(self.inputs, 'use_cuda') and self.inputs.use_cuda)
+                or (hasattr(self.inputs, 'use_gpu') and self.inputs.use_gpu))
+
 
 class JoinNode(Node):
     """Wraps interface objects that join inputs into a list.

nipype/pipeline/plugins/multiproc.py

Lines changed: 55 additions & 7 deletions
@@ -100,6 +100,7 @@ class MultiProcPlugin(DistributedPluginBase):
     - non_daemon: boolean flag to execute as non-daemon processes
     - n_procs: maximum number of threads to be executed in parallel
+    - n_gpu_procs: maximum number of GPU threads to be executed in parallel
     - memory_gb: maximum memory (in GB) that can be used at once.
     - raise_insufficient: raise error if the requested resources for
       a node over the maximum `n_procs` and/or `memory_gb`
@@ -130,10 +131,22 @@ def __init__(self, plugin_args=None):
         )
         self.raise_insufficient = self.plugin_args.get("raise_insufficient", True)
 
+        # GPUs found on the system
+        self.n_gpus_visible = MultiProcPlugin.gpu_count()
+        # total number of GPU processes allowed, set by the user
+        self.n_gpu_procs = plugin_args.get('n_gpu_procs', self.n_gpus_visible)
+
+        # warn if more GPU procs are requested than GPUs are visible
+        if self.n_gpu_procs > self.n_gpus_visible:
+            logger.info(
+                'Total number of GPU procs requested (%d) exceeds the available '
+                'number of GPUs (%d) on the system. Using requested GPU slots '
+                'at your own risk!',
+                self.n_gpu_procs, self.n_gpus_visible)
+
         # Instantiate different thread pools for non-daemon processes
         logger.debug(
-            "[MultiProc] Starting (n_procs=%d, mem_gb=%0.2f, cwd=%s)",
+            "[MultiProc] Starting (n_procs=%d, n_gpu_procs=%d, mem_gb=%0.2f, cwd=%s)",
             self.processors,
+            self.n_gpu_procs,
             self.memory_gb,
             self._cwd,
         )
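
With this in place, GPU concurrency is capped through plugin_args alongside the existing CPU and memory limits; when n_gpu_procs is omitted it defaults to the number of GPUs gpu_count() reports. A minimal usage sketch (the workflow name and limit values are illustrative only):

import nipype.pipeline.engine as pe

wf = pe.Workflow(name="example_wf")   # assume GPU-enabled nodes get added here
wf.run(
    plugin="MultiProc",
    plugin_args={
        "n_procs": 8,        # CPU process limit
        "memory_gb": 16,     # memory budget in GB
        "n_gpu_procs": 2,    # at most two GPU jobs run at once
    },
)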
@@ -184,9 +197,12 @@ def _prerun_check(self, graph):
         """Check if any node exceeds the available resources"""
         tasks_mem_gb = []
         tasks_num_th = []
+        tasks_gpu_th = []
         for node in graph.nodes():
             tasks_mem_gb.append(node.mem_gb)
             tasks_num_th.append(node.n_procs)
+            if node.is_gpu_node():
+                tasks_gpu_th.append(node.n_procs)
 
         if np.any(np.array(tasks_mem_gb) > self.memory_gb):
             logger.warning(
@@ -203,6 +219,12 @@ def _prerun_check(self, graph):
             )
             if self.raise_insufficient:
                 raise RuntimeError("Insufficient resources available for job")
+        if np.any(np.array(tasks_gpu_th) > self.n_gpu_procs):
+            logger.warning(
+                'Nodes demand more GPU slots than allowed (%d).',
+                self.n_gpu_procs)
+            if self.raise_insufficient:
+                raise RuntimeError('Insufficient GPU resources available for job')
 
     def _postrun_check(self):
         self.pool.shutdown()
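
In other words, the pre-run check now also compares each GPU node's n_procs against the run-wide n_gpu_procs budget before any job is submitted. A standalone illustration with made-up numbers:

import numpy as np

n_gpu_procs = 1       # GPU slots allowed for the whole run
tasks_gpu_th = [2]    # one GPU node requesting 2 processes
if np.any(np.array(tasks_gpu_th) > n_gpu_procs):
    # mirrors the error raised above when raise_insufficient is True
    raise RuntimeError("Insufficient GPU resources available for job")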
@@ -213,11 +235,14 @@ def _check_resources(self, running_tasks):
         """
         free_memory_gb = self.memory_gb
         free_processors = self.processors
+        free_gpu_slots = self.n_gpu_procs
         for _, jobid in running_tasks:
             free_memory_gb -= min(self.procs[jobid].mem_gb, free_memory_gb)
             free_processors -= min(self.procs[jobid].n_procs, free_processors)
+            if self.procs[jobid].is_gpu_node():
+                free_gpu_slots -= min(self.procs[jobid].n_procs, free_gpu_slots)
 
-        return free_memory_gb, free_processors
+        return free_memory_gb, free_processors, free_gpu_slots
 
     def _send_procs_to_workers(self, updatehash=False, graph=None):
         """
@@ -232,7 +257,7 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
         )
 
         # Check available resources by summing all threads and memory used
-        free_memory_gb, free_processors = self._check_resources(self.pending_tasks)
+        free_memory_gb, free_processors, free_gpu_slots = self._check_resources(self.pending_tasks)
 
         stats = (
             len(self.pending_tasks),
@@ -241,6 +266,8 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
             self.memory_gb,
             free_processors,
             self.processors,
+            free_gpu_slots,
+            self.n_gpu_procs,
         )
         if self._stats != stats:
             tasks_list_msg = ""
@@ -256,13 +283,15 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
             tasks_list_msg = indent(tasks_list_msg, " " * 21)
             logger.info(
                 "[MultiProc] Running %d tasks, and %d jobs ready. Free "
-                "memory (GB): %0.2f/%0.2f, Free processors: %d/%d.%s",
+                "memory (GB): %0.2f/%0.2f, Free processors: %d/%d, Free GPU slots: %d/%d.%s",
                 len(self.pending_tasks),
                 len(jobids),
                 free_memory_gb,
                 self.memory_gb,
                 free_processors,
                 self.processors,
+                free_gpu_slots,
+                self.n_gpu_procs,
                 tasks_list_msg,
             )
             self._stats = stats
@@ -304,28 +333,36 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
             # Check requirements of this job
             next_job_gb = min(self.procs[jobid].mem_gb, self.memory_gb)
             next_job_th = min(self.procs[jobid].n_procs, self.processors)
+            next_job_gpu_th = min(self.procs[jobid].n_procs, self.n_gpu_procs)
+
+            is_gpu_node = self.procs[jobid].is_gpu_node()
 
             # If node does not fit, skip at this moment
-            if next_job_th > free_processors or next_job_gb > free_memory_gb:
+            if (next_job_th > free_processors or next_job_gb > free_memory_gb
+                    or (is_gpu_node and next_job_gpu_th > free_gpu_slots)):
                 logger.debug(
-                    "Cannot allocate job %d (%0.2fGB, %d threads).",
+                    "Cannot allocate job %d (%0.2fGB, %d threads, %d GPU slots).",
                     jobid,
                     next_job_gb,
                     next_job_th,
+                    next_job_gpu_th,
                 )
                 continue
 
             free_memory_gb -= next_job_gb
             free_processors -= next_job_th
+            if is_gpu_node:
+                free_gpu_slots -= next_job_gpu_th
             logger.debug(
                 "Allocating %s ID=%d (%0.2fGB, %d threads). Free: "
-                "%0.2fGB, %d threads.",
+                "%0.2fGB, %d threads, %d GPU slots.",
                 self.procs[jobid].fullname,
                 jobid,
                 next_job_gb,
                 next_job_th,
                 free_memory_gb,
                 free_processors,
+                free_gpu_slots,
             )
 
             # change job status in appropriate queues
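
Taken together, a GPU node must now clear three gates before being dispatched: free processors, free memory, and free GPU slots. A standalone sketch of that fit check (names mirror the diff, but this is an illustration, not a drop-in for the plugin):

def job_fits(next_job_th, next_job_gb, next_job_gpu_th, is_gpu_node,
             free_processors, free_memory_gb, free_gpu_slots):
    """Return True if the job fits within the currently free resources."""
    if next_job_th > free_processors or next_job_gb > free_memory_gb:
        return False
    if is_gpu_node and next_job_gpu_th > free_gpu_slots:
        return False
    return True

# A 2-process GPU job cannot start while only 1 GPU slot is free,
# even though the CPU and memory budgets would allow it.
assert not job_fits(2, 0.2, 2, True,
                    free_processors=8, free_memory_gb=4.0, free_gpu_slots=1)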
@@ -352,6 +389,8 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
                 self._remove_node_dirs()
                 free_memory_gb += next_job_gb
                 free_processors += next_job_th
+                if is_gpu_node:
+                    free_gpu_slots += next_job_gpu_th
             # Display stats next loop
             self._stats = None
 
@@ -379,3 +418,12 @@ def _sort_jobs(self, jobids, scheduler="tsort"):
                 key=lambda item: (self.procs[item].mem_gb, self.procs[item].n_procs),
             )
         return jobids
+
+    @staticmethod
+    def gpu_count():
+        n_gpus = 1
+        try:
+            import GPUtil
+            return len(GPUtil.getGPUs())
+        except ImportError:
+            return n_gpus
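
Note that gpu_count() treats GPUtil as an optional dependency: when the import fails it assumes a single GPU instead of probing the hardware, so multi-GPU hosts without GPUtil should pass n_gpu_procs explicitly. A quick standalone probe of the value the plugin will fall back to (same logic as above):

try:
    import GPUtil
    n_gpus = len(GPUtil.getGPUs())   # GPUs actually visible to GPUtil
except ImportError:
    n_gpus = 1                       # fallback used when GPUtil is missing
print("GPUs MultiProc will assume:", n_gpus)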

nipype/pipeline/plugins/tests/test_multiproc.py

Lines changed: 15 additions & 0 deletions
@@ -56,6 +56,7 @@ def test_run_multiproc(tmpdir):
 class InputSpecSingleNode(nib.TraitedSpec):
     input1 = nib.traits.Int(desc="a random int")
     input2 = nib.traits.Int(desc="a random int")
+    use_gpu = nib.traits.Bool(False, mandatory=False, desc="boolean for GPU nodes")
 
 
 class OutputSpecSingleNode(nib.TraitedSpec):
@@ -116,6 +117,20 @@ def test_no_more_threads_than_specified(tmpdir):
         with pytest.raises(RuntimeError):
             pipe.run(plugin="MultiProc", plugin_args={"n_procs": max_threads})
 
+
+def test_no_more_gpu_threads_than_specified(tmpdir):
+    tmpdir.chdir()
+
+    pipe = pe.Workflow(name="pipe")
+    n1 = pe.Node(SingleNodeTestInterface(), name="n1", n_procs=2)
+    n1.inputs.use_gpu = True
+    n1.inputs.input1 = 4
+    pipe.add_nodes([n1])
+
+    max_threads = 2
+    max_gpu = 1
+    with pytest.raises(RuntimeError):
+        pipe.run(
+            plugin="MultiProc",
+            plugin_args={"n_procs": max_threads, "n_gpu_procs": max_gpu},
+        )
+
 
 @pytest.mark.skipif(
     sys.version_info >= (3, 8), reason="multiprocessing issues in Python 3.8"
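
This test exercises the new pre-run check: the node declares use_gpu=True with n_procs=2 while n_gpu_procs is capped at 1, so MultiProc raises RuntimeError before anything runs (raise_insufficient defaults to True). For contrast, a hypothetical companion case that should pass, reusing the helpers defined in this test module (not part of this commit):

def test_gpu_node_within_gpu_slots(tmpdir):   # hypothetical, illustration only
    tmpdir.chdir()

    pipe = pe.Workflow(name="pipe")
    n1 = pe.Node(SingleNodeTestInterface(), name="n1", n_procs=1)
    n1.inputs.use_gpu = True
    n1.inputs.input1 = 4
    pipe.add_nodes([n1])

    # A single-process GPU node fits within one GPU slot, so no error is expected.
    pipe.run(plugin="MultiProc", plugin_args={"n_procs": 2, "n_gpu_procs": 1})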
