diff --git a/docs/schemas/events/sequencerunmanager/SequenceRunStateChange.schema.json b/docs/schemas/events/sequencerunmanager/SequenceRunStateChange.schema.json index 8c47b1e06..402917409 100644 --- a/docs/schemas/events/sequencerunmanager/SequenceRunStateChange.schema.json +++ b/docs/schemas/events/sequencerunmanager/SequenceRunStateChange.schema.json @@ -58,7 +58,7 @@ ], "properties": { "id": { - "type": "integer" + "type": "string" }, "instrumentRunId": { "type": "string" @@ -96,4 +96,4 @@ } } } -} +} \ No newline at end of file diff --git a/docs/schemas/events/sequencerunmanager/example/SRSC__started.json b/docs/schemas/events/sequencerunmanager/example/SRSC__started.json index 076e9af39..3fa1973e6 100644 --- a/docs/schemas/events/sequencerunmanager/example/SRSC__started.json +++ b/docs/schemas/events/sequencerunmanager/example/SRSC__started.json @@ -8,7 +8,7 @@ "region": "ap-southeast-2", "resources": [], "detail": { - "id": 1, + "id": "seq.0123456ABCDEFG", "instrumentRunId": "540424_A01001_0193_BBBBBBDRX5", "runVolumeName": "bssh.abcdefgh1234567899fa94fe79523959", "runFolderPath": "/Runs/540424_A01001_0193_BBBBBBDRX5_r.88888888ASU26EYuyeJHew", @@ -18,4 +18,4 @@ "endTime": null, "status": "STARTED" } -} +} \ No newline at end of file diff --git a/docs/schemas/events/sequencerunmanager/example/SRSC__succeeded.json b/docs/schemas/events/sequencerunmanager/example/SRSC__succeeded.json index 9097fd585..484c6e587 100644 --- a/docs/schemas/events/sequencerunmanager/example/SRSC__succeeded.json +++ b/docs/schemas/events/sequencerunmanager/example/SRSC__succeeded.json @@ -8,7 +8,7 @@ "region": "ap-southeast-2", "resources": [], "detail": { - "id": 1, + "id": "seq.0123456ABCDEFG", "instrumentRunId": "540424_A01001_0193_BBBBBBDRX5", "runVolumeName": "bssh.abcdefgh1234567899fa94fe79523959", "runFolderPath": "/Runs/540424_A01001_0193_BBBBBBDRX5_r.88888888ASU26EYuyeJHew", @@ -18,4 +18,4 @@ "endTime": "2024-04-27T03:45:02.113245Z", "status": "SUCCEEDED" } -} +} \ No newline at end of file diff --git a/lib/workload/stateless/stacks/sequence-run-manager/deploy/README.md b/lib/workload/stateless/stacks/sequence-run-manager/deploy/README.md index 8d5577fdb..084f53937 100644 --- a/lib/workload/stateless/stacks/sequence-run-manager/deploy/README.md +++ b/lib/workload/stateless/stacks/sequence-run-manager/deploy/README.md @@ -17,10 +17,10 @@ Hot-deploy against dev: export AWS_PROFILE=umccr-dev-admin yarn cdk-stateless list -yarn cdk-stateless synth -e OrcaBusStatelessPipeline/BetaDeployment/SequenceRunManagerStack -yarn cdk-stateless diff -e OrcaBusStatelessPipeline/BetaDeployment/SequenceRunManagerStack -yarn cdk-stateless deploy -e OrcaBusStatelessPipeline/BetaDeployment/SequenceRunManagerStack -yarn cdk-stateless destroy -e OrcaBusStatelessPipeline/BetaDeployment/SequenceRunManagerStack +yarn cdk-stateless synth -e OrcaBusStatelessPipeline/OrcaBusBeta/SequenceRunManagerStack +yarn cdk-stateless diff -e OrcaBusStatelessPipeline/OrcaBusBeta/SequenceRunManagerStack +yarn cdk-stateless deploy -e OrcaBusStatelessPipeline/OrcaBusBeta/SequenceRunManagerStack +yarn cdk-stateless destroy -e OrcaBusStatelessPipeline/OrcaBusBeta/SequenceRunManagerStack ``` CloudFormation template: diff --git a/lib/workload/stateless/stacks/sequence-run-manager/deploy/stack.ts b/lib/workload/stateless/stacks/sequence-run-manager/deploy/stack.ts index 27f5b0381..8dcadd03e 100644 --- a/lib/workload/stateless/stacks/sequence-run-manager/deploy/stack.ts +++ b/lib/workload/stateless/stacks/sequence-run-manager/deploy/stack.ts @@ -3,7 
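The schema change above switches detail.id from an integer to a prefixed string ID (e.g. seq.0123456ABCDEFG), and both example events are updated to match. A quick sanity check that the examples still validate against the schema could look like the sketch below; the jsonschema dependency and the repo-relative paths are assumptions on top of this PR.

```python
# Hedged sketch: validate the updated example events against the updated schema.
# Run from the repo root with `pip install jsonschema`; if the schema pulls in
# other files via $ref, a resolver/registry would also be needed.
import json
from jsonschema import validate

with open("docs/schemas/events/sequencerunmanager/SequenceRunStateChange.schema.json") as f:
    schema = json.load(f)

for name in ("SRSC__started.json", "SRSC__succeeded.json"):
    with open(f"docs/schemas/events/sequencerunmanager/example/{name}") as f:
        validate(instance=json.load(f), schema=schema)
    print(f"{name}: OK")
```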
+3,8 @@ import * as cdk from 'aws-cdk-lib'; import { aws_lambda, aws_secretsmanager, Duration, Stack } from 'aws-cdk-lib'; import { Construct } from 'constructs'; import { ISecurityGroup, IVpc, SecurityGroup, Vpc, VpcLookupOptions } from 'aws-cdk-lib/aws-ec2'; -import { EventBus, IEventBus } from 'aws-cdk-lib/aws-events'; +import { EventBus, IEventBus, Rule } from 'aws-cdk-lib/aws-events'; +import { LambdaFunction } from 'aws-cdk-lib/aws-events-targets'; import { PythonFunction, PythonLayerVersion } from '@aws-cdk/aws-lambda-python-alpha'; import { HttpLambdaIntegration } from 'aws-cdk-lib/aws-apigatewayv2-integrations'; import { HttpMethod, HttpRoute, HttpRouteKey } from 'aws-cdk-lib/aws-apigatewayv2'; @@ -133,27 +134,47 @@ export class SequenceRunManagerStack extends Stack { */ const procSqsFn = this.createPythonFunction('ProcHandler', { index: 'sequence_run_manager_proc/lambdas/bssh_event.py', - handler: 'sqs_handler', + handler: 'event_handler', timeout: Duration.minutes(2), memorySize: 512, reservedConcurrentExecutions: 1, }); this.mainBus.grantPutEventsTo(procSqsFn); - // this.setupEventRule(procSqsFn); // TODO comment this out for now + this.setupEventRule(procSqsFn); // TODO comment this out for now } - // private setupEventRule(fn: aws_lambda.Function) { - // const eventRule = new Rule(this, this.id + 'EventRule', { - // ruleName: this.id + 'EventRule', - // description: 'Rule to send {event_type.value} events to the {handler.function_name} Lambda', - // eventBus: this.props.mainBus, - // }); - // - // eventRule.addTarget(new aws_events_targets.LambdaFunction(fn)); - // eventRule.addEventPattern({ - // source: ['ORCHESTRATOR'], // FIXME complete source to destination event mapping - // detailType: ['SequenceRunStateChange'], - // }); - // } + private setupEventRule(fn: aws_lambda.Function) { + /** + * For sequence run manager, we are using orcabus events ( source from BSSH ENS event pipe) to trigger the lambda function. + * event rule to filter the events that we are interested in. + * event pattern: see below + * process lambda will record the event to the database, and emit the 'SequenceRunStateChange' event to the event bus. 
+ * + */ + const eventRule = new Rule(this, this.stackName + 'EventRule', { + ruleName: this.stackName + 'EventRule', + description: 'Rule to send {event_type.value} events to the {handler.function_name} Lambda', + eventBus: this.mainBus, + }); + eventRule.addEventPattern({ + detailType: ['Event from aws:sqs'], + detail: { + 'ica-event': { + // mandatory fields (gdsFolderPath, gdsVolumeName(starts with bssh), instrumentRunId, dateModified) + gdsFolderPath: [{ exists: true }], + gdsVolumeName: [{ prefix: 'bssh' }], + instrumentRunId: [{ exists: true }], + dateModified: [{ exists: true }], + + // optional fields (flowcell barcode, sample sheet name, reagent barcode, ica project id, api url, name) + acl: [{ prefix: 'wid:' }, { prefix: 'tid:' }], + id: [{ exists: true }], + status: [{ exists: true }], + }, + }, + }); + + eventRule.addTarget(new LambdaFunction(fn)); + } } diff --git a/lib/workload/stateless/stacks/sequence-run-manager/deps/requirements.txt b/lib/workload/stateless/stacks/sequence-run-manager/deps/requirements.txt index 81d833f7f..e5c259fde 100644 --- a/lib/workload/stateless/stacks/sequence-run-manager/deps/requirements.txt +++ b/lib/workload/stateless/stacks/sequence-run-manager/deps/requirements.txt @@ -22,3 +22,4 @@ serverless-wsgi==3.0.5 # for sequencerunstatechange package six==1.16.0 regex==2024.9.11 +ulid-py==1.1.0 \ No newline at end of file diff --git a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/migrations/0001_initial.py b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/migrations/0001_initial.py index b9df3d664..38b6b64bb 100644 --- a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/migrations/0001_initial.py +++ b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/migrations/0001_initial.py @@ -1,18 +1,66 @@ -# Generated by Django 4.2.1 on 2023-06-14 07:37 +# Generated by Django 5.1.2 on 2024-12-04 11:11 +import django.core.validators +import django.db.models.deletion from django.db import migrations, models class Migration(migrations.Migration): + initial = True dependencies = [] operations = [ + migrations.CreateModel( + name="Comment", + fields=[ + ( + "orcabus_id", + models.CharField( + editable=False, + primary_key=True, + serialize=False, + unique=True, + validators=[ + django.core.validators.RegexValidator( + code="invalid_orcabus_id", + message="ULID is expected to be 26 characters long", + regex="^[\\w]{26}$", + ) + ], + ), + ), + ("comment", models.TextField()), + ("association_id", models.CharField(max_length=255)), + ("created_at", models.DateTimeField(auto_now_add=True)), + ("created_by", models.CharField(max_length=255)), + ("updated_at", models.DateTimeField(auto_now=True)), + ("is_deleted", models.BooleanField(default=False)), + ], + options={ + "abstract": False, + }, + ), migrations.CreateModel( name="Sequence", fields=[ - ("id", models.BigAutoField(primary_key=True, serialize=False)), + ( + "orcabus_id", + models.CharField( + editable=False, + primary_key=True, + serialize=False, + unique=True, + validators=[ + django.core.validators.RegexValidator( + code="invalid_orcabus_id", + message="ULID is expected to be 26 characters long", + regex="^[\\w]{26}$", + ) + ], + ), + ), ("instrument_run_id", models.CharField(max_length=255, unique=True)), ("run_volume_name", models.TextField()), ("run_folder_path", models.TextField()), @@ -21,10 +69,10 @@ class Migration(migrations.Migration): "status", models.CharField( choices=[ - ("started", "Started"), - 
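The EventBridge rule added to deploy/stack.ts above forwards only 'Event from aws:sqs' messages whose detail['ica-event'] carries the fields the rule requires and a bssh-prefixed volume name. A rough local approximation of that pattern, handy for unit-testing payloads before they ever reach EventBridge, is sketched below; the helper name and the simplified matching semantics are ours, not part of this PR.

```python
# Rough local approximation of the EventBridge pattern in deploy/stack.ts.
# EventBridge's own exists/prefix matching is authoritative; this is a test aid.

def matches_srm_rule(event: dict) -> bool:
    if event.get("detail-type") != "Event from aws:sqs":
        return False
    ica = (event.get("detail") or {}).get("ica-event")
    if not isinstance(ica, dict):
        return False
    # fields the rule requires to exist
    if any(k not in ica for k in ("gdsFolderPath", "instrumentRunId", "dateModified", "id", "status")):
        return False
    # gdsVolumeName must start with 'bssh'
    if not str(ica.get("gdsVolumeName", "")).startswith("bssh"):
        return False
    # at least one acl entry must carry a 'wid:' or 'tid:' prefix
    return any(str(a).startswith(("wid:", "tid:")) for a in ica.get("acl", []))


assert matches_srm_rule({
    "detail-type": "Event from aws:sqs",
    "detail": {"ica-event": {
        "gdsFolderPath": "", "gdsVolumeName": "bssh.abc", "instrumentRunId": "RUN1",
        "dateModified": "2024-11-02T21:58:13Z", "acl": ["wid:xyz"],
        "id": "r.123", "status": "PendingAnalysis",
    }},
})
```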
("failed", "Failed"), - ("succeeded", "Succeeded"), - ("aborted", "Aborted"), + ("STARTED", "Started"), + ("FAILED", "Failed"), + ("SUCCEEDED", "Succeeded"), + ("ABORTED", "Aborted"), ], max_length=255, ), @@ -51,6 +99,60 @@ class Migration(migrations.Migration): "sequence_run_name", models.CharField(blank=True, max_length=255, null=True), ), + ("v1pre3_id", models.CharField(blank=True, max_length=255, null=True)), + ( + "ica_project_id", + models.CharField(blank=True, max_length=255, null=True), + ), + ("api_url", models.TextField(blank=True, null=True)), + ], + options={ + "constraints": [ + models.CheckConstraint( + condition=models.Q( + ("run_folder_path__isnull", False), + models.Q( + ("api_url__isnull", False), + ("ica_project_id__isnull", False), + ("v1pre3_id__isnull", False), + ), + _connector="OR", + ), + name="check_run_folder_path_or_bssh_keys_not_null", + ) + ], + }, + ), + migrations.CreateModel( + name="State", + fields=[ + ( + "orcabus_id", + models.CharField( + editable=False, + primary_key=True, + serialize=False, + unique=True, + validators=[ + django.core.validators.RegexValidator( + code="invalid_orcabus_id", + message="ULID is expected to be 26 characters long", + regex="^[\\w]{26}$", + ) + ], + ), + ), + ("status", models.CharField(max_length=255)), + ("timestamp", models.DateTimeField()), + ("comment", models.CharField(blank=True, max_length=255, null=True)), + ( + "sequence", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + related_name="states", + to="sequence_run_manager.sequence", + ), + ), ], options={ "abstract": False, diff --git a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/migrations/0002_alter_sequence_status.py b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/migrations/0002_alter_sequence_status.py deleted file mode 100644 index 68dec7401..000000000 --- a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/migrations/0002_alter_sequence_status.py +++ /dev/null @@ -1,26 +0,0 @@ -# Generated by Django 5.0.5 on 2024-06-12 01:28 - -from django.db import migrations, models - - -class Migration(migrations.Migration): - - dependencies = [ - ("sequence_run_manager", "0001_initial"), - ] - - operations = [ - migrations.AlterField( - model_name="sequence", - name="status", - field=models.CharField( - choices=[ - ("STARTED", "Started"), - ("FAILED", "Failed"), - ("SUCCEEDED", "Succeeded"), - ("ABORTED", "Aborted"), - ], - max_length=255, - ), - ), - ] diff --git a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/models/__init__.py b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/models/__init__.py index 4660a2f00..5c3a4778a 100644 --- a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/models/__init__.py +++ b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/models/__init__.py @@ -1,3 +1,5 @@ # https://docs.djangoproject.com/en/4.1/topics/db/models/#organizing-models-in-a-package -from .sequence import Sequence +from .sequence import Sequence, SequenceStatus +from .comment import Comment +from .state import State diff --git a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/models/base.py b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/models/base.py index f5e6aafb2..276c1d338 100644 --- a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/models/base.py +++ 
b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/models/base.py @@ -1,9 +1,11 @@ import logging import operator +import ulid from functools import reduce from typing import List from django.core.exceptions import FieldError +from django.core.validators import RegexValidator from django.db import models from django.db.models import ( Q, @@ -23,6 +25,12 @@ logger = logging.getLogger(__name__) +orcabus_id_validator = RegexValidator( + regex=r'^[\w]{26}$', + message='ULID is expected to be 26 characters long', + code='invalid_orcabus_id' + ) + class OrcaBusBaseManager(models.Manager): @staticmethod @@ -78,6 +86,32 @@ def exclude_params(params): class OrcaBusBaseModel(models.Model): class Meta: abstract = True + + orcabus_id_prefix = None + + orcabus_id = models.CharField( + primary_key=True, + unique=True, + editable=False, + blank=False, + null=False, + validators=[orcabus_id_validator] + ) + + def save(self, *args, **kwargs): + # handle the OrcaBus ID + if not self.orcabus_id: + # if no OrcaBus ID was provided, then generate one + self.orcabus_id = ulid.new().str + else: + # check provided OrcaBus ID + if len(self.orcabus_id) > 26: + # assume the OrcaBus ID carries the prefix + # we strip it off and continue to the validation + l = len(self.orcabus_id_prefix) + self.orcabus_id = str(self.orcabus_id)[l:] + self.full_clean() # make sure we are validating the inputs (especially the OrcaBus ID) + return super(OrcaBusBaseModel, self).save(*args, **kwargs) @classmethod def get_fields(cls): diff --git a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/models/comment.py b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/models/comment.py new file mode 100644 index 000000000..5e7963d2a --- /dev/null +++ b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/models/comment.py @@ -0,0 +1,27 @@ +import logging + +from django.db import models + +from sequence_run_manager.models.base import OrcaBusBaseModel, OrcaBusBaseManager + +logger = logging.getLogger(__name__) + +class CommentManager(OrcaBusBaseManager): + pass + + +class Comment(OrcaBusBaseModel): + # primary key + orcabus_id_prefix = 'cmt.' 
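The save() override in models/base.py above either mints a fresh ULID or strips the model prefix off an incoming ID before validation. The standalone sketch below mirrors that logic using the ulid-py package pinned in deps/requirements.txt; the function name is illustrative and not part of the codebase.

```python
import ulid  # ulid-py, as pinned in deps/requirements.txt


def normalize_orcabus_id(value, prefix):
    """Mirror OrcaBusBaseModel.save(): mint a ULID when no ID is given,
    strip the model prefix (e.g. 'seq.', 'cmt.', 'sqs.') when one is carried."""
    if not value:
        return ulid.new().str              # fresh 26-character ULID
    if len(value) > 26:                    # assume the value carries the prefix
        return value[len(prefix):]
    return value


print(normalize_orcabus_id(None, "seq."))                       # e.g. 01JDX...
print(normalize_orcabus_id("seq." + ulid.new().str, "seq."))    # prefix stripped
```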
+ + comment = models.TextField(null=False, blank=False) + association_id = models.CharField(max_length=255, null=False, blank=False) # comment association object id + created_at = models.DateTimeField(auto_now_add=True) + created_by = models.CharField(max_length=255, null=False, blank=False) + updated_at = models.DateTimeField(auto_now=True) + is_deleted = models.BooleanField(default=False) + + objects = CommentManager() + + def __str__(self): + return f"ID: {self.orcabus_id}, comment: {self.comment}, from {self.created_by}, for {self.association_id}" \ No newline at end of file diff --git a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/models/sequence.py b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/models/sequence.py index ba2c470fa..7c5bdb973 100644 --- a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/models/sequence.py +++ b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/models/sequence.py @@ -30,7 +30,8 @@ def from_value(cls, value): def from_seq_run_status(cls, value): """ See Run Status - https://support.illumina.com/help/BaseSpace_Sequence_Hub/Source/Informatics/BS/Statuses_swBS.htm + https://help.basespace.illumina.com/automate/statuses + https://support.illumina.com/help/BaseSpace_Sequence_Hub/Source/Informatics/BS/Statuses_swBS.htm (deprecated) Note that we don't necessary support all these statuses. In the following check, those values come from observed values from our BSSH run events. @@ -61,7 +62,15 @@ def get_by_keyword(self, **kwargs) -> QuerySet: class Sequence(OrcaBusBaseModel): # primary key - id = models.BigAutoField(primary_key=True) + orcabus_id_prefix = 'seq.' + + # must have (run_folder_path) or (v1pre3_id and ica_project_id and api_url) + # NOTE: we use this to retrieve further details for icav2 bssh event + # for reference: https://github.com/umccr/orcabus/pull/748#issuecomment-2516246960 + class Meta: + constraints = [ + models.CheckConstraint(check=models.Q(run_folder_path__isnull=False) | models.Q(v1pre3_id__isnull=False, ica_project_id__isnull=False, api_url__isnull=False), name='check_run_folder_path_or_bssh_keys_not_null') + ] # mandatory non-nullable base fields instrument_run_id = models.CharField( @@ -95,6 +104,9 @@ class Sequence(OrcaBusBaseModel): max_length=255, null=True, blank=True ) # legacy `name` + v1pre3_id = models.CharField(max_length=255, null=True, blank=True) + ica_project_id = models.CharField(max_length=255, null=True, blank=True) + api_url = models.TextField(null=True, blank=True) # run_config = models.JSONField(null=True, blank=True) # TODO could be it's own model # sample_sheet_config = models.JSONField(null=True, blank=True) # TODO could be it's own model @@ -102,7 +114,7 @@ class Sequence(OrcaBusBaseModel): def __str__(self): return ( - f"ID '{self.id}', " + f"ID '{self.orcabus_id}', " f"Sequence Run ID '{self.sequence_run_id}', " f"Sequence Run Name '{self.sequence_run_name}', " f"Run Data URI '{self.run_data_uri}', " diff --git a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/models/state.py b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/models/state.py new file mode 100644 index 000000000..2beb3fe16 --- /dev/null +++ b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/models/state.py @@ -0,0 +1,26 @@ +import logging + +from django.db import models + +from sequence_run_manager.models.base import OrcaBusBaseModel, OrcaBusBaseManager +from 
sequence_run_manager.models.sequence import Sequence + +logger = logging.getLogger(__name__) + +class StateManager(OrcaBusBaseManager): + pass + + +class State(OrcaBusBaseModel): + orcabus_id_prefix = 'sqs.' + + status = models.CharField(max_length=255, null=False, blank=False) + timestamp = models.DateTimeField() + comment = models.CharField(max_length=255, null=True, blank=True) + + sequence = models.ForeignKey(Sequence, on_delete=models.CASCADE, related_name='states') + + objects = StateManager() + + def __str__(self): + return f"ID: {self.orcabus_id}, status: {self.status}, for {self.sequence}" diff --git a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/serializers.py b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/serializers.py deleted file mode 100644 index fffbd48ad..000000000 --- a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/serializers.py +++ /dev/null @@ -1,9 +0,0 @@ -from rest_framework import serializers - -from sequence_run_manager.models import Sequence - - -class SequenceSerializer(serializers.ModelSerializer): - class Meta: - model = Sequence - fields = "__all__" diff --git a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/serializers/__init__.py b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/serializers/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/serializers/base.py b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/serializers/base.py new file mode 100644 index 000000000..2b57a52a5 --- /dev/null +++ b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/serializers/base.py @@ -0,0 +1,39 @@ +import re +from rest_framework import serializers + + +def to_camel_case(snake_str): + components = re.split(r'[_\-\s]', snake_str) + return components[0].lower() + ''.join(x.title() for x in components[1:]) + + +class SerializersBase(serializers.ModelSerializer): + prefix = '' + + def __init__(self, *args, camel_case_data=False, **kwargs): + super().__init__(*args, **kwargs) + self.use_camel_case = camel_case_data + + def to_representation(self, instance): + representation = super().to_representation(instance) + representation['orcabus_id'] = self.prefix + str(representation['orcabus_id']) + + if self.use_camel_case: + return {to_camel_case(key): value for key, value in representation.items()} + return representation + + +class OptionalFieldsMixin: + def make_fields_optional(self): + # Make all fields optional + for field in self.fields.values(): + field.required = False + + # If the fields are CharField, you might also want to allow them to be blank + for field_name, field in self.fields.items(): + if isinstance(field, serializers.CharField): + field.allow_blank = True + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.make_fields_optional() diff --git a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/serializers/comment.py b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/serializers/comment.py new file mode 100644 index 000000000..d1f68eec9 --- /dev/null +++ b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/serializers/comment.py @@ -0,0 +1,13 @@ +from rest_framework import serializers + +from sequence_run_manager.models import Comment +from sequence_run_manager.serializers.base import SerializersBase, 
OptionalFieldsMixin + +class CommentBaseSerializer(SerializersBase): + orcabus_id_prefix = Comment.orcabus_id_prefix + +class CommentSerializer(CommentBaseSerializer): + class Meta: + model = Comment + fields = "__all__" + diff --git a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/serializers/sequence.py b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/serializers/sequence.py new file mode 100644 index 000000000..d87dfca5f --- /dev/null +++ b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/serializers/sequence.py @@ -0,0 +1,26 @@ +from rest_framework import serializers + +from sequence_run_manager.models import Sequence +from sequence_run_manager.serializers.base import SerializersBase, OptionalFieldsMixin + + +class SequenceBaseSerializer(SerializersBase): + orcabus_id_prefix = Sequence.orcabus_id_prefix + + +class SequenceListParamSerializer(OptionalFieldsMixin, SequenceBaseSerializer): + class Meta: + model = Sequence + fields = "__all__" + +class SequenceMinSerializer(SequenceBaseSerializer): + class Meta: + model = Sequence + fields = ["orcabus_id", "instrument_run_id", "start_time", "end_time", "status"] + +class SequenceSerializer(SequenceBaseSerializer): + class Meta: + model = Sequence + fields = "__all__" + + \ No newline at end of file diff --git a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/serializers/state.py b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/serializers/state.py new file mode 100644 index 000000000..ce9aef77b --- /dev/null +++ b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/serializers/state.py @@ -0,0 +1,17 @@ +from rest_framework import serializers + +from sequence_run_manager.models import State, Sequence +from sequence_run_manager.serializers.base import SerializersBase, OptionalFieldsMixin + +class StateBaseSerializer(SerializersBase): + orcabus_id_prefix = State.orcabus_id_prefix + +class StateSerializer(StateBaseSerializer): + class Meta: + model = State + fields = "__all__" + + def to_representation(self, instance): + representation = super().to_representation(instance) + representation['sequence'] = Sequence.orcabus_id_prefix + representation['sequence'] + return representation diff --git a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/tests/test_viewsets.py b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/tests/test_viewsets.py index 497ec154e..26298b441 100644 --- a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/tests/test_viewsets.py +++ b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/tests/test_viewsets.py @@ -3,7 +3,7 @@ from django.test import TestCase from django.utils.timezone import now -from sequence_run_manager.models.sequence import Sequence +from sequence_run_manager.models.sequence import Sequence, SequenceStatus from sequence_run_manager.urls.base import api_base logger = logging.getLogger() @@ -19,7 +19,7 @@ def setUp(self): run_volume_name="gds_name", run_folder_path="/to/gds/folder/path", run_data_uri="gds://gds_name/to/gds/folder/path", - status="Complete", + status=SequenceStatus.from_seq_run_status("Complete"), start_time=now(), sample_sheet_name="SampleSheet.csv", sequence_run_id="r.AAAAAA", diff --git a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/urls/base.py b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/urls/base.py index 
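The serializer layer above re-attaches the model prefix to orcabus_id and can optionally camel-case keys. The snippet below restates to_camel_case from serializers/base.py so the behaviour can be checked in isolation; the sample values are illustrative.

```python
# Restated from sequence_run_manager/serializers/base.py for a standalone check.
import re


def to_camel_case(snake_str):
    components = re.split(r'[_\-\s]', snake_str)
    return components[0].lower() + ''.join(x.title() for x in components[1:])


assert to_camel_case("instrument_run_id") == "instrumentRunId"
assert to_camel_case("orcabus_id") == "orcabusId"

# prefix re-attachment as SerializersBase.to_representation does it
representation = {"orcabus_id": "01JDXA8ZP7K3Q4R5S6T7U8V9W0", "instrument_run_id": "200508_A01052_0001_BH5LY7ACGT"}
representation["orcabus_id"] = "seq." + representation["orcabus_id"]
print({to_camel_case(k): v for k, v in representation.items()})
```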
f0dd4cc77..924d83dac 100644 --- a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/urls/base.py +++ b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/urls/base.py @@ -2,6 +2,8 @@ from sequence_run_manager.routers import OptionalSlashDefaultRouter from sequence_run_manager.viewsets.sequence import SequenceViewSet +from sequence_run_manager.viewsets.state import StateViewSet +from sequence_run_manager.viewsets.comment import CommentViewSet from sequence_run_manager.settings.base import API_VERSION api_namespace = "api" @@ -11,8 +13,10 @@ router = OptionalSlashDefaultRouter() router.register(r"sequence", SequenceViewSet, basename="sequence") +router.register("sequence/(?P<orcabus_id>[^/.]+)/comment", CommentViewSet, basename="sequence-comment") +router.register("sequence/(?P<orcabus_id>[^/.]+)/state", StateViewSet, basename="sequence-states") + urlpatterns = [ - # path("iam/", include(router.urls)), path(f"{api_base}", include(router.urls)), ] diff --git a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/viewsets/base.py b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/viewsets/base.py new file mode 100644 index 000000000..701a96685 --- /dev/null +++ b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/viewsets/base.py @@ -0,0 +1,45 @@ +from abc import ABC +from rest_framework import filters +from django.shortcuts import get_object_or_404 +from sequence_run_manager.pagination import StandardResultsSetPagination +from rest_framework.response import Response +from rest_framework.viewsets import ReadOnlyModelViewSet + + +class BaseViewSet(ReadOnlyModelViewSet, ABC): + lookup_value_regex = "[^/]+" # This is to allow for special characters in the URL + orcabus_id_prefix = '' + ordering_fields = "__all__" + ordering = ["-orcabus_id"] + pagination_class = StandardResultsSetPagination + filter_backends = [filters.OrderingFilter, filters.SearchFilter] + + def retrieve(self, request, *args, **kwargs): + """ + Since we have custom orcabus_id prefix for each model, we need to remove the prefix before retrieving it. + """ + pk = self.kwargs.get('pk') + if pk and pk.startswith(self.orcabus_id_prefix): + pk = pk[len(self.orcabus_id_prefix):] + + obj = get_object_or_404(self.get_queryset(), pk=pk) + serializer = self.serializer_class(obj) + return Response(serializer.data) + + def get_query_params(self): + """ + Sanitize query params if needed + e.g. 
remove prefixes for each orcabus_id + """ + query_params = self.request.query_params.copy() + orcabus_id = query_params.getlist("orcabus_id", None) + if orcabus_id: + id_list = [] + for key in orcabus_id: + if key.startswith(self.orcabus_id_prefix): + id_list.append(key[len(self.orcabus_id_prefix):]) + else: + id_list.append(key) + query_params.setlist('orcabus_id', id_list) + + return query_params diff --git a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/viewsets/comment.py b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/viewsets/comment.py new file mode 100644 index 000000000..8cef5277e --- /dev/null +++ b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/viewsets/comment.py @@ -0,0 +1,87 @@ +from rest_framework import mixins +from rest_framework.viewsets import GenericViewSet +from rest_framework import status +from rest_framework.response import Response +from rest_framework.exceptions import PermissionDenied +from rest_framework.decorators import action + +from sequence_run_manager.models.comment import Comment +from sequence_run_manager.models.sequence import Sequence +from sequence_run_manager.serializers.comment import CommentSerializer + +class CommentViewSet(mixins.CreateModelMixin, mixins.UpdateModelMixin, mixins.ListModelMixin, GenericViewSet): + serializer_class = CommentSerializer + search_fields = Comment.get_base_fields() + orcabus_id_prefix = Comment.orcabus_id_prefix + http_method_names = ['get', 'post', 'patch', 'delete'] + pagination_class = None + + def get_queryset(self): + return Comment.objects.filter( + association_id=self.kwargs["orcabus_id"], + is_deleted=False + ) + + def perform_create(self, serializer): + serializer.save(association_id=self.kwargs["orcabus_id"]) + + def create(self, request, *args, **kwargs): + seq_orcabus_id = self.kwargs["orcabus_id"] + + # Check if the SequenceRun exists + try: + Sequence.objects.get(orcabus_id=seq_orcabus_id) + except Sequence.DoesNotExist: + return Response({"detail": "SequenceRun not found."}, status=status.HTTP_404_NOT_FOUND) + + # Check if created_by and comment are provided + if not request.data.get('created_by') or not request.data.get('comment'): + return Response({"detail": "created_by and comment are required."}, status=status.HTTP_400_BAD_REQUEST) + + # Add workflow_run_id to the request data + mutable_data = request.data.copy() + mutable_data['association_id'] = seq_orcabus_id + + serializer = self.get_serializer(data=mutable_data) + serializer.is_valid(raise_exception=True) + self.perform_create(serializer) + headers = self.get_success_headers(serializer.data) + return Response(serializer.data, status=status.HTTP_201_CREATED, headers=headers) + + def perform_create(self, serializer): + serializer.save() # Assuming you're using email as the user identifier + + def update(self, request, *args, **kwargs): + partial = kwargs.pop('partial', False) + instance = self.get_object() + + # Check if the user updating the comment is the same as the one who created it + if instance.created_by != request.data.get('created_by'): + raise PermissionDenied("You don't have permission to update this comment.") + + # Ensure only the comment field can be updated + if set(request.data.keys()) - {'comment', 'created_by'}: + return Response({"detail": "Only the comment field can be updated."}, + status=status.HTTP_400_BAD_REQUEST) + + serializer = self.get_serializer(instance, data=request.data, partial=partial) + serializer.is_valid(raise_exception=True) + 
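With the nested route from urls/base.py and the CommentViewSet above, adding a comment to a sequence is a plain POST, and created_by plus comment are mandatory. The client sketch below is hypothetical: the hostname, bearer token and api/v1 base path are assumptions, and note that the viewset as written looks up the raw orcabus_id path parameter (i.e. without the 'seq.' prefix).

```python
# Hypothetical client call against the new nested comment route.
import requests

BASE = "https://sequence.example.org/api/v1"          # placeholder host and base path
seq_id = "01JDXA8ZP7K3Q4R5S6T7U8V9W0"                 # raw orcabus_id, no 'seq.' prefix

resp = requests.post(
    f"{BASE}/sequence/{seq_id}/comment/",
    json={"comment": "Re-run requested after QC review", "created_by": "someone@example.org"},
    headers={"Authorization": "Bearer <token>"},       # placeholder credential
    timeout=10,
)
print(resp.status_code, resp.json())
```

Per the create() override above, a missing created_by or comment returns 400, and an unknown sequence returns 404.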
self.perform_update(serializer) + headers = self.get_success_headers(serializer.data) + return Response(serializer.data, status=status.HTTP_200_OK, headers=headers) + + def perform_update(self, serializer): + serializer.save() + + @action(detail=True, methods=['delete']) + def soft_delete(self, request, *args, **kwargs): + instance = self.get_object() + + # Check if the user deleting the comment is the same as the one who created it + if instance.created_by != request.data.get('created_by'): + raise PermissionDenied("You don't have permission to delete this comment.") + + instance.is_deleted = True + instance.save() + + return Response({"detail": "Comment successfully marked as deleted."}, status=status.HTTP_204_NO_CONTENT) \ No newline at end of file diff --git a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/viewsets/sequence.py b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/viewsets/sequence.py index 736bcfc95..6041ae7f6 100644 --- a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/viewsets/sequence.py +++ b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/viewsets/sequence.py @@ -1,18 +1,21 @@ -from rest_framework import filters -from rest_framework.viewsets import ReadOnlyModelViewSet -from sequence_run_manager.models.sequence import Sequence -from sequence_run_manager.pagination import StandardResultsSetPagination -from sequence_run_manager.serializers import SequenceSerializer +from drf_spectacular.utils import extend_schema +from sequence_run_manager.viewsets.base import BaseViewSet +from sequence_run_manager.models.sequence import Sequence +from sequence_run_manager.serializers.sequence import SequenceSerializer, SequenceListParamSerializer, SequenceMinSerializer -class SequenceViewSet(ReadOnlyModelViewSet): +class SequenceViewSet(BaseViewSet): serializer_class = SequenceSerializer - pagination_class = StandardResultsSetPagination - filter_backends = [filters.OrderingFilter, filters.SearchFilter] - ordering_fields = "__all__" - ordering = ["-id"] search_fields = Sequence.get_base_fields() + orcabus_id_prefix = Sequence.orcabus_id_prefix def get_queryset(self): return Sequence.objects.get_by_keyword(**self.request.query_params) + + @extend_schema(parameters=[ + SequenceListParamSerializer + ]) + def list(self, request, *args, **kwargs): + self.serializer_class = SequenceMinSerializer + return super().list(request, *args, **kwargs) diff --git a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/viewsets/state.py b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/viewsets/state.py new file mode 100644 index 000000000..3ec62824b --- /dev/null +++ b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager/viewsets/state.py @@ -0,0 +1,16 @@ +from rest_framework.viewsets import GenericViewSet +from rest_framework import mixins + +from sequence_run_manager.models.state import State +from sequence_run_manager.serializers.state import StateSerializer + + +class StateViewSet(mixins.ListModelMixin, GenericViewSet): + serializer_class = StateSerializer + search_fields = State.get_base_fields() + orcabus_id_prefix = State.orcabus_id_prefix + pagination_class = None + + def get_queryset(self): + return State.objects.filter(sequence=self.kwargs["orcabus_id"]) + diff --git a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager_proc/domain/sequence.py 
b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager_proc/domain/sequence.py index 9b1840c5c..a1c7a4930 100644 --- a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager_proc/domain/sequence.py +++ b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager_proc/domain/sequence.py @@ -49,7 +49,7 @@ def to_event(self) -> SequenceRunStateChange: raise SequenceRuleError("Sequence status is null or not loaded yet") return SequenceRunStateChange( - id=self.sequence.id, + id=Sequence.orcabus_id_prefix + self.sequence.orcabus_id, instrumentRunId=self.sequence.instrument_run_id, runVolumeName=self.sequence.run_volume_name, runFolderPath=self.sequence.run_folder_path, diff --git a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager_proc/domain/sequencerunstatechange/AWSEvent.py b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager_proc/domain/sequencerunstatechange/AWSEvent.py index ef33f45f2..6b64d30d2 100644 --- a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager_proc/domain/sequencerunstatechange/AWSEvent.py +++ b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager_proc/domain/sequencerunstatechange/AWSEvent.py @@ -19,8 +19,8 @@ class AWSEvent(object): } _attribute_map = { - 'detail': 'detail', 'account': 'account', + 'detail': 'detail', 'detail_type': 'detail-type', 'id': 'id', 'region': 'region', @@ -30,10 +30,9 @@ class AWSEvent(object): 'version': 'version' } - def __init__(self, detail=None, account=None, detail_type=None, id=None, region=None, resources=None, source=None, - time=None, version=None): # noqa: E501 - self._detail = None + def __init__(self, account=None, detail=None, detail_type=None, id=None, region=None, resources=None, source=None, time=None, version=None): # noqa: E501 self._account = None + self._detail = None self._detail_type = None self._id = None self._region = None @@ -42,8 +41,8 @@ def __init__(self, detail=None, account=None, detail_type=None, id=None, region= self._time = None self._version = None self.discriminator = None - self.detail = detail self.account = account + self.detail = detail self.detail_type = detail_type self.id = id self.region = region @@ -52,15 +51,6 @@ def __init__(self, detail=None, account=None, detail_type=None, id=None, region= self.time = time self.version = version - @property - def detail(self): - - return self._detail - - @detail.setter - def detail(self, detail): - - self._detail = detail @property def account(self): @@ -70,8 +60,22 @@ def account(self): @account.setter def account(self, account): + self._account = account + + @property + def detail(self): + + return self._detail + + @detail.setter + def detail(self, detail): + + + self._detail = detail + + @property def detail_type(self): @@ -80,8 +84,10 @@ def detail_type(self): @detail_type.setter def detail_type(self, detail_type): + self._detail_type = detail_type + @property def id(self): @@ -90,8 +96,10 @@ def id(self): @id.setter def id(self, id): + self._id = id + @property def region(self): @@ -100,8 +108,10 @@ def region(self): @region.setter def region(self, region): + self._region = region + @property def resources(self): @@ -110,8 +120,10 @@ def resources(self): @resources.setter def resources(self, resources): + self._resources = resources + @property def source(self): @@ -120,8 +132,10 @@ def source(self): @source.setter def source(self, source): + self._source = source + @property def time(self): @@ -130,8 +144,10 @@ def time(self): 
@time.setter def time(self, time): + self._time = time + @property def version(self): @@ -140,6 +156,7 @@ def version(self): @version.setter def version(self, version): + self._version = version def to_dict(self): @@ -182,3 +199,4 @@ def __eq__(self, other): def __ne__(self, other): return not self == other + diff --git a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager_proc/domain/sequencerunstatechange/SequenceRunStateChange.py b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager_proc/domain/sequencerunstatechange/SequenceRunStateChange.py index b04e582e7..80c7df329 100644 --- a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager_proc/domain/sequencerunstatechange/SequenceRunStateChange.py +++ b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager_proc/domain/sequencerunstatechange/SequenceRunStateChange.py @@ -7,51 +7,63 @@ class SequenceRunStateChange(object): _types = { - 'id': 'int', + 'endTime': 'Object', + 'id': 'str', 'instrumentRunId': 'str', - 'runVolumeName': 'str', - 'runFolderPath': 'str', 'runDataUri': 'str', + 'runFolderPath': 'str', + 'runVolumeName': 'str', 'sampleSheetName': 'str', 'startTime': 'datetime', - 'endTime': 'datetime', 'status': 'str' } _attribute_map = { + 'endTime': 'endTime', 'id': 'id', 'instrumentRunId': 'instrumentRunId', - 'runVolumeName': 'runVolumeName', - 'runFolderPath': 'runFolderPath', 'runDataUri': 'runDataUri', + 'runFolderPath': 'runFolderPath', + 'runVolumeName': 'runVolumeName', 'sampleSheetName': 'sampleSheetName', 'startTime': 'startTime', - 'endTime': 'endTime', 'status': 'status' } - def __init__(self, id=None, instrumentRunId=None, runVolumeName=None, runFolderPath=None, runDataUri=None, - sampleSheetName=None, startTime=None, endTime=None, status=None): # noqa: E501 + def __init__(self, endTime=None, id=None, instrumentRunId=None, runDataUri=None, runFolderPath=None, runVolumeName=None, sampleSheetName=None, startTime=None, status=None): # noqa: E501 + self._endTime = None self._id = None self._instrumentRunId = None - self._runVolumeName = None - self._runFolderPath = None self._runDataUri = None + self._runFolderPath = None + self._runVolumeName = None self._sampleSheetName = None self._startTime = None - self._endTime = None self._status = None self.discriminator = None + self.endTime = endTime self.id = id self.instrumentRunId = instrumentRunId - self.runVolumeName = runVolumeName - self.runFolderPath = runFolderPath self.runDataUri = runDataUri + self.runFolderPath = runFolderPath + self.runVolumeName = runVolumeName self.sampleSheetName = sampleSheetName self.startTime = startTime - self.endTime = endTime self.status = status + + @property + def endTime(self): + + return self._endTime + + @endTime.setter + def endTime(self, endTime): + + + self._endTime = endTime + + @property def id(self): @@ -60,8 +72,10 @@ def id(self): @id.setter def id(self, id): + self._id = id + @property def instrumentRunId(self): @@ -70,17 +84,21 @@ def instrumentRunId(self): @instrumentRunId.setter def instrumentRunId(self, instrumentRunId): + self._instrumentRunId = instrumentRunId + @property - def runVolumeName(self): + def runDataUri(self): - return self._runVolumeName + return self._runDataUri - @runVolumeName.setter - def runVolumeName(self, runVolumeName): + @runDataUri.setter + def runDataUri(self, runDataUri): + + + self._runDataUri = runDataUri - self._runVolumeName = runVolumeName @property def runFolderPath(self): @@ -90,17 +108,21 @@ def runFolderPath(self): 
@runFolderPath.setter def runFolderPath(self, runFolderPath): + self._runFolderPath = runFolderPath + @property - def runDataUri(self): + def runVolumeName(self): - return self._runDataUri + return self._runVolumeName - @runDataUri.setter - def runDataUri(self, runDataUri): + @runVolumeName.setter + def runVolumeName(self, runVolumeName): + + + self._runVolumeName = runVolumeName - self._runDataUri = runDataUri @property def sampleSheetName(self): @@ -110,8 +132,10 @@ def sampleSheetName(self): @sampleSheetName.setter def sampleSheetName(self, sampleSheetName): + self._sampleSheetName = sampleSheetName + @property def startTime(self): @@ -120,17 +144,9 @@ def startTime(self): @startTime.setter def startTime(self, startTime): - self._startTime = startTime - @property - def endTime(self): - - return self._endTime - - @endTime.setter - def endTime(self, endTime): + self._startTime = startTime - self._endTime = endTime @property def status(self): @@ -140,6 +156,7 @@ def status(self): @status.setter def status(self, status): + self._status = status def to_dict(self): @@ -182,3 +199,4 @@ def __eq__(self, other): def __ne__(self, other): return not self == other + diff --git a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager_proc/lambdas/bssh_event.py b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager_proc/lambdas/bssh_event.py index dbd5063f1..401d38659 100644 --- a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager_proc/lambdas/bssh_event.py +++ b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager_proc/lambdas/bssh_event.py @@ -13,37 +13,69 @@ SequenceRule, SequenceRuleError, ) -from sequence_run_manager_proc.services import sequence_srv +from sequence_run_manager_proc.services import sequence_srv, sequence_state_srv from libumccr import libjson from libumccr.aws import libeb -from libica.app import ENSEventType +# from libica.app import ENSEventType logger = logging.getLogger() logger.setLevel(logging.INFO) -IMPLEMENTED_ENS_TYPES = [ - ENSEventType.BSSH_RUNS.value, -] +# IMPLEMENTED_ENS_TYPES = [ +# ENSEventType.BSSH_RUNS.value, +# ] -PRODUCED_BY_BSSH = ["BaseSpaceSequenceHub"] +# PRODUCED_BY_BSSH = ["BaseSpaceSequenceHub"] -def sqs_handler(event, context): +def event_handler(event, context): """event payload dict Here is how to generate an example event. See README for more. python manage.py generate_mock_bssh_event | jq + + example event: + { + "version": "0", + "id": f8c3de3d-1fea-4d7c-a8b0-29f63c4c3454", # Random UUID + "detail-type": "Event from aws:sqs", + "source": "Pipe IcaEventPipeConstru-xxxxxxxx", + "account": "444444444444", + "time": "2024-11-02T21:58:22Z", + "region": "ap-southeast-2", + "resources": [], + "detail": { + "ica-event": { + "gdsFolderPath": "", + "gdsVolumeName": "bssh.123456789fabcdefghijkl", + "v1pre3Id": "444555555555", + "dateModified": "2024-11-02T21:58:13.7451620Z", + "acl": [ + "wid:12345678-debe-3f9f-8b92-21244f46822c", + "tid:Yxmm......" 
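With the regenerated binding above, the emitted detail now carries a string id built as 'seq.' plus the orcabus_id (see SequenceDomain.to_event earlier in this diff). An illustrative construction is shown below; the import path is assumed from the package layout, and the id and time values are made up while the run fields are modelled on the examples in this PR.

```python
# Illustrative construction of the updated SequenceRunStateChange payload.
from datetime import datetime, timezone

from sequence_run_manager_proc.domain.sequencerunstatechange import SequenceRunStateChange  # assumed re-export

detail = SequenceRunStateChange(
    id="seq.01JDXA8ZP7K3Q4R5S6T7U8V9W0",                       # made-up ULID
    instrumentRunId="540424_A01001_0193_BBBBBBDRX5",
    runVolumeName="bssh.abcdefgh1234567899fa94fe79523959",
    runFolderPath="/Runs/540424_A01001_0193_BBBBBBDRX5_r.88888888ASU26EYuyeJHew",
    runDataUri="gds://bssh.abcdefgh1234567899fa94fe79523959/Runs/540424_A01001_0193_BBBBBBDRX5_r.88888888ASU26EYuyeJHew",  # illustrative
    sampleSheetName="SampleSheet.csv",
    startTime=datetime(2024, 4, 27, 2, 35, 2, tzinfo=timezone.utc),
    endTime=None,
    status="STARTED",
)
print(detail.to_dict())
```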
+ ], + "flowcellBarcode": "HVJJJJJJ", + "icaProjectId": "12345678-53ba-47a5-854d-e6b53101adb7", + "sampleSheetName": "SampleSheet.V2.134567.csv", + "apiUrl": "https://api.aps2.sh.basespace.illumina.com/v2/runs/r.4Wz-ABCDEFGHIJKLM-A", + "name": "222222_A01052_1234_BHVJJJJJJ", + "id": "r.4Wz-ABCDEFGHIJKLMN-A", + "instrumentRunId": "222222_A01052_1234_BHVJJJJJJ", + "status": "PendingAnalysis" + } + } + } - This Lambda is to be subscribed to SQS for BSSH event through ICA v1 ENS - https://illumina.gitbook.io/ica-v1/events/e-deliverytargets + This Lambda is to be subscribed to Orcabus Eventbridge rule for BSSH event through ICA v2 sqs event pipe + https://help.ica.illumina.com/project/p-notifications#delivery-targets + https://illumina.gitbook.io/ica-v1/events/e-deliverytargets (deprecated) OrcaBus SRM BSSH Event High Level: - - through ICA v1 ENS, we subscribe to `bssh.runs` using SQS queue created at our AWS - - in our SQS queue, we hook this Lambda as event trigger and process the event - - now, when `bssh.runs` event with `statuschanged` status arrive... - - this SQS event envelope may contain multiple `Records` - - we parse these `Records`, transform and persist them into our internal OrcaBus SRM `Sequence` entity model + - through ICA v2 sqs event pipe, we subscribe to Orcabus Eventbridge with specific rule + - this Lambda is to be hooked to this Eventbridge rule to process the event + - now, when `ica-event` event with `instrumentRunId` and `statuschanged` status arrive... + - we parse these `ica-event` payload, transform and persist them into our internal OrcaBus SRM `Sequence` entity model - after persisted into database, we again transform into our internal `SequenceRunStateChange` domain event - this domain event schema is what we consented and published in our EventBus event schema registry - we then dispatch our domain events into the channel in batching manner for efficiency @@ -58,51 +90,39 @@ def sqs_handler(event, context): :param context: :return: """ + assert os.environ["EVENT_BUS_NAME"] is not None, "EVENT_BUS_NAME must be set" + logger.info("Start processing BSSH ENS event") logger.info(libjson.dumps(event)) - messages = event["Records"] event_bus_name = os.environ["EVENT_BUS_NAME"] - entries = list() - - for message in messages: - event_type = message["messageAttributes"]["type"]["stringValue"] - produced_by = message["messageAttributes"]["producedby"]["stringValue"] - - if event_type not in IMPLEMENTED_ENS_TYPES: - logger.warning(f"Skipping unsupported ENS type: {event_type}") - continue - - if produced_by not in PRODUCED_BY_BSSH: - raise ValueError(f"Unrecognised BSSH event produced_by: {produced_by}") - - if event_type == ENSEventType.BSSH_RUNS.value: - payload = {} - payload.update(libjson.loads(message["body"])) - - # Create or update Sequence record from BSSH Run event payload - sequence_domain: SequenceDomain = ( - sequence_srv.create_or_update_sequence_from_bssh_event(payload) + # Extract relevant fields from the event payload + event_details = event.get("detail", {}).get("ica-event", {}) + + # Create or update Sequence record from BSSH Run event payload + sequence_domain: SequenceDomain = ( + sequence_srv.create_or_update_sequence_from_bssh_event(event_details) + ) + entry = None + + # Detect SequenceRunStateChange + if sequence_domain.state_has_changed: + try: + SequenceRule(sequence_domain.sequence).must_not_emergency_stop() + sequence_state_srv.create_sequence_state_from_bssh_event(event_details) + entry = sequence_domain.to_put_events_request_entry( + 
event_bus_name=event_bus_name, ) - - # Detect SequenceRunStateChange - if sequence_domain.state_has_changed: - try: - SequenceRule(sequence_domain.sequence).must_not_emergency_stop() - entry = sequence_domain.to_put_events_request_entry( - event_bus_name=event_bus_name, - ) - entries.append(entry) - except SequenceRuleError as se: - # FIXME emit custom event for this? something to tackle later. log & skip for now - reason = f"Aborted pipeline due to {se}" - logger.warning(reason) - continue - - # Dispatch all event entries in one-go! libeb will take care of batching them up for efficiency. - if entries: - libeb.dispatch_events(entries) + + except SequenceRuleError as se: + # FIXME emit custom event for this? something to tackle later. log & skip for now + reason = f"Aborted pipeline due to {se}" + logger.warning(reason) + + # Dispatch event entry using libeb. + if entry: + libeb.emit_event(entry) resp_msg = { "message": f"BSSH ENS event processing complete", diff --git a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager_proc/services/sequence_srv.py b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager_proc/services/sequence_srv.py index a219fe19b..2ec5fd318 100644 --- a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager_proc/services/sequence_srv.py +++ b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager_proc/services/sequence_srv.py @@ -34,6 +34,7 @@ def create_or_update_sequence_from_bssh_event(payload: dict) -> SequenceDomain: "tid:<82_char_string>" ], "flowcellBarcode": "BARCODEEE", + "icaProjectId": "123456-6789-aaaa-bbbb-abcdefghijk", "sampleSheetName": "SampleSheet.csv", "apiUrl": "https://api.aps2.sh.basespace.illumina.com/v2/runs/r.ACGTlKjDgEy099ioQOeOWg", "name": "200508_A01052_0001_BH5LY7ACGT", @@ -48,14 +49,19 @@ def create_or_update_sequence_from_bssh_event(payload: dict) -> SequenceDomain: gds_folder_path = payload["gdsFolderPath"] gds_volume_name = payload["gdsVolumeName"] date_modified = payload["dateModified"] - + + # key to retrieve further details of icav2 bssh event + ica_project_id = payload.get("icaProjectId") + api_url = payload.get("apiUrl") + v1pre3_id = payload.get("v1pre3Id") + # optional run_id = payload.get("id") name = payload.get("name") sample_sheet_name = payload.get("sampleSheetName") reagent_barcode = payload.get("reagentBarcode") flowcell_barcode = payload.get("flowcellBarcode") - + # --- start mapping to internal Sequence model # status must exist in payload @@ -92,6 +98,10 @@ def create_or_update_sequence_from_bssh_event(payload: dict) -> SequenceDomain: seq.sample_sheet_name = sample_sheet_name seq.sequence_run_id = run_id seq.sequence_run_name = name + + seq.v1pre3_id = v1pre3_id + seq.ica_project_id = ica_project_id + seq.api_url = api_url # seq.sample_sheet_config = liborca.get_samplesheet_json_from_file( # gds_volume=gds_volume_name, diff --git a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager_proc/services/sequence_state_srv.py b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager_proc/services/sequence_state_srv.py new file mode 100644 index 000000000..c70c11020 --- /dev/null +++ b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager_proc/services/sequence_state_srv.py @@ -0,0 +1,35 @@ +import logging + +from django.db import transaction +from django.db.models import QuerySet + +from sequence_run_manager.models.sequence import Sequence +from sequence_run_manager.models.state import State + + +logger 
= logging.getLogger(__name__) +logger.setLevel(logging.INFO) + +@transaction.atomic +def create_sequence_state_from_bssh_event(payload: dict) -> None: + """ + Create SequenceState record from BSSH Run event payload + + { + "dateModified": "2022-06-24T05:07:53.476767Z", + "instrumentRunId": "200508_A01052_0001_BH5LY7ACGT", + "status": "PendingAnalysis" + ... + } + """ + status = payload["status"] + timestamp = payload["dateModified"] + + # get sequence by instrument_run_id + instrument_run_id = payload["instrumentRunId"] + sequence = Sequence.objects.get(instrument_run_id=instrument_run_id) + + # comment for any future usage, None by default + comment = None + + State.objects.create(status=status, timestamp=timestamp, sequence=sequence, comment=comment) \ No newline at end of file diff --git a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager_proc/tests/test_bssh_event.py b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager_proc/tests/test_bssh_event.py index 676b6b908..54fd55e73 100644 --- a/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager_proc/tests/test_bssh_event.py +++ b/lib/workload/stateless/stacks/sequence-run-manager/sequence_run_manager_proc/tests/test_bssh_event.py @@ -9,8 +9,41 @@ from sequence_run_manager_proc.lambdas import bssh_event from sequence_run_manager_proc.tests.case import logger, SequenceRunProcUnitTestCase +""" +example event: + { + "version": "0", + "id": f8c3de3d-1fea-4d7c-a8b0-29f63c4c3454", # Random UUID + "detail-type": "Event from aws:sqs", + "source": "Pipe IcaEventPipeConstru-xxxxxxxx", + "account": "444444444444", + "time": "2024-11-02T21:58:22Z", + "region": "ap-southeast-2", + "resources": [], + "detail": { + "ica-event": { + "gdsFolderPath": "", + "gdsVolumeName": "bssh.123456789fabcdefghijkl", + "v1pre3Id": "444555555555", + "dateModified": "2024-11-02T21:58:13.7451620Z", + "acl": [ + "wid:12345678-debe-3f9f-8b92-21244f46822c", + "tid:Yxmm......" 
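Once create_sequence_state_from_bssh_event above has recorded a few transitions, a sequence's status history is reachable through the states related name. A Django shell sketch, with an illustrative instrument run id:

```python
# Run inside `python manage.py shell`; the instrument run id is illustrative.
from sequence_run_manager.models import Sequence

seq = Sequence.objects.get(instrument_run_id="222222_A01052_1234_BHVJJJJJJ")
for state in seq.states.order_by("timestamp"):  # State.sequence uses related_name='states'
    print(state.timestamp, state.status, state.comment)
```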
+ ], + "flowcellBarcode": "HVJJJJJJ", + "icaProjectId": "12345678-53ba-47a5-854d-e6b53101adb7", + "sampleSheetName": "SampleSheet.V2.134567.csv", + "apiUrl": "https://api.aps2.sh.basespace.illumina.com/v2/runs/r.4Wz-ABCDEFGHIJKLM-A", + "name": "222222_A01052_1234_BHVJJJJJJ", + "id": "r.4Wz-ABCDEFGHIJKLMN-A", + "instrumentRunId": "222222_A01052_1234_BHVJJJJJJ", + "status": "PendingAnalysis" + } + } + } +""" -def sqs_bssh_event_message(): +def bssh_event_message(): mock_instrument_run_id = TestConstant.instrument_run_id.value mock_sequence_run_id = "r.ACGTlKjDgEy099ioQOeOWg" mock_sequence_run_name = mock_instrument_run_id @@ -23,8 +56,9 @@ def sqs_bssh_event_message(): "reagentBarcode": "NV9999999-ACGTA", "v1pre3Id": "666666", "dateModified": mock_date_modified, - "acl": ["wid:e4730533-d752-3601-b4b7-8d4d2f6373de"], + "acl": ["wid:e4730533-d752-3601-b4b7-8d4d2f6373de", "tid:Yxmm......"], "flowcellBarcode": "BARCODEEE", + "icaProjectId": "12345678-53ba-47a5-854d-e6b53101adb7", "sampleSheetName": "MockSampleSheet.csv", "apiUrl": f"https://api.aps2.sh.basespace.illumina.com/v2/runs/{mock_sequence_run_id}", "name": mock_sequence_run_name, @@ -33,57 +67,21 @@ def sqs_bssh_event_message(): "status": mock_status, } - ens_sqs_message_attributes = { - "action": { - "stringValue": "statuschanged", - "stringListValues": [], - "binaryListValues": [], - "dataType": "String", - }, - "actiondate": { - "stringValue": "2020-05-09T22:17:10.815Z", - "stringListValues": [], - "binaryListValues": [], - "dataType": "String", - }, - "type": { - "stringValue": "bssh.runs", - "stringListValues": [], - "binaryListValues": [], - "dataType": "String", - }, - "producedby": { - "stringValue": "BaseSpaceSequenceHub", - "stringListValues": [], - "binaryListValues": [], - "dataType": "String", - }, - "contenttype": { - "stringValue": "application/json", - "stringListValues": [], - "binaryListValues": [], - "dataType": "String", + orcabus_event_message = { + "version": "0", + "id": "f8c3de3d-1fea-4d7c-a8b0-29f63c4c3454", # Random UUID + "detail-type": "Event from aws:sqs", + "source": "Pipe IcaEventPipeConstru-xxxxxxxx", + "account": "444444444444", + "time": "2024-11-02T21:58:22Z", + "region": "ap-southeast-2", + "resources": [], + "detail": { + "ica-event": sequence_run_message, }, } - sqs_event_message = { - "Records": [ - { - "eventSource": "aws:sqs", - "body": libjson.dumps(sequence_run_message), - "messageAttributes": ens_sqs_message_attributes, - "attributes": { - "ApproximateReceiveCount": "3", - "SentTimestamp": "1589509337523", - "SenderId": "ACTGAGCTI2IGZA4XHGYYY:sender-sender", - "ApproximateFirstReceiveTimestamp": "1589509337535", - }, - "eventSourceARN": "arn:aws:sqs:ap-southeast-2:843407916570:my-queue", - } - ] - } - - return sqs_event_message + return orcabus_event_message class BSSHEventUnitTests(SequenceRunProcUnitTestCase): @@ -96,45 +94,46 @@ def tearDown(self) -> None: super(BSSHEventUnitTests, self).tearDown() del os.environ["EVENT_BUS_NAME"] - def test_unsupported_ens_event_type(self): - """ - python manage.py test sequence_run_manager_proc.tests.test_bssh_event.BSSHEventUnitTests.test_unsupported_ens_event_type - """ - ens_sqs_message_attributes = { - "type": { - "stringValue": "tes.runs", - "stringListValues": [], - "binaryListValues": [], - "dataType": "String", - }, - "producedby": { - "stringValue": "BaseSpaceSequenceHub", - "stringListValues": [], - "binaryListValues": [], - "dataType": "String", - }, - } - - sqs_event_message = { - "Records": [ - { - "eventSource": "aws:sqs", - "body": 
"does_not_matter", - "messageAttributes": ens_sqs_message_attributes, - } - ] - } - - result = bssh_event.sqs_handler(sqs_event_message, None) - self.assertIsNotNone(result) - - def test_sqs_handler(self): + # comment as eventbridge rule will filter out unsupported event type + # def test_unsupported_ens_event_type(self): + # """ + # python manage.py test sequence_run_manager_proc.tests.test_bssh_event.BSSHEventUnitTests.test_unsupported_ens_event_type + # """ + # ens_sqs_message_attributes = { + # "type": { + # "stringValue": "tes.runs", + # "stringListValues": [], + # "binaryListValues": [], + # "dataType": "String", + # }, + # "producedby": { + # "stringValue": "BaseSpaceSequenceHub", + # "stringListValues": [], + # "binaryListValues": [], + # "dataType": "String", + # }, + # } + + # sqs_event_message = { + # "Records": [ + # { + # "eventSource": "aws:sqs", + # "body": "does_not_matter", + # "messageAttributes": ens_sqs_message_attributes, + # } + # ] + # } + + # result = bssh_event.sqs_handler(sqs_event_message, None) + # self.assertIsNotNone(result) + + def test_event_handler(self): """ - python manage.py test sequence_run_manager_proc.tests.test_bssh_event.BSSHEventUnitTests.test_sqs_handler + python manage.py test sequence_run_manager_proc.tests.test_bssh_event.BSSHEventUnitTests.test_event_handler """ when(libssm).get_ssm_param(...).thenReturn(libjson.dumps([])) - _ = bssh_event.sqs_handler(sqs_bssh_event_message(), None) + _ = bssh_event.event_handler(bssh_event_message(), None) qs = Sequence.objects.filter( instrument_run_id=TestConstant.instrument_run_id.value @@ -144,15 +143,15 @@ def test_sqs_handler(self): self.assertEqual(1, qs.count()) verify(libeb, times=1).eb_client(...) # event should fire - def test_sqs_handler_emergency_stop(self): + def test_event_handler_emergency_stop(self): """ - python manage.py test sequence_run_manager_proc.tests.test_bssh_event.BSSHEventUnitTests.test_sqs_handler_emergency_stop + python manage.py test sequence_run_manager_proc.tests.test_bssh_event.BSSHEventUnitTests.test_event_handler_emergency_stop """ when(libssm).get_ssm_param(...).thenReturn( libjson.dumps([TestConstant.instrument_run_id.value]) ) - _ = bssh_event.sqs_handler(sqs_bssh_event_message(), None) + _ = bssh_event.event_handler(bssh_event_message(), None) qs = Sequence.objects.filter( instrument_run_id=TestConstant.instrument_run_id.value