Skip to content

Commit

Permalink
Use nixmeta cache in sbomnix
Browse files Browse the repository at this point in the history
Signed-off-by: Henri Rosten <[email protected]>
  • Loading branch information
henrirosten committed Dec 19, 2023
1 parent 408704a commit fc7d952
Show file tree
Hide file tree
Showing 8 changed files with 250 additions and 125 deletions.
21 changes: 10 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ Table of Contents
* [Generate SBOM Including Meta Information](#generate-sbom-including-meta-information)
* [Generate SBOM Including Buildtime Dependencies](#generate-sbom-including-buildtime-dependencies)
* [Generate SBOM Based on Result Symlink](#generate-sbom-based-on-result-symlink)
* [Generate SBOM Based on Flake Reference](#generate-sbom-based-on-flake-reference)
* [Visualize Package Dependencies](#visualize-package-dependencies)
* [Contribute](#contribute)
* [License](#license)
Expand Down Expand Up @@ -143,28 +144,26 @@ INFO Wrote: sbom.csv
```
Main outputs are the SBOM json files sbom.cdx.json and sbom.spdx.json in [CycloneDX](https://cyclonedx.org/) and [SPDX](https://spdx.github.io/spdx-spec/v2.3/) formats.

#### Generate SBOM Including Meta Information
To include license information in the SBOM, first generate package meta information with `nix-env`:
```bash
$ nix-env -qa --meta --json '.*' >meta.json
```
Then, run `sbomnix` with the `--meta` argument to tell sbomnix to read meta information from the given json file:
```bash
$ sbomnix /nix/store/8nbv1drmvh588pwiwsxa47iprzlgwx6j-wget-1.21.3 --meta meta.json
```

#### Generate SBOM Including Buildtime Dependencies
By default `sbomnix` scans the given target for runtime dependencies. You can tell sbomnix to determine the buildtime dependencies using the `--buildtime` argument.
The example below generates an SBOM including buildtime dependencies.
Notice: as opposed to runtime dependencies, determining the buildtime dependencies does not require building the target.
```bash
$ sbomnix /nix/store/8nbv1drmvh588pwiwsxa47iprzlgwx6j-wget-1.21.3 --meta meta.json --buildtime
$ sbomnix /nix/store/8nbv1drmvh588pwiwsxa47iprzlgwx6j-wget-1.21.3 --buildtime
```

#### Generate SBOM Based on Result Symlink
`sbomnix` can be used with output paths too (e.g. anything which produces a result symlink):
```bash
$ sbomnix /path/to/result
```

#### Generate SBOM Based on Flake Reference
`sbomnix` also supports scanning [flake references](https://nixos.org/manual/nix/stable/command-ref/new-cli/nix3-flake.html#flake-references):
```bash
$ sbomnix github:NixOS/nixpkgs?ref=nixos-unstable#wget --buildtime
```

#### Visualize Package Dependencies
`sbomnix` finds the package dependencies using `nixgraph`.
Moreover, `nixgraph` can also be used as a stand-alone tool for visualizing package dependencies.
Expand Down
84 changes: 49 additions & 35 deletions src/nixmeta/scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

""" Summarize nixpkgs meta-attributes """

import re
import pathlib
import json
from tempfile import NamedTemporaryFile
Expand All @@ -28,25 +29,12 @@ class NixMetaScanner:
def __init__(self):
self.df_meta = None

def scan(self, flakeref):
"""Scan nixpkgs meta-info using nixpkgs version pinned in flakeref"""
LOG.info("Finding meta-info for nixpkgs pinned in flake: %s", flakeref)
meta_json = _get_flake_metadata(flakeref)
if not _is_nixpkgs_metadata(meta_json):
# If flakeref is not nixpkgs flake, try finding the nixpkgs
# revision pinned by the given flakeref
LOG.debug("non-nixpkgs flakeref: %s", flakeref)
rev = _get_flake_nixpkgs_pin(meta_json)
if not rev:
LOG.warning("Failed reading nixpkgs pin: %s", flakeref)
return
nixpkgs_flakeref = f"github:NixOS/nixpkgs?ref={rev}"
LOG.log(LOG_SPAM, "using nixpkgs_flakeref: %s", nixpkgs_flakeref)
meta_json = _get_flake_metadata(nixpkgs_flakeref)
if not _is_nixpkgs_metadata(meta_json):
LOG.warning("Failed reading nixpkgs metadata: %s", flakeref)
return
nixpkgs_path = pathlib.Path(meta_json["path"]).absolute()
def scan(self, nixref):
"""
Scan nixpkgs meta-info using nixpkgs version pinned in nixref;
nixref can be a nix store path or flakeref.
"""
nixpkgs_path = nixref_to_nixpkgs_path(nixref)
if not nixpkgs_path.exists():
LOG.warning("Nixpkgs not in nix store: %s", nixpkgs_path.as_posix())
return
Expand All @@ -58,25 +46,17 @@ def to_csv(self, csv_path, append=False):
csv_path = pathlib.Path(csv_path)
if append and csv_path.exists():
df = df_from_csv_file(csv_path)
self.df_meta = pd.concat(
[self.df_meta.astype(str), df.astype(str)], ignore_index=True
)
self.df_meta = pd.concat([self.df_meta, df.astype(str)], ignore_index=True)
if self.df_meta is None or self.df_meta.empty:
LOG.info("Nothing to output")
return
self.df_meta.fillna("", inplace=True)
uids = [
"name",
"version",
"meta_license_short",
"meta_license_spdxid",
"meta_homepage",
]
self.df_meta.sort_values(by=uids, inplace=True)
self.df_meta.drop_duplicates(subset=uids, keep="last", inplace=True)
csv_path.parent.mkdir(parents=True, exist_ok=True)
df_to_csv_file(self.df_meta, csv_path.absolute().as_posix())

def to_df(self):
    """Return meta-info as dataframe (None if no meta-info has been read)"""
    return self.df_meta

def _read_nixpkgs_meta(self, nixpkgs_path):
prefix = "nixmeta_"
suffix = ".json"
Expand All @@ -85,21 +65,55 @@ def _read_nixpkgs_meta(self, nixpkgs_path):
exec_cmd(cmd.split(), stdout=f)
LOG.debug("Generated meta.json: %s", f.name)
self.df_meta = _parse_json_metadata(f.name)
self.df_meta = self.df_meta.astype(str)
self.df_meta.fillna("", inplace=True)
uids = [
"name",
"version",
"meta_license_short",
"meta_license_spdxid",
"meta_homepage",
]
self.df_meta.sort_values(by=uids, inplace=True)
self.df_meta.drop_duplicates(subset=uids, keep="last", inplace=True)


###############################################################################


def nixref_to_nixpkgs_path(flakeref):
    """
    Return the absolute path of the nixpkgs pinned by flakeref, or None
    if flakeref is empty or the nixpkgs pin or metadata can not be read.
    """
    if not flakeref:
        return None
    LOG.debug("Finding meta-info for nixpkgs pinned in nixref: %s", flakeref)
    metadata = _get_flake_metadata(flakeref)
    if _is_nixpkgs_metadata(metadata):
        # flakeref is the nixpkgs flake itself: use its path directly
        return pathlib.Path(metadata["path"]).absolute()
    # flakeref is not the nixpkgs flake: look up the nixpkgs revision
    # it pins and read the metadata of that nixpkgs revision instead
    LOG.debug("non-nixpkgs flakeref: %s", flakeref)
    pinned_rev = _get_flake_nixpkgs_pin(metadata)
    if not pinned_rev:
        LOG.warning("Failed reading nixpkgs pin: %s", flakeref)
        return None
    pinned_flakeref = f"github:NixOS/nixpkgs?ref={pinned_rev}"
    LOG.log(LOG_SPAM, "using nixpkgs_flakeref: %s", pinned_flakeref)
    metadata = _get_flake_metadata(pinned_flakeref)
    if not _is_nixpkgs_metadata(metadata):
        LOG.warning("Failed reading nixpkgs metadata: %s", flakeref)
        return None
    return pathlib.Path(metadata["path"]).absolute()


def _get_flake_metadata(flakeref):
"""
Return json object detailing the output of nix flake metadata
for given flakeref
"""
# Strip possible nixpkgs= prefix to support cases where flakeref is
# given as the value of the NIX_PATH environment variable
prefix = "nixpkgs="
if flakeref.startswith(prefix):
flakeref = flakeref[len(prefix):] # fmt: skip
m_nixpkgs = re.match(r"nixpkgs=([^:\s]+)", flakeref)
if m_nixpkgs:
flakeref = m_nixpkgs.group(1)
# Read nix flake metadata as json
cmd = f"nix flake metadata {flakeref} --json"
ret = exec_cmd(cmd.split(), raise_on_error=False, return_error=True)
Expand Down
2 changes: 1 addition & 1 deletion src/nixupdate/nix_outdated.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def getargs():

def _generate_sbom(target_path, buildtime=False):
LOG.info("Generating SBOM for target '%s'", target_path)
sbomdb = SbomDb(target_path, buildtime, meta_path=None)
sbomdb = SbomDb(target_path, buildtime)
prefix = "nixdeps_"
suffix = ".cdx.json"
with NamedTemporaryFile(delete=False, prefix=prefix, suffix=suffix) as f:
Expand Down
60 changes: 39 additions & 21 deletions src/sbomnix/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
check_positive,
get_py_pkg_version,
exit_unless_nix_artifact,
exec_cmd,
)

###############################################################################
Expand All @@ -23,28 +24,20 @@
def getargs():
"""Parse command line arguments"""
desc = (
"This tool finds dependencies of the specified nix artifact "
"in NIX_PATH and "
"This tool finds dependencies of the specified nix store path "
"or flake reference in NIXREF and "
"writes SBOM file(s) as specified in output arguments."
)
epil = "Example: sbomnix /path/to/nix/out --meta /path/to/meta.json"
epil = "Example: sbomnix /nix/store/path/or/flakeref"
parser = argparse.ArgumentParser(description=desc, epilog=epil)

helps = "Path to nix artifact, e.g.: derivation file or nix output path"
parser.add_argument("NIX_PATH", help=helps, type=pathlib.Path)
helps = (
"Path to json file that details meta information. "
"Generate this file with: `nix-env -qa --meta --json '.*' >meta.json` "
"then give the path to generated json file to this script via the "
"--meta argument to include the license and maintainer information "
"to the output of this script (default: None)"
)
parser.add_argument("--meta", nargs="?", help=helps, default=None)
helps = "Nix store path (e.g. derivation file or nix output path) or flakeref"
parser.add_argument("NIXREF", help=helps, type=str)
helps = "Scan buildtime dependencies instead of runtime dependencies"
parser.add_argument("--buildtime", help=helps, action="store_true")
helps = (
"Set the depth of the included dependencies. As an example, --depth=1 "
"indicates the SBOM should include only the NIX_PATH direct dependencies. "
"indicates the SBOM should include only the NIXREF direct dependencies. "
"With --depth=2, the output SBOM includes the direct dependencies and the "
"first level of transitive dependencies. "
"By default, when --depth is not specified, the output SBOM includes "
Expand All @@ -69,19 +62,44 @@ def getargs():
################################################################################


def try_resolve_flakeref(flakeref, force_realise):
    """
    Resolve flakeref to out-path.

    Evaluates `flakeref` with `nix eval --raw` and returns the command's
    stdout. Returns None if the evaluation fails, in which case the given
    string is treated as not being a flakeref. If `force_realise` is true,
    the flakeref is additionally built with `nix build --no-link`; a failed
    build is logged, but the evaluated path is still returned.
    """
    LOG.debug("resolving flakeref: '%s'", flakeref)
    # Build argv lists directly instead of splitting a formatted string, so
    # that a reference containing whitespace stays a single argument
    ret = exec_cmd(["nix", "eval", "--raw", flakeref], raise_on_error=False)
    if not ret:
        LOG.debug("not a flakeref: '%s'", flakeref)
        return None
    nixpath = ret.stdout
    LOG.debug("nixpath=%s", nixpath)
    if not force_realise:
        return nixpath
    ret = exec_cmd(
        ["nix", "build", "--no-link", flakeref],
        raise_on_error=False,
        return_error=True,
    )
    # NOTE(review): exec_cmd is not visible here. This assumes that with
    # return_error=True a failure is reported as an object carrying
    # `returncode` and `stderr` (e.g. subprocess.CalledProcessError), which
    # would be truthy -- confirm. A None result is treated as failure too,
    # and `stderr` is read defensively so logging the failure never raises.
    build_failed = ret is None or getattr(ret, "returncode", 0) != 0
    if build_failed:
        LOG.fatal(
            "Failed force_realising %s: %s", flakeref, getattr(ret, "stderr", None)
        )
    return nixpath


def main():
"""main entry point"""
args = getargs()
set_log_verbosity(args.verbose)
target_path = args.NIX_PATH.resolve().as_posix()
runtime = args.buildtime is False
target_path = try_resolve_flakeref(args.NIXREF, force_realise=runtime)
flakeref = None
if target_path:
flakeref = args.NIXREF
LOG.debug("flakeref=%s maps to path=%s", flakeref, target_path)
else:
target_path = pathlib.Path(args.NIXREF).resolve().as_posix()
exit_unless_nix_artifact(target_path, force_realise=runtime)
if not args.meta:
LOG.warning(
"Command line argument '--meta' missing: SBOM will not include "
"license information (see '--help' for more details)"
)
sbomdb = SbomDb(target_path, args.buildtime, args.meta, args.depth)
sbomdb = SbomDb(
nix_path=target_path,
buildtime=args.buildtime,
depth=args.depth,
flakeref=flakeref,
)
if args.cdx:
sbomdb.to_cdx(args.cdx)
if args.spdx:
Expand Down
116 changes: 116 additions & 0 deletions src/sbomnix/meta.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
# SPDX-FileCopyrightText: 2022-2023 Technology Innovation Institute (TII)
#
# SPDX-License-Identifier: Apache-2.0

# pylint: disable=too-few-public-methods, invalid-name

"""Cache nixpkgs meta information"""

import os
import re
import logging
import tempfile
from pathlib import Path

import pandas as pd
from dfdiskcache import DataFrameDiskCache
from nixmeta.scanner import NixMetaScanner, nixref_to_nixpkgs_path
from common.utils import LOG, df_from_csv_file, df_to_csv_file

###############################################################################

# DataFrameDiskCache cache path. Nix meta-info disk cache is placed in the
# "sbomnix_df_cache" directory under the system temporary directory.
# NOTE(review): the temporary directory is typically shared between users --
# confirm this location is acceptable on multi-user hosts.
_CACHE_PATH = Path(tempfile.gettempdir()) / "sbomnix_df_cache"

# URL of the pre-generated nixmeta CSV; also used as the cache key under
# which the downloaded dataframe is stored.
_NIXMETA_CSV_URL = "https://github.com/henrirosten/nixmeta/raw/main/data/nixmeta.csv"
# Update local cached version of _NIXMETA_CSV_URL once a day (value in
# seconds) or when the local cache is cleaned:
_NIXMETA_CSV_URL_TTL = 60 * 60 * 24

# Update locally generated nixpkgs meta-info every 30 days (value in
# seconds) or when the local cache is cleaned.
_NIXMETA_NIXPKGS_TTL = 60 * 60 * 24 * 30

###############################################################################


class Meta:
    """
    Cache nixpkgs meta information.

    Meta-info comes from two sources, both stored in a DataFrameDiskCache
    located at _CACHE_PATH:
    - the nixmeta CSV published at _NIXMETA_CSV_URL, and
    - meta-info scanned locally from the nixpkgs pinned by a nixref or
      referenced in the NIX_PATH environment variable.
    """

    def __init__(self):
        """Load the nixmeta CSV from the disk cache, downloading it on a miss"""
        LOG.debug("using CACHE_PATH: %s", _CACHE_PATH)
        self.cache = DataFrameDiskCache(cache_dir_path=_CACHE_PATH)
        # df_nixmeta includes the meta-info from _NIXMETA_CSV_URL
        self.df_nixmeta = self.cache.get(_NIXMETA_CSV_URL)
        if self.df_nixmeta is not None and not self.df_nixmeta.empty:
            LOG.debug("read nixmeta from cache")
        else:
            LOG.debug("nixmeta cache miss, downloading: %s", _NIXMETA_CSV_URL)
            self.df_nixmeta = df_from_csv_file(_NIXMETA_CSV_URL)
            if self.df_nixmeta is None or self.df_nixmeta.empty:
                LOG.warning("Failed downloading nixmeta")
            else:
                # Nix meta dictionary stored at _NIXMETA_CSV_URL is
                # regularly updated upstream, we want the local cache
                # to be updated roughly on same schedule (once a day)
                self.cache.set(
                    key=_NIXMETA_CSV_URL,
                    value=self.df_nixmeta,
                    ttl=_NIXMETA_CSV_URL_TTL,
                )

    def get_nixpkgs_meta(self, nixref=None):
        """
        Return nixpkgs meta pinned in `nixref`. `nixref` can point to a
        nix store path or flake reference. If nixref is None, attempt to
        read the nixpkgs store path from NIX_PATH environment variable.

        The returned dataframe combines the locally scanned meta-info (if
        any) with the meta-info from _NIXMETA_CSV_URL, all values converted
        to strings and duplicate rows removed. Returns None if neither
        source provided any meta-info.
        """
        nixpkgs_path = None
        if nixref:
            # Read meta from nixpkgs pinned by nixref
            LOG.debug("Reading nixpkgs path from nixref: %s", nixref)
            resolved = nixref_to_nixpkgs_path(nixref)
            if resolved is None:
                # nixref_to_nixpkgs_path returns None when the nixpkgs pin
                # can not be determined; fall back to the downloaded nixmeta
                # instead of failing on a None dereference
                LOG.debug("No nixpkgs path for nixref: %s", nixref)
            else:
                nixpkgs_path = resolved.as_posix()
        elif "NIX_PATH" in os.environ:
            # Read meta from nixpkgs referenced in NIX_PATH
            LOG.debug("Reading nixpkgs path from NIX_PATH environment")
            nix_path = os.environ["NIX_PATH"]
            # NIX_PATH is a colon-separated list: accept the nixpkgs= entry
            # at any position, not only as the first entry
            m_nixpkgs = re.search(r"(?:^|:)nixpkgs=([^:\s]+)", nix_path)
            if m_nixpkgs:
                nixpkgs_path = m_nixpkgs.group(1)
        df = None
        if nixpkgs_path:
            LOG.debug("Scanning meta-info using nixpkgs path: %s", nixpkgs_path)
            df = self._scan(nixpkgs_path)
        # Supplement the nix meta info from self.df_nixmeta with the
        # meta information extracted either from nixref or NIX_PATH.
        # pd.concat raises ValueError if every input is None, so only
        # concatenate the frames that are actually available.
        frames = [frame for frame in (df, self.df_nixmeta) if frame is not None]
        df_concat = None
        if frames:
            df_concat = pd.concat(frames).astype(str)
            df_concat = df_concat.drop_duplicates().reset_index(drop=True)
        else:
            LOG.warning("No nixpkgs meta information available")
        if LOG.level <= logging.DEBUG:
            # Dump the intermediate dataframes to the current working
            # directory to help debugging
            if df is not None:
                df_to_csv_file(df, "df_nixref.csv")
            if self.df_nixmeta is not None:
                df_to_csv_file(self.df_nixmeta, "df_nixmeta.csv")
            if df_concat is not None:
                df_to_csv_file(df_concat, "df_concat.csv")
        return df_concat

    def _scan(self, nixpkgs_path):
        """
        Return the meta-info dataframe for `nixpkgs_path`, reading it from
        the disk cache if available and scanning nixpkgs otherwise.
        Returns None if the scan produces no meta-info.
        """
        df = self.cache.get(nixpkgs_path)
        if df is not None and not df.empty:
            LOG.debug("found from cache: %s", nixpkgs_path)
            return df
        LOG.debug("cache miss, scanning: %s", nixpkgs_path)
        scanner = NixMetaScanner()
        scanner.scan(nixpkgs_path)
        df = scanner.to_df()
        if df is None or df.empty:
            LOG.warning("Failed scanning nixmeta: %s", nixpkgs_path)
            return None
        # Cache requires some TTL, so we set it to some value here.
        # Although, we could as well store it indefinitely as it should
        # not change given the same key (nixpkgs store path).
        self.cache.set(key=nixpkgs_path, value=df, ttl=_NIXMETA_NIXPKGS_TTL)
        return df


###############################################################################
Loading

0 comments on commit fc7d952

Please sign in to comment.