diff --git a/gcp_deepvariant_runner.py b/gcp_deepvariant_runner.py index f23b687..43c63a0 100644 --- a/gcp_deepvariant_runner.py +++ b/gcp_deepvariant_runner.py @@ -318,17 +318,8 @@ def _is_valid_gcs_path(gcs_path): urlparse.urlparse(gcs_path).netloc != '') -def _gcs_object_exist(gcs_obj_path): - """Returns true if the given path is a valid object on GCS. - - Args: - gcs_obj_path: (str) a path to an obj on GCS. - """ - return _get_gcs_object_size(gcs_obj_path) != 0 - - def _get_gcs_object_size(gcs_obj_path): - """Returns the size of the given GCS object. + """Returns size of GCS object or 0, if object is missing or access is denied. Args: gcs_obj_path: (str) a path to an obj on GCS. @@ -762,7 +753,7 @@ def _validate_and_complete_args(pipeline_args): 'model is neither WGS nor WES: %s' % pipeline_args.model) if is_wes and is_wgs: raise ValueError('Unable to automatically set computational flags. Given ' - 'model matches both WGS and WES: %s' % pipeline_args.model) + 'model matches both WGS & WES: %s' % pipeline_args.model) if not pipeline_args.bam.endswith(_BAM_FILE_SUFFIX): raise ValueError( 'Only able to automatically set computational flags for BAM files.') @@ -836,15 +827,16 @@ def _validate_and_complete_args(pipeline_args): pipeline_args.bai = pipeline_args.bam + _BAI_FILE_SUFFIX # Ensuring all input files exist... - if not _gcs_object_exist(pipeline_args.ref): + if _get_gcs_object_size(pipeline_args.ref) == 0: raise ValueError('Given reference file via --ref does not exist') - if not _gcs_object_exist(pipeline_args.ref_fai): + if _get_gcs_object_size(pipeline_args.ref_fai) == 0: raise ValueError('Given FAI index file via --ref_fai does not exist') - if (pipeline_args.ref_gzi and not _gcs_object_exist(pipeline_args.ref_gzi)): + if (pipeline_args.ref_gzi and + _get_gcs_object_size(pipeline_args.ref_gzi) == 0): raise ValueError('Given GZI index file via --ref_gzi does not exist') - if not _gcs_object_exist(pipeline_args.bam): + if _get_gcs_object_size(pipeline_args.bam) == 0: raise ValueError('Given BAM file via --bam does not exist') - if not _gcs_object_exist(pipeline_args.bai): + if _get_gcs_object_size(pipeline_args.bai) == 0: raise ValueError('Given BAM index file via --bai does not exist') # ...and we can write to output buckets. if not _can_write_to_bucket(_get_gcs_bucket(pipeline_args.staging)): diff --git a/gcp_deepvariant_runner_test.py b/gcp_deepvariant_runner_test.py index f18b32f..827c3cb 100644 --- a/gcp_deepvariant_runner_test.py +++ b/gcp_deepvariant_runner_test.py @@ -117,7 +117,7 @@ def setUp(self): @mock.patch('gcp_deepvariant_runner._run_job') @mock.patch.object(multiprocessing, 'Pool') - @mock.patch('gcp_deepvariant_runner._gcs_object_exist') + @mock.patch('gcp_deepvariant_runner._get_gcs_object_size') @mock.patch('gcp_deepvariant_runner._can_write_to_bucket') def testRunPipeline(self, mock_can_write_to_bucket, mock_obj_exist, mock_pool, mock_run_job): @@ -159,7 +159,7 @@ def testRunPipeline(self, mock_can_write_to_bucket, mock_obj_exist, mock_pool, @mock.patch('gcp_deepvariant_runner._run_job') @mock.patch.object(multiprocessing, 'Pool') - @mock.patch('gcp_deepvariant_runner._gcs_object_exist') + @mock.patch('gcp_deepvariant_runner._get_gcs_object_size') @mock.patch('gcp_deepvariant_runner._can_write_to_bucket') def testRunPipeline_WithGVCFOutFile(self, mock_can_write_to_bucket, mock_obj_exist, mock_pool, mock_run_job): @@ -209,7 +209,7 @@ def testRunPipeline_WithGVCFOutFile(self, mock_can_write_to_bucket, 'gs://bucket/staging/logs/postprocess_variants') @mock.patch.object(multiprocessing, 'Pool') - @mock.patch('gcp_deepvariant_runner._gcs_object_exist') + @mock.patch('gcp_deepvariant_runner._get_gcs_object_size') @mock.patch('gcp_deepvariant_runner._can_write_to_bucket') def testRunMakeExamples(self, mock_can_write_to_bucket, mock_obj_exist, mock_pool): @@ -263,7 +263,7 @@ def testRunMakeExamples(self, mock_can_write_to_bucket, mock_obj_exist, self.assertEqual(mock_apply_async.call_count, 3) @mock.patch.object(multiprocessing, 'Pool') - @mock.patch('gcp_deepvariant_runner._gcs_object_exist') + @mock.patch('gcp_deepvariant_runner._get_gcs_object_size') @mock.patch('gcp_deepvariant_runner._can_write_to_bucket') def testRunMakeExamples_WithGcsfuse(self, mock_can_write_to_bucket, mock_obj_exist, mock_pool): @@ -310,7 +310,7 @@ def testRunMakeExamples_WithGcsfuse(self, mock_can_write_to_bucket, self.assertEqual(mock_apply_async.call_count, 3) @mock.patch.object(multiprocessing, 'Pool') - @mock.patch('gcp_deepvariant_runner._gcs_object_exist') + @mock.patch('gcp_deepvariant_runner._get_gcs_object_size') @mock.patch('gcp_deepvariant_runner._can_write_to_bucket') def testRunCallVariants(self, mock_can_write_to_bucket, mock_obj_exist, mock_pool): @@ -354,7 +354,7 @@ def testRunCallVariants(self, mock_can_write_to_bucket, mock_obj_exist, self.assertEqual(mock_apply_async.call_count, 3) @mock.patch.object(multiprocessing, 'Pool') - @mock.patch('gcp_deepvariant_runner._gcs_object_exist') + @mock.patch('gcp_deepvariant_runner._get_gcs_object_size') @mock.patch('gcp_deepvariant_runner._can_write_to_bucket') def testRunCallVariants_GPU(self, mock_can_write_to_bucket, mock_obj_exist, mock_pool): @@ -403,7 +403,7 @@ def testRunCallVariants_GPU(self, mock_can_write_to_bucket, mock_obj_exist, @mock.patch.object(gke_cluster.GkeCluster, '__init__', return_value=None) @mock.patch.object(gke_cluster.GkeCluster, 'deploy_pod') @mock.patch.object(gke_cluster.GkeCluster, '_cluster_exists') - @mock.patch('gcp_deepvariant_runner._gcs_object_exist') + @mock.patch('gcp_deepvariant_runner._get_gcs_object_size') @mock.patch('gcp_deepvariant_runner._can_write_to_bucket') def testRunCallVariants_TPU(self, mock_can_write_to_bucket, mock_obj_exist, mock_cluster_exists, mock_deploy_pod, mock_init): @@ -459,7 +459,7 @@ def testRunFailCallVariants_TPU(self): gcp_deepvariant_runner.run(self._argv) @mock.patch('gcp_deepvariant_runner._run_job') - @mock.patch('gcp_deepvariant_runner._gcs_object_exist') + @mock.patch('gcp_deepvariant_runner._get_gcs_object_size') @mock.patch('gcp_deepvariant_runner._can_write_to_bucket') def testRunPostProcessVariants(self, mock_can_write_to_bucket, mock_obj_exist, mock_run_job): @@ -549,15 +549,12 @@ def testRunFailsSetOptimizedFlagsMissingBamFile(self): gcp_deepvariant_runner.run(self._argv) @mock.patch.object(multiprocessing, 'Pool') - @mock.patch('gcp_deepvariant_runner._gcs_object_exist') @mock.patch('gcp_deepvariant_runner._can_write_to_bucket') @mock.patch('gcp_deepvariant_runner._get_gcs_object_size') def testRunSetOptimizedFlagsBasedOnBamSizeWesSmall_makeExamples( - self, mock_object_size, mock_can_write_to_bucket, mock_obj_exist, - mock_pool): + self, mock_object_size, mock_can_write_to_bucket, mock_pool): mock_apply_async = mock_pool.return_value.apply_async mock_apply_async.return_value = None - mock_obj_exist.return_value = True mock_can_write_to_bucket.return_value = True mock_object_size.return_value = 12 * 1024 * 1024 * 1024 - 1 expected_workers = 16 @@ -606,15 +603,12 @@ def testRunSetOptimizedFlagsBasedOnBamSizeWesSmall_makeExamples( self.assertEqual(mock_apply_async.call_count, expected_workers) @mock.patch.object(multiprocessing, 'Pool') - @mock.patch('gcp_deepvariant_runner._gcs_object_exist') @mock.patch('gcp_deepvariant_runner._can_write_to_bucket') @mock.patch('gcp_deepvariant_runner._get_gcs_object_size') def testRunSetOptimizedFlagsBasedOnBamSizeWesLarge_makeExamples( - self, mock_object_size, mock_can_write_to_bucket, mock_obj_exist, - mock_pool): + self, mock_object_size, mock_can_write_to_bucket, mock_pool): mock_apply_async = mock_pool.return_value.apply_async mock_apply_async.return_value = None - mock_obj_exist.return_value = True mock_can_write_to_bucket.return_value = True mock_object_size.return_value = 12 * 1024 * 1024 * 1024 + 1 expected_workers = 16 @@ -664,15 +658,12 @@ def testRunSetOptimizedFlagsBasedOnBamSizeWesLarge_makeExamples( self.assertEqual(mock_apply_async.call_count, expected_workers) @mock.patch.object(multiprocessing, 'Pool') - @mock.patch('gcp_deepvariant_runner._gcs_object_exist') @mock.patch('gcp_deepvariant_runner._can_write_to_bucket') @mock.patch('gcp_deepvariant_runner._get_gcs_object_size') def testRunSetOptimizedFlagsBasedOnBamSizeWgsSmall_makeExamples( - self, mock_object_size, mock_can_write_to_bucket, mock_obj_exist, - mock_pool): + self, mock_object_size, mock_can_write_to_bucket, mock_pool): mock_apply_async = mock_pool.return_value.apply_async mock_apply_async.return_value = None - mock_obj_exist.return_value = True mock_can_write_to_bucket.return_value = True mock_object_size.return_value = 25 * 1024 * 1024 * 1024 - 1 expected_workers = 32 @@ -722,15 +713,12 @@ def testRunSetOptimizedFlagsBasedOnBamSizeWgsSmall_makeExamples( self.assertEqual(mock_apply_async.call_count, expected_workers) @mock.patch.object(multiprocessing, 'Pool') - @mock.patch('gcp_deepvariant_runner._gcs_object_exist') @mock.patch('gcp_deepvariant_runner._can_write_to_bucket') @mock.patch('gcp_deepvariant_runner._get_gcs_object_size') def testRunSetOptimizedFlagsBasedOnBamSizeWgsMedium_make_examples( - self, mock_object_size, mock_can_write_to_bucket, mock_obj_exist, - mock_pool): + self, mock_object_size, mock_can_write_to_bucket, mock_pool): mock_apply_async = mock_pool.return_value.apply_async mock_apply_async.return_value = None - mock_obj_exist.return_value = True mock_can_write_to_bucket.return_value = True mock_object_size.return_value = 25 * 1024 * 1024 * 1024 + 1 expected_workers = 64 @@ -780,15 +768,12 @@ def testRunSetOptimizedFlagsBasedOnBamSizeWgsMedium_make_examples( self.assertEqual(mock_apply_async.call_count, expected_workers) @mock.patch.object(multiprocessing, 'Pool') - @mock.patch('gcp_deepvariant_runner._gcs_object_exist') @mock.patch('gcp_deepvariant_runner._can_write_to_bucket') @mock.patch('gcp_deepvariant_runner._get_gcs_object_size') def testRunSetOptimizedFlagsBasedOnBamSizeWgsLarge_makeExamples( - self, mock_object_size, mock_can_write_to_bucket, mock_obj_exist, - mock_pool): + self, mock_object_size, mock_can_write_to_bucket, mock_pool): mock_apply_async = mock_pool.return_value.apply_async mock_apply_async.return_value = None - mock_obj_exist.return_value = True mock_can_write_to_bucket.return_value = True mock_object_size.return_value = 200 * 1024 * 1024 * 1024 + 1 expected_workers = 128 @@ -838,15 +823,12 @@ def testRunSetOptimizedFlagsBasedOnBamSizeWgsLarge_makeExamples( self.assertEqual(mock_apply_async.call_count, expected_workers) @mock.patch.object(multiprocessing, 'Pool') - @mock.patch('gcp_deepvariant_runner._gcs_object_exist') @mock.patch('gcp_deepvariant_runner._can_write_to_bucket') @mock.patch('gcp_deepvariant_runner._get_gcs_object_size') def testRunSetOptimizedFlagsBasedOnBamSizeWesSmall_callVariants( - self, mock_object_size, mock_can_write_to_bucket, mock_obj_exist, - mock_pool): + self, mock_object_size, mock_can_write_to_bucket, mock_pool): mock_apply_async = mock_pool.return_value.apply_async mock_apply_async.return_value = None - mock_obj_exist.return_value = True mock_can_write_to_bucket.return_value = True mock_object_size.return_value = 12 * 1024 * 1024 * 1024 - 1 @@ -899,15 +881,12 @@ def testRunSetOptimizedFlagsBasedOnBamSizeWesSmall_callVariants( self.assertEqual(mock_apply_async.call_count, expected_workers) @mock.patch.object(multiprocessing, 'Pool') - @mock.patch('gcp_deepvariant_runner._gcs_object_exist') @mock.patch('gcp_deepvariant_runner._can_write_to_bucket') @mock.patch('gcp_deepvariant_runner._get_gcs_object_size') def testRunSetOptimizedFlagsBasedOnBamSizeWesLarge_callVariants( - self, mock_object_size, mock_can_write_to_bucket, mock_obj_exist, - mock_pool): + self, mock_object_size, mock_can_write_to_bucket, mock_pool): mock_apply_async = mock_pool.return_value.apply_async mock_apply_async.return_value = None - mock_obj_exist.return_value = True mock_can_write_to_bucket.return_value = True mock_object_size.return_value = 12 * 1024 * 1024 * 1024 + 1 @@ -960,15 +939,12 @@ def testRunSetOptimizedFlagsBasedOnBamSizeWesLarge_callVariants( self.assertEqual(mock_apply_async.call_count, expected_workers) @mock.patch.object(multiprocessing, 'Pool') - @mock.patch('gcp_deepvariant_runner._gcs_object_exist') @mock.patch('gcp_deepvariant_runner._can_write_to_bucket') @mock.patch('gcp_deepvariant_runner._get_gcs_object_size') def testRunSetOptimizedFlagsBasedOnBamSizeWgsSmall_callVariants( - self, mock_object_size, mock_can_write_to_bucket, mock_obj_exist, - mock_pool): + self, mock_object_size, mock_can_write_to_bucket, mock_pool): mock_apply_async = mock_pool.return_value.apply_async mock_apply_async.return_value = None - mock_obj_exist.return_value = True mock_can_write_to_bucket.return_value = True mock_object_size.return_value = 25 * 1024 * 1024 * 1024 - 1 @@ -1023,13 +999,11 @@ def testRunSetOptimizedFlagsBasedOnBamSizeWgsSmall_callVariants( @mock.patch.object(gke_cluster.GkeCluster, '__init__', return_value=None) @mock.patch.object(gke_cluster.GkeCluster, 'deploy_pod') @mock.patch.object(gke_cluster.GkeCluster, 'delete_cluster') - @mock.patch('gcp_deepvariant_runner._gcs_object_exist') @mock.patch('gcp_deepvariant_runner._can_write_to_bucket') @mock.patch('gcp_deepvariant_runner._get_gcs_object_size') def testRunSetOptimizedFlagsBasedOnBamSizeWgsMedium_callVariants( - self, mock_object_size, mock_can_write_to_bucket, mock_obj_exist, + self, mock_object_size, mock_can_write_to_bucket, mock_delete_cluster, mock_deploy_pod, mock_init): - mock_obj_exist.return_value = True mock_can_write_to_bucket.return_value = True mock_object_size.return_value = 25 * 1024 * 1024 * 1024 + 1 @@ -1069,13 +1043,11 @@ def testRunSetOptimizedFlagsBasedOnBamSizeWgsMedium_callVariants( @mock.patch.object(gke_cluster.GkeCluster, '__init__', return_value=None) @mock.patch.object(gke_cluster.GkeCluster, 'deploy_pod') @mock.patch.object(gke_cluster.GkeCluster, 'delete_cluster') - @mock.patch('gcp_deepvariant_runner._gcs_object_exist') @mock.patch('gcp_deepvariant_runner._can_write_to_bucket') @mock.patch('gcp_deepvariant_runner._get_gcs_object_size') def testRunSetOptimizedFlagsBasedOnBamSizeWgsLarge_callVariants( - self, mock_object_size, mock_can_write_to_bucket, mock_obj_exist, + self, mock_object_size, mock_can_write_to_bucket, mock_delete_cluster, mock_deploy_pod, mock_init): - mock_obj_exist.return_value = True mock_can_write_to_bucket.return_value = True mock_object_size.return_value = 200 * 1024 * 1024 * 1024 + 1 @@ -1113,13 +1085,10 @@ def testRunSetOptimizedFlagsBasedOnBamSizeWgsLarge_callVariants( wait=True) @mock.patch('gcp_deepvariant_runner._run_job') - @mock.patch('gcp_deepvariant_runner._gcs_object_exist') @mock.patch('gcp_deepvariant_runner._can_write_to_bucket') @mock.patch('gcp_deepvariant_runner._get_gcs_object_size') def testRunSetOptimizedFlagsBasedOnBamSizeWgsSmall_postProcessVariants( - self, mock_object_size, mock_can_write_to_bucket, mock_obj_exist, - mock_run_job): - mock_obj_exist.return_value = True + self, mock_object_size, mock_can_write_to_bucket, mock_run_job): mock_can_write_to_bucket.return_value = True mock_object_size.return_value = 25 * 1024 * 1024 * 1024 - 1 call_variant_workers = 4 @@ -1167,13 +1136,10 @@ def testRunSetOptimizedFlagsBasedOnBamSizeWgsSmall_postProcessVariants( 'gs://bucket/staging/logs/postprocess_variants') @mock.patch('gcp_deepvariant_runner._run_job') - @mock.patch('gcp_deepvariant_runner._gcs_object_exist') @mock.patch('gcp_deepvariant_runner._can_write_to_bucket') @mock.patch('gcp_deepvariant_runner._get_gcs_object_size') def testRunSetOptimizedFlagsBasedOnBamSizeWgsLarge_postProcessVariants( - self, mock_object_size, mock_can_write_to_bucket, mock_obj_exist, - mock_run_job): - mock_obj_exist.return_value = True + self, mock_object_size, mock_can_write_to_bucket, mock_run_job): mock_can_write_to_bucket.return_value = True mock_object_size.return_value = 200 * 1024 * 1024 * 1024 + 1 call_variant_workers = 1