Skip to content
This repository has been archived by the owner on Feb 1, 2022. It is now read-only.

add gcp storage to xgboost-operator #81

Open
wants to merge 37 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
2240810
add gcp storage to xgboost-operator, still working on it
xfate123 May 16, 2020
ba00fb4
Update utils.py
xfate123 May 16, 2020
f3d8619
Update utils.py
xfate123 May 16, 2020
d904b9c
Update xgboostjob_v1alpha1_iris_predict.yaml
xfate123 May 16, 2020
813e3cc
Update and rename xgboostjob_v1alpha1_iris_predict.yaml to xgboostjob…
xfate123 May 16, 2020
4056365
Rename xgboostjob_v1alpha1_iris_train.yaml to xgboostjob_v1alpha1_iri…
xfate123 May 16, 2020
bae957b
Create xgboostjob_v1alpha1_iris_train_gcr.yaml
xfate123 May 16, 2020
b583c1b
Create xgboostjob_v1alpha1_iris_predict_gcr.yaml
xfate123 May 16, 2020
758ec4e
Update and rename xgboostjob_v1alpha1_iris_predict_gcr.yaml to xgboos…
xfate123 May 16, 2020
4125851
Update and rename xgboostjob_v1alpha1_iris_train_gcr.yaml to xgboostj…
xfate123 May 16, 2020
9a0e655
Update README.md
xfate123 May 16, 2020
a2a1702
Update xgboostjob_v1alpha1_iris_train_gcp.yaml
xfate123 May 16, 2020
05675a1
Update xgboostjob_v1alpha1_iris_predict_gcp.yaml
xfate123 May 16, 2020
dc71d6b
Update xgboostjob_v1alpha1_iris_predict_oss.yaml
xfate123 May 16, 2020
cf309d8
Update xgboostjob_v1alpha1_iris_train_oss.yaml
xfate123 May 16, 2020
ead8563
Update README.md
xfate123 May 16, 2020
fcf83ec
Update README.md
xfate123 May 16, 2020
ef7a7d0
Update README.md
xfate123 May 16, 2020
9b5d214
Update README.md
xfate123 May 16, 2020
309eee4
Update utils.py
xfate123 May 17, 2020
fb48969
Update utils.py
xfate123 May 17, 2020
af63ce3
Update requirements.txt
xfate123 May 17, 2020
ca9bed0
Update requirements.txt
xfate123 May 17, 2020
0c22468
Update utils.py
xfate123 May 17, 2020
1db400c
Update requirements.txt
xfate123 May 17, 2020
ca55228
Update requirements.txt
xfate123 May 17, 2020
2490274
Update xgboostjob_v1alpha1_iris_predict_gcp.yaml
xfate123 May 17, 2020
eeb1049
Update xgboostjob_v1alpha1_iris_predict_local.yaml
xfate123 May 17, 2020
fc7543f
Update xgboostjob_v1alpha1_iris_predict_oss.yaml
xfate123 May 17, 2020
8d6cf3c
Update xgboostjob_v1alpha1_iris_train_gcp.yaml
xfate123 May 17, 2020
341bd49
Update xgboostjob_v1alpha1_iris_train_local.yaml
xfate123 May 17, 2020
c8185e7
Update xgboostjob_v1alpha1_iris_train_oss.yaml
xfate123 May 17, 2020
925e26f
Update utils.py
xfate123 May 17, 2020
a313d2a
Update main.py
xfate123 May 17, 2020
cf82e5a
Update utils.py
xfate123 May 17, 2020
e57465a
Update utils.py
xfate123 May 17, 2020
06d2992
Update README.md
xfate123 May 18, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions config/samples/xgboost-dist/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
def main(args):

model_storage_type = args.model_storage_type
if (model_storage_type == "local" or model_storage_type == "oss"):
if (model_storage_type == "local" or model_storage_type == "oss" or model_storage_type == 'gcp'):
print ( "The storage type is " + model_storage_type)
else:
raise Exception("Only supports storage types like local and OSS")
raise Exception("Only supports storage types like local, OSS and GCP")

if args.job_type == "Predict":
logging.info("starting the predict job")
Expand Down Expand Up @@ -66,11 +66,13 @@ def main(args):
parser.add_argument(
'--learning_rate',
help='Learning rate for the model',
type=float,
default=0.1
)
parser.add_argument(
'--early_stopping_rounds',
help='XGBoost argument for stopping early',
type=int,
default=50
)
parser.add_argument(
Expand All @@ -85,7 +87,11 @@ def main(args):
)
parser.add_argument(
'--oss_param',
help='oss parameter if you choose the model storage as OSS type',
help='oss parameter if you choose the model storage as OSS type'
)
parser.add_argument(
'--gcp_param',
help='gcp parameter if you choose the model storage as GCP type'
)

logging.basicConfig(format='%(message)s')
Expand Down
96 changes: 91 additions & 5 deletions config/samples/xgboost-dist/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import xgboost as xgb
import os
import tempfile
from google.cloud import storage
from oauth2client.service_account import ServiceAccountCredentials
import oss2
import json
import pandas as pd
Expand Down Expand Up @@ -59,7 +61,7 @@ def read_train_data(rank, num_workers, path):
y = iris.target

start, end = get_range_data(len(x), rank, num_workers)
x = x[start:end, :]
x = x[start:end]
y = y[start:end]

