Skip to content

Commit

Permalink
feat: add hyperscan support
Browse files Browse the repository at this point in the history
hyperscan will run all version checkers on a file simultaneously, which
reduces processing time.

Signed-off-by: Fabrice Fontaine <[email protected]>
  • Loading branch information
ffontaine committed Feb 7, 2023
1 parent 02e4438 commit a921dfc
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 16 deletions.
24 changes: 16 additions & 8 deletions cve_bin_tool/checkers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,10 +317,10 @@ def __new__(cls, name, bases, props):
f"Checker {name} has a VENDOR_PRODUCT string that is not lowercase"
)
# Compile regex
cls.CONTAINS_PATTERNS = list(map(re.compile, cls.CONTAINS_PATTERNS))
cls.VERSION_PATTERNS = list(map(re.compile, cls.VERSION_PATTERNS))
cls.FILENAME_PATTERNS = list(map(re.compile, cls.FILENAME_PATTERNS))
cls.CONTAINS_PATTERNS.extend(cls.VERSION_PATTERNS)
cls.REGEX_CONTAINS_PATTERNS = list(map(re.compile, cls.CONTAINS_PATTERNS))
cls.REGEX_VERSION_PATTERNS = list(map(re.compile, cls.VERSION_PATTERNS))
cls.REGEX_FILENAME_PATTERNS = list(map(re.compile, cls.FILENAME_PATTERNS))
cls.REGEX_CONTAINS_PATTERNS.extend(cls.REGEX_VERSION_PATTERNS)
# Return the new checker class
return cls

Expand All @@ -329,23 +329,31 @@ class Checker(metaclass=CheckerMetaClass):
# Raw regex source strings supplied by each concrete checker. They are kept
# as plain strings; the metaclass compiles them into the REGEX_* lists below.
CONTAINS_PATTERNS: list[str] = []
VERSION_PATTERNS: list[str] = []
FILENAME_PATTERNS: list[str] = []
# Compiled counterparts of the lists above, filled in by the metaclass when
# the checker class is created. REGEX_CONTAINS_PATTERNS is additionally
# extended with the compiled version patterns.
REGEX_CONTAINS_PATTERNS: list[re.Pattern[str]] = []
REGEX_VERSION_PATTERNS: list[re.Pattern[str]] = []
REGEX_FILENAME_PATTERNS: list[re.Pattern[str]] = []
# Pairs identifying the product this checker detects, e.g.
# ("python_software_foundation", "python"); presumably (vendor, product).
VENDOR_PRODUCT: list[tuple[str, str]] = []

def guess_contains(self, lines):
    """Report whether any "contains" signature of this checker occurs in *lines*.

    Args:
        lines: String to search.

    Returns:
        bool: True if at least one pattern in ``REGEX_CONTAINS_PATTERNS``
        matches anywhere in *lines*, False otherwise (including when the
        pattern list is empty).
    """
    # Search with the compiled REGEX_* list: CONTAINS_PATTERNS holds the raw
    # pattern strings, which have no .search() method.
    return any(pattern.search(lines) for pattern in self.REGEX_CONTAINS_PATTERNS)

def get_version(self, lines, filename, version_lines=None):
    """Decide whether a file is, or contains, this product and extract its version.

    Args:
        lines: String content used for the "contains" signature check.
        filename: File name, matched against the compiled filename patterns.
        version_lines: Optional text to extract the version from instead of
            *lines*. Any falsy value (``None`` or an empty string) falls
            back to *lines*.

    Returns:
        dict: Empty when neither the file name nor the content matches.
        Otherwise it holds ``"is_or_contains"`` (``"is"`` for a file-name
        match, ``"contains"`` for a content match) and ``"version"`` (the
        result of ``regex_find`` over the version text).
    """
    version_info = {}

    # A file-name match takes precedence; the content signatures are only
    # consulted when the name did not identify the product.
    if any(pattern.match(filename) for pattern in self.REGEX_FILENAME_PATTERNS):
        version_info["is_or_contains"] = "is"
    elif self.guess_contains(lines):
        version_info["is_or_contains"] = "contains"

    if "is_or_contains" in version_info:
        version_info["version"] = regex_find(
            version_lines or lines, self.REGEX_VERSION_PATTERNS
        )

    return version_info
4 changes: 2 additions & 2 deletions cve_bin_tool/checkers/python.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ class PythonChecker(Checker):
VERSION_PATTERNS = [r"python([23]+\.[0-9])"]
VENDOR_PRODUCT = [("python_software_foundation", "python"), ("python", "python")]

def get_version(self, lines, filename):
def get_version(self, lines, filename, version_lines=None):
# we will try to find python3+ as well as python2+

# currently regex will probably find a single string "lib/python3.6"
# where 3.6 is the version similarly "lib/python2.7" where 2.7 is the version
version_info = super().get_version(lines, filename)
version_info = super().get_version(lines, filename, version_lines)

# we will check if the guess returned some version probably 3.6 or 2.7 in our example
# return version_info
Expand Down
4 changes: 2 additions & 2 deletions cve_bin_tool/checkers/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,14 @@ def guess_contains(self, lines):
# If that fails, find a signature that might indicate presence of sqlite
return super().guess_contains(lines)

def get_version(self, lines, filename):
def get_version(self, lines, filename, version_lines=None):
"""returns version information for sqlite as found in a given file.
The most correct way to do this is to search for the sha1 sums per release.
Fedora rpms have a simpler SQLite version string.
"""

version_info = super().get_version(lines, filename)
version_info = super().get_version(lines, filename, version_lines)

