From 6c6ee0db09cfff641615dfd1993788ca847e8a0e Mon Sep 17 00:00:00 2001 From: jacquelynsmale Date: Tue, 4 Mar 2025 11:57:55 -0900 Subject: [PATCH 01/43] start combining functions --- apps/start-execution/src/start-execution.py | 71 +++++++++++++++++++++ 1 file changed, 71 insertions(+) create mode 100644 apps/start-execution/src/start-execution.py diff --git a/apps/start-execution/src/start-execution.py b/apps/start-execution/src/start-execution.py new file mode 100644 index 000000000..f0f5104f4 --- /dev/null +++ b/apps/start-execution/src/start-execution.py @@ -0,0 +1,71 @@ +import json +import os + +import boto3 + +import dynamo +from lambda_logging import log_exceptions, logger + +LAMBDA_CLIENT = boto3.client('lambda') + +STEP_FUNCTION = boto3.client('stepfunctions') + +batch_params_file = Path(__file__).parent / 'batch_params_by_job_type.json' +if batch_params_file.exists(): + BATCH_PARAMS_BY_JOB_TYPE = json.loads(batch_params_file.read_text()) +else: + # Allows mocking with unittest.mock.patch + BATCH_PARAMS_BY_JOB_TYPE = {} + + +def convert_to_string(obj: Any) -> str: + if isinstance(obj, list): + return ' '.join([str(item) for item in obj]) + return str(obj) + + +def get_batch_job_parameters(job: dict) -> dict[str, str]: + # Convert parameters to strings so they can be passed to Batch; see: + # https://docs.aws.amazon.com/batch/latest/APIReference/API_SubmitJob.html#Batch-SubmitJob-request-parameters + return { + key: convert_to_string(value) + for key, value in job['job_parameters'].items() + if key in BATCH_PARAMS_BY_JOB_TYPE[job['job_type']] + } + + +def submit_jobs(jobs: list[dict]) -> None: + step_function_arn = os.environ['STEP_FUNCTION_ARN'] + logger.info(f'Step function ARN: {step_function_arn}') + for job in jobs: + job['batch_job_parameters'] = get_batch_job_parameters(job) + STEP_FUNCTION.start_execution( + stateMachineArn=step_function_arn, + input=json.dumps(job, sort_keys=True), + name=job['job_id'], + ) + + +def invoke_worker(worker_function_arn: str, jobs: list[dict]) -> dict: + payload = json.dumps({'jobs': dynamo.util.convert_decimals_to_numbers(jobs)}) + return LAMBDA_CLIENT.invoke( + FunctionName=worker_function_arn, + InvocationType='Event', + Payload=payload, + ) + + +@log_exceptions +def lambda_handler(event: dict, _) -> None: + worker_function_arn = os.environ['START_EXECUTION_WORKER_ARN'] + logger.info(f'Worker function ARN: {worker_function_arn}') + + pending_jobs = dynamo.jobs.get_jobs_waiting_for_execution(limit=500) + logger.info(f'Got {len(pending_jobs)} pending jobs') + + batch_size = 250 + for i in range(0, len(pending_jobs), batch_size): + jobs = pending_jobs[i : i + batch_size] + logger.info(f'Invoking worker for {len(jobs)} jobs') + submit_jobs(jobs) + From 1b4cfc2273255a7a488ad3e0fe440d5619a6c4d9 Mon Sep 17 00:00:00 2001 From: jacquelynsmale Date: Tue, 4 Mar 2025 14:24:02 -0900 Subject: [PATCH 02/43] clean up lambda handler --- apps/start-execution/src/start-execution.py | 19 ++++--------------- 1 file changed, 4 insertions(+), 15 deletions(-) diff --git a/apps/start-execution/src/start-execution.py b/apps/start-execution/src/start-execution.py index f0f5104f4..db803c8e4 100644 --- a/apps/start-execution/src/start-execution.py +++ b/apps/start-execution/src/start-execution.py @@ -1,5 +1,7 @@ import json import os +from pathlib import Path +from typing import Any import boto3 @@ -46,26 +48,13 @@ def submit_jobs(jobs: list[dict]) -> None: ) -def invoke_worker(worker_function_arn: str, jobs: list[dict]) -> dict: - payload = json.dumps({'jobs': dynamo.util.convert_decimals_to_numbers(jobs)}) - return LAMBDA_CLIENT.invoke( - FunctionName=worker_function_arn, - InvocationType='Event', - Payload=payload, - ) - - @log_exceptions -def lambda_handler(event: dict, _) -> None: - worker_function_arn = os.environ['START_EXECUTION_WORKER_ARN'] - logger.info(f'Worker function ARN: {worker_function_arn}') - +def lambda_handler() -> None: pending_jobs = dynamo.jobs.get_jobs_waiting_for_execution(limit=500) logger.info(f'Got {len(pending_jobs)} pending jobs') batch_size = 250 for i in range(0, len(pending_jobs), batch_size): - jobs = pending_jobs[i : i + batch_size] + jobs = pending_jobs[i: i + batch_size] logger.info(f'Invoking worker for {len(jobs)} jobs') submit_jobs(jobs) - From eecfdbe6d3d115eb8b13a2f0af8e3d5d4e8a8d0e Mon Sep 17 00:00:00 2001 From: jacquelynsmale Date: Tue, 4 Mar 2025 16:08:28 -0900 Subject: [PATCH 03/43] update Makefile --- Makefile | 8 +- requirements-apps-start-execution-worker.txt | 2 - ...t => requirements-apps-start-execution.txt | 0 ...ion_manager.py => test_start_execution.py} | 0 tests/test_start_execution_worker.py | 188 ------------------ 5 files changed, 3 insertions(+), 195 deletions(-) delete mode 100644 requirements-apps-start-execution-worker.txt rename requirements-apps-start-execution-manager.txt => requirements-apps-start-execution.txt (100%) rename tests/{test_start_execution_manager.py => test_start_execution.py} (100%) delete mode 100644 tests/test_start_execution_worker.py diff --git a/Makefile b/Makefile index ea68987db..d4c7c2e79 100644 --- a/Makefile +++ b/Makefile @@ -5,13 +5,12 @@ GET_FILES = ${PWD}/apps/get-files/src HANDLE_BATCH_EVENT = ${PWD}/apps/handle-batch-event/src SET_BATCH_OVERRIDES = ${PWD}/apps/set-batch-overrides/src SCALE_CLUSTER = ${PWD}/apps/scale-cluster/src -START_EXECUTION_MANAGER = ${PWD}/apps/start-execution-manager/src -START_EXECUTION_WORKER = ${PWD}/apps/start-execution-worker/src +START_EXECUTION = ${PWD}/apps/start-execution/src DISABLE_PRIVATE_DNS = ${PWD}/apps/disable-private-dns/src UPDATE_DB = ${PWD}/apps/update-db/src UPLOAD_LOG = ${PWD}/apps/upload-log/src DYNAMO = ${PWD}/lib/dynamo -export PYTHONPATH = ${API}:${CHECK_PROCESSING_TIME}:${GET_FILES}:${HANDLE_BATCH_EVENT}:${SET_BATCH_OVERRIDES}:${SCALE_CLUSTER}:${START_EXECUTION_MANAGER}:${START_EXECUTION_WORKER}:${DISABLE_PRIVATE_DNS}:${UPDATE_DB}:${UPLOAD_LOG}:${DYNAMO}:${APPS} +export PYTHONPATH = ${API}:${CHECK_PROCESSING_TIME}:${GET_FILES}:${HANDLE_BATCH_EVENT}:${SET_BATCH_OVERRIDES}:${SCALE_CLUSTER}:${START_EXECUTION}:${DISABLE_PRIVATE_DNS}:${UPDATE_DB}:${UPLOAD_LOG}:${DYNAMO}:${APPS} build: render @@ -19,8 +18,7 @@ build: render python -m pip install --upgrade -r requirements-apps-api-binary.txt --platform manylinux2014_x86_64 --only-binary=:all: -t ${API}; \ python -m pip install --upgrade -r requirements-apps-handle-batch-event.txt -t ${HANDLE_BATCH_EVENT}; \ python -m pip install --upgrade -r requirements-apps-scale-cluster.txt -t ${SCALE_CLUSTER}; \ - python -m pip install --upgrade -r requirements-apps-start-execution-manager.txt -t ${START_EXECUTION_MANAGER}; \ - python -m pip install --upgrade -r requirements-apps-start-execution-worker.txt -t ${START_EXECUTION_WORKER}; \ + python -m pip install --upgrade -r requirements-apps-start-execution.txt -t ${START_EXECUTION}; \ python -m pip install --upgrade -r requirements-apps-disable-private-dns.txt -t ${DISABLE_PRIVATE_DNS}; \ python -m pip install --upgrade -r requirements-apps-update-db.txt -t ${UPDATE_DB} diff --git a/requirements-apps-start-execution-worker.txt b/requirements-apps-start-execution-worker.txt deleted file mode 100644 index 25ebd1a2b..000000000 --- a/requirements-apps-start-execution-worker.txt +++ /dev/null @@ -1,2 +0,0 @@ -boto3==1.36.11 -./lib/lambda_logging/ diff --git a/requirements-apps-start-execution-manager.txt b/requirements-apps-start-execution.txt similarity index 100% rename from requirements-apps-start-execution-manager.txt rename to requirements-apps-start-execution.txt diff --git a/tests/test_start_execution_manager.py b/tests/test_start_execution.py similarity index 100% rename from tests/test_start_execution_manager.py rename to tests/test_start_execution.py diff --git a/tests/test_start_execution_worker.py b/tests/test_start_execution_worker.py deleted file mode 100644 index 3b1bd7863..000000000 --- a/tests/test_start_execution_worker.py +++ /dev/null @@ -1,188 +0,0 @@ -import json -import os -from unittest.mock import call, patch - -import start_execution_worker - - -def test_convert_to_string(): - assert start_execution_worker.convert_to_string(1) == '1' - assert start_execution_worker.convert_to_string(True) == 'True' - assert start_execution_worker.convert_to_string([1, 2]) == '1 2' - assert start_execution_worker.convert_to_string(['abc', 'bcd']) == 'abc bcd' - assert start_execution_worker.convert_to_string('abc') == 'abc' - - -def test_submit_jobs(): - batch_params_by_job_type = { - 'JOB_0': ['granules', 'string_field', 'boolean_field', 'float_field', 'integer_field'], - 'JOB_1': ['string_field', 'boolean_field'], - 'JOB_2': [], - } - - jobs = [ - { - 'job_id': 'job0', - 'job_type': 'JOB_0', - 'string_field': 'value1', - 'boolean_field': True, - 'float_field': 10.1, - 'integer_field': 10, - 'job_parameters': { - 'granules': [ - 'granule1', - 'granule2', - ], - 'string_field': 'value1', - 'boolean_field': True, - 'float_field': 10.1, - 'integer_field': 10, - }, - }, - { - 'job_id': 'job1', - 'job_type': 'JOB_1', - 'string_field': 'value1', - 'boolean_field': True, - 'float_field': 10.1, - 'integer_field': 10, - 'job_parameters': { - 'granules': [ - 'granule1', - 'granule2', - ], - 'string_field': 'value1', - 'boolean_field': True, - 'float_field': 10.1, - 'integer_field': 10, - }, - }, - { - 'job_id': 'job2', - 'job_type': 'JOB_2', - 'string_field': 'value1', - 'boolean_field': True, - 'float_field': 10.1, - 'integer_field': 10, - 'job_parameters': { - 'granules': [ - 'granule1', - 'granule2', - ], - 'string_field': 'value1', - 'boolean_field': True, - 'float_field': 10.1, - 'integer_field': 10, - }, - }, - ] - - expected_input_job0 = json.dumps( - { - 'job_id': 'job0', - 'job_type': 'JOB_0', - 'string_field': 'value1', - 'boolean_field': True, - 'float_field': 10.1, - 'integer_field': 10, - 'job_parameters': { - 'granules': [ - 'granule1', - 'granule2', - ], - 'string_field': 'value1', - 'boolean_field': True, - 'float_field': 10.1, - 'integer_field': 10, - }, - 'batch_job_parameters': { - 'granules': 'granule1 granule2', - 'string_field': 'value1', - 'boolean_field': 'True', - 'float_field': '10.1', - 'integer_field': '10', - }, - }, - sort_keys=True, - ) - - expected_input_job1 = json.dumps( - { - 'job_id': 'job1', - 'job_type': 'JOB_1', - 'string_field': 'value1', - 'boolean_field': True, - 'float_field': 10.1, - 'integer_field': 10, - 'job_parameters': { - 'granules': [ - 'granule1', - 'granule2', - ], - 'string_field': 'value1', - 'boolean_field': True, - 'float_field': 10.1, - 'integer_field': 10, - }, - 'batch_job_parameters': { - 'string_field': 'value1', - 'boolean_field': 'True', - }, - }, - sort_keys=True, - ) - - expected_input_job2 = json.dumps( - { - 'job_id': 'job2', - 'job_type': 'JOB_2', - 'string_field': 'value1', - 'boolean_field': True, - 'float_field': 10.1, - 'integer_field': 10, - 'job_parameters': { - 'granules': [ - 'granule1', - 'granule2', - ], - 'string_field': 'value1', - 'boolean_field': True, - 'float_field': 10.1, - 'integer_field': 10, - }, - 'batch_job_parameters': {}, - }, - sort_keys=True, - ) - - with ( - patch('start_execution_worker.STEP_FUNCTION.start_execution') as mock_start_execution, - patch.dict(os.environ, {'STEP_FUNCTION_ARN': 'test-state-machine-arn'}, clear=True), - patch('start_execution_worker.BATCH_PARAMS_BY_JOB_TYPE', batch_params_by_job_type), - ): - start_execution_worker.submit_jobs(jobs) - - assert mock_start_execution.mock_calls == [ - call( - stateMachineArn='test-state-machine-arn', - input=expected_input_job0, - name='job0', - ), - call( - stateMachineArn='test-state-machine-arn', - input=expected_input_job1, - name='job1', - ), - call( - stateMachineArn='test-state-machine-arn', - input=expected_input_job2, - name='job2', - ), - ] - - -def test_lambda_handler(): - with patch('start_execution_worker.submit_jobs') as mock_submit_jobs: - start_execution_worker.lambda_handler({'jobs': [1, 2, 3]}, None) - - assert mock_submit_jobs.mock_calls == [call([1, 2, 3])] From 339d689c6e109f3e1dde2b2fe157b1b1f378e2cd Mon Sep 17 00:00:00 2001 From: jacquelynsmale Date: Tue, 4 Mar 2025 16:08:47 -0900 Subject: [PATCH 04/43] add in test --- tests/test_start_execution.py | 263 ++++++++++++++++++++++------------ 1 file changed, 168 insertions(+), 95 deletions(-) diff --git a/tests/test_start_execution.py b/tests/test_start_execution.py index 711ff3d10..0c4a7b4ba 100644 --- a/tests/test_start_execution.py +++ b/tests/test_start_execution.py @@ -3,124 +3,197 @@ from decimal import Decimal from unittest.mock import call, patch -import start_execution_manager +import start_execution -def test_invoke_worker(): - jobs: list[dict] = [ +def test_convert_to_string(): + assert start_execution.convert_to_string(1) == '1' + assert start_execution.convert_to_string(True) == 'True' + assert start_execution.convert_to_string([1, 2]) == '1 2' + assert start_execution.convert_to_string(['abc', 'bcd']) == 'abc bcd' + assert start_execution.convert_to_string('abc') == 'abc' + + +def test_submit_jobs(): + batch_params_by_job_type = { + 'JOB_0': ['granules', 'string_field', 'boolean_field', 'float_field', 'integer_field'], + 'JOB_1': ['string_field', 'boolean_field'], + 'JOB_2': [], + } + + jobs = [ { 'job_id': 'job0', - 'decimal_float_field': Decimal('10.1'), - 'integer_float_field': Decimal('10'), + 'job_type': 'JOB_0', + 'string_field': 'value1', + 'boolean_field': True, + 'float_field': 10.1, + 'integer_field': 10, + 'job_parameters': { + 'granules': [ + 'granule1', + 'granule2', + ], + 'string_field': 'value1', + 'boolean_field': True, + 'float_field': 10.1, + 'integer_field': 10, + }, + }, + { + 'job_id': 'job1', + 'job_type': 'JOB_1', + 'string_field': 'value1', + 'boolean_field': True, + 'float_field': 10.1, + 'integer_field': 10, + 'job_parameters': { + 'granules': [ + 'granule1', + 'granule2', + ], + 'string_field': 'value1', + 'boolean_field': True, + 'float_field': 10.1, + 'integer_field': 10, + }, + }, + { + 'job_id': 'job2', + 'job_type': 'JOB_2', + 'string_field': 'value1', + 'boolean_field': True, + 'float_field': 10.1, + 'integer_field': 10, 'job_parameters': { - 'decimal_float_field': Decimal('10.1'), - 'integer_float_field': Decimal('10'), - 'decimal_list_field': [Decimal('10.1'), Decimal('10')], + 'granules': [ + 'granule1', + 'granule2', + ], + 'string_field': 'value1', + 'boolean_field': True, + 'float_field': 10.1, + 'integer_field': 10, }, }, - {'job_id': 'job1'}, ] - expected_payload = json.dumps( + + expected_input_job0 = json.dumps( { - 'jobs': [ - { - 'job_id': 'job0', - 'decimal_float_field': 10.1, - 'integer_float_field': 10, - 'job_parameters': { - 'decimal_float_field': 10.1, - 'integer_float_field': 10, - 'decimal_list_field': [10.1, 10], - }, - }, - {'job_id': 'job1'}, - ] - } + 'job_id': 'job0', + 'job_type': 'JOB_0', + 'string_field': 'value1', + 'boolean_field': True, + 'float_field': 10.1, + 'integer_field': 10, + 'job_parameters': { + 'granules': [ + 'granule1', + 'granule2', + ], + 'string_field': 'value1', + 'boolean_field': True, + 'float_field': 10.1, + 'integer_field': 10, + }, + 'batch_job_parameters': { + 'granules': 'granule1 granule2', + 'string_field': 'value1', + 'boolean_field': 'True', + 'float_field': '10.1', + 'integer_field': '10', + }, + }, + sort_keys=True, ) - with patch('start_execution_manager.LAMBDA_CLIENT.invoke') as mock_invoke: - mock_invoke.return_value = {'foo': 'bar'} - - assert start_execution_manager.invoke_worker('test-worker-arn', jobs) == {'foo': 'bar'} - mock_invoke.assert_called_once_with( - FunctionName='test-worker-arn', - InvocationType='Event', - Payload=expected_payload, - ) + expected_input_job1 = json.dumps( + { + 'job_id': 'job1', + 'job_type': 'JOB_1', + 'string_field': 'value1', + 'boolean_field': True, + 'float_field': 10.1, + 'integer_field': 10, + 'job_parameters': { + 'granules': [ + 'granule1', + 'granule2', + ], + 'string_field': 'value1', + 'boolean_field': True, + 'float_field': 10.1, + 'integer_field': 10, + }, + 'batch_job_parameters': { + 'string_field': 'value1', + 'boolean_field': 'True', + }, + }, + sort_keys=True, + ) + expected_input_job2 = json.dumps( + { + 'job_id': 'job2', + 'job_type': 'JOB_2', + 'string_field': 'value1', + 'boolean_field': True, + 'float_field': 10.1, + 'integer_field': 10, + 'job_parameters': { + 'granules': [ + 'granule1', + 'granule2', + ], + 'string_field': 'value1', + 'boolean_field': True, + 'float_field': 10.1, + 'integer_field': 10, + }, + 'batch_job_parameters': {}, + }, + sort_keys=True, + ) -def test_lambda_handler_500_jobs(): with ( - patch('dynamo.jobs.get_jobs_waiting_for_execution') as mock_get_jobs_waiting_for_execution, - patch('start_execution_manager.invoke_worker') as mock_invoke_worker, - patch.dict(os.environ, {'START_EXECUTION_WORKER_ARN': 'test-worker-function-arn'}, clear=True), + patch('start_execution_worker.STEP_FUNCTION.start_execution') as mock_start_execution, + patch.dict(os.environ, {'STEP_FUNCTION_ARN': 'test-state-machine-arn'}, clear=True), + patch('start_execution_worker.BATCH_PARAMS_BY_JOB_TYPE', batch_params_by_job_type), ): - mock_jobs = list(range(500)) - mock_get_jobs_waiting_for_execution.return_value = mock_jobs - - mock_invoke_worker.return_value = {'StatusCode': None} - - start_execution_manager.lambda_handler(None, None) - - mock_get_jobs_waiting_for_execution.assert_called_once_with(limit=500) - - assert mock_invoke_worker.mock_calls == [ - call('test-worker-function-arn', mock_jobs[0:250]), - call('test-worker-function-arn', mock_jobs[250:500]), + start_execution.submit_jobs(jobs) + + assert mock_start_execution.mock_calls == [ + call( + stateMachineArn='test-state-machine-arn', + input=expected_input_job0, + name='job0', + ), + call( + stateMachineArn='test-state-machine-arn', + input=expected_input_job1, + name='job1', + ), + call( + stateMachineArn='test-state-machine-arn', + input=expected_input_job2, + name='job2', + ), ] -def test_lambda_handler_400_jobs(): - with ( - patch('dynamo.jobs.get_jobs_waiting_for_execution') as mock_get_jobs_waiting_for_execution, - patch('start_execution_manager.invoke_worker') as mock_invoke_worker, - patch.dict(os.environ, {'START_EXECUTION_WORKER_ARN': 'test-worker-function-arn'}, clear=True), - ): - mock_jobs = list(range(400)) +def test_lambda_handler_500_jobs(): + with patch('dynamo.jobs.get_jobs_waiting_for_execution') as mock_get_jobs_waiting_for_execution: + mock_jobs = list(range(500)) mock_get_jobs_waiting_for_execution.return_value = mock_jobs - mock_invoke_worker.return_value = {'StatusCode': None} + mock_get_jobs_waiting_for_execution.return_value = {'StatusCode': None} - start_execution_manager.lambda_handler(None, None) + start_execution.lambda_handler() mock_get_jobs_waiting_for_execution.assert_called_once_with(limit=500) - assert mock_invoke_worker.mock_calls == [ + assert mock_get_jobs_waiting_for_execution.mock_calls == [ call('test-worker-function-arn', mock_jobs[0:250]), - call('test-worker-function-arn', mock_jobs[250:400]), - ] - - -def test_lambda_handler_50_jobs(): - with ( - patch('dynamo.jobs.get_jobs_waiting_for_execution') as mock_get_jobs_waiting_for_execution, - patch('start_execution_manager.invoke_worker') as mock_invoke_worker, - patch.dict(os.environ, {'START_EXECUTION_WORKER_ARN': 'test-worker-function-arn'}, clear=True), - ): - mock_jobs = list(range(50)) - mock_get_jobs_waiting_for_execution.return_value = mock_jobs - - mock_invoke_worker.return_value = {'StatusCode': None} - - start_execution_manager.lambda_handler(None, None) - - mock_get_jobs_waiting_for_execution.assert_called_once_with(limit=500) - - assert mock_invoke_worker.mock_calls == [ - call('test-worker-function-arn', mock_jobs), + call('test-worker-function-arn', mock_jobs[250:500]), ] - - -def test_lambda_handler_no_jobs(): - with ( - patch('dynamo.jobs.get_jobs_waiting_for_execution') as mock_get_jobs_waiting_for_execution, - patch('start_execution_manager.invoke_worker') as mock_invoke_worker, - patch.dict(os.environ, {'START_EXECUTION_WORKER_ARN': 'test-worker-function-arn'}, clear=True), - ): - mock_get_jobs_waiting_for_execution.return_value = [] - - start_execution_manager.lambda_handler(None, None) - - mock_get_jobs_waiting_for_execution.assert_called_once_with(limit=500) - - mock_invoke_worker.assert_not_called() From 2c054de3e05c25cf5fa6f58b56b66f4f4041f5ce Mon Sep 17 00:00:00 2001 From: jacquelynsmale Date: Tue, 4 Mar 2025 16:15:32 -0900 Subject: [PATCH 05/43] update requirements --- requirements-all.txt | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/requirements-all.txt b/requirements-all.txt index 8f580acb7..10fc9d65e 100644 --- a/requirements-all.txt +++ b/requirements-all.txt @@ -1,8 +1,7 @@ -r requirements-apps-api.txt -r requirements-apps-handle-batch-event.txt -r requirements-apps-scale-cluster.txt --r requirements-apps-start-execution-manager.txt --r requirements-apps-start-execution-worker.txt +-r requirements-apps-start-execution.txt -r requirements-apps-disable-private-dns.txt -r requirements-apps-update-db.txt boto3==1.36.11 From 129a653a7b3dc7ba116d92e2886fb477d084c2b3 Mon Sep 17 00:00:00 2001 From: jacquelynsmale Date: Wed, 5 Mar 2025 14:47:55 -0900 Subject: [PATCH 06/43] rename file --- .../src/{start-execution.py => start_execution.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename apps/start-execution/src/{start-execution.py => start_execution.py} (100%) diff --git a/apps/start-execution/src/start-execution.py b/apps/start-execution/src/start_execution.py similarity index 100% rename from apps/start-execution/src/start-execution.py rename to apps/start-execution/src/start_execution.py From 5c8ab558c5954fed7fd5b11e70818530f6fa4364 Mon Sep 17 00:00:00 2001 From: jacquelynsmale Date: Wed, 5 Mar 2025 14:58:26 -0900 Subject: [PATCH 07/43] update test --- tests/test_start_execution.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/test_start_execution.py b/tests/test_start_execution.py index 0c4a7b4ba..e76a215e7 100644 --- a/tests/test_start_execution.py +++ b/tests/test_start_execution.py @@ -157,9 +157,9 @@ def test_submit_jobs(): ) with ( - patch('start_execution_worker.STEP_FUNCTION.start_execution') as mock_start_execution, + patch('start_execution.STEP_FUNCTION.start_execution') as mock_start_execution, patch.dict(os.environ, {'STEP_FUNCTION_ARN': 'test-state-machine-arn'}, clear=True), - patch('start_execution_worker.BATCH_PARAMS_BY_JOB_TYPE', batch_params_by_job_type), + patch('start_execution.BATCH_PARAMS_BY_JOB_TYPE', batch_params_by_job_type), ): start_execution.submit_jobs(jobs) @@ -189,7 +189,7 @@ def test_lambda_handler_500_jobs(): mock_get_jobs_waiting_for_execution.return_value = {'StatusCode': None} - start_execution.lambda_handler() + start_execution.lambda_handler(None, None) mock_get_jobs_waiting_for_execution.assert_called_once_with(limit=500) From 80a4868f3ffbe3cef18f85a8231b25b001651ac4 Mon Sep 17 00:00:00 2001 From: jacquelynsmale Date: Wed, 5 Mar 2025 15:05:59 -0900 Subject: [PATCH 08/43] add in missing lambda handler args --- apps/start-execution/src/start_execution.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/start-execution/src/start_execution.py b/apps/start-execution/src/start_execution.py index db803c8e4..adeb145bb 100644 --- a/apps/start-execution/src/start_execution.py +++ b/apps/start-execution/src/start_execution.py @@ -49,7 +49,7 @@ def submit_jobs(jobs: list[dict]) -> None: @log_exceptions -def lambda_handler() -> None: +def lambda_handler(event: dict, _) -> None: pending_jobs = dynamo.jobs.get_jobs_waiting_for_execution(limit=500) logger.info(f'Got {len(pending_jobs)} pending jobs') From fff557947b027708b25bed571f3ae5fa65a578e1 Mon Sep 17 00:00:00 2001 From: jacquelynsmale Date: Wed, 5 Mar 2025 15:28:16 -0900 Subject: [PATCH 09/43] add in mock submit jobs --- tests/test_start_execution.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/test_start_execution.py b/tests/test_start_execution.py index e76a215e7..ecd9355b6 100644 --- a/tests/test_start_execution.py +++ b/tests/test_start_execution.py @@ -183,7 +183,10 @@ def test_submit_jobs(): def test_lambda_handler_500_jobs(): - with patch('dynamo.jobs.get_jobs_waiting_for_execution') as mock_get_jobs_waiting_for_execution: + with ( + patch('dynamo.jobs.get_jobs_waiting_for_execution') as mock_get_jobs_waiting_for_execution, + patch('start_execution.submit_jobs') as mock_submit_jobs + ): mock_jobs = list(range(500)) mock_get_jobs_waiting_for_execution.return_value = mock_jobs From d0b640ab9fc087cd42c91a48b0d6fac8678fb12b Mon Sep 17 00:00:00 2001 From: jacquelynsmale Date: Wed, 5 Mar 2025 16:02:27 -0900 Subject: [PATCH 10/43] add in mock submit jobs --- tests/test_start_execution.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_start_execution.py b/tests/test_start_execution.py index ecd9355b6..1fc86395c 100644 --- a/tests/test_start_execution.py +++ b/tests/test_start_execution.py @@ -185,7 +185,7 @@ def test_submit_jobs(): def test_lambda_handler_500_jobs(): with ( patch('dynamo.jobs.get_jobs_waiting_for_execution') as mock_get_jobs_waiting_for_execution, - patch('start_execution.submit_jobs') as mock_submit_jobs + patch.dict(os.environ, {'START_EXECUTION_WORKER_ARN': 'test-worker-function-arn'}, clear=True) ): mock_jobs = list(range(500)) mock_get_jobs_waiting_for_execution.return_value = mock_jobs From edc7c1ddd393f804d09343ac19f64933dc0e0f2f Mon Sep 17 00:00:00 2001 From: jacquelynsmale Date: Wed, 5 Mar 2025 16:05:29 -0900 Subject: [PATCH 11/43] add in mock submit jobs --- tests/test_start_execution.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/test_start_execution.py b/tests/test_start_execution.py index 1fc86395c..7e7873e1f 100644 --- a/tests/test_start_execution.py +++ b/tests/test_start_execution.py @@ -190,8 +190,6 @@ def test_lambda_handler_500_jobs(): mock_jobs = list(range(500)) mock_get_jobs_waiting_for_execution.return_value = mock_jobs - mock_get_jobs_waiting_for_execution.return_value = {'StatusCode': None} - start_execution.lambda_handler(None, None) mock_get_jobs_waiting_for_execution.assert_called_once_with(limit=500) From ff7a361cb2018a4f3e4f9dee5812de7157ab513c Mon Sep 17 00:00:00 2001 From: jacquelynsmale Date: Wed, 5 Mar 2025 16:09:10 -0900 Subject: [PATCH 12/43] add in mock submit jobs --- tests/test_start_execution.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_start_execution.py b/tests/test_start_execution.py index 7e7873e1f..ae8e5e8be 100644 --- a/tests/test_start_execution.py +++ b/tests/test_start_execution.py @@ -185,7 +185,7 @@ def test_submit_jobs(): def test_lambda_handler_500_jobs(): with ( patch('dynamo.jobs.get_jobs_waiting_for_execution') as mock_get_jobs_waiting_for_execution, - patch.dict(os.environ, {'START_EXECUTION_WORKER_ARN': 'test-worker-function-arn'}, clear=True) + patch.dict(os.environ, {'STEP_FUNCTION_ARN': 'test-step-function-arn'}, clear=True) ): mock_jobs = list(range(500)) mock_get_jobs_waiting_for_execution.return_value = mock_jobs From ca17b5410e103f64e4d3dd044213df73ba11f739 Mon Sep 17 00:00:00 2001 From: jacquelynsmale Date: Wed, 5 Mar 2025 16:11:38 -0900 Subject: [PATCH 13/43] add in mock submit jobs --- tests/test_start_execution.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_start_execution.py b/tests/test_start_execution.py index ae8e5e8be..53ae80c35 100644 --- a/tests/test_start_execution.py +++ b/tests/test_start_execution.py @@ -185,6 +185,7 @@ def test_submit_jobs(): def test_lambda_handler_500_jobs(): with ( patch('dynamo.jobs.get_jobs_waiting_for_execution') as mock_get_jobs_waiting_for_execution, + patch.dict(os.environ, {'START_EXECUTION_WORKER_ARN': 'test-worker-function-arn'}, clear=True), patch.dict(os.environ, {'STEP_FUNCTION_ARN': 'test-step-function-arn'}, clear=True) ): mock_jobs = list(range(500)) From ff62f6c5c90033882c3a896daf8d6d0aa353aace Mon Sep 17 00:00:00 2001 From: jacquelynsmale Date: Wed, 5 Mar 2025 16:18:39 -0900 Subject: [PATCH 14/43] add in mock submit jobs --- tests/test_start_execution.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/test_start_execution.py b/tests/test_start_execution.py index 53ae80c35..1004ef713 100644 --- a/tests/test_start_execution.py +++ b/tests/test_start_execution.py @@ -185,12 +185,15 @@ def test_submit_jobs(): def test_lambda_handler_500_jobs(): with ( patch('dynamo.jobs.get_jobs_waiting_for_execution') as mock_get_jobs_waiting_for_execution, + patch('start_execution.submit_jobs') as mock_submit_jobs, patch.dict(os.environ, {'START_EXECUTION_WORKER_ARN': 'test-worker-function-arn'}, clear=True), patch.dict(os.environ, {'STEP_FUNCTION_ARN': 'test-step-function-arn'}, clear=True) ): mock_jobs = list(range(500)) mock_get_jobs_waiting_for_execution.return_value = mock_jobs + mock_submit_jobs.return_value = None + start_execution.lambda_handler(None, None) mock_get_jobs_waiting_for_execution.assert_called_once_with(limit=500) From 1e3ea7e0569deb3d7aadbf1ee2b3641862fecd60 Mon Sep 17 00:00:00 2001 From: jacquelynsmale Date: Wed, 5 Mar 2025 16:40:04 -0900 Subject: [PATCH 15/43] add in mock submit jobs --- tests/test_start_execution.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_start_execution.py b/tests/test_start_execution.py index 1004ef713..4c39e5072 100644 --- a/tests/test_start_execution.py +++ b/tests/test_start_execution.py @@ -198,7 +198,7 @@ def test_lambda_handler_500_jobs(): mock_get_jobs_waiting_for_execution.assert_called_once_with(limit=500) - assert mock_get_jobs_waiting_for_execution.mock_calls == [ + assert mock_submit_jobs.mock_calls == [ call('test-worker-function-arn', mock_jobs[0:250]), call('test-worker-function-arn', mock_jobs[250:500]), ] From c4edbfd2357b76cf29c9b3a30b6bad79c8817d0f Mon Sep 17 00:00:00 2001 From: jacquelynsmale Date: Wed, 5 Mar 2025 16:43:32 -0900 Subject: [PATCH 16/43] add in mock submit jobs --- tests/test_start_execution.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_start_execution.py b/tests/test_start_execution.py index 4c39e5072..76967e8a4 100644 --- a/tests/test_start_execution.py +++ b/tests/test_start_execution.py @@ -199,6 +199,6 @@ def test_lambda_handler_500_jobs(): mock_get_jobs_waiting_for_execution.assert_called_once_with(limit=500) assert mock_submit_jobs.mock_calls == [ - call('test-worker-function-arn', mock_jobs[0:250]), - call('test-worker-function-arn', mock_jobs[250:500]), + call(mock_jobs[0:250]), + call(mock_jobs[250:500]), ] From fc97a77d5a287fbd06a1457c7f0b609f1babd977 Mon Sep 17 00:00:00 2001 From: jacquelynsmale Date: Thu, 6 Mar 2025 09:43:41 -0900 Subject: [PATCH 17/43] add in mock submit jobs --- tests/test_start_execution.py | 61 +++++++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/tests/test_start_execution.py b/tests/test_start_execution.py index 76967e8a4..4aff8df2e 100644 --- a/tests/test_start_execution.py +++ b/tests/test_start_execution.py @@ -202,3 +202,64 @@ def test_lambda_handler_500_jobs(): call(mock_jobs[0:250]), call(mock_jobs[250:500]), ] + + +def test_lambda_handler_400_jobs(): + with ( + patch('dynamo.jobs.get_jobs_waiting_for_execution') as mock_get_jobs_waiting_for_execution, + patch('start_execution.submit_jobs') as mock_submit_jobs, + patch.dict(os.environ, {'START_EXECUTION_WORKER_ARN': 'test-worker-function-arn'}, clear=True), + patch.dict(os.environ, {'STEP_FUNCTION_ARN': 'test-step-function-arn'}, clear=True) + ): + mock_jobs = list(range(400)) + mock_get_jobs_waiting_for_execution.return_value = mock_jobs + + mock_submit_jobs.return_value = None + + start_execution.lambda_handler(None, None) + + mock_get_jobs_waiting_for_execution.assert_called_once_with(limit=500) + + assert mock_submit_jobs.mock_calls == [ + call(mock_jobs[0:250]), + call(mock_jobs[250:400]), + ] + + +def test_lambda_handler_50_jobs(): + with ( + patch('dynamo.jobs.get_jobs_waiting_for_execution') as mock_get_jobs_waiting_for_execution, + patch('start_execution.submit_jobs') as mock_submit_jobs, + patch.dict(os.environ, {'START_EXECUTION_WORKER_ARN': 'test-worker-function-arn'}, clear=True), + patch.dict(os.environ, {'STEP_FUNCTION_ARN': 'test-step-function-arn'}, clear=True) + ): + mock_jobs = list(range(50)) + mock_get_jobs_waiting_for_execution.return_value = mock_jobs + + mock_submit_jobs.return_value = None + + start_execution.lambda_handler(None, None) + + mock_get_jobs_waiting_for_execution.assert_called_once_with(limit=500) + + assert mock_submit_jobs.mock_calls == [ + call(mock_jobs[0:50]) + ] + + +def test_lambda_handler_no_jobs(): + with ( + patch('dynamo.jobs.get_jobs_waiting_for_execution') as mock_get_jobs_waiting_for_execution, + patch('start_execution.submit_jobs') as mock_submit_jobs, + patch.dict(os.environ, {'START_EXECUTION_WORKER_ARN': 'test-worker-function-arn'}, clear=True), + patch.dict(os.environ, {'STEP_FUNCTION_ARN': 'test-step-function-arn'}, clear=True) + ): + mock_get_jobs_waiting_for_execution.return_value = [] + + mock_submit_jobs.return_value = None + + start_execution.lambda_handler(None, None) + + mock_get_jobs_waiting_for_execution.assert_called_once_with(limit=500) + + mock_submit_jobs.assert_not_called() From 5f8dc15a0f093a682c217eca0f6c659e9ea7f96f Mon Sep 17 00:00:00 2001 From: jacquelynsmale Date: Thu, 6 Mar 2025 09:45:49 -0900 Subject: [PATCH 18/43] ruff check --- apps/start-execution/src/start_execution.py | 2 +- tests/test_start_execution.py | 12 +++++------- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/apps/start-execution/src/start_execution.py b/apps/start-execution/src/start_execution.py index adeb145bb..2e85833de 100644 --- a/apps/start-execution/src/start_execution.py +++ b/apps/start-execution/src/start_execution.py @@ -55,6 +55,6 @@ def lambda_handler(event: dict, _) -> None: batch_size = 250 for i in range(0, len(pending_jobs), batch_size): - jobs = pending_jobs[i: i + batch_size] + jobs = pending_jobs[i : i + batch_size] logger.info(f'Invoking worker for {len(jobs)} jobs') submit_jobs(jobs) diff --git a/tests/test_start_execution.py b/tests/test_start_execution.py index 4aff8df2e..1e078fb81 100644 --- a/tests/test_start_execution.py +++ b/tests/test_start_execution.py @@ -187,7 +187,7 @@ def test_lambda_handler_500_jobs(): patch('dynamo.jobs.get_jobs_waiting_for_execution') as mock_get_jobs_waiting_for_execution, patch('start_execution.submit_jobs') as mock_submit_jobs, patch.dict(os.environ, {'START_EXECUTION_WORKER_ARN': 'test-worker-function-arn'}, clear=True), - patch.dict(os.environ, {'STEP_FUNCTION_ARN': 'test-step-function-arn'}, clear=True) + patch.dict(os.environ, {'STEP_FUNCTION_ARN': 'test-step-function-arn'}, clear=True), ): mock_jobs = list(range(500)) mock_get_jobs_waiting_for_execution.return_value = mock_jobs @@ -209,7 +209,7 @@ def test_lambda_handler_400_jobs(): patch('dynamo.jobs.get_jobs_waiting_for_execution') as mock_get_jobs_waiting_for_execution, patch('start_execution.submit_jobs') as mock_submit_jobs, patch.dict(os.environ, {'START_EXECUTION_WORKER_ARN': 'test-worker-function-arn'}, clear=True), - patch.dict(os.environ, {'STEP_FUNCTION_ARN': 'test-step-function-arn'}, clear=True) + patch.dict(os.environ, {'STEP_FUNCTION_ARN': 'test-step-function-arn'}, clear=True), ): mock_jobs = list(range(400)) mock_get_jobs_waiting_for_execution.return_value = mock_jobs @@ -231,7 +231,7 @@ def test_lambda_handler_50_jobs(): patch('dynamo.jobs.get_jobs_waiting_for_execution') as mock_get_jobs_waiting_for_execution, patch('start_execution.submit_jobs') as mock_submit_jobs, patch.dict(os.environ, {'START_EXECUTION_WORKER_ARN': 'test-worker-function-arn'}, clear=True), - patch.dict(os.environ, {'STEP_FUNCTION_ARN': 'test-step-function-arn'}, clear=True) + patch.dict(os.environ, {'STEP_FUNCTION_ARN': 'test-step-function-arn'}, clear=True), ): mock_jobs = list(range(50)) mock_get_jobs_waiting_for_execution.return_value = mock_jobs @@ -242,9 +242,7 @@ def test_lambda_handler_50_jobs(): mock_get_jobs_waiting_for_execution.assert_called_once_with(limit=500) - assert mock_submit_jobs.mock_calls == [ - call(mock_jobs[0:50]) - ] + assert mock_submit_jobs.mock_calls == [call(mock_jobs[0:50])] def test_lambda_handler_no_jobs(): @@ -252,7 +250,7 @@ def test_lambda_handler_no_jobs(): patch('dynamo.jobs.get_jobs_waiting_for_execution') as mock_get_jobs_waiting_for_execution, patch('start_execution.submit_jobs') as mock_submit_jobs, patch.dict(os.environ, {'START_EXECUTION_WORKER_ARN': 'test-worker-function-arn'}, clear=True), - patch.dict(os.environ, {'STEP_FUNCTION_ARN': 'test-step-function-arn'}, clear=True) + patch.dict(os.environ, {'STEP_FUNCTION_ARN': 'test-step-function-arn'}, clear=True), ): mock_get_jobs_waiting_for_execution.return_value = [] From aea8bd4599bc44c2aa212636fdce2ff9de397659 Mon Sep 17 00:00:00 2001 From: jacquelynsmale Date: Thu, 6 Mar 2025 09:51:18 -0900 Subject: [PATCH 19/43] ruff check --- apps/start-execution/src/start_execution.py | 2 +- tests/test_start_execution.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/apps/start-execution/src/start_execution.py b/apps/start-execution/src/start_execution.py index 2e85833de..5238d9215 100644 --- a/apps/start-execution/src/start_execution.py +++ b/apps/start-execution/src/start_execution.py @@ -4,10 +4,10 @@ from typing import Any import boto3 - import dynamo from lambda_logging import log_exceptions, logger + LAMBDA_CLIENT = boto3.client('lambda') STEP_FUNCTION = boto3.client('stepfunctions') diff --git a/tests/test_start_execution.py b/tests/test_start_execution.py index 1e078fb81..cda72ada5 100644 --- a/tests/test_start_execution.py +++ b/tests/test_start_execution.py @@ -1,6 +1,5 @@ import json import os -from decimal import Decimal from unittest.mock import call, patch import start_execution From 7a69bddf092bc2ecbf3747cb98a1b2b3390d4046 Mon Sep 17 00:00:00 2001 From: Andrew Johnston Date: Thu, 6 Mar 2025 11:08:25 -0900 Subject: [PATCH 20/43] combine cloudformation templates for start-execution --- apps/render_cf.py | 2 +- .../src/start_execution_manager.py | 35 ------- .../src/start_execution_worker.py | 54 ----------- .../start-execution-worker-cf.yml.j2 | 93 ------------------- .../start-execution-cf.yml.j2} | 15 ++- apps/workflow-cf.yml.j2 | 15 +-- 6 files changed, 10 insertions(+), 204 deletions(-) delete mode 100644 apps/start-execution-manager/src/start_execution_manager.py delete mode 100644 apps/start-execution-worker/src/start_execution_worker.py delete mode 100644 apps/start-execution-worker/start-execution-worker-cf.yml.j2 rename apps/{start-execution-manager/start-execution-manager-cf.yml.j2 => start-execution/start-execution-cf.yml.j2} (91%) diff --git a/apps/render_cf.py b/apps/render_cf.py index cf47ecdc9..fec5f226f 100644 --- a/apps/render_cf.py +++ b/apps/render_cf.py @@ -195,7 +195,7 @@ def render_batch_params_by_job_type(job_types: dict) -> None: for step in job_spec['steps']: params.update(get_batch_param_names_for_job_step(step)) batch_params_by_job_type[job_type] = list(params) - with (Path('apps') / 'start-execution-worker' / 'src' / 'batch_params_by_job_type.json').open('w') as f: + with (Path('apps') / 'start-execution' / 'src' / 'batch_params_by_job_type.json').open('w') as f: json.dump(batch_params_by_job_type, f, indent=2) diff --git a/apps/start-execution-manager/src/start_execution_manager.py b/apps/start-execution-manager/src/start_execution_manager.py deleted file mode 100644 index 185f3f080..000000000 --- a/apps/start-execution-manager/src/start_execution_manager.py +++ /dev/null @@ -1,35 +0,0 @@ -import json -import os - -import boto3 - -import dynamo -from lambda_logging import log_exceptions, logger - - -LAMBDA_CLIENT = boto3.client('lambda') - - -def invoke_worker(worker_function_arn: str, jobs: list[dict]) -> dict: - payload = json.dumps({'jobs': dynamo.util.convert_decimals_to_numbers(jobs)}) - return LAMBDA_CLIENT.invoke( - FunctionName=worker_function_arn, - InvocationType='Event', - Payload=payload, - ) - - -@log_exceptions -def lambda_handler(event: dict, _) -> None: - worker_function_arn = os.environ['START_EXECUTION_WORKER_ARN'] - logger.info(f'Worker function ARN: {worker_function_arn}') - - pending_jobs = dynamo.jobs.get_jobs_waiting_for_execution(limit=500) - logger.info(f'Got {len(pending_jobs)} pending jobs') - - batch_size = 250 - for i in range(0, len(pending_jobs), batch_size): - jobs = pending_jobs[i : i + batch_size] - logger.info(f'Invoking worker for {len(jobs)} jobs') - response = invoke_worker(worker_function_arn, jobs) - logger.info(f'Got response status code {response["StatusCode"]}') diff --git a/apps/start-execution-worker/src/start_execution_worker.py b/apps/start-execution-worker/src/start_execution_worker.py deleted file mode 100644 index b3a9d083a..000000000 --- a/apps/start-execution-worker/src/start_execution_worker.py +++ /dev/null @@ -1,54 +0,0 @@ -import json -import os -from pathlib import Path -from typing import Any - -import boto3 - -from lambda_logging import log_exceptions, logger - - -STEP_FUNCTION = boto3.client('stepfunctions') - -batch_params_file = Path(__file__).parent / 'batch_params_by_job_type.json' -if batch_params_file.exists(): - BATCH_PARAMS_BY_JOB_TYPE = json.loads(batch_params_file.read_text()) -else: - # Allows mocking with unittest.mock.patch - BATCH_PARAMS_BY_JOB_TYPE = {} - - -def convert_to_string(obj: Any) -> str: - if isinstance(obj, list): - return ' '.join([str(item) for item in obj]) - return str(obj) - - -def get_batch_job_parameters(job: dict) -> dict[str, str]: - # Convert parameters to strings so they can be passed to Batch; see: - # https://docs.aws.amazon.com/batch/latest/APIReference/API_SubmitJob.html#Batch-SubmitJob-request-parameters - return { - key: convert_to_string(value) - for key, value in job['job_parameters'].items() - if key in BATCH_PARAMS_BY_JOB_TYPE[job['job_type']] - } - - -def submit_jobs(jobs: list[dict]) -> None: - step_function_arn = os.environ['STEP_FUNCTION_ARN'] - logger.info(f'Step function ARN: {step_function_arn}') - for job in jobs: - job['batch_job_parameters'] = get_batch_job_parameters(job) - STEP_FUNCTION.start_execution( - stateMachineArn=step_function_arn, - input=json.dumps(job, sort_keys=True), - name=job['job_id'], - ) - - -@log_exceptions -def lambda_handler(event: dict, _) -> None: - jobs = event['jobs'] - logger.info(f'Submitting {len(jobs)} jobs') - submit_jobs(jobs) - logger.info('Successfully submitted jobs') diff --git a/apps/start-execution-worker/start-execution-worker-cf.yml.j2 b/apps/start-execution-worker/start-execution-worker-cf.yml.j2 deleted file mode 100644 index 808bd2abd..000000000 --- a/apps/start-execution-worker/start-execution-worker-cf.yml.j2 +++ /dev/null @@ -1,93 +0,0 @@ -AWSTemplateFormatVersion: 2010-09-09 - -Parameters: - - StepFunctionArn: - Type: String - - {% if security_environment == 'EDC' %} - SecurityGroupId: - Type: String - - SubnetIds: - Type: CommaDelimitedList - {% endif %} - -Outputs: - - LambdaArn: - Value: !GetAtt Lambda.Arn - -Resources: - - LogGroup: - Type: AWS::Logs::LogGroup - Properties: - LogGroupName: !Sub "/aws/lambda/${Lambda}" - RetentionInDays: 90 - - Role: - Type: {{ 'Custom::JplRole' if security_environment in ('JPL', 'JPL-public') else 'AWS::IAM::Role' }} - Properties: - {% if security_environment in ('JPL', 'JPL-public') %} - ServiceToken: !ImportValue Custom::JplRole::ServiceToken - Path: /account-managed/hyp3/ - {% endif %} - AssumeRolePolicyDocument: - Version: 2012-10-17 - Statement: - Action: sts:AssumeRole - Principal: - Service: lambda.amazonaws.com - Effect: Allow - ManagedPolicyArns: - - !Ref Policy - {% if security_environment == 'EDC' %} - - arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole - {% endif %} - - Policy: - Type: {{ 'Custom::JplPolicy' if security_environment in ('JPL', 'JPL-public') else 'AWS::IAM::ManagedPolicy' }} - Properties: - {% if security_environment in ('JPL', 'JPL-public') %} - ServiceToken: !ImportValue Custom::JplPolicy::ServiceToken - Path: /account-managed/hyp3/ - {% endif %} - PolicyDocument: - Version: 2012-10-17 - Statement: - - Effect: Allow - Action: - - logs:CreateLogStream - - logs:PutLogEvents - Resource: !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/*" - - Effect: Allow - Action: states:StartExecution - Resource: !Ref StepFunctionArn - - Lambda: - Type: AWS::Lambda::Function - Properties: - Environment: - Variables: - STEP_FUNCTION_ARN: !Ref StepFunctionArn - Code: src/ - Handler: start_execution_worker.lambda_handler - MemorySize: 128 - Role: !GetAtt Role.Arn - Runtime: python3.13 - Timeout: 45 - {% if security_environment == 'EDC' %} - VpcConfig: - SecurityGroupIds: - - !Ref SecurityGroupId - SubnetIds: !Ref SubnetIds - {% endif %} - - EventInvokeConfig: - Type: AWS::Lambda::EventInvokeConfig - Properties: - FunctionName: !Ref Lambda - Qualifier: $LATEST - MaximumRetryAttempts: 0 - MaximumEventAgeInSeconds: 60 diff --git a/apps/start-execution-manager/start-execution-manager-cf.yml.j2 b/apps/start-execution/start-execution-cf.yml.j2 similarity index 91% rename from apps/start-execution-manager/start-execution-manager-cf.yml.j2 rename to apps/start-execution/start-execution-cf.yml.j2 index ab760a447..86e7d0fc1 100644 --- a/apps/start-execution-manager/start-execution-manager-cf.yml.j2 +++ b/apps/start-execution/start-execution-cf.yml.j2 @@ -2,10 +2,10 @@ AWSTemplateFormatVersion: 2010-09-09 Parameters: - JobsTable: + StepFunctionArn: Type: String - StartExecutionWorkerArn: + JobsTable: Type: String {% if security_environment == 'EDC' %} @@ -63,9 +63,8 @@ Resources: Action: dynamodb:Query Resource: !Sub "arn:aws:dynamodb:${AWS::Region}:${AWS::AccountId}:table/${JobsTable}*" - Effect: Allow - Action: - - lambda:InvokeFunction - Resource: !Ref StartExecutionWorkerArn + Action: states:StartExecution + Resource: !Ref StepFunctionArn Lambda: Type: AWS::Lambda::Function @@ -73,13 +72,13 @@ Resources: Environment: Variables: JOBS_TABLE_NAME: !Ref JobsTable - START_EXECUTION_WORKER_ARN: !Ref StartExecutionWorkerArn + STEP_FUNCTION_ARN: !Ref StepFunctionArn Code: src/ - Handler: start_execution_manager.lambda_handler + Handler: start_execution.lambda_handler MemorySize: 128 Role: !GetAtt Role.Arn Runtime: python3.13 - Timeout: 10 + Timeout: 55 {% if security_environment == 'EDC' %} VpcConfig: SecurityGroupIds: diff --git a/apps/workflow-cf.yml.j2 b/apps/workflow-cf.yml.j2 index 011bbf813..7b5a542b8 100644 --- a/apps/workflow-cf.yml.j2 +++ b/apps/workflow-cf.yml.j2 @@ -206,28 +206,17 @@ Resources: {% endif %} TemplateURL: check-processing-time/check-processing-time-cf.yml - StartExecutionManager: + StartExecution: Type: AWS::CloudFormation::Stack Properties: Parameters: JobsTable: !Ref JobsTable - StartExecutionWorkerArn: !GetAtt StartExecutionWorker.Outputs.LambdaArn - {% if security_environment == 'EDC' %} - SecurityGroupId: !Ref SecurityGroupId - SubnetIds: !Join [",", !Ref SubnetIds] - {% endif %} - TemplateURL: start-execution-manager/start-execution-manager-cf.yml - - StartExecutionWorker: - Type: AWS::CloudFormation::Stack - Properties: - Parameters: StepFunctionArn: !Ref StepFunction {% if security_environment == 'EDC' %} SecurityGroupId: !Ref SecurityGroupId SubnetIds: !Join [",", !Ref SubnetIds] {% endif %} - TemplateURL: start-execution-worker/start-execution-worker-cf.yml + TemplateURL: start-execution/start-execution-cf.yml UploadLog: Type: AWS::CloudFormation::Stack From ee1c499ea08bf0e729fd7e70fd549a5094b19364 Mon Sep 17 00:00:00 2001 From: jacquelynsmale Date: Thu, 6 Mar 2025 11:42:53 -0900 Subject: [PATCH 21/43] add in cf template --- apps/start-execution/start_execution_cf.yml | 87 +++++++++++++++++++++ 1 file changed, 87 insertions(+) create mode 100644 apps/start-execution/start_execution_cf.yml diff --git a/apps/start-execution/start_execution_cf.yml b/apps/start-execution/start_execution_cf.yml new file mode 100644 index 000000000..57981738e --- /dev/null +++ b/apps/start-execution/start_execution_cf.yml @@ -0,0 +1,87 @@ +WSTemplateFormatVersion: 2010-09-09 + +Parameters: + + JobsTable: + Type: String + + StartExecutionWorkerArn: + Type: String + + +Resources: + + LogGroup: + Type: AWS::Logs::LogGroup + Properties: + LogGroupName: !Sub "/aws/lambda/${Lambda}" + RetentionInDays: 90 + + Role: + Type: AWS::IAM::Role + Properties: + AssumeRolePolicyDocument: + Version: 2012-10-17 + Statement: + Action: sts:AssumeRole + Principal: + Service: lambda.amazonaws.com + Effect: Allow + ManagedPolicyArns: + - !Ref Policy + + Policy: + Type: AWS::IAM::ManagedPolicy + Properties: + PolicyDocument: + Version: 2012-10-17 + Statement: + - Effect: Allow + Action: + - logs:CreateLogStream + - logs:PutLogEvents + Resource: !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/*" + - Effect: Allow + Action: dynamodb:Query + Resource: !Sub "arn:aws:dynamodb:${AWS::Region}:${AWS::AccountId}:table/${JobsTable}*" + - Effect: Allow + Action: + - lambda:InvokeFunction + Resource: !Ref StartExecutionWorkerArn + + Lambda: + Type: AWS::Lambda::Function + Properties: + Environment: + Variables: + JOBS_TABLE_NAME: !Ref JobsTable + START_EXECUTION_WORKER_ARN: !Ref StartExecutionWorkerArn + Code: src/ + Handler: start_execution_manager.lambda_handler + MemorySize: 128 + Role: !GetAtt Role.Arn + Runtime: python3.13 + Timeout: 10 + + EventInvokeConfig: + Type: AWS::Lambda::EventInvokeConfig + Properties: + FunctionName: !Ref Lambda + Qualifier: $LATEST + MaximumRetryAttempts: 0 + + Schedule: + Type: AWS::Events::Rule + Properties: + ScheduleExpression: "rate(1 minute)" + Targets: + - Arn: !GetAtt Lambda.Arn + Id: lambda + + EventPermission: + Type: AWS::Lambda::Permission + Properties: + FunctionName: !GetAtt Lambda.Arn + Action: lambda:InvokeFunction + Principal: events.amazonaws.com + SourceArn: !GetAtt Schedule.Arn From 82ef12f9529dc5dae139928c52a8690f844b84d6 Mon Sep 17 00:00:00 2001 From: Jacquelyn Smale <34557291+jacquelynsmale@users.noreply.github.com> Date: Thu, 6 Mar 2025 11:55:38 -0900 Subject: [PATCH 22/43] Update apps/start-execution/src/start_execution.py Co-authored-by: Andrew Johnston --- apps/start-execution/src/start_execution.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/apps/start-execution/src/start_execution.py b/apps/start-execution/src/start_execution.py index 5238d9215..5f9e59fbd 100644 --- a/apps/start-execution/src/start_execution.py +++ b/apps/start-execution/src/start_execution.py @@ -53,8 +53,4 @@ def lambda_handler(event: dict, _) -> None: pending_jobs = dynamo.jobs.get_jobs_waiting_for_execution(limit=500) logger.info(f'Got {len(pending_jobs)} pending jobs') - batch_size = 250 - for i in range(0, len(pending_jobs), batch_size): - jobs = pending_jobs[i : i + batch_size] - logger.info(f'Invoking worker for {len(jobs)} jobs') - submit_jobs(jobs) + submit_jobs(jobs) From 260523d3952fe41de95196672e0a3f2e02c0b169 Mon Sep 17 00:00:00 2001 From: Jacquelyn Smale <34557291+jacquelynsmale@users.noreply.github.com> Date: Thu, 6 Mar 2025 11:55:45 -0900 Subject: [PATCH 23/43] Update apps/start-execution/src/start_execution.py Co-authored-by: Andrew Johnston --- apps/start-execution/src/start_execution.py | 1 + 1 file changed, 1 insertion(+) diff --git a/apps/start-execution/src/start_execution.py b/apps/start-execution/src/start_execution.py index 5f9e59fbd..7db1e96e5 100644 --- a/apps/start-execution/src/start_execution.py +++ b/apps/start-execution/src/start_execution.py @@ -51,6 +51,7 @@ def submit_jobs(jobs: list[dict]) -> None: @log_exceptions def lambda_handler(event: dict, _) -> None: pending_jobs = dynamo.jobs.get_jobs_waiting_for_execution(limit=500) + pending_jobs = dynamo.util.convert_decimals_to_numbers(pending_jobs) logger.info(f'Got {len(pending_jobs)} pending jobs') submit_jobs(jobs) From 2da8e507fa446082a315b3f300bbd36d91c3c768 Mon Sep 17 00:00:00 2001 From: Jacquelyn Smale <34557291+jacquelynsmale@users.noreply.github.com> Date: Thu, 6 Mar 2025 11:55:53 -0900 Subject: [PATCH 24/43] Update apps/start-execution/src/start_execution.py Co-authored-by: Andrew Johnston --- apps/start-execution/src/start_execution.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/apps/start-execution/src/start_execution.py b/apps/start-execution/src/start_execution.py index 7db1e96e5..d53956e2c 100644 --- a/apps/start-execution/src/start_execution.py +++ b/apps/start-execution/src/start_execution.py @@ -8,8 +8,6 @@ from lambda_logging import log_exceptions, logger -LAMBDA_CLIENT = boto3.client('lambda') - STEP_FUNCTION = boto3.client('stepfunctions') batch_params_file = Path(__file__).parent / 'batch_params_by_job_type.json' From c255e7fc52d145aa8c181338816aed2f5c1c6b65 Mon Sep 17 00:00:00 2001 From: jacquelynsmale Date: Thu, 6 Mar 2025 12:25:30 -0900 Subject: [PATCH 25/43] fix improts --- apps/start-execution/src/start_execution.py | 1 - 1 file changed, 1 deletion(-) diff --git a/apps/start-execution/src/start_execution.py b/apps/start-execution/src/start_execution.py index d53956e2c..569d8639e 100644 --- a/apps/start-execution/src/start_execution.py +++ b/apps/start-execution/src/start_execution.py @@ -7,7 +7,6 @@ import dynamo from lambda_logging import log_exceptions, logger - STEP_FUNCTION = boto3.client('stepfunctions') batch_params_file = Path(__file__).parent / 'batch_params_by_job_type.json' From 1faa0b453c81cb6f7eee25193b4b227ef00dda1c Mon Sep 17 00:00:00 2001 From: jacquelynsmale Date: Thu, 6 Mar 2025 12:31:18 -0900 Subject: [PATCH 26/43] fix typo --- apps/start-execution/src/start_execution.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/start-execution/src/start_execution.py b/apps/start-execution/src/start_execution.py index 569d8639e..f7572b140 100644 --- a/apps/start-execution/src/start_execution.py +++ b/apps/start-execution/src/start_execution.py @@ -51,4 +51,4 @@ def lambda_handler(event: dict, _) -> None: pending_jobs = dynamo.util.convert_decimals_to_numbers(pending_jobs) logger.info(f'Got {len(pending_jobs)} pending jobs') - submit_jobs(jobs) + submit_jobs(pending_jobs) From 34b612b5147e68ae8bc028aed627b39fdd827437 Mon Sep 17 00:00:00 2001 From: jacquelynsmale Date: Thu, 6 Mar 2025 12:33:51 -0900 Subject: [PATCH 27/43] fix tests --- tests/test_start_execution.py | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/tests/test_start_execution.py b/tests/test_start_execution.py index cda72ada5..a31aa3198 100644 --- a/tests/test_start_execution.py +++ b/tests/test_start_execution.py @@ -197,10 +197,7 @@ def test_lambda_handler_500_jobs(): mock_get_jobs_waiting_for_execution.assert_called_once_with(limit=500) - assert mock_submit_jobs.mock_calls == [ - call(mock_jobs[0:250]), - call(mock_jobs[250:500]), - ] + assert mock_submit_jobs.mock_calls == [call(mock_jobs)] def test_lambda_handler_400_jobs(): @@ -219,10 +216,7 @@ def test_lambda_handler_400_jobs(): mock_get_jobs_waiting_for_execution.assert_called_once_with(limit=500) - assert mock_submit_jobs.mock_calls == [ - call(mock_jobs[0:250]), - call(mock_jobs[250:400]), - ] + assert mock_submit_jobs.mock_calls == [call(mock_jobs)] def test_lambda_handler_50_jobs(): @@ -241,7 +235,7 @@ def test_lambda_handler_50_jobs(): mock_get_jobs_waiting_for_execution.assert_called_once_with(limit=500) - assert mock_submit_jobs.mock_calls == [call(mock_jobs[0:50])] + assert mock_submit_jobs.mock_calls == [call(mock_jobs)] def test_lambda_handler_no_jobs(): From f4870796c807c433794fea3d3f5b60df16239cae Mon Sep 17 00:00:00 2001 From: jacquelynsmale Date: Thu, 6 Mar 2025 12:36:22 -0900 Subject: [PATCH 28/43] fix tests --- tests/test_start_execution.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_start_execution.py b/tests/test_start_execution.py index a31aa3198..bbeb78071 100644 --- a/tests/test_start_execution.py +++ b/tests/test_start_execution.py @@ -253,4 +253,4 @@ def test_lambda_handler_no_jobs(): mock_get_jobs_waiting_for_execution.assert_called_once_with(limit=500) - mock_submit_jobs.assert_not_called() + assert mock_submit_jobs.mock_calls == [call()] From 83ea22126862051b7c17c570be7962dbbe3a898c Mon Sep 17 00:00:00 2001 From: jacquelynsmale Date: Thu, 6 Mar 2025 12:39:31 -0900 Subject: [PATCH 29/43] fix tests --- tests/test_start_execution.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_start_execution.py b/tests/test_start_execution.py index bbeb78071..2df7bf590 100644 --- a/tests/test_start_execution.py +++ b/tests/test_start_execution.py @@ -253,4 +253,4 @@ def test_lambda_handler_no_jobs(): mock_get_jobs_waiting_for_execution.assert_called_once_with(limit=500) - assert mock_submit_jobs.mock_calls == [call()] + assert mock_submit_jobs.mock_calls == call([]) From fe56877f8f4bf1738d533f1002968ae714f3d7ea Mon Sep 17 00:00:00 2001 From: jacquelynsmale Date: Thu, 6 Mar 2025 12:43:37 -0900 Subject: [PATCH 30/43] fix tests --- tests/test_start_execution.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_start_execution.py b/tests/test_start_execution.py index 2df7bf590..5444fb13d 100644 --- a/tests/test_start_execution.py +++ b/tests/test_start_execution.py @@ -253,4 +253,4 @@ def test_lambda_handler_no_jobs(): mock_get_jobs_waiting_for_execution.assert_called_once_with(limit=500) - assert mock_submit_jobs.mock_calls == call([]) + assert mock_submit_jobs.mock_calls == [call([])] From 47c0b7ed93c98a2ceebd6ff05bc0f700477f54de Mon Sep 17 00:00:00 2001 From: jacquelynsmale Date: Thu, 6 Mar 2025 12:46:36 -0900 Subject: [PATCH 31/43] fix imports --- apps/start-execution/src/start_execution.py | 1 + 1 file changed, 1 insertion(+) diff --git a/apps/start-execution/src/start_execution.py b/apps/start-execution/src/start_execution.py index f7572b140..3dcb975c0 100644 --- a/apps/start-execution/src/start_execution.py +++ b/apps/start-execution/src/start_execution.py @@ -7,6 +7,7 @@ import dynamo from lambda_logging import log_exceptions, logger + STEP_FUNCTION = boto3.client('stepfunctions') batch_params_file = Path(__file__).parent / 'batch_params_by_job_type.json' From 3d8c3cd075f00764dba54e4b0d93c70f9ff195aa Mon Sep 17 00:00:00 2001 From: jacquelynsmale Date: Thu, 6 Mar 2025 13:01:46 -0900 Subject: [PATCH 32/43] fix imports --- apps/start-execution/src/start_execution.py | 1 + 1 file changed, 1 insertion(+) diff --git a/apps/start-execution/src/start_execution.py b/apps/start-execution/src/start_execution.py index 3dcb975c0..620a33e07 100644 --- a/apps/start-execution/src/start_execution.py +++ b/apps/start-execution/src/start_execution.py @@ -4,6 +4,7 @@ from typing import Any import boto3 + import dynamo from lambda_logging import log_exceptions, logger From 273610fe9daa6ca79ddf438b0dbc46a4dcd3e963 Mon Sep 17 00:00:00 2001 From: jacquelynsmale Date: Thu, 6 Mar 2025 13:28:55 -0900 Subject: [PATCH 33/43] fix merge conflict --- Makefile | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Makefile b/Makefile index d4c7c2e79..d352a452c 100644 --- a/Makefile +++ b/Makefile @@ -10,7 +10,8 @@ DISABLE_PRIVATE_DNS = ${PWD}/apps/disable-private-dns/src UPDATE_DB = ${PWD}/apps/update-db/src UPLOAD_LOG = ${PWD}/apps/upload-log/src DYNAMO = ${PWD}/lib/dynamo -export PYTHONPATH = ${API}:${CHECK_PROCESSING_TIME}:${GET_FILES}:${HANDLE_BATCH_EVENT}:${SET_BATCH_OVERRIDES}:${SCALE_CLUSTER}:${START_EXECUTION}:${DISABLE_PRIVATE_DNS}:${UPDATE_DB}:${UPLOAD_LOG}:${DYNAMO}:${APPS} +LAMBDA_LOGGING = ${PWD}/lib/lambda_logging +export PYTHONPATH = ${API}:${CHECK_PROCESSING_TIME}:${GET_FILES}:${HANDLE_BATCH_EVENT}:${SET_BATCH_OVERRIDES}:${SCALE_CLUSTER}:${START_EXECUTION}:${DISABLE_PRIVATE_DNS}:${UPDATE_DB}:${UPLOAD_LOG}:${DYNAMO}:${LAMBDA_LOGGING}:${APPS} build: render From 7b52f1b798779d07d3b53b3bcb89a611812f5bab Mon Sep 17 00:00:00 2001 From: jacquelynsmale Date: Thu, 6 Mar 2025 14:53:34 -0900 Subject: [PATCH 34/43] update changelog --- CHANGELOG.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index aa151e628..bc245a511 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,11 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [9.5.4] + +### Changed +- Combined `start-execution-worker` and `start-execution-manager` into one `start-execution` python script and cf template. + ## [9.5.3] ### Fixed From 37be2a7e731efb54b634e3fee73f565ca595361d Mon Sep 17 00:00:00 2001 From: jacquelynsmale Date: Thu, 6 Mar 2025 15:06:12 -0900 Subject: [PATCH 35/43] add in lambda dict --- tests/test_start_execution.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_start_execution.py b/tests/test_start_execution.py index 5444fb13d..5783ac20c 100644 --- a/tests/test_start_execution.py +++ b/tests/test_start_execution.py @@ -193,7 +193,7 @@ def test_lambda_handler_500_jobs(): mock_submit_jobs.return_value = None - start_execution.lambda_handler(None, None) + start_execution.lambda_handler(dict(None), None) mock_get_jobs_waiting_for_execution.assert_called_once_with(limit=500) From dbc6868a5acddf5069b2b9d2414c41a432a6378d Mon Sep 17 00:00:00 2001 From: jacquelynsmale Date: Thu, 6 Mar 2025 15:09:06 -0900 Subject: [PATCH 36/43] add in lambda dict --- tests/test_start_execution.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/test_start_execution.py b/tests/test_start_execution.py index 5783ac20c..02fbb8274 100644 --- a/tests/test_start_execution.py +++ b/tests/test_start_execution.py @@ -193,7 +193,7 @@ def test_lambda_handler_500_jobs(): mock_submit_jobs.return_value = None - start_execution.lambda_handler(dict(None), None) + start_execution.lambda_handler({}, None) mock_get_jobs_waiting_for_execution.assert_called_once_with(limit=500) @@ -212,7 +212,7 @@ def test_lambda_handler_400_jobs(): mock_submit_jobs.return_value = None - start_execution.lambda_handler(None, None) + start_execution.lambda_handler({}, None) mock_get_jobs_waiting_for_execution.assert_called_once_with(limit=500) @@ -231,7 +231,7 @@ def test_lambda_handler_50_jobs(): mock_submit_jobs.return_value = None - start_execution.lambda_handler(None, None) + start_execution.lambda_handler({}, None) mock_get_jobs_waiting_for_execution.assert_called_once_with(limit=500) @@ -249,7 +249,7 @@ def test_lambda_handler_no_jobs(): mock_submit_jobs.return_value = None - start_execution.lambda_handler(None, None) + start_execution.lambda_handler({}, None) mock_get_jobs_waiting_for_execution.assert_called_once_with(limit=500) From dcdc98c6bb5b717b014349232533dd7c518f9f90 Mon Sep 17 00:00:00 2001 From: Andrew Johnston Date: Fri, 7 Mar 2025 09:48:16 -0900 Subject: [PATCH 37/43] remove extraneous rendered .cf template --- apps/start-execution/start_execution_cf.yml | 87 --------------------- 1 file changed, 87 deletions(-) delete mode 100644 apps/start-execution/start_execution_cf.yml diff --git a/apps/start-execution/start_execution_cf.yml b/apps/start-execution/start_execution_cf.yml deleted file mode 100644 index 57981738e..000000000 --- a/apps/start-execution/start_execution_cf.yml +++ /dev/null @@ -1,87 +0,0 @@ -WSTemplateFormatVersion: 2010-09-09 - -Parameters: - - JobsTable: - Type: String - - StartExecutionWorkerArn: - Type: String - - -Resources: - - LogGroup: - Type: AWS::Logs::LogGroup - Properties: - LogGroupName: !Sub "/aws/lambda/${Lambda}" - RetentionInDays: 90 - - Role: - Type: AWS::IAM::Role - Properties: - AssumeRolePolicyDocument: - Version: 2012-10-17 - Statement: - Action: sts:AssumeRole - Principal: - Service: lambda.amazonaws.com - Effect: Allow - ManagedPolicyArns: - - !Ref Policy - - Policy: - Type: AWS::IAM::ManagedPolicy - Properties: - PolicyDocument: - Version: 2012-10-17 - Statement: - - Effect: Allow - Action: - - logs:CreateLogStream - - logs:PutLogEvents - Resource: !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/*" - - Effect: Allow - Action: dynamodb:Query - Resource: !Sub "arn:aws:dynamodb:${AWS::Region}:${AWS::AccountId}:table/${JobsTable}*" - - Effect: Allow - Action: - - lambda:InvokeFunction - Resource: !Ref StartExecutionWorkerArn - - Lambda: - Type: AWS::Lambda::Function - Properties: - Environment: - Variables: - JOBS_TABLE_NAME: !Ref JobsTable - START_EXECUTION_WORKER_ARN: !Ref StartExecutionWorkerArn - Code: src/ - Handler: start_execution_manager.lambda_handler - MemorySize: 128 - Role: !GetAtt Role.Arn - Runtime: python3.13 - Timeout: 10 - - EventInvokeConfig: - Type: AWS::Lambda::EventInvokeConfig - Properties: - FunctionName: !Ref Lambda - Qualifier: $LATEST - MaximumRetryAttempts: 0 - - Schedule: - Type: AWS::Events::Rule - Properties: - ScheduleExpression: "rate(1 minute)" - Targets: - - Arn: !GetAtt Lambda.Arn - Id: lambda - - EventPermission: - Type: AWS::Lambda::Permission - Properties: - FunctionName: !GetAtt Lambda.Arn - Action: lambda:InvokeFunction - Principal: events.amazonaws.com - SourceArn: !GetAtt Schedule.Arn From e4b57fd72b036b42a7ce423de7bd1c6e0fefb35d Mon Sep 17 00:00:00 2001 From: Jake Herrmann Date: Fri, 7 Mar 2025 09:49:34 -0900 Subject: [PATCH 38/43] update gitignore for start-execution --- .gitignore | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 8796a6956..3c641f3a7 100644 --- a/.gitignore +++ b/.gitignore @@ -5,7 +5,7 @@ apps/api/src/hyp3_api/api-spec/job_parameters.yml apps/api/src/hyp3_api/job_validation_map.yml apps/step-function.json apps/**/*-cf.yml -apps/start-execution-worker/src/batch_params_by_job_type.json +apps/start-execution/src/batch_params_by_job_type.json lib/dynamo/dynamo/*.json lib/dynamo/dynamo/*.yml From c46e85780ba13c776e18fa95155017a9a95465cc Mon Sep 17 00:00:00 2001 From: Jake Herrmann Date: Fri, 7 Mar 2025 09:59:55 -0900 Subject: [PATCH 39/43] remove START_EXECUTION_WORKER_ARN patches in tests --- tests/test_start_execution.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/tests/test_start_execution.py b/tests/test_start_execution.py index 02fbb8274..85a96026d 100644 --- a/tests/test_start_execution.py +++ b/tests/test_start_execution.py @@ -185,7 +185,6 @@ def test_lambda_handler_500_jobs(): with ( patch('dynamo.jobs.get_jobs_waiting_for_execution') as mock_get_jobs_waiting_for_execution, patch('start_execution.submit_jobs') as mock_submit_jobs, - patch.dict(os.environ, {'START_EXECUTION_WORKER_ARN': 'test-worker-function-arn'}, clear=True), patch.dict(os.environ, {'STEP_FUNCTION_ARN': 'test-step-function-arn'}, clear=True), ): mock_jobs = list(range(500)) @@ -204,7 +203,6 @@ def test_lambda_handler_400_jobs(): with ( patch('dynamo.jobs.get_jobs_waiting_for_execution') as mock_get_jobs_waiting_for_execution, patch('start_execution.submit_jobs') as mock_submit_jobs, - patch.dict(os.environ, {'START_EXECUTION_WORKER_ARN': 'test-worker-function-arn'}, clear=True), patch.dict(os.environ, {'STEP_FUNCTION_ARN': 'test-step-function-arn'}, clear=True), ): mock_jobs = list(range(400)) @@ -223,7 +221,6 @@ def test_lambda_handler_50_jobs(): with ( patch('dynamo.jobs.get_jobs_waiting_for_execution') as mock_get_jobs_waiting_for_execution, patch('start_execution.submit_jobs') as mock_submit_jobs, - patch.dict(os.environ, {'START_EXECUTION_WORKER_ARN': 'test-worker-function-arn'}, clear=True), patch.dict(os.environ, {'STEP_FUNCTION_ARN': 'test-step-function-arn'}, clear=True), ): mock_jobs = list(range(50)) @@ -242,7 +239,6 @@ def test_lambda_handler_no_jobs(): with ( patch('dynamo.jobs.get_jobs_waiting_for_execution') as mock_get_jobs_waiting_for_execution, patch('start_execution.submit_jobs') as mock_submit_jobs, - patch.dict(os.environ, {'START_EXECUTION_WORKER_ARN': 'test-worker-function-arn'}, clear=True), patch.dict(os.environ, {'STEP_FUNCTION_ARN': 'test-step-function-arn'}, clear=True), ): mock_get_jobs_waiting_for_execution.return_value = [] From 8936274c685622e62d9231120b2f7cf712e55853 Mon Sep 17 00:00:00 2001 From: Jake Herrmann Date: Fri, 7 Mar 2025 11:01:37 -0900 Subject: [PATCH 40/43] update start execution lambda handler signature to match our other lambdas --- apps/start-execution/src/start_execution.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/start-execution/src/start_execution.py b/apps/start-execution/src/start_execution.py index 620a33e07..de883ab2f 100644 --- a/apps/start-execution/src/start_execution.py +++ b/apps/start-execution/src/start_execution.py @@ -48,7 +48,7 @@ def submit_jobs(jobs: list[dict]) -> None: @log_exceptions -def lambda_handler(event: dict, _) -> None: +def lambda_handler(event: dict, context: Any) -> None: pending_jobs = dynamo.jobs.get_jobs_waiting_for_execution(limit=500) pending_jobs = dynamo.util.convert_decimals_to_numbers(pending_jobs) logger.info(f'Got {len(pending_jobs)} pending jobs') From 528f97715ff8dbc9efe660f5aac55cfac05fa8b8 Mon Sep 17 00:00:00 2001 From: Jake Herrmann Date: Fri, 7 Mar 2025 11:35:13 -0900 Subject: [PATCH 41/43] combine start exec lambda handler tests into one --- tests/test_start_execution.py | 67 ++++------------------------------- 1 file changed, 6 insertions(+), 61 deletions(-) diff --git a/tests/test_start_execution.py b/tests/test_start_execution.py index 85a96026d..2fd4c9d58 100644 --- a/tests/test_start_execution.py +++ b/tests/test_start_execution.py @@ -181,72 +181,17 @@ def test_submit_jobs(): ] -def test_lambda_handler_500_jobs(): +def test_lambda_handler_jobs(): with ( patch('dynamo.jobs.get_jobs_waiting_for_execution') as mock_get_jobs_waiting_for_execution, + patch('dynamo.util.convert_decimals_to_numbers') as mock_convert_decimals_to_numbers, patch('start_execution.submit_jobs') as mock_submit_jobs, - patch.dict(os.environ, {'STEP_FUNCTION_ARN': 'test-step-function-arn'}, clear=True), ): - mock_jobs = list(range(500)) - mock_get_jobs_waiting_for_execution.return_value = mock_jobs - - mock_submit_jobs.return_value = None - - start_execution.lambda_handler({}, None) - - mock_get_jobs_waiting_for_execution.assert_called_once_with(limit=500) - - assert mock_submit_jobs.mock_calls == [call(mock_jobs)] - - -def test_lambda_handler_400_jobs(): - with ( - patch('dynamo.jobs.get_jobs_waiting_for_execution') as mock_get_jobs_waiting_for_execution, - patch('start_execution.submit_jobs') as mock_submit_jobs, - patch.dict(os.environ, {'STEP_FUNCTION_ARN': 'test-step-function-arn'}, clear=True), - ): - mock_jobs = list(range(400)) - mock_get_jobs_waiting_for_execution.return_value = mock_jobs - - mock_submit_jobs.return_value = None - - start_execution.lambda_handler({}, None) - - mock_get_jobs_waiting_for_execution.assert_called_once_with(limit=500) - - assert mock_submit_jobs.mock_calls == [call(mock_jobs)] - - -def test_lambda_handler_50_jobs(): - with ( - patch('dynamo.jobs.get_jobs_waiting_for_execution') as mock_get_jobs_waiting_for_execution, - patch('start_execution.submit_jobs') as mock_submit_jobs, - patch.dict(os.environ, {'STEP_FUNCTION_ARN': 'test-step-function-arn'}, clear=True), - ): - mock_jobs = list(range(50)) - mock_get_jobs_waiting_for_execution.return_value = mock_jobs - - mock_submit_jobs.return_value = None + mock_get_jobs_waiting_for_execution.return_value = 'mock_jobs' + mock_convert_decimals_to_numbers.return_value = 'converted_jobs' start_execution.lambda_handler({}, None) mock_get_jobs_waiting_for_execution.assert_called_once_with(limit=500) - - assert mock_submit_jobs.mock_calls == [call(mock_jobs)] - - -def test_lambda_handler_no_jobs(): - with ( - patch('dynamo.jobs.get_jobs_waiting_for_execution') as mock_get_jobs_waiting_for_execution, - patch('start_execution.submit_jobs') as mock_submit_jobs, - patch.dict(os.environ, {'STEP_FUNCTION_ARN': 'test-step-function-arn'}, clear=True), - ): - mock_get_jobs_waiting_for_execution.return_value = [] - - mock_submit_jobs.return_value = None - - start_execution.lambda_handler({}, None) - - mock_get_jobs_waiting_for_execution.assert_called_once_with(limit=500) - - assert mock_submit_jobs.mock_calls == [call([])] + mock_convert_decimals_to_numbers.assert_called_once_with('mock_jobs') + mock_submit_jobs.assert_called_once_with('converted_jobs') From b979e33740aa7caa3ba78c5671a08902d37346f2 Mon Sep 17 00:00:00 2001 From: Jake Herrmann Date: Fri, 7 Mar 2025 11:39:56 -0900 Subject: [PATCH 42/43] rename test --- tests/test_start_execution.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_start_execution.py b/tests/test_start_execution.py index 2fd4c9d58..e57b36d87 100644 --- a/tests/test_start_execution.py +++ b/tests/test_start_execution.py @@ -181,7 +181,7 @@ def test_submit_jobs(): ] -def test_lambda_handler_jobs(): +def test_lambda_handler(): with ( patch('dynamo.jobs.get_jobs_waiting_for_execution') as mock_get_jobs_waiting_for_execution, patch('dynamo.util.convert_decimals_to_numbers') as mock_convert_decimals_to_numbers, From 749c45c1a1255be84c8275e0cf175a06b00423f6 Mon Sep 17 00:00:00 2001 From: Jake Herrmann Date: Fri, 7 Mar 2025 11:49:50 -0900 Subject: [PATCH 43/43] Update CHANGELOG.md --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6904c09f4..cf65df05f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,7 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [9.5.4] ### Changed -- Combined `start-execution-worker` and `start-execution-manager` into one `start-execution` python script and cf template. +- Combined `start_execution_worker` and `start_execution_manager` into one `start_execution` AWS Lambda function. ## [9.5.3]