
Commit 11aeef2

Fix parallel MC example to run on Windows
On Windows the barrier shared across workers must be created with multiprocessing.Manager and passed to the worker initialization routine as an argument. Also, batch_size is now passed to the MC routines as a keyword argument, and in parallel_mc.py the MC compute function (mc_compute_func) is passed to the worker initialization routine as a keyword argument; the initializer stores it in a process-global variable for reuse by subsequent MC computation tasks.
1 parent cc1c6c1 commit 11aeef2
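
The essence of the fix, as a minimal standalone sketch (illustrative names, not code from this repo): a plain mp.Barrier refuses to be pickled onto a pool's task queue, and under the 'spawn' start method that Windows uses the workers do not inherit parent-process globals either, so the barrier must travel as a Manager proxy passed explicitly to the initialization task.

import multiprocessing as mp
from functools import partial

def init_worker(w_id, barrier=None):
    # Runs once per worker via the first pool.map below; the barrier
    # releases only after all n_workers processes have checked in,
    # so no worker can grab a second initialization task.
    barrier.wait()

def compute(i):
    return i * i

if __name__ == '__main__':
    n_workers = 4
    with mp.Manager() as manager:
        # A Manager-created Barrier is a picklable proxy object,
        # so it survives the trip through the pool's task queue.
        barrier = manager.Barrier(n_workers)
        with mp.Pool(processes=n_workers) as pool:
            pool.map(partial(init_worker, barrier=barrier),
                     range(n_workers), chunksize=1)
            print(pool.map(compute, range(8)))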

File tree: 3 files changed (+40, −31 lines)


examples/parallel_mc.py

Lines changed: 15 additions & 13 deletions

@@ -1,21 +1,23 @@
 import multiprocessing as mp
+from functools import partial

 __all__ = ['parallel_mc_run']

 def worker_compute(w_id):
     "Worker function executed on the spawned slave process"
-    # global _local_rs
-    return _worker_mc_compute(_local_rs)
+    global _local_rs, _worker_mc_compute_func
+    return _worker_mc_compute_func(_local_rs)


-def assign_worker_rs(w_rs):
+def init_worker(w_rs, mc_compute_func=None, barrier=None):
     """Assign process local random state variable `rs` the given value"""
     assert not '_local_rs' in globals(), "Here comes trouble. Process is not expected to have global variable `_local_rs`"

-    global _local_rs
+    global _local_rs, _worker_mc_compute_func
     _local_rs = w_rs
+    _worker_mc_compute_func = mc_compute_func
     # wait to ensure that the assignment takes place for each worker
-    b.wait()
+    barrier.wait()

 def parallel_mc_run(random_states, n_workers, n_batches, mc_func):
     """
@@ -25,15 +27,15 @@ def parallel_mc_run(random_states, n_workers, n_batches, mc_func):
     and has access to worker-local global variable `rs`, containing worker's random states.
     """
     # use of Barrier ensures that every worker gets one
-    global b, _worker_mc_compute
-    b = mp.Barrier(n_workers)
+
+    with mp.Manager() as manager:
+        b = manager.Barrier(n_workers)

-    _worker_mc_compute = mc_func
-    with mp.Pool(processes=n_workers) as pool:
-        # 1. map over every worker once to distribute RandomState instances
-        pool.map(assign_worker_rs, random_states, chunksize=1)
-        # 2. Perform computations on workers
-        r = pool.map(worker_compute, range(n_batches), chunksize=1)
+        with mp.Pool(processes=n_workers) as pool:
+            # 1. map over every worker once to distribute RandomState instances
+            pool.map(partial(init_worker, mc_compute_func=mc_func, barrier=b), random_states, chunksize=1)
+            # 2. Perform computations on workers
+            r = pool.map(worker_compute, range(n_batches), chunksize=1)

     return r
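
A hypothetical caller of the refactored parallel_mc_run, mirroring how stick_tetrahedron.py binds batch_size with functools.partial; the NumPy generators and the pi-by-darts workload here are stand-ins for the examples' MT2203 streams and stick problem:

from functools import partial

import numpy as np

from parallel_mc import parallel_mc_run

def mc_runner(rs, batch_size=None):
    # count the points of the batch that land inside the unit circle
    x, y = rs.uniform(size=(2, batch_size))
    return np.count_nonzero(x * x + y * y < 1.0)

if __name__ == '__main__':
    n_workers, n_batches, batch_size = 4, 64, 100000
    # one independently seeded generator per worker
    rss = [np.random.default_rng(seed) for seed in range(n_workers)]
    counts = parallel_mc_run(rss, n_workers, n_batches,
                             partial(mc_runner, batch_size=batch_size))
    p = sum(counts) / (n_batches * batch_size)
    print("pi is approximately", 4.0 * p)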

examples/stick_tetrahedron.py

Lines changed: 9 additions & 7 deletions

@@ -4,7 +4,7 @@
 from sticky_math import mc_six_piece_stick_tetrahedron_prob
 from arg_parsing import parse_arguments

-def mc_runner(rs):
+def mc_runner(rs, batch_size=None):
     return mc_six_piece_stick_tetrahedron_prob(rs, batch_size)

 def aggregate_mc_counts(counts, n_batches, batch_size):
@@ -40,6 +40,7 @@ def print_result(p_est, p_std, mc_size):
     from itertools import repeat
     from timeit import default_timer as timer
     import sys
+    from functools import partial

     args = parse_arguments()

@@ -51,23 +52,24 @@ def print_result(p_est, p_std, mc_size):
     batch_size = args.batch_size
     batches = args.batch_count
     id0 = args.id_offset
+    print("Parallel Monte-Carlo estimation of stick tetrahedron probability")
+    print("Input parameters: -s {seed} -b {batchSize} -n {numBatches} -p {processes} -d {idOffset}".format(
+        seed=args.seed, batchSize=args.batch_size, numBatches=args.batch_count, processes=n_workers, idOffset=args.id_offset))
+    print("")

     t0 = timer()

     rss = build_MT2203_random_states(seed, id0, n_workers)
-    r = parallel_mc_run(rss, n_workers, batches, mc_runner)
-    # r = sequential_mc_run(rss, n_workers, batches, mc_runner)
+
+    r = parallel_mc_run(rss, n_workers, batches, partial(mc_runner, batch_size=batch_size))
+    # r = sequential_mc_run(rss, n_workers, batches, partial(mc_runner, batch_size=batch_size))

     # retrieve values of estimates into numpy array
     counts = np.fromiter(r, dtype=np.double)
     p_est, p_std, event_count, nonevent_count = aggregate_mc_counts(counts, batches, batch_size)

     t1 = timer()

-
-    print("Input parameters: -s {seed} -b {batchSize} -n {numBatches} -p {processes} -d {idOffset}".format(
-        seed=args.seed, batchSize=args.batch_size, numBatches=args.batch_count, processes=n_workers, idOffset=args.id_offset))
-    print("")
     print_result(p_est, p_std, batches * batch_size)
     print("")
     print("Bayesian posterior beta distribution parameters: ({0}, {1})".format(event_count, nonevent_count))

examples/fancy.py renamed to examples/stick_triangle.py

Lines changed: 16 additions & 11 deletions

@@ -39,41 +39,46 @@ def mc_dist(rs, n):
     return mc_prob


-def assign_worker_rs(w_rs):
+def init_worker(w_rs, barrier=None):
     """Assign process local random state variable `rs` the given value"""
     assert not 'rs' in globals(), "Here comes trouble. Process is not expected to have global variable `rs`"

     global rs
     rs = w_rs
     # wait to ensure that the assignment takes place for each worker
-    b.wait()
+    barrier.wait()


-def worker_compute(w_id):
+def worker_compute(w_id, batch_size=None):
     return mc_dist(rs, batch_size)


 if __name__ == '__main__':
     import multiprocessing as mp
     from itertools import repeat
     from timeit import default_timer as timer
+    from functools import partial

     seed = 77777
     n_workers = 12
     batch_size = 1024 * 256
     batches = 10000
+    print("Parallel Monte-Carlo estimation of stick triangle probability")
+    print(f"Parameters: n_workers={n_workers}, batch_size={batch_size}, n_batches={batches}, seed={seed}")
+    print("")

     t0 = timer()
     # Create instances of RandomState for each worker process from MT2203 family of generators
     rss = [ rnd.RandomState(seed, brng=('MT2203', idx)) for idx in range(n_workers) ]
-    # use of Barrier ensures that every worker gets one
-    b = mp.Barrier(n_workers)
-
-    with mp.Pool(processes=n_workers) as pool:
-        # map over every worker once to distribute RandomState instances
-        pool.map(assign_worker_rs, rss, chunksize=1)
-        # Perform computations on workers
-        r = pool.map(worker_compute, range(batches), chunksize=1)
+    with mp.Manager() as manager:
+        # use of Barrier ensures that every worker gets one
+        b = manager.Barrier(n_workers)
+
+        with mp.Pool(processes=n_workers) as pool:
+            # map over every worker once to distribute RandomState instances
+            pool.map(partial(init_worker, barrier=b), rss, chunksize=1)
+            # Perform computations on workers
+            r = pool.map(partial(worker_compute, batch_size=batch_size), range(batches), chunksize=1)

     # retrieve values of estimates into numpy array
     ps = np.fromiter(r, dtype=np.double)
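
rnd here is mkl_random, whose MT2203 family provides a large set of independent basic generators selected by index, one per worker. For readers without MKL, stock NumPy offers an analogous way to derive independent per-worker streams (not what this example uses):

import numpy as np

seed = 77777
n_workers = 12
# SeedSequence.spawn derives statistically independent child seeds,
# playing the role the ('MT2203', idx) basic generators play above
child_seeds = np.random.SeedSequence(seed).spawn(n_workers)
rss = [np.random.default_rng(s) for s in child_seeds]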
