diff --git a/nipype/info.py b/nipype/info.py
index 729689ae5d..072a0330ef 100644
--- a/nipype/info.py
+++ b/nipype/info.py
@@ -149,6 +149,7 @@ def get_nipype_gitversion():
     "acres",
     "etelemetry>=0.3.1",
     "looseversion!=1.2",
+    "gputil>=1.4.0",
     "puremagic",
 ]
diff --git a/nipype/pipeline/engine/nodes.py b/nipype/pipeline/engine/nodes.py
index eb524a6a6f..e29b56718b 100644
--- a/nipype/pipeline/engine/nodes.py
+++ b/nipype/pipeline/engine/nodes.py
@@ -820,6 +820,12 @@ def update(self, **opts):
         """Update inputs"""
         self.inputs.update(**opts)
 
+    def is_gpu_node(self):
+        """Return True if this node's interface requests a GPU."""
+        return bool(getattr(self.inputs, "use_cuda", False)) or bool(
+            getattr(self.inputs, "use_gpu", False)
+        )
+
 
 class JoinNode(Node):
     """Wraps interface objects that join inputs into a list.
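
A node advertises GPU use through its interface inputs: is_gpu_node() returns True whenever the interface exposes a truthy `use_cuda` or `use_gpu` trait. The sketch below is not part of the patch; `GPUTestInterface` is a hypothetical no-op interface modeled on the test helpers further down, showing how an interface opts in:

    import nipype.interfaces.base as nib
    import nipype.pipeline.engine as pe

    class GPUInputSpec(nib.TraitedSpec):
        use_gpu = nib.traits.Bool(False, desc="request a GPU slot")

    class GPUOutputSpec(nib.TraitedSpec):
        out = nib.traits.Int()

    class GPUTestInterface(nib.SimpleInterface):
        """Hypothetical interface whose only job is to flag GPU use."""

        input_spec = GPUInputSpec
        output_spec = GPUOutputSpec

        def _run_interface(self, runtime):
            self._results["out"] = 1
            return runtime

    node = pe.Node(GPUTestInterface(), name="gpu_task")
    node.inputs.use_gpu = True
    assert node.is_gpu_node()  # now counted against the GPU slot budget

Real-world interfaces that already expose such a trait, for example FSL's Eddy with its `use_cuda` input, are picked up the same way.
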
diff --git a/nipype/pipeline/plugins/multiproc.py b/nipype/pipeline/plugins/multiproc.py
index 086ee4430c..b403749ff9 100644
--- a/nipype/pipeline/plugins/multiproc.py
+++ b/nipype/pipeline/plugins/multiproc.py
@@ -21,6 +21,7 @@
 from ...utils.profiler import get_system_total_memory_gb
 from ..engine import MapNode
 from .base import DistributedPluginBase
+from .tools import gpu_count
 
 try:
     from textwrap import indent
@@ -100,6 +101,7 @@ class MultiProcPlugin(DistributedPluginBase):
 
     - non_daemon: boolean flag to execute as non-daemon processes
     - n_procs: maximum number of threads to be executed in parallel
+    - n_gpu_procs: maximum number of GPU threads to be executed in parallel
     - memory_gb: maximum memory (in GB) that can be used at once.
     - raise_insufficient: raise error if the requested resources for
       a node over the maximum `n_procs` and/or `memory_gb`
@@ -130,10 +132,24 @@ def __init__(self, plugin_args=None):
         )
         self.raise_insufficient = self.plugin_args.get("raise_insufficient", True)
 
+        # Number of GPUs visible on the system
+        self.n_gpus_visible = gpu_count()
+        # Total number of concurrent GPU processes allowed, set by the user
+        self.n_gpu_procs = self.plugin_args.get("n_gpu_procs", self.n_gpus_visible)
+
+        # The user may oversubscribe the visible GPUs
+        if self.n_gpu_procs > self.n_gpus_visible:
+            logger.info(
+                "Total number of GPU procs requested (%d) exceeds the available number of GPUs (%d) on the system. Using requested GPU slots at your own risk!",
+                self.n_gpu_procs,
+                self.n_gpus_visible,
+            )
+
         # Instantiate different thread pools for non-daemon processes
         logger.debug(
-            "[MultiProc] Starting (n_procs=%d, mem_gb=%0.2f, cwd=%s)",
+            "[MultiProc] Starting (n_procs=%d, n_gpu_procs=%d, mem_gb=%0.2f, cwd=%s)",
             self.processors,
+            self.n_gpu_procs,
             self.memory_gb,
             self._cwd,
         )
@@ -184,9 +200,12 @@ def _prerun_check(self, graph):
         """Check if any node exceeds the available resources"""
         tasks_mem_gb = []
         tasks_num_th = []
+        tasks_gpu_th = []
         for node in graph.nodes():
             tasks_mem_gb.append(node.mem_gb)
             tasks_num_th.append(node.n_procs)
+            if node.is_gpu_node():
+                tasks_gpu_th.append(node.n_procs)
 
         if np.any(np.array(tasks_mem_gb) > self.memory_gb):
             logger.warning(
@@ -203,6 +222,10 @@
             )
             if self.raise_insufficient:
                 raise RuntimeError("Insufficient resources available for job")
+        if np.any(np.array(tasks_gpu_th) > self.n_gpu_procs):
+            logger.warning("Nodes demand more GPU slots than allowed (%d).", self.n_gpu_procs)
+            if self.raise_insufficient:
+                raise RuntimeError("Insufficient GPU resources available for job")
 
     def _postrun_check(self):
         self.pool.shutdown()
@@ -213,11 +236,14 @@ def _check_resources(self, running_tasks):
         """
         free_memory_gb = self.memory_gb
         free_processors = self.processors
+        free_gpu_slots = self.n_gpu_procs
         for _, jobid in running_tasks:
             free_memory_gb -= min(self.procs[jobid].mem_gb, free_memory_gb)
             free_processors -= min(self.procs[jobid].n_procs, free_processors)
+            if self.procs[jobid].is_gpu_node():
+                free_gpu_slots -= min(self.procs[jobid].n_procs, free_gpu_slots)
 
-        return free_memory_gb, free_processors
+        return free_memory_gb, free_processors, free_gpu_slots
@@ -232,7 +258,9 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
         )
 
         # Check available resources by summing all threads and memory used
-        free_memory_gb, free_processors = self._check_resources(self.pending_tasks)
+        free_memory_gb, free_processors, free_gpu_slots = self._check_resources(
+            self.pending_tasks
+        )
 
         stats = (
             len(self.pending_tasks),
@@ -241,6 +269,8 @@
             self.memory_gb,
             free_processors,
             self.processors,
+            free_gpu_slots,
+            self.n_gpu_procs,
         )
         if self._stats != stats:
             tasks_list_msg = ""
@@ -256,13 +286,15 @@
                     tasks_list_msg = indent(tasks_list_msg, " " * 21)
             logger.info(
                 "[MultiProc] Running %d tasks, and %d jobs ready. Free "
-                "memory (GB): %0.2f/%0.2f, Free processors: %d/%d.%s",
+                "memory (GB): %0.2f/%0.2f, Free processors: %d/%d, Free GPU slots: %d/%d.%s",
                 len(self.pending_tasks),
                 len(jobids),
                 free_memory_gb,
                 self.memory_gb,
                 free_processors,
                 self.processors,
+                free_gpu_slots,
+                self.n_gpu_procs,
                 tasks_list_msg,
             )
             self._stats = stats
@@ -304,28 +336,39 @@
             # Check requirements of this job
             next_job_gb = min(self.procs[jobid].mem_gb, self.memory_gb)
             next_job_th = min(self.procs[jobid].n_procs, self.processors)
+            next_job_gpu_th = min(self.procs[jobid].n_procs, self.n_gpu_procs)
+
+            is_gpu_node = self.procs[jobid].is_gpu_node()
 
             # If node does not fit, skip at this moment
-            if next_job_th > free_processors or next_job_gb > free_memory_gb:
+            if (
+                next_job_th > free_processors
+                or next_job_gb > free_memory_gb
+                or (is_gpu_node and next_job_gpu_th > free_gpu_slots)
+            ):
                 logger.debug(
-                    "Cannot allocate job %d (%0.2fGB, %d threads).",
+                    "Cannot allocate job %d (%0.2fGB, %d threads, %d GPU slots).",
                     jobid,
                     next_job_gb,
                     next_job_th,
+                    next_job_gpu_th,
                 )
                 continue
 
             free_memory_gb -= next_job_gb
             free_processors -= next_job_th
+            if is_gpu_node:
+                free_gpu_slots -= next_job_gpu_th
             logger.debug(
                 "Allocating %s ID=%d (%0.2fGB, %d threads). Free: "
-                "%0.2fGB, %d threads.",
+                "%0.2fGB, %d threads, %d GPU slots.",
                 self.procs[jobid].fullname,
                 jobid,
                 next_job_gb,
                 next_job_th,
                 free_memory_gb,
                 free_processors,
+                free_gpu_slots,
             )
 
             # change job status in appropriate queues
@@ -355,6 +398,8 @@
                     self._remove_node_dirs()
                 free_memory_gb += next_job_gb
                 free_processors += next_job_th
+                if is_gpu_node:
+                    free_gpu_slots += next_job_gpu_th
 
         # Display stats next loop
         self._stats = None
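
GPU slots are accounted for exactly like memory and processors: _check_resources subtracts each running GPU node's n_procs from the n_gpu_procs budget, and _send_procs_to_workers defers any GPU job that does not fit. A worked sketch of that arithmetic, with hypothetical numbers:

    # Hypothetical numbers mirroring the bookkeeping in _check_resources.
    n_gpu_procs = 2            # total GPU slots (plugin_args or gpu_count())
    running_gpu_jobs = [1, 1]  # n_procs of the GPU nodes currently running

    free_gpu_slots = n_gpu_procs
    for n_procs in running_gpu_jobs:
        free_gpu_slots -= min(n_procs, free_gpu_slots)

    assert free_gpu_slots == 0  # a third GPU job waits until a slot frees up
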
Free " - "memory (GB): %0.2f/%0.2f, Free processors: %d/%d.%s", + "memory (GB): %0.2f/%0.2f, Free processors: %d/%d, Free GPU slot:%d/%d.%s", len(self.pending_tasks), len(jobids), free_memory_gb, self.memory_gb, free_processors, self.processors, + free_gpu_slots, + self.n_gpu_procs, tasks_list_msg, ) self._stats = stats @@ -304,28 +336,39 @@ def _send_procs_to_workers(self, updatehash=False, graph=None): # Check requirements of this job next_job_gb = min(self.procs[jobid].mem_gb, self.memory_gb) next_job_th = min(self.procs[jobid].n_procs, self.processors) + next_job_gpu_th = min(self.procs[jobid].n_procs, self.n_gpu_procs) + + is_gpu_node = self.procs[jobid].is_gpu_node() # If node does not fit, skip at this moment - if next_job_th > free_processors or next_job_gb > free_memory_gb: + if ( + next_job_th > free_processors + or next_job_gb > free_memory_gb + or (is_gpu_node and next_job_gpu_th > free_gpu_slots) + ): logger.debug( - "Cannot allocate job %d (%0.2fGB, %d threads).", + "Cannot allocate job %d (%0.2fGB, %d threads, %d GPU slots).", jobid, next_job_gb, next_job_th, + next_job_gpu_th, ) continue free_memory_gb -= next_job_gb free_processors -= next_job_th + if is_gpu_node: + free_gpu_slots -= next_job_gpu_th logger.debug( "Allocating %s ID=%d (%0.2fGB, %d threads). Free: " - "%0.2fGB, %d threads.", + "%0.2fGB, %d threads, %d GPU slots.", self.procs[jobid].fullname, jobid, next_job_gb, next_job_th, free_memory_gb, free_processors, + free_gpu_slots, ) # change job status in appropriate queues @@ -355,6 +398,8 @@ def _send_procs_to_workers(self, updatehash=False, graph=None): self._remove_node_dirs() free_memory_gb += next_job_gb free_processors += next_job_th + if is_gpu_node: + free_gpu_slots += next_job_gpu_th # Display stats next loop self._stats = None diff --git a/nipype/pipeline/plugins/tests/test_multiproc.py b/nipype/pipeline/plugins/tests/test_multiproc.py index 938e1aab9e..484c0d07bc 100644 --- a/nipype/pipeline/plugins/tests/test_multiproc.py +++ b/nipype/pipeline/plugins/tests/test_multiproc.py @@ -56,6 +56,7 @@ def test_run_multiproc(tmpdir): class InputSpecSingleNode(nib.TraitedSpec): input1 = nib.traits.Int(desc="a random int") input2 = nib.traits.Int(desc="a random int") + use_gpu = nib.traits.Bool(False, mandatory=False, desc="boolean for GPU nodes") class OutputSpecSingleNode(nib.TraitedSpec): @@ -117,6 +118,24 @@ def test_no_more_threads_than_specified(tmpdir): pipe.run(plugin="MultiProc", plugin_args={"n_procs": max_threads}) +def test_no_more_gpu_threads_than_specified(tmpdir): + tmpdir.chdir() + + pipe = pe.Workflow(name="pipe") + n1 = pe.Node(SingleNodeTestInterface(), name="n1", n_procs=2) + n1.inputs.use_gpu = True + n1.inputs.input1 = 4 + pipe.add_nodes([n1]) + + max_threads = 2 + max_gpu = 1 + with pytest.raises(RuntimeError): + pipe.run( + plugin="MultiProc", + plugin_args={"n_procs": max_threads, 'n_gpu_procs': max_gpu}, + ) + + @pytest.mark.skipif( sys.version_info >= (3, 8), reason="multiprocessing issues in Python 3.8" ) diff --git a/nipype/pipeline/plugins/tools.py b/nipype/pipeline/plugins/tools.py index 7e066b0ea3..37c841a208 100644 --- a/nipype/pipeline/plugins/tools.py +++ b/nipype/pipeline/plugins/tools.py @@ -175,3 +175,13 @@ def create_pyscript(node, updatehash=False, store_exception=True): with open(pyscript, "w") as fp: fp.writelines(cmdstr) return pyscript + + +def gpu_count(): + n_gpus = 1 + try: + import GPUtil + except ImportError: + return 1 + else: + return len(GPUtil.getGPUs())