From 6a01ecd611504e732ea607c60e7eecb4121f4158 Mon Sep 17 00:00:00 2001 From: Alexis Lucattini Date: Thu, 14 Nov 2024 22:22:24 +1100 Subject: [PATCH 1/2] Add ora reference for tn workflows Resolves #697 Still need to run and test in dev, updated workflow that supports ora reference as input --- config/constants.ts | 7 + config/stacks/tumorNormalPipelineManager.ts | 2 + .../deploy/index.ts | 27 ++- .../add_ora_reference_py/add_ora_reference.py | 148 ++++++++++++++ .../set_tn_cwl_inputs_sfn.asl.json | 183 +++++++++++++----- 5 files changed, 320 insertions(+), 47 deletions(-) create mode 100644 lib/workload/stateless/stacks/tumor-normal-pipeline-manager/lambdas/add_ora_reference_py/add_ora_reference.py diff --git a/config/constants.ts b/config/constants.ts index 900b3a961..b496ecee9 100644 --- a/config/constants.ts +++ b/config/constants.ts @@ -293,6 +293,13 @@ export const dragenIcav2ReferenceUriMappingSSMParameterPath = // } // ] // ' +export const dragenIcav2OraReferenceUriSSMParameterPath = + '/icav2/umccr-prod/dragen_ora_reference_uri'; + +// Deployed under dev // FIXME +// 'icav2://reference-data/dragen-ora/v2/ora_reference_v2.tar.gz +// + export const icav2GencodeAnnotationUriMappingSSMParameterPath = '/icav2/umccr-prod/wts_qc_annotation_mapping'; diff --git a/config/stacks/tumorNormalPipelineManager.ts b/config/stacks/tumorNormalPipelineManager.ts index f69fb0d52..81383ab58 100644 --- a/config/stacks/tumorNormalPipelineManager.ts +++ b/config/stacks/tumorNormalPipelineManager.ts @@ -4,6 +4,7 @@ import { icaEventPipeStackName, icav2AccessTokenSecretName, dragenIcav2ReferenceUriMappingSSMParameterPath, + dragenIcav2OraReferenceUriSSMParameterPath, tnIcav2PipelineIdSSMParameterPath, tnIcav2PipelineManagerDynamodbTableName, tnIcav2PipelineWorkflowType, @@ -54,5 +55,6 @@ export const getTnIcav2PipelineManagerStackProps = ( /* SSM Workflow Parameters */ defaultReferenceVersion: tnDefaultReferenceVersion, referenceUriSsmPath: 
dragenIcav2ReferenceUriMappingSSMParameterPath, + oraReferenceUriSsmPath: dragenIcav2OraReferenceUriSSMParameterPath, }; }; diff --git a/lib/workload/stateless/stacks/tumor-normal-pipeline-manager/deploy/index.ts b/lib/workload/stateless/stacks/tumor-normal-pipeline-manager/deploy/index.ts index a25050ef6..35295f687 100644 --- a/lib/workload/stateless/stacks/tumor-normal-pipeline-manager/deploy/index.ts +++ b/lib/workload/stateless/stacks/tumor-normal-pipeline-manager/deploy/index.ts @@ -20,6 +20,7 @@ export interface TnIcav2PipelineManagerConfig { icav2TokenSecretId: string; // "/icav2/umccr-prod/service-production-jwt-token-secret-arn" pipelineIdSsmPath: string; // List of parameters the workflow session state machine will need access to referenceUriSsmPath: string; // "/icav2/umccr-prod/reference-genome-uri" + oraReferenceUriSsmPath: string; // "/icav2/umccr-prod/dragen_ora_reference_uri" defaultReferenceVersion: string; /* Table to store analyis metadata */ dynamodbTableName: string; @@ -79,6 +80,11 @@ export class TnIcav2PipelineManagerStack extends cdk.Stack { props.referenceUriSsmPath, props.referenceUriSsmPath ); + const oraReferenceSsmObj = ssm.StringParameter.fromStringParameterName( + this, + props.oraReferenceUriSsmPath, + props.oraReferenceUriSsmPath + ); // Get event bus object this.eventBusObj = events.EventBus.fromEventBusName(this, 'event_bus', props.eventBusName); @@ -107,6 +113,19 @@ export class TnIcav2PipelineManagerStack extends cdk.Stack { timeout: Duration.seconds(60), } ); + const addOraReferenceLambdaObj = new PythonFunction( + this, + 'add_ora_reference_lambda_python_function', + { + entry: path.join(__dirname, '../lambdas/add_ora_reference_py'), + runtime: lambda.Runtime.PYTHON_3_12, + architecture: lambda.Architecture.ARM_64, + index: 'add_ora_reference.py', + handler: 'handler', + memorySize: 1024, + timeout: Duration.seconds(60), + } + ); // Specify the statemachine and replace the arn placeholders with the lambda arns defined above 
const configureInputsSfn = new sfn.StateMachine( @@ -124,6 +143,7 @@ export class TnIcav2PipelineManagerStack extends cdk.Stack { __table_name__: this.dynamodbTableObj.tableName, /* SSM Parameters */ __reference_version_uri_ssm_parameter_name__: referenceSsmObj.parameterName, + __ora_reference_uri_ssm_parameter_path__: oraReferenceSsmObj.parameterName, __default_reference_version__: props.defaultReferenceVersion, // We collect the reference version AND the pipeline versions /* Lambdas */ @@ -131,6 +151,8 @@ export class TnIcav2PipelineManagerStack extends cdk.Stack { convertFastqListRowsToCwlInputObjectsLambdaObj.currentVersion.functionArn, __get_boolean_parameters_lambda_function_arn__: getBooleanParametersFromEventInputLambdaObj.currentVersion.functionArn, + __add_ora_reference_lambda_function_arn__: + addOraReferenceLambdaObj.currentVersion.functionArn, }, } ); @@ -139,6 +161,7 @@ export class TnIcav2PipelineManagerStack extends cdk.Stack { [ convertFastqListRowsToCwlInputObjectsLambdaObj, getBooleanParametersFromEventInputLambdaObj, + addOraReferenceLambdaObj, ].forEach((lambdaObj) => { lambdaObj.currentVersion.grantInvoke(configureInputsSfn); }); @@ -147,7 +170,9 @@ export class TnIcav2PipelineManagerStack extends cdk.Stack { this.dynamodbTableObj.grantReadWriteData(configureInputsSfn); // Allow state machine to read ssm parameters - referenceSsmObj.grantRead(configureInputsSfn); + [referenceSsmObj, oraReferenceSsmObj].forEach((ssmObj) => { + ssmObj.grantRead(configureInputsSfn); + }); /* Part 2: Configure the lambdas and outputs step function diff --git a/lib/workload/stateless/stacks/tumor-normal-pipeline-manager/lambdas/add_ora_reference_py/add_ora_reference.py b/lib/workload/stateless/stacks/tumor-normal-pipeline-manager/lambdas/add_ora_reference_py/add_ora_reference.py new file mode 100644 index 000000000..eb98093dc --- /dev/null +++ b/lib/workload/stateless/stacks/tumor-normal-pipeline-manager/lambdas/add_ora_reference_py/add_ora_reference.py @@ -0,0 
+1,148 @@ +#!/usr/bin/env python3 + +""" +Add ora reference +""" +from typing import Dict, Optional, List + + +def handler(event, context) -> Dict[str, bool]: + """ + Determine whether the ORA reference is needed, i.e. whether any input fastq uri ends with .fastq.ora + :param event: Dictionary with the key 'event_data_input', which may contain tumorFastqListRows and fastqListRows + :param context: + :return: Dictionary with the single boolean key 'add_ora_step' + """ + + # Collect the event data input + event_data_input: Dict = event['event_data_input'] + + # Get the fastq list rows + tumor_fastq_list_rows: Optional[List] = event_data_input.get('tumorFastqListRows', None) + normal_fastq_list_rows: Optional[List] = event_data_input.get('fastqListRows', None) + + # If tumorFastqListRows is None and fastqListRows is None, return false + if tumor_fastq_list_rows is None and normal_fastq_list_rows is None: + return { + "add_ora_step": False + } + + for fastq_list_row_iter in [tumor_fastq_list_rows, normal_fastq_list_rows]: + if fastq_list_row_iter is not None: + # Only check the fastq list rows inputs that were actually provided + # Iterate over each of the fastq list rows, if one of the read1FileUri or read2FileUri end with .fastq.ora + # return true + if any( + [ + row.get('read1FileUri', '').endswith('.fastq.ora') or + row.get('read2FileUri', '').endswith('.fastq.ora') + for row in fastq_list_row_iter + ] + ): + return { + "add_ora_step": True + } + + # Got to here? 
Return false + return { + "add_ora_step": False + } + + +# if __name__ == "__main__": +# import json +# +# print( +# json.dumps( +# handler( +# { +# "event_data_input": { +# "tumorFastqListRows": [ +# { +# "rgid": "ATGAGGCC.CAATTAAC.2", +# "rgsm": "L2400195", +# "rglb": "L2400195", +# "lane": 2, +# "read1FileUri": "icav2://ea19a3f5-ec7c-4940-a474-c31cd91dbad4/primary/240229_A00130_0288_BH5HM2DSXC/2024071110689063/Samples/Lane_2/L2400195/L2400195_S9_L002_R1_001.fastq.gz", +# "read2FileUri": "icav2://ea19a3f5-ec7c-4940-a474-c31cd91dbad4/primary/240229_A00130_0288_BH5HM2DSXC/2024071110689063/Samples/Lane_2/L2400195/L2400195_S9_L002_R2_001.fastq.gz" +# }, +# { +# "rgid": "ATGAGGCC.CAATTAAC.3", +# "rgsm": "L2400195", +# "rglb": "L2400195", +# "lane": 3, +# "read1FileUri": "icav2://ea19a3f5-ec7c-4940-a474-c31cd91dbad4/primary/240229_A00130_0288_BH5HM2DSXC/2024071110689063/Samples/Lane_3/L2400195/L2400195_S9_L003_R1_001.fastq.gz", +# "read2FileUri": "icav2://ea19a3f5-ec7c-4940-a474-c31cd91dbad4/primary/240229_A00130_0288_BH5HM2DSXC/2024071110689063/Samples/Lane_3/L2400195/L2400195_S9_L003_R2_001.fastq.gz" +# } +# ], +# "fastqListRows": [ +# { +# "rgid": "GCACGGAC.TGCGAGAC.4", +# "rgsm": "L2400191", +# "rglb": "L2400191", +# "lane": 4, +# "read1FileUri": "icav2://ea19a3f5-ec7c-4940-a474-c31cd91dbad4/primary/240229_A00130_0288_BH5HM2DSXC/2024071110689063/Samples/Lane_4/L2400191/L2400191_S17_L004_R1_001.fastq.gz", +# "read2FileUri": "icav2://ea19a3f5-ec7c-4940-a474-c31cd91dbad4/primary/240229_A00130_0288_BH5HM2DSXC/2024071110689063/Samples/Lane_4/L2400191/L2400191_S17_L004_R2_001.fastq.gz" +# } +# ], +# "dragenReferenceVersion": "v9-r3" +# } +# }, +# None +# ), +# indent=4 +# ) +# ) +# +# # { +# # "add_ora_step": false +# # } + +# if __name__ == "__main__": +# import json +# +# print( +# json.dumps( +# handler( +# { +# "event_data_input": { +# "tumorFastqListRows": [ +# { +# "rgid": "ATGAGGCC.CAATTAAC.2", +# "rgsm": "L2400195", +# "rglb": "L2400195", +# "lane": 2, +# 
"read1FileUri": "icav2://ea19a3f5-ec7c-4940-a474-c31cd91dbad4/primary/240229_A00130_0288_BH5HM2DSXC/2024071110689063/Samples/Lane_2/L2400195/L2400195_S9_L002_R1_001.fastq.gz", +# "read2FileUri": "icav2://ea19a3f5-ec7c-4940-a474-c31cd91dbad4/primary/240229_A00130_0288_BH5HM2DSXC/2024071110689063/Samples/Lane_2/L2400195/L2400195_S9_L002_R2_001.fastq.gz" +# }, +# { +# "rgid": "ATGAGGCC.CAATTAAC.3", +# "rgsm": "L2400195", +# "rglb": "L2400195", +# "lane": 3, +# "read1FileUri": "icav2://ea19a3f5-ec7c-4940-a474-c31cd91dbad4/primary/240229_A00130_0288_BH5HM2DSXC/2024071110689063/Samples/Lane_3/L2400195/L2400195_S9_L003_R1_001.fastq.gz", +# "read2FileUri": "icav2://ea19a3f5-ec7c-4940-a474-c31cd91dbad4/primary/240229_A00130_0288_BH5HM2DSXC/2024071110689063/Samples/Lane_3/L2400195/L2400195_S9_L003_R2_001.fastq.gz" +# } +# ], +# "fastqListRows": [ +# { +# "rgid": "GCACGGAC.TGCGAGAC.4", +# "rgsm": "L2400191", +# "rglb": "L2400191", +# "lane": 4, +# "read1FileUri": "icav2://ea19a3f5-ec7c-4940-a474-c31cd91dbad4/primary/240229_A00130_0288_BH5HM2DSXC/2024071110689063/Samples/Lane_4/L2400191/L2400191_S17_L004_R1_001.fastq.ora", +# "read2FileUri": "icav2://ea19a3f5-ec7c-4940-a474-c31cd91dbad4/primary/240229_A00130_0288_BH5HM2DSXC/2024071110689063/Samples/Lane_4/L2400191/L2400191_S17_L004_R2_001.fastq.ora" +# } +# ], +# "dragenReferenceVersion": "v9-r3" +# } +# }, +# None +# ), +# indent=4 +# ) +# ) +# +# # { +# # "add_ora_step": true +# # } diff --git a/lib/workload/stateless/stacks/tumor-normal-pipeline-manager/step_functions_templates/set_tn_cwl_inputs_sfn.asl.json b/lib/workload/stateless/stacks/tumor-normal-pipeline-manager/step_functions_templates/set_tn_cwl_inputs_sfn.asl.json index e300e44f2..cf05fc062 100644 --- a/lib/workload/stateless/stacks/tumor-normal-pipeline-manager/step_functions_templates/set_tn_cwl_inputs_sfn.asl.json +++ b/lib/workload/stateless/stacks/tumor-normal-pipeline-manager/step_functions_templates/set_tn_cwl_inputs_sfn.asl.json @@ -41,7 +41,7 @@ } ], 
"Comment": "Fastq List Row Inputs", - "Next": "Convert Fastq List Rows To CWL Input Objects (Tumor and Normal)" + "Next": "Handle FQLR" }, { "And": [ @@ -74,58 +74,105 @@ ], "Default": "Fail" }, - "Fail": { - "Type": "Fail", - "Comment": "Not one of fastqListRows, bamInput or cramInput were defined" - }, - "Convert Fastq List Rows To CWL Input Objects (Tumor and Normal)": { + "Handle FQLR": { "Type": "Parallel", "Branches": [ { - "StartAt": "Convert Fastq List Rows to CWL Input Objects (Tumor)", + "StartAt": "Convert Fastq List Rows To CWL Input Objects (Tumor and Normal)", "States": { - "Convert Fastq List Rows to CWL Input Objects (Tumor)": { - "Type": "Task", - "Resource": "arn:aws:states:::lambda:invoke", - "Parameters": { - "FunctionName": "${__convert_fastq_list_rows_lambda_function_arn__}", - "Payload": { - "fastq_list_rows.$": "$.get_input_parameters_from_event_step.inputs.tumorFastqListRows" - } - }, - "Retry": [ + "Convert Fastq List Rows To CWL Input Objects (Tumor and Normal)": { + "Type": "Parallel", + "Branches": [ { - "ErrorEquals": [ - "Lambda.ServiceException", - "Lambda.AWSLambdaException", - "Lambda.SdkClientException", - "Lambda.TooManyRequestsException", - "States.TaskFailed" - ], - "IntervalSeconds": 60, - "MaxAttempts": 3, - "BackoffRate": 2 + "StartAt": "Convert Fastq List Rows to CWL Input Objects (Tumor)", + "States": { + "Convert Fastq List Rows to CWL Input Objects (Tumor)": { + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + "Parameters": { + "FunctionName": "${__convert_fastq_list_rows_lambda_function_arn__}", + "Payload": { + "fastq_list_rows.$": "$.get_input_parameters_from_event_step.inputs.tumorFastqListRows" + } + }, + "Retry": [ + { + "ErrorEquals": [ + "Lambda.ServiceException", + "Lambda.AWSLambdaException", + "Lambda.SdkClientException", + "Lambda.TooManyRequestsException", + "States.TaskFailed" + ], + "IntervalSeconds": 60, + "MaxAttempts": 3, + "BackoffRate": 2 + } + ], + "End": true, + "ResultSelector": { 
+ "fastq_list_rows.$": "$.Payload.fastq_list_rows" + }, + "ResultPath": "$.convert_tumor_fastq_list_rows_to_cwl_input_objects_step" + } + } + }, + { + "StartAt": "Convert Fastq List Rows to CWL Input Objects (Normal)", + "States": { + "Convert Fastq List Rows to CWL Input Objects (Normal)": { + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + "Parameters": { + "Payload": { + "fastq_list_rows.$": "$.get_input_parameters_from_event_step.inputs.fastqListRows" + }, + "FunctionName": "${__convert_fastq_list_rows_lambda_function_arn__}" + }, + "Retry": [ + { + "ErrorEquals": [ + "Lambda.ServiceException", + "Lambda.AWSLambdaException", + "Lambda.SdkClientException", + "Lambda.TooManyRequestsException", + "States.TaskFailed" + ], + "IntervalSeconds": 60, + "MaxAttempts": 3, + "BackoffRate": 2 + } + ], + "ResultPath": "$.convert_fastq_list_rows_to_cwl_input_objects_step", + "ResultSelector": { + "fastq_list_rows.$": "$.Payload.fastq_list_rows" + }, + "End": true + } + } } ], "End": true, "ResultSelector": { - "fastq_list_rows.$": "$.Payload.fastq_list_rows" - }, - "ResultPath": "$.convert_tumor_fastq_list_rows_to_cwl_input_objects_step" + "cwl_data_inputs": { + "tumor_fastq_list_rows.$": "$.[0].convert_tumor_fastq_list_rows_to_cwl_input_objects_step.fastq_list_rows", + "fastq_list_rows.$": "$.[1].convert_fastq_list_rows_to_cwl_input_objects_step.fastq_list_rows" + } + } } } }, { - "StartAt": "Convert Fastq List Rows to CWL Input Objects (Normal)", + "StartAt": "Add ORA Reference Bool", "States": { - "Convert Fastq List Rows to CWL Input Objects (Normal)": { + "Add ORA Reference Bool": { "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "Parameters": { "Payload": { - "fastq_list_rows.$": "$.get_input_parameters_from_event_step.inputs.fastqListRows" + "event_data_input.$": "$.get_input_parameters_from_event_step.inputs" }, - "FunctionName": "${__convert_fastq_list_rows_lambda_function_arn__}" + "FunctionName": 
"${__add_ora_reference_lambda_function_arn__}" }, "Retry": [ { @@ -133,31 +180,75 @@ "Lambda.ServiceException", "Lambda.AWSLambdaException", "Lambda.SdkClientException", - "Lambda.TooManyRequestsException", - "States.TaskFailed" + "Lambda.TooManyRequestsException" ], - "IntervalSeconds": 60, + "IntervalSeconds": 1, "MaxAttempts": 3, - "BackoffRate": 2 + "BackoffRate": 2, + "JitterStrategy": "FULL" } ], - "ResultPath": "$.convert_fastq_list_rows_to_cwl_input_objects_step", "ResultSelector": { - "fastq_list_rows.$": "$.Payload.fastq_list_rows" + "add_ora_step": "$.Payload.add_ora_step" }, - "End": true + "ResultPath": "$.add_ora_step_path", + "Next": "Need Ora Reference" + }, + "Need Ora Reference": { + "Type": "Choice", + "Choices": [ + { + "Variable": "$.add_ora_step_path.add_ora_step", + "BooleanEquals": true, + "Comment": "Needs ORA Reference Tar as Input", + "Next": "Get the ORA Reference Version" + } + ], + "Default": "Set Output to Null" + }, + "Set Output to Null": { + "Type": "Pass", + "End": true, + "Result": { + "cwl_data_inputs": {} + } + }, + "Get the ORA Reference Version": { + "Type": "Task", + "Parameters": { + "Name": "${__ora_reference_uri_ssm_parameter_path__}" + }, + "Resource": "arn:aws:states:::aws-sdk:ssm:getParameter", + "End": true, + "ResultSelector": { + "cwl_data_inputs": { + "ora_reference_tar": { + "class": "File", + "location.$": "$.Parameter.Value" + } + } + } } } } ], - "End": true, + "Next": "Merge Data References", "ResultSelector": { - "cwl_data_inputs": { - "tumor_fastq_list_rows.$": "$.[0].convert_tumor_fastq_list_rows_to_cwl_input_objects_step.fastq_list_rows", - "fastq_list_rows.$": "$.[1].convert_fastq_list_rows_to_cwl_input_objects_step.fastq_list_rows" - } + "fastq_list_row_cwl_data_inputs.$": "$.[0].cwl_data_inputs", + "ora_reference_data_inputs.$": "$.[1].cwl_data_inputs" + } + }, + "Merge Data References": { + "Type": "Pass", + "End": true, + "Parameters": { + "cwl_data_inputs.$": 
"States.JsonMerge($.fastq_list_row_cwl_data_inputs, $.ora_reference_data_inputs, false)" } }, + "Fail": { + "Type": "Fail", + "Comment": "Not one of fastqListRows, bamInput or cramInput were defined" + }, "Convert Bam Inputs to CWL Input Objects": { "Type": "Pass", "Parameters": { From 1b023c7160a986a16a11dac0cd09f0da104f95f3 Mon Sep 17 00:00:00 2001 From: Alexis Lucattini Date: Fri, 15 Nov 2024 15:31:50 +1100 Subject: [PATCH 2/2] Add ora step is a json path --- .../step_functions_templates/set_tn_cwl_inputs_sfn.asl.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/workload/stateless/stacks/tumor-normal-pipeline-manager/step_functions_templates/set_tn_cwl_inputs_sfn.asl.json b/lib/workload/stateless/stacks/tumor-normal-pipeline-manager/step_functions_templates/set_tn_cwl_inputs_sfn.asl.json index cf05fc062..ef3f033a2 100644 --- a/lib/workload/stateless/stacks/tumor-normal-pipeline-manager/step_functions_templates/set_tn_cwl_inputs_sfn.asl.json +++ b/lib/workload/stateless/stacks/tumor-normal-pipeline-manager/step_functions_templates/set_tn_cwl_inputs_sfn.asl.json @@ -189,7 +189,7 @@ } ], "ResultSelector": { - "add_ora_step": "$.Payload.add_ora_step" + "add_ora_step.$": "$.Payload.add_ora_step" }, "ResultPath": "$.add_ora_step_path", "Next": "Need Ora Reference"