-
Notifications
You must be signed in to change notification settings - Fork 6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Metrics utilization #25
base: master
Are you sure you want to change the base?
Changes from all commits
ec4c557
ae775d0
2aaddc9
fcc3426
9c6c110
3978915
f05ca90
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -52,6 +52,7 @@ | |
|
||
import argparse | ||
import datetime | ||
import functools | ||
import json | ||
import logging | ||
import multiprocessing | ||
|
@@ -64,6 +65,8 @@ | |
import uuid | ||
|
||
import gke_cluster | ||
import metrics | ||
import process_util | ||
from google.api_core import exceptions as google_exceptions | ||
from google.cloud import storage | ||
|
||
|
@@ -81,6 +84,12 @@ | |
_DEFAULT_BOOT_DISK_SIZE_GB = '50' | ||
_ROLE_STORAGE_OBJ_CREATOR = ['storage.objects.create'] | ||
|
||
# Metrics name. | ||
_MAKE_EXAMPLES = 'MakeExamples' | ||
_CALL_VARIANTS = 'CallVariants' | ||
_POSTPROCESS_VARIANTS = 'PostprocessVariants' | ||
_START = 'Start' | ||
|
||
_GCSFUSE_IMAGE = 'gcr.io/cloud-genomics-pipelines/gcsfuse' | ||
_GCSFUSE_LOCAL_DIR_TEMPLATE = '/mnt/google/input-gcsfused-{SHARD_INDEX}/' | ||
|
||
|
@@ -273,6 +282,56 @@ def _write_actions_to_temp_file(actions): | |
return temp_file.name | ||
|
||
|
||
def _get_project_number(project_id): | ||
"""Returns GCP project number (-1 on failure).""" | ||
if hasattr(_get_project_number, | ||
'project_number') and _get_project_number.project_number != -1: | ||
return _get_project_number.project_number | ||
try: | ||
args = [ | ||
'gcloud', 'projects', 'describe', project_id, | ||
'--format=value(projectNumber)' | ||
] | ||
_get_project_number.project_number = process_util.run_command( | ||
args, retries=2) | ||
except RuntimeError: | ||
# Error is already logged. | ||
_get_project_number.project_number = -1 | ||
return _get_project_number.project_number | ||
|
||
|
||
def report_runtime_metrics(method_name): | ||
"""Decorator that reports runtime metrics.""" | ||
|
||
def decorated(func): | ||
"""Pseudo decorator.""" | ||
@functools.wraps(func) | ||
def wrapper(pipeline_args, *args, **kwargs): | ||
"""Wrapper that measures time elapsed, and reports it. | ||
|
||
if stop_collecting_anonymous_usage_metrics is set it only calls the method | ||
without collecting any metrics. | ||
""" | ||
if pipeline_args.stop_collecting_anonymous_usage_metrics: | ||
func(pipeline_args, *args, **kwargs) | ||
else: | ||
status = '_Failure' | ||
start = time.time() | ||
try: | ||
func(pipeline_args, *args, **kwargs) | ||
status = '_Success' | ||
finally: | ||
metrics_name = method_name + status | ||
metrics.add( | ||
_get_project_number(pipeline_args.project), | ||
metrics_name, | ||
duration_seconds=int(time.time() - start)) | ||
|
||
return wrapper | ||
|
||
return decorated | ||
|
||
|
||
def _run_job(run_args, log_path): | ||
"""Runs a job using the pipelines CLI tool. | ||
|
||
|
@@ -386,6 +445,7 @@ def _meets_gcp_label_restrictions(label): | |
label) is not None | ||
|
||
|
||
@report_runtime_metrics(method_name=_MAKE_EXAMPLES) | ||
def _run_make_examples(pipeline_args): | ||
"""Runs the make_examples job.""" | ||
|
||
|
@@ -612,6 +672,7 @@ def get_extra_args(): | |
_wait_for_results(threads, results) | ||
|
||
|
||
@report_runtime_metrics(method_name=_CALL_VARIANTS) | ||
def _run_call_variants(pipeline_args): | ||
"""Runs the call_variants job.""" | ||
if pipeline_args.tpu: | ||
|
@@ -620,6 +681,7 @@ def _run_call_variants(pipeline_args): | |
_run_call_variants_with_pipelines_api(pipeline_args) | ||
|
||
|
||
@report_runtime_metrics(method_name=_POSTPROCESS_VARIANTS) | ||
def _run_postprocess_variants(pipeline_args): | ||
"""Runs the postprocess_variants job.""" | ||
|
||
|
@@ -1040,10 +1102,65 @@ def run(argv=None): | |
'jobs. By default, the pipeline runs all 3 jobs (make_examples, ' | ||
'call_variants, postprocess_variants) in sequence. ' | ||
'This option may be used to run parts of the pipeline.')) | ||
parser.add_argument( | ||
'--stop_collecting_anonymous_usage_metrics', | ||
default=False, | ||
action='store_true', | ||
help=('This tool collects some anonymous metrics related to utilized ' | ||
'resources, such as how many workers, how many CPU cores, how much ' | ||
'ram, etc. was used to run each step of DeepVariant. We use these ' | ||
'metrics to further improve the usibility of our tool. You can ' | ||
'compeletly stop collecting these metrics by setting this flag.')) | ||
|
||
pipeline_args = parser.parse_args(argv) | ||
_validate_and_complete_args(pipeline_args) | ||
|
||
def get_image_version(docker_image): | ||
return docker_image.split(':')[-1] | ||
|
||
def get_model_version(model): | ||
return model.split('/')[-1] | ||
|
||
if not pipeline_args.stop_collecting_anonymous_usage_metrics: | ||
metrics.add( | ||
_get_project_number(pipeline_args.project), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am not sure whether it still makes sense to collect the metrics when getting the project number failed (-1). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes it does. Ideally we like to know how many unique users we have and getting their project_id is needed for that. But in case we cannot get the project_id still it will be informative if we find out other information about the run (how many workers, failure or success, ...). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you know in which case it may fail to get the project_id? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I can imagine there might be some GCP IAM settings that prevents a service account from reading the project's metadata, I am guessing in those cases it will also raise an error. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. According to
The first case does not occur to us (otherwise the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In general we aren't allowed to log project ID (it's PII). I think we got approval to log a SHA1 hash of the project ID. Note that the permissions required to get the project ID are non-trivial and so it might be likely that the SA doesn't have them. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thank you Saman for the detailed explanation! I think the second case should not occur to us neither, since the user is running the pipeline in that project, which they must have access to. So I agree that it probably will never return -1 :). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry I misread this as ID not number for some reason. Project number is fine :) |
||
_START, | ||
# General run related settings. | ||
image_version=get_image_version(pipeline_args.docker_image), | ||
model_version=get_model_version(pipeline_args.model), | ||
input_file_format=( | ||
_BAM_FILE_SUFFIX if pipeline_args.bam.endswith(_BAM_FILE_SUFFIX) | ||
else _CRAM_FILE_SUFFIX), | ||
zones=pipeline_args.zones, | ||
genomic_regions=( | ||
len(pipeline_args.regions) if pipeline_args.regions else 0), | ||
shards=pipeline_args.shards, | ||
jobs_to_run=pipeline_args.jobs_to_run, | ||
gvcf=True if pipeline_args.gvcf_outfile else False, | ||
max_non_preemptible_tries=pipeline_args.max_non_preemptible_tries, | ||
max_preemptible_tries=pipeline_args.max_preemptible_tries, | ||
preemptible=pipeline_args.preemptible, | ||
# Make_examples stage related settings. | ||
gcsfuse=pipeline_args.gcsfuse, | ||
make_examples_workers=pipeline_args.make_examples_workers, | ||
make_examples_cores_per_worker=( | ||
pipeline_args.make_examples_cores_per_worker), | ||
make_examples_ram_per_worker_gb=( | ||
pipeline_args.make_examples_ram_per_worker_gb), | ||
# Call_variants stage related settings. | ||
gpu=pipeline_args.gpu, | ||
accelerator_type=pipeline_args.accelerator_type, | ||
tpu=pipeline_args.tpu, | ||
existing_gke_cluster=True if pipeline_args.gke_cluster_name else False, | ||
call_variants_workers=pipeline_args.call_variants_workers, | ||
call_variants_cores_per_worker=( | ||
pipeline_args.call_variants_cores_per_worker), | ||
call_variants_ram_per_worker_gb=( | ||
pipeline_args.call_variants_ram_per_worker_gb), | ||
# Postprocess stage related settings. | ||
postprocess_variants_cores=pipeline_args.postprocess_variants_cores, | ||
postprocess_variants_ram_gb=pipeline_args.postprocess_variants_ram_gb) | ||
|
||
# TODO(b/112148076): Fail fast: validate GKE cluster early on in the pipeline. | ||
if _MAKE_EXAMPLES_JOB_NAME in pipeline_args.jobs_to_run: | ||
logging.info('Running make_examples...') | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think add the argument
collecting_anonymous_usage_metrics
would be easier to read. Especially it is used asif not pipeline_args.stop_collecting_anonymous_usage_metrics:
. And theaction
andhelp
seem not relevant?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree the if statement would be more readable if this flag was not 'negative'. However, I really like this flag to explicitly indicate it stops metrics collection.
Please let me know if this does not make sense.