diff --git a/config/samples/xgboost-dist/README.md b/config/samples/xgboost-dist/README.md index 0756a920..83708048 100644 --- a/config/samples/xgboost-dist/README.md +++ b/config/samples/xgboost-dist/README.md @@ -25,31 +25,57 @@ The following files are available to setup distributed XGBoost computation runti To store the model in OSS: -* xgboostjob_v1alpha1_iris_train.yaml -* xgboostjob_v1alpha1_iris_predict.yaml +* xgboostjob_v1alpha1_iris_train_oss.yaml +* xgboostjob_v1alpha1_iris_predict_oss.yaml + +To store the model in GCP: +* xgboostjob_v1alpha1_iris_train_gcp.yaml +* xgboostjob_v1alpha1_iris_predict_gcp.yaml To store the model in local path: * xgboostjob_v1alpha1_iris_train_local.yaml * xgboostjob_v1alpha1_iris_predict_local.yaml -For training jobs in OSS , you could configure xgboostjob_v1alpha1_iris_train.yaml and xgboostjob_v1alpha1_iris_predict.yaml +**Configure OSS parameter** +For training jobs in OSS , you could configure xgboostjob_v1alpha1_iris_train_oss.yaml and xgboostjob_v1alpha1_iris_predict_oss.yaml Note, we use [OSS](https://www.alibabacloud.com/product/oss) to store the trained model, -thus, you need to specify the OSS parameter in the yaml file. Therefore, remember to fill the OSS parameter in xgboostjob_v1alpha1_iris_train.yaml and xgboostjob_v1alpha1_iris_predict.yaml file. +thus, you need to specify the OSS parameter in the yaml file. Therefore, remember to fill the OSS parameter in xgboostjob_v1alpha1_iris_train_oss.yaml and xgboostjob_v1alpha1_iris_predict_oss.yaml file. The oss parameter includes the account information such as access_id, access_key, access_bucket and endpoint. For Eg: --oss_param=endpoint:http://oss-ap-south-1.aliyuncs.com,access_id:XXXXXXXXXXX,access_key:XXXXXXXXXXXXXXXXXXX,access_bucket:XXXXXX -Similarly, xgboostjob_v1alpha1_iris_predict.yaml is used to configure XGBoost job batch prediction. +Similarly, xgboostjob_v1alpha1_iris_predict_oss.yaml is used to configure XGBoost job batch prediction. 
+ +**Configure GCP parameter** +For training jobs in GCP, you could configure xgboostjob_v1alpha1_iris_train_gcp.yaml and xgboostjob_v1alpha1_iris_predict_gcp.yaml +Note, we use [GCP](https://cloud.google.com/) to store the trained model, +thus, you need to specify the GCP parameter in the yaml file. Therefore, remember to fill the GCP parameter in xgboostjob_v1alpha1_iris_train_gcp.yaml and xgboostjob_v1alpha1_iris_predict_gcp.yaml file. +The gcp parameter includes the account information such as type, client_id, client_email, private_key_id, private_key and access_bucket. +For example: +--gcp_param=type:XXXXXXX,client_id:XXXXXXXX,client_email:XXXXXXXXXX@gmail.com,private_key_id:XXXXXXXXXXXXX,private_key:XXXXXXXXXXXXXXX,access_bucket:XXXXXX +Similarly, xgboostjob_v1alpha1_iris_predict_gcp.yaml is used to configure XGBoost job batch prediction. -**Start the distributed XGBoost train to store the model in OSS** +**Start the distributed XGBoost train to store the model in cloud** + +If you use OSS +``` +kubectl create -f xgboostjob_v1alpha1_iris_train_oss.yaml ``` -kubectl create -f xgboostjob_v1alpha1_iris_train.yaml +If you use GCP +``` +kubectl create -f xgboostjob_v1alpha1_iris_train_gcp.yaml ``` **Look at the train job status** + +If you use OSS +``` + kubectl get -o yaml XGBoostJob/xgboost-dist-iris-test-train-oss + ``` +If you use GCP ``` - kubectl get -o yaml XGBoostJob/xgboost-dist-iris-test-train + kubectl get -o yaml XGBoostJob/xgboost-dist-iris-test-train-gcp ``` Here is a sample output when the job is finished. The output log like this ``` @@ -154,14 +180,25 @@ Events: Normal XGBoostJobSucceeded 47s xgboostjob-operator XGBoostJob xgboost-dist-iris-test is successfully completed. 
``` -**Start the distributed XGBoost job predict** -```shell -kubectl create -f xgboostjob_v1alpha1_iris_predict.yaml +**Start the distributed XGBoost job predict in cloud** + +If you use OSS +``` +kubectl create -f xgboostjob_v1alpha1_iris_predict_oss.yaml +``` +If you use GCP +``` +kubectl create -f xgboostjob_v1alpha1_iris_predict_gcp.yaml ``` **Look at the batch predict job status** +If you use OSS +``` + kubectl get -o yaml XGBoostJob/xgboost-dist-iris-test-predict-oss + ``` +If you use GCP ``` - kubectl get -o yaml XGBoostJob/xgboost-dist-iris-test-predict + kubectl get -o yaml XGBoostJob/xgboost-dist-iris-test-predict-gcp ``` Here is a sample output when the job is finished. The output log like this ``` diff --git a/config/samples/xgboost-dist/main.py b/config/samples/xgboost-dist/main.py index 19311911..e60a0c24 100644 --- a/config/samples/xgboost-dist/main.py +++ b/config/samples/xgboost-dist/main.py @@ -21,10 +21,10 @@ def main(args): model_storage_type = args.model_storage_type - if (model_storage_type == "local" or model_storage_type == "oss"): + if (model_storage_type == "local" or model_storage_type == "oss" or model_storage_type == 'gcp'): print ( "The storage type is " + model_storage_type) else: - raise Exception("Only supports storage types like local and OSS") + raise Exception("Only supports storage types like local, OSS and GCP") if args.job_type == "Predict": logging.info("starting the predict job") @@ -60,7 +60,6 @@ def main(args): parser.add_argument( '--n_estimators', help='Number of trees in the model', - type=int, default=1000 ) parser.add_argument( @@ -71,6 +70,7 @@ def main(args): parser.add_argument( '--early_stopping_rounds', help='XGBoost argument for stopping early', + type=int, default=50 ) parser.add_argument( @@ -85,7 +85,11 @@ def main(args): ) parser.add_argument( '--oss_param', - help='oss parameter if you choose the model storage as OSS type', + help='oss parameter if you choose the model storage as OSS type' + ) + 
parser.add_argument( + '--gcp_param', + help='gcp parameter if you choose the model storage as GCP type' ) logging.basicConfig(format='%(message)s') diff --git a/config/samples/xgboost-dist/requirements.txt b/config/samples/xgboost-dist/requirements.txt index 60841a31..70f9dda7 100644 --- a/config/samples/xgboost-dist/requirements.txt +++ b/config/samples/xgboost-dist/requirements.txt @@ -6,4 +6,6 @@ scipy>=1.1.0 joblib>=0.13.2 scikit-learn>=0.20 oss2>=2.7.0 -pandas>=0.24.2 \ No newline at end of file +google-cloud-storage>=1.28.1 +pandas>=0.24.2 +oauth2client>=2.0 diff --git a/config/samples/xgboost-dist/utils.py b/config/samples/xgboost-dist/utils.py index 283af8ba..100732ce 100644 --- a/config/samples/xgboost-dist/utils.py +++ b/config/samples/xgboost-dist/utils.py @@ -15,6 +15,8 @@ import xgboost as xgb import os import tempfile +from google.cloud import storage +from oauth2client.service_account import ServiceAccountCredentials import oss2 import json import pandas as pd @@ -59,7 +61,7 @@ def read_train_data(rank, num_workers, path): y = iris.target start, end = get_range_data(len(x), rank, num_workers) - x = x[start:end, :] + x = x[start:end] y = y[start:end] x = pd.DataFrame(x) @@ -87,7 +89,7 @@ def read_predict_data(rank, num_workers, path): y = iris.target start, end = get_range_data(len(x), rank, num_workers) - x = x[start:end, :] + x = x[start:end] y = y[start:end] x = pd.DataFrame(x) y = pd.DataFrame(y) @@ -113,7 +115,7 @@ def get_range_data(num_row, rank, num_workers): x_start = rank * num_per_partition x_end = (rank + 1) * num_per_partition - if x_end > num_row: + if x_end > num_row or rank == num_workers - 1: x_end = num_row return x_start, x_end @@ -140,10 +142,18 @@ def dump_model(model, type, model_path, args): oss_param = parse_parameters(args.oss_param, ",", ":") if oss_param is None: raise Exception("Please config oss parameter to store model") - + return False oss_param['path'] = args.model_path dump_model_to_oss(oss_param, 
model) logging.info("Dump model into oss place %s", args.model_path) + elif type == 'gcp': + gcp_param = parse_parameters(args.gcp_param, ',', ':') + if gcp_param is None: + raise Exception('Please config gcp parameter to store model') + return False + gcp_param['path'] = args.model_path + dump_model_to_gcp(gcp_param, model) + logging.info('Dump model into gcp place %s', args.model_path) return True @@ -171,6 +181,14 @@ def read_model(type, model_path, args): model = read_model_from_oss(oss_param) logging.info("read model from oss place %s", model_path) + elif type == 'gcp': + gcp_param = parse_parameters(args.gcp_param, ',', ':') + if gcp_param is None: + raise Exception('Please config gcp to read model') + return False + gcp_param['path'] = args.model_path + model = read_model_from_gcp(gcp_param) + logging.info('read model from gcp place %s', model_path) return model @@ -189,7 +207,7 @@ def dump_model_to_oss(oss_parameters, booster): 'feature_importance.json') oss_path = oss_parameters['path'] - logger.info('---- export model ----') + logger.info('---- export model to OSS----') booster.save_model(model_fname) booster.dump_model(text_model_fname) # format output model fscore_dict = booster.get_fscore() @@ -208,6 +226,39 @@ def dump_model_to_oss(oss_parameters, booster): upload_oss(oss_parameters, model_fname, aux_path) upload_oss(oss_parameters, text_model_fname, aux_path) upload_oss(oss_parameters, feature_importance, aux_path) + logger.info('---- model uploaded to OSS successfully!----') + else: + raise Exception("fail to generate model") + return False + + return True +def dump_model_to_gcp(gcp_parameters,booster): + model_fname = os.path.join(tempfile.mkdtemp(), 'model') + text_model_fname = os.path.join(tempfile.mkdtemp(), 'model.text') + feature_importance = os.path.join(tempfile.mkdtemp(), + 'feature_importance.json') + + gcp_path = gcp_parameters['path'] + logger.info('---- export model to GCP----') + booster.save_model(model_fname) + 
booster.dump_model(text_model_fname) + fscore_dict = booster.get_fscore() + with open(feature_importance, 'w') as file: + file.write(json.dumps(fscore_dict)) + logger.info('---- chief dump model successfully!') + + if os.path.exists(model_fname): + logger.info('---- Upload Model start...') + + while gcp_path.endswith('/'): + gcp_path = gcp_path[:-1] + + upload_gcp(gcp_parameters, model_fname, gcp_path) + aux_path = gcp_path + '_dir/' + upload_gcp(gcp_parameters, model_fname, aux_path) + upload_gcp(gcp_parameters, text_model_fname, aux_path) + upload_gcp(gcp_parameters, feature_importance, aux_path) + logger.info('---- model uploaded to GCP successfully!----') else: raise Exception("fail to generate model") return False @@ -237,6 +288,26 @@ def upload_oss(kw, local_file, oss_path): except Exception(): raise ValueError('upload %s to %s failed' % (os.path.abspath(local_file), oss_path)) +def upload_gcp(kw, local_file, gcp_path): + """Upload local_file to the bucket kw['access_bucket'] at gcp_path; a trailing '/' appends the file's basename.""" + if gcp_path.endswith('/'): + gcp_path = '%s%s' % (gcp_path, os.path.basename(local_file)) + credentials_dict = { + 'type': 'service_account', + 'client_id': kw['client_id'], + 'client_email': kw['client_email'], + 'private_key_id':kw['private_key_id'], + 'private_key': kw['private_key'], + } + credentials=ServiceAccountCredentials.from_json_keyfile_dict( + credentials_dict + ) + client = storage.Client(credentials=credentials) + bucket=client.get_bucket(kw['access_bucket']) + blob=bucket.blob(gcp_path) + blob.upload_from_filename(local_file) + + def read_model_from_oss(kw): @@ -263,7 +334,33 @@ def read_model_from_oss(kw): bst.load_model(temp_model_fname) return bst - +def read_model_from_gcp(kw): + """Download the model at kw['path'] from the bucket kw['access_bucket'] and return it as an xgb.Booster.""" + credentials_dict = { + 'type': 'service_account', + 'client_id': kw['client_id'], + 'client_email': kw['client_email'], + 'private_key_id':kw['private_key_id'], + 'private_key': kw['private_key'], + } + credentials=ServiceAccountCredentials.from_json_keyfile_dict( + credentials_dict + ) + client = storage.Client(credentials=credentials) + 
bucket=client.get_bucket(kw['access_bucket']) + gcp_path = kw["path"] + blob = bucket.blob(gcp_path) + temp_model_fname = os.path.join(tempfile.mkdtemp(), 'local_model') + try: + blob.download_to_filename(temp_model_fname) + logger.info("success to load model from gcp %s", gcp_path) + except Exception as e: + logging.error("fail to load model: %s", e) + raise Exception("fail to load model from gcp %s" % gcp_path) + bst = xgb.Booster() + bst.load_model(temp_model_fname) + return bst + def parse_parameters(input, splitter_between, splitter_in): """ diff --git a/config/samples/xgboost-dist/xgboostjob_v1alpha1_iris_predict_gcp.yaml b/config/samples/xgboost-dist/xgboostjob_v1alpha1_iris_predict_gcp.yaml new file mode 100644 index 00000000..9f602530 --- /dev/null +++ b/config/samples/xgboost-dist/xgboostjob_v1alpha1_iris_predict_gcp.yaml @@ -0,0 +1,44 @@ +apiVersion: "xgboostjob.kubeflow.org/v1alpha1" +kind: "XGBoostJob" +metadata: + name: "xgboost-dist-iris-test-predict-gcp" +spec: + xgbReplicaSpecs: + Master: + replicas: 1 + restartPolicy: Never + template: + apiVersion: v1 + kind: Pod + spec: + containers: + - name: xgboostjob + image: docker.io/xfate123/xgboost-dist-iris:1.1 + ports: + - containerPort: 9991 + name: xgboostjob-port + imagePullPolicy: Always + args: + - --job_type=Predict + - --model_path=autoAI/xgb-opt/2 + - --model_storage_type=gcp + - --gcp_param=unknown + Worker: + replicas: 2 + restartPolicy: ExitCode + template: + apiVersion: v1 + kind: Pod + spec: + containers: + - name: xgboostjob + image: docker.io/xfate123/xgboost-dist-iris:1.1 + ports: + - containerPort: 9991 + name: xgboostjob-port + imagePullPolicy: Always + args: + - --job_type=Predict + - --model_path=autoAI/xgb-opt/2 + - --model_storage_type=gcp + - --gcp_param=unknown diff --git a/config/samples/xgboost-dist/xgboostjob_v1alpha1_iris_predict_local.yaml b/config/samples/xgboost-dist/xgboostjob_v1alpha1_iris_predict_local.yaml index d19112cd..f3286ac5 100644 --- a/config/samples/xgboost-dist/xgboostjob_v1alpha1_iris_predict_local.yaml +++ 
b/config/samples/xgboost-dist/xgboostjob_v1alpha1_iris_predict_local.yaml @@ -17,7 +17,7 @@ spec: claimName: xgboostlocal containers: - name: xgboostjob - image: docker.io/merlintang/xgboost-dist-iris:1.1 + image: docker.io/xfate123/xgboost-dist-iris:1.1 volumeMounts: - name: task-pv-storage mountPath: /tmp/xgboost_model @@ -42,7 +42,7 @@ spec: claimName: xgboostlocal containers: - name: xgboostjob - image: docker.io/merlintang/xgboost-dist-iris:1.1 + image: docker.io/xfate123/xgboost-dist-iris:1.1 volumeMounts: - name: task-pv-storage mountPath: /tmp/xgboost_model diff --git a/config/samples/xgboost-dist/xgboostjob_v1alpha1_iris_predict.yaml b/config/samples/xgboost-dist/xgboostjob_v1alpha1_iris_predict_oss.yaml similarity index 86% rename from config/samples/xgboost-dist/xgboostjob_v1alpha1_iris_predict.yaml rename to config/samples/xgboost-dist/xgboostjob_v1alpha1_iris_predict_oss.yaml index 3f3391ac..58bfdbb2 100644 --- a/config/samples/xgboost-dist/xgboostjob_v1alpha1_iris_predict.yaml +++ b/config/samples/xgboost-dist/xgboostjob_v1alpha1_iris_predict_oss.yaml @@ -1,7 +1,7 @@ apiVersion: "xgboostjob.kubeflow.org/v1alpha1" kind: "XGBoostJob" metadata: - name: "xgboost-dist-iris-test-predict" + name: "xgboost-dist-iris-test-predict-oss" spec: xgbReplicaSpecs: Master: @@ -13,7 +13,7 @@ spec: spec: containers: - name: xgboostjob - image: docker.io/merlintang/xgboost-dist-iris:1.1 + image: docker.io/xfate123/xgboost-dist-iris:1.1 ports: - containerPort: 9991 name: xgboostjob-port @@ -32,7 +32,7 @@ spec: spec: containers: - name: xgboostjob - image: docker.io/merlintang/xgboost-dist-iris:1.1 + image: docker.io/xfate123/xgboost-dist-iris:1.1 ports: - containerPort: 9991 name: xgboostjob-port diff --git a/config/samples/xgboost-dist/xgboostjob_v1alpha1_iris_train_gcp.yaml b/config/samples/xgboost-dist/xgboostjob_v1alpha1_iris_train_gcp.yaml new file mode 100644 index 00000000..3b9f66da --- /dev/null +++ 
b/config/samples/xgboost-dist/xgboostjob_v1alpha1_iris_train_gcp.yaml @@ -0,0 +1,47 @@ +apiVersion: "xgboostjob.kubeflow.org/v1alpha1" +kind: "XGBoostJob" +metadata: + name: "xgboost-dist-iris-test-train-gcp" +spec: + xgbReplicaSpecs: + Master: + replicas: 1 + restartPolicy: Never + template: + apiVersion: v1 + kind: Pod + spec: + containers: + - name: xgboostjob + image: docker.io/xfate123/xgboost-dist-iris:1.1 + ports: + - containerPort: 9991 + name: xgboostjob-port + imagePullPolicy: Always + args: + - --job_type=Train + - --xgboost_parameter=objective:multi:softprob,num_class:3 + - --n_estimators=10 + - --learning_rate=0.1 + - --model_path=autoAI/xgb-opt/2 + - --model_storage_type=gcp + - --gcp_param=unknown + Worker: + replicas: 2 + restartPolicy: ExitCode + template: + apiVersion: v1 + kind: Pod + spec: + containers: + - name: xgboostjob + image: docker.io/xfate123/xgboost-dist-iris:1.1 + ports: + - containerPort: 9991 + name: xgboostjob-port + imagePullPolicy: Always + args: + - --job_type=Train + - --xgboost_parameter="objective:multi:softprob,num_class:3" + - --n_estimators=10 + - --learning_rate=0.1 diff --git a/config/samples/xgboost-dist/xgboostjob_v1alpha1_iris_train_local.yaml b/config/samples/xgboost-dist/xgboostjob_v1alpha1_iris_train_local.yaml index 2d96c725..fe35a92e 100644 --- a/config/samples/xgboost-dist/xgboostjob_v1alpha1_iris_train_local.yaml +++ b/config/samples/xgboost-dist/xgboostjob_v1alpha1_iris_train_local.yaml @@ -17,7 +17,7 @@ spec: claimName: xgboostlocal containers: - name: xgboostjob - image: docker.io/merlintang/xgboost-dist-iris:1.1 + image: docker.io/xfate123/xgboost-dist-iris:1.1 volumeMounts: - name: task-pv-storage mountPath: /tmp/xgboost_model @@ -45,7 +45,7 @@ spec: claimName: xgboostlocal containers: - name: xgboostjob - image: docker.io/merlintang/xgboost-dist-iris:1.1 + image: docker.io/xfate123/xgboost-dist-iris:1.1 volumeMounts: - name: task-pv-storage mountPath: /tmp/xgboost_model diff --git 
a/config/samples/xgboost-dist/xgboostjob_v1alpha1_iris_train.yaml b/config/samples/xgboost-dist/xgboostjob_v1alpha1_iris_train_oss.yaml similarity index 88% rename from config/samples/xgboost-dist/xgboostjob_v1alpha1_iris_train.yaml rename to config/samples/xgboost-dist/xgboostjob_v1alpha1_iris_train_oss.yaml index d08c3242..e4e4b6b6 100644 --- a/config/samples/xgboost-dist/xgboostjob_v1alpha1_iris_train.yaml +++ b/config/samples/xgboost-dist/xgboostjob_v1alpha1_iris_train_oss.yaml @@ -1,7 +1,7 @@ apiVersion: "xgboostjob.kubeflow.org/v1alpha1" kind: "XGBoostJob" metadata: - name: "xgboost-dist-iris-test-train" + name: "xgboost-dist-iris-test-train-oss" spec: xgbReplicaSpecs: Master: @@ -13,7 +13,7 @@ spec: spec: containers: - name: xgboostjob - image: docker.io/merlintang/xgboost-dist-iris:1.1 + image: docker.io/xfate123/xgboost-dist-iris:1.1 ports: - containerPort: 9991 name: xgboostjob-port @@ -35,7 +35,7 @@ spec: spec: containers: - name: xgboostjob - image: docker.io/merlintang/xgboost-dist-iris:1.1 + image: docker.io/xfate123/xgboost-dist-iris:1.1 ports: - containerPort: 9991 name: xgboostjob-port