support for gpu queue #3642

Merged: 14 commits, Feb 21, 2025
1 change: 1 addition & 0 deletions nipype/info.py
@@ -149,6 +149,7 @@ def get_nipype_gitversion():
"acres",
"etelemetry>=0.3.1",
"looseversion!=1.2",
"gputil>=1.4.0",
"puremagic",
]

5 changes: 5 additions & 0 deletions nipype/pipeline/engine/nodes.py
@@ -820,6 +820,11 @@ def update(self, **opts):
"""Update inputs"""
self.inputs.update(**opts)

def is_gpu_node(self):
return bool(getattr(self.inputs, 'use_cuda', False)) or bool(
getattr(self.inputs, 'use_gpu', False)
)
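
For context, a minimal sketch (not part of the PR) of an interface whose nodes this method would flag — `MyGPUInterface` and `GPUInputSpec` are hypothetical names; the `use_gpu` trait mirrors the one added to the test interface below:

```python
import nipype.pipeline.engine as pe
from nipype.interfaces.base import BaseInterface, BaseInterfaceInputSpec, traits


class GPUInputSpec(BaseInterfaceInputSpec):  # hypothetical input spec
    # Either a `use_gpu` or a `use_cuda` Bool trait marks the node as a GPU node
    use_gpu = traits.Bool(False, usedefault=True, desc="run on the GPU")


class MyGPUInterface(BaseInterface):  # hypothetical interface
    input_spec = GPUInputSpec


node = pe.Node(MyGPUInterface(), name="gpu_task")
node.inputs.use_gpu = True
assert node.is_gpu_node()
```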


class JoinNode(Node):
"""Wraps interface objects that join inputs into a list.
Expand Down
59 changes: 52 additions & 7 deletions nipype/pipeline/plugins/multiproc.py
@@ -21,6 +21,7 @@
from ...utils.profiler import get_system_total_memory_gb
from ..engine import MapNode
from .base import DistributedPluginBase
from .tools import gpu_count

try:
from textwrap import indent
@@ -100,6 +101,7 @@

- non_daemon: boolean flag to execute as non-daemon processes
- n_procs: maximum number of threads to be executed in parallel
- n_gpu_procs: maximum number of GPU threads to be executed in parallel
- memory_gb: maximum memory (in GB) that can be used at once.
- raise_insufficient: raise an error if the resources requested by a
node exceed the maximum `n_procs` and/or `memory_gb`
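
A usage sketch combining these arguments (assumed values; `wf` stands in for any workflow whose GPU-bound nodes set a `use_gpu` or `use_cuda` input):

```python
# Cap CPU workers at 8 and let at most 2 GPU nodes run concurrently.
wf.run(
    plugin="MultiProc",
    plugin_args={
        "n_procs": 8,
        "n_gpu_procs": 2,
        "memory_gb": 16,
        "raise_insufficient": True,
    },
)
```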
@@ -130,10 +132,24 @@
)
self.raise_insufficient = self.plugin_args.get("raise_insufficient", True)

# GPUs visible on the system
self.n_gpus_visible = gpu_count()
# maximum number of concurrent GPU procs, set by the user
self.n_gpu_procs = self.plugin_args.get('n_gpu_procs', self.n_gpus_visible)

# warn if more GPU procs are requested than GPUs are visible
if self.n_gpu_procs > self.n_gpus_visible:
    logger.info(
        'Total number of GPU procs requested (%d) exceeds the available number of GPUs (%d) on the system. Using requested GPU slots at your own risk!',
        self.n_gpu_procs,
        self.n_gpus_visible,
    )

# Instantiate different thread pools for non-daemon processes
logger.debug(
"[MultiProc] Starting (n_procs=%d, mem_gb=%0.2f, cwd=%s)",
"[MultiProc] Starting (n_procs=%d, n_gpu_procs=%d, mem_gb=%0.2f, cwd=%s)",
self.processors,
self.n_gpu_procs,
self.memory_gb,
self._cwd,
)
@@ -184,9 +200,12 @@
"""Check if any node exceeds the available resources"""
tasks_mem_gb = []
tasks_num_th = []
tasks_gpu_th = []
for node in graph.nodes():
tasks_mem_gb.append(node.mem_gb)
tasks_num_th.append(node.n_procs)
if node.is_gpu_node():
tasks_gpu_th.append(node.n_procs)

if np.any(np.array(tasks_mem_gb) > self.memory_gb):
logger.warning(
@@ -203,6 +222,10 @@
)
if self.raise_insufficient:
raise RuntimeError("Insufficient resources available for job")
if np.any(np.array(tasks_gpu_th) > self.n_gpu_procs):
logger.warning('Nodes demand more GPU procs than allowed (%d).', self.n_gpu_procs)
if self.raise_insufficient:
raise RuntimeError('Insufficient GPU resources available for job')

def _postrun_check(self):
self.pool.shutdown()
@@ -213,11 +236,14 @@
"""
free_memory_gb = self.memory_gb
free_processors = self.processors
free_gpu_slots = self.n_gpu_procs
for _, jobid in running_tasks:
free_memory_gb -= min(self.procs[jobid].mem_gb, free_memory_gb)
free_processors -= min(self.procs[jobid].n_procs, free_processors)
if self.procs[jobid].is_gpu_node():
free_gpu_slots -= min(self.procs[jobid].n_procs, free_gpu_slots)

return free_memory_gb, free_processors
return free_memory_gb, free_processors, free_gpu_slots
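
A toy illustration of the new slot accounting (values assumed for the example):

```python
# With n_gpu_procs = 2 and two running GPU tasks of n_procs = 1 each,
# no GPU slot remains and further GPU nodes must wait.
free_gpu_slots = 2
for n_procs in (1, 1):  # n_procs of each running GPU task
    free_gpu_slots -= min(n_procs, free_gpu_slots)
assert free_gpu_slots == 0
```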

def _send_procs_to_workers(self, updatehash=False, graph=None):
"""
@@ -232,7 +258,9 @@
)

# Check available resources by summing all threads and memory used
free_memory_gb, free_processors = self._check_resources(self.pending_tasks)
free_memory_gb, free_processors, free_gpu_slots = self._check_resources(
self.pending_tasks
)

stats = (
len(self.pending_tasks),
@@ -241,6 +269,8 @@
self.memory_gb,
free_processors,
self.processors,
free_gpu_slots,
self.n_gpu_procs,
)
if self._stats != stats:
tasks_list_msg = ""
@@ -256,13 +286,15 @@
tasks_list_msg = indent(tasks_list_msg, " " * 21)
logger.info(
"[MultiProc] Running %d tasks, and %d jobs ready. Free "
"memory (GB): %0.2f/%0.2f, Free processors: %d/%d.%s",
"memory (GB): %0.2f/%0.2f, Free processors: %d/%d, Free GPU slot:%d/%d.%s",
len(self.pending_tasks),
len(jobids),
free_memory_gb,
self.memory_gb,
free_processors,
self.processors,
free_gpu_slots,
self.n_gpu_procs,
tasks_list_msg,
)
self._stats = stats
@@ -304,28 +336,39 @@
# Check requirements of this job
next_job_gb = min(self.procs[jobid].mem_gb, self.memory_gb)
next_job_th = min(self.procs[jobid].n_procs, self.processors)
next_job_gpu_th = min(self.procs[jobid].n_procs, self.n_gpu_procs)

is_gpu_node = self.procs[jobid].is_gpu_node()

# If node does not fit, skip at this moment
if next_job_th > free_processors or next_job_gb > free_memory_gb:
if (
next_job_th > free_processors
or next_job_gb > free_memory_gb
or (is_gpu_node and next_job_gpu_th > free_gpu_slots)
):
logger.debug(
"Cannot allocate job %d (%0.2fGB, %d threads).",
"Cannot allocate job %d (%0.2fGB, %d threads, %d GPU slots).",
jobid,
next_job_gb,
next_job_th,
next_job_gpu_th,
)
continue

free_memory_gb -= next_job_gb
free_processors -= next_job_th
if is_gpu_node:
free_gpu_slots -= next_job_gpu_th

Member: I would expect this to be hit by your test, but coverage shows it's not. Can you look into this?

Contributor (author): Maybe I missed it because I never used `updatehash=True`; it seems no test covers that option. Should we add a test with it?

Contributor (author): Moreover, that error does not impact "common" use (I have a project that already includes this GPU support code).

Contributor (author): While looking into this I found two errors in the updatehash functionality. I sent pull request #3709 to fix the bigger one. The second is that in the MultiProc plugin EVERY node is executed in the main thread when `updatehash=True`, so no multiprocessing happens. I will try to send a pull request for that too (maybe after this GPU support is merged, to avoid handling merge conflicts).

logger.debug(
"Allocating %s ID=%d (%0.2fGB, %d threads). Free: "
"%0.2fGB, %d threads.",
"%0.2fGB, %d threads, %d GPU slots.",
self.procs[jobid].fullname,
jobid,
next_job_gb,
next_job_th,
free_memory_gb,
free_processors,
free_gpu_slots,
)

# change job status in appropriate queues
@@ -355,6 +398,8 @@
self._remove_node_dirs()
free_memory_gb += next_job_gb
free_processors += next_job_th
if is_gpu_node:
free_gpu_slots += next_job_gpu_th

# Display stats next loop
self._stats = None

19 changes: 19 additions & 0 deletions nipype/pipeline/plugins/tests/test_multiproc.py
@@ -56,6 +56,7 @@ def test_run_multiproc(tmpdir):
class InputSpecSingleNode(nib.TraitedSpec):
input1 = nib.traits.Int(desc="a random int")
input2 = nib.traits.Int(desc="a random int")
use_gpu = nib.traits.Bool(False, mandatory=False, desc="boolean for GPU nodes")


class OutputSpecSingleNode(nib.TraitedSpec):
@@ -117,6 +118,24 @@ def test_no_more_threads_than_specified(tmpdir):
pipe.run(plugin="MultiProc", plugin_args={"n_procs": max_threads})


def test_no_more_gpu_threads_than_specified(tmpdir):
tmpdir.chdir()

pipe = pe.Workflow(name="pipe")
n1 = pe.Node(SingleNodeTestInterface(), name="n1", n_procs=2)
n1.inputs.use_gpu = True
n1.inputs.input1 = 4
pipe.add_nodes([n1])

max_threads = 2
max_gpu = 1
with pytest.raises(RuntimeError):
pipe.run(
plugin="MultiProc",
plugin_args={"n_procs": max_threads, 'n_gpu_procs': max_gpu},
)


@pytest.mark.skipif(
sys.version_info >= (3, 8), reason="multiprocessing issues in Python 3.8"
)
10 changes: 10 additions & 0 deletions nipype/pipeline/plugins/tools.py
@@ -175,3 +175,13 @@ def create_pyscript(node, updatehash=False, store_exception=True):
with open(pyscript, "w") as fp:
fp.writelines(cmdstr)
return pyscript


def gpu_count():
    """Return the number of GPUs visible on the system.

    Falls back to 1 when the optional GPUtil dependency is missing.
    """
    try:
        import GPUtil
    except ImportError:
        return 1
    else:
        return len(GPUtil.getGPUs())
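
A quick sanity check of the fallback (GPUtil is the optional dependency added to `nipype/info.py` above):

```python
from nipype.pipeline.plugins.tools import gpu_count

# Number of GPUs GPUtil can see; 1 when GPUtil is not installed,
# so MultiProc always assumes at least one GPU slot.
print(gpu_count())
```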