Commit 7fda6ae

Merge pull request #2598 from effigies/rf/multiproc_futures

RF: Futures-based MultiProc

2 parents: 53966cf + f836118

File tree: 5 files changed, +397 -44 lines

nipype/info.py (+1)

@@ -147,6 +147,7 @@ def get_nipype_gitversion():
     'pydotplus',
     'pydot>=%s' % PYDOT_MIN_VERSION,
     'packaging',
+    'futures; python_version == "2.7"',
 ]

 if sys.version_info <= (3, 4):
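
This new requirement installs the futures backport on Python 2.7, so that concurrent.futures, which the refactored (futures-based) MultiProc plugin builds on, is available there as well. A minimal sketch of that executor pattern, with a hypothetical run_task standing in for node execution (not nipype code):

# Minimal sketch of the concurrent.futures pattern; `run_task` is a
# hypothetical stand-in for node execution, not part of this commit.
from concurrent.futures import ProcessPoolExecutor, as_completed


def run_task(taskid):
    # Pretend to do some work and return a result dictionary.
    return {'taskid': taskid, 'result': taskid * 2}


if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(run_task, i) for i in range(8)]
        for future in as_completed(futures):
            print(future.result())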

nipype/pipeline/plugins/__init__.py (+1)

@@ -12,6 +12,7 @@
 from .condor import CondorPlugin
 from .dagman import CondorDAGManPlugin
 from .multiproc import MultiProcPlugin
+from .legacymultiproc import LegacyMultiProcPlugin
 from .ipython import IPythonPlugin
 from .somaflow import SomaFlowPlugin
 from .pbsgraph import PBSGraphPlugin
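
Exporting LegacyMultiProcPlugin alongside MultiProcPlugin lets a workflow opt back into the preserved multiprocessing implementation by name. A hedged usage sketch; the toy workflow and node below are placeholders and not part of this commit, and the plugin string is assumed to resolve by appending "Plugin":

# Illustrative only: run a toy workflow with the preserved multiprocessing
# implementation instead of the new futures-based MultiProc.
import nipype.pipeline.engine as pe
from nipype.interfaces.utility import Function


def double(x):
    return 2 * x


if __name__ == '__main__':
    node = pe.Node(Function(input_names=['x'], output_names=['out'],
                            function=double), name='double')
    node.inputs.x = 21

    wf = pe.Workflow(name='demo')
    wf.add_nodes([node])
    wf.run(plugin='LegacyMultiProc', plugin_args={'n_procs': 2})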

nipype/pipeline/plugins/legacymultiproc.py (new file, +382)

@@ -0,0 +1,382 @@
# -*- coding: utf-8 -*-
# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*-
# vi: set ft=python sts=4 ts=4 sw=4 et:
"""Parallel workflow execution via multiprocessing

Support for child processes running as non-daemons based on
http://stackoverflow.com/a/8963618/1183453
"""
from __future__ import (print_function, division, unicode_literals,
                        absolute_import)

# Import packages
import os
from multiprocessing import Process, Pool, cpu_count, pool
from traceback import format_exception
import sys
from logging import INFO
import gc

from copy import deepcopy
import numpy as np
from ... import logging
from ...utils.profiler import get_system_total_memory_gb
from ..engine import MapNode
from .base import DistributedPluginBase

try:
    from textwrap import indent
except ImportError:

    def indent(text, prefix):
        """ A textwrap.indent replacement for Python < 3.3 """
        if not prefix:
            return text
        splittext = text.splitlines(True)
        return prefix + prefix.join(splittext)


# Init logger
logger = logging.getLogger('workflow')


# Run node
def run_node(node, updatehash, taskid):
    """Function to execute node.run(), catch and log any errors and
    return the result dictionary

    Parameters
    ----------
    node : nipype Node instance
        the node to run
    updatehash : boolean
        flag for updating hash
    taskid : int
        an identifier for this task

    Returns
    -------
    result : dictionary
        dictionary containing the node runtime results and stats
    """

    # Init variables
    result = dict(result=None, traceback=None, taskid=taskid)

    # Try and execute the node via node.run()
    try:
        result['result'] = node.run(updatehash=updatehash)
    except:  # noqa: E722, intendedly catch all here
        result['traceback'] = format_exception(*sys.exc_info())
        result['result'] = node.result

    # Return the result dictionary
    return result


class NonDaemonProcess(Process):
    """A non-daemon process to support internal multiprocessing.
    """

    def _get_daemon(self):
        return False

    def _set_daemon(self, value):
        pass

    daemon = property(_get_daemon, _set_daemon)


class NonDaemonPool(pool.Pool):
    """A process pool with non-daemon processes.
    """
    Process = NonDaemonProcess


class LegacyMultiProcPlugin(DistributedPluginBase):
    """
    Execute workflow with multiprocessing, not sending more jobs at once
    than the system can support.

    The plugin_args input to run can be used to control the multiprocessing
    execution and defining the maximum amount of memory and threads that
    should be used. When those parameters are not specified,
    the number of threads and memory of the system is used.

    System consuming nodes should be tagged::

      memory_consuming_node.mem_gb = 8
      thread_consuming_node.n_procs = 16

    The default number of threads and memory are set at node
    creation, and are 1 and 0.25GB respectively.

    Currently supported options are:

    - non_daemon : boolean flag to execute as non-daemon processes
    - n_procs: maximum number of threads to be executed in parallel
    - memory_gb: maximum memory (in GB) that can be used at once.
    - raise_insufficient: raise error if the requested resources for
      a node over the maximum `n_procs` and/or `memory_gb`
      (default is ``True``).
    - scheduler: sort jobs topologically (``'tsort'``, default value)
      or prioritize jobs by, first, memory consumption and, second,
      number of threads (``'mem_thread'`` option).
    - maxtasksperchild: number of nodes to run on each process before
      refreshing the worker (default: 10).

    """

    def __init__(self, plugin_args=None):
        # Init variables and instance attributes
        super(LegacyMultiProcPlugin, self).__init__(plugin_args=plugin_args)
        self._taskresult = {}
        self._task_obj = {}
        self._taskid = 0

        # Cache current working directory and make sure we
        # change to it when workers are set up
        self._cwd = os.getcwd()

        # Read in options or set defaults.
        non_daemon = self.plugin_args.get('non_daemon', True)
        maxtasks = self.plugin_args.get('maxtasksperchild', 10)
        self.processors = self.plugin_args.get('n_procs', cpu_count())
        self.memory_gb = self.plugin_args.get(
            'memory_gb',  # Allocate 90% of system memory
            get_system_total_memory_gb() * 0.9)
        self.raise_insufficient = self.plugin_args.get('raise_insufficient',
                                                       True)

        # Instantiate different thread pools for non-daemon processes
        logger.debug('[LegacyMultiProc] Starting in "%sdaemon" mode (n_procs=%d, '
                     'mem_gb=%0.2f, cwd=%s)', 'non' * int(non_daemon),
                     self.processors, self.memory_gb, self._cwd)

        NipypePool = NonDaemonPool if non_daemon else Pool
        try:
            self.pool = NipypePool(
                processes=self.processors,
                maxtasksperchild=maxtasks,
                initializer=os.chdir,
                initargs=(self._cwd,)
            )
        except TypeError:
            # Python < 3.2 does not have maxtasksperchild
            # When maxtasksperchild is not set, initializer is not to be
            # called
            self.pool = NipypePool(processes=self.processors)

        self._stats = None

    def _async_callback(self, args):
        # Make sure runtime is not left at a dubious working directory
        os.chdir(self._cwd)
        self._taskresult[args['taskid']] = args

    def _get_result(self, taskid):
        return self._taskresult.get(taskid)

    def _clear_task(self, taskid):
        del self._task_obj[taskid]

    def _submit_job(self, node, updatehash=False):
        self._taskid += 1

        # Don't allow streaming outputs
        if getattr(node.interface, 'terminal_output', '') == 'stream':
            node.interface.terminal_output = 'allatonce'

        self._task_obj[self._taskid] = self.pool.apply_async(
            run_node, (node, updatehash, self._taskid),
            callback=self._async_callback)

        logger.debug('[LegacyMultiProc] Submitted task %s (taskid=%d).',
                     node.fullname, self._taskid)
        return self._taskid

    def _prerun_check(self, graph):
        """Check if any node exceeds the available resources"""
        tasks_mem_gb = []
        tasks_num_th = []
        for node in graph.nodes():
            tasks_mem_gb.append(node.mem_gb)
            tasks_num_th.append(node.n_procs)

        if np.any(np.array(tasks_mem_gb) > self.memory_gb):
            logger.warning(
                'Some nodes exceed the total amount of memory available '
                '(%0.2fGB).', self.memory_gb)
            if self.raise_insufficient:
                raise RuntimeError('Insufficient resources available for job')

        if np.any(np.array(tasks_num_th) > self.processors):
            logger.warning(
                'Some nodes demand for more threads than available (%d).',
                self.processors)
            if self.raise_insufficient:
                raise RuntimeError('Insufficient resources available for job')

    def _postrun_check(self):
        self.pool.close()

    def _check_resources(self, running_tasks):
        """
        Make sure there are resources available
        """
        free_memory_gb = self.memory_gb
        free_processors = self.processors
        for _, jobid in running_tasks:
            free_memory_gb -= min(self.procs[jobid].mem_gb, free_memory_gb)
            free_processors -= min(self.procs[jobid].n_procs, free_processors)

        return free_memory_gb, free_processors

    def _send_procs_to_workers(self, updatehash=False, graph=None):
        """
        Sends jobs to workers when system resources are available.
        """

        # Check to see if a job is available (jobs with all dependencies run)
        # See https://github.com/nipy/nipype/pull/2200#discussion_r141605722
        # See also https://github.com/nipy/nipype/issues/2372
        jobids = np.flatnonzero(~self.proc_done &
                                (self.depidx.sum(axis=0) == 0).__array__())

        # Check available resources by summing all threads and memory used
        free_memory_gb, free_processors = self._check_resources(
            self.pending_tasks)

        stats = (len(self.pending_tasks), len(jobids), free_memory_gb,
                 self.memory_gb, free_processors, self.processors)
        if self._stats != stats:
            tasks_list_msg = ''

            if logger.level <= INFO:
                running_tasks = [
                    '  * %s' % self.procs[jobid].fullname
                    for _, jobid in self.pending_tasks
                ]
                if running_tasks:
                    tasks_list_msg = '\nCurrently running:\n'
                    tasks_list_msg += '\n'.join(running_tasks)
                    tasks_list_msg = indent(tasks_list_msg, ' ' * 21)
            logger.info(
                '[LegacyMultiProc] Running %d tasks, and %d jobs ready. Free '
                'memory (GB): %0.2f/%0.2f, Free processors: %d/%d.%s',
                len(self.pending_tasks), len(jobids), free_memory_gb,
                self.memory_gb, free_processors, self.processors,
                tasks_list_msg)
            self._stats = stats

        if free_memory_gb < 0.01 or free_processors == 0:
            logger.debug('No resources available')
            return

        if len(jobids) + len(self.pending_tasks) == 0:
            logger.debug('No tasks are being run, and no jobs can '
                         'be submitted to the queue. Potential deadlock')
            return

        jobids = self._sort_jobs(
            jobids, scheduler=self.plugin_args.get('scheduler'))

        # Run garbage collector before potentially submitting jobs
        gc.collect()

        # Submit jobs
        for jobid in jobids:
            # First expand mapnodes
            if isinstance(self.procs[jobid], MapNode):
                try:
                    num_subnodes = self.procs[jobid].num_subnodes()
                except Exception:
                    traceback = format_exception(*sys.exc_info())
                    self._clean_queue(
                        jobid,
                        graph,
                        result={
                            'result': None,
                            'traceback': traceback
                        })
                    self.proc_pending[jobid] = False
                    continue
                if num_subnodes > 1:
                    submit = self._submit_mapnode(jobid)
                    if not submit:
                        continue

            # Check requirements of this job
            next_job_gb = min(self.procs[jobid].mem_gb, self.memory_gb)
            next_job_th = min(self.procs[jobid].n_procs, self.processors)

            # If node does not fit, skip at this moment
            if next_job_th > free_processors or next_job_gb > free_memory_gb:
                logger.debug('Cannot allocate job %d (%0.2fGB, %d threads).',
                             jobid, next_job_gb, next_job_th)
                continue

            free_memory_gb -= next_job_gb
            free_processors -= next_job_th
            logger.debug('Allocating %s ID=%d (%0.2fGB, %d threads). Free: '
                         '%0.2fGB, %d threads.', self.procs[jobid].fullname,
                         jobid, next_job_gb, next_job_th, free_memory_gb,
                         free_processors)

            # change job status in appropriate queues
            self.proc_done[jobid] = True
            self.proc_pending[jobid] = True

            # If cached and up-to-date just retrieve it, don't run
            if self._local_hash_check(jobid, graph):
                continue

            # updatehash and run_without_submitting are also run locally
            if updatehash or self.procs[jobid].run_without_submitting:
                logger.debug('Running node %s on master thread',
                             self.procs[jobid])
                try:
                    self.procs[jobid].run(updatehash=updatehash)
                except Exception:
                    traceback = format_exception(*sys.exc_info())
                    self._clean_queue(
                        jobid,
                        graph,
                        result={
                            'result': None,
                            'traceback': traceback
                        })

                # Release resources
                self._task_finished_cb(jobid)
                self._remove_node_dirs()
                free_memory_gb += next_job_gb
                free_processors += next_job_th
                # Display stats next loop
                self._stats = None

                # Clean up any debris from running node in main process
                gc.collect()
                continue

            # Task should be submitted to workers
            # Send job to task manager and add to pending tasks
            if self._status_callback:
                self._status_callback(self.procs[jobid], 'start')
            tid = self._submit_job(
                deepcopy(self.procs[jobid]), updatehash=updatehash)
            if tid is None:
                self.proc_done[jobid] = False
                self.proc_pending[jobid] = False
            else:
                self.pending_tasks.insert(0, (tid, jobid))
            # Display stats next loop
            self._stats = None

    def _sort_jobs(self, jobids, scheduler='tsort'):
        if scheduler == 'mem_thread':
            return sorted(
                jobids,
                key=lambda item: (self.procs[item].mem_gb, self.procs[item].n_procs)
            )
        return jobids
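
The class docstring above lists the supported plugin_args and the per-node resource tags. A hedged sketch of how they combine; `wf`, `big_node`, and `threaded_node` are placeholders for an existing workflow and its nodes, and the numbers are illustrative:

# Placeholders: `wf` is an existing Workflow; `big_node` and `threaded_node`
# are Nodes already attached to it.
big_node.mem_gb = 8           # node expected to use up to ~8 GB
threaded_node.n_procs = 16    # node runs up to 16 threads internally

wf.run(
    plugin='LegacyMultiProc',
    plugin_args={
        'n_procs': 8,                # never use more than 8 processors at once
        'memory_gb': 24,             # never allocate more than 24 GB at once
        'raise_insufficient': True,  # fail if a single node cannot fit
        'scheduler': 'mem_thread',   # prioritize by memory, then thread count
        'maxtasksperchild': 10,      # recycle each worker after 10 tasks
        'non_daemon': True,          # let nodes spawn their own processes
    })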
