From ef60c1545a300ec3626711e747fde469a7db1776 Mon Sep 17 00:00:00 2001
From: Michael Skarbek
Date: Wed, 17 Jul 2024 17:59:00 -0400
Subject: [PATCH] [COST-4283] OCP: set start-date based on data in file being processed (#5212)

---
 Makefile                                     |  2 +-
 koku/masu/external/kafka_msg_handler.py      | 54 ++++++++++++++-----
 .../parquet/parquet_report_processor.py      | 28 +++++++---
 .../test/external/test_kafka_msg_handler.py  | 47 +++++++++++-----
 .../parquet/test_parquet_report_processor.py | 28 ++++++----
 5 files changed, 113 insertions(+), 46 deletions(-)

diff --git a/Makefile b/Makefile
index 376f069fcb..c140a26102 100644
--- a/Makefile
+++ b/Makefile
@@ -357,7 +357,7 @@ docker-up-min-no-build-with-listener: docker-up-min-no-build
 docker-up-db:
 	$(DOCKER_COMPOSE) up -d db
 	$(DOCKER_COMPOSE) up -d unleash
-	dev/scripts/setup_unleash.py
+	$(PYTHON) dev/scripts/setup_unleash.py
 
 docker-up-db-monitor:
 	$(DOCKER_COMPOSE) up --build -d grafana
diff --git a/koku/masu/external/kafka_msg_handler.py b/koku/masu/external/kafka_msg_handler.py
index 988131be40..7eef5ebda2 100644
--- a/koku/masu/external/kafka_msg_handler.py
+++ b/koku/masu/external/kafka_msg_handler.py
@@ -32,6 +32,7 @@
 from api.common import log_json
 from api.provider.models import Provider
+from api.utils import DateHelper
 from common.queues import get_customer_queue
 from common.queues import OCPQueue
 from kafka_utils.utils import extract_from_header
@@ -626,16 +627,18 @@ def summarize_manifest(report_meta, manifest_uuid):
     if not MANIFEST_ACCESSOR.manifest_ready_for_summary(manifest_id):
         return
 
-    new_report_meta = {
-        "schema": schema,
-        "schema_name": schema,
-        "provider_type": report_meta.get("provider_type"),
-        "provider_uuid": report_meta.get("provider_uuid"),
-        "manifest_id": manifest_id,
-        "manifest_uuid": manifest_uuid,
-        "start": start_date,
-        "end": end_date,
-    }
+    new_report_meta = [
+        {
+            "schema": schema,
+            "schema_name": schema,
+            "provider_type": report_meta.get("provider_type"),
+            "provider_uuid": report_meta.get("provider_uuid"),
+            "manifest_id": manifest_id,
+            "manifest_uuid": manifest_uuid,
+            "start": start_date,
+            "end": end_date,
+        }
+    ]
     if not (start_date or end_date):
         # we cannot process without start and end dates
         LOG.info(
@@ -644,9 +647,35 @@ def summarize_manifest(report_meta, manifest_uuid):
         return
 
     if "0001-01-01 00:00:00+00:00" not in [str(start_date), str(end_date)]:
+        dates = {
+            datetime.strptime(meta["meta_reportdatestart"], "%Y-%m-%d").date()
+            for meta in report_meta["ocp_files_to_process"].values()
+        }
+        min_date = min(dates)
+        max_date = max(dates)
+        # if we cross the month boundary, then we need to create 2 manifests:
+        # 1 for each month so that we summarize all the data correctly within the month bounds
+        if min_date.month != max_date.month:
+            dh = DateHelper()
+            new_report_meta[0]["start"] = min_date
+            new_report_meta[0]["end"] = dh.month_end(min_date)
+
+            new_report_meta.append(
+                {
+                    "schema": schema,
+                    "schema_name": schema,
+                    "provider_type": report_meta.get("provider_type"),
+                    "provider_uuid": report_meta.get("provider_uuid"),
+                    "manifest_id": manifest_id,
+                    "manifest_uuid": manifest_uuid,
+                    "start": dh.month_start(max_date),
+                    "end": max_date,
+                }
+            )
+
         # we have valid dates, so we can summarize the payload
         LOG.info(log_json(manifest_uuid, msg="summarizing ocp reports", context=context))
-        return summarize_reports.s([new_report_meta], ocp_processing_queue).apply_async(queue=ocp_processing_queue)
+        return summarize_reports.s(new_report_meta, ocp_processing_queue).apply_async(queue=ocp_processing_queue)
 
     cr_status = report_meta.get("cr_status", {})
     if data_collection_message := cr_status.get("reports", {}).get("data_collection_message", ""):
@@ -782,8 +811,7 @@ def process_messages(msg):
                 )
             )
         process_complete = report_metas_complete(report_metas)
