Skip to content

Commit

Permalink
Fetch available reports from fff_dqmtools
Browse files Browse the repository at this point in the history
..and store them in the database. The latter
is only storing the metadata of the comparisons
(base, comp releases, PRs involved), and the
actual HTML report is stored on the filesystem.

Serve the HTML reports from a dedicated
endpoint, separate API endpoints for querying
the available ones.
  • Loading branch information
nothingface0 committed Jan 28, 2025
1 parent 468b43e commit 5224201
Show file tree
Hide file tree
Showing 6 changed files with 320 additions and 7 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
*.log
*.db
*.pyc
tmp/*
tmp/
log/
cmssw_comparison_reports/
__pycache__/
.vscode/
dqm2m_production.db_test
Expand Down
164 changes: 162 additions & 2 deletions db.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
### DQM^2 Mirror DB === >
import os
import sys
import shutil
import logging
import psycopg2
import sqlalchemy
Expand All @@ -14,7 +15,7 @@

sys.path.append(os.path.join(os.path.dirname(__file__), "."))

from dqmsquare_cfg import TZ, TIMEZONE
from dqmsquare_cfg import TZ, TIMEZONE, load_cfg
from utils.common import (
get_short_client_name,
filter_clients,
Expand All @@ -23,6 +24,7 @@
)

DEFAULT_DATETIME = TZ.localize(datetime(2012, 3, 3, 10, 10, 10, 0))
cfg = load_cfg()


class DQM2MirrorDB:
Expand Down Expand Up @@ -100,6 +102,25 @@ class DQM2MirrorDB:
)
TB_DESCRIPTION_HOST_STATUS_COLS = ["host_id", "is_up"]

TB_NAME_CMSSW_COMPARISON_REPORTS = "cmsswcomparisonreports"
TB_DESCRIPTION_CMSSW_COMPARISON_REPORTS = (
"( id VARCHAR(160) PRIMARY KEY, "
"created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), "
"comparison_ran_at TIMESTAMP WITH TIME ZONE NOT NULL, "
"base_cmssw_version VARCHAR(25) NOT NULL, "
"comp_cmssw_version VARCHAR(25) NOT NULL, "
"base_prs INT[], "
"comp_prs INT[])"
)
TB_DESCRIPTION_CMSSW_COMPARISON_REPORTS_COLS = [
"id",
"comparison_ran_at",
"base_cmssw_version",
"comp_cmssw_version",
"base_prs",
"comp_prs",
]

def __str__(self):
return f"{self.__class__.__name__}: {self.db_uri}"

Expand Down Expand Up @@ -245,9 +266,23 @@ def create_tables(self):
self.log.error("Error occurred: ", e)
session.rollback()

try:
self.log.info(f"Creating table {self.TB_NAME_CMSSW_COMPARISON_REPORTS}")
session.execute(
text(
"CREATE TABLE IF NOT EXISTS "
+ self.TB_NAME_CMSSW_COMPARISON_REPORTS
+ " "
+ self.TB_DESCRIPTION_CMSSW_COMPARISON_REPORTS
)
)
except psycopg2.IntegrityError as e:
self.log.error("Error occurred: ", e)
session.rollback()

try:
self.log.info(f'Running "migrations"')
with open("./sql/migrations/0001_indexes.sql") as f:
with open("./sql/migrations/add_indices.sql") as f:
session.execute(text(f.read()))
except psycopg2.IntegrityError as e:
self.log.error(f"Error occurred: ", e)
Expand Down Expand Up @@ -540,6 +575,120 @@ def fill_cluster_status(self, cluster_status: dict):
)
)

def get_cmssw_comparison_reports(self) -> list[dict]:
"""
Check if comparison report exists in DB and return result
"""
with self.engine.connect() as cur:
results = (
cur.execute(
text(
f"SET TIMEZONE = '{TIMEZONE}'; "
+ f"SELECT * FROM {self.TB_NAME_CMSSW_COMPARISON_REPORTS} ORDER BY comparison_ran_at DESC;"
)
)
.mappings()
.all()
)
if results:
return [dict(result) for result in results]
return []

def get_cmssw_comparison_report(self, id: str) -> dict:
"""
Check if comparison report exists in DB and return result
"""
with self.engine.connect() as cur:
result = (
cur.execute(
text(
f"SET TIMEZONE = '{TIMEZONE}'; SELECT * FROM {self.TB_NAME_CMSSW_COMPARISON_REPORTS} WHERE id=:id"
),
id=id,
)
.mappings()
.first()
)
if result:
return dict(result)
return {}

def _store_cmssw_comparison_report_html(
self, comparison_id: str, comparison_report_html: str
) -> None:
"""
Given a unique comparison id, and the contents of the HTML report, store
it into a unique directory under the SERVER_DATA_PATH.
This is preferred to storing the file as a blob inside the db, which
is generally advised against.
"""
if not cfg["CMSSW_COMPARISON_REPORTS_STORE_PATH"].exists():
self.log.debug(f"Creating {cfg['CMSSW_COMPARISON_REPORTS_STORE_PATH']}")
cfg["CMSSW_COMPARISON_REPORTS_STORE_PATH"].mkdir()

comparison_report_dir = cfg["CMSSW_COMPARISON_REPORTS_STORE_PATH"].joinpath(
comparison_id
)
if comparison_report_dir.exists():
shutil.rmtree(comparison_report_dir)
self.log.info(f"Creating {comparison_report_dir}")
comparison_report_dir.mkdir()
self.log.info(
f"Storing {comparison_report_dir.joinpath('dqm-histo-comparison-summary.html')}"
)
with open(
comparison_report_dir.joinpath("dqm-histo-comparison-summary.html"),
"w+",
) as f:
f.write(comparison_report_html)

def fill_cmssw_comparison_report(
self, comparison_report_metadata: dict, comparison_report_html: str
) -> dict:
"""
For the given report, check if it exists in the DB
and store it, if not.
"""

assert isinstance(comparison_report_metadata, dict)
assert len(comparison_report_metadata.keys()) > 0
assert all(
key in comparison_report_metadata
for key in ["id", "base", "comp", "comp_prs", "base_prs"]
), "Comparison report entry does not contain all the expected data"

self.log.debug(f"Filling data: {comparison_report_metadata}")
with self.engine.connect() as cur:
result = cur.execute(
sqlalchemy.insert(
self.db_meta.tables[self.TB_NAME_CMSSW_COMPARISON_REPORTS]
).values(
dict(
zip(
self.TB_DESCRIPTION_CMSSW_COMPARISON_REPORTS_COLS,
[
comparison_report_metadata[key]
for key in [
"id",
"timestamp",
"base",
"comp",
"base_prs",
"comp_prs",
]
],
)
)
)
)
# Store the HTML in the appropriate directory
self._store_cmssw_comparison_report_html(
comparison_id=comparison_report_metadata["id"],
comparison_report_html=comparison_report_html,
)
return result

def get_run(
self,
run_start: int,
Expand Down Expand Up @@ -774,3 +923,14 @@ def get_runs_around(self, run: int) -> list:
).all()
answer = [item[0] for item in answer]
return answer


if __name__ == "__main__":
db_playback = DQM2MirrorDB(
log=logging.getLogger(__name__),
host=cfg.get("DB_PLAYBACK_HOST"),
port=cfg.get("DB_PLAYBACK_PORT"),
username=cfg.get("DB_PLAYBACK_USERNAME"),
password=cfg.get("DB_PLAYBACK_PASSWORD"),
db_name=cfg.get("DB_PLAYBACK_NAME"),
)
8 changes: 8 additions & 0 deletions dqmsquare_cfg.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import os
import pytz
import tempfile
from pathlib import Path
from dotenv import load_dotenv

# Important for converting datetime objects (from the database)
Expand Down Expand Up @@ -42,6 +43,10 @@ def load_cfg() -> dict:
cfg["GRABBER_SLEEP_TIME_STATUS"] = int(
os.environ.get("GRABBER_SLEEP_TIME_STATUS", 30)
)
# How often to ask the playback machines for new CMSSW comparison
# reports. Keep it above 30 secs.
# sec, int
cfg["GRABBER_SLEEP_TIME_COMP"] = int(os.environ.get("GRABBER_SLEEP_TIME_COMP", 30))

cfg["LOGGER_ROTATION_TIME"] = 24 # h, int
cfg["LOGGER_MAX_N_LOG_FILES"] = 10 # int
Expand All @@ -67,6 +72,9 @@ def load_cfg() -> dict:

# This is used both as part of URLs and local filenames, so it must not start with a "/"
cfg["SERVER_DATA_PATH"] = mount_path if cfg["ENV"] != "development" else ""
cfg["CMSSW_COMPARISON_REPORTS_STORE_PATH"] = Path(cfg["SERVER_DATA_PATH"]).joinpath(
"cmssw_comparison_reports"
)
# The prefix is appended to the base URL, to create relative URLs.
# Since the k8s deployment is served at /dqm/dqm-square, we always need to append
# to the base URL (cmsweb.cern.ch) to have relative URLs.
Expand Down
87 changes: 87 additions & 0 deletions grabber.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,61 @@ def get_cluster_status(cluster: str = "playback") -> dict | None:
return None


def get_list_of_comparison_reports() -> list[dict]:
url = urljoin(
cfg["CMSWEB_FRONTEND_PROXY_URL"] + "/",
"cr/exe?" + urlencode({"what": "cmssw_comparison_reports"}),
)
response = _request_wrapper(
requests.get,
url,
cookies=_get_cookies(),
verify=False,
cert=([cfg["SERVER_GRID_CERT_PATH"], cfg["SERVER_GRID_KEY_PATH"]]),
)
if response:
if response.status_code != 200:
logger.error(
f"fff_dqmtools ({url}) returned {response.status_code}. Response: "
f"{response.text}"
)
raise Exception(
f"Failed to fetch list of CMSSW comparison reports. Got ({response.status_code}) {response.text}"
)
logger.debug("Got list of CMSSW comparison reports")
return response.json()

return []


def get_single_comparison_report(comparison_id: str) -> str:
url = urljoin(
cfg["CMSWEB_FRONTEND_PROXY_URL"] + "/",
"cr/exe?"
+ urlencode({"what": "cmssw_comparison_reports", "id": comparison_id}),
)
response = _request_wrapper(
requests.get,
url,
cookies=_get_cookies(),
verify=False,
cert=([cfg["SERVER_GRID_CERT_PATH"], cfg["SERVER_GRID_KEY_PATH"]]),
)
if response:
if response.status_code != 200:
logger.error(
f"fff_dqmtools ({url}) returned {response.status_code}. Response: "
f"{response.text}"
)
raise Exception(
f"Failed to fetch list of CMSSW comparison reports. Got ({response.status_code}) {response.text}"
)
logger.debug("Got list of CMSSW comparison reports")
return response.text

return ""


def get_latest_info_from_host(host: str, rev: int) -> list[dict]:
"""
Given a host and a revision, it gets the latest headers from the host specified,
Expand Down Expand Up @@ -384,6 +439,31 @@ def loop_status(
logger.debug(f"Sleeping for {timeout}s")
time.sleep(timeout)

def loop_cmssw_comparison_reports(
db: DQM2MirrorDB,
timeout: int = cfg["GRABBER_SLEEP_TIME_COMP"],
):
while True:
try:
results = get_list_of_comparison_reports()
if results:
logger.debug(f"There are {len(results)} comparisons available.")

for report in results:
if db.get_cmssw_comparison_report(report["id"]):
logger.debug(f"Report {report['id']} already in db")
continue
logger.debug(f"Requesting HTML report for {report['id']}")
html_report = get_single_comparison_report(report["id"])
db.fill_cmssw_comparison_report(
comparison_report_metadata=report,
comparison_report_html=html_report,
)
except Exception:
logger.warning("Unhandled exception in status loop", exc_info=True)
logger.debug(f"Sleeping for {timeout}s")
time.sleep(timeout)

active_threads = []
if "playback" in run_modes:
active_threads.append(
Expand All @@ -400,6 +480,13 @@ def loop_status(
daemon=True,
)
)
active_threads.append(
threading.Thread(
target=loop_cmssw_comparison_reports,
args=[db_playback, cfg["GRABBER_SLEEP_TIME_COMP"]],
daemon=True,
)
)
if "production" in run_modes:
active_threads.append(
threading.Thread(
Expand Down
Loading

0 comments on commit 5224201

Please sign in to comment.