This repository has been archived by the owner on Feb 1, 2022. It is now read-only.
add gcp storage to xgboost-operator #81
Open
xfate123 wants to merge 37 commits into kubeflow:master from xfate123:add-gcp-storage(draft)
Changes from 2 commits

Commits (37):
2240810 add gcp storage to xgboost-operator, still working on in (xfate123)
ba00fb4 Update utils.py (xfate123)
f3d8619 Update utils.py (xfate123)
d904b9c Update xgboostjob_v1alpha1_iris_predict.yaml (xfate123)
813e3cc Update and rename xgboostjob_v1alpha1_iris_predict.yaml to xgboostjob… (xfate123)
4056365 Rename xgboostjob_v1alpha1_iris_train.yaml to xgboostjob_v1alpha1_iri… (xfate123)
bae957b Create xgboostjob_v1alpha1_iris_train_gcr.yaml (xfate123)
b583c1b Create xgboostjob_v1alpha1_iris_predict_gcr.yaml (xfate123)
758ec4e Update and rename xgboostjob_v1alpha1_iris_predict_gcr.yaml to xgboos… (xfate123)
4125851 Update and rename xgboostjob_v1alpha1_iris_train_gcr.yaml to xgboostj… (xfate123)
9a0e655 Update README.md (xfate123)
a2a1702 Update xgboostjob_v1alpha1_iris_train_gcp.yaml (xfate123)
05675a1 Update xgboostjob_v1alpha1_iris_predict_gcp.yaml (xfate123)
dc71d6b Update xgboostjob_v1alpha1_iris_predict_oss.yaml (xfate123)
cf309d8 Update xgboostjob_v1alpha1_iris_train_oss.yaml (xfate123)
ead8563 Update README.md (xfate123)
fcf83ec Update README.md (xfate123)
ef7a7d0 Update README.md (xfate123)
9b5d214 Update README.md (xfate123)
309eee4 Update utils.py (xfate123)
fb48969 Update utils.py (xfate123)
af63ce3 Update requirements.txt (xfate123)
ca9bed0 Update requirements.txt (xfate123)
0c22468 Update utils.py (xfate123)
1db400c Update requirements.txt (xfate123)
ca55228 Update requirements.txt (xfate123)
2490274 Update xgboostjob_v1alpha1_iris_predict_gcp.yaml (xfate123)
eeb1049 Update xgboostjob_v1alpha1_iris_predict_local.yaml (xfate123)
fc7543f Update xgboostjob_v1alpha1_iris_predict_oss.yaml (xfate123)
8d6cf3c Update xgboostjob_v1alpha1_iris_train_gcp.yaml (xfate123)
341bd49 Update xgboostjob_v1alpha1_iris_train_local.yaml (xfate123)
c8185e7 Update xgboostjob_v1alpha1_iris_train_oss.yaml (xfate123)
925e26f Update utils.py (xfate123)
a313d2a Update main.py (xfate123)
cf82e5a Update utils.py (xfate123)
e57465a Update utils.py (xfate123)
06d2992 Update README.md (xfate123)
utils.py
@@ -15,6 +15,8 @@
 import xgboost as xgb
 import os
 import tempfile
+from google.cloud import storage
+from oauth2client.service_account import ServiceAccountCredentials
 import oss2
 import json
 import pandas as pd

@@ -59,7 +61,7 @@ def read_train_data(rank, num_workers, path):
     y = iris.target

     start, end = get_range_data(len(x), rank, num_workers)
-    x = x[start:end, :]
+    x = x[start:end]
     y = y[start:end]

     x = pd.DataFrame(x)

@@ -87,7 +89,7 @@ def read_predict_data(rank, num_workers, path):
     y = iris.target

     start, end = get_range_data(len(x), rank, num_workers)
-    x = x[start:end, :]
+    x = x[start:end]
     y = y[start:end]
     x = pd.DataFrame(x)
     y = pd.DataFrame(y)

@@ -113,7 +115,7 @@ def get_range_data(num_row, rank, num_workers):
     x_start = rank * num_per_partition
     x_end = (rank + 1) * num_per_partition

-    if x_end > num_row:
+    if x_end > num_row or (rank == num_workers - 1 and x_end < num_row):
         x_end = num_row

     return x_start, x_end

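A quick illustration of what the added condition buys (how num_per_partition is computed is not shown in this hunk, so floor division is assumed below; the numbers are illustrative only):

# Sketch only: assumes num_per_partition = num_row // num_workers.
def get_range_data_sketch(num_row, rank, num_workers):
    num_per_partition = num_row // num_workers
    x_start = rank * num_per_partition
    x_end = (rank + 1) * num_per_partition
    # New clause: the last worker also absorbs any leftover rows.
    if x_end > num_row or (rank == num_workers - 1 and x_end < num_row):
        x_end = num_row
    return x_start, x_end

# Iris has 150 rows. With 4 workers, the old check let rank 3 stop at
# row 148 and silently drop the last two rows; with the new clause the
# last worker reads rows 111..150.
print(get_range_data_sketch(150, 3, 4))  # (111, 150)
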
@@ -140,10 +142,18 @@ def dump_model(model, type, model_path, args):
         oss_param = parse_parameters(args.oss_param, ",", ":")
         if oss_param is None:
             raise Exception("Please config oss parameter to store model")

             return False
         oss_param['path'] = args.model_path
         dump_model_to_oss(oss_param, model)
         logging.info("Dump model into oss place %s", args.model_path)
+    elif type == 'gcp':
+        gcp_param = parse_parameters(args.gcp_param, ',', ':')
+        if gcp_param is None:
+            raise Exception('Please config gcp parameter to store model')
+            return False
+        gcp_param['path'] = args.model_path
+        dump_model_to_gcp(gcp_param, model)
+        logging.info('Dump model into gcp place %s', args.model_path)

     return True

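For reference, parse_parameters splits the raw --gcp_param string first on ',' and then on ':', the same way the oss branch parses --oss_param. The sketch below shows what such a value could look like; the key names are taken from upload_gcp/read_model_from_gcp further down, and every value is a placeholder rather than a real credential. (A real private_key contains ':' and newlines, so in practice it would need escaping or a different transport.)

# Hypothetical --gcp_param value; keys mirror what upload_gcp reads,
# values are placeholders only.
gcp_param_str = (
    "type:service_account,"
    "client_id:1234567890,"
    "client_email:trainer@my-project.iam.gserviceaccount.com,"
    "private_key_id:abc123,"
    "private_key:PLACEHOLDER,"
    "access_bucket:my-xgboost-models"
)

# parse_parameters(gcp_param_str, ',', ':') would then yield roughly:
# {'type': 'service_account', 'client_id': '1234567890', ...,
#  'access_bucket': 'my-xgboost-models'}
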
@@ -171,6 +181,14 @@ def read_model(type, model_path, args):

         model = read_model_from_oss(oss_param)
         logging.info("read model from oss place %s", model_path)
+    elif type == 'gcp':
+        gcp_param = parse_parameters(args.gcp_param, ',', ':')
+        if gcp_param is None:
+            raise Exception('Please config gcp to read model')
+            return False
+        gcp_param['path'] = args.model_path
+        model = read_model_from_gcp(gcp_param)
+        logging.info('read model from gcp place %s', model_path)

     return model

@@ -213,6 +231,37 @@ def dump_model_to_oss(oss_parameters, booster):
         return False

     return True
+def dump_model_to_gcp(gcp_parameters, booster):
+    model_fname = os.path.join(tempfile.mkdtemp(), 'model')
+    text_model_fname = os.path.join(tempfile.mkdtemp(), 'model.text')
+    feature_importance = os.path.join(tempfile.mkdtemp(),
+                                      'feature_importance.json')
+
+    gcp_path = gcp_parameters['path']
+    logger.info('---- export model ----')
+    booster.save_model(model_fname)
+    booster.dump_model(text_model_fname)
+    fscore_dict = booster.get_fscore()
+    with open(feature_importance, 'w') as file:
+        file.write(json.dumps(fscore_dict))
+    logger.info('---- chief dump model successfully!')

[Review comment] dump model to local ?
[Author reply] I learnt it from the dump-to-oss module; I think the logic is to dump the model to local first, and then upload it from local to the cloud.

+
+    if os.path.exists(model_fname):
+        logger.info('---- Upload Model start...')
+
+        while gcp_path[-1] == '/':
+            gcp_path = gcp_path[:-1]
+
+        upload_gcp(gcp_parameters, model_fname, gcp_path)
+        aux_path = gcp_path + '_dir/'
+        upload_gcp(gcp_parameters, model_fname, aux_path)
+        upload_gcp(gcp_parameters, text_model_fname, aux_path)
+        upload_gcp(gcp_parameters, feature_importance, aux_path)
+    else:

[Review comment] add the log to say that this model is updated success?
[Author reply] for sure

+        raise Exception("fail to generate model")
+        return False
+
+    return True


 def upload_oss(kw, local_file, oss_path):

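To make the resulting bucket layout concrete, here is an illustration only; the model path below is a made-up example, not something from this PR:

# Suppose args.model_path is 'kubeflow-xgboost/iris-model'.
gcp_path = 'kubeflow-xgboost/iris-model'
aux_path = gcp_path + '_dir/'

# dump_model_to_gcp would then write roughly these objects into
# kw['access_bucket'] (upload_gcp appends the local basename when the
# destination ends with '/'):
#   kubeflow-xgboost/iris-model                               <- model_fname
#   kubeflow-xgboost/iris-model_dir/model                     <- model_fname again
#   kubeflow-xgboost/iris-model_dir/model.text                <- text_model_fname
#   kubeflow-xgboost/iris-model_dir/feature_importance.json   <- feature_importance
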
@@ -237,6 +286,23 @@ def upload_oss(kw, local_file, oss_path):
     except Exception():
         raise ValueError('upload %s to %s failed' %
                          (os.path.abspath(local_file), oss_path))
+def upload_gcp(kw, local_file, gcp_path):
+    if gcp_path[-1] == '/':
+        gcp_path = '%s%s' % (gcp_path, os.path.basename(local_file))
+    credentials_dict = {
+        'type': kw['type'],
+        'client_id': kw['client_id'],
+        'client_email': kw['client_email'],
+        'private_key_id': kw['private_key_id'],
+        'private_key': kw['private_key']
+    }
+    credentials = ServiceAccountCredentials.from_json_keyfile_dict(credentials_dict)
+    client = storage.Client(credentials=credentials)
+    bucket = client.get_bucket(kw['access_bucket'])
+    blob = bucket.blob(gcp_path)
+    blob.upload_from_filename(local_file)


 def read_model_from_oss(kw):

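One caveat worth flagging for reviewers: recent google-cloud-storage releases expect google-auth credentials, so passing oauth2client's ServiceAccountCredentials to storage.Client may be rejected at runtime. Below is an alternative sketch of the same upload using google.oauth2.service_account; the project_id key, the bucket contents and the helper name itself are assumptions for illustration, not part of this PR.

import os

from google.cloud import storage
from google.oauth2 import service_account


def upload_gcp_sketch(kw, local_file, gcp_path):
    # Build google-auth credentials from the parsed --gcp_param dict.
    credentials = service_account.Credentials.from_service_account_info({
        'type': kw['type'],
        'client_id': kw['client_id'],
        'client_email': kw['client_email'],
        'private_key_id': kw['private_key_id'],
        'private_key': kw['private_key'],
        # token_uri is required by from_service_account_info.
        'token_uri': 'https://oauth2.googleapis.com/token',
    })
    client = storage.Client(project=kw.get('project_id'),
                            credentials=credentials)

    # Mirror upload_oss: a trailing '/' means "upload into this prefix".
    if gcp_path.endswith('/'):
        gcp_path += os.path.basename(local_file)
    client.bucket(kw['access_bucket']).blob(gcp_path).upload_from_filename(local_file)
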
@@ -263,7 +329,27 @@ def read_model_from_oss(kw):
     bst.load_model(temp_model_fname)

     return bst

+def read_model_from_gcp(kw):
+    credentials_dict = {
+        'type': kw['type'],
+        'client_id': kw['client_id'],
+        'client_email': kw['client_email'],
+        'private_key_id': kw['private_key_id'],
+        'private_key': kw['private_key']
+    }
+    credentials = ServiceAccountCredentials.from_json_keyfile_dict(credentials_dict)
+    client = storage.Client(credentials=credentials)
+    bucket = client.get_bucket(kw['access_bucket'])
+    gcp_path = kw["path"]
+    blob = bucket.blob(gcp_path)
+    temp_model_fname = os.path.join(tempfile.mkdtemp(), 'local_model')
+    try:
+        blob.download_to_filename(temp_model_fname)
+        logger.info("success to load model from gcp %s", gcp_path)
+    except Exception as e:
+        logging.error("fail to load model: %s", e)
+        raise Exception("fail to load model from gcp %s", gcp_path)


 def parse_parameters(input, splitter_between, splitter_in):
     """

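As written, read_model_from_gcp downloads the model file but never loads it into a booster or returns it, so read_model's gcp branch would end up with model = None. A minimal self-contained sketch of the intended read path is below, mirroring read_model_from_oss; the google-auth usage, the project_id key and the plain xgb.Booster() constructor are assumptions for illustration, not code from this PR.

import os
import tempfile

import xgboost as xgb
from google.cloud import storage
from google.oauth2 import service_account


def read_model_from_gcp_sketch(kw):
    credentials = service_account.Credentials.from_service_account_info({
        'type': kw['type'],
        'client_id': kw['client_id'],
        'client_email': kw['client_email'],
        'private_key_id': kw['private_key_id'],
        'private_key': kw['private_key'],
        'token_uri': 'https://oauth2.googleapis.com/token',
    })
    client = storage.Client(project=kw.get('project_id'),
                            credentials=credentials)
    blob = client.bucket(kw['access_bucket']).blob(kw['path'])

    # Download to a temp file, then load it and hand the booster back,
    # the same way read_model_from_oss does for Alibaba OSS.
    temp_model_fname = os.path.join(tempfile.mkdtemp(), 'local_model')
    blob.download_to_filename(temp_model_fname)

    bst = xgb.Booster()
    bst.load_model(temp_model_fname)
    return bst
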
[Review comment] export model to GCP ?
[Author reply] Yes, it's to GCP