
support for gpu queue #3642

Merged (14 commits) on Feb 21, 2025
1 change: 1 addition & 0 deletions nipype/info.py

@@ -149,6 +149,7 @@ def get_nipype_gitversion():
"filelock>=3.0.0",
"etelemetry>=0.2.0",
"looseversion!=1.2",
"gputil==1.4.0",
Member:

Hard pins are a very bad idea. If you need a particular API, use >= to ensure it's present. We should avoid upper bounds as much as possible, although they are not always avoidable.
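For instance, the dependency could declare a floor instead of an exact version; a sketch of the >= style the comment suggests (the exact floor is illustrative, not a maintainer decision):

    REQUIRES = [
        # ...
        "gputil>=1.4.0",  # floor pin: the needed API is present, newer releases are not blocked
    ]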

 ]

 TESTS_REQUIRES = [
5 changes: 5 additions & 0 deletions nipype/pipeline/engine/nodes.py

@@ -821,6 +821,11 @@ def update(self, **opts):
"""Update inputs"""
self.inputs.update(**opts)

def is_gpu_node(self):
return (hasattr(self.inputs, 'use_cuda') and self.inputs.use_cuda) or (
hasattr(self.inputs, 'use_gpu') and self.inputs.use_gpu
)
Member:
Suggested change:

-        return (hasattr(self.inputs, 'use_cuda') and self.inputs.use_cuda) or (
-            hasattr(self.inputs, 'use_gpu') and self.inputs.use_gpu
-        )
+        return bool(getattr(self.inputs, 'use_cuda', False)) or bool(
+            getattr(self.inputs, 'use_gpu', False))
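As a standalone illustration of why the getattr form is more robust (not part of the PR; FakeInputs is a hypothetical stand-in for a traits input spec):

    class FakeInputs:
        use_cuda = True  # only one of the two flags is defined

    inputs = FakeInputs()
    assert bool(getattr(inputs, 'use_cuda', False))      # present and truthy
    assert not bool(getattr(inputs, 'use_gpu', False))   # absent: falls back to False, no AttributeError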



 class JoinNode(Node):
     """Wraps interface objects that join inputs into a list.
67 changes: 60 additions & 7 deletions nipype/pipeline/plugins/multiproc.py

@@ -100,6 +100,7 @@

     - non_daemon: boolean flag to execute as non-daemon processes
     - n_procs: maximum number of threads to be executed in parallel
+    - n_gpu_procs: maximum number of GPU threads to be executed in parallel
     - memory_gb: maximum memory (in GB) that can be used at once.
     - raise_insufficient: raise error if the requested resources for
       a node over the maximum `n_procs` and/or `memory_gb`
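For context, a user opts into the GPU queue through plugin_args; a minimal sketch (the Function node is a toy stand-in with no use_gpu/use_cuda input, so it only occupies the CPU queue; a real GPU interface exposes one of those flags):

    import nipype.pipeline.engine as pe
    from nipype.interfaces.utility import Function

    def double(x):
        return 2 * x

    node = pe.Node(
        Function(input_names=["x"], output_names=["out"], function=double),
        name="n1",
    )
    node.inputs.x = 3

    wf = pe.Workflow(name="demo")
    wf.add_nodes([node])
    # n_gpu_procs caps how many GPU-flagged nodes may run concurrently
    wf.run(plugin="MultiProc", plugin_args={"n_procs": 4, "n_gpu_procs": 1})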
@@ -130,10 +131,23 @@
         )
         self.raise_insufficient = self.plugin_args.get("raise_insufficient", True)

+        # GPUs found on the system
+        self.n_gpus_visible = MultiProcPlugin.gpu_count()
+        # number of GPU procs set by the user
+        self.n_gpu_procs = self.plugin_args.get('n_gpu_procs', self.n_gpus_visible)
+
+        # total no. of processes allowed on all GPUs
+        if self.n_gpu_procs > self.n_gpus_visible:
+            logger.info(
+                'Total number of GPU procs requested (%d) exceeds the available number of GPUs (%d) on the system. Using requested GPU slots at your own risk!'
+                % (self.n_gpu_procs, self.n_gpus_visible)
Member:
Loggers accept format strings and their arguments, and only interpolate them if the logging event is actually emitted:

Suggested change:

-                'Total number of GPU procs requested (%d) exceeds the available number of GPUs (%d) on the system. Using requested GPU slots at your own risk!'
-                % (self.n_gpu_procs, self.n_gpus_visible)
+                'Total number of GPU procs requested (%d) exceeds the available number of GPUs (%d) on the system. Using requested GPU slots at your own risk!',
+                self.n_gpu_procs, self.n_gpus_visible)
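A minimal standalone demonstration of the difference (not part of the PR):

    import logging

    logger = logging.getLogger("demo")
    logger.setLevel(logging.WARNING)  # INFO records are filtered out

    # Eager: the message string is built by % before logging checks the level.
    logger.info('requested %d, available %d' % (8, 2))

    # Deferred: the arguments are stored on the record and only interpolated
    # if a handler actually emits it.
    logger.info('requested %d, available %d', 8, 2)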

+            )
+
         # Instantiate different thread pools for non-daemon processes
         logger.debug(
-            "[MultiProc] Starting (n_procs=%d, mem_gb=%0.2f, cwd=%s)",
+            "[MultiProc] Starting (n_procs=%d, n_gpu_procs=%d, mem_gb=%0.2f, cwd=%s)",
             self.processors,
+            self.n_gpu_procs,
             self.memory_gb,
             self._cwd,
         )
@@ -184,9 +198,12 @@
"""Check if any node exceeds the available resources"""
tasks_mem_gb = []
tasks_num_th = []
tasks_gpu_th = []
for node in graph.nodes():
tasks_mem_gb.append(node.mem_gb)
tasks_num_th.append(node.n_procs)
if node.is_gpu_node():
tasks_gpu_th.append(node.n_procs)

if np.any(np.array(tasks_mem_gb) > self.memory_gb):
logger.warning(
@@ -203,6 +220,10 @@
             )
             if self.raise_insufficient:
                 raise RuntimeError("Insufficient resources available for job")
+        if np.any(np.array(tasks_gpu_th) > self.n_gpu_procs):
+            logger.warning('Nodes demand more GPU than allowed (%d).', self.n_gpu_procs)
+            if self.raise_insufficient:
+                raise RuntimeError('Insufficient GPU resources available for job')

     def _postrun_check(self):
         self.pool.shutdown()
@@ -213,11 +234,14 @@
"""
free_memory_gb = self.memory_gb
free_processors = self.processors
free_gpu_slots = self.n_gpu_procs
for _, jobid in running_tasks:
free_memory_gb -= min(self.procs[jobid].mem_gb, free_memory_gb)
free_processors -= min(self.procs[jobid].n_procs, free_processors)
if self.procs[jobid].is_gpu_node():
free_gpu_slots -= min(self.procs[jobid].n_procs, free_gpu_slots)

Codecov warning: added line #L242 in nipype/pipeline/plugins/multiproc.py was not covered by tests.

-        return free_memory_gb, free_processors
+        return free_memory_gb, free_processors, free_gpu_slots

     def _send_procs_to_workers(self, updatehash=False, graph=None):
         """
@@ -232,7 +256,9 @@
         )

         # Check available resources by summing all threads and memory used
-        free_memory_gb, free_processors = self._check_resources(self.pending_tasks)
+        free_memory_gb, free_processors, free_gpu_slots = self._check_resources(
+            self.pending_tasks
+        )

         stats = (
             len(self.pending_tasks),
@@ -241,6 +267,8 @@
             self.memory_gb,
             free_processors,
             self.processors,
+            free_gpu_slots,
+            self.n_gpu_procs,
         )
         if self._stats != stats:
             tasks_list_msg = ""
@@ -256,13 +284,15 @@
                 tasks_list_msg = indent(tasks_list_msg, " " * 21)
             logger.info(
                 "[MultiProc] Running %d tasks, and %d jobs ready. Free "
-                "memory (GB): %0.2f/%0.2f, Free processors: %d/%d.%s",
+                "memory (GB): %0.2f/%0.2f, Free processors: %d/%d, Free GPU slots: %d/%d.%s",
                 len(self.pending_tasks),
                 len(jobids),
                 free_memory_gb,
                 self.memory_gb,
                 free_processors,
                 self.processors,
+                free_gpu_slots,
+                self.n_gpu_procs,
                 tasks_list_msg,
             )
             self._stats = stats
@@ -304,28 +334,39 @@
             # Check requirements of this job
             next_job_gb = min(self.procs[jobid].mem_gb, self.memory_gb)
             next_job_th = min(self.procs[jobid].n_procs, self.processors)
+            next_job_gpu_th = min(self.procs[jobid].n_procs, self.n_gpu_procs)

+            is_gpu_node = self.procs[jobid].is_gpu_node()

             # If node does not fit, skip at this moment
-            if next_job_th > free_processors or next_job_gb > free_memory_gb:
+            if (
+                next_job_th > free_processors
+                or next_job_gb > free_memory_gb
+                or (is_gpu_node and next_job_gpu_th > free_gpu_slots)
+            ):
                 logger.debug(
-                    "Cannot allocate job %d (%0.2fGB, %d threads).",
+                    "Cannot allocate job %d (%0.2fGB, %d threads, %d GPU slots).",
                     jobid,
                     next_job_gb,
                     next_job_th,
+                    next_job_gpu_th,
                 )
                 continue

             free_memory_gb -= next_job_gb
             free_processors -= next_job_th
+            if is_gpu_node:
+                free_gpu_slots -= next_job_gpu_th

Codecov warning: added line #L359 in nipype/pipeline/plugins/multiproc.py was not covered by tests.
Member:
I would expect this to be hit by your test, but coverage shows it's not. Can you look into this?

Contributor (Author):
Maybe I missed that because I never used updatehash=True, but it seems that no test includes that option. Should we add a test with it?
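One possible shape for such a test, reusing this file's fixtures (a sketch only; the name and the exact assertions are placeholders):

    def test_run_multiproc_updatehash(tmpdir):
        tmpdir.chdir()

        pipe = pe.Workflow(name="pipe")
        n1 = pe.Node(SingleNodeTestInterface(), name="n1")
        n1.inputs.input1 = 4
        pipe.add_nodes([n1])

        # The first run populates the hashfiles; the rerun exercises updatehash=True.
        pipe.run(plugin="MultiProc", plugin_args={"n_procs": 2})
        pipe.run(plugin="MultiProc", plugin_args={"n_procs": 2}, updatehash=True)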

Contributor (Author):
Moreover, that error does not impact "common" use (I have a project that includes this GPU support code).

Contributor (Author):
While I was looking into this I found two errors in the updatehash functionality. I sent pull request #3709 to fix the bigger one.
The second is that in the MultiProc plugin EVERY node is executed in the main thread if updatehash=True, so no multiprocessing is enabled. I will try to send a pull request for that too (maybe after this GPU support is merged, to avoid handling merge conflicts).

             logger.debug(
                 "Allocating %s ID=%d (%0.2fGB, %d threads). Free: "
-                "%0.2fGB, %d threads.",
+                "%0.2fGB, %d threads, %d GPU slots.",
                 self.procs[jobid].fullname,
                 jobid,
                 next_job_gb,
                 next_job_th,
                 free_memory_gb,
                 free_processors,
+                free_gpu_slots,
             )

             # change job status in appropriate queues
@@ -352,6 +393,8 @@
                 self._remove_node_dirs()
                 free_memory_gb += next_job_gb
                 free_processors += next_job_th
+                if is_gpu_node:
+                    free_gpu_slots -= next_job_gpu_th

Codecov warning: added line #L397 in nipype/pipeline/plugins/multiproc.py was not covered by tests.
Member:
Note that this is releasing resource claims that were made around line 356 so the next time through the loop sees available resources.

Suggested change:

-                if is_gpu_node:
-                    free_gpu_slots -= next_job_gpu_th
+                if is_gpu_node:
+                    free_gpu_slots += next_job_gpu_th

             # Display stats next loop
             self._stats = None

@@ -379,3 +422,13 @@
             key=lambda item: (self.procs[item].mem_gb, self.procs[item].n_procs),
         )
         return jobids

+    @staticmethod
+    def gpu_count():
+        n_gpus = 1
+        try:
+            import GPUtil
+
+            return len(GPUtil.getGPUs())
+        except ImportError:
+            return n_gpus

Codecov warning: added lines #L433-L434 in nipype/pipeline/plugins/multiproc.py were not covered by tests.
Member:
This is a general utility; I would put it into nipype.pipeline.plugins.tools as a function, not a static method.

Also consider:

Suggested change:

-    @staticmethod
-    def gpu_count():
-        n_gpus = 1
-        try:
-            import GPUtil
-            return len(GPUtil.getGPUs())
-        except ImportError:
-            return n_gpus
+    @staticmethod
+    def gpu_count():
+        try:
+            import GPUtil
+        except ImportError:
+            return 1
+        else:
+            return len(GPUtil.getGPUs())

As a rule, I try to keep the section inside a try block as short as possible, to avoid accidentally catching other exceptions that are raised. An else block can contain anything that depends on the success of the try block.
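Put together, the helper could live in nipype/pipeline/plugins/tools.py as a plain function; a sketch combining both suggestions (final name and placement are up to the maintainers):

    # nipype/pipeline/plugins/tools.py
    def gpu_count():
        """Return the number of GPUs visible via GPUtil, falling back to 1 without it."""
        try:
            import GPUtil
        except ImportError:
            return 1
        else:
            return len(GPUtil.getGPUs())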

19 changes: 19 additions & 0 deletions nipype/pipeline/plugins/tests/test_multiproc.py

@@ -56,6 +56,7 @@ def test_run_multiproc(tmpdir):
 class InputSpecSingleNode(nib.TraitedSpec):
     input1 = nib.traits.Int(desc="a random int")
     input2 = nib.traits.Int(desc="a random int")
+    use_gpu = nib.traits.Bool(False, mandatory=False, desc="boolean for GPU nodes")


 class OutputSpecSingleNode(nib.TraitedSpec):
@@ -117,6 +118,24 @@ def test_no_more_threads_than_specified(tmpdir):
         pipe.run(plugin="MultiProc", plugin_args={"n_procs": max_threads})


+def test_no_more_gpu_threads_than_specified(tmpdir):
+    tmpdir.chdir()
+
+    pipe = pe.Workflow(name="pipe")
+    n1 = pe.Node(SingleNodeTestInterface(), name="n1", n_procs=2)
+    n1.inputs.use_gpu = True
+    n1.inputs.input1 = 4
+    pipe.add_nodes([n1])
+
+    max_threads = 2
+    max_gpu = 1
+    with pytest.raises(RuntimeError):
+        pipe.run(
+            plugin="MultiProc",
+            plugin_args={"n_procs": max_threads, 'n_gpu_procs': max_gpu},
+        )


 @pytest.mark.skipif(
     sys.version_info >= (3, 8), reason="multiprocessing issues in Python 3.8"
 )