[COST-4283] OCP: set start-date based on data in file being processed
maskarb authored Jul 17, 2024
1 parent 166d175 commit ef60c15
Showing 5 changed files with 113 additions and 46 deletions.
2 changes: 1 addition & 1 deletion Makefile
@@ -357,7 +357,7 @@ docker-up-min-no-build-with-listener: docker-up-min-no-build
docker-up-db:
$(DOCKER_COMPOSE) up -d db
$(DOCKER_COMPOSE) up -d unleash
dev/scripts/setup_unleash.py
$(PYTHON) dev/scripts/setup_unleash.py

docker-up-db-monitor:
$(DOCKER_COMPOSE) up --build -d grafana
54 changes: 41 additions & 13 deletions koku/masu/external/kafka_msg_handler.py
@@ -32,6 +32,7 @@

from api.common import log_json
from api.provider.models import Provider
from api.utils import DateHelper
from common.queues import get_customer_queue
from common.queues import OCPQueue
from kafka_utils.utils import extract_from_header
@@ -626,16 +627,18 @@ def summarize_manifest(report_meta, manifest_uuid):
if not MANIFEST_ACCESSOR.manifest_ready_for_summary(manifest_id):
return

new_report_meta = {
"schema": schema,
"schema_name": schema,
"provider_type": report_meta.get("provider_type"),
"provider_uuid": report_meta.get("provider_uuid"),
"manifest_id": manifest_id,
"manifest_uuid": manifest_uuid,
"start": start_date,
"end": end_date,
}
new_report_meta = [
{
"schema": schema,
"schema_name": schema,
"provider_type": report_meta.get("provider_type"),
"provider_uuid": report_meta.get("provider_uuid"),
"manifest_id": manifest_id,
"manifest_uuid": manifest_uuid,
"start": start_date,
"end": end_date,
}
]
if not (start_date or end_date):
# we cannot process without start and end dates
LOG.info(
@@ -644,9 +647,35 @@
return

if "0001-01-01 00:00:00+00:00" not in [str(start_date), str(end_date)]:
dates = {
datetime.strptime(meta["meta_reportdatestart"], "%Y-%m-%d").date()
for meta in report_meta["ocp_files_to_process"].values()
}
min_date = min(dates)
max_date = max(dates)
# if we cross the month boundary, then we need to create 2 manifests:
# 1 for each month so that we summarize all the data correctly within the month bounds
if min_date.month != max_date.month:
dh = DateHelper()
new_report_meta[0]["start"] = min_date
new_report_meta[0]["end"] = dh.month_end(min_date)

new_report_meta.append(
{
"schema": schema,
"schema_name": schema,
"provider_type": report_meta.get("provider_type"),
"provider_uuid": report_meta.get("provider_uuid"),
"manifest_id": manifest_id,
"manifest_uuid": manifest_uuid,
"start": dh.month_start(max_date),
"end": max_date,
}
)

# we have valid dates, so we can summarize the payload
LOG.info(log_json(manifest_uuid, msg="summarizing ocp reports", context=context))
return summarize_reports.s([new_report_meta], ocp_processing_queue).apply_async(queue=ocp_processing_queue)
return summarize_reports.s(new_report_meta, ocp_processing_queue).apply_async(queue=ocp_processing_queue)

cr_status = report_meta.get("cr_status", {})
if data_collection_message := cr_status.get("reports", {}).get("data_collection_message", ""):
@@ -782,8 +811,7 @@ def process_messages(msg):
)
)
process_complete = report_metas_complete(report_metas)
summary_task_id = summarize_manifest(report_meta, tracing_id)
if summary_task_id:
if summary_task_id := summarize_manifest(report_meta, tracing_id):
LOG.info(log_json(tracing_id, msg=f"Summarization celery uuid: {summary_task_id}"))

if status and not settings.DEBUG:
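The core of this change: `summarize_manifest` now derives its summarization window from the `meta_reportdatestart` values in `ocp_files_to_process`, and when those dates span two months it emits two report metas so each month is summarized within its own bounds. A minimal, stdlib-only sketch of that date split (`split_by_month` is an illustrative name, not a koku helper; like the code above it assumes the dates span at most two adjacent months):

```python
from datetime import date, timedelta


def split_by_month(report_dates: set[date]) -> list[tuple[date, date]]:
    """Return one (start, end) window per manifest to summarize."""
    min_date, max_date = min(report_dates), max(report_dates)
    if (min_date.year, min_date.month) == (max_date.year, max_date.month):
        return [(min_date, max_date)]
    # Crossing a month boundary: close the first window at month end,
    # open the second window on the first of the following month.
    first_month_end = (min_date.replace(day=1) + timedelta(days=32)).replace(day=1) - timedelta(days=1)
    return [(min_date, first_month_end), (max_date.replace(day=1), max_date)]


print(split_by_month({date(2024, 6, 17), date(2024, 7, 17)}))
# two windows: 2024-06-17..2024-06-30 and 2024-07-01..2024-07-17
```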
28 changes: 20 additions & 8 deletions koku/masu/processor/parquet/parquet_report_processor.py
@@ -202,6 +202,14 @@ def start_date(self, new_start_date):
LOG.error(log_json(self.tracing_id, msg=msg, context=self.error_context), exc_info=ex)
raise ParquetReportProcessorError(msg) from ex

@property
def bill_date(self):
return self.start_date.replace(day=1)

@property
def trino_table_exists_key(self):
return f"{self.report_type}|{self.bill_date}"

@property
def create_table(self):
"""Whether to create the Hive/Trino table"""
@@ -458,6 +466,13 @@ def convert_to_parquet(self): # noqa: C901
return parquet_base_filename, daily_data_frames

for csv_filename in file_list:

# set start date based on data in the file being processed:
if self.provider_type == Provider.PROVIDER_OCI:
self.start_date = str(csv_filename).split(".")[1]
elif self.provider_type == Provider.PROVIDER_OCP:
self.start_date = self.ocp_files_to_process[csv_filename.stem]["meta_reportdatestart"]

self.prepare_parquet_s3(Path(csv_filename))
if self.provider_type == Provider.PROVIDER_OCP and self.report_type is None:
msg = "could not establish report type"
@@ -470,9 +485,7 @@
)
)
raise ParquetReportProcessorError(msg)
if self.provider_type == Provider.PROVIDER_OCI:
file_specific_start_date = str(csv_filename).split(".")[1]
self.start_date = file_specific_start_date

parquet_base_filename, daily_frame, success = self.convert_csv_to_parquet(csv_filename)
daily_data_frames.extend(daily_frame)
if self.provider_type not in (Provider.PROVIDER_AZURE):
@@ -498,16 +511,15 @@ def create_parquet_table(self, parquet_file, daily=False, partition_map=None):
# Skip empty files, if we have no storage report data we can't create the table
if parquet_file:
processor = self._get_report_processor(parquet_file, daily=daily)
bill_date = self.start_date.replace(day=1)
if not processor.schema_exists():
processor.create_schema()
if not processor.table_exists():
processor.create_table(partition_map=partition_map)
self.trino_table_exists[self.report_type] = True
processor.get_or_create_postgres_partition(bill_date=bill_date)
self.trino_table_exists[self.trino_table_exists_key] = True
processor.get_or_create_postgres_partition(bill_date=self.bill_date)
processor.sync_hive_partitions()
if not daily:
processor.create_bill(bill_date=bill_date)
processor.create_bill(bill_date=self.bill_date)

def check_required_columns_for_ingress_reports(self, col_names):
LOG.info(log_json(msg="checking required columns for ingress reports", context=self._context))
@@ -570,7 +582,7 @@ def convert_csv_to_parquet(self, csv_filename: os.PathLike): # noqa: C901
)
)
self.post_processor.finalize_post_processing()
if self.create_table and not self.trino_table_exists.get(self.report_type):
if self.create_table and not self.trino_table_exists.get(self.trino_table_exists_key):
self.create_parquet_table(parquet_filepath)

except Exception as err:
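Two things change in the processor: `start_date` is now set per CSV file (from `meta_reportdatestart` for OCP, from the filename for OCI), and the `trino_table_exists` cache is keyed by `report_type|bill_date` instead of `report_type` alone, so table, partition, and bill creation happens once per month touched by the payload rather than once per report type. A rough sketch of that keying behavior (`pod_usage` is an illustrative report type, not taken from this diff):

```python
from datetime import date

trino_table_exists: dict[str, bool] = {}


def table_key(report_type: str, start_date: date) -> str:
    # Same derivation as the new bill_date / trino_table_exists_key properties.
    bill_date = start_date.replace(day=1)
    return f"{report_type}|{bill_date}"


# Two files from the same payload that land in different months:
for file_start in (date(2024, 6, 17), date(2024, 7, 1)):
    key = table_key("pod_usage", file_start)
    if not trino_table_exists.get(key):
        # table / postgres partition / bill creation would run here, once per month
        trino_table_exists[key] = True

print(sorted(trino_table_exists))  # ['pod_usage|2024-06-01', 'pod_usage|2024-07-01']
```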
47 changes: 34 additions & 13 deletions koku/masu/test/external/test_kafka_msg_handler.py
@@ -10,6 +10,7 @@
import shutil
import tempfile
import uuid
from datetime import date
from datetime import datetime
from pathlib import Path
from unittest.mock import patch
@@ -486,6 +487,7 @@ def test_summarize_manifest(self):
"file": "/path/to/file.csv",
"start": str(datetime.now()),
"end": str(datetime.now()),
"ocp_files_to_process": {"filename": {"meta_reportdatestart": str(datetime.now().date())}},
}

with patch("masu.external.kafka_msg_handler.MANIFEST_ACCESSOR.manifest_ready_for_summary", return_value=True):
@@ -500,31 +502,49 @@

def test_summarize_manifest_dates(self):
"""Test report summarization."""
start_date = date(year=2024, month=6, day=17)
end_date = date(year=2024, month=7, day=17)
report_meta = {
"schema_name": "test_schema",
"manifest_id": "1",
"provider_uuid": uuid.uuid4(),
"provider_type": "OCP",
"compression": "UNCOMPRESSED",
"file": "/path/to/file.csv",
"start": str(datetime.now()),
"end": str(datetime.now()),
}
expected_meta = {
"schema": report_meta.get("schema_name"),
"schema_name": report_meta.get("schema_name"),
"provider_type": report_meta.get("provider_type"),
"provider_uuid": report_meta.get("provider_uuid"),
"manifest_id": report_meta.get("manifest_id"),
"start": report_meta.get("start"),
"end": report_meta.get("end"),
"manifest_uuid": "1234",
"start": "2024-07-17 17:00:00.000000",
"end": "2024-07-17 17:00:00.000000",
"ocp_files_to_process": {
"filename1": {"meta_reportdatestart": str(start_date)},
"filename2": {"meta_reportdatestart": str(end_date)},
},
}
expected_meta = [
{
"schema": report_meta.get("schema_name"),
"schema_name": report_meta.get("schema_name"),
"provider_type": report_meta.get("provider_type"),
"provider_uuid": report_meta.get("provider_uuid"),
"manifest_id": report_meta.get("manifest_id"),
"start": date(year=2024, month=6, day=17),
"end": date(year=2024, month=6, day=30),
"manifest_uuid": "1234",
},
{
"schema": report_meta.get("schema_name"),
"schema_name": report_meta.get("schema_name"),
"provider_type": report_meta.get("provider_type"),
"provider_uuid": report_meta.get("provider_uuid"),
"manifest_id": report_meta.get("manifest_id"),
"start": date(year=2024, month=7, day=1),
"end": date(year=2024, month=7, day=17),
"manifest_uuid": "1234",
},
]

with patch("masu.external.kafka_msg_handler.MANIFEST_ACCESSOR.manifest_ready_for_summary", return_value=True):
with patch("masu.external.kafka_msg_handler.summarize_reports.s") as mock_summarize_reports:
msg_handler.summarize_manifest(report_meta, self.manifest_id)
mock_summarize_reports.assert_called_with([expected_meta], OCPQueue.DEFAULT)
mock_summarize_reports.assert_called_with(expected_meta, OCPQueue.DEFAULT)

with patch("masu.external.kafka_msg_handler.MANIFEST_ACCESSOR.manifest_ready_for_summary", return_value=False):
with patch("masu.external.kafka_msg_handler.summarize_reports.s") as mock_summarize_reports:
@@ -860,6 +880,7 @@ def test_summarize_manifest_called_with_XL_queue(self):
"manifest_id": "1",
"start": str(datetime.now()),
"end": str(datetime.now()),
"ocp_files_to_process": {"filename": {"meta_reportdatestart": str(date.today())}},
}

# Check when manifest is done
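The updated tests pin down the new payload shape: `summarize_reports.s` now receives the meta list built inside `summarize_manifest` (one dict per month) rather than a single dict wrapped in a list at the call site. Roughly, with most keys and exact values trimmed for brevity:

```python
# Before: one meta dict, wrapped in a list by the caller.
old_payload = [{"schema": "test_schema", "start": "2024-06-17", "end": "2024-07-17"}]

# After: summarize_manifest builds the list itself, one entry per month in the payload.
new_payload = [
    {"schema": "test_schema", "start": "2024-06-17", "end": "2024-06-30"},
    {"schema": "test_schema", "start": "2024-07-01", "end": "2024-07-17"},
]
```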
28 changes: 17 additions & 11 deletions koku/masu/test/processor/parquet/test_parquet_report_processor.py
@@ -61,7 +61,7 @@ def setUp(self):
self.manifest_id = CostUsageReportManifest.objects.filter(cluster_id__isnull=True).first().id
self.ocp_manifest_id = CostUsageReportManifest.objects.filter(cluster_id__isnull=False).first().id
self.start_date = self.today
self.report_name = "koku-1.csv.gz"
self.report_name = Path("koku-1.csv.gz")
self.report_path = f"/my/{self.test_assembly_id}/{self.report_name}"
self.report_processor = ParquetReportProcessor(
schema_name=self.schema,
@@ -85,7 +85,17 @@
provider_uuid=self.ocp_provider_uuid,
provider_type=Provider.PROVIDER_OCP,
manifest_id=self.manifest_id,
context={"tracing_id": self.tracing_id, "start_date": self.today, "create_table": True},
context={
"tracing_id": self.tracing_id,
"start_date": self.today,
"create_table": True,
"ocp_files_to_process": {
self.report_name.stem: {
"meta_reportdatestart": "2023-01-01",
"meta_reportnumhours": "2",
}
},
},
)
ingress_uuid = "882083b7-ea62-4aab-aa6a-f0d08d65ee2b"
self.ingress_report_dict = {
@@ -271,15 +281,11 @@ def test_convert_to_parquet(self, mock_remove, mock_exists):
self.assertEqual(file_name, "")
self.assertTrue(data_frame.empty)

with patch("masu.processor.parquet.parquet_report_processor.get_path_prefix", return_value=""):
with patch(
"masu.processor.parquet.parquet_report_processor.ParquetReportProcessor.report_type", return_value=None
):
with patch(
"masu.processor.parquet.parquet_report_processor.ParquetReportProcessor.prepare_parquet_s3"
):
with self.assertRaises(ParquetReportProcessorError):
self.report_processor_ocp.convert_to_parquet()
with patch("masu.processor.parquet.parquet_report_processor.get_path_prefix", return_value=""), patch(
"masu.processor.parquet.parquet_report_processor.ParquetReportProcessor.report_type", return_value=None
), patch("masu.processor.parquet.parquet_report_processor.ParquetReportProcessor.prepare_parquet_s3"):
with self.assertRaises(ParquetReportProcessorError):
self.report_processor_ocp.convert_to_parquet()

expected = "no split files to convert to parquet"
with patch("masu.processor.parquet.parquet_report_processor.get_path_prefix", return_value=""), patch.object(
