Metrics utilization #25

Open
wants to merge 7 commits into master
1 change: 1 addition & 0 deletions Dockerfile
@@ -30,6 +30,7 @@ RUN curl -L -o /usr/bin/kubectl https://storage.googleapis.com/kubernetes-releas
ADD LICENSE /
ADD gcp_deepvariant_runner.py /opt/deepvariant_runner/src/
ADD gke_cluster.py /opt/deepvariant_runner/src/
ADD metrics.py /opt/deepvariant_runner/src/
ADD process_util.py /opt/deepvariant_runner/src/
ADD run_and_verify.sh /opt/deepvariant_runner/bin/
ADD cancel /opt/deepvariant_runner/bin/
117 changes: 117 additions & 0 deletions gcp_deepvariant_runner.py
@@ -52,6 +52,7 @@

import argparse
import datetime
import functools
import json
import logging
import multiprocessing
@@ -64,6 +65,8 @@
import uuid

import gke_cluster
import metrics
import process_util
from google.api_core import exceptions as google_exceptions
from google.cloud import storage

@@ -81,6 +84,12 @@
_DEFAULT_BOOT_DISK_SIZE_GB = '50'
_ROLE_STORAGE_OBJ_CREATOR = ['storage.objects.create']

# Metric names.
_MAKE_EXAMPLES = 'MakeExamples'
_CALL_VARIANTS = 'CallVariants'
_POSTPROCESS_VARIANTS = 'PostprocessVariants'
_START = 'Start'

_GCSFUSE_IMAGE = 'gcr.io/cloud-genomics-pipelines/gcsfuse'
_GCSFUSE_LOCAL_DIR_TEMPLATE = '/mnt/google/input-gcsfused-{SHARD_INDEX}/'

@@ -273,6 +282,56 @@ def _write_actions_to_temp_file(actions):
return temp_file.name


def _get_project_number(project_id):
"""Returns GCP project number (-1 on failure)."""
# The project number is cached as an attribute on this function so the gcloud
# lookup runs at most once; a cached -1 (failure) is retried on the next call.
if hasattr(_get_project_number,
'project_number') and _get_project_number.project_number != -1:
return _get_project_number.project_number
try:
args = [
'gcloud', 'projects', 'describe', project_id,
'--format=value(projectNumber)'
]
_get_project_number.project_number = process_util.run_command(
args, retries=2)
except RuntimeError:
# Error is already logged.
_get_project_number.project_number = -1
return _get_project_number.project_number


def report_runtime_metrics(method_name):
"""Returns a decorator that reports runtime metrics under method_name."""

def decorated(func):
"""Decorator that wraps func with runtime-metrics reporting."""
@functools.wraps(func)
def wrapper(pipeline_args, *args, **kwargs):
"""Wrapper that measures time elapsed, and reports it.

if stop_collecting_anonymous_usage_metrics is set it only calls the method
without collecting any metrics.
"""
if pipeline_args.stop_collecting_anonymous_usage_metrics:
func(pipeline_args, *args, **kwargs)
else:
status = '_Failure'
start = time.time()
try:
func(pipeline_args, *args, **kwargs)
status = '_Success'
finally:
metrics_name = method_name + status
metrics.add(
_get_project_number(pipeline_args.project),
metrics_name,
duration_seconds=int(time.time() - start))

return wrapper

return decorated


def _run_job(run_args, log_path):
"""Runs a job using the pipelines CLI tool.

@@ -386,6 +445,7 @@ def _meets_gcp_label_restrictions(label):
label) is not None


@report_runtime_metrics(method_name=_MAKE_EXAMPLES)
def _run_make_examples(pipeline_args):
"""Runs the make_examples job."""

@@ -612,6 +672,7 @@ def get_extra_args():
_wait_for_results(threads, results)


@report_runtime_metrics(method_name=_CALL_VARIANTS)
def _run_call_variants(pipeline_args):
"""Runs the call_variants job."""
if pipeline_args.tpu:
@@ -620,6 +681,7 @@ def _run_call_variants(pipeline_args):
_run_call_variants_with_pipelines_api(pipeline_args)


@report_runtime_metrics(method_name=_POSTPROCESS_VARIANTS)
def _run_postprocess_variants(pipeline_args):
"""Runs the postprocess_variants job."""

@@ -1040,10 +1102,65 @@ def run(argv=None):
'jobs. By default, the pipeline runs all 3 jobs (make_examples, '
'call_variants, postprocess_variants) in sequence. '
'This option may be used to run parts of the pipeline.'))
parser.add_argument(
'--stop_collecting_anonymous_usage_metrics',

Reviewer: I think adding the argument as collecting_anonymous_usage_metrics would be easier to read, especially since it is used as if not pipeline_args.stop_collecting_anonymous_usage_metrics:. Also, the action and the help do not seem relevant?

Contributor Author: I agree the if statement would be more readable if this flag were not 'negative'. However, I would really like this flag to explicitly indicate that it stops metrics collection.

Please let me know if this does not make sense.

default=False,
action='store_true',
help=('This tool collects some anonymous metrics related to utilized '
'resources, such as how many workers, how many CPU cores, and how much '
'RAM were used to run each step of DeepVariant. We use these metrics '
'to further improve the usability of our tool. You can completely '
'stop collecting these metrics by setting this flag.'))
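For illustration only, a minimal sketch of the two argparse styles weighed in the review thread above; the affirmative flag name is a hypothetical alternative and not part of this PR:

import argparse

# Style used in this PR: a 'negative' flag, off by default, that explicitly
# stops metrics collection when passed.
parser = argparse.ArgumentParser()
parser.add_argument(
    '--stop_collecting_anonymous_usage_metrics',
    default=False,
    action='store_true')
args = parser.parse_args([])
if not args.stop_collecting_anonymous_usage_metrics:
  print('collecting metrics')

# Hypothetical alternative from the review: an affirmative name, so the check
# reads without a double negative. Turning collection off would then need its
# own mechanism (e.g. a paired --no... flag), which is the trade-off discussed.
alt_parser = argparse.ArgumentParser()
alt_parser.add_argument(
    '--collect_anonymous_usage_metrics',
    default=True,
    action='store_true')
alt_args = alt_parser.parse_args([])
if alt_args.collect_anonymous_usage_metrics:
  print('collecting metrics')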

pipeline_args = parser.parse_args(argv)
_validate_and_complete_args(pipeline_args)

def get_image_version(docker_image):
return docker_image.split(':')[-1]

def get_model_version(model):
return model.split('/')[-1]

if not pipeline_args.stop_collecting_anonymous_usage_metrics:
metrics.add(
_get_project_number(pipeline_args.project),

Reviewer: I am not sure whether it still makes sense to collect the metrics when getting the project number fails (-1).

Contributor Author: Yes, it does. Ideally we would like to know how many unique users we have, and getting their project_id is needed for that. But even when we cannot get the project_id, it is still informative to record the other information about the run (how many workers, failure or success, ...).

Reviewer: Do you know in which cases it may fail to get the project_id?

Contributor Author: I can imagine there might be some GCP IAM settings that prevent a service account from reading the project's metadata; I am guessing that in those cases it will also raise an error. It is also possible that we can successfully fetch the project_id in all runs and never return -1. But in any case, in _get_project_number() we have to run the gcloud projects describe command in a try/except block.

Contributor Author: According to gcloud projects describe --help:

This command can fail for the following reasons:

  • The project specified does not exist.
  • The active account does not have permission to access the given project.

The first case cannot happen for us (otherwise the gcloud alpha genomics pipelines run wouldn't go through). So we will only fail to log the right project_id when the service account does not have enough permission.

Reviewer: In general we aren't allowed to log the project ID (it's PII). I think we got approval to log a SHA1 hash of the project ID.

Note that the permissions required to get the project ID are non-trivial, so it is quite possible that the SA doesn't have them.

Reviewer: Thank you, Saman, for the detailed explanation! I think the second case should not happen for us either, since the user is running the pipeline in that project, which they must have access to. So I agree that it will probably never return -1 :)

Reviewer: Sorry, I misread this as project ID rather than project number for some reason. Project number is fine :)

_START,
# General run related settings.
image_version=get_image_version(pipeline_args.docker_image),
model_version=get_model_version(pipeline_args.model),
input_file_format=(
_BAM_FILE_SUFFIX if pipeline_args.bam.endswith(_BAM_FILE_SUFFIX)
else _CRAM_FILE_SUFFIX),
zones=pipeline_args.zones,
genomic_regions=(
len(pipeline_args.regions) if pipeline_args.regions else 0),
shards=pipeline_args.shards,
jobs_to_run=pipeline_args.jobs_to_run,
gvcf=True if pipeline_args.gvcf_outfile else False,
max_non_preemptible_tries=pipeline_args.max_non_preemptible_tries,
max_preemptible_tries=pipeline_args.max_preemptible_tries,
preemptible=pipeline_args.preemptible,
# Make_examples stage related settings.
gcsfuse=pipeline_args.gcsfuse,
make_examples_workers=pipeline_args.make_examples_workers,
make_examples_cores_per_worker=(
pipeline_args.make_examples_cores_per_worker),
make_examples_ram_per_worker_gb=(
pipeline_args.make_examples_ram_per_worker_gb),
# Call_variants stage related settings.
gpu=pipeline_args.gpu,
accelerator_type=pipeline_args.accelerator_type,
tpu=pipeline_args.tpu,
existing_gke_cluster=True if pipeline_args.gke_cluster_name else False,
call_variants_workers=pipeline_args.call_variants_workers,
call_variants_cores_per_worker=(
pipeline_args.call_variants_cores_per_worker),
call_variants_ram_per_worker_gb=(
pipeline_args.call_variants_ram_per_worker_gb),
# Postprocess stage related settings.
postprocess_variants_cores=pipeline_args.postprocess_variants_cores,
postprocess_variants_ram_gb=pipeline_args.postprocess_variants_ram_gb)
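Following up on the review thread above about logging only a SHA1 hash rather than a raw project identifier, a minimal sketch of what that could look like; the helper below is hypothetical and not part of this PR:

import hashlib


def _anonymized_project_identifier(project_number):
  """Hypothetical helper: SHA1 hex digest of the project number, or None.

  Returns None when _get_project_number() signalled failure with -1, so that
  no identifier at all would be attached to the reported metrics.
  """
  if project_number == -1:
    return None
  return hashlib.sha1(str(project_number).encode('utf-8')).hexdigest()

# Usage sketch (hypothetical): the digest, rather than the raw number, would
# be passed as the first argument to metrics.add().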

# TODO(b/112148076): Fail fast: validate GKE cluster early on in the pipeline.
if _MAKE_EXAMPLES_JOB_NAME in pipeline_args.jobs_to_run:
logging.info('Running make_examples...')