From 408704ad8778d133e1971e62870992c190e0e735 Mon Sep 17 00:00:00 2001 From: Henri Rosten Date: Mon, 18 Dec 2023 10:39:28 +0200 Subject: [PATCH] Start using dfdiskcache in CPE local cache Signed-off-by: Henri Rosten --- src/sbomnix/cpe.py | 64 +++++++++------------------------------------- 1 file changed, 12 insertions(+), 52 deletions(-) diff --git a/src/sbomnix/cpe.py b/src/sbomnix/cpe.py index 1a25a31..018b344 100644 --- a/src/sbomnix/cpe.py +++ b/src/sbomnix/cpe.py @@ -7,12 +7,8 @@ """ Generate CPE (Common Platform Enumeration) identifiers""" import sys -import pathlib import string -import datetime -import shutil -import requests - +from dfdiskcache import DataFrameDiskCache from common.utils import ( LOG, LOG_SPAM, @@ -22,7 +18,8 @@ ############################################################################### -CACHE_DIR = "~/.cache/sbomnix" +_CPE_CSV_URL = "https://github.com/tiiuae/cpedict/raw/main/data/cpes.csv" +_CPE_CSV_CACHE_TTL = 60 * 60 * 24 ############################################################################### @@ -41,61 +38,24 @@ class _CPE: def __init__(self): LOG.debug("") - self.cpedict = pathlib.PosixPath(CACHE_DIR).expanduser() / "cpes.csv" - self.cpedict.parent.mkdir(parents=True, exist_ok=True) - self.df_cpedict = self._load_cpedict() + self.cache = DataFrameDiskCache() + self.df_cpedict = self.cache.get(_CPE_CSV_URL) + if self.df_cpedict is None: + LOG.debug("CPE cache miss, downloading: %s", _CPE_CSV_URL) + self.df_cpedict = df_from_csv_file(_CPE_CSV_URL) + self.cache.set(_CPE_CSV_URL, self.df_cpedict, ttl=_CPE_CSV_CACHE_TTL) + else: + LOG.debug("read CPE dictionary from cache") if self.df_cpedict is not None: # Verify the loaded cpedict contains at least the following columns required_cols = {"vendor", "product"} if not required_cols.issubset(self.df_cpedict): LOG.fatal( - "Missing required columns %s from cpedict, manually check: '%s'", + "Missing required columns %s from cpedict", required_cols, - self.cpedict, ) sys.exit(1) - def _load_cpedict(self): - LOG.debug("") - if not self.cpedict.exists() or self.cpedict.stat().st_size <= 0: - # Try updating cpe dictionary if it's not cached - if not self._update_cpedict(): - LOG.warning( - "Missing '%s': CPE identifiers will be inaccurate", self.cpedict - ) - return None - cpe_updated = datetime.datetime.fromtimestamp(self.cpedict.lstat().st_mtime) - week_ago = datetime.datetime.now() - datetime.timedelta(days=7) - if cpe_updated < week_ago: - # Try updating cpe dictionary if it wasn't recently updated - LOG.debug("Attempting periodic update of cpe dictionary") - if not self._update_cpedict(): - LOG.warning( - "CPE data is not up-to-date: CPE identifiers will be inaccurate" - ) - return df_from_csv_file(self.cpedict, exit_on_error=False) - - def _update_cpedict(self): - """Updates local cpe dictionary""" - LOG.debug("") - cpedict_bak = None - if self.cpedict.exists() and self.cpedict.stat().st_size > 0: - # Backup the original cpedict to be able to rollback in case the update - # fails - cpedict_bak = pathlib.PosixPath(CACHE_DIR).expanduser() / "cpes.csv.bak" - shutil.copy(self.cpedict, cpedict_bak) - with open(self.cpedict.as_posix(), "wb") as f: - url = "https://github.com/tiiuae/cpedict/raw/main/data/cpes.csv" - try: - f.write(requests.get(url, stream=True, timeout=10).content) - return True - except requests.exceptions.RequestException as e: - LOG.warning("CPE data update failed: %s", e) - if cpedict_bak: - LOG.debug("Rollback earlier cpedict after failed update") - shutil.copy(cpedict_bak, self.cpedict) - return False - def _cpedict_vendor(self, product): if not product or len(product) == 1: LOG.debug("invalid product name '%s'", product)