From 00a6ded45abb411b4a40baafa30ee560b4b3102b Mon Sep 17 00:00:00 2001 From: Fabrice Fontaine Date: Fri, 6 Jan 2023 17:18:57 +0100 Subject: [PATCH] feat: add hyperscan support hyperscan will run all version checkers simultaneously on a file, which reduces processing time. hyperscan depends on python >= 3.8; however, python 3.7 will not have any security support after 27 Jun 2023: https://endoflife.date/python The pyperscan package is used instead of the better-known hyperscan package, as pyperscan allows adding a tag to each pattern. This feature makes it easy to retrieve the checker associated with the matched pattern. Fix #2485 Signed-off-by: Fabrice Fontaine --- cve_bin_tool/checkers/__init__.py | 37 +++++++++++------- cve_bin_tool/checkers/python.py | 4 +- cve_bin_tool/checkers/sqlite.py | 4 +- cve_bin_tool/version_scanner.py | 64 +++++++++++++++++++++++++++++-- requirements.csv | 3 +- requirements.txt | 1 + test/test_checkers.py | 6 +-- 7 files changed, 94 insertions(+), 25 deletions(-) diff --git a/cve_bin_tool/checkers/__init__.py b/cve_bin_tool/checkers/__init__.py index 84a8afe24a..0e8c85aba8 100644 --- a/cve_bin_tool/checkers/__init__.py +++ b/cve_bin_tool/checkers/__init__.py @@ -320,10 +320,11 @@ def __new__(cls, name, bases, props): f"Checker {name} has a VENDOR_PRODUCT string that is not lowercase" ) # Compile regex - cls.CONTAINS_PATTERNS = list(map(re.compile, cls.CONTAINS_PATTERNS)) - cls.VERSION_PATTERNS = list(map(re.compile, cls.VERSION_PATTERNS)) - cls.FILENAME_PATTERNS = list(map(re.compile, cls.FILENAME_PATTERNS)) - cls.CONTAINS_PATTERNS.extend(cls.VERSION_PATTERNS) + cls.REGEX_CONTAINS_PATTERNS = list(map(re.compile, cls.CONTAINS_PATTERNS)) + cls.REGEX_VERSION_PATTERNS = list(map(re.compile, cls.VERSION_PATTERNS)) + cls.REGEX_FILENAME_PATTERNS = list(map(re.compile, cls.FILENAME_PATTERNS)) + cls.REGEX_CONTAINS_PATTERNS.extend(cls.REGEX_VERSION_PATTERNS) + cls.version_info = dict() # Return the new checker class return cls @@ -332,23 +333,31 @@ 
class Checker(metaclass=CheckerMetaClass): CONTAINS_PATTERNS: list[str] = [] VERSION_PATTERNS: list[str] = [] FILENAME_PATTERNS: list[str] = [] + REGEX_CONTAINS_PATTERNS: list[str] = [] + REGEX_VERSION_PATTERNS: list[str] = [] + REGEX_FILENAME_PATTERNS: list[str] = [] VENDOR_PRODUCT: list[tuple[str, str]] = [] def guess_contains(self, lines): - if any(pattern.search(lines) for pattern in self.CONTAINS_PATTERNS): + if any(pattern.search(lines) for pattern in self.REGEX_CONTAINS_PATTERNS): return True return False - def get_version(self, lines, filename): - version_info = dict() + def get_version(self, lines, filename, version_lines=None): + if version_lines == None: + version_lines = lines - if any(pattern.match(filename) for pattern in self.FILENAME_PATTERNS): - version_info["is_or_contains"] = "is" + if any(pattern.match(filename) for pattern in self.REGEX_FILENAME_PATTERNS): + self.version_info["is_or_contains"] = "is" - if "is_or_contains" not in version_info and self.guess_contains(lines): - version_info["is_or_contains"] = "contains" + if "is_or_contains" not in self.version_info and self.guess_contains(lines): + self.version_info["is_or_contains"] = "contains" - if "is_or_contains" in version_info: - version_info["version"] = regex_find(lines, self.VERSION_PATTERNS) + if "is_or_contains" in self.version_info: + version = regex_find(version_lines, self.REGEX_VERSION_PATTERNS) - return version_info + # Don't override a "correct" version with UNKNOWN + if "version" not in self.version_info or version != "UNKNOWN": + self.version_info["version"] = version + + return self.version_info diff --git a/cve_bin_tool/checkers/python.py b/cve_bin_tool/checkers/python.py index 04851cd022..4d34a59765 100644 --- a/cve_bin_tool/checkers/python.py +++ b/cve_bin_tool/checkers/python.py @@ -24,12 +24,12 @@ class PythonChecker(Checker): VERSION_PATTERNS = [r"python([23]+\.[0-9])"] VENDOR_PRODUCT = [("python_software_foundation", "python"), ("python", "python")] - def 
get_version(self, lines, filename): + def get_version(self, lines, filename, version_lines=None): # we will try to find python3+ as well as python2+ # currently regex will probably find a single string "lib/python3.6" # where 3.6 is the version similarly "lib/python2.7" where 2.7 is the version - version_info = super().get_version(lines, filename) + version_info = super().get_version(lines, filename, version_lines) # we will check if the guess returned some version probably 3.6 or 2.7 in our example # return version_info diff --git a/cve_bin_tool/checkers/sqlite.py b/cve_bin_tool/checkers/sqlite.py index c08aa05012..626572fa97 100644 --- a/cve_bin_tool/checkers/sqlite.py +++ b/cve_bin_tool/checkers/sqlite.py @@ -85,14 +85,14 @@ def guess_contains(self, lines): # If that fails, find a signature that might indicate presence of sqlite return super().guess_contains(lines) - def get_version(self, lines, filename): + def get_version(self, lines, filename, version_lines=None): """returns version information for sqlite as found in a given file. The most correct way to do this is to search for the sha1 sums per release. Fedora rpms have a simpler SQLite version string. 
""" - version_info = super().get_version(lines, filename) + version_info = super().get_version(lines, filename, version_lines) for mapping in self.VERSION_MAP: # Truncate last four characters as "If the source code has been edited diff --git a/cve_bin_tool/version_scanner.py b/cve_bin_tool/version_scanner.py index 723dfd54a7..8a56b5e0bd 100644 --- a/cve_bin_tool/version_scanner.py +++ b/cve_bin_tool/version_scanner.py @@ -8,6 +8,9 @@ from pathlib import Path, PurePath from typing import Iterator +import attr +from pyperscan import Flag, Pattern, Scan, StreamDatabase + from cve_bin_tool.checkers import Checker from cve_bin_tool.cvedb import CVEDB from cve_bin_tool.egg_updater import IS_DEVELOP, update_egg @@ -29,6 +32,14 @@ import importlib_resources as resources +@attr.define +class HyperscanMatchContext: + version_scanner: VersionScanner + filename: str + lines: str + task_result: dict + + class InvalidFileError(Exception): """Filepath is invalid for scanning.""" @@ -47,6 +58,7 @@ def __init__( error_mode: ErrorMode = ErrorMode.TruncTrace, score: int = 0, validate: bool = True, + hyperscan_db: StreamDatabase = None, ): self.logger = logger or LOGGER.getChild(self.__class__.__name__) # Update egg if installed in development mode @@ -74,6 +86,9 @@ def __init__( # self.logger.info("Checkers loaded: %s" % (", ".join(self.checkers.keys()))) self.language_checkers = self.available_language_checkers() + # Build hyperscan_db with all checker's patterns + self.hyperscan_db = self.build_hyperscan_database(self.checkers) + @classmethod def load_checkers(cls) -> dict[str, type[Checker]]: """Loads CVE checkers""" @@ -104,6 +119,9 @@ def remove_skiplist(self, skips: list[str]) -> None: else: self.logger.error(f"Checker {skipme} is not a valid checker name") + # Rebuild hyperscan_db with checker's patterns + self.hyperscan_db = self.build_hyperscan_database(self.checkers) + def print_checkers(self) -> None: self.logger.info(f'Checkers: {", ".join(self.checkers.keys())}') @@ 
-204,11 +222,51 @@ def scan_file(self, filename: str) -> Iterator[ScanInfo]: yield from self.run_checkers(filename, lines) - def run_checkers(self, filename: str, lines: str) -> Iterator[ScanInfo]: - # tko + def build_hyperscan_database(self, checkers: Checker) -> StreamDatabase: + patterns = [] for (dummy_checker_name, checker) in self.checkers.items(): checker = checker() - result = checker.get_version(lines, filename) + checker.dummy_checker_name = dummy_checker_name + for pattern in checker.VERSION_PATTERNS + checker.CONTAINS_PATTERNS: + patterns.append( + Pattern(pattern.encode(), Flag.SOM_LEFTMOST, tag=checker) + ) + + if patterns: + return StreamDatabase(*patterns) + else: + return None + + @staticmethod + def hyperscan_match( + context: HyperscanMatchContext, checker: Checker, offset: int, end: int + ) -> Scan: + # hyperscan doesn't support group capture so use standard regex + # (i.e. get_version) + result = checker.get_version( + context.lines, context.filename, context.lines[offset:end] + ) + + context.task_result[checker] = result + + return Scan.Continue + + def run_checkers(self, filename: str, lines: str) -> Iterator[ScanInfo]: + task_result = dict() + hyperscan_context = HyperscanMatchContext( + version_scanner=self, + filename=filename, + lines=lines, + task_result=task_result, + ) + + if self.hyperscan_db is not None: + scanner = self.hyperscan_db.build(hyperscan_context, self.hyperscan_match) + scanner.scan(lines.encode()) + + for checker in task_result: + result = task_result[checker] + dummy_checker_name = checker.dummy_checker_name # do some magic so we can iterate over all results, even the ones that just return 1 hit if "is_or_contains" in result: results = [dict()] diff --git a/requirements.csv b/requirements.csv index 4a8816c667..fa117a830d 100644 --- a/requirements.csv +++ b/requirements.csv @@ -18,4 +18,5 @@ python,urllib3 google,gsutil skontar,cvss python_not_in_db,packaging -python_not_in_db,importlib_resources \ No newline at end of 
file +python_not_in_db,importlib_resources +vlaci_not_in_db,pyperscan diff --git a/requirements.txt b/requirements.txt index d23fbc3c12..32f72a0518 100644 --- a/requirements.txt +++ b/requirements.txt @@ -18,3 +18,4 @@ gsutil cvss packaging<22.0 importlib_resources; python_version < "3.9" +pyperscan diff --git a/test/test_checkers.py b/test/test_checkers.py index 8f55c70c6e..883000712c 100644 --- a/test/test_checkers.py +++ b/test/test_checkers.py @@ -26,9 +26,9 @@ class MyChecker(Checker): FILENAME_PATTERNS = [r"myproduct"] VENDOR_PRODUCT = [("myvendor", "myproduct")] - assert type(MyChecker.CONTAINS_PATTERNS[0]) == Pattern - assert type(MyChecker.VERSION_PATTERNS[0]) == Pattern - assert type(MyChecker.FILENAME_PATTERNS[0]) == Pattern + assert type(MyChecker.REGEX_CONTAINS_PATTERNS[0]) == Pattern + assert type(MyChecker.REGEX_VERSION_PATTERNS[0]) == Pattern + assert type(MyChecker.REGEX_FILENAME_PATTERNS[0]) == Pattern assert type(MyChecker.VENDOR_PRODUCT[0]) == VendorProductPair def test_no_vpkg(self):