## Measuring Cluster Performance ##
### Introduction to a Single Run of the Performance Test ###
We will demonstrate the computational ability of the `RHIPE` package with a performance test based on logistic
regression.
The basic idea is as follows. First, we wish to generate a sample of N observations on V variables,
which means there are $V-1$ explanatory variables. In other words, we wish to generate a
data frame with N rows and V columns. Second, we want to analyze that data frame by logistic
regression. However, when the data are very large, it is impractical to generate
and analyze the whole dataset at once because of the time consumed and the memory required.
Instead, we will use the `RHIPE` package to generate subsets first and then fit a logistic regression
to each subset with the R function `glm.fit`. Each subset has M observations and
there are R subsets, so $N = MR$. Each subset holds M rows
and V columns. We will use the R function `system.time` to measure elapsed time. We also introduce "n", "m",
and "v", where $n = \log_2(N)$, $m = \log_2(M)$, and $v = \log_2(V)$.
There are two types of elapsed-time computation. The subsets are stored on the HDFS as R objects.
The first computation type is "O", the elapsed time to read the subsets from the HDFS and make
them available to `glm.fit` in memory as R objects. The other type, "L", starts when "O" ends
and consists of the `glm.fit` computations on the subsets by `map`, plus `reduce` gathering the
subset estimates and computing their means. However, we cannot measure "L" directly, so we measure
"O" in one run and $T = O + L$ in another.
In this example, we will show how to do a single run of the performance test, which means we pick
one value each for "n", "m", and "v" and see how long the run takes.
- n : 27
- m : 12
- v : 5
- rep : 1
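The relationships among these quantities can be checked with a few lines of base R. This is only a sketch of the arithmetic; the object names `N`, `M`, `V`, and `R` are chosen here to mirror the notation above and are not used by the later code.

```{r eval=FALSE, tidy=FALSE}
n <- 27; m <- 12; v <- 5
N <- 2^n      # total number of observations
M <- 2^m      # observations per subset
V <- 2^v      # number of variables, response included
R <- N / M    # number of subsets, because N = M * R
log2(R)       # 15
V - 1         # 31 explanatory variables
```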
### A Small, Slow and Old Cluster ###
The cluster here is a very small and old one with two nodes, each a Dell 1950. One runs NameNode and
the other runs JobTracker. Both run Hadoop DataNode and TaskTracker, and each has
- Dual 2.33GHz 4-core Intel(R) Xeon(R) E5410 processors (8 cores)
- 32 GB memory
- 2TB disk in SAS-RAID
- 1 Gbps Ethernet interconnect
Collectively, the cluster has 16 cores, 64 GB of memory, and 4 TB of disk. The processors date back to 2009,
which makes them very old. We will see the computational ability of `RHIPE` on this small, slow, and old cluster.
### Generate Dataset ###
The size of the whole dataset is $2^{27} \cdot 2^5 \cdot 2^3$ bytes $= 32$ GB. Each subset
has a size of $2^{12} \cdot 2^5 \cdot 2^3$ bytes $= 1$ MB.
As we mentioned in the housing data example, the first step in a D&R analysis is to choose a
division method and create subsets. In this example, we divide the whole dataset into R subsets, where
$\log_2(R) = 15$. So we will generate $2^{15}$ subsets with $2^{12}$ rows and $2^5$ columns each.
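The size arithmetic can be reproduced directly. The sketch below assumes every value is stored as an 8-byte double (the factor $2^3$) and ignores any serialization overhead on HDFS; the object names are illustrative only.

```{r eval=FALSE, tidy=FALSE}
bytes.per.value <- 2^3                         # one double = 8 bytes
total.bytes  <- 2^27 * 2^5 * bytes.per.value   # rows * columns * bytes
subset.bytes <- 2^12 * 2^5 * bytes.per.value
total.bytes / 2^30            # 32 (GB)
subset.bytes / 2^20           # 1 (MB)
total.bytes / subset.bytes    # 32768 subsets, i.e. 2^15
```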
Before we run the map function, we set some basic parameters in the R front end:
```{r,eval=FALSE,tidy=FALSE}
n <- 27
m <- 12
v <- 5
p <- 2^v - 1
rep <- 1
```
Here we define an R object called `p`, whose value is $2^v - 1$. As introduced in the
previous section, $2^v$ is the number of variables, so `p` is the number of explanatory variables. We
introduce `p` to make the following code easier to write and understand. We also set
the replicate value `rep` to 1 because we have only a single run of the cluster performance test. In
a future performance test you might want more replicates, which you can get by
changing the value of `rep`; we will talk about this later.
#### Map ####
```{r, eval=FALSE,tidy=FALSE}
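map1 <- expression({
  for (r in map.values) {
    set.seed(r)  # seed by key so each subset is reproducible
    # m rows: p standard normal columns followed by one 0/1 response column
    value <- matrix(c(rnorm(m*p), sample(c(0,1), m, replace=TRUE)), ncol=p+1)
    rhcollect(r, value)
  }
})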
```
In this case we do not have an existing dataset, which is different from the housing data example.
Creating the dataset by random simulation is part of the `map` job. For the simulation step, the input keys and values are
the same by default. The keys are the integers 1, 2, ..., R, where
R is the number of subsets. In our example, $\log_2(R) = \log_2(N/M) = 15$. The keys are the elements of
the list object `map.keys`, and in this case `map.keys` and `map.values` are the same. The
output keys from `map1` are the same as the input keys, because `map1` passes each input value `r`
to `rhcollect` as the key. The output value for each key is a matrix with $2^{12}$ rows and $2^5$ columns. As we
explained in the housing data example, the output key-value pairs from the `map` function
are also called intermediate key-value pairs, and `rhcollect` emits these intermediate key-value
pairs.
There is no reduce step for the generation part. Next we will write the generated subsets to HDFS.
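Before submitting the job, the body of the loop in `map1` can be tried locally on a tiny subset. The helper `gen.subset` below is a hypothetical stand-in written for this illustration; it is not part of `RHIPE` and simply repeats the simulation step for one key with small values of `m` and `p`.

```{r eval=FALSE, tidy=FALSE}
# Hypothetical local stand-in for one iteration of the loop in map1
gen.subset <- function(r, m, p) {
  set.seed(r)
  matrix(c(rnorm(m * p), sample(c(0, 1), m, replace = TRUE)), ncol = p + 1)
}
s1 <- gen.subset(r = 1, m = 2^4, p = 3)
dim(s1)                                  # 16 rows, 4 columns
all(s1[, 4] %in% c(0, 1))                # TRUE: the last column is the 0/1 response
identical(s1, gen.subset(1, 2^4, 3))     # TRUE: the same key gives the same subset
```

Because the seed is set from the key, rerunning the generation job should reproduce the same subsets.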
#### Execution Function ####
```{r eval=FALSE, tidy=FALSE}
dir.dm = "/ln/song273/tmp/multi.factor/dm/n27v5m12"
mr1 <- rhwatch(
map = map1,
input = c(2^(n-m),12),
output = dir.dm,
jobname = dir.dm,
mapred = list(
mapred.task.timeout=0,
mapred.reduce.tasks=0),
parameters = list(m = 2^m, p = p),
readback = FALSE
)
```
We have introduced the `rhwatch` function before. Here, `rhwatch` submits our
MapReduce job, which generates the data and writes the subsets to HDFS. There are three types
of `input`: text file input, sequence file input, and lapply input. The first two types
were introduced in the housing data example. In our performance example, the
`input` is of the lapply type because we are generating the data ourselves. The argument `input` in our
performance test example is a vector of two elements. The first element specifies that the
input keys for `map1` run from 1 to $2^{n-m}$. It also determines how many tasks the
Hadoop MapReduce framework runs, because each task corresponds to one key-value pair.
In our performance test example, each task generates a subset with $2^{12}$ rows and $2^5$ columns.
The second element of `input` specifies the number of mappers to use, so
each mapper runs about $2^{n-m}/12$ tasks, where `12` is the value of the second element of `input`.
The value of the second element is therefore likely to affect the timings, and it is worth
carrying out an experiment to test its effect.
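The number of tasks per mapper implied by `input = c(2^(n-m), 12)` can be worked out as follows. This is plain arithmetic with illustrative object names; how `RHIPE` assigns the keys that are left over after an even split is not covered here.

```{r eval=FALSE, tidy=FALSE}
n <- 27; m <- 12
n.keys    <- 2^(n - m)     # 32768 keys, one per subset
n.mappers <- 12            # second element of input
n.keys / n.mappers         # about 2730.67 tasks per mapper on average
n.keys %% n.mappers        # 8: the keys cannot be split evenly over 12 mappers
```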
The argument `parameters` passes the R objects that the Hadoop MapReduce job needs
to the back end. In our case, we set `m` and `p` in the front-end R session, but Hadoop does not
have these values in the back end. So `parameters` packages `m` and `p` as a list
and distributes it to the back end. Note that the `m` passed here is $2^m$, the number of rows per subset.
The argument `jobname` is optional. It is the name of the job, which is visible on
the JobTracker web page. If it is not provided, Hadoop MapReduce uses a default name of the form job_date_time_number,
e.g. job_201007281701_0274.
```
[Mon Oct 13 23:00:45 2014] Name:/ln/song273/tmp/multi.factor/dm/n27v5m12 Job: job_201405301308_5317 State: RUNNING Duration: 83.276
URL: http://deneb.stat.purdue.edu:50030/jobdetails.jsp?jobid=job_201405301308_5317
pct numtasks pending running complete killed failed_attempts
map 0.9999999 12 0 12 0 0 0
reduce 0.0000000 0 0 0 0 0 0
killed_attempts
map 0
reduce 0
Waiting 5 seconds
```
Our subsets have now been created and saved on HDFS. We use `rhls()` to see more details
about the files on HDFS.
```{r eval=FALSE, tidy=FALSE}
rhls("/ln/song273/tmp/multi.factor/dm/n27v5m12")
```
```
permission owner group size modtime file
1 -rw-r--r-- song273 supergroup 0 2014-10-13 23:04 /ln/song273/tmp/multi.factor/dm/n27v5m12/_SUCCESS
2 drwxrwxrwt song273 supergroup 0 2014-10-13 22:59 /ln/song273/tmp/multi.factor/dm/n27v5m12/_logs
3 -rw-r--r-- song273 supergroup 2.674 gb 2014-10-13 22:59 /ln/song273/tmp/multi.factor/dm/n27v5m12/part-m-00000
4 -rw-r--r-- song273 supergroup 2.666 gb 2014-10-13 22:59 /ln/song273/tmp/multi.factor/dm/n27v5m12/part-m-00001
5 -rw-r--r-- song273 supergroup 2.666 gb 2014-10-13 22:59 /ln/song273/tmp/multi.factor/dm/n27v5m12/part-m-00002
6 -rw-r--r-- song273 supergroup 2.666 gb 2014-10-13 22:59 /ln/song273/tmp/multi.factor/dm/n27v5m12/part-m-00003
7 -rw-r--r-- song273 supergroup 2.666 gb 2014-10-13 22:59 /ln/song273/tmp/multi.factor/dm/n27v5m12/part-m-00004
8 -rw-r--r-- song273 supergroup 2.666 gb 2014-10-13 22:59 /ln/song273/tmp/multi.factor/dm/n27v5m12/part-m-00005
9 -rw-r--r-- song273 supergroup 2.666 gb 2014-10-13 22:59 /ln/song273/tmp/multi.factor/dm/n27v5m12/part-m-00006
10 -rw-r--r-- song273 supergroup 2.666 gb 2014-10-13 22:59 /ln/song273/tmp/multi.factor/dm/n27v5m12/part-m-00007
11 -rw-r--r-- song273 supergroup 2.666 gb 2014-10-13 22:59 /ln/song273/tmp/multi.factor/dm/n27v5m12/part-m-00008
12 -rw-r--r-- song273 supergroup 2.666 gb 2014-10-13 22:59 /ln/song273/tmp/multi.factor/dm/n27v5m12/part-m-00009
13 -rw-r--r-- song273 supergroup 2.666 gb 2014-10-13 22:59 /ln/song273/tmp/multi.factor/dm/n27v5m12/part-m-00010
14 -rw-r--r-- song273 supergroup 2.666 gb 2014-10-13 22:59 /ln/song273/tmp/multi.factor/dm/n27v5m12/part-m-00011
```
The job generates 12 files with names ending in "00000" to "00011". Most of them are 2.666 GB in size. Each file
holds about $2^{15} / 12$ subsets.
### Elapsed Time Measurement ###
As we discussed in the introduction, we will measure the first elapsed time "O" in one run and
the whole elapsed time "T" in another run.
#### The First Kind Of Elapsed Time -- **O** ####
```{r eval=FALSE, tidy=FALSE}
timing <- data.frame()
type <- "O"
```
First, we initialize a data frame called `timing` to store the two kinds of elapsed
time, and create a character object `type` to label "O" and "T" separately.
##### Map #####
```{r eval=FALSE, tidy=FALSE}
map2 <- expression({})
```
It is important to notice that `map2` is an expression with no commands. That is because we only want
to read the data from HDFS into memory, without any other operations, and then record the reading time.
##### Execution Function #####
```{r eval=FALSE, tidy=FALSE}
dir.dm <- "/ln/song273/tmp/multi.factor/dm/n27v5m12"
dir.nf <- "/ln/song273/tmp/multi.factor/nf/n27v5m12"
mr2 <- rhwatch(
map = map2,
input = dir.dm,
output = dir.nf,
jobname = dir.dm,
mapred = list(
mapred.reduce.tasks=0,
rhipe_map_buff_size=2^15),
parameters = list(p = p),
noeval = TRUE,
readback = FALSE
)
t <- as.numeric(system.time({rhex(mr2, async=FALSE)})[3])
t <- data.frame(rep=rep,n=n,m=m,v=v,type=type,t=t)
timing <- rbind(timing,t)
```
```
Saving 1 parameter to /tmp/rhipe-temp-params-c677abd27e3b1654c25dccadaa7e3483 (use rhclean to delete all temp files)
```
`noeval` is another very useful argument of `rhwatch`. By default it is `FALSE`.
In our example, we set `noeval` to `TRUE`, which means the Hadoop MapReduce job does not run
and `rhwatch` just returns an R object that contains all the information RHIPE requires to run the
MapReduce job. The `rhex` function then submits that job to the Hadoop MapReduce
framework, and wrapping the call in `system.time` lets us time it.
`dir.dm` and `dir.nf` are the `input` and `output` arguments of `rhwatch`.
The first `t` in the R code records the elapsed time O, and the second `t` is a
data frame that holds the computation `type`, the values of `n`, `m`, and `v`, the
replicate number `rep`, and the elapsed time O. We then use the R function `rbind`
to append the data frame `t` to the data frame `timing` by rows. We can take a look at the data frame
`timing`.
```{r eval=FALSE, tidy=FALSE}
timing
```
```
rep n m v type t
1 1 27 12 5 O 253.237
```
So `timing` is a data frame with 1 row and 6 columns. We will add the other type of elapsed time, "T",
to this data frame.
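The timing pattern itself can be tried without a cluster. In the sketch below, `Sys.sleep()` is only a stand-in for the `rhex()` call, and `timing.demo` and `t.demo` are illustrative names; the `"elapsed"` entry of `system.time` is the same value that `[3]` selects in the code above.

```{r eval=FALSE, tidy=FALSE}
timing.demo <- data.frame()
# Sys.sleep() stands in for rhex(mr2, async=FALSE)
t.demo <- as.numeric(system.time(Sys.sleep(0.2))[["elapsed"]])
timing.demo <- rbind(
  timing.demo,
  data.frame(rep = 1, n = 27, m = 12, v = 5, type = "O", t = t.demo)
)
dim(timing.demo)    # 1 row, 6 columns
```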
#### The Second Kind of Elapsed Time -- **T** ####
```{r eval=FALSE, tidy=FALSE}
type <- "T"
```
We first set `type` to "T" to identify the second kind of elapsed time.
##### Map #####
```{r eval=FALSE, tidy=FALSE}
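map3 <- expression({
  for (v in map.values) {
    # columns 1..p are explanatory variables, column p+1 is the 0/1 response
    value <- glm.fit(v[,1:p], v[,p+1], family=binomial())$coef
    rhcollect(1, value)  # same key for every subset
  }
})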
```
The input to `map3` is the output we created in the data generation step: the keys run from
1 to $2^{15}$ and each input value is a matrix with $2^{12}$ rows and $2^5$ columns. For each subset,
we apply the same action: fit the logistic regression and take its coefficients. They are
returned as a numeric vector of length `p` to be recombined in the reduce step below.
We use the same key, 1, for all the subsets so that they will appear together in the
reduce step. As before, `rhcollect` emits intermediate key-value pairs.
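The per-subset computation can be checked locally on one small simulated subset. The sizes below are much smaller than in the real run and the object names `m.rows`, `x`, and `coefs` are illustrative. Note that `glm.fit` uses the design matrix exactly as given, so no intercept column is added and `p` coefficients come back.

```{r eval=FALSE, tidy=FALSE}
set.seed(1)
m.rows <- 2^8; p <- 3
# p standard normal columns followed by one 0/1 response column
x <- matrix(c(rnorm(m.rows * p), sample(c(0, 1), m.rows, replace = TRUE)),
            ncol = p + 1)
coefs <- glm.fit(x[, 1:p], x[, p + 1], family = binomial())$coefficients
length(coefs)    # 3, one coefficient per explanatory variable
```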
##### Reduce #####
```{r eval=FALSE, tidy=FALSE}
reduce3 <- expression(
  pre = {
    v <- rep(0, p)   # running sum of coefficient vectors
    nsub <- 0        # number of subsets seen so far
  },
  reduce = {
    v <- v + colSums(matrix(unlist(reduce.values), ncol=p, byrow=TRUE))
    nsub <- nsub + length(reduce.values)
  },
  post = {
    rhcollect(reduce.key, v/nsub)  # mean of the subset estimates
  }
)
```
In the reduce step, we recombine the coefficients from the different subsets: we sum them
column by column with the R function `colSums` and estimate each coefficient by the mean over subsets.
The input key is the placeholder value 1, which is left unchanged as the output key. The input values
are the coefficient vectors, one per subset, and the output value is a single vector
holding the mean estimates of the coefficients. In `post`, we use `rhcollect` to emit the final
output so that it is written to HDFS.
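The accumulation logic of `reduce3` can be simulated in plain R. The sketch assumes that the values for a key may arrive in more than one batch, which is why running sums are kept; the six coefficient vectors and the split into two batches are made up for illustration.

```{r eval=FALSE, tidy=FALSE}
p <- 3
set.seed(2)
# Six made-up coefficient vectors, as map3 would emit, arriving in two batches
vals <- replicate(6, rnorm(p), simplify = FALSE)
batches <- list(vals[1:4], vals[5:6])

v <- rep(0, p); nsub <- 0                     # pre
for (reduce.values in batches) {              # reduce, run once per batch
  v <- v + colSums(matrix(unlist(reduce.values), ncol = p, byrow = TRUE))
  nsub <- nsub + length(reduce.values)
}
est <- v / nsub                               # post
all.equal(est, Reduce(`+`, vals) / length(vals))   # TRUE
```

The result matches the plain mean of all six vectors, so the estimate does not depend on how the values are batched.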
##### Execution Function #####
```{r eval=FALSE, tidy=FALSE}
dir.dm = "/ln/song273/tmp/multi.factor/dm/n27v5m12"
dir.gf = "/ln/song273/tmp/multi.factor/gf/n27v5m12"
mr3 <- rhwatch(
map = map3,
reduce = reduce3,
input = dir.dm,
output = dir.gf,
mapred = list(
mapred.reduce.tasks=1,
rhipe_map_buff_size=10
),
parameters = list(p=p),
jobname = dir.gf,
noeval = TRUE
)
t <- as.numeric(system.time({rhex(mr3, async=FALSE)})[3])
t = data.frame(rep=rep,n=n,m=m,v=v,type=type,t=t)
timing = rbind(timing,t)
```
```
Saving 2 parameters to /tmp/rhipe-temp-params-48fd3131fbae41b13c97ce71393f8aaa (use rhclean to delete all temp files)
```
After the job completes successfully, we record the whole elapsed time "T" in the R
object `t` and append it to our data frame `timing`. We have now finished the single
run of the cluster performance test, with the results saved in `timing`.
### Result ###
```{r eval=FALSE, tidy=FALSE}
timing
```
```
rep n m v type t
1 1 27 12 5 O 253.237
2 1 27 12 5 T 561.081
```
As we can see, the `timing` data frame has 2 rows and 6 columns. The last column is the
elapsed time `t`. The elapsed time "O" needed to read the subsets
from HDFS is 253.237 seconds, which is about 4.2 minutes, and the whole elapsed time "T",
which consists of the reading time and the analysis time, is 561.081 seconds, which is about 9.4
minutes. That result is impressive for a cluster this small, with only two nodes
and 64 GB of memory in total. You can use the `RHIPE` computation environment to generalize this
single run to a full cluster performance test. We will show the basic idea of how to run a
multi-factor experiment for cluster performance in the next section.
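Since $T = O + L$, the computation time "L" follows from the two measured times by subtraction. The sketch below rebuilds the `timing` data frame from the values printed above; `T.total` is used as an object name instead of `T` to avoid masking R's shorthand for `TRUE`.

```{r eval=FALSE, tidy=FALSE}
timing <- data.frame(rep = c(1, 1), n = c(27, 27), m = c(12, 12), v = c(5, 5),
                     type = c("O", "T"), t = c(253.237, 561.081))
O <- timing$t[timing$type == "O"]
T.total <- timing$t[timing$type == "T"]
L <- T.total - O                                # 307.844 seconds
round(c(O = O, L = L, T = T.total) / 60, 1)     # minutes: 4.2, 5.1, 9.4
```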