for mapping in self.VERSION_MAP:
# Truncate last four characters as "If the source code has been edited
Expand Down
62 changes: 59 additions & 3 deletions cve_bin_tool/version_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
from pathlib import Path, PurePath
from typing import Iterator

import attr
from pyperscan import Flag, Pattern, Scan, StreamDatabase

from cve_bin_tool.checkers import Checker
from cve_bin_tool.cvedb import CVEDB
from cve_bin_tool.egg_updater import IS_DEVELOP, update_egg
Expand All @@ -29,6 +32,14 @@
import importlib_resources as resources


@attr.define
class HyperscanMatchContext:
    """Per-file state handed to the hyperscan match callback.

    ``run_checkers`` creates one instance for the file being scanned and
    passes it to ``hyperscan_db.build`` together with the callback, which
    reads the file data from it and writes its findings back into
    ``task_result``.
    """

    # Scanner instance that started the scan.
    version_scanner: VersionScanner
    # Name of the file being scanned; forwarded to the checker's get_version.
    filename: str
    # Text of the file; this is what gets encoded and fed to the hyperscan
    # scanner, and what the callback slices using the match offsets.
    lines: str
    # Maps a checker to the result of its get_version call. Filled in by the
    # match callback and read back by run_checkers after the scan.
    task_result: dict


class InvalidFileError(Exception):
"""Filepath is invalid for scanning."""

Expand All @@ -47,6 +58,7 @@ def __init__(
error_mode: ErrorMode = ErrorMode.TruncTrace,
score: int = 0,
validate: bool = True,
hyperscan_db=None,
):
self.logger = logger or LOGGER.getChild(self.__class__.__name__)
# Update egg if installed in development mode
Expand Down Expand Up @@ -204,11 +216,52 @@ def scan_file(self, filename: str) -> Iterator[ScanInfo]:

yield from self.run_checkers(filename, lines)

def run_checkers(self, filename: str, lines: str) -> Iterator[ScanInfo]:
# tko
def build_hyperscan_database(self, checkers: Checker) -> StreamDatabase | None:
    """Compile the version patterns of every registered checker into one database.

    Args:
        checkers: Currently unused; the checkers registered on
            ``self.checkers`` are always the ones compiled.
            NOTE(review): the only visible caller passes ``self.checkers``,
            so this looks like it is meant to be a name -> checker-class
            mapping rather than a single ``Checker`` - confirm.

    Returns:
        A ``StreamDatabase`` built from all collected patterns, or ``None``
        when no checker provides a version pattern.
    """
    patterns = []
    for checker_class in self.checkers.values():
        # One instance per checker, attached to each of its patterns as the
        # tag (the match callback takes a checker as its second argument).
        checker = checker_class()
        patterns.extend(
            Pattern(pattern.encode(), Flag.SOM_LEFTMOST, Flag.DOTALL, tag=checker)
            for pattern in checker.VERSION_PATTERNS
        )

    if not patterns:
        return None
    return StreamDatabase(*patterns)

@staticmethod
def hyperscan_match(
    context: HyperscanMatchContext, checker: Checker, offset: int, end: int
) -> Scan:
    """Match callback: run the matched checker and record its result.

    Args:
        context: Per-file state (file name, text, result mapping).
        checker: Checker attached to the pattern that matched.
        offset: Start position of the match.
        end: End position of the match.

    Returns:
        ``Scan.Continue``, always.
    """
    # hyperscan doesn't support group capture, so the matched slice is handed
    # to the checker's standard regex path (get_version) to pull the version.
    # NOTE(review): the scanner is fed lines.encode() while this slice is
    # taken on the str - presumably the positions only agree for ASCII
    # content; confirm.
    matched_text = context.lines[offset:end]
    version_info = checker.get_version(
        context.lines, context.filename, matched_text
    )

    # Keyed by checker: a later match for the same checker replaces the entry.
    context.task_result[checker] = version_info

    return Scan.Continue

def run_checkers(self, filename: str, lines: str) -> Iterator[ScanInfo]:
task_result = dict()
hyperscan_context = HyperscanMatchContext(
version_scanner=self,
filename=filename,
lines=lines,
task_result=task_result,
)

if self.hyperscan_db is not None:
scanner = self.hyperscan_db.build(hyperscan_context, self.hyperscan_match)
scanner.scan(lines.encode())

for checker in task_result:
result = task_result[checker]
dummy_checker_name = checker.VENDOR_PRODUCT[0].product
# do some magic so we can iterate over all results, even the ones that just return 1 hit
if "is_or_contains" in result:
results = [dict()]
Expand Down Expand Up @@ -274,6 +327,9 @@ def scan_and_or_extract_file(
self.file_stack.pop()

def recursive_scan(self, scan_path: str) -> Iterator[ScanInfo]:
# Build hyperscan_db with all checker's patterns once and for all
self.hyperscan_db = self.build_hyperscan_database(self.checkers)

with Extractor(logger=self.logger, error_mode=self.error_mode) as ectx:
if Path(scan_path).is_dir():

Expand Down
3 changes: 2 additions & 1 deletion requirements.csv
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ python,urllib3
google,gsutil
skontar,cvss
python_not_in_db,packaging
python_not_in_db,importlib_resources
python_not_in_db,importlib_resources
vlaci_not_in_db,pyperscan
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ gsutil
cvss
packaging<22.0
importlib_resources; python_version < "3.9"
pyperscan

0 comments on commit a921dfc

Please sign in to comment.