Skip to content

Commit

Permalink
BFD-2835: CCW RIF Pipeline SLO Alarms are failing to evaluate when SL…
Browse files Browse the repository at this point in the history
…Os are violated (#1906)
  • Loading branch information
malessi authored Aug 31, 2023
1 parent b245906 commit f5cbd39
Show file tree
Hide file tree
Showing 11 changed files with 464 additions and 111 deletions.
1 change: 1 addition & 0 deletions ops/terraform/services/base/values/ephemeral.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
/bfd/${env}/pipeline/ccw/nonsensitive/rif_job_batch_size_claims: 100
/bfd/${env}/pipeline/ccw/nonsensitive/rif_job_queue_size_multiple_claims: 10
/bfd/${env}/pipeline/ccw/nonsensitive/rif_thread_multiple_claims: 25
/bfd/${env}/pipeline/ccw/nonsensitive/slis_repeater_lambda_invoke_rate: 15 minutes
## PIPELINE RDA
/bfd/${env}/pipeline/rda/nonsensitive/data_pipeline_ccw_rif_job_enabled: false # CCW Jobs are disabled by default on RDA Pipelines
/bfd/${env}/pipeline/rda/nonsensitive/data_pipeline_rda_job_enabled: false
Expand Down
1 change: 1 addition & 0 deletions ops/terraform/services/base/values/prod-sbx.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
/bfd/${env}/pipeline/ccw/nonsensitive/rif_job_batch_size_claims: 100
/bfd/${env}/pipeline/ccw/nonsensitive/rif_job_queue_size_multiple_claims: 10
/bfd/${env}/pipeline/ccw/nonsensitive/rif_thread_multiple_claims: 25
/bfd/${env}/pipeline/ccw/nonsensitive/slis_repeater_lambda_invoke_rate: 15 minutes
## PIPELINE+RDA
/bfd/${env}/pipeline/rda/nonsensitive/data_pipeline_ccw_rif_job_enabled: false # CCW Jobs are disabled by default on RDA Pipelines
/bfd/${env}/pipeline/rda/nonsensitive/data_pipeline_rda_grpc_inproc_server_mode: 'S3'
Expand Down
1 change: 1 addition & 0 deletions ops/terraform/services/base/values/prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
/bfd/${env}/pipeline/ccw/nonsensitive/rif_job_batch_size_claims: 100
/bfd/${env}/pipeline/ccw/nonsensitive/rif_job_queue_size_multiple_claims: 10
/bfd/${env}/pipeline/ccw/nonsensitive/rif_thread_multiple_claims: 25
/bfd/${env}/pipeline/ccw/nonsensitive/slis_repeater_lambda_invoke_rate: 15 minutes
## PIPELINE+RDA
/bfd/${env}/pipeline/rda/nonsensitive/data_pipeline_ccw_rif_job_enabled: false # CCW Jobs are disabled by default on RDA Pipelines
/bfd/${env}/pipeline/rda/nonsensitive/data_pipeline_rda_job_enabled: true
Expand Down
1 change: 1 addition & 0 deletions ops/terraform/services/base/values/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
/bfd/${env}/pipeline/ccw/nonsensitive/rif_job_batch_size_claims: 100
/bfd/${env}/pipeline/ccw/nonsensitive/rif_job_queue_size_multiple_claims: 10
/bfd/${env}/pipeline/ccw/nonsensitive/rif_thread_multiple_claims: 25
/bfd/${env}/pipeline/ccw/nonsensitive/slis_repeater_lambda_invoke_rate: 15 minutes
## PIPELINE+RDA
/bfd/${env}/pipeline/rda/nonsensitive/data_pipeline_ccw_rif_job_enabled: false # CCW Jobs are disabled by default on RDA Pipelines
/bfd/${env}/pipeline/rda/nonsensitive/data_pipeline_rda_job_enabled: true
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
data "aws_region" "current" {}

data "aws_ssm_parameters_by_path" "nonsensitive_service" {
path = "/bfd/${local.env}/${local.service}/${local.variant}/nonsensitive"
}

data "aws_s3_bucket" "etl" {
bucket = var.etl_bucket_id
}
Expand All @@ -9,25 +13,36 @@ data "aws_sns_topic" "this" {
}

data "archive_file" "lambda_src" {
for_each = local.lambdas

type = "zip"
output_path = "${path.module}/lambda_src/update_pipeline_slis.zip"
output_path = "${path.module}/lambda_src/${each.value.src}.zip"

source {
content = file("${path.module}/lambda_src/${each.value.src}.py")
filename = "${each.value.src}.py"
}

source {
content = file("${path.module}/lambda_src/__init__.py")
filename = "__init__.py"
}

source {
content = file("${path.module}/lambda_src/update_pipeline_slis.py")
filename = "update_pipeline_slis.py"
content = file("${path.module}/lambda_src/common.py")
filename = "common.py"
}

source {
content = file("${path.module}/lambda_src/backoff_retry.py")
filename = "backoff_retry.py"
}

source {
content = file("${path.module}/lambda_src/sqs.py")
filename = "sqs.py"
}

source {
content = file("${path.module}/lambda_src/cw_metrics.py")
filename = "cw_metrics.py"
Expand Down
131 changes: 101 additions & 30 deletions ops/terraform/services/pipeline/modules/bfd_pipeline_slis/iam.tf
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
resource "aws_iam_policy" "cloudwatch_metrics" {
name = "${local.lambda_full_name}-cloudwatch-metrics"
description = "Permissions for the ${local.lambda_full_name} Lambda to put and get metric data"
for_each = local.lambdas

name = "${each.value.full_name}-cloudwatch-metrics"
description = "Permissions for the ${each.value.full_name} Lambda to put and get metric data"

# Unfortunately, neither GetMetricData nor PutMetricData support resource-level permissions, and
# only PutMetricData supports the cloudwatch:namespace condition. This is why they're both so
Expand Down Expand Up @@ -30,10 +32,10 @@ EOF
}

resource "aws_iam_policy" "s3" {
name = "${local.lambda_full_name}-s3"
name = "${local.lambdas[local.lambda_update_slis].full_name}-s3"
description = join("", [
"Permissions for the ${local.lambda_full_name} Lambda to list and get objects in the ",
"${data.aws_s3_bucket.etl.id} S3 bucket in the Done/ and Incoming/ folders"
"Permissions for the ${local.lambdas[local.lambda_update_slis].full_name} Lambda to list and ",
"get objects in the ${data.aws_s3_bucket.etl.id} S3 bucket in the Done/ and Incoming/ folders"
])

policy = <<-EOF
Expand All @@ -55,9 +57,11 @@ EOF
}

resource "aws_iam_policy" "logs" {
name = "${local.lambda_full_name}-logs"
for_each = local.lambdas

name = "${each.value.full_name}-logs"
description = join("", [
"Permissions for the ${local.lambda_full_name} Lambda to write to its corresponding CloudWatch ",
"Permissions for the ${each.value.full_name} Lambda to write to its corresponding CloudWatch ",
"Log Group and Log Stream"
])

Expand All @@ -74,7 +78,7 @@ resource "aws_iam_policy" "logs" {
"Effect": "Allow",
"Action": ["logs:CreateLogStream", "logs:PutLogEvents"],
"Resource": [
"arn:aws:logs:${local.region}:${var.account_id}:log-group:/aws/lambda/${local.lambda_full_name}:*"
"arn:aws:logs:${local.region}:${var.account_id}:log-group:/aws/lambda/${each.value.full_name}:*"
]
}
]
Expand All @@ -83,10 +87,10 @@ EOF
}

resource "aws_iam_policy" "sqs" {
name = "${local.lambda_full_name}-sqs"
name = "${local.lambdas[local.lambda_update_slis].full_name}-sqs"
description = join("", [
"Permissions for the ${local.lambda_full_name} Lambda to send and receive messages from the ",
"${aws_sqs_queue.this.name} SQS queue"
"Permissions for the ${local.lambdas[local.lambda_update_slis].full_name} Lambda to send and ",
"receive messages from the ${aws_sqs_queue.this.name} SQS queue"
])
policy = <<-EOF
{
Expand All @@ -113,30 +117,97 @@ resource "aws_iam_policy" "sqs" {
EOF
}

resource "aws_iam_role" "this" {
name = local.lambda_full_name
resource "aws_iam_role" "lambda" {
for_each = local.lambdas

name = each.value.full_name
path = "/"
description = "Role for ${local.lambda_full_name} Lambda"
description = "Role for ${each.value.full_name} Lambda"

assume_role_policy = <<-EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"Service": "lambda.amazonaws.com"
{
"Version": "2012-10-17",
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"Service": "lambda.amazonaws.com"
}
}
]
}
EOF

force_detach_policies = true
}

resource "aws_iam_role_policy_attachment" "lambda_policies_to_roles" {
for_each = merge([for k, v in {
"${local.lambda_update_slis}" = [
aws_iam_policy.cloudwatch_metrics[local.lambda_update_slis],
aws_iam_policy.logs[local.lambda_update_slis],
aws_iam_policy.s3,
aws_iam_policy.sqs
]
"${local.lambda_repeater}" = [
aws_iam_policy.cloudwatch_metrics[local.lambda_repeater],
aws_iam_policy.logs[local.lambda_repeater],
]
} : { for policy in v : "${policy.name}" => { role = aws_iam_role.lambda[k], policy = policy } }]...)

role = each.value.role.name
policy_arn = each.value.policy.arn
}

resource "aws_iam_policy" "invoke_repeater" {
name = "${local.lambdas[local.lambda_repeater].full_name}-scheduler-assumee-allow-lambda-invoke"
description = join("", [
"Permissions for EventBridge Scheduler assumed role to invoke the ",
"${local.lambdas[local.lambda_repeater].full_name} Lambda"
])

policy = jsonencode(
{
Version = "2012-10-17"
Statement = [
{
Effect = "Allow"
Action = "lambda:InvokeFunction"
Resource = aws_lambda_function.repeater.arn
}
]
}
]
)
}
EOF

managed_policy_arns = [
aws_iam_policy.cloudwatch_metrics.arn,
aws_iam_policy.s3.arn,
aws_iam_policy.logs.arn,
aws_iam_policy.sqs.arn
]
resource "aws_iam_role" "scheduler_assume_role" {
name = "${local.lambdas[local.lambda_repeater].full_name}-scheduler-assumee"
path = "/"
description = join("", [
"Role for EventBridge Scheduler allowing permissions to invoke the ",
"${local.lambdas[local.lambda_repeater].full_name} Lambda"
])

assume_role_policy = jsonencode(
{
Version = "2012-10-17",
Statement = [
{
Action = "sts:AssumeRole",
Effect = "Allow",
Principal = {
Service = "scheduler.amazonaws.com"
}
}
]
}
)

force_detach_policies = true
}

resource "aws_iam_role_policy_attachment" "invoke_repeater_policy_to_scheduler_assume_role" {
role = aws_iam_role.scheduler_assume_role.name
policy_arn = aws_iam_policy.invoke_repeater.arn
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import os
from dataclasses import dataclass
from enum import Enum

METRICS_NAMESPACE = os.environ.get("METRICS_NAMESPACE", "")


@dataclass
class PipelineMetricMetadata:
"""Encapsulates metadata about a given pipeline metric"""

metric_name: str
"""The name of the metric in CloudWatch Metrics, excluding namespace"""
unit: str
"""The unit of the metric. Must conform to the list of supported CloudWatch Metrics"""

def __hash__(self) -> int:
return hash(self.metric_name)


class PipelineMetric(PipelineMetricMetadata, Enum):
"""Enumeration of pipeline metrics that can be stored in CloudWatch Metrics"""

TIME_DATA_AVAILABLE = PipelineMetricMetadata("time/data-available", "Seconds")
TIME_DATA_FIRST_AVAILABLE = PipelineMetricMetadata("time/data-first-available", "Seconds")
TIME_DATA_LOADED = PipelineMetricMetadata("time/data-loaded", "Seconds")
TIME_DATA_FULLY_LOADED = PipelineMetricMetadata("time/data-fully-loaded", "Seconds")
TIME_DELTA_DATA_LOAD_TIME = PipelineMetricMetadata("time-delta/data-load-time", "Seconds")
TIME_DELTA_FULL_DATA_LOAD_TIME = PipelineMetricMetadata(
"time-delta/data-full-load-time", "Seconds"
)
TIME_DATA_FIRST_AVAILABLE_REPEATING = PipelineMetricMetadata(
"time/data-first-available-repeating", "Seconds"
)
TIME_DATA_FULLY_LOADED_REPEATING = PipelineMetricMetadata(
"time/data-fully-loaded-repeating", "Seconds"
)

def __init__(self, data: PipelineMetricMetadata):
for key in data.__annotations__.keys():
value = getattr(data, key)
setattr(self, key, value)

def __hash__(self) -> int:
return hash(self.value)

def full_name(self) -> str:
"""Returns the fully qualified name of the metric, which includes the metric namespace and
metric name
Returns:
str: The "full name" of the metric
"""
metric_metadata: PipelineMetricMetadata = self.value
return f"{METRICS_NAMESPACE}/{metric_metadata.metric_name}"
Loading

0 comments on commit f5cbd39

Please sign in to comment.