Skip to content

Commit

Permalink
feat(nowcasting-dev): Add satellite-consumer and airflow dag
Browse files Browse the repository at this point in the history
  • Loading branch information
devsjc committed Feb 26, 2025
1 parent 1ccfcc1 commit 4620466
Show file tree
Hide file tree
Showing 2 changed files with 138 additions and 0 deletions.
103 changes: 103 additions & 0 deletions terraform/modules/services/airflow/dags/uk/sat-dag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
"""Dag to download and process satellite data from EUMETSAT.
Consists of two tasks made from the same ECS operator,
one for RSS data and one for Odegree data.
The 0degree data task only runs if the RSS data task fails.
"""

import os
import datetime as dt
from airflow.providers.amazon.aws.operators.ecs import EcsRunTaskOperator, EcsRegisterTaskDefinitionOperator
from airflow.decorators import dag
from airflow.utils.trigger_rule import TriggerRule

from airflow.operators.latest_only import LatestOnlyOperator
from utils.slack import slack_message_callback

env = os.getenv("ENVIRONMENT", "development")

default_dag_args = {
"owner": "airflow",
"depends_on_past": False,
"retries": 1,
"retry_delay": dt.timedelta(minutes=1),
"max_active_runs": 10,
"concurrency": 10,
"max_active_tasks": 10,
"execution_timeout": dt.timedelta(minutes=30),
}
default_task_args = {
"cluster": f"Nowcasting-{env}",
"task_definition": "satellite-consumer",
"launch_type": "FARGATE",
"task_concurrency": 1,
"network_configuration": {
"awsvpcConfiguration": {
"subnets": [os.getenv("ECS_SUBNET")],
"securityGroups": [os.getenv("ECS_SECURITY_GROUP")],
"assignPublicIp": "ENABLED",
},
},
"awslogs_group": "/aws/ecs/consumer/sat",
"awslogs_stream_prefix": "streaming/sat-consumer",
"awslogs_region": "eu-west-1",
}

satellite_error_message = (
)

@dag(
dag_id="uk-satellite-consumer",
description=__doc__,
schedule_interval="*/5 * * * *",
start_date=dt.datetime.now(tz=dt.timezone.utc) - dt.timedelta(hours=0.5),
catchup=False,
default_args=default_dag_args,
)
def sat_consumer_dag():

latest_only = LatestOnlyOperator(task_id="latest_only")

sat_consumer_rss = EcsRunTaskOperator(
task_id="satellite-consumer-rss",
overrides={"containerOverrides": [{
"name": "satellite-consumer",
"environment": {
"SATCONS_SATELLITE": "rss",
"SATCONS_WORKDIR": "s3://nowcasting-sat-{env}/testdata",
},
}]},
on_failure_callback=slack_message_callback((
"⚠️ The task {{ ti.task_id }} failed to collect RSS satellite image data. "
"Falling back to 15 minute 0-degree satellite data. "
"If in a scheduled period of RSS downtime then no action is required. "
"(Can check EUMETSAT RSS status "
"<https://masif.eumetsat.int/ossi/webpages/level3.html?ossi_level3_filename=seviri_rss_hr.html&ossi_level2_filename=seviri_rss.html|here>). "
)),
**default_task_args,
)

sat_consumer_odegree = EcsRunTaskOperator(
task_id="satellite-consumer-odegree",
trigger_rule=TriggerRule.ALL_FAILED,
overrides={"containerOverrides": [{
"name": "satellite-consumer",
"environment": {
"SATCONS_SATELLITE": "odegree",
"SATCONS_WORKDIR": "s3://nowcasting-sat-{env}/testdata",
},
}]},
on_failure_callback=slack_message_callback((
"⚠️ The task {{ ti.task_id }} failed to collect odegree satellite data. "
"The forecast will automatically move over to PVNET-ECMWF "
"which doesn't need satellite data. "
"Forecast quality may be impacted, but no out-of-hours support is required. "
"Please log in an incident log. "
)),
**default_task_args,
)

latest_only >> sat_consumer_rss >> sat_consumer_odegree

sat_consumer_dag()

35 changes: 35 additions & 0 deletions terraform/nowcasting/development/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ The componentes ares:
3.8 - PVLive Consumer - GSP Day After
3.9 - PVLive Consumer - National Day After
3.10 - NESO Forecast Consumer
3.11 - Satellite Consumer (new)
4.1 - Metrics
4.2 - Cloudcasting app
4.3 - Forecast National XG
Expand Down Expand Up @@ -504,6 +505,40 @@ module "neso-forecast-consumer" {
container-command = []
}

# 3.11
module "satellite-consumer" {
source = "../../modules/services/ecs_task"

ecs-task_name = "satellite-consumer"
ecs-task_type = "consumer"
ecs-task_execution_role_arn = module.ecs.ecs_task_execution_role_arn
ecs-task_size = {
cpu = 1024
memory = 2048
}

aws-region = var.region
aws-environment = local.environment
s3-buckets = [
{
id : module.s3.s3-sat-bucket.id,
access_policy_arn : module.s3.iam-policy-s3-sat-write.arn
}
]
container_env_vars = [
{"name": "SATCONS_COMMAND", "value": "CONSUME"},
{"name": "SATCONS_RESCALE", "value": "true"},
]
container_secret_vars = [
{secret_policy_arn: aws_secretsmanager_secret.satellite_consumer_secret.arn,
values: ["EUMETSAT_CONSUMER_KEY", "EUMETSAT_CONSUMER_SECRET"]}
]
container-tag = var.satellite_consumer_version
container-name = "openclimatefix/satellite-consumer"
container-registry = "ghcr.io"
container-command = []
}

# 4.1
module "metrics" {
source = "../../modules/services/ecs_task"
Expand Down

0 comments on commit 4620466

Please sign in to comment.