- # License Info: https://github.com/rapidsai/notebooks/blob/master/LICENSE
import numpy as np
import datetime
import dask_xgboost as dxgb_gpu
import dask
import dask_cudf
+ from dask_cuda import LocalCUDACluster
from dask.delayed import delayed
from dask.distributed import Client, wait
import xgboost as xgb
import os
import argparse

- parser = argparse.ArgumentParser("rapidssample")
- parser.add_argument("--data_dir", type=str, help="location of data")
- parser.add_argument("--num_gpu", type=int, help="Number of GPUs to use", default=1)
- parser.add_argument("--part_count", type=int, help="Number of data files to train against", default=2)
- parser.add_argument("--end_year", type=int, help="Year to end the data load", default=2000)
- parser.add_argument("--cpu_predictor", type=str, help="Flag to use CPU for prediction", default='False')
- parser.add_argument('-f', type=str, default='')  # added for notebook execution scenarios
- args = parser.parse_args()
- data_dir = args.data_dir
- num_gpu = args.num_gpu
- part_count = args.part_count
- end_year = args.end_year
- cpu_predictor = args.cpu_predictor.lower() in ('yes', 'true', 't', 'y', '1')
-
- print('data_dir = {0}'.format(data_dir))
- print('num_gpu = {0}'.format(num_gpu))
- print('part_count = {0}'.format(part_count))
- part_count = part_count + 1  # adding one because the usage below is not inclusive
- print('end_year = {0}'.format(end_year))
- print('cpu_predictor = {0}'.format(cpu_predictor))
-
- import subprocess
-
- cmd = "hostname --all-ip-addresses"
- process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE)
- output, error = process.communicate()
- IPADDR = str(output.decode()).split()[0]
- print('IPADDR is {0}'.format(IPADDR))
-
- cmd = "/rapids/notebooks/utils/dask-setup.sh 0"
- process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE)
- output, error = process.communicate()
-
- cmd = "/rapids/notebooks/utils/dask-setup.sh rapids " + str(num_gpu) + " 8786 8787 8790 " + str(IPADDR) + " MASTER"
- process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE)
- output, error = process.communicate()
-
- print(output.decode())
-
- import dask
- from dask.delayed import delayed
- from dask.distributed import Client, wait
-
- _client = IPADDR + str(":8786")
-
- client = dask.distributed.Client(_client)
-
def initialize_rmm_pool():
    from librmm_cffi import librmm_config as rmm_cfg
@@ -81,15 +34,17 @@ def run_dask_task(func, **kwargs):
    task = func(**kwargs)
    return task

- def process_quarter_gpu(year=2000, quarter=1, perf_file=""):
+ def process_quarter_gpu(client, col_names_path, acq_data_path, year=2000, quarter=1, perf_file=""):
+     dask_client = client
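+     # wrap the per-quarter GPU ETL as a Dask delayed task and submit it to the cluster asynchronously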
    ml_arrays = run_dask_task(delayed(run_gpu_workflow),
+                             col_path=col_names_path,
+                             acq_path=acq_data_path,
                              quarter=quarter,
                              year=year,
                              perf_file=perf_file)
-     return client.compute(ml_arrays,
+     return dask_client.compute(ml_arrays,
                            optimize_graph=False,
-                           fifo_timeout="0ms"
-                           )
+                           fifo_timeout="0ms")

def null_workaround(df, **kwargs):
    for column, data_type in df.dtypes.items():
@@ -99,9 +54,9 @@ def null_workaround(df, **kwargs):
            df[column] = df[column].fillna(-1)
    return df

- def run_gpu_workflow(quarter=1, year=2000, perf_file="", **kwargs):
-     names = gpu_load_names()
-     acq_gdf = gpu_load_acquisition_csv(acquisition_path=acq_data_path + "/Acquisition_"
+ def run_gpu_workflow(col_path, acq_path, quarter=1, year=2000, perf_file="", **kwargs):
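+     # load the seller-name lookup and this quarter's acquisition file from the supplied paths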
+     names = gpu_load_names(col_path=col_path)
+     acq_gdf = gpu_load_acquisition_csv(acquisition_path=acq_path + "/Acquisition_"
                                        + str(year) + "Q" + str(quarter) + ".txt")
    acq_gdf = acq_gdf.merge(names, how='left', on=['seller_name'])
    acq_gdf.drop_column('seller_name')
@@ -231,7 +186,7 @@ def gpu_load_acquisition_csv(acquisition_path, **kwargs):

    return cudf.read_csv(acquisition_path, names=cols, delimiter='|', dtype=list(dtypes.values()), skiprows=1)

- def gpu_load_names(**kwargs):
+ def gpu_load_names(col_path):
    """ Loads names used for renaming the banks

    Returns
@@ -248,7 +203,7 @@ def gpu_load_names(**kwargs):
        ("new", "category"),
    ])

-     return cudf.read_csv(col_names_path, names=cols, delimiter='|', dtype=list(dtypes.values()), skiprows=1)
+     return cudf.read_csv(col_path, names=cols, delimiter='|', dtype=list(dtypes.values()), skiprows=1)

def create_ever_features(gdf, **kwargs):
    everdf = gdf[['loan_id', 'current_loan_delinquency_status']]
@@ -384,117 +339,157 @@ def last_mile_cleaning(df, **kwargs):
    df['delinquency_12'] = df['delinquency_12'].fillna(False).astype('int32')
    for column in df.columns:
        df[column] = df[column].fillna(-1)
-     return df.to_arrow(index=False)
+     return df.to_arrow(preserve_index=False)
+
+ def main():
+     #print('XGBOOST_BUILD_DOC is ' + os.environ['XGBOOST_BUILD_DOC'])
+     parser = argparse.ArgumentParser("rapidssample")
+     parser.add_argument("--data_dir", type=str, help="location of data")
+     parser.add_argument("--num_gpu", type=int, help="Number of GPUs to use", default=1)
+     parser.add_argument("--part_count", type=int, help="Number of data files to train against", default=2)
+     parser.add_argument("--end_year", type=int, help="Year to end the data load", default=2000)
+     parser.add_argument("--cpu_predictor", type=str, help="Flag to use CPU for prediction", default='False')
+     parser.add_argument('-f', type=str, default='')  # added for notebook execution scenarios
+     args = parser.parse_args()
+     data_dir = args.data_dir
+     num_gpu = args.num_gpu
+     part_count = args.part_count
+     end_year = args.end_year
+     cpu_predictor = args.cpu_predictor.lower() in ('yes', 'true', 't', 'y', '1')
+
+     if cpu_predictor:
+         print('Training with CPUs requires num_gpu = 1')
+         num_gpu = 1
+
+     print('data_dir = {0}'.format(data_dir))
+     print('num_gpu = {0}'.format(num_gpu))
+     print('part_count = {0}'.format(part_count))
+     #part_count = part_count + 1  # adding one because the usage below is not inclusive
+     print('end_year = {0}'.format(end_year))
+     print('cpu_predictor = {0}'.format(cpu_predictor))
+
+     import subprocess
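+     # query this host's IP address so the Dask-CUDA cluster binds to a reachable interface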
+     cmd = "hostname --all-ip-addresses"
+     process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE)
+     output, error = process.communicate()
+     IPADDR = str(output.decode()).split()[0]
+
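+     # start a local Dask-CUDA cluster with one worker per GPU and connect a client to it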
+     cluster = LocalCUDACluster(ip=IPADDR, n_workers=num_gpu)
+     client = Client(cluster)
+     client
+     print(client.ncores())

# to download data for this notebook, visit https://rapidsai.github.io/demos/datasets/mortgage-data and update the following paths accordingly
- acq_data_path = "{0}/acq".format(data_dir)  # "/rapids/data/mortgage/acq"
- perf_data_path = "{0}/perf".format(data_dir)  # "/rapids/data/mortgage/perf"
- col_names_path = "{0}/names.csv".format(data_dir)  # "/rapids/data/mortgage/names.csv"
- start_year = 2000
+     acq_data_path = "{0}/acq".format(data_dir)  # "/rapids/data/mortgage/acq"
+     perf_data_path = "{0}/perf".format(data_dir)  # "/rapids/data/mortgage/perf"
+     col_names_path = "{0}/names.csv".format(data_dir)  # "/rapids/data/mortgage/names.csv"
+     start_year = 2000
#end_year = 2000 # end_year is inclusive -- converted to parameter
#part_count = 2 # the number of data files to train against -- converted to parameter

- client.run(initialize_rmm_pool)
-
+     client.run(initialize_rmm_pool)
+     client
+     print(client.ncores())
# NOTE: The ETL calculates additional features which are then dropped before creating the XGBoost DMatrix.
# This can be optimized to avoid calculating the dropped features.
- print("Reading ...")
- t1 = datetime.datetime.now()
- gpu_dfs = []
- gpu_time = 0
- quarter = 1
- year = start_year
- count = 0
- while year <= end_year:
-     for file in glob(os.path.join(perf_data_path + "/Performance_" + str(year) + "Q" + str(quarter) + "*")):
-         if count < part_count:
-             gpu_dfs.append(process_quarter_gpu(year=year, quarter=quarter, perf_file=file))
-             count += 1
-             print('file: {0}'.format(file))
-             print('count: {0}'.format(count))
-     quarter += 1
-     if quarter == 5:
-         year += 1
-         quarter = 1
+     print("Reading ...")
+     t1 = datetime.datetime.now()
+     gpu_dfs = []
+     gpu_time = 0
+     quarter = 1
+     year = start_year
+     count = 0
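+     # walk the performance files quarter by quarter, submitting up to part_count ETL tasks to the cluster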
+     while year <= end_year:
+         for file in glob(os.path.join(perf_data_path + "/Performance_" + str(year) + "Q" + str(quarter) + "*")):
+             if count < part_count:
+                 gpu_dfs.append(process_quarter_gpu(client, col_names_path, acq_data_path, year=year, quarter=quarter, perf_file=file))
+                 count += 1
+                 print('file: {0}'.format(file))
+                 print('count: {0}'.format(count))
+         quarter += 1
+         if quarter == 5:
+             year += 1
+             quarter = 1
+
+     wait(gpu_dfs)
+     t2 = datetime.datetime.now()
+     print("Reading time ...")
+     print(t2 - t1)
+     print('len(gpu_dfs) is {0}'.format(len(gpu_dfs)))
+
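+     # tear down the RMM pool used during the ETL and re-initialize the workers with default (non-pooled) allocation before training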
+     client.run(cudf._gdf.rmm_finalize)
+     client.run(initialize_rmm_no_pool)
+     client
+     print(client.ncores())
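+     # XGBoost parameters for distributed training: gpu_hist with one GPU per Dask worker by default, overridden below when --cpu_predictor is set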
+     dxgb_gpu_params = {
+         'nround': 100,
+         'max_depth': 8,
+         'max_leaves': 2**8,
+         'alpha': 0.9,
+         'eta': 0.1,
+         'gamma': 0.1,
+         'learning_rate': 0.1,
+         'subsample': 1,
+         'reg_lambda': 1,
+         'scale_pos_weight': 2,
+         'min_child_weight': 30,
+         'tree_method': 'gpu_hist',
+         'n_gpus': 1,
+         'distributed_dask': True,
+         'loss': 'ls',
+         'objective': 'gpu:reg:linear',
+         'max_features': 'auto',
+         'criterion': 'friedman_mse',
+         'grow_policy': 'lossguide',
+         'verbose': True
+     }
+
+     if cpu_predictor:
+         print('Training using CPUs')
+         dxgb_gpu_params['predictor'] = 'cpu_predictor'
+         dxgb_gpu_params['tree_method'] = 'hist'
+         dxgb_gpu_params['objective'] = 'reg:linear'

- wait(gpu_dfs)
- t2 = datetime.datetime.now()
- print("Reading time ...")
- print(t2 - t1)
- print('len(gpu_dfs) is {0}'.format(len(gpu_dfs)))
-
- client.run(cudf._gdf.rmm_finalize)
- client.run(initialize_rmm_no_pool)
-
- dxgb_gpu_params = {
-     'nround': 100,
-     'max_depth': 8,
-     'max_leaves': 2**8,
-     'alpha': 0.9,
-     'eta': 0.1,
-     'gamma': 0.1,
-     'learning_rate': 0.1,
-     'subsample': 1,
-     'reg_lambda': 1,
-     'scale_pos_weight': 2,
-     'min_child_weight': 30,
-     'tree_method': 'gpu_hist',
-     'n_gpus': 1,
-     'distributed_dask': True,
-     'loss': 'ls',
-     'objective': 'gpu:reg:linear',
-     'max_features': 'auto',
-     'criterion': 'friedman_mse',
-     'grow_policy': 'lossguide',
-     'verbose': True
- }
-
- if cpu_predictor:
-     print('Training using CPUs')
-     dxgb_gpu_params['predictor'] = 'cpu_predictor'
-     dxgb_gpu_params['tree_method'] = 'hist'
-     dxgb_gpu_params['objective'] = 'reg:linear'
+     else:
+         print('Training using GPUs')

- else:
-     print('Training using GPUs')
-
- print('Training parameters are {0}'.format(dxgb_gpu_params))
-
- gpu_dfs = [delayed(DataFrame.from_arrow)(gpu_df) for gpu_df in gpu_dfs[:part_count]]
+     print('Training parameters are {0}'.format(dxgb_gpu_params))
- gpu_dfs = [gpu_df for gpu_df in gpu_dfs]
-
- wait(gpu_dfs)
- tmp_map = [(gpu_df, list(client.who_has(gpu_df).values())[0]) for gpu_df in gpu_dfs]
- new_map = {}
- for key, value in tmp_map:
-     if value not in new_map:
-         new_map[value] = [key]
-     else:
-         new_map[value].append(key)
-
- del(tmp_map)
- gpu_dfs = []
- for list_delayed in new_map.values():
-     gpu_dfs.append(delayed(cudf.concat)(list_delayed))
-
- del(new_map)
- gpu_dfs = [(gpu_df[['delinquency_12']], gpu_df[delayed(list)(gpu_df.columns.difference(['delinquency_12']))]) for gpu_df in gpu_dfs]
- gpu_dfs = [(gpu_df[0].persist(), gpu_df[1].persist()) for gpu_df in gpu_dfs]
- gpu_dfs = [dask.delayed(xgb.DMatrix)(gpu_df[1], gpu_df[0]) for gpu_df in gpu_dfs]
- gpu_dfs = [gpu_df.persist() for gpu_df in gpu_dfs]
-
- gc.collect()
- labels = None
-
- print('str(gpu_dfs) is {0}'.format(str(gpu_dfs)))
-
- wait(gpu_dfs)
- t1 = datetime.datetime.now()
- bst = dxgb_gpu.train(client, dxgb_gpu_params, gpu_dfs, labels, num_boost_round=dxgb_gpu_params['nround'])
- t2 = datetime.datetime.now()
- print("Training time ...")
- print(t2 - t1)
- print('str(bst) is {0}'.format(str(bst)))
- print('Exiting script')
+     gpu_dfs = [delayed(DataFrame.from_arrow)(gpu_df) for gpu_df in gpu_dfs[:part_count]]
+     gpu_dfs = [gpu_df for gpu_df in gpu_dfs]
+     wait(gpu_dfs)
+
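+     # group partitions by the worker that holds them, then concatenate each worker's partitions into a single cudf DataFrame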
+     tmp_map = [(gpu_df, list(client.who_has(gpu_df).values())[0]) for gpu_df in gpu_dfs]
+     new_map = {}
+     for key, value in tmp_map:
+         if value not in new_map:
+             new_map[value] = [key]
+         else:
+             new_map[value].append(key)
+
+     del(tmp_map)
+     gpu_dfs = []
+     for list_delayed in new_map.values():
+         gpu_dfs.append(delayed(cudf.concat)(list_delayed))
+
+     del(new_map)
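+     # split each concatenated frame into the delinquency_12 label and the remaining feature columns, and persist both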
+     gpu_dfs = [(gpu_df[['delinquency_12']], gpu_df[delayed(list)(gpu_df.columns.difference(['delinquency_12']))]) for gpu_df in gpu_dfs]
+     gpu_dfs = [(gpu_df[0].persist(), gpu_df[1].persist()) for gpu_df in gpu_dfs]
+
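+     # build one xgboost DMatrix per partition (features first, labels second) and keep it resident on its worker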
+     gpu_dfs = [dask.delayed(xgb.DMatrix)(gpu_df[1], gpu_df[0]) for gpu_df in gpu_dfs]
+     gpu_dfs = [gpu_df.persist() for gpu_df in gpu_dfs]
+     gc.collect()
+     wait(gpu_dfs)
+
+     labels = None
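+     # the labels are already carried inside each DMatrix, so None is passed to dask_xgboost.train below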
+     t1 = datetime.datetime.now()
+     bst = dxgb_gpu.train(client, dxgb_gpu_params, gpu_dfs, labels, num_boost_round=dxgb_gpu_params['nround'])
+     t2 = datetime.datetime.now()
+     print("Training time ...")
+     print(t2 - t1)
+     print('str(bst) is {0}'.format(str(bst)))
+     print('Exiting script')
+
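+ # Example invocation (script name and paths are illustrative; the flags match the argparse definitions above):
+ #   python rapidssample.py --data_dir /path/to/mortgage --num_gpu 2 --part_count 4 --end_year 2001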
+ if __name__ == '__main__':
+     main()