From 27d4a847ff6a9ae73a15e8f3475fede3efb16b48 Mon Sep 17 00:00:00 2001 From: Fabrice Fontaine Date: Fri, 6 Jan 2023 17:18:57 +0100 Subject: [PATCH] feat: add hyperscan support Hyperscan runs all version checkers on a file simultaneously, which reduces processing time. hyperscan depends on python >= 3.8; however, python 3.7 will not have any security support after 27 Jun 2023: https://endoflife.date/python The pyperscan package is used instead of the better-known hyperscan package because pyperscan allows a tag to be attached to each pattern. This feature makes it easy to retrieve the checker associated with the matched pattern. Fix #2485 Signed-off-by: Fabrice Fontaine --- cve_bin_tool/checkers/__init__.py | 36 +++++++++++-------- cve_bin_tool/version_scanner.py | 59 ++++++++++++++++++++++++++++--- requirements.csv | 2 ++ requirements.txt | 2 ++ test/test_checkers.py | 6 ++-- 5 files changed, 83 insertions(+), 22 deletions(-) diff --git a/cve_bin_tool/checkers/__init__.py b/cve_bin_tool/checkers/__init__.py index 4bb2b1bfb0..0e536d99d0 100644 --- a/cve_bin_tool/checkers/__init__.py +++ b/cve_bin_tool/checkers/__init__.py @@ -353,10 +353,11 @@ def __new__(cls, name, bases, props): else: cls.IGNORE_PATTERNS = list(map(re.compile, cls.IGNORE_PATTERNS)) # Compile regex - cls.CONTAINS_PATTERNS = list(map(re.compile, cls.CONTAINS_PATTERNS)) - cls.VERSION_PATTERNS = list(map(re.compile, cls.VERSION_PATTERNS)) - cls.FILENAME_PATTERNS = list(map(re.compile, cls.FILENAME_PATTERNS)) - cls.CONTAINS_PATTERNS.extend(cls.VERSION_PATTERNS) + cls.REGEX_CONTAINS_PATTERNS = list(map(re.compile, cls.CONTAINS_PATTERNS)) + cls.REGEX_VERSION_PATTERNS = list(map(re.compile, cls.VERSION_PATTERNS)) + cls.REGEX_FILENAME_PATTERNS = list(map(re.compile, cls.FILENAME_PATTERNS)) + cls.REGEX_CONTAINS_PATTERNS.extend(cls.REGEX_VERSION_PATTERNS) + cls.version_info = dict() # Return the new checker class return cls @@ -365,26 +366,31 @@ class Checker(metaclass=CheckerMetaClass): CONTAINS_PATTERNS: list[str] = 
[] VERSION_PATTERNS: list[str] = [] FILENAME_PATTERNS: list[str] = [] + REGEX_CONTAINS_PATTERNS: list[str] = [] + REGEX_VERSION_PATTERNS: list[str] = [] + REGEX_FILENAME_PATTERNS: list[str] = [] VENDOR_PRODUCT: list[tuple[str, str]] = [] IGNORE_PATTERNS: list[str] = [] def guess_contains(self, lines): - if any(pattern.search(lines) for pattern in self.CONTAINS_PATTERNS): + if any(pattern.search(lines) for pattern in self.REGEX_CONTAINS_PATTERNS): return True return False def get_version(self, lines, filename): - version_info = dict() + if any(pattern.match(filename) for pattern in self.REGEX_FILENAME_PATTERNS): + self.version_info["is_or_contains"] = "is" - if any(pattern.match(filename) for pattern in self.FILENAME_PATTERNS): - version_info["is_or_contains"] = "is" + if "is_or_contains" not in self.version_info and self.guess_contains(lines): + self.version_info["is_or_contains"] = "contains" - if "is_or_contains" not in version_info and self.guess_contains(lines): - version_info["is_or_contains"] = "contains" - - if "is_or_contains" in version_info: - version_info["version"] = regex_find( - lines, self.VERSION_PATTERNS, self.IGNORE_PATTERNS + if "is_or_contains" in self.version_info: + version = regex_find( + lines, self.REGEX_VERSION_PATTERNS, self.IGNORE_PATTERNS ) - return version_info + # Don't override a "correct" version with UNKNOWN + if "version" not in self.version_info or version != "UNKNOWN": + self.version_info["version"] = version + + return self.version_info diff --git a/cve_bin_tool/version_scanner.py b/cve_bin_tool/version_scanner.py index 58bfd4891f..860737cad7 100644 --- a/cve_bin_tool/version_scanner.py +++ b/cve_bin_tool/version_scanner.py @@ -8,6 +8,9 @@ from pathlib import Path, PurePath from typing import Iterator +import attr +from pyperscan import Pattern, Scan, StreamDatabase + from cve_bin_tool.checkers import Checker from cve_bin_tool.cvedb import CVEDB from cve_bin_tool.egg_updater import IS_DEVELOP, update_egg @@ -29,6 +32,14 @@ 
import importlib_resources as resources +@attr.define +class HyperscanMatchContext: + version_scanner: VersionScanner + filename: str + lines: str + task_result: dict + + class InvalidFileError(Exception): """Filepath is invalid for scanning.""" @@ -47,6 +58,7 @@ def __init__( error_mode: ErrorMode = ErrorMode.TruncTrace, score: int = 0, validate: bool = True, + hyperscan_db: StreamDatabase = None, ): self.logger = logger or LOGGER.getChild(self.__class__.__name__) # Update egg if installed in development mode @@ -231,11 +243,50 @@ def scan_file(self, filename: str) -> Iterator[ScanInfo]: yield from self.run_checkers(filename, lines) - def run_checkers(self, filename: str, lines: str) -> Iterator[ScanInfo]: - # tko - for dummy_checker_name, checker in self.checkers.items(): + def build_hyperscan_database(self, checkers: Checker) -> StreamDatabase: + patterns = [] + for (dummy_checker_name, checker) in self.checkers.items(): checker = checker() - result = checker.get_version(lines, filename) + checker.dummy_checker_name = dummy_checker_name + for pattern in checker.VERSION_PATTERNS + checker.CONTAINS_PATTERNS: + patterns.append(Pattern(pattern.encode(), tag=checker)) + + if patterns: + return StreamDatabase(*patterns) + else: + return None + + @staticmethod + def hyperscan_match( + context: HyperscanMatchContext, checker: Checker, offset: int, end: int + ) -> Scan: + # Confirm hyperscan match with get_version as hyperscan doesn't support + # group capture. 
SOM_LEFTMOST is not enabled (offset is always 0) + result = checker.get_version(context.lines[offset:end], context.filename) + + context.task_result[checker] = result + + return Scan.Continue + + def run_checkers(self, filename: str, lines: str) -> Iterator[ScanInfo]: + # Build hyperscan_db with checker's patterns + self.hyperscan_db = self.build_hyperscan_database(self.checkers) + + task_result = dict() + hyperscan_context = HyperscanMatchContext( + version_scanner=self, + filename=filename, + lines=lines, + task_result=task_result, + ) + + if self.hyperscan_db is not None: + scanner = self.hyperscan_db.build(hyperscan_context, self.hyperscan_match) + scanner.scan(lines.encode()) + + for checker in task_result: + result = task_result[checker] + dummy_checker_name = checker.dummy_checker_name # do some magic so we can iterate over all results, even the ones that just return 1 hit if "is_or_contains" in result: results = [dict()] diff --git a/requirements.csv b/requirements.csv index b4ba3b8da2..f5d0cdaf15 100644 --- a/requirements.csv +++ b/requirements.csv @@ -21,3 +21,5 @@ python_not_in_db,packaging python_not_in_db,importlib_resources vsajip_not_in_db,python-gnupg anthonyharrison_not_in_db,lib4sbom +intel,hyperscan +vlaci_not_in_db,pyperscan diff --git a/requirements.txt b/requirements.txt index 45c67dc97e..afb6e42a84 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,6 +4,7 @@ cvss defusedxml distro gsutil +hyperscan importlib_metadata>=3.6; python_version < "3.10" importlib_resources; python_version < "3.9" jinja2>=2.11.3 @@ -12,6 +13,7 @@ lib4sbom>=0.3.0 python-gnupg packaging<22.0 plotly +pyperscan @ https://github.com/vlaci/pyperscan/archive/master.zip#egg=pyperscan-0.2.3', pyyaml>=5.4 requests rich diff --git a/test/test_checkers.py b/test/test_checkers.py index 5b31b3127f..6d39e38a9a 100644 --- a/test/test_checkers.py +++ b/test/test_checkers.py @@ -29,9 +29,9 @@ class MyChecker(Checker): VENDOR_PRODUCT = [("myvendor", "myproduct")] 
IGNORE_PATTERNS = [r"ignore"] - assert type(MyChecker.CONTAINS_PATTERNS[0]) == Pattern - assert type(MyChecker.VERSION_PATTERNS[0]) == Pattern - assert type(MyChecker.FILENAME_PATTERNS[0]) == Pattern + assert type(MyChecker.REGEX_CONTAINS_PATTERNS[0]) == Pattern + assert type(MyChecker.REGEX_VERSION_PATTERNS[0]) == Pattern + assert type(MyChecker.REGEX_FILENAME_PATTERNS[0]) == Pattern assert type(MyChecker.VENDOR_PRODUCT[0]) == VendorProductPair assert type(MyChecker.IGNORE_PATTERNS[0]) == Pattern