diff --git a/gcp_deepvariant_runner.py b/gcp_deepvariant_runner.py index 082e02d..d76f802 100644 --- a/gcp_deepvariant_runner.py +++ b/gcp_deepvariant_runner.py @@ -185,35 +185,35 @@ class BamCategories(enum.Enum): 'make_examples_workers': 16, 'make_examples_cores_per_worker': 1, 'call_variants_workers': 1, - 'call_variants_cores_per_worker': 4, + 'call_variants_cores_per_worker': 2, 'gpu': True } _DEFAULT_FLAGS[BamCategories.WES_LARGE] = { 'make_examples_workers': 16, 'make_examples_cores_per_worker': 1, 'call_variants_workers': 2, - 'call_variants_cores_per_worker': 4, + 'call_variants_cores_per_worker': 2, 'gpu': True } _DEFAULT_FLAGS[BamCategories.WGS_SMALL] = { 'make_examples_workers': 32, 'make_examples_cores_per_worker': 1, 'call_variants_workers': 2, - 'call_variants_cores_per_worker': 4, + 'call_variants_cores_per_worker': 2, 'gpu': True } _DEFAULT_FLAGS[BamCategories.WGS_MEDIUM] = { 'make_examples_workers': 64, 'make_examples_cores_per_worker': 1, 'call_variants_workers': 4, - 'call_variants_cores_per_worker': 4, + 'call_variants_cores_per_worker': 2, 'gpu': True } _DEFAULT_FLAGS[BamCategories.WGS_LARGE] = { 'make_examples_workers': 128, 'make_examples_cores_per_worker': 1, 'call_variants_workers': 8, - 'call_variants_cores_per_worker': 4, + 'call_variants_cores_per_worker': 2, 'gpu': True } # Common computational flag values across all BAM categories. @@ -510,7 +510,10 @@ def _set_computational_flags_based_on_bam_size(pipeline_args): else: raise ValueError('Either gpu or tpu is needed for default flag settings.') # Following flags are independent of BAM file category. + pipeline_args.gcsfuse = True pipeline_args.preemptible = True + pipeline_args.max_preemptible_tries=2 + pipeline_args.max_non_preemptible_tries=1 if pipeline_args.gvcf_outfile: pipeline_args.postprocess_variants_disk_gb = _POSTPROCESS_VARIANTS_DISK_GVCF @@ -1155,12 +1158,12 @@ def run(argv=None): parser.add_argument( '--postprocess_variants_cores', type=int, - default=8, + default=4, help='Number of cores to use for postprocess_variants.') parser.add_argument( '--postprocess_variants_ram_gb', type=int, - default=30, + default=16, help='RAM (in GB) to use for postprocess_variants.') parser.add_argument( '--postprocess_variants_disk_gb', diff --git a/gcp_deepvariant_runner_test.py b/gcp_deepvariant_runner_test.py index 6005743..94e0f97 100644 --- a/gcp_deepvariant_runner_test.py +++ b/gcp_deepvariant_runner_test.py @@ -667,24 +667,76 @@ def testRunSetOptimizedFlagsBasedOnBamSizeWesSmall_makeExamples( 'make_examples', '--set_optimized_flags_based_on_bam_size', '--docker_image_gpu', - 'gcr.io/dockerimage_gpu']) + 'gcr.io/dockerimage_gpu', + '--job_name_prefix', + 'auto_flags_']) + + temp_dir = tempfile.gettempdir() + before_temp_files = os.listdir(temp_dir) + gcp_deepvariant_runner.run(self._argv) + after_temp_files = os.listdir(tempfile.gettempdir()) + new_json_files = [os.path.join(temp_dir, item) for item in + after_temp_files if item not in before_temp_files] + new_json_files.sort() + self.assertEqual(len(new_json_files), expected_workers) expected_mock_calls = [] for i in range(expected_workers): expected_mock_calls.append( mock.call(mock.ANY, [ - _HasAllOf( - 'gcr.io/dockerimage', 'SHARDS={}'.format(expected_shards), - 'SHARD_START_INDEX={}'.format(i), - 'SHARD_END_INDEX={}'.format(i), - 'gs://bucket/staging/logs/make_examples/{}'.format(i), - '--machine-type', 'custom-{c}-{r}'.format( - c=expected_cores, r=expected_ram), '--disk-size', '200'), + _HasAllOf('auto_flags_make_examples', 'gcr.io/dockerimage', + 'EXAMPLES=gs://bucket/staging/examples/0/*', + 'INPUT_REF=gs://bucket/ref', + 'INPUT_BAI=gs://bucket/bam.bam.bai', + 'custom-{}-{}'.format(expected_cores, expected_ram), + new_json_files[i]), 'gs://bucket/staging/logs/make_examples/{}'.format(i) ])) - gcp_deepvariant_runner.run(self._argv) mock_apply_async.assert_has_calls(expected_mock_calls,) self.assertEqual(mock_apply_async.call_count, expected_workers) + # Verify json files contain correct actions_list. + make_examples_template = gcp_deepvariant_runner._MAKE_EXAMPLES_COMMAND.format( + NUM_SHARDS=expected_shards, EXTRA_ARGS='') + for worker_index in range(expected_workers): + with open(new_json_files[worker_index]) as json_file: + recieved_actions_list = json.load(json_file) + + expected_actions_list = [] + gcsfuse_create_command = gcp_deepvariant_runner._GCSFUSE_CREATE_COMMAND_TEMPLATE.format( + BUCKET='bucket', + LOCAL_DIR=gcp_deepvariant_runner._GCSFUSE_LOCAL_DIR_TEMPLATE.format( + SHARD_INDEX=worker_index)) + expected_actions_list.append( + {'commands': + ['-c', gcsfuse_create_command], + 'entrypoint': '/bin/sh', + 'flags': ['RUN_IN_BACKGROUND', 'ENABLE_FUSE'], + 'mounts': [{'disk': 'google', 'path': '/mnt/google'}], + 'imageUri': 'gcr.io/cloud-genomics-pipelines/gcsfuse'}) + gcsfuse_verify_command = gcp_deepvariant_runner._GCSFUSE_VERIFY_COMMAND_TEMPLATE.format( + LOCAL_DIR=gcp_deepvariant_runner._GCSFUSE_LOCAL_DIR_TEMPLATE.format( + SHARD_INDEX=worker_index)) + expected_actions_list.append( + {'commands': + ['-c', gcsfuse_verify_command], + 'entrypoint': '/bin/sh', + 'mounts': [{'disk': 'google', 'path': '/mnt/google'}], + 'imageUri': 'gcr.io/cloud-genomics-pipelines/gcsfuse'}) + make_examples_command = make_examples_template.format( + SHARD_START_INDEX=worker_index, + SHARD_END_INDEX=worker_index, + TASK_INDEX='{}', + INPUT_BAM='/mnt/google/input-gcsfused-{}/bam.bam') + expected_actions_list.append( + {'commands': + ['-c', make_examples_command], + 'entrypoint': 'bash', + 'mounts': [{'disk': 'google', 'path': '/mnt/google'}], + 'imageUri': 'gcr.io/dockerimage'}) + self.assertEqual(len(expected_actions_list), len(recieved_actions_list)) + for i in range(len(expected_actions_list)): + self.assertEqual(sorted(expected_actions_list[i].items()), + sorted(recieved_actions_list[i].items())) @mock.patch.object(multiprocessing, 'Pool') @mock.patch('gcp_deepvariant_runner._can_write_to_bucket') @@ -705,24 +757,77 @@ def testRunSetOptimizedFlagsBasedOnBamSizeWesLarge_makeExamples( 'make_examples', '--set_optimized_flags_based_on_bam_size', '--docker_image_gpu', - 'gcr.io/dockerimage_gpu']) + 'gcr.io/dockerimage_gpu', + '--job_name_prefix', + 'auto_flags_']) + + temp_dir = tempfile.gettempdir() + before_temp_files = os.listdir(temp_dir) + gcp_deepvariant_runner.run(self._argv) + after_temp_files = os.listdir(tempfile.gettempdir()) + new_json_files = [os.path.join(temp_dir, item) for item in + after_temp_files if item not in before_temp_files] + new_json_files.sort() + self.assertEqual(len(new_json_files), expected_workers) + expected_mock_calls = [] for i in range(expected_workers): expected_mock_calls.append( mock.call(mock.ANY, [ - _HasAllOf( - 'gcr.io/dockerimage', 'SHARDS={}'.format(expected_shards), - 'SHARD_START_INDEX={}'.format(i), - 'SHARD_END_INDEX={}'.format(i), - 'gs://bucket/staging/logs/make_examples/{}'.format(i), - '--machine-type', 'custom-{c}-{r}'.format( - c=expected_cores, r=expected_ram), '--disk-size', '200'), + _HasAllOf('auto_flags_make_examples', 'gcr.io/dockerimage', + 'EXAMPLES=gs://bucket/staging/examples/{}/*'.format( + int(i / (expected_workers / 2))), # 2 call_variants + 'INPUT_REF=gs://bucket/ref', + 'INPUT_BAI=gs://bucket/bam.bam.bai', + 'custom-{}-{}'.format(expected_cores, expected_ram), + new_json_files[i]), 'gs://bucket/staging/logs/make_examples/{}'.format(i) ])) - - gcp_deepvariant_runner.run(self._argv) mock_apply_async.assert_has_calls(expected_mock_calls,) self.assertEqual(mock_apply_async.call_count, expected_workers) + # Verify json files contain correct actions_list. + make_examples_template = gcp_deepvariant_runner._MAKE_EXAMPLES_COMMAND.format( + NUM_SHARDS=expected_shards, EXTRA_ARGS='') + for worker_index in range(expected_workers): + with open(new_json_files[worker_index]) as json_file: + recieved_actions_list = json.load(json_file) + + expected_actions_list = [] + gcsfuse_create_command = gcp_deepvariant_runner._GCSFUSE_CREATE_COMMAND_TEMPLATE.format( + BUCKET='bucket', + LOCAL_DIR=gcp_deepvariant_runner._GCSFUSE_LOCAL_DIR_TEMPLATE.format( + SHARD_INDEX=worker_index)) + expected_actions_list.append( + {'commands': + ['-c', gcsfuse_create_command], + 'entrypoint': '/bin/sh', + 'flags': ['RUN_IN_BACKGROUND', 'ENABLE_FUSE'], + 'mounts': [{'disk': 'google', 'path': '/mnt/google'}], + 'imageUri': 'gcr.io/cloud-genomics-pipelines/gcsfuse'}) + gcsfuse_verify_command = gcp_deepvariant_runner._GCSFUSE_VERIFY_COMMAND_TEMPLATE.format( + LOCAL_DIR=gcp_deepvariant_runner._GCSFUSE_LOCAL_DIR_TEMPLATE.format( + SHARD_INDEX=worker_index)) + expected_actions_list.append( + {'commands': + ['-c', gcsfuse_verify_command], + 'entrypoint': '/bin/sh', + 'mounts': [{'disk': 'google', 'path': '/mnt/google'}], + 'imageUri': 'gcr.io/cloud-genomics-pipelines/gcsfuse'}) + make_examples_command = make_examples_template.format( + SHARD_START_INDEX=worker_index, + SHARD_END_INDEX=worker_index, + TASK_INDEX='{}', + INPUT_BAM='/mnt/google/input-gcsfused-{}/bam.bam') + expected_actions_list.append( + {'commands': + ['-c', make_examples_command], + 'entrypoint': 'bash', + 'mounts': [{'disk': 'google', 'path': '/mnt/google'}], + 'imageUri': 'gcr.io/dockerimage'}) + self.assertEqual(len(expected_actions_list), len(recieved_actions_list)) + for i in range(len(expected_actions_list)): + self.assertEqual(sorted(expected_actions_list[i].items()), + sorted(recieved_actions_list[i].items())) @mock.patch.object(multiprocessing, 'Pool') @mock.patch('gcp_deepvariant_runner._can_write_to_bucket') @@ -743,29 +848,82 @@ def testRunSetOptimizedFlagsBasedOnBamSizeWgsSmall_makeExamples( 'make_examples', '--set_optimized_flags_based_on_bam_size', '--docker_image_gpu', - 'gcr.io/dockerimage_gpu']) + 'gcr.io/dockerimage_gpu', + '--job_name_prefix', + 'auto_flags_']) + + temp_dir = tempfile.gettempdir() + before_temp_files = os.listdir(temp_dir) + gcp_deepvariant_runner.run(self._argv) + after_temp_files = os.listdir(tempfile.gettempdir()) + new_json_files = [os.path.join(temp_dir, item) for item in + after_temp_files if item not in before_temp_files] + new_json_files.sort() + self.assertEqual(len(new_json_files), expected_workers) + expected_mock_calls = [] for i in range(expected_workers): expected_mock_calls.append( mock.call(mock.ANY, [ - _HasAllOf( - 'gcr.io/dockerimage', 'SHARDS={}'.format(expected_shards), - 'SHARD_START_INDEX={}'.format(i), - 'SHARD_END_INDEX={}'.format(i), - 'gs://bucket/staging/logs/make_examples/{}'.format(i), - '--machine-type', 'custom-{c}-{r}'.format( - c=expected_cores, r=expected_ram), '--disk-size', '200'), + _HasAllOf('auto_flags_make_examples', 'gcr.io/dockerimage', + 'EXAMPLES=gs://bucket/staging/examples/{}/*'.format( + int(i / (expected_workers / 2))), # 2 call_variants + 'INPUT_REF=gs://bucket/ref', + 'INPUT_BAI=gs://bucket/bam.bam.bai', + 'custom-{}-{}'.format(expected_cores, expected_ram), + new_json_files[i]), 'gs://bucket/staging/logs/make_examples/{}'.format(i) ])) - - gcp_deepvariant_runner.run(self._argv) mock_apply_async.assert_has_calls(expected_mock_calls,) self.assertEqual(mock_apply_async.call_count, expected_workers) + # Verify json files contain correct actions_list. + make_examples_template = gcp_deepvariant_runner._MAKE_EXAMPLES_COMMAND.format( + NUM_SHARDS=expected_shards, EXTRA_ARGS='') + for worker_index in range(expected_workers): + with open(new_json_files[worker_index]) as json_file: + recieved_actions_list = json.load(json_file) + + expected_actions_list = [] + gcsfuse_create_command = gcp_deepvariant_runner._GCSFUSE_CREATE_COMMAND_TEMPLATE.format( + BUCKET='bucket', + LOCAL_DIR=gcp_deepvariant_runner._GCSFUSE_LOCAL_DIR_TEMPLATE.format( + SHARD_INDEX=worker_index)) + expected_actions_list.append( + {'commands': + ['-c', gcsfuse_create_command], + 'entrypoint': '/bin/sh', + 'flags': ['RUN_IN_BACKGROUND', 'ENABLE_FUSE'], + 'mounts': [{'disk': 'google', 'path': '/mnt/google'}], + 'imageUri': 'gcr.io/cloud-genomics-pipelines/gcsfuse'}) + gcsfuse_verify_command = gcp_deepvariant_runner._GCSFUSE_VERIFY_COMMAND_TEMPLATE.format( + LOCAL_DIR=gcp_deepvariant_runner._GCSFUSE_LOCAL_DIR_TEMPLATE.format( + SHARD_INDEX=worker_index)) + expected_actions_list.append( + {'commands': + ['-c', gcsfuse_verify_command], + 'entrypoint': '/bin/sh', + 'mounts': [{'disk': 'google', 'path': '/mnt/google'}], + 'imageUri': 'gcr.io/cloud-genomics-pipelines/gcsfuse'}) + make_examples_command = make_examples_template.format( + SHARD_START_INDEX=worker_index, + SHARD_END_INDEX=worker_index, + TASK_INDEX='{}', + INPUT_BAM='/mnt/google/input-gcsfused-{}/bam.bam') + expected_actions_list.append( + {'commands': + ['-c', make_examples_command], + 'entrypoint': 'bash', + 'mounts': [{'disk': 'google', 'path': '/mnt/google'}], + 'imageUri': 'gcr.io/dockerimage'}) + self.assertEqual(len(expected_actions_list), len(recieved_actions_list)) + for i in range(len(expected_actions_list)): + self.assertEqual(sorted(expected_actions_list[i].items()), + sorted(recieved_actions_list[i].items())) @mock.patch.object(multiprocessing, 'Pool') @mock.patch('gcp_deepvariant_runner._can_write_to_bucket') @mock.patch('gcp_deepvariant_runner._get_gcs_object_size') - def testRunSetOptimizedFlagsBasedOnBamSizeWgsMedium_make_examples( + def testRunSetOptimizedFlagsBasedOnBamSizeWgsMedium_makeExamples( self, mock_object_size, mock_can_write_to_bucket, mock_pool): mock_apply_async = mock_pool.return_value.apply_async mock_apply_async.return_value = None @@ -781,24 +939,77 @@ def testRunSetOptimizedFlagsBasedOnBamSizeWgsMedium_make_examples( 'make_examples', '--set_optimized_flags_based_on_bam_size', '--docker_image_gpu', - 'gcr.io/dockerimage_gpu']) + 'gcr.io/dockerimage_gpu', + '--job_name_prefix', + 'auto_flags_']) + + temp_dir = tempfile.gettempdir() + before_temp_files = os.listdir(temp_dir) + gcp_deepvariant_runner.run(self._argv) + after_temp_files = os.listdir(tempfile.gettempdir()) + new_json_files = [os.path.join(temp_dir, item) for item in + after_temp_files if item not in before_temp_files] + new_json_files.sort() + self.assertEqual(len(new_json_files), expected_workers) + expected_mock_calls = [] for i in range(expected_workers): expected_mock_calls.append( mock.call(mock.ANY, [ - _HasAllOf( - 'gcr.io/dockerimage', 'SHARDS={}'.format(expected_shards), - 'SHARD_START_INDEX={}'.format(i), - 'SHARD_END_INDEX={}'.format(i), - 'gs://bucket/staging/logs/make_examples/{}'.format(i), - '--machine-type', 'custom-{c}-{r}'.format( - c=expected_cores, r=expected_ram), '--disk-size', '200'), + _HasAllOf('auto_flags_make_examples', 'gcr.io/dockerimage', + 'EXAMPLES=gs://bucket/staging/examples/{}/*'.format( + int(i / (expected_workers / 4))), # 4 call_variants + 'INPUT_REF=gs://bucket/ref', + 'INPUT_BAI=gs://bucket/bam.bam.bai', + 'custom-{}-{}'.format(expected_cores, expected_ram), + new_json_files[i]), 'gs://bucket/staging/logs/make_examples/{}'.format(i) ])) - - gcp_deepvariant_runner.run(self._argv) mock_apply_async.assert_has_calls(expected_mock_calls,) self.assertEqual(mock_apply_async.call_count, expected_workers) + # Verify json files contain correct actions_list. + make_examples_template = gcp_deepvariant_runner._MAKE_EXAMPLES_COMMAND.format( + NUM_SHARDS=expected_shards, EXTRA_ARGS='') + for worker_index in range(expected_workers): + with open(new_json_files[worker_index]) as json_file: + recieved_actions_list = json.load(json_file) + + expected_actions_list = [] + gcsfuse_create_command = gcp_deepvariant_runner._GCSFUSE_CREATE_COMMAND_TEMPLATE.format( + BUCKET='bucket', + LOCAL_DIR=gcp_deepvariant_runner._GCSFUSE_LOCAL_DIR_TEMPLATE.format( + SHARD_INDEX=worker_index)) + expected_actions_list.append( + {'commands': + ['-c', gcsfuse_create_command], + 'entrypoint': '/bin/sh', + 'flags': ['RUN_IN_BACKGROUND', 'ENABLE_FUSE'], + 'mounts': [{'disk': 'google', 'path': '/mnt/google'}], + 'imageUri': 'gcr.io/cloud-genomics-pipelines/gcsfuse'}) + gcsfuse_verify_command = gcp_deepvariant_runner._GCSFUSE_VERIFY_COMMAND_TEMPLATE.format( + LOCAL_DIR=gcp_deepvariant_runner._GCSFUSE_LOCAL_DIR_TEMPLATE.format( + SHARD_INDEX=worker_index)) + expected_actions_list.append( + {'commands': + ['-c', gcsfuse_verify_command], + 'entrypoint': '/bin/sh', + 'mounts': [{'disk': 'google', 'path': '/mnt/google'}], + 'imageUri': 'gcr.io/cloud-genomics-pipelines/gcsfuse'}) + make_examples_command = make_examples_template.format( + SHARD_START_INDEX=worker_index, + SHARD_END_INDEX=worker_index, + TASK_INDEX='{}', + INPUT_BAM='/mnt/google/input-gcsfused-{}/bam.bam') + expected_actions_list.append( + {'commands': + ['-c', make_examples_command], + 'entrypoint': 'bash', + 'mounts': [{'disk': 'google', 'path': '/mnt/google'}], + 'imageUri': 'gcr.io/dockerimage'}) + self.assertEqual(len(expected_actions_list), len(recieved_actions_list)) + for i in range(len(expected_actions_list)): + self.assertEqual(sorted(expected_actions_list[i].items()), + sorted(recieved_actions_list[i].items())) @mock.patch.object(multiprocessing, 'Pool') @mock.patch('gcp_deepvariant_runner._can_write_to_bucket') @@ -819,24 +1030,77 @@ def testRunSetOptimizedFlagsBasedOnBamSizeWgsLarge_makeExamples( 'make_examples', '--set_optimized_flags_based_on_bam_size', '--docker_image_gpu', - 'gcr.io/dockerimage_gpu']) + 'gcr.io/dockerimage_gpu', + '--job_name_prefix', + 'auto_flags_']) + + temp_dir = tempfile.gettempdir() + before_temp_files = os.listdir(temp_dir) + gcp_deepvariant_runner.run(self._argv) + after_temp_files = os.listdir(tempfile.gettempdir()) + new_json_files = [os.path.join(temp_dir, item) for item in + after_temp_files if item not in before_temp_files] + new_json_files.sort() + self.assertEqual(len(new_json_files), expected_workers) + expected_mock_calls = [] for i in range(expected_workers): expected_mock_calls.append( mock.call(mock.ANY, [ - _HasAllOf( - 'gcr.io/dockerimage', 'SHARDS={}'.format(expected_shards), - 'SHARD_START_INDEX={}'.format(i), - 'SHARD_END_INDEX={}'.format(i), - 'gs://bucket/staging/logs/make_examples/{}'.format(i), - '--machine-type', 'custom-{c}-{r}'.format( - c=expected_cores, r=expected_ram), '--disk-size', '200'), + _HasAllOf('auto_flags_make_examples', 'gcr.io/dockerimage', + 'EXAMPLES=gs://bucket/staging/examples/{}/*'.format( + int(i / (expected_workers / 8))), # 8 call_variants + 'INPUT_REF=gs://bucket/ref', + 'INPUT_BAI=gs://bucket/bam.bam.bai', + 'custom-{}-{}'.format(expected_cores, expected_ram), + new_json_files[i]), 'gs://bucket/staging/logs/make_examples/{}'.format(i) ])) - - gcp_deepvariant_runner.run(self._argv) mock_apply_async.assert_has_calls(expected_mock_calls,) self.assertEqual(mock_apply_async.call_count, expected_workers) + # Verify json files contain correct actions_list. + make_examples_template = gcp_deepvariant_runner._MAKE_EXAMPLES_COMMAND.format( + NUM_SHARDS=expected_shards, EXTRA_ARGS='') + for worker_index in range(expected_workers): + with open(new_json_files[worker_index]) as json_file: + recieved_actions_list = json.load(json_file) + + expected_actions_list = [] + gcsfuse_create_command = gcp_deepvariant_runner._GCSFUSE_CREATE_COMMAND_TEMPLATE.format( + BUCKET='bucket', + LOCAL_DIR=gcp_deepvariant_runner._GCSFUSE_LOCAL_DIR_TEMPLATE.format( + SHARD_INDEX=worker_index)) + expected_actions_list.append( + {'commands': + ['-c', gcsfuse_create_command], + 'entrypoint': '/bin/sh', + 'flags': ['RUN_IN_BACKGROUND', 'ENABLE_FUSE'], + 'mounts': [{'disk': 'google', 'path': '/mnt/google'}], + 'imageUri': 'gcr.io/cloud-genomics-pipelines/gcsfuse'}) + gcsfuse_verify_command = gcp_deepvariant_runner._GCSFUSE_VERIFY_COMMAND_TEMPLATE.format( + LOCAL_DIR=gcp_deepvariant_runner._GCSFUSE_LOCAL_DIR_TEMPLATE.format( + SHARD_INDEX=worker_index)) + expected_actions_list.append( + {'commands': + ['-c', gcsfuse_verify_command], + 'entrypoint': '/bin/sh', + 'mounts': [{'disk': 'google', 'path': '/mnt/google'}], + 'imageUri': 'gcr.io/cloud-genomics-pipelines/gcsfuse'}) + make_examples_command = make_examples_template.format( + SHARD_START_INDEX=worker_index, + SHARD_END_INDEX=worker_index, + TASK_INDEX='{}', + INPUT_BAM='/mnt/google/input-gcsfused-{}/bam.bam') + expected_actions_list.append( + {'commands': + ['-c', make_examples_command], + 'entrypoint': 'bash', + 'mounts': [{'disk': 'google', 'path': '/mnt/google'}], + 'imageUri': 'gcr.io/dockerimage'}) + self.assertEqual(len(expected_actions_list), len(recieved_actions_list)) + for i in range(len(expected_actions_list)): + self.assertEqual(sorted(expected_actions_list[i].items()), + sorted(recieved_actions_list[i].items())) @mock.patch.object(multiprocessing, 'Pool') @mock.patch('gcp_deepvariant_runner._can_write_to_bucket')