Skip to content

Commit

Permalink
Use nixmeta cache in sbomnix
Browse files Browse the repository at this point in the history
Signed-off-by: Henri Rosten <[email protected]>
  • Loading branch information
henrirosten committed Dec 19, 2023
1 parent 408704a commit 8dd6755
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 74 deletions.
2 changes: 1 addition & 1 deletion src/nixupdate/nix_outdated.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def getargs():

def _generate_sbom(target_path, buildtime=False):
LOG.info("Generating SBOM for target '%s'", target_path)
sbomdb = SbomDb(target_path, buildtime, meta_path=None)
sbomdb = SbomDb(target_path, buildtime)
prefix = "nixdeps_"
suffix = ".cdx.json"
with NamedTemporaryFile(delete=False, prefix=prefix, suffix=suffix) as f:
Expand Down
18 changes: 2 additions & 16 deletions src/sbomnix/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import pathlib
from sbomnix.sbomdb import SbomDb
from common.utils import (
LOG,
set_log_verbosity,
check_positive,
get_py_pkg_version,
Expand All @@ -27,19 +26,11 @@ def getargs():
"in NIX_PATH and "
"writes SBOM file(s) as specified in output arguments."
)
epil = "Example: sbomnix /path/to/nix/out --meta /path/to/meta.json"
epil = "Example: sbomnix /path/to/nix/out"
parser = argparse.ArgumentParser(description=desc, epilog=epil)

