Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[COST-5862] OCP on GCP managed summary #5479

Merged
merged 14 commits into from
Feb 7, 2025
Merged
141 changes: 39 additions & 102 deletions koku/masu/database/gcp_report_db_accessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import uuid
from os import path
from typing import Any
from typing import List

from dateutil.parser import parse
from dateutil.relativedelta import relativedelta
Expand All @@ -25,6 +24,7 @@
from koku.database import SQLScriptAtomicExecutorMixin
from masu.database import GCP_REPORT_TABLE_MAP
from masu.database import OCP_REPORT_TABLE_MAP
from masu.database.ocp_report_db_accessor import OCPReportDBAccessor
from masu.database.report_db_accessor_base import ReportDBAccessorBase
from masu.processor import is_managed_ocp_cloud_summary_enabled
from masu.processor.parquet.summary_sql_metadata import SummarySqlMetadata
Expand All @@ -36,7 +36,7 @@
from reporting.provider.gcp.models import GCPCostEntryLineItemDailySummary
from reporting.provider.gcp.models import GCPTopology
from reporting.provider.gcp.models import TRINO_LINE_ITEM_TABLE
from reporting.provider.gcp.models import TRINO_MANAGED_OCP_GCP_DAILY_TABLE
from reporting.provider.gcp.models import TRINO_OCP_GCP_DAILY_SUMMARY_TABLE
from reporting.provider.gcp.models import UI_SUMMARY_TABLES
from reporting.provider.gcp.openshift.models import UI_SUMMARY_TABLES as OCPGCP_UI_SUMMARY_TABLES
from reporting_common.models import CostUsageReportStatus
Expand Down Expand Up @@ -443,7 +443,7 @@
)
)
for day in days:
if table == TRINO_MANAGED_OCP_GCP_DAILY_TABLE:
if table == TRINO_OCP_GCP_DAILY_SUMMARY_TABLE:
column_name = "source"
else:
column_name = "gcp_source"
Expand Down Expand Up @@ -585,112 +585,49 @@
return False
return True

# NOTE(review): leading indentation is missing in this view, so the nesting of
# the statements that follow each `if` below cannot be confirmed from here.
def verify_populate_ocp_on_cloud_daily_trino(
self, verification_tags: List[str], sql_metadata: SummarySqlMetadata
) -> Any:
"""
Verify the managed trino table population went successfully.

Args:
verification_tags: List of all cluster's matchable kv pairs
"""
# Bind parameters come from sql_metadata for the four listed keys; the
# verification tags are added under "matched_tag_array".
params = sql_metadata.build_params(["schema", "cloud_provider_uuid", "year", "month"])
params["matched_tag_array"] = verification_tags
verify_path = "trino_sql/verify/gcp/"
# Load the cost-total verification SQL packaged under masu.database and
# decode it as UTF-8 before running it.
cost_total_file = verify_path + "managed_ocp_on_gcp_verification.sql"
cost_total_sql = pkgutil.get_data("masu.database", cost_total_file)
cost_total_sql = cost_total_sql.decode("utf-8")
cost_total_result = self._execute_trino_multipart_sql_query(cost_total_sql, bind_params=params)
# Only the first element of the result is inspected; an empty result would
# raise IndexError here.
cost_total_inspect = cost_total_result[0]
# A False anywhere in that element is logged as a failed cost-total check.
if False in cost_total_inspect:
LOG.info(log_json(msg="Cost total validation failed", result=cost_total_inspect))
# Second query: loads and runs managed_resources.sql with the same params.
resource_file = verify_path + "managed_resources.sql"
resource_sql = pkgutil.get_data("masu.database", resource_file)
resource_sql = resource_sql.decode("utf-8")
resource_result = self._execute_trino_multipart_sql_query(resource_sql, bind_params=params)
if resource_result:
# Limit the resources added to the log
params["resources_failed"] = resource_result
# The error log is followed by a bare return, so the success message below is
# not emitted on that path.
LOG.error(log_json(msg="Verification failed", **params))
return
LOG.info(log_json(msg="Verification successful", **params))

