Skip to content

Commit

Permalink
remove _gcs_object_exist() method
Browse files Browse the repository at this point in the history
  • Loading branch information
samanvp committed Apr 1, 2019
1 parent ee78ffd commit 9f4f9fc
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 71 deletions.
26 changes: 9 additions & 17 deletions gcp_deepvariant_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,20 +315,11 @@ def _is_valid_gcs_path(gcs_path):
gcs_path: (str) a path to directory or an obj on GCS.
"""
return (urlparse.urlparse(gcs_path).scheme == 'gs' and
urlparse.urlparse(gcs_path).netloc != '')


def _gcs_object_exist(gcs_obj_path):
"""Returns true if the given path is a valid object on GCS.
Args:
gcs_obj_path: (str) a path to an obj on GCS.
"""
return _get_gcs_object_size(gcs_obj_path) != 0
urlparse.urlparse(gcs_path).netloc)


def _get_gcs_object_size(gcs_obj_path):
"""Returns the size of the given GCS object.
"""Returns size of GCS object or 0, if object is missing or access is denied.
Args:
gcs_obj_path: (str) a path to an obj on GCS.
Expand Down Expand Up @@ -762,7 +753,7 @@ def _validate_and_complete_args(pipeline_args):
'model is neither WGS nor WES: %s' % pipeline_args.model)
if is_wes and is_wgs:
raise ValueError('Unable to automatically set computational flags. Given '
'model matches both WGS and WES: %s' % pipeline_args.model)
'model matches both WGS & WES: %s' % pipeline_args.model)
if not pipeline_args.bam.endswith(_BAM_FILE_SUFFIX):
raise ValueError(
'Only able to automatically set computational flags for BAM files.')
Expand Down Expand Up @@ -836,15 +827,16 @@ def _validate_and_complete_args(pipeline_args):
pipeline_args.bai = pipeline_args.bam + _BAI_FILE_SUFFIX

# Ensuring all input files exist...
if not _gcs_object_exist(pipeline_args.ref):
if _get_gcs_object_size(pipeline_args.ref) == 0:
raise ValueError('Given reference file via --ref does not exist')
if not _gcs_object_exist(pipeline_args.ref_fai):
if _get_gcs_object_size(pipeline_args.ref_fai) == 0:
raise ValueError('Given FAI index file via --ref_fai does not exist')
if (pipeline_args.ref_gzi and not _gcs_object_exist(pipeline_args.ref_gzi)):
if (pipeline_args.ref_gzi and
_get_gcs_object_size(pipeline_args.ref_gzi) == 0):
raise ValueError('Given GZI index file via --ref_gzi does not exist')
if not _gcs_object_exist(pipeline_args.bam):
if _get_gcs_object_size(pipeline_args.bam) == 0:
raise ValueError('Given BAM file via --bam does not exist')
if not _gcs_object_exist(pipeline_args.bai):
if _get_gcs_object_size(pipeline_args.bai) == 0:
raise ValueError('Given BAM index file via --bai does not exist')
# ...and we can write to output buckets.
if not _can_write_to_bucket(_get_gcs_bucket(pipeline_args.staging)):
Expand Down
74 changes: 20 additions & 54 deletions gcp_deepvariant_runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ def setUp(self):

@mock.patch('gcp_deepvariant_runner._run_job')
@mock.patch.object(multiprocessing, 'Pool')
@mock.patch('gcp_deepvariant_runner._gcs_object_exist')
@mock.patch('gcp_deepvariant_runner._get_gcs_object_size')
@mock.patch('gcp_deepvariant_runner._can_write_to_bucket')
def testRunPipeline(self, mock_can_write_to_bucket, mock_obj_exist, mock_pool,
mock_run_job):
Expand Down Expand Up @@ -159,7 +159,7 @@ def testRunPipeline(self, mock_can_write_to_bucket, mock_obj_exist, mock_pool,

@mock.patch('gcp_deepvariant_runner._run_job')
@mock.patch.object(multiprocessing, 'Pool')
@mock.patch('gcp_deepvariant_runner._gcs_object_exist')
@mock.patch('gcp_deepvariant_runner._get_gcs_object_size')
@mock.patch('gcp_deepvariant_runner._can_write_to_bucket')
def testRunPipeline_WithGVCFOutFile(self, mock_can_write_to_bucket,
mock_obj_exist, mock_pool, mock_run_job):
Expand Down Expand Up @@ -209,7 +209,7 @@ def testRunPipeline_WithGVCFOutFile(self, mock_can_write_to_bucket,
'gs://bucket/staging/logs/postprocess_variants')

@mock.patch.object(multiprocessing, 'Pool')
@mock.patch('gcp_deepvariant_runner._gcs_object_exist')
@mock.patch('gcp_deepvariant_runner._get_gcs_object_size')
@mock.patch('gcp_deepvariant_runner._can_write_to_bucket')
def testRunMakeExamples(self, mock_can_write_to_bucket, mock_obj_exist,
mock_pool):
Expand Down Expand Up @@ -263,7 +263,7 @@ def testRunMakeExamples(self, mock_can_write_to_bucket, mock_obj_exist,
self.assertEqual(mock_apply_async.call_count, 3)

@mock.patch.object(multiprocessing, 'Pool')
@mock.patch('gcp_deepvariant_runner._gcs_object_exist')
@mock.patch('gcp_deepvariant_runner._get_gcs_object_size')
@mock.patch('gcp_deepvariant_runner._can_write_to_bucket')
def testRunMakeExamples_WithGcsfuse(self, mock_can_write_to_bucket,
mock_obj_exist, mock_pool):
Expand Down Expand Up @@ -310,7 +310,7 @@ def testRunMakeExamples_WithGcsfuse(self, mock_can_write_to_bucket,
self.assertEqual(mock_apply_async.call_count, 3)

@mock.patch.object(multiprocessing, 'Pool')
@mock.patch('gcp_deepvariant_runner._gcs_object_exist')
@mock.patch('gcp_deepvariant_runner._get_gcs_object_size')
@mock.patch('gcp_deepvariant_runner._can_write_to_bucket')
def testRunCallVariants(self, mock_can_write_to_bucket, mock_obj_exist,
mock_pool):
Expand Down Expand Up @@ -354,7 +354,7 @@ def testRunCallVariants(self, mock_can_write_to_bucket, mock_obj_exist,
self.assertEqual(mock_apply_async.call_count, 3)

@mock.patch.object(multiprocessing, 'Pool')
@mock.patch('gcp_deepvariant_runner._gcs_object_exist')
@mock.patch('gcp_deepvariant_runner._get_gcs_object_size')
@mock.patch('gcp_deepvariant_runner._can_write_to_bucket')
def testRunCallVariants_GPU(self, mock_can_write_to_bucket, mock_obj_exist,
mock_pool):
Expand Down Expand Up @@ -403,7 +403,7 @@ def testRunCallVariants_GPU(self, mock_can_write_to_bucket, mock_obj_exist,
@mock.patch.object(gke_cluster.GkeCluster, '__init__', return_value=None)
@mock.patch.object(gke_cluster.GkeCluster, 'deploy_pod')
@mock.patch.object(gke_cluster.GkeCluster, '_cluster_exists')
@mock.patch('gcp_deepvariant_runner._gcs_object_exist')
@mock.patch('gcp_deepvariant_runner._get_gcs_object_size')
@mock.patch('gcp_deepvariant_runner._can_write_to_bucket')
def testRunCallVariants_TPU(self, mock_can_write_to_bucket, mock_obj_exist,
mock_cluster_exists, mock_deploy_pod, mock_init):
Expand Down Expand Up @@ -459,7 +459,7 @@ def testRunFailCallVariants_TPU(self):
gcp_deepvariant_runner.run(self._argv)

@mock.patch('gcp_deepvariant_runner._run_job')
@mock.patch('gcp_deepvariant_runner._gcs_object_exist')
@mock.patch('gcp_deepvariant_runner._get_gcs_object_size')
@mock.patch('gcp_deepvariant_runner._can_write_to_bucket')
def testRunPostProcessVariants(self, mock_can_write_to_bucket, mock_obj_exist,
mock_run_job):
Expand Down Expand Up @@ -549,15 +549,12 @@ def testRunFailsSetOptimizedFlagsMissingBamFile(self):
gcp_deepvariant_runner.run(self._argv)

@mock.patch.object(multiprocessing, 'Pool')
@mock.patch('gcp_deepvariant_runner._gcs_object_exist')
@mock.patch('gcp_deepvariant_runner._can_write_to_bucket')
@mock.patch('gcp_deepvariant_runner._get_gcs_object_size')
def testRunSetOptimizedFlagsBasedOnBamSizeWesSmall_makeExamples(
self, mock_object_size, mock_can_write_to_bucket, mock_obj_exist,
mock_pool):
self, mock_object_size, mock_can_write_to_bucket, mock_pool):
mock_apply_async = mock_pool.return_value.apply_async
mock_apply_async.return_value = None
mock_obj_exist.return_value = True
mock_can_write_to_bucket.return_value = True
mock_object_size.return_value = 12 * 1024 * 1024 * 1024 - 1
expected_workers = 16
Expand Down Expand Up @@ -606,15 +603,12 @@ def testRunSetOptimizedFlagsBasedOnBamSizeWesSmall_makeExamples(
self.assertEqual(mock_apply_async.call_count, expected_workers)

@mock.patch.object(multiprocessing, 'Pool')
@mock.patch('gcp_deepvariant_runner._gcs_object_exist')
@mock.patch('gcp_deepvariant_runner._can_write_to_bucket')
@mock.patch('gcp_deepvariant_runner._get_gcs_object_size')
def testRunSetOptimizedFlagsBasedOnBamSizeWesLarge_makeExamples(
self, mock_object_size, mock_can_write_to_bucket, mock_obj_exist,
mock_pool):
self, mock_object_size, mock_can_write_to_bucket, mock_pool):
mock_apply_async = mock_pool.return_value.apply_async
mock_apply_async.return_value = None
mock_obj_exist.return_value = True
mock_can_write_to_bucket.return_value = True
mock_object_size.return_value = 12 * 1024 * 1024 * 1024 + 1
expected_workers = 16
Expand Down Expand Up @@ -664,15 +658,12 @@ def testRunSetOptimizedFlagsBasedOnBamSizeWesLarge_makeExamples(
self.assertEqual(mock_apply_async.call_count, expected_workers)

@mock.patch.object(multiprocessing, 'Pool')
@mock.patch('gcp_deepvariant_runner._gcs_object_exist')
@mock.patch('gcp_deepvariant_runner._can_write_to_bucket')
@mock.patch('gcp_deepvariant_runner._get_gcs_object_size')
def testRunSetOptimizedFlagsBasedOnBamSizeWgsSmall_makeExamples(
self, mock_object_size, mock_can_write_to_bucket, mock_obj_exist,
mock_pool):
self, mock_object_size, mock_can_write_to_bucket, mock_pool):
mock_apply_async = mock_pool.return_value.apply_async
mock_apply_async.return_value = None
mock_obj_exist.return_value = True
mock_can_write_to_bucket.return_value = True
mock_object_size.return_value = 25 * 1024 * 1024 * 1024 - 1
expected_workers = 32
Expand Down Expand Up @@ -722,15 +713,12 @@ def testRunSetOptimizedFlagsBasedOnBamSizeWgsSmall_makeExamples(
self.assertEqual(mock_apply_async.call_count, expected_workers)

@mock.patch.object(multiprocessing, 'Pool')
@mock.patch('gcp_deepvariant_runner._gcs_object_exist')
@mock.patch('gcp_deepvariant_runner._can_write_to_bucket')
@mock.patch('gcp_deepvariant_runner._get_gcs_object_size')
def testRunSetOptimizedFlagsBasedOnBamSizeWgsMedium_make_examples(
self, mock_object_size, mock_can_write_to_bucket, mock_obj_exist,
mock_pool):
self, mock_object_size, mock_can_write_to_bucket, mock_pool):
mock_apply_async = mock_pool.return_value.apply_async
mock_apply_async.return_value = None
mock_obj_exist.return_value = True
mock_can_write_to_bucket.return_value = True
mock_object_size.return_value = 25 * 1024 * 1024 * 1024 + 1
expected_workers = 64
Expand Down Expand Up @@ -780,15 +768,12 @@ def testRunSetOptimizedFlagsBasedOnBamSizeWgsMedium_make_examples(
self.assertEqual(mock_apply_async.call_count, expected_workers)

@mock.patch.object(multiprocessing, 'Pool')
@mock.patch('gcp_deepvariant_runner._gcs_object_exist')
@mock.patch('gcp_deepvariant_runner._can_write_to_bucket')
@mock.patch('gcp_deepvariant_runner._get_gcs_object_size')
def testRunSetOptimizedFlagsBasedOnBamSizeWgsLarge_makeExamples(
self, mock_object_size, mock_can_write_to_bucket, mock_obj_exist,
mock_pool):
self, mock_object_size, mock_can_write_to_bucket, mock_pool):
mock_apply_async = mock_pool.return_value.apply_async
mock_apply_async.return_value = None
mock_obj_exist.return_value = True
mock_can_write_to_bucket.return_value = True
mock_object_size.return_value = 200 * 1024 * 1024 * 1024 + 1
expected_workers = 128
Expand Down Expand Up @@ -838,15 +823,12 @@ def testRunSetOptimizedFlagsBasedOnBamSizeWgsLarge_makeExamples(
self.assertEqual(mock_apply_async.call_count, expected_workers)

@mock.patch.object(multiprocessing, 'Pool')
@mock.patch('gcp_deepvariant_runner._gcs_object_exist')
@mock.patch('gcp_deepvariant_runner._can_write_to_bucket')
@mock.patch('gcp_deepvariant_runner._get_gcs_object_size')
def testRunSetOptimizedFlagsBasedOnBamSizeWesSmall_callVariants(
self, mock_object_size, mock_can_write_to_bucket, mock_obj_exist,
mock_pool):
self, mock_object_size, mock_can_write_to_bucket, mock_pool):
mock_apply_async = mock_pool.return_value.apply_async
mock_apply_async.return_value = None
mock_obj_exist.return_value = True
mock_can_write_to_bucket.return_value = True

mock_object_size.return_value = 12 * 1024 * 1024 * 1024 - 1
Expand Down Expand Up @@ -899,15 +881,12 @@ def testRunSetOptimizedFlagsBasedOnBamSizeWesSmall_callVariants(
self.assertEqual(mock_apply_async.call_count, expected_workers)

@mock.patch.object(multiprocessing, 'Pool')
@mock.patch('gcp_deepvariant_runner._gcs_object_exist')
@mock.patch('gcp_deepvariant_runner._can_write_to_bucket')
@mock.patch('gcp_deepvariant_runner._get_gcs_object_size')
def testRunSetOptimizedFlagsBasedOnBamSizeWesLarge_callVariants(
self, mock_object_size, mock_can_write_to_bucket, mock_obj_exist,
mock_pool):
self, mock_object_size, mock_can_write_to_bucket, mock_pool):
mock_apply_async = mock_pool.return_value.apply_async
mock_apply_async.return_value = None
mock_obj_exist.return_value = True
mock_can_write_to_bucket.return_value = True

mock_object_size.return_value = 12 * 1024 * 1024 * 1024 + 1
Expand Down Expand Up @@ -960,15 +939,12 @@ def testRunSetOptimizedFlagsBasedOnBamSizeWesLarge_callVariants(
self.assertEqual(mock_apply_async.call_count, expected_workers)

@mock.patch.object(multiprocessing, 'Pool')
@mock.patch('gcp_deepvariant_runner._gcs_object_exist')
@mock.patch('gcp_deepvariant_runner._can_write_to_bucket')
@mock.patch('gcp_deepvariant_runner._get_gcs_object_size')
def testRunSetOptimizedFlagsBasedOnBamSizeWgsSmall_callVariants(
self, mock_object_size, mock_can_write_to_bucket, mock_obj_exist,
mock_pool):
self, mock_object_size, mock_can_write_to_bucket, mock_pool):
mock_apply_async = mock_pool.return_value.apply_async
mock_apply_async.return_value = None
mock_obj_exist.return_value = True
mock_can_write_to_bucket.return_value = True

mock_object_size.return_value = 25 * 1024 * 1024 * 1024 - 1
Expand Down Expand Up @@ -1023,13 +999,11 @@ def testRunSetOptimizedFlagsBasedOnBamSizeWgsSmall_callVariants(
@mock.patch.object(gke_cluster.GkeCluster, '__init__', return_value=None)
@mock.patch.object(gke_cluster.GkeCluster, 'deploy_pod')
@mock.patch.object(gke_cluster.GkeCluster, 'delete_cluster')
@mock.patch('gcp_deepvariant_runner._gcs_object_exist')
@mock.patch('gcp_deepvariant_runner._can_write_to_bucket')
@mock.patch('gcp_deepvariant_runner._get_gcs_object_size')
def testRunSetOptimizedFlagsBasedOnBamSizeWgsMedium_callVariants(
self, mock_object_size, mock_can_write_to_bucket, mock_obj_exist,
self, mock_object_size, mock_can_write_to_bucket,
mock_delete_cluster, mock_deploy_pod, mock_init):
mock_obj_exist.return_value = True
mock_can_write_to_bucket.return_value = True

mock_object_size.return_value = 25 * 1024 * 1024 * 1024 + 1
Expand Down Expand Up @@ -1069,13 +1043,11 @@ def testRunSetOptimizedFlagsBasedOnBamSizeWgsMedium_callVariants(
@mock.patch.object(gke_cluster.GkeCluster, '__init__', return_value=None)
@mock.patch.object(gke_cluster.GkeCluster, 'deploy_pod')
@mock.patch.object(gke_cluster.GkeCluster, 'delete_cluster')
@mock.patch('gcp_deepvariant_runner._gcs_object_exist')
@mock.patch('gcp_deepvariant_runner._can_write_to_bucket')
@mock.patch('gcp_deepvariant_runner._get_gcs_object_size')
def testRunSetOptimizedFlagsBasedOnBamSizeWgsLarge_callVariants(
self, mock_object_size, mock_can_write_to_bucket, mock_obj_exist,
self, mock_object_size, mock_can_write_to_bucket,
mock_delete_cluster, mock_deploy_pod, mock_init):
mock_obj_exist.return_value = True
mock_can_write_to_bucket.return_value = True

mock_object_size.return_value = 200 * 1024 * 1024 * 1024 + 1
Expand Down Expand Up @@ -1113,13 +1085,10 @@ def testRunSetOptimizedFlagsBasedOnBamSizeWgsLarge_callVariants(
wait=True)

@mock.patch('gcp_deepvariant_runner._run_job')
@mock.patch('gcp_deepvariant_runner._gcs_object_exist')
@mock.patch('gcp_deepvariant_runner._can_write_to_bucket')
@mock.patch('gcp_deepvariant_runner._get_gcs_object_size')
def testRunSetOptimizedFlagsBasedOnBamSizeWgsSmall_postProcessVariants(
self, mock_object_size, mock_can_write_to_bucket, mock_obj_exist,
mock_run_job):
mock_obj_exist.return_value = True
self, mock_object_size, mock_can_write_to_bucket, mock_run_job):
mock_can_write_to_bucket.return_value = True
mock_object_size.return_value = 25 * 1024 * 1024 * 1024 - 1
call_variant_workers = 4
Expand Down Expand Up @@ -1167,13 +1136,10 @@ def testRunSetOptimizedFlagsBasedOnBamSizeWgsSmall_postProcessVariants(
'gs://bucket/staging/logs/postprocess_variants')

@mock.patch('gcp_deepvariant_runner._run_job')
@mock.patch('gcp_deepvariant_runner._gcs_object_exist')
@mock.patch('gcp_deepvariant_runner._can_write_to_bucket')
@mock.patch('gcp_deepvariant_runner._get_gcs_object_size')
def testRunSetOptimizedFlagsBasedOnBamSizeWgsLarge_postProcessVariants(
self, mock_object_size, mock_can_write_to_bucket, mock_obj_exist,
mock_run_job):
mock_obj_exist.return_value = True
self, mock_object_size, mock_can_write_to_bucket, mock_run_job):
mock_can_write_to_bucket.return_value = True
mock_object_size.return_value = 200 * 1024 * 1024 * 1024 + 1
call_variant_workers = 1
Expand Down

0 comments on commit 9f4f9fc

Please sign in to comment.