helps = "Path to nix artifact, e.g.: derivation file or nix output path"
parser.add_argument("NIX_PATH", help=helps, type=pathlib.Path)
helps = (
"Path to json file that details meta information. "
"Generate this file with: `nix-env -qa --meta --json '.*' >meta.json` "
"then give the path to generated json file to this script via the "
"--meta argument to include the license and maintainer information "
"to the output of this script (default: None)"
)
parser.add_argument("--meta", nargs="?", help=helps, default=None)
helps = "Scan buildtime dependencies instead of runtime dependencies"
parser.add_argument("--buildtime", help=helps, action="store_true")
helps = (
Expand Down Expand Up @@ -76,12 +67,7 @@ def main():
target_path = args.NIX_PATH.resolve().as_posix()
runtime = args.buildtime is False
exit_unless_nix_artifact(target_path, force_realise=runtime)
if not args.meta:
LOG.warning(
"Command line argument '--meta' missing: SBOM will not include "
"license information (see '--help' for more details)"
)
sbomdb = SbomDb(target_path, args.buildtime, args.meta, args.depth)
sbomdb = SbomDb(target_path, args.buildtime, args.depth)
if args.cdx:
sbomdb.to_cdx(args.cdx)
if args.spdx:
Expand Down
43 changes: 43 additions & 0 deletions src/sbomnix/meta.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# SPDX-FileCopyrightText: 2022-2023 Technology Innovation Institute (TII)
#
# SPDX-License-Identifier: Apache-2.0

# pylint: disable=too-few-public-methods

"""Cache nixpkgs meta information"""

from dfdiskcache import DataFrameDiskCache

from common.utils import (
LOG,
df_from_csv_file,
)

###############################################################################

# Location of the nixmeta CSV. NixMeta also uses this URL as the key under
# which the downloaded dataframe is looked up and stored in the disk cache.
_NIXMETA_CSV_URL = "https://github.com/henrirosten/nixmeta/raw/main/data/nixmeta.csv"
# Passed as ttl= when the dataframe is stored in the cache.
# 60 * 60 * 24 = 86400; presumably seconds, i.e. one day -- TODO confirm the
# unit against DataFrameDiskCache.set.
_NIXMETA_CSV_URL_TTL = 60 * 60 * 24

###############################################################################


class NixMeta:
    """Cache nixpkgs meta information

    On construction the nixmeta dataframe is looked up from the disk cache,
    using the CSV URL as the cache key. On a cache miss the CSV is read from
    the URL and, if that produced a non-empty dataframe, stored in the cache
    with a time-to-live of ``_NIXMETA_CSV_URL_TTL``.
    """

    def __init__(self):
        LOG.debug("")
        self.cache = DataFrameDiskCache()
        self.df_nixmeta = self.cache.get(_NIXMETA_CSV_URL)
        if self.df_nixmeta is not None:
            LOG.debug("read nixmeta from cache")
            return
        LOG.debug("nixmeta cache miss, downloading: %s", _NIXMETA_CSV_URL)
        self.df_nixmeta = df_from_csv_file(_NIXMETA_CSV_URL)
        # Only cache a usable result. Storing a missing or empty dataframe
        # would let one bad download be served from the cache until the TTL
        # expires. How df_from_csv_file reports failures is not visible
        # here, so both None and an empty dataframe are treated as unusable.
        if self.df_nixmeta is None or self.df_nixmeta.empty:
            LOG.debug("nixmeta download returned no data, skipping cache update")
            return
        self.cache.set(_NIXMETA_CSV_URL, self.df_nixmeta, ttl=_NIXMETA_CSV_URL_TTL)

    def get_df(self):
        """Return nix meta information as pandas dataframe

        The value is whatever the constructor obtained from the cache or the
        download; it may be None or empty when no meta data was available.
        """
        return self.df_nixmeta


###############################################################################
65 changes: 10 additions & 55 deletions src/sbomnix/sbomdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from reuse._licenses import LICENSE_MAP as SPDX_LICENSES
from nixgraph.graph import NixDependencies
from sbomnix.nix import Store, find_deriver
from sbomnix.meta import NixMeta
from common.utils import LOG, df_to_csv_file, get_py_pkg_version

###############################################################################
Expand All @@ -27,13 +28,12 @@
class SbomDb:
"""Generates SBOMs in various formats"""

def __init__(self, nix_path, buildtime=False, meta_path=None, depth=None):
def __init__(self, nix_path, buildtime=False, depth=None):
# self.uid specifies the attribute that SbomDb uses as unique
# identifier for the sbom components. See the column names in
# self.df_sbomdb (sbom.csv) for a list of all components' attributes.
self.uid = "store_path"
self.buildtime = buildtime
self.meta_path = meta_path
self.target_deriver = find_deriver(nix_path)
self.df_deps = None
self.depth = depth
Expand Down Expand Up @@ -84,18 +84,22 @@ def _init_sbomdb(self):
store.add_path(path)
self.df_sbomdb = store.to_dataframe()
# Join with meta information
self._sbomdb_join_meta(self.meta_path)
self._sbomdb_join_meta()
# Clean, drop duplicates, sort
self.df_sbomdb.replace(np.nan, "", regex=True, inplace=True)
self.df_sbomdb.drop_duplicates(subset=[self.uid], keep="first", inplace=True)
self.df_sbomdb.sort_values(by=["name", self.uid], inplace=True)
self.df_sbomdb_outputs_exploded = self.df_sbomdb.explode("outputs")

def _sbomdb_join_meta(self, meta_path):
def _sbomdb_join_meta(self):
"""Join self.df_sbomdb with meta information"""
if meta_path is None:
df_meta = NixMeta().get_df()
if df_meta is None or df_meta.empty:
LOG.warning(
"Failed reading nix meta information: "
"SBOM will include only minimum set of attributes"
)
return
df_meta = _parse_json_metadata(meta_path)
if LOG.level <= logging.DEBUG:
df_to_csv_file(df_meta, "meta.csv")
# Join based on package name including the version number
Expand Down Expand Up @@ -406,53 +410,4 @@ def _drv_to_cdx_dependency(drv, deps_list, uid="store_path"):
return dependency


###############################################################################

# Nix package metadata


def _parse_meta_entry(meta, key):
"""Parse the given key from the metadata entry"""
items = []
if isinstance(meta, dict):
items.extend([_parse_meta_entry(meta.get(key, ""), key)])
elif isinstance(meta, list):
items.extend([_parse_meta_entry(x, key) for x in meta])
else:
return str(meta)
return ";".join(list(filter(None, items)))


def _parse_json_metadata(json_filename):
    """Parse package metadata from the specified json file

    The file is expected to hold a json object whose keys are nixpkgs
    attribute names and whose values are package entries. Returns a pandas
    dataframe with one row per package: generic package fields, selected
    ``meta`` fields, and the license and maintainer entries flattened to
    ";"-separated strings.
    """
    with open(json_filename, "r", encoding="utf-8") as inf:
        LOG.info('Loading meta info from "%s"', json_filename)
        packages = json.load(inf)
    records = []
    for attr_name, pkg in packages.items():
        meta = pkg.get("meta", {})
        licenses = meta.get("license", {})
        maintainers = meta.get("maintainers", {})
        records.append(
            {
                # generic package info
                "nixpkgs": attr_name,
                "name": pkg.get("name", ""),
                "pname": pkg.get("pname", ""),
                "version": pkg.get("version", ""),
                # meta
                "meta_homepage": meta.get("homepage", ""),
                "meta_position": meta.get("position", ""),
                "meta_unfree": meta.get("unfree", ""),
                "meta_description": meta.get("description", ""),
                # meta.license
                "meta_license_short": _parse_meta_entry(licenses, key="shortName"),
                "meta_license_spdxid": _parse_meta_entry(licenses, key="spdxId"),
                # meta.maintainers
                "meta_maintainers_email": _parse_meta_entry(maintainers, key="email"),
            }
        )
    return pd.DataFrame(records)


################################################################################
2 changes: 1 addition & 1 deletion src/vulnxscan/vulnxscan_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -736,7 +736,7 @@ def _is_patched(row):

def _generate_sbom(target_path, buildtime=False):
LOG.info("Generating SBOM for target '%s'", target_path)
sbomdb = SbomDb(target_path, buildtime, meta_path=None)
sbomdb = SbomDb(target_path, buildtime)
prefix = "vulnxscan_"
cdx_suffix = ".json"
csv_suffix = ".csv"
Expand Down
20 changes: 19 additions & 1 deletion tests/test_sbomnix.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
# SPDX-License-Identifier: Apache-2.0

# pylint: disable=invalid-name, global-statement, redefined-outer-name
# pylint: disable=too-few-public-methods

""" Tests for sbomnix """

Expand All @@ -15,6 +16,9 @@
import pandas as pd
import jsonschema
import pytest
import referencing
import referencing.retrieval
import requests

from common.utils import (
df_from_csv_file,
Expand Down Expand Up @@ -653,14 +657,28 @@ def test_whitelist():
################################################################################


class JSONSchemaRetrieve:
    """Cached retriever that can be used with jsonschema.validate

    Instances are callables that take a URI and return what the decorated
    retrieval function produces for it. Pass an instance as the ``retrieve``
    argument of ``referencing.Registry``.
    """

    @staticmethod
    @referencing.retrieval.to_cached_resource()
    def _retrieve_via_requests(uri):
        """Fetch the text at ``uri`` over HTTP (10 second timeout)"""
        print(f"retrieving schema: {uri}")
        response = requests.get(uri, timeout=10)
        # Fail on HTTP error statuses instead of handing the body of an
        # error page to the caching decorator as if it were a schema
        response.raise_for_status()
        return response.text

    def __call__(self, uri):
        return self._retrieve_via_requests(uri)


def validate_json(file_path, schema_path):
"""Validate json file matches schema"""
# Both files are opened as UTF-8 text and parsed with json.load.
with open(file_path, encoding="utf-8") as json_file, open(
schema_path, encoding="utf-8"
) as schema_file:
json_obj = json.load(json_file)
schema_obj = json.load(schema_file)
# NOTE(review): two jsonschema.validate calls appear back to back. In this
# diff view the first looks like the removed pre-change line and the second
# (with registry=) like its replacement -- confirm before treating both as
# live code.
jsonschema.validate(json_obj, schema_obj)
# The registry is built with a JSONSchemaRetrieve instance as its retrieve
# callable and handed to the validation call.
reg = referencing.Registry(retrieve=JSONSchemaRetrieve())
jsonschema.validate(json_obj, schema_obj, registry=reg)


def df_to_string(df):
Expand Down

0 comments on commit 8dd6755

Please sign in to comment.