diff --git a/.gitignore b/.gitignore index 8796a6956..3c641f3a7 100644 --- a/.gitignore +++ b/.gitignore @@ -5,7 +5,7 @@ apps/api/src/hyp3_api/api-spec/job_parameters.yml apps/api/src/hyp3_api/job_validation_map.yml apps/step-function.json apps/**/*-cf.yml -apps/start-execution-worker/src/batch_params_by_job_type.json +apps/start-execution/src/batch_params_by_job_type.json lib/dynamo/dynamo/*.json lib/dynamo/dynamo/*.yml diff --git a/CHANGELOG.md b/CHANGELOG.md index 6281b6244..cf65df05f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,11 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [9.5.4] + +### Changed +- Combined `start_execution_worker` and `start_execution_manager` into one `start_execution` AWS Lambda function. + ## [9.5.3] ### Fixed diff --git a/Makefile b/Makefile index 58769a73e..d352a452c 100644 --- a/Makefile +++ b/Makefile @@ -5,14 +5,13 @@ GET_FILES = ${PWD}/apps/get-files/src HANDLE_BATCH_EVENT = ${PWD}/apps/handle-batch-event/src SET_BATCH_OVERRIDES = ${PWD}/apps/set-batch-overrides/src SCALE_CLUSTER = ${PWD}/apps/scale-cluster/src -START_EXECUTION_MANAGER = ${PWD}/apps/start-execution-manager/src -START_EXECUTION_WORKER = ${PWD}/apps/start-execution-worker/src +START_EXECUTION = ${PWD}/apps/start-execution/src DISABLE_PRIVATE_DNS = ${PWD}/apps/disable-private-dns/src UPDATE_DB = ${PWD}/apps/update-db/src UPLOAD_LOG = ${PWD}/apps/upload-log/src DYNAMO = ${PWD}/lib/dynamo LAMBDA_LOGGING = ${PWD}/lib/lambda_logging -export PYTHONPATH = ${API}:${CHECK_PROCESSING_TIME}:${GET_FILES}:${HANDLE_BATCH_EVENT}:${SET_BATCH_OVERRIDES}:${SCALE_CLUSTER}:${START_EXECUTION_MANAGER}:${START_EXECUTION_WORKER}:${DISABLE_PRIVATE_DNS}:${UPDATE_DB}:${UPLOAD_LOG}:${DYNAMO}:${LAMBDA_LOGGING}:${APPS} +export PYTHONPATH = ${API}:${CHECK_PROCESSING_TIME}:${GET_FILES}:${HANDLE_BATCH_EVENT}:${SET_BATCH_OVERRIDES}:${SCALE_CLUSTER}:${START_EXECUTION}:${DISABLE_PRIVATE_DNS}:${UPDATE_DB}:${UPLOAD_LOG}:${DYNAMO}:${LAMBDA_LOGGING}:${APPS} build: render @@ -20,8 +19,7 @@ build: render python -m pip install --upgrade -r requirements-apps-api-binary.txt --platform manylinux2014_x86_64 --only-binary=:all: -t ${API}; \ python -m pip install --upgrade -r requirements-apps-handle-batch-event.txt -t ${HANDLE_BATCH_EVENT}; \ python -m pip install --upgrade -r requirements-apps-scale-cluster.txt -t ${SCALE_CLUSTER}; \ - python -m pip install --upgrade -r requirements-apps-start-execution-manager.txt -t ${START_EXECUTION_MANAGER}; \ - python -m pip install --upgrade -r requirements-apps-start-execution-worker.txt -t ${START_EXECUTION_WORKER}; \ + python -m pip install --upgrade -r requirements-apps-start-execution.txt -t ${START_EXECUTION}; \ python -m pip install --upgrade -r requirements-apps-disable-private-dns.txt -t ${DISABLE_PRIVATE_DNS}; \ python -m pip install --upgrade -r requirements-apps-update-db.txt -t ${UPDATE_DB} diff --git a/apps/render_cf.py b/apps/render_cf.py index fe07dc422..040a9a591 100644 --- a/apps/render_cf.py +++ b/apps/render_cf.py @@ -195,7 +195,7 @@ def render_batch_params_by_job_type(job_types: dict) -> None: for step in job_spec['steps']: params.update(get_batch_param_names_for_job_step(step)) batch_params_by_job_type[job_type] = list(params) - with (Path('apps') / 'start-execution-worker' / 'src' / 'batch_params_by_job_type.json').open('w') as f: + with (Path('apps') / 'start-execution' / 'src' / 'batch_params_by_job_type.json').open('w') as f: json.dump(batch_params_by_job_type, f, indent=2) diff --git a/apps/start-execution-manager/src/start_execution_manager.py b/apps/start-execution-manager/src/start_execution_manager.py deleted file mode 100644 index 185f3f080..000000000 --- a/apps/start-execution-manager/src/start_execution_manager.py +++ /dev/null @@ -1,35 +0,0 @@ -import json -import os - -import boto3 - -import dynamo -from lambda_logging import log_exceptions, logger - - -LAMBDA_CLIENT = boto3.client('lambda') - - -def invoke_worker(worker_function_arn: str, jobs: list[dict]) -> dict: - payload = json.dumps({'jobs': dynamo.util.convert_decimals_to_numbers(jobs)}) - return LAMBDA_CLIENT.invoke( - FunctionName=worker_function_arn, - InvocationType='Event', - Payload=payload, - ) - - -@log_exceptions -def lambda_handler(event: dict, _) -> None: - worker_function_arn = os.environ['START_EXECUTION_WORKER_ARN'] - logger.info(f'Worker function ARN: {worker_function_arn}') - - pending_jobs = dynamo.jobs.get_jobs_waiting_for_execution(limit=500) - logger.info(f'Got {len(pending_jobs)} pending jobs') - - batch_size = 250 - for i in range(0, len(pending_jobs), batch_size): - jobs = pending_jobs[i : i + batch_size] - logger.info(f'Invoking worker for {len(jobs)} jobs') - response = invoke_worker(worker_function_arn, jobs) - logger.info(f'Got response status code {response["StatusCode"]}') diff --git a/apps/start-execution-worker/start-execution-worker-cf.yml.j2 b/apps/start-execution-worker/start-execution-worker-cf.yml.j2 deleted file mode 100644 index 808bd2abd..000000000 --- a/apps/start-execution-worker/start-execution-worker-cf.yml.j2 +++ /dev/null @@ -1,93 +0,0 @@ -AWSTemplateFormatVersion: 2010-09-09 - -Parameters: - - StepFunctionArn: - Type: String - - {% if security_environment == 'EDC' %} - SecurityGroupId: - Type: String - - SubnetIds: - Type: CommaDelimitedList - {% endif %} - -Outputs: - - LambdaArn: - Value: !GetAtt Lambda.Arn - -Resources: - - LogGroup: - Type: AWS::Logs::LogGroup - Properties: - LogGroupName: !Sub "/aws/lambda/${Lambda}" - RetentionInDays: 90 - - Role: - Type: {{ 'Custom::JplRole' if security_environment in ('JPL', 'JPL-public') else 'AWS::IAM::Role' }} - Properties: - {% if security_environment in ('JPL', 'JPL-public') %} - ServiceToken: !ImportValue Custom::JplRole::ServiceToken - Path: /account-managed/hyp3/ - {% endif %} - AssumeRolePolicyDocument: - Version: 2012-10-17 - Statement: - Action: sts:AssumeRole - Principal: - Service: lambda.amazonaws.com - Effect: Allow - ManagedPolicyArns: - - !Ref Policy - {% if security_environment == 'EDC' %} - - arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole - {% endif %} - - Policy: - Type: {{ 'Custom::JplPolicy' if security_environment in ('JPL', 'JPL-public') else 'AWS::IAM::ManagedPolicy' }} - Properties: - {% if security_environment in ('JPL', 'JPL-public') %} - ServiceToken: !ImportValue Custom::JplPolicy::ServiceToken - Path: /account-managed/hyp3/ - {% endif %} - PolicyDocument: - Version: 2012-10-17 - Statement: - - Effect: Allow - Action: - - logs:CreateLogStream - - logs:PutLogEvents - Resource: !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/*" - - Effect: Allow - Action: states:StartExecution - Resource: !Ref StepFunctionArn - - Lambda: - Type: AWS::Lambda::Function - Properties: - Environment: - Variables: - STEP_FUNCTION_ARN: !Ref StepFunctionArn - Code: src/ - Handler: start_execution_worker.lambda_handler - MemorySize: 128 - Role: !GetAtt Role.Arn - Runtime: python3.13 - Timeout: 45 - {% if security_environment == 'EDC' %} - VpcConfig: - SecurityGroupIds: - - !Ref SecurityGroupId - SubnetIds: !Ref SubnetIds - {% endif %} - - EventInvokeConfig: - Type: AWS::Lambda::EventInvokeConfig - Properties: - FunctionName: !Ref Lambda - Qualifier: $LATEST - MaximumRetryAttempts: 0 - MaximumEventAgeInSeconds: 60 diff --git a/apps/start-execution-worker/src/start_execution_worker.py b/apps/start-execution/src/start_execution.py similarity index 82% rename from apps/start-execution-worker/src/start_execution_worker.py rename to apps/start-execution/src/start_execution.py index b3a9d083a..de883ab2f 100644 --- a/apps/start-execution-worker/src/start_execution_worker.py +++ b/apps/start-execution/src/start_execution.py @@ -5,6 +5,7 @@ import boto3 +import dynamo from lambda_logging import log_exceptions, logger @@ -47,8 +48,9 @@ def submit_jobs(jobs: list[dict]) -> None: @log_exceptions -def lambda_handler(event: dict, _) -> None: - jobs = event['jobs'] - logger.info(f'Submitting {len(jobs)} jobs') - submit_jobs(jobs) - logger.info('Successfully submitted jobs') +def lambda_handler(event: dict, context: Any) -> None: + pending_jobs = dynamo.jobs.get_jobs_waiting_for_execution(limit=500) + pending_jobs = dynamo.util.convert_decimals_to_numbers(pending_jobs) + logger.info(f'Got {len(pending_jobs)} pending jobs') + + submit_jobs(pending_jobs) diff --git a/apps/start-execution-manager/start-execution-manager-cf.yml.j2 b/apps/start-execution/start-execution-cf.yml.j2 similarity index 91% rename from apps/start-execution-manager/start-execution-manager-cf.yml.j2 rename to apps/start-execution/start-execution-cf.yml.j2 index ab760a447..86e7d0fc1 100644 --- a/apps/start-execution-manager/start-execution-manager-cf.yml.j2 +++ b/apps/start-execution/start-execution-cf.yml.j2 @@ -2,10 +2,10 @@ AWSTemplateFormatVersion: 2010-09-09 Parameters: - JobsTable: + StepFunctionArn: Type: String - StartExecutionWorkerArn: + JobsTable: Type: String {% if security_environment == 'EDC' %} @@ -63,9 +63,8 @@ Resources: Action: dynamodb:Query Resource: !Sub "arn:aws:dynamodb:${AWS::Region}:${AWS::AccountId}:table/${JobsTable}*" - Effect: Allow - Action: - - lambda:InvokeFunction - Resource: !Ref StartExecutionWorkerArn + Action: states:StartExecution + Resource: !Ref StepFunctionArn Lambda: Type: AWS::Lambda::Function @@ -73,13 +72,13 @@ Resources: Environment: Variables: JOBS_TABLE_NAME: !Ref JobsTable - START_EXECUTION_WORKER_ARN: !Ref StartExecutionWorkerArn + STEP_FUNCTION_ARN: !Ref StepFunctionArn Code: src/ - Handler: start_execution_manager.lambda_handler + Handler: start_execution.lambda_handler MemorySize: 128 Role: !GetAtt Role.Arn Runtime: python3.13 - Timeout: 10 + Timeout: 55 {% if security_environment == 'EDC' %} VpcConfig: SecurityGroupIds: diff --git a/apps/workflow-cf.yml.j2 b/apps/workflow-cf.yml.j2 index 011bbf813..7b5a542b8 100644 --- a/apps/workflow-cf.yml.j2 +++ b/apps/workflow-cf.yml.j2 @@ -206,28 +206,17 @@ Resources: {% endif %} TemplateURL: check-processing-time/check-processing-time-cf.yml - StartExecutionManager: + StartExecution: Type: AWS::CloudFormation::Stack Properties: Parameters: JobsTable: !Ref JobsTable - StartExecutionWorkerArn: !GetAtt StartExecutionWorker.Outputs.LambdaArn - {% if security_environment == 'EDC' %} - SecurityGroupId: !Ref SecurityGroupId - SubnetIds: !Join [",", !Ref SubnetIds] - {% endif %} - TemplateURL: start-execution-manager/start-execution-manager-cf.yml - - StartExecutionWorker: - Type: AWS::CloudFormation::Stack - Properties: - Parameters: StepFunctionArn: !Ref StepFunction {% if security_environment == 'EDC' %} SecurityGroupId: !Ref SecurityGroupId SubnetIds: !Join [",", !Ref SubnetIds] {% endif %} - TemplateURL: start-execution-worker/start-execution-worker-cf.yml + TemplateURL: start-execution/start-execution-cf.yml UploadLog: Type: AWS::CloudFormation::Stack diff --git a/requirements-all.txt b/requirements-all.txt index 14194893f..3c1dfc053 100644 --- a/requirements-all.txt +++ b/requirements-all.txt @@ -1,8 +1,7 @@ -r requirements-apps-api.txt -r requirements-apps-handle-batch-event.txt -r requirements-apps-scale-cluster.txt --r requirements-apps-start-execution-manager.txt --r requirements-apps-start-execution-worker.txt +-r requirements-apps-start-execution.txt -r requirements-apps-disable-private-dns.txt -r requirements-apps-update-db.txt boto3==1.36.11 diff --git a/requirements-apps-start-execution-worker.txt b/requirements-apps-start-execution-worker.txt deleted file mode 100644 index 25ebd1a2b..000000000 --- a/requirements-apps-start-execution-worker.txt +++ /dev/null @@ -1,2 +0,0 @@ -boto3==1.36.11 -./lib/lambda_logging/ diff --git a/requirements-apps-start-execution-manager.txt b/requirements-apps-start-execution.txt similarity index 100% rename from requirements-apps-start-execution-manager.txt rename to requirements-apps-start-execution.txt diff --git a/tests/test_start_execution_worker.py b/tests/test_start_execution.py similarity index 79% rename from tests/test_start_execution_worker.py rename to tests/test_start_execution.py index 3b1bd7863..e57b36d87 100644 --- a/tests/test_start_execution_worker.py +++ b/tests/test_start_execution.py @@ -2,15 +2,15 @@ import os from unittest.mock import call, patch -import start_execution_worker +import start_execution def test_convert_to_string(): - assert start_execution_worker.convert_to_string(1) == '1' - assert start_execution_worker.convert_to_string(True) == 'True' - assert start_execution_worker.convert_to_string([1, 2]) == '1 2' - assert start_execution_worker.convert_to_string(['abc', 'bcd']) == 'abc bcd' - assert start_execution_worker.convert_to_string('abc') == 'abc' + assert start_execution.convert_to_string(1) == '1' + assert start_execution.convert_to_string(True) == 'True' + assert start_execution.convert_to_string([1, 2]) == '1 2' + assert start_execution.convert_to_string(['abc', 'bcd']) == 'abc bcd' + assert start_execution.convert_to_string('abc') == 'abc' def test_submit_jobs(): @@ -156,11 +156,11 @@ def test_submit_jobs(): ) with ( - patch('start_execution_worker.STEP_FUNCTION.start_execution') as mock_start_execution, + patch('start_execution.STEP_FUNCTION.start_execution') as mock_start_execution, patch.dict(os.environ, {'STEP_FUNCTION_ARN': 'test-state-machine-arn'}, clear=True), - patch('start_execution_worker.BATCH_PARAMS_BY_JOB_TYPE', batch_params_by_job_type), + patch('start_execution.BATCH_PARAMS_BY_JOB_TYPE', batch_params_by_job_type), ): - start_execution_worker.submit_jobs(jobs) + start_execution.submit_jobs(jobs) assert mock_start_execution.mock_calls == [ call( @@ -182,7 +182,16 @@ def test_submit_jobs(): def test_lambda_handler(): - with patch('start_execution_worker.submit_jobs') as mock_submit_jobs: - start_execution_worker.lambda_handler({'jobs': [1, 2, 3]}, None) + with ( + patch('dynamo.jobs.get_jobs_waiting_for_execution') as mock_get_jobs_waiting_for_execution, + patch('dynamo.util.convert_decimals_to_numbers') as mock_convert_decimals_to_numbers, + patch('start_execution.submit_jobs') as mock_submit_jobs, + ): + mock_get_jobs_waiting_for_execution.return_value = 'mock_jobs' + mock_convert_decimals_to_numbers.return_value = 'converted_jobs' + + start_execution.lambda_handler({}, None) - assert mock_submit_jobs.mock_calls == [call([1, 2, 3])] + mock_get_jobs_waiting_for_execution.assert_called_once_with(limit=500) + mock_convert_decimals_to_numbers.assert_called_once_with('mock_jobs') + mock_submit_jobs.assert_called_once_with('converted_jobs') diff --git a/tests/test_start_execution_manager.py b/tests/test_start_execution_manager.py deleted file mode 100644 index b5dad1c1e..000000000 --- a/tests/test_start_execution_manager.py +++ /dev/null @@ -1,126 +0,0 @@ -import json -import os -from decimal import Decimal -from unittest.mock import call, patch - -import start_execution_manager - - -def test_invoke_worker(): - jobs: list[dict] = [ - { - 'job_id': 'job0', - 'decimal_float_field': Decimal('10.1'), - 'integer_float_field': Decimal('10'), - 'job_parameters': { - 'decimal_float_field': Decimal('10.1'), - 'integer_float_field': Decimal('10'), - 'decimal_list_field': [Decimal('10.1'), Decimal('10')], - }, - }, - {'job_id': 'job1'}, - ] - expected_payload = json.dumps( - { - 'jobs': [ - { - 'job_id': 'job0', - 'decimal_float_field': 10.1, - 'integer_float_field': 10, - 'job_parameters': { - 'decimal_float_field': 10.1, - 'integer_float_field': 10, - 'decimal_list_field': [10.1, 10], - }, - }, - {'job_id': 'job1'}, - ] - } - ) - with patch('start_execution_manager.LAMBDA_CLIENT.invoke') as mock_invoke: - mock_invoke.return_value = {'foo': 'bar'} - - assert start_execution_manager.invoke_worker('test-worker-arn', jobs) == {'foo': 'bar'} - - mock_invoke.assert_called_once_with( - FunctionName='test-worker-arn', - InvocationType='Event', - Payload=expected_payload, - ) - - -def test_lambda_handler_500_jobs(): - with ( - patch('dynamo.jobs.get_jobs_waiting_for_execution') as mock_get_jobs_waiting_for_execution, - patch('start_execution_manager.invoke_worker') as mock_invoke_worker, - patch.dict(os.environ, {'START_EXECUTION_WORKER_ARN': 'test-worker-function-arn'}, clear=True), - ): - mock_jobs = list(range(500)) - mock_get_jobs_waiting_for_execution.return_value = mock_jobs - - mock_invoke_worker.return_value = {'StatusCode': None} - - start_execution_manager.lambda_handler({}, None) - - mock_get_jobs_waiting_for_execution.assert_called_once_with(limit=500) - - assert mock_invoke_worker.mock_calls == [ - call('test-worker-function-arn', mock_jobs[0:250]), - call('test-worker-function-arn', mock_jobs[250:500]), - ] - - -def test_lambda_handler_400_jobs(): - with ( - patch('dynamo.jobs.get_jobs_waiting_for_execution') as mock_get_jobs_waiting_for_execution, - patch('start_execution_manager.invoke_worker') as mock_invoke_worker, - patch.dict(os.environ, {'START_EXECUTION_WORKER_ARN': 'test-worker-function-arn'}, clear=True), - ): - mock_jobs = list(range(400)) - mock_get_jobs_waiting_for_execution.return_value = mock_jobs - - mock_invoke_worker.return_value = {'StatusCode': None} - - start_execution_manager.lambda_handler({}, None) - - mock_get_jobs_waiting_for_execution.assert_called_once_with(limit=500) - - assert mock_invoke_worker.mock_calls == [ - call('test-worker-function-arn', mock_jobs[0:250]), - call('test-worker-function-arn', mock_jobs[250:400]), - ] - - -def test_lambda_handler_50_jobs(): - with ( - patch('dynamo.jobs.get_jobs_waiting_for_execution') as mock_get_jobs_waiting_for_execution, - patch('start_execution_manager.invoke_worker') as mock_invoke_worker, - patch.dict(os.environ, {'START_EXECUTION_WORKER_ARN': 'test-worker-function-arn'}, clear=True), - ): - mock_jobs = list(range(50)) - mock_get_jobs_waiting_for_execution.return_value = mock_jobs - - mock_invoke_worker.return_value = {'StatusCode': None} - - start_execution_manager.lambda_handler({}, None) - - mock_get_jobs_waiting_for_execution.assert_called_once_with(limit=500) - - assert mock_invoke_worker.mock_calls == [ - call('test-worker-function-arn', mock_jobs), - ] - - -def test_lambda_handler_no_jobs(): - with ( - patch('dynamo.jobs.get_jobs_waiting_for_execution') as mock_get_jobs_waiting_for_execution, - patch('start_execution_manager.invoke_worker') as mock_invoke_worker, - patch.dict(os.environ, {'START_EXECUTION_WORKER_ARN': 'test-worker-function-arn'}, clear=True), - ): - mock_get_jobs_waiting_for_execution.return_value = [] - - start_execution_manager.lambda_handler({}, None) - - mock_get_jobs_waiting_for_execution.assert_called_once_with(limit=500) - - mock_invoke_worker.assert_not_called()