def _create_tables_and_generate_unique_id(self, sql_metadata: SummarySqlMetadata) -> Any:
    """Create and populate the temporary uuid tables for the managed flow.

    The parquet generated for the gcp line item table does not contain a
    unique identifier, so temporary tables are created and populated to
    prevent cost duplication.

    Args:
        sql_metadata: supplies the bind parameters through ``build_params``.
    """
    bind_keys = ["schema", "cloud_provider_uuid", "year", "month", "start_date", "end_date"]
    bind_params = sql_metadata.build_params(bind_keys)
    # The SQL template ships as package data under masu.database.
    template_path = "trino_sql/gcp/openshift/managed_flow/0_populate_uuid_tmp_table.sql"
    raw_sql = pkgutil.get_data("masu.database", template_path)
    sql_text = raw_sql.decode("utf-8")
    LOG.info(log_json(msg="Create and populate temporary uuid manged tables", **bind_params))
    self._execute_trino_multipart_sql_query(sql_text, bind_params=bind_params)

def _populate_gcp_filtered_by_ocp_tmp_table(
    self, ocp_provider_uuid: str, matched_tags_result: List[str], sql_metadata: SummarySqlMetadata
) -> Any:
    """Run the managed-flow SQL that fills the temporary OCP on GCP table.

    Args:
        ocp_provider_uuid (str): OCP source UUID, bound as ``ocp_source_uuid``.
        matched_tags_result (list): kv pairs, bound as ``matched_tag_array``.
        sql_metadata: supplies the remaining bind parameters.

    Returns:
        (None)
    """
    bind_keys = ["schema", "cloud_provider_uuid", "start_date", "end_date", "days_tup", "year", "month"]
    bind_params = sql_metadata.build_params(bind_keys)
    # Per-cluster values are layered on top of the shared metadata params.
    bind_params["ocp_source_uuid"] = ocp_provider_uuid
    bind_params["matched_tag_array"] = matched_tags_result

    template_path = "trino_sql/gcp/openshift/managed_flow/1_populate_managed_tmp_table.sql"
    raw_sql = pkgutil.get_data("masu.database", template_path)
    sql_text = raw_sql.decode("utf-8")
    LOG.info(log_json(msg="running managed OCP on GCP daily SQL", **bind_params))
    self._execute_trino_multipart_sql_query(sql_text, bind_params=bind_params)

def _populate_final_managed_table(self, sql_metadata: SummarySqlMetadata) -> Any:
    """Run the SQL that populates the managed openshift on gcp table.

    Args:
        sql_metadata: supplies every bind parameter through ``build_params``.
    """
    bind_keys = [
        "schema",
        "start_date",
        "year",
        "month",
        "days",
        "end_date",
        "ocp_provider_uuids",
        "cloud_provider_uuid",
    ]
    bind_params = sql_metadata.build_params(bind_keys)
    template_path = "trino_sql/gcp/openshift/managed_flow/2_managed_gcp_openshift_daily.sql"
    raw_sql = pkgutil.get_data("masu.database", template_path)
    sql_text = raw_sql.decode("utf-8")
    LOG.info(log_json(msg="populating managed OCP on GCP data", **bind_params))
    self._execute_trino_multipart_sql_query(sql_text, bind_params=bind_params)

# NOTE(review): this span is a rendered diff with the +/- markers and the
# indentation stripped. It appears to interleave the previous body of this
# method with its replacement (e.g. two table constants are passed back to back
# to delete_ocp_on_gcp_hive_partition_by_day, and both a "managed_flow" helper
# call and a "populate_daily_summary" template run are present). Do not read
# it as a single runnable function; confirm against the real file.
def populate_ocp_on_cloud_daily_trino(self, sql_metadata: SummarySqlMetadata) -> Any:
"""Populate the managed_gcp_openshift_daily trino table for OCP on GCP"""
self._create_tables_and_generate_unique_id(sql_metadata)
verification_tags = []
# NOTE(review): managed_path already ends with "/", and the f-strings below
# add another "/", producing "...populate_daily_summary//<file>.sql" — confirm
# prepare_template tolerates the doubled separator.
managed_path = "trino_sql/gcp/openshift/populate_daily_summary/"
prepare_sql, prepare_params = sql_metadata.prepare_template(
f"{managed_path}/0_prepare_daily_summary_tables.sql"
)
LOG.info(log_json(msg="Preparing tables for OCP on GCP flow", **prepare_params))
self._execute_trino_multipart_sql_query(prepare_sql, bind_params=prepare_params)
for ocp_provider_uuid in sql_metadata.ocp_provider_uuids:
matched_tags_result = self.find_openshift_keys_expected_values(ocp_provider_uuid, sql_metadata)
verification_tags.extend(matched_tags_result)
self._populate_gcp_filtered_by_ocp_tmp_table(ocp_provider_uuid, matched_tags_result, sql_metadata)
with OCPReportDBAccessor(sql_metadata.schema) as accessor:
report_period = accessor.report_periods_for_provider_uuid(ocp_provider_uuid, sql_metadata.start_date)
ctx = sql_metadata.build_params(["schema", "cloud_provider_uuid", "start_date", "end_date"])
ctx["ocp_provider_uuid"] = ocp_provider_uuid
# NOTE(review): presumably this check sits inside the per-provider loop; if so,
# the bare `return` ends the whole method and any remaining
# ocp_provider_uuids are skipped — confirm `continue` was not intended.
if not report_period:
LOG.info(log_json(msg="no report period available", context=ctx))
return

Check warning on line 603 in koku/masu/database/gcp_report_db_accessor.py

View check run for this annotation

Codecov / codecov/patch

koku/masu/database/gcp_report_db_accessor.py#L602-L603

Added lines #L602 - L603 were not covered by tests
report_period_id = report_period.id
self.delete_ocp_on_gcp_hive_partition_by_day(
sql_metadata.days_tup,
sql_metadata.cloud_provider_uuid,
ocp_provider_uuid,
sql_metadata.year,
sql_metadata.month,
TRINO_MANAGED_OCP_GCP_DAILY_TABLE,
TRINO_OCP_GCP_DAILY_SUMMARY_TABLE,
)
# Resource Matching
resource_matching_sql, resource_matching_params = sql_metadata.prepare_template(
f"{managed_path}/1_resource_matching_by_cluster.sql",
{
"ocp_provider_uuid": ocp_provider_uuid,
"matched_tag_array": self.find_openshift_keys_expected_values(ocp_provider_uuid, sql_metadata),
},
)
self._execute_trino_multipart_sql_query(resource_matching_sql, bind_params=resource_matching_params)
# Data Transformation for Daily Summary
daily_summary_sql, daily_summary_params = sql_metadata.prepare_template(
f"{managed_path}/2_summarize_data_by_cluster.sql",
{
**sql_metadata.build_cost_model_params(ocp_provider_uuid),
**{"ocp_provider_uuid": ocp_provider_uuid, "report_period_id": report_period_id},
},
)
LOG.info(
log_json(msg="executing data transformations for ocp on gcp daily summary", **daily_summary_params)
)
self._populate_final_managed_table(sql_metadata)
# dict.fromkeys de-duplicates the collected tags while keeping first-seen order.
verification_tags = list(dict.fromkeys(verification_tags))
self.verify_populate_ocp_on_cloud_daily_trino(verification_tags, sql_metadata)
self._execute_trino_multipart_sql_query(daily_summary_sql, bind_params=daily_summary_params)
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,7 @@ SELECT azure.row_uuid as row_uuid,
AND (ocp.resource_id IS NOT NULL AND ocp.resource_id != '')
-- Filter for Node Network Costs to tie them to the Network unattributed project
AND azure.data_transfer_direction IS NOT NULL
AND azure.data_transfer_direction != ''
AND azure.ocp_source = {{ocp_provider_uuid}}
AND azure.source = {{cloud_provider_uuid}}
AND azure.year = {{year}}
Expand Down

This file was deleted.

Loading