@@ -6,36 +6,19 @@ defmodule Parallel do
6
6
7
7
"""
8
8
9
- @ doc """
10
- runs a parallel map by spawning a bunch of processes and returns array of results
11
- ## Parameters
12
- - collection: a collection that can be enumerated
13
- - function: an anonymous function to apply to each element in collection
14
-
15
- ## Examples
16
- iex> Parallel.pmap((0..4), fn(x) -> x * x end)
17
- [0, 1, 4, 9, 16]
18
- """
19
- def pmap ( collection , function ) do
20
- me = self
21
- pid_list = Enum . map ( collection , fn ( elem ) ->
22
- spawn_link fn -> ( send me , { self , function . ( elem ) } ) end
23
- end )
24
- Enum . map ( pid_list , fn ( pid ) ->
25
- receive do { ^ pid , result } -> result end
26
- end )
27
- end
28
-
29
- def pmap_wrapper ( me , collection , function ) do
30
- result = pmap ( collection , function )
31
- send me , { self , result }
32
- end
33
9
34
10
@ doc """
35
- runs a parallel map by either a) in current process, b) synchronous multi-process or
11
+ This shows a common technique in Elixir/Erlang. If something is fast, do
12
+ it synchronous (either in process or multi-process), and it looks like
13
+ a big problem, then do it asynchronous, which lets the caller do other
14
+ work and check back later for the result.
15
+
16
+ Runs a parallel map by either a) in current process, b) synchronous multi-process or
36
17
c) fully asynchronous multi-process, and you need to wait for result.
37
18
38
- It chooses which one to run based on the size of the collection
19
+ It chooses which one to run based on the size of the collection.
20
+
21
+ Only works with ranges currently.
39
22
40
23
## Parameters
41
24
- range: a collection that can be enumerated
@@ -50,7 +33,7 @@ defmodule Parallel do
50
33
first .. last = range
51
34
result = case last - first do
52
35
x when x > 1000 ->
53
- IO . puts ( "<delayed_map >" )
36
+ IO . puts ( "<delayed_pmap >" )
54
37
pid = spawn_link ( fn -> pmap_wrapper ( me , range , function ) end )
55
38
{ :delayed , pid }
56
39
x when x > 100 ->
@@ -72,17 +55,43 @@ defmodule Parallel do
72
55
result
73
56
end
74
57
58
+ @ doc """
59
+ runs a parallel map by spawning a bunch of processes and returns array of results
60
+ ## Parameters
61
+ - collection: a collection that can be enumerated
62
+ - function: an anonymous function to apply to each element in collection
63
+
64
+ ## Examples
65
+ iex> Parallel.pmap((0..4), fn(x) -> x * x end)
66
+ [0, 1, 4, 9, 16]
67
+ """
68
+ def pmap ( collection , function ) do
69
+ me = self
70
+ pid_list = Enum . map ( collection , fn ( elem ) ->
71
+ spawn_link fn -> ( send me , { self , function . ( elem ) } ) end
72
+ end )
73
+ Enum . map ( pid_list , fn ( pid ) ->
74
+ receive do { ^ pid , result } -> result end
75
+ end )
76
+ end
77
+
78
+ def pmap_wrapper ( me , collection , function ) do
79
+ result = pmap ( collection , function )
80
+ send me , { self , result }
81
+ end
82
+
75
83
def map ( collection , function ) do
76
84
Enum . map ( collection , function )
77
85
end
78
86
79
- def show_result ( result ) do
80
- case result do
81
- { :ok , arr } -> IO . puts ( ">> Running synchronously, I am waiting for results" )
82
- IO . inspect ( arr )
83
- { :delayed , _ } -> IO . puts ( ">> Running asynchronously, I am not going to wait for results" )
84
- end
85
- result
87
+ def handle_result ( result = { :delayed , pid } ) do
88
+ IO . puts ( ">> Running asynchronously, I am not going to wait for results:" )
89
+ waiting ( pid )
90
+ end
91
+
92
+ def handle_result ( result = { :ok , arr } ) do
93
+ IO . puts ( ">> Ran synchronously, here are the results:" )
94
+ IO . inspect ( arr )
86
95
end
87
96
88
97
def run_fail_test do
@@ -102,25 +111,18 @@ defmodule Parallel do
102
111
IO . puts "checking the 3 versions"
103
112
104
113
# But what if you only want to wait if it is a BIG one...
105
- Parallel . show_result ( Parallel . map_or_pmap_or_apmap ( ( 0 .. 10 ) , fn ( x ) -> x * x end ) )
106
- Parallel . show_result ( Parallel . map_or_pmap_or_apmap ( ( 0 .. 150 ) , fn ( x ) -> x * x end ) )
107
- result = Parallel . show_result ( Parallel . map_or_pmap_or_apmap ( ( 0 .. 1000000 ) , fn ( x ) -> x * x end ) )
108
-
109
- case result do
110
- { :delayed , pid } ->
111
- start_waiting ( pid )
112
- { :ok , arr } -> IO . puts ( "not delayed" )
113
- IO . inspect ( arr )
114
- end
114
+ Parallel . handle_result ( Parallel . map_or_pmap_or_apmap ( ( 0 .. 10 ) , fn ( x ) -> x * x end ) )
115
+ Parallel . handle_result ( Parallel . map_or_pmap_or_apmap ( ( 0 .. 150 ) , fn ( x ) -> x * x end ) )
116
+ Parallel . handle_result ( Parallel . map_or_pmap_or_apmap ( ( 0 .. 1000000 ) , fn ( x ) -> x * x end ) )
115
117
end
116
118
117
- def start_waiting ( pid ) do
119
+ def waiting ( pid ) do
118
120
receive do
119
- { ^ pid , result } -> IO . puts ( "done " )
121
+ { ^ pid , result } -> IO . puts ( "Results have ready. Yay!: " )
120
122
IO . inspect ( result )
121
123
after
122
- 1_000 -> IO . puts "still waiting..."
123
- start_waiting ( pid )
124
+ 1_000 -> IO . puts "still waiting... more hamsters needed... "
125
+ waiting ( pid )
124
126
end
125
127
end
126
128
0 commit comments