Skip to content

Commit d1dd3ba

Browse files
authored
Merge pull request #26 from IntelPython/example-work
Improving parallel MC use-case example
2 parents cc1c6c1 + a9db09f commit d1dd3ba

File tree

4 files changed

+71
-32
lines changed

4 files changed

+71
-32
lines changed

examples/README.md

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,40 @@ This data is post-processed as necessary for the application.
1717
Code is tested to estimate the probability that 3 segments, obtained by splitting a unit stick
1818
in two randomly chosen places, can be sides of a triangle. This probability is known in closed form to be $\frac{1}{4}$.
1919

20+
Run the Python script "stick_triangle.py" to estimate this probability using a parallel Monte-Carlo algorithm:
21+
22+
```
23+
> python stick_triangle.py
24+
Parallel Monte-Carlo estimation of stick triangle probability
25+
Parameters: n_workers=12, batch_size=262144, n_batches=10000, seed=77777
26+
27+
Monte-Carlo estimate of probability: 0.250000
28+
Population estimate of the estimator's standard deviation: 0.000834
29+
Expected standard deviation of the estimator: 0.000846
30+
Execution time: 64.043 seconds
31+
32+
```
33+
2034
## Stick tetrahedron problem
2135

2236
Code is used to estimate the probability that 6 segments, obtained by splitting a unit stick in
2337
5 randomly chosen places, can be sides of a tetrahedron.
2438

2539
The probability is not known in closed form. See
26-
[math.stackexchange.com/questions/351913](https://math.stackexchange.com/questions/351913/probability-that-a-stick-randomly-broken-in-five-places-can-form-a-tetrahedron) for more details.
40+
[math.stackexchange.com/questions/351913](https://math.stackexchange.com/questions/351913/probability-that-a-stick-randomly-broken-in-five-places-can-form-a-tetrahedron) for more details.
41+
42+
```
43+
> python stick_tetrahedron.py -s 1274 -p 4 -n 8096
44+
Parallel Monte-Carlo estimation of stick tetrahedron probability
45+
Input parameters: -s 1274 -b 65536 -n 8096 -p 4 -d 0
46+
47+
Monte-Carlo estimate of probability: 0.01257113
48+
Population estimate of the estimator's standard deviation: 0.00000488
49+
Expected standard deviation of the estimator: 0.00000484
50+
Total MC size: 530579456
51+
52+
Bayesian posterior beta distribution parameters: (6669984, 523909472)
53+
54+
Execution time: 30.697 seconds
55+
56+
```

examples/parallel_mc.py

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,23 @@
11
import multiprocessing as mp
2+
from functools import partial
23

34
__all__ = ['parallel_mc_run']
45

56
def worker_compute(w_id):
67
"Worker function executed on the spawned slave process"
7-
# global _local_rs
8-
return _worker_mc_compute(_local_rs)
8+
global _local_rs, _worker_mc_compute_func
9+
return _worker_mc_compute_func(_local_rs)
910

1011

11-
def assign_worker_rs(w_rs):
12+
def init_worker(w_rs, mc_compute_func=None, barrier=None):
1213
"""Assign process local random state variable `rs` the given value"""
1314
assert not '_local_rs' in globals(), "Here comes trouble. Process is not expected to have global variable `_local_rs`"
1415

15-
global _local_rs
16+
global _local_rs, _worker_mc_compute_func
1617
_local_rs = w_rs
18+
_worker_mc_compute_func = mc_compute_func
1719
# wait to ensure that the assignment takes place for each worker
18-
b.wait()
20+
barrier.wait()
1921

2022
def parallel_mc_run(random_states, n_workers, n_batches, mc_func):
2123
"""
@@ -25,15 +27,15 @@ def parallel_mc_run(random_states, n_workers, n_batches, mc_func):
2527
and has access to worker-local global variable `rs`, containing worker's random states.
2628
"""
2729
# use of Barrier ensures that every worker gets one
28-
global b, _worker_mc_compute
29-
b = mp.Barrier(n_workers)
30+
31+
with mp.Manager() as manager:
32+
b = manager.Barrier(n_workers)
3033

31-
_worker_mc_compute = mc_func
32-
with mp.Pool(processes=n_workers) as pool:
33-
# 1. map over every worker once to distribute RandomState instances
34-
pool.map(assign_worker_rs, random_states, chunksize=1)
35-
# 2. Perform computations on workers
36-
r = pool.map(worker_compute, range(n_batches), chunksize=1)
34+
with mp.Pool(processes=n_workers) as pool:
35+
# 1. map over every worker once to distribute RandomState instances
36+
pool.map(partial(init_worker, mc_compute_func=mc_func, barrier=b), random_states, chunksize=1)
37+
# 2. Perform computations on workers
38+
r = pool.map(worker_compute, range(n_batches), chunksize=1)
3739

3840
return r
3941

examples/stick_tetrahedron.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from sticky_math import mc_six_piece_stick_tetrahedron_prob
55
from arg_parsing import parse_arguments
66

7-
def mc_runner(rs):
7+
def mc_runner(rs, batch_size=None):
88
return mc_six_piece_stick_tetrahedron_prob(rs, batch_size)
99

1010
def aggregate_mc_counts(counts, n_batches, batch_size):
@@ -40,6 +40,7 @@ def print_result(p_est, p_std, mc_size):
4040
from itertools import repeat
4141
from timeit import default_timer as timer
4242
import sys
43+
from functools import partial
4344

4445
args = parse_arguments()
4546

@@ -51,23 +52,24 @@ def print_result(p_est, p_std, mc_size):
5152
batch_size = args.batch_size
5253
batches = args.batch_count
5354
id0 = args.id_offset
55+
print("Parallel Monte-Carlo estimation of stick tetrahedron probability")
56+
print("Input parameters: -s {seed} -b {batchSize} -n {numBatches} -p {processes} -d {idOffset}".format(
57+
seed=args.seed, batchSize=args.batch_size, numBatches=args.batch_count, processes=n_workers, idOffset=args.id_offset))
58+
print("")
5459

5560
t0 = timer()
5661

5762
rss = build_MT2203_random_states(seed, id0, n_workers)
58-
r = parallel_mc_run(rss, n_workers, batches, mc_runner)
59-
# r = sequential_mc_run(rss, n_workers, batches, mc_runner)
63+
64+
r = parallel_mc_run(rss, n_workers, batches, partial(mc_runner, batch_size=batch_size))
65+
# r = sequential_mc_run(rss, n_workers, batches, partial(mc_runner, batch_size=batch_size))
6066

6167
# retrieve values of estimates into numpy array
6268
counts = np.fromiter(r, dtype=np.double)
6369
p_est, p_std, event_count, nonevent_count = aggregate_mc_counts(counts, batches, batch_size)
6470

6571
t1 = timer()
6672

67-
68-
print("Input parameters: -s {seed} -b {batchSize} -n {numBatches} -p {processes} -d {idOffset}".format(
69-
seed=args.seed, batchSize=args.batch_size, numBatches=args.batch_count, processes=n_workers, idOffset=args.id_offset))
70-
print("")
7173
print_result(p_est, p_std, batches * batch_size)
7274
print("")
7375
print("Bayesian posterior beta distribution parameters: ({0}, {1})".format(event_count, nonevent_count))

examples/fancy.py renamed to examples/stick_triangle.py

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -39,41 +39,46 @@ def mc_dist(rs, n):
3939
return mc_prob
4040

4141

42-
def assign_worker_rs(w_rs):
42+
def init_worker(w_rs, barrier=None):
4343
"""Assign process local random state variable `rs` the given value"""
4444
assert not 'rs' in globals(), "Here comes trouble. Process is not expected to have global variable `rs`"
4545

4646
global rs
4747
rs = w_rs
4848
# wait to ensure that the assignment takes place for each worker
49-
b.wait()
49+
barrier.wait()
5050

5151

52-
def worker_compute(w_id):
52+
def worker_compute(w_id, batch_size=None):
5353
return mc_dist(rs, batch_size)
5454

5555

5656
if __name__ == '__main__':
5757
import multiprocessing as mp
5858
from itertools import repeat
5959
from timeit import default_timer as timer
60+
from functools import partial
6061

6162
seed = 77777
6263
n_workers = 12
6364
batch_size = 1024 * 256
6465
batches = 10000
66+
print("Parallel Monte-Carlo estimation of stick triangle probability")
67+
print(f"Parameters: n_workers={n_workers}, batch_size={batch_size}, n_batches={batches}, seed={seed}")
68+
print("")
6569

6670
t0 = timer()
6771
# Create instances of RandomState for each worker process from MT2203 family of generators
6872
rss = [ rnd.RandomState(seed, brng=('MT2203', idx)) for idx in range(n_workers) ]
69-
# use of Barrier ensures that every worker gets one
70-
b = mp.Barrier(n_workers)
71-
72-
with mp.Pool(processes=n_workers) as pool:
73-
# map over every worker once to distribute RandomState instances
74-
pool.map(assign_worker_rs, rss, chunksize=1)
75-
# Perform computations on workers
76-
r = pool.map(worker_compute, range(batches), chunksize=1)
73+
with mp.Manager() as manager:
74+
# use of Barrier ensures that every worker gets one
75+
b = manager.Barrier(n_workers)
76+
77+
with mp.Pool(processes=n_workers) as pool:
78+
# map over every worker once to distribute RandomState instances
79+
pool.map(partial(init_worker, barrier=b), rss, chunksize=1)
80+
# Perform computations on workers
81+
r = pool.map(partial(worker_compute, batch_size=batch_size), range(batches), chunksize=1)
7782

7883
# retrieve values of estimates into numpy array
7984
ps = np.fromiter(r, dtype=np.double)

0 commit comments

Comments
 (0)