-        summary_task_id = summarize_manifest(report_meta, tracing_id)
-        if summary_task_id:
+        if summary_task_id := summarize_manifest(report_meta, tracing_id):
             LOG.info(log_json(tracing_id, msg=f"Summarization celery uuid: {summary_task_id}"))
 
     if status and not settings.DEBUG:
diff --git a/koku/masu/processor/parquet/parquet_report_processor.py b/koku/masu/processor/parquet/parquet_report_processor.py
index 68dbd44963..ba5b264afc 100644
--- a/koku/masu/processor/parquet/parquet_report_processor.py
+++ b/koku/masu/processor/parquet/parquet_report_processor.py
@@ -202,6 +202,14 @@ def start_date(self, new_start_date):
             LOG.error(log_json(self.tracing_id, msg=msg, context=self.error_context), exc_info=ex)
             raise ParquetReportProcessorError(msg) from ex
 
+    @property
+    def bill_date(self):
+        return self.start_date.replace(day=1)
+
+    @property
+    def trino_table_exists_key(self):
+        return f"{self.report_type}|{self.bill_date}"
+
     @property
     def create_table(self):
         """Whether to create the Hive/Trino table"""
@@ -458,6 +466,13 @@ def convert_to_parquet(self):  # noqa: C901
             return parquet_base_filename, daily_data_frames
 
         for csv_filename in file_list:
+
+            # set start date based on data in the file being processed:
+            if self.provider_type == Provider.PROVIDER_OCI:
+                self.start_date = str(csv_filename).split(".")[1]
+            elif self.provider_type == Provider.PROVIDER_OCP:
+                self.start_date = self.ocp_files_to_process[csv_filename.stem]["meta_reportdatestart"]
+
             self.prepare_parquet_s3(Path(csv_filename))
             if self.provider_type == Provider.PROVIDER_OCP and self.report_type is None:
                 msg = "could not establish report type"
@@ -470,9 +485,7 @@ def convert_to_parquet(self):  # noqa: C901
                     )
                 )
                 raise ParquetReportProcessorError(msg)
-            if self.provider_type == Provider.PROVIDER_OCI:
-                file_specific_start_date = str(csv_filename).split(".")[1]
-                self.start_date = file_specific_start_date
+
             parquet_base_filename, daily_frame, success = self.convert_csv_to_parquet(csv_filename)
             daily_data_frames.extend(daily_frame)
             if self.provider_type not in (Provider.PROVIDER_AZURE):
@@ -498,16 +511,15 @@ def create_parquet_table(self, parquet_file, daily=False, partition_map=None):
         # Skip empty files, if we have no storage report data we can't create the table
         if parquet_file:
             processor = self._get_report_processor(parquet_file, daily=daily)
-            bill_date = self.start_date.replace(day=1)
             if not processor.schema_exists():
                 processor.create_schema()
             if not processor.table_exists():
                 processor.create_table(partition_map=partition_map)
-            self.trino_table_exists[self.report_type] = True
-            processor.get_or_create_postgres_partition(bill_date=bill_date)
+            self.trino_table_exists[self.trino_table_exists_key] = True
+            processor.get_or_create_postgres_partition(bill_date=self.bill_date)
             processor.sync_hive_partitions()
             if not daily:
-                processor.create_bill(bill_date=bill_date)
+                processor.create_bill(bill_date=self.bill_date)
 
     def check_required_columns_for_ingress_reports(self, col_names):
         LOG.info(log_json(msg="checking required columns for ingress reports", context=self._context))
@@ -570,7 +582,7 @@ def convert_csv_to_parquet(self, csv_filename: os.PathLike):  # noqa: C901
                     )
                 )
                 self.post_processor.finalize_post_processing()
-            if self.create_table and not self.trino_table_exists.get(self.report_type):
+            if self.create_table and not self.trino_table_exists.get(self.trino_table_exists_key):
                 self.create_parquet_table(parquet_filepath)
 
         except Exception as err:
diff --git a/koku/masu/test/external/test_kafka_msg_handler.py b/koku/masu/test/external/test_kafka_msg_handler.py
index 3191b28f0e..62b77516cd 100644
--- a/koku/masu/test/external/test_kafka_msg_handler.py
+++ b/koku/masu/test/external/test_kafka_msg_handler.py
@@ -10,6 +10,7 @@
 import shutil
 import tempfile
 import uuid
+from datetime import date
 from datetime import datetime
 from pathlib import Path
 from unittest.mock import patch
@@ -486,6 +487,7 @@ def test_summarize_manifest(self):
             "file": "/path/to/file.csv",
             "start": str(datetime.now()),
             "end": str(datetime.now()),
+            "ocp_files_to_process": {"filename": {"meta_reportdatestart": str(datetime.now().date())}},
         }
 
         with patch("masu.external.kafka_msg_handler.MANIFEST_ACCESSOR.manifest_ready_for_summary", return_value=True):
@@ -500,6 +502,8 @@ def test_summarize_manifest(self):
 
     def test_summarize_manifest_dates(self):
         """Test report summarization."""
+        start_date = date(year=2024, month=6, day=17)
+        end_date = date(year=2024, month=7, day=17)
         report_meta = {
             "schema_name": "test_schema",
             "manifest_id": "1",
@@ -507,24 +511,40 @@ def test_summarize_manifest_dates(self):
             "provider_type": "OCP",
             "compression": "UNCOMPRESSED",
             "file": "/path/to/file.csv",
-            "start": str(datetime.now()),
-            "end": str(datetime.now()),
-        }
-        expected_meta = {
-            "schema": report_meta.get("schema_name"),
-            "schema_name": report_meta.get("schema_name"),
-            "provider_type": report_meta.get("provider_type"),
-            "provider_uuid": report_meta.get("provider_uuid"),
-            "manifest_id": report_meta.get("manifest_id"),
-            "start": report_meta.get("start"),
-            "end": report_meta.get("end"),
-            "manifest_uuid": "1234",
+            "start": "2024-07-17 17:00:00.000000",
+            "end": "2024-07-17 17:00:00.000000",
+            "ocp_files_to_process": {
+                "filename1": {"meta_reportdatestart": str(start_date)},
+                "filename2": {"meta_reportdatestart": str(end_date)},
+            },
         }
+        expected_meta = [
+            {
+                "schema": report_meta.get("schema_name"),
+                "schema_name": report_meta.get("schema_name"),
+                "provider_type": report_meta.get("provider_type"),
+                "provider_uuid": report_meta.get("provider_uuid"),
+                "manifest_id": report_meta.get("manifest_id"),
+                "start": date(year=2024, month=6, day=17),
+                "end": date(year=2024, month=6, day=30),
+                "manifest_uuid": "1234",
+            },
+            {
+                "schema": report_meta.get("schema_name"),
+                "schema_name": report_meta.get("schema_name"),
+                "provider_type": report_meta.get("provider_type"),
+                "provider_uuid": report_meta.get("provider_uuid"),
+                "manifest_id": report_meta.get("manifest_id"),
+                "start": date(year=2024, month=7, day=1),
+                "end": date(year=2024, month=7, day=17),
+                "manifest_uuid": "1234",
+            },
+        ]
 
         with patch("masu.external.kafka_msg_handler.MANIFEST_ACCESSOR.manifest_ready_for_summary", return_value=True):
             with patch("masu.external.kafka_msg_handler.summarize_reports.s") as mock_summarize_reports:
                 msg_handler.summarize_manifest(report_meta, self.manifest_id)