x = pd.DataFrame(x)
Expand Down Expand Up @@ -87,7 +89,7 @@ def read_predict_data(rank, num_workers, path):
y = iris.target

start, end = get_range_data(len(x), rank, num_workers)
x = x[start:end, :]
x = x[start:end]
y = y[start:end]
x = pd.DataFrame(x)
y = pd.DataFrame(y)
Expand All @@ -113,7 +115,7 @@ def get_range_data(num_row, rank, num_workers):
x_start = rank * num_per_partition
x_end = (rank + 1) * num_per_partition

if x_end > num_row:
if x_end > num_row or (rank==num_workers-1 and x_end< num_row):
x_end = num_row

return x_start, x_end
Expand All @@ -140,10 +142,18 @@ def dump_model(model, type, model_path, args):
oss_param = parse_parameters(args.oss_param, ",", ":")
if oss_param is None:
raise Exception("Please config oss parameter to store model")

return False
oss_param['path'] = args.model_path
dump_model_to_oss(oss_param, model)
logging.info("Dump model into oss place %s", args.model_path)
elif type == 'gcp':
gcp_param = parse_parameters(args.gcp_param, ',', ':')
if gcp_param is None:
raise Exception('Please config gcp parameter to store model')
return False
gcp_param['path'] = args.model_path
dump_model_to_gcp(gcp_param, model)
logging.info('Dump model into gcp place %s', args.model_path)

return True

Expand Down Expand Up @@ -171,6 +181,14 @@ def read_model(type, model_path, args):

model = read_model_from_oss(oss_param)
logging.info("read model from oss place %s", model_path)
elif type == 'gcp':
gcp_param = parse_parameters(args.gcp_param,',',':')
if gcp_param is None:
raise Exception('Please config gcp to read model')
return False
gcp_param['path'] = args.model_path
model = read_model_from_gcp(gcp_param)
logging.info('read model from gcp place %s', model_path)

return model

Expand Down Expand Up @@ -213,6 +231,37 @@ def dump_model_to_oss(oss_parameters, booster):
return False

return True
def dump_model_to_gcp(gcp_parameter,booster):
model_fname = os.path.join(tempfile.mkdtemp(), 'model')
text_model_fname = os.path.join(tempfile.mkdtemp(), 'model.text')
feature_importance = os.path.join(tempfile.mkdtemp(),
'feature_importance.json')

gcp_path = gcp_parameters['path']
logger.info('---- export model ----')
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

export model to GCP ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's to GCP

booster.save_model(model_fname)
booster.dump_model(text_model_fname)
fscore_dict = booster.get_fscore()
with open(feature_importance, 'w') as file:
file.write(json.dumps(fscore_dict))
logger.info('---- chief dump model successfully!')
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dump model to local ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I learnt it from dump to oss module, I think the logic is dump the model to local first, and then upload from local to the cloud


if os.path.exists(model_fname):
logger.info('---- Upload Model start...')

while gcp_path[-1] == '/':
gcp_path = gcp_path[:-1]

upload_gcp(gcp_parameters, model_fname, gcp_path)
aux_path = gcp_path + '_dir/'
upload_gcp(gcp_parameters, model_fname, aux_path)
upload_gcp(gcp_parameters, text_model_fname, aux_path)
upload_gcp(gcp_parameters, feature_importance, aux_path)
else:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add the log to say that this model is updated success?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for sure

raise Exception("fail to generate model")
return False

return True


def upload_oss(kw, local_file, oss_path):
Expand All @@ -237,6 +286,23 @@ def upload_oss(kw, local_file, oss_path):
except Exception():
raise ValueError('upload %s to %s failed' %
(os.path.abspath(local_file), oss_path))
def upload_gcp(kw, local_file, gcp_path):
if gcp_path[-1] == '/':
gcp_path = '%s%s' % (gcp_path, os.path.basename(local_file))
credentials_dict = {
'type': kw['type'],
'client_id': kw['client_id'],
'client_email': kw['client_email']
'private_key_id':kw['private_key_id']
'private_key': kw['private_key']
}
credentials=ServiceAccountCredentials.from_json_keyfile_dict(credential_dict)
client = storage.Client(credentials=credentials)
bucket=storage.get_bucket(kw['access_bucket'])
blob=bucket.blob(gcp_path)
blob.upload_from_filename(local_file)




def read_model_from_oss(kw):
Expand All @@ -263,7 +329,27 @@ def read_model_from_oss(kw):
bst.load_model(temp_model_fname)

return bst

def read_model_from_gcp(kw):
credentials_dict = {
'type': kw['type'],
'client_id': kw['client_id'],
'client_email': kw['client_email']
'private_key_id':kw['private_key_id']
'private_key': kw['private_key']
}
credentials=ServiceAccountCredentials.from_json_keyfile_dict(credential_dict)
client = storage.Client(credentials=credentials)
bucket=storage.get_bucket(kw['access_bucket'])
gcp_path = kw["path"]
blob = bucket.blob(gcp_path)
temp_model_fname = os.path.join(tempfile.mkdtemp(), 'local_model')
try:
blob.download_to_filename(temp_model_fname)
logger.info("success to load model from gcp %s", gcp_path)
except Exception as e:
logging.error("fail to load model: " + e)
raise Exception("fail to load model from gcp %s", gcp_path)


def parse_parameters(input, splitter_between, splitter_in):
"""
Expand Down