From 8dd6755d77bba397ca98208b38a7d59560eda609 Mon Sep 17 00:00:00 2001
From: Henri Rosten
Date: Mon, 18 Dec 2023 14:25:12 +0200
Subject: [PATCH] Use nixmeta cache in sbomnix

Signed-off-by: Henri Rosten
---
 src/nixupdate/nix_outdated.py  |  2 +-
 src/sbomnix/main.py            | 18 ++--------
 src/sbomnix/meta.py            | 43 ++++++++++++++++++++++
 src/sbomnix/sbomdb.py          | 65 ++++++----------------------------
 src/vulnxscan/vulnxscan_cli.py |  2 +-
 tests/test_sbomnix.py          | 20 ++++++++++-
 6 files changed, 76 insertions(+), 74 deletions(-)
 create mode 100644 src/sbomnix/meta.py

diff --git a/src/nixupdate/nix_outdated.py b/src/nixupdate/nix_outdated.py
index dbbed91..595f82f 100755
--- a/src/nixupdate/nix_outdated.py
+++ b/src/nixupdate/nix_outdated.py
@@ -77,7 +77,7 @@ def getargs():
 
 def _generate_sbom(target_path, buildtime=False):
     LOG.info("Generating SBOM for target '%s'", target_path)
-    sbomdb = SbomDb(target_path, buildtime, meta_path=None)
+    sbomdb = SbomDb(target_path, buildtime)
     prefix = "nixdeps_"
     suffix = ".cdx.json"
     with NamedTemporaryFile(delete=False, prefix=prefix, suffix=suffix) as f:
diff --git a/src/sbomnix/main.py b/src/sbomnix/main.py
index e5d7b2d..1256df6 100755
--- a/src/sbomnix/main.py
+++ b/src/sbomnix/main.py
@@ -10,7 +10,6 @@ import pathlib
 
 from sbomnix.sbomdb import SbomDb
 from common.utils import (
-    LOG,
     set_log_verbosity,
     check_positive,
     get_py_pkg_version,
@@ -27,19 +26,11 @@ def getargs():
         "in NIX_PATH and "
         "writes SBOM file(s) as specified in output arguments."
     )
-    epil = "Example: sbomnix /path/to/nix/out --meta /path/to/meta.json"
+    epil = "Example: sbomnix /path/to/nix/out"
     parser = argparse.ArgumentParser(description=desc, epilog=epil)
     helps = "Path to nix artifact, e.g.: derivation file or nix output path"
     parser.add_argument("NIX_PATH", help=helps, type=pathlib.Path)
-    helps = (
-        "Path to json file that details meta information. "
-        "Generate this file with: `nix-env -qa --meta --json '.*' >meta.json` "
-        "then give the path to generated json file to this script via the "
-        "--meta argument to include the license and maintainer information "
-        "to the output of this script (default: None)"
-    )
-    parser.add_argument("--meta", nargs="?", help=helps, default=None)
     helps = "Scan buildtime dependencies instead of runtime dependencies"
     parser.add_argument("--buildtime", help=helps, action="store_true")
     helps = (
@@ -76,12 +67,7 @@ def main():
     target_path = args.NIX_PATH.resolve().as_posix()
     runtime = args.buildtime is False
     exit_unless_nix_artifact(target_path, force_realise=runtime)
-    if not args.meta:
-        LOG.warning(
-            "Command line argument '--meta' missing: SBOM will not include "
-            "license information (see '--help' for more details)"
-        )
-    sbomdb = SbomDb(target_path, args.buildtime, args.meta, args.depth)
+    sbomdb = SbomDb(target_path, args.buildtime, args.depth)
     if args.cdx:
         sbomdb.to_cdx(args.cdx)
     if args.spdx:
diff --git a/src/sbomnix/meta.py b/src/sbomnix/meta.py
new file mode 100644
index 0000000..d36c23f
--- /dev/null
+++ b/src/sbomnix/meta.py
@@ -0,0 +1,43 @@
+# SPDX-FileCopyrightText: 2022-2023 Technology Innovation Institute (TII)
+#
+# SPDX-License-Identifier: Apache-2.0
+
+# pylint: disable=too-few-public-methods
+
+"""Cache nixpkgs meta information"""
+
+from dfdiskcache import DataFrameDiskCache
+
+from common.utils import (
+    LOG,
+    df_from_csv_file,
+)
+
+###############################################################################
+
+_NIXMETA_CSV_URL = "https://github.com/henrirosten/nixmeta/raw/main/data/nixmeta.csv"
+_NIXMETA_CSV_URL_TTL = 60 * 60 * 24
+
+###############################################################################
+
+
+class NixMeta:
+    """Cache nixpkgs meta information"""
+
+    def __init__(self):
+        LOG.debug("")
+        self.cache = DataFrameDiskCache()
+        self.df_nixmeta = self.cache.get(_NIXMETA_CSV_URL)
+        if self.df_nixmeta is None:
+            LOG.debug("nixmeta cache miss, downloading: %s", _NIXMETA_CSV_URL)
+            self.df_nixmeta = df_from_csv_file(_NIXMETA_CSV_URL)
+            self.cache.set(_NIXMETA_CSV_URL, self.df_nixmeta, ttl=_NIXMETA_CSV_URL_TTL)
+        else:
+            LOG.debug("read nixmeta from cache")
+
+    def get_df(self):
+        """Return nix meta information as pandas dataframe"""
+        return self.df_nixmeta
+
+
+###############################################################################
diff --git a/src/sbomnix/sbomdb.py b/src/sbomnix/sbomdb.py
index 829de44..84d9bc1 100644
--- a/src/sbomnix/sbomdb.py
+++ b/src/sbomnix/sbomdb.py
@@ -19,6 +19,7 @@ from reuse._licenses import LICENSE_MAP as SPDX_LICENSES
 
 from nixgraph.graph import NixDependencies
 from sbomnix.nix import Store, find_deriver
+from sbomnix.meta import NixMeta
 from common.utils import LOG, df_to_csv_file, get_py_pkg_version
 
 ###############################################################################
@@ -27,13 +28,12 @@
 class SbomDb:
     """Generates SBOMs in various formats"""
 
-    def __init__(self, nix_path, buildtime=False, meta_path=None, depth=None):
+    def __init__(self, nix_path, buildtime=False, depth=None):
         # self.uid specifies the attribute that SbomDb uses as unique
         # identifier for the sbom components. See the column names in
         # self.df_sbomdb (sbom.csv) for a list of all components' attributes.
         self.uid = "store_path"
         self.buildtime = buildtime
-        self.meta_path = meta_path
         self.target_deriver = find_deriver(nix_path)
         self.df_deps = None
         self.depth = depth
@@ -84,18 +84,22 @@ def _init_sbomdb(self):
             store.add_path(path)
         self.df_sbomdb = store.to_dataframe()
         # Join with meta information
-        self._sbomdb_join_meta(self.meta_path)
+        self._sbomdb_join_meta()
         # Clean, drop duplicates, sort
         self.df_sbomdb.replace(np.nan, "", regex=True, inplace=True)
         self.df_sbomdb.drop_duplicates(subset=[self.uid], keep="first", inplace=True)
         self.df_sbomdb.sort_values(by=["name", self.uid], inplace=True)
         self.df_sbomdb_outputs_exploded = self.df_sbomdb.explode("outputs")
 
-    def _sbomdb_join_meta(self, meta_path):
+    def _sbomdb_join_meta(self):
         """Join self.df_sbomdb with meta information"""
-        if meta_path is None:
+        df_meta = NixMeta().get_df()
+        if df_meta is None or df_meta.empty:
+            LOG.warning(
+                "Failed reading nix meta information: "
+                "SBOM will include only minimum set of attributes"
+            )
             return
-        df_meta = _parse_json_metadata(meta_path)
         if LOG.level <= logging.DEBUG:
             df_to_csv_file(df_meta, "meta.csv")
         # Join based on package name including the version number
@@ -406,53 +410,4 @@ def _drv_to_cdx_dependency(drv, deps_list, uid="store_path"):
     return dependency
 
 
-###############################################################################
-
-# Nix package metadata
-
-
-def _parse_meta_entry(meta, key):
-    """Parse the given key from the metadata entry"""
-    items = []
-    if isinstance(meta, dict):
-        items.extend([_parse_meta_entry(meta.get(key, ""), key)])
-    elif isinstance(meta, list):
-        items.extend([_parse_meta_entry(x, key) for x in meta])
-    else:
-        return str(meta)
-    return ";".join(list(filter(None, items)))
-
-
-def _parse_json_metadata(json_filename):
-    """Parse package metadata from the specified json file"""
-    with open(json_filename, "r", encoding="utf-8") as inf:
-        LOG.info('Loading meta info from "%s"', json_filename)
-        json_dict = json.loads(inf.read())
-        dict_selected = {}
-        setcol = dict_selected.setdefault
-        for nixpkg_name, pkg in json_dict.items():
-            # generic package info
-            setcol("nixpkgs", []).append(nixpkg_name)
-            setcol("name", []).append(pkg.get("name", ""))
-            setcol("pname", []).append(pkg.get("pname", ""))
-            setcol("version", []).append(pkg.get("version", ""))
-            # meta
-            meta = pkg.get("meta", {})
-            setcol("meta_homepage", []).append(meta.get("homepage", ""))
-            setcol("meta_position", []).append(meta.get("position", ""))
-            setcol("meta_unfree", []).append(meta.get("unfree", ""))
-            setcol("meta_description", []).append(meta.get("description", ""))
-            # meta.license
-            meta_license = meta.get("license", {})
-            license_short = _parse_meta_entry(meta_license, key="shortName")
-            setcol("meta_license_short", []).append(license_short)
-            license_spdx = _parse_meta_entry(meta_license, key="spdxId")
-            setcol("meta_license_spdxid", []).append(license_spdx)
-            # meta.maintainers
-            meta_maintainers = meta.get("maintainers", {})
-            emails = _parse_meta_entry(meta_maintainers, key="email")
-            setcol("meta_maintainers_email", []).append(emails)
-        return pd.DataFrame(dict_selected)
-
-
 ################################################################################
diff --git a/src/vulnxscan/vulnxscan_cli.py b/src/vulnxscan/vulnxscan_cli.py
index b57d5df..b65bb67 100755
--- a/src/vulnxscan/vulnxscan_cli.py
+++ b/src/vulnxscan/vulnxscan_cli.py
@@ -736,7 +736,7 @@ def _is_patched(row):
 
 def _generate_sbom(target_path, buildtime=False):
     LOG.info("Generating SBOM for target '%s'", target_path)
-    sbomdb = SbomDb(target_path, buildtime, meta_path=None)
+    sbomdb = SbomDb(target_path, buildtime)
     prefix = "vulnxscan_"
     cdx_suffix = ".json"
     csv_suffix = ".csv"
diff --git a/tests/test_sbomnix.py b/tests/test_sbomnix.py
index ec171b9..eca1d66 100644
--- a/tests/test_sbomnix.py
+++ b/tests/test_sbomnix.py
@@ -4,6 +4,7 @@
 # SPDX-License-Identifier: Apache-2.0
 
 # pylint: disable=invalid-name, global-statement, redefined-outer-name
+# pylint: disable=too-few-public-methods
 
 """ Tests for sbomnix """
 
@@ -15,6 +16,9 @@
 import pandas as pd
 import jsonschema
 import pytest
+import referencing
+import referencing.retrieval
+import requests
 
 from common.utils import (
     df_from_csv_file,
@@ -653,6 +657,19 @@ def test_whitelist():
 ################################################################################
 
 
+class JSONSchemaRetrieve:
+    """Cached retriever that can be used with jsonschema.validate"""
+
+    @staticmethod
+    @referencing.retrieval.to_cached_resource()
+    def _retrieve_via_requests(uri):
+        print(f"retrieving schema: {uri}")
+        return requests.get(uri, timeout=10).text
+
+    def __call__(self, uri):
+        return self._retrieve_via_requests(uri)
+
+
 def validate_json(file_path, schema_path):
     """Validate json file matches schema"""
     with open(file_path, encoding="utf-8") as json_file, open(
@@ -660,7 +677,8 @@ def validate_json(file_path, schema_path):
     ) as schema_file:
         json_obj = json.load(json_file)
         schema_obj = json.load(schema_file)
-        jsonschema.validate(json_obj, schema_obj)
+        reg = referencing.Registry(retrieve=JSONSchemaRetrieve())
+        jsonschema.validate(json_obj, schema_obj, registry=reg)
 
 
 def df_to_string(df):
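
The cache behavior introduced in src/sbomnix/meta.py is: DataFrameDiskCache.get() returns None when there is no valid cached entry for the key, in which case the nixmeta CSV is downloaded and stored back with a TTL of 60 * 60 * 24 seconds, so the data is fetched over the network at most once a day. Below is a minimal standalone sketch of the same pattern, not part of the patch; it substitutes pandas.read_csv for the repo's df_from_csv_file helper, and load_nixmeta is a hypothetical name:

    import pandas as pd
    from dfdiskcache import DataFrameDiskCache

    CSV_URL = "https://github.com/henrirosten/nixmeta/raw/main/data/nixmeta.csv"
    TTL_SECONDS = 60 * 60 * 24  # same TTL as _NIXMETA_CSV_URL_TTL in meta.py


    def load_nixmeta(url=CSV_URL, ttl=TTL_SECONDS):
        """Return the nixmeta dataframe, hitting the network at most once per TTL"""
        cache = DataFrameDiskCache()
        df = cache.get(url)  # None if the URL was never cached or the entry is no longer valid
        if df is None:
            df = pd.read_csv(url)  # assumes the URL serves plain CSV
            cache.set(url, df, ttl=ttl)
        return df

Keying the cache entry by the URL itself means that changing _NIXMETA_CSV_URL automatically invalidates any previously cached copy.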
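
For callers of the library, the visible API change is that SbomDb no longer takes a meta_path argument: license and maintainer attributes are now joined from the nixmeta cache automatically, as the nix_outdated.py and vulnxscan_cli.py hunks above show. A hypothetical caller after this patch (the target path and output file name are illustrative):

    from sbomnix.sbomdb import SbomDb

    sbomdb = SbomDb("/path/to/nix/out", buildtime=False)
    sbomdb.to_cdx("sbom.cdx.json")  # CycloneDX output, as in src/sbomnix/main.py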