-                mock_summarize_reports.assert_called_with([expected_meta], OCPQueue.DEFAULT)
+                mock_summarize_reports.assert_called_with(expected_meta, OCPQueue.DEFAULT)
 
         with patch("masu.external.kafka_msg_handler.MANIFEST_ACCESSOR.manifest_ready_for_summary", return_value=False):
             with patch("masu.external.kafka_msg_handler.summarize_reports.s") as mock_summarize_reports:
@@ -860,6 +880,7 @@ def test_summarize_manifest_called_with_XL_queue(self):
             "manifest_id": "1",
             "start": str(datetime.now()),
"end": str(datetime.now()), + "ocp_files_to_process": {"filename": {"meta_reportdatestart": str(date.today())}}, } # Check when manifest is done diff --git a/koku/masu/test/processor/parquet/test_parquet_report_processor.py b/koku/masu/test/processor/parquet/test_parquet_report_processor.py index 5371bb1d7a..11bab8a355 100644 --- a/koku/masu/test/processor/parquet/test_parquet_report_processor.py +++ b/koku/masu/test/processor/parquet/test_parquet_report_processor.py @@ -61,7 +61,7 @@ def setUp(self): self.manifest_id = CostUsageReportManifest.objects.filter(cluster_id__isnull=True).first().id self.ocp_manifest_id = CostUsageReportManifest.objects.filter(cluster_id__isnull=False).first().id self.start_date = self.today - self.report_name = "koku-1.csv.gz" + self.report_name = Path("koku-1.csv.gz") self.report_path = f"/my/{self.test_assembly_id}/{self.report_name}" self.report_processor = ParquetReportProcessor( schema_name=self.schema, @@ -85,7 +85,17 @@ def setUp(self): provider_uuid=self.ocp_provider_uuid, provider_type=Provider.PROVIDER_OCP, manifest_id=self.manifest_id, - context={"tracing_id": self.tracing_id, "start_date": self.today, "create_table": True}, + context={ + "tracing_id": self.tracing_id, + "start_date": self.today, + "create_table": True, + "ocp_files_to_process": { + self.report_name.stem: { + "meta_reportdatestart": "2023-01-01", + "meta_reportnumhours": "2", + } + }, + }, ) ingress_uuid = "882083b7-ea62-4aab-aa6a-f0d08d65ee2b" self.ingress_report_dict = { @@ -271,15 +281,11 @@ def test_convert_to_parquet(self, mock_remove, mock_exists): self.assertEqual(file_name, "") self.assertTrue(data_frame.empty) - with patch("masu.processor.parquet.parquet_report_processor.get_path_prefix", return_value=""): - with patch( - "masu.processor.parquet.parquet_report_processor.ParquetReportProcessor.report_type", return_value=None - ): - with patch( - "masu.processor.parquet.parquet_report_processor.ParquetReportProcessor.prepare_parquet_s3" - ): - with self.assertRaises(ParquetReportProcessorError): - self.report_processor_ocp.convert_to_parquet() + with patch("masu.processor.parquet.parquet_report_processor.get_path_prefix", return_value=""), patch( + "masu.processor.parquet.parquet_report_processor.ParquetReportProcessor.report_type", return_value=None + ), patch("masu.processor.parquet.parquet_report_processor.ParquetReportProcessor.prepare_parquet_s3"): + with self.assertRaises(ParquetReportProcessorError): + self.report_processor_ocp.convert_to_parquet() expected = "no split files to convert to parquet" with patch("masu.processor.parquet.parquet_report_processor.get_path_prefix", return_value=""), patch.object(