Skip to content

Commit e822e28

Browse files
committed
first commit
0 parents  commit e822e28

File tree

1 file changed

+127
-0
lines changed

1 file changed

+127
-0
lines changed

parallel.ex

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
defmodule Parallel do
2+
@moduledoc """
3+
4+
This module shows how you can hide the synchronous and asynchronous behaviour
5+
behind an API.
6+
7+
"""
8+
9+
@doc """
10+
runs a parallel map by spawning a bunch of processes and returns array of results
11+
## Parameters
12+
- collection: a collection that can be enumerated
13+
- function: an anonymous function to apply to each element in collection
14+
15+
## Examples
16+
iex> Parallel.pmap((0..4), fn(x) -> x * x end)
17+
[0, 1, 4, 9, 16]
18+
"""
19+
def pmap(collection, function) do
20+
me = self
21+
pid_list = Enum.map(collection, fn (elem) ->
22+
spawn_link fn -> (send me, { self, function.(elem) }) end
23+
end)
24+
Enum.map(pid_list, fn (pid) ->
25+
receive do { ^pid, result } -> result end
26+
end)
27+
end
28+
29+
def pmap_wrapper(me, collection, function) do
30+
result = pmap(collection, function)
31+
send me, {self, result}
32+
end
33+
34+
@doc """
35+
runs a parallel map by either a) in current process, b) synchronous multi-process or
36+
c) fully asynchronous multi-process, and you need to wait for result.
37+
38+
It chooses which one to run based on the size of the collection
39+
40+
## Parameters
41+
- range: a collection that can be enumerated
42+
- function: an anonymous function to apply to each element in collection
43+
44+
## Examples
45+
iex> Parallel.map_or_pmap_or_apmap((0..4), fn(x) -> x * x end)
46+
[0, 1, 4, 9, 16]
47+
"""
48+
def map_or_pmap_or_apmap(%Range{} = range, function) do
49+
me = self
50+
first..last = range
51+
result = case last - first do
52+
x when x > 1000 ->
53+
IO.puts("<delayed_map>")
54+
pid = spawn_link( fn -> pmap_wrapper(me, range, function) end)
55+
{:delayed, pid}
56+
x when x > 100 ->
57+
IO.puts("<pmap>")
58+
{:ok, pmap(range, fn(x) -> x * x end) }
59+
_ ->
60+
IO.puts("<normal map>")
61+
{:ok, Enum.map(range, fn(x) -> x * x end) }
62+
end
63+
result
64+
end
65+
66+
def map_or_pmap(%Range{} = range, function) do
67+
first..last = range
68+
result = case last - first > 1000 do
69+
true -> pmap(range, function)
70+
_ -> Enum.map((0..100000), fn(x) -> x * x end)
71+
end
72+
result
73+
end
74+
75+
def map(collection, function) do
76+
Enum.map(collection, function)
77+
end
78+
79+
def show_result(result) do
80+
case result do
81+
{:ok, arr} -> IO.puts(">> Running synchronously, I am waiting for results")
82+
IO.inspect(arr)
83+
{:delayed, _} -> IO.puts(">> Running asynchronously, I am not going to wait for results")
84+
end
85+
result
86+
end
87+
88+
def run_fail_test do
89+
[0, 1, 3, 9, 16 ] = Parallel.pmap((0..4), fn(x) -> x * x end) # multi-process
90+
end
91+
92+
def run_tests do
93+
IO.puts "------map:"
94+
IO.inspect Parallel.map((0..100000), fn(x) -> x * x end) # single-process
95+
IO.puts "------pmap"
96+
IO.inspect Parallel.pmap((0..4), fn(x) -> x * x end) # multi-process
97+
IO.puts "------map_or_pmap use multi-process"
98+
IO.inspect Parallel.map_or_pmap((0..100000), fn(x) -> x * x end) # use multi-process
99+
IO.puts "------map_or_pmap use this (one) process"
100+
IO.inspect Parallel.map_or_pmap((0..100), fn(x) -> x * x end) # use single-process
101+
102+
IO.puts "checking the 3 versions"
103+
104+
# But what if you only want to wait if it is a BIG one...
105+
Parallel.show_result(Parallel.map_or_pmap_or_apmap((0..10), fn(x) -> x * x end))
106+
Parallel.show_result(Parallel.map_or_pmap_or_apmap((0..150), fn(x) -> x * x end))
107+
result = Parallel.show_result(Parallel.map_or_pmap_or_apmap((0..1000000), fn(x) -> x * x end))
108+
109+
case result do
110+
{:delayed, pid} ->
111+
start_waiting(pid)
112+
{:ok, arr } -> IO.puts("not delayed")
113+
IO.inspect(arr)
114+
end
115+
end
116+
117+
def start_waiting(pid) do
118+
receive do
119+
{ ^pid, result } -> IO.puts("done")
120+
IO.inspect(result)
121+
after
122+
1_000 -> IO.puts "still waiting..."
123+
start_waiting(pid)
124+
end
125+
end
126+
127+
end

0 commit comments

Comments
 (0)