Skip to content

Commit

Permalink
Merge pull request #2641 from ASFHyP3/combine-start-execution
Browse files Browse the repository at this point in the history
Combine start-execution manager and worker
  • Loading branch information
jtherrmann authored Mar 7, 2025
2 parents bfb2239 + 749c45c commit c6be542
Show file tree
Hide file tree
Showing 14 changed files with 48 additions and 303 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ apps/api/src/hyp3_api/api-spec/job_parameters.yml
apps/api/src/hyp3_api/job_validation_map.yml
apps/step-function.json
apps/**/*-cf.yml
apps/start-execution-worker/src/batch_params_by_job_type.json
apps/start-execution/src/batch_params_by_job_type.json
lib/dynamo/dynamo/*.json
lib/dynamo/dynamo/*.yml

Expand Down
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [9.5.4]

### Changed
- Combined `start_execution_worker` and `start_execution_manager` into one `start_execution` AWS Lambda function.

## [9.5.3]

### Fixed
Expand Down
8 changes: 3 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,21 @@ GET_FILES = ${PWD}/apps/get-files/src
HANDLE_BATCH_EVENT = ${PWD}/apps/handle-batch-event/src
SET_BATCH_OVERRIDES = ${PWD}/apps/set-batch-overrides/src
SCALE_CLUSTER = ${PWD}/apps/scale-cluster/src
START_EXECUTION_MANAGER = ${PWD}/apps/start-execution-manager/src
START_EXECUTION_WORKER = ${PWD}/apps/start-execution-worker/src
START_EXECUTION = ${PWD}/apps/start-execution/src
DISABLE_PRIVATE_DNS = ${PWD}/apps/disable-private-dns/src
UPDATE_DB = ${PWD}/apps/update-db/src
UPLOAD_LOG = ${PWD}/apps/upload-log/src
DYNAMO = ${PWD}/lib/dynamo
LAMBDA_LOGGING = ${PWD}/lib/lambda_logging
export PYTHONPATH = ${API}:${CHECK_PROCESSING_TIME}:${GET_FILES}:${HANDLE_BATCH_EVENT}:${SET_BATCH_OVERRIDES}:${SCALE_CLUSTER}:${START_EXECUTION_MANAGER}:${START_EXECUTION_WORKER}:${DISABLE_PRIVATE_DNS}:${UPDATE_DB}:${UPLOAD_LOG}:${DYNAMO}:${LAMBDA_LOGGING}:${APPS}
export PYTHONPATH = ${API}:${CHECK_PROCESSING_TIME}:${GET_FILES}:${HANDLE_BATCH_EVENT}:${SET_BATCH_OVERRIDES}:${SCALE_CLUSTER}:${START_EXECUTION}:${DISABLE_PRIVATE_DNS}:${UPDATE_DB}:${UPLOAD_LOG}:${DYNAMO}:${LAMBDA_LOGGING}:${APPS}


build: render
python -m pip install --upgrade -r requirements-apps-api.txt -t ${API}; \
python -m pip install --upgrade -r requirements-apps-api-binary.txt --platform manylinux2014_x86_64 --only-binary=:all: -t ${API}; \
python -m pip install --upgrade -r requirements-apps-handle-batch-event.txt -t ${HANDLE_BATCH_EVENT}; \
python -m pip install --upgrade -r requirements-apps-scale-cluster.txt -t ${SCALE_CLUSTER}; \
python -m pip install --upgrade -r requirements-apps-start-execution-manager.txt -t ${START_EXECUTION_MANAGER}; \
python -m pip install --upgrade -r requirements-apps-start-execution-worker.txt -t ${START_EXECUTION_WORKER}; \
python -m pip install --upgrade -r requirements-apps-start-execution.txt -t ${START_EXECUTION}; \
python -m pip install --upgrade -r requirements-apps-disable-private-dns.txt -t ${DISABLE_PRIVATE_DNS}; \
python -m pip install --upgrade -r requirements-apps-update-db.txt -t ${UPDATE_DB}

Expand Down
2 changes: 1 addition & 1 deletion apps/render_cf.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ def render_batch_params_by_job_type(job_types: dict) -> None:
for step in job_spec['steps']:
params.update(get_batch_param_names_for_job_step(step))
batch_params_by_job_type[job_type] = list(params)
with (Path('apps') / 'start-execution-worker' / 'src' / 'batch_params_by_job_type.json').open('w') as f:
with (Path('apps') / 'start-execution' / 'src' / 'batch_params_by_job_type.json').open('w') as f:
json.dump(batch_params_by_job_type, f, indent=2)


Expand Down
35 changes: 0 additions & 35 deletions apps/start-execution-manager/src/start_execution_manager.py

This file was deleted.

93 changes: 0 additions & 93 deletions apps/start-execution-worker/start-execution-worker-cf.yml.j2

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import boto3

import dynamo
from lambda_logging import log_exceptions, logger


Expand Down Expand Up @@ -47,8 +48,9 @@ def submit_jobs(jobs: list[dict]) -> None:


@log_exceptions
def lambda_handler(event: dict, _) -> None:
jobs = event['jobs']
logger.info(f'Submitting {len(jobs)} jobs')
submit_jobs(jobs)
logger.info('Successfully submitted jobs')
def lambda_handler(event: dict, context: Any) -> None:
pending_jobs = dynamo.jobs.get_jobs_waiting_for_execution(limit=500)
pending_jobs = dynamo.util.convert_decimals_to_numbers(pending_jobs)
logger.info(f'Got {len(pending_jobs)} pending jobs')

submit_jobs(pending_jobs)
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ AWSTemplateFormatVersion: 2010-09-09

Parameters:

JobsTable:
StepFunctionArn:
Type: String

StartExecutionWorkerArn:
JobsTable:
Type: String

{% if security_environment == 'EDC' %}
Expand Down Expand Up @@ -63,23 +63,22 @@ Resources:
Action: dynamodb:Query
Resource: !Sub "arn:aws:dynamodb:${AWS::Region}:${AWS::AccountId}:table/${JobsTable}*"
- Effect: Allow
Action:
- lambda:InvokeFunction
Resource: !Ref StartExecutionWorkerArn
Action: states:StartExecution
Resource: !Ref StepFunctionArn

Lambda:
Type: AWS::Lambda::Function
Properties:
Environment:
Variables:
JOBS_TABLE_NAME: !Ref JobsTable
START_EXECUTION_WORKER_ARN: !Ref StartExecutionWorkerArn
STEP_FUNCTION_ARN: !Ref StepFunctionArn
Code: src/
Handler: start_execution_manager.lambda_handler
Handler: start_execution.lambda_handler
MemorySize: 128
Role: !GetAtt Role.Arn
Runtime: python3.13
Timeout: 10
Timeout: 55
{% if security_environment == 'EDC' %}
VpcConfig:
SecurityGroupIds:
Expand Down
15 changes: 2 additions & 13 deletions apps/workflow-cf.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -206,28 +206,17 @@ Resources:
{% endif %}
TemplateURL: check-processing-time/check-processing-time-cf.yml

StartExecutionManager:
StartExecution:
Type: AWS::CloudFormation::Stack
Properties:
Parameters:
JobsTable: !Ref JobsTable
StartExecutionWorkerArn: !GetAtt StartExecutionWorker.Outputs.LambdaArn
{% if security_environment == 'EDC' %}
SecurityGroupId: !Ref SecurityGroupId
SubnetIds: !Join [",", !Ref SubnetIds]
{% endif %}
TemplateURL: start-execution-manager/start-execution-manager-cf.yml

StartExecutionWorker:
Type: AWS::CloudFormation::Stack
Properties:
Parameters:
StepFunctionArn: !Ref StepFunction
{% if security_environment == 'EDC' %}
SecurityGroupId: !Ref SecurityGroupId
SubnetIds: !Join [",", !Ref SubnetIds]
{% endif %}
TemplateURL: start-execution-worker/start-execution-worker-cf.yml
TemplateURL: start-execution/start-execution-cf.yml

UploadLog:
Type: AWS::CloudFormation::Stack
Expand Down
3 changes: 1 addition & 2 deletions requirements-all.txt
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
-r requirements-apps-api.txt
-r requirements-apps-handle-batch-event.txt
-r requirements-apps-scale-cluster.txt
-r requirements-apps-start-execution-manager.txt
-r requirements-apps-start-execution-worker.txt
-r requirements-apps-start-execution.txt
-r requirements-apps-disable-private-dns.txt
-r requirements-apps-update-db.txt
boto3==1.36.11
Expand Down
2 changes: 0 additions & 2 deletions requirements-apps-start-execution-worker.txt

This file was deleted.

File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@
import os
from unittest.mock import call, patch

import start_execution_worker
import start_execution


def test_convert_to_string():
assert start_execution_worker.convert_to_string(1) == '1'
assert start_execution_worker.convert_to_string(True) == 'True'
assert start_execution_worker.convert_to_string([1, 2]) == '1 2'
assert start_execution_worker.convert_to_string(['abc', 'bcd']) == 'abc bcd'
assert start_execution_worker.convert_to_string('abc') == 'abc'
assert start_execution.convert_to_string(1) == '1'
assert start_execution.convert_to_string(True) == 'True'
assert start_execution.convert_to_string([1, 2]) == '1 2'
assert start_execution.convert_to_string(['abc', 'bcd']) == 'abc bcd'
assert start_execution.convert_to_string('abc') == 'abc'


def test_submit_jobs():
Expand Down Expand Up @@ -156,11 +156,11 @@ def test_submit_jobs():
)

with (
patch('start_execution_worker.STEP_FUNCTION.start_execution') as mock_start_execution,
patch('start_execution.STEP_FUNCTION.start_execution') as mock_start_execution,
patch.dict(os.environ, {'STEP_FUNCTION_ARN': 'test-state-machine-arn'}, clear=True),
patch('start_execution_worker.BATCH_PARAMS_BY_JOB_TYPE', batch_params_by_job_type),
patch('start_execution.BATCH_PARAMS_BY_JOB_TYPE', batch_params_by_job_type),
):
start_execution_worker.submit_jobs(jobs)
start_execution.submit_jobs(jobs)

assert mock_start_execution.mock_calls == [
call(
Expand All @@ -182,7 +182,16 @@ def test_submit_jobs():


def test_lambda_handler():
with patch('start_execution_worker.submit_jobs') as mock_submit_jobs:
start_execution_worker.lambda_handler({'jobs': [1, 2, 3]}, None)
with (
patch('dynamo.jobs.get_jobs_waiting_for_execution') as mock_get_jobs_waiting_for_execution,
patch('dynamo.util.convert_decimals_to_numbers') as mock_convert_decimals_to_numbers,
patch('start_execution.submit_jobs') as mock_submit_jobs,
):
mock_get_jobs_waiting_for_execution.return_value = 'mock_jobs'
mock_convert_decimals_to_numbers.return_value = 'converted_jobs'

start_execution.lambda_handler({}, None)

assert mock_submit_jobs.mock_calls == [call([1, 2, 3])]
mock_get_jobs_waiting_for_execution.assert_called_once_with(limit=500)
mock_convert_decimals_to_numbers.assert_called_once_with('mock_jobs')
mock_submit_jobs.assert_called_once_with('converted_jobs')
Loading

0 comments on commit c6be542

Please sign in to comment.