diff --git a/.github/workflows/pythonapp.yml b/.github/workflows/pythonapp.yml index 77d0c3623b..af0ca6564a 100644 --- a/.github/workflows/pythonapp.yml +++ b/.github/workflows/pythonapp.yml @@ -17,7 +17,7 @@ jobs: strategy: fail-fast: false matrix: - tool: ['isort', 'black', 'pyupgrade', 'flake8'] + tool: ['isort', 'black', 'pyupgrade', 'flake8', 'format_checkers'] steps: - uses: actions/checkout@v2 - uses: actions/setup-python@v2 @@ -26,6 +26,11 @@ jobs: python -m pip install --upgrade pip python -m pip install --upgrade pre-commit pre-commit install + - name: Install cve-bin-tool if needed + if: ${{ matrix.tool == 'format_checkers' }} + run: | + python -m pip install --upgrade setuptools wheel + python -m pip install . - name: Run ${{ matrix.tool }} run: | pre-commit run ${{ matrix.tool }} --all-files diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 7e7a15c8f7..7177549ecb 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -21,10 +21,9 @@ repos: - id: flake8 - repo: local - hooks: + hooks: - id: format_checkers - language: python + language: system name: format_checkers entry: python cve_bin_tool/format_checkers.py files: "^cve_bin_tool/checkers/__init__.py" - types: [python] \ No newline at end of file diff --git a/README.md b/README.md index 8979618f36..1b23b5baf4 100644 --- a/README.md +++ b/README.md @@ -119,22 +119,23 @@ match certain vulnerable versions of the following libraries and tools: | | | | Available checkers | | | | -|--------------- |--------- |-------------- |--------------- |---------- |---------- |------------- | +|--------------- |------------- |--------- |---------- |------------- |------------ |--------------- | | accountsservice |avahi |bash |bind |binutils |bolt |bubblewrap | | busybox |bzip2 |cronie |cryptsetup |cups |curl |dbus | | dnsmasq |dovecot |dpkg |enscript |expat |ffmpeg |freeradius | | ftp |gcc |gimp |glibc |gnomeshell |gnupg |gnutls | -| gpgme |gstreamer |gupnp |haproxy |hostapd |hunspell |icecast | -| icu |irssi |kbd |kerberos |kexectools |libarchive |libbpg | -| libdb |libgcrypt |libical |libjpeg_turbo |liblas |libnss |libsndfile | -| libsoup |libssh2 |libtiff |libvirt |libxslt |lighttpd |logrotate | -| lua |mariadb |mdadm |memcached |mtr |mysql |nano | -| ncurses |nessus |netpbm |nginx |node |ntp |open_vm_tools | -| openafs |openjpeg |openldap |openssh |openssl |openswan |openvpn | -| p7zip |pcsc_lite |png |polarssl_fedora |poppler |postgresql |pspp | -| python |qt |radare2 |rsyslog |samba |sqlite |strongswan | -| subversion |sudo |syslogng |systemd |tcpdump |trousers |varnish | -| webkitgtk |wireshark |wpa_supplicant |xerces |xml2 |zlib |zsh | +| gpgme |gstreamer |gupnp |haproxy |hdf5 |hostapd |hunspell | +| icecast |icu |irssi |kbd |kerberos |kexectools |libarchive | +| libbpg |libdb |libgcrypt |libical |libjpeg_turbo |liblas |libnss | +| libsndfile |libsoup |libssh2 |libtiff |libvirt |libvncserver |libxslt | +| lighttpd |logrotate |lua |mariadb |mdadm |memcached |mtr | +| mysql |nano |ncurses |nessus |netpbm |nginx |node | +| ntp |open_vm_tools |openafs |openjpeg |openldap |openssh |openssl | +| openswan |openvpn |p7zip |pcsc_lite |pigz |png |polarssl_fedora | +| poppler |postgresql |pspp |python |qt |radare2 |rsyslog | +| samba |sane_backends |sqlite |strongswan |subversion |sudo |syslogng | +| systemd |tcpdump |trousers |varnish |webkitgtk |wireshark |wpa_supplicant | +| xerces |xml2 |zlib |zsh | | | | All the checkers can be found in the checkers directory, as can the diff --git 
a/cve_bin_tool/async_utils.py b/cve_bin_tool/async_utils.py index 699d78ec3d..7fd6c74169 100644 --- a/cve_bin_tool/async_utils.py +++ b/cve_bin_tool/async_utils.py @@ -23,7 +23,6 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE." -# pylint: disable=too-many-arguments """ Utility classes for the CVE Binary Tool """ diff --git a/cve_bin_tool/checkers/expat.py b/cve_bin_tool/checkers/expat.py index d0f2752a51..905f9099ca 100644 --- a/cve_bin_tool/checkers/expat.py +++ b/cve_bin_tool/checkers/expat.py @@ -1,7 +1,6 @@ # Copyright (C) 2021 Intel Corporation # SPDX-License-Identifier: GPL-3.0-or-later -# pylint: disable=anomalous-backslash-in-string, invalid-name r""" CVE checker for libexpat diff --git a/cve_bin_tool/checkers/libgcrypt.py b/cve_bin_tool/checkers/libgcrypt.py index 67ece013be..67c7c1a268 100644 --- a/cve_bin_tool/checkers/libgcrypt.py +++ b/cve_bin_tool/checkers/libgcrypt.py @@ -1,7 +1,6 @@ # Copyright (C) 2021 Intel Corporation # SPDX-License-Identifier: GPL-3.0-or-later -# pylint: disable=invalid-name """ CVE checker for libgcrypt diff --git a/cve_bin_tool/checkers/libjpeg_turbo.py b/cve_bin_tool/checkers/libjpeg_turbo.py index 8dcf62e18f..ef102eb99d 100644 --- a/cve_bin_tool/checkers/libjpeg_turbo.py +++ b/cve_bin_tool/checkers/libjpeg_turbo.py @@ -1,7 +1,6 @@ # Copyright (C) 2021 Intel Corporation # SPDX-License-Identifier: GPL-3.0-or-later -# pylint: disable=invalid-name """ CVE checker for libjpg-turbo diff --git a/cve_bin_tool/checkers/liblas.py b/cve_bin_tool/checkers/liblas.py index 8e98e05efb..4081e64329 100644 --- a/cve_bin_tool/checkers/liblas.py +++ b/cve_bin_tool/checkers/liblas.py @@ -1,7 +1,6 @@ # Copyright (C) 2021 Intel Corporation # SPDX-License-Identifier: GPL-3.0-or-later -# pylint: disable=invalid-name """ CVE checker for liblas diff --git a/cve_bin_tool/checkers/systemd.py b/cve_bin_tool/checkers/systemd.py index 8fe98f7312..f562f66409 100644 --- a/cve_bin_tool/checkers/systemd.py +++ b/cve_bin_tool/checkers/systemd.py @@ -1,7 +1,6 @@ # Copyright (C) 2021 Intel Corporation # SPDX-License-Identifier: GPL-3.0-or-later -# pylint: disable=invalid-name """ CVE checker for systemd diff --git a/cve_bin_tool/checkers/xerces.py b/cve_bin_tool/checkers/xerces.py index a84af4de5b..6325c736f0 100644 --- a/cve_bin_tool/checkers/xerces.py +++ b/cve_bin_tool/checkers/xerces.py @@ -1,7 +1,6 @@ # Copyright (C) 2021 Intel Corporation # SPDX-License-Identifier: GPL-3.0-or-later -# pylint: disable=invalid-name """ CVE checker for libxerces diff --git a/cve_bin_tool/cli.py b/cve_bin_tool/cli.py index 24afa6e710..fe2281e83d 100755 --- a/cve_bin_tool/cli.py +++ b/cve_bin_tool/cli.py @@ -1,7 +1,6 @@ # Copyright (C) 2021 Intel Corporation # SPDX-License-Identifier: GPL-3.0-or-later -# pylint: disable=invalid-name """ This tool scans for a number of common, vulnerable open source components @@ -161,6 +160,12 @@ def main(argv=None): action="store", help="add a unique tag to differentiate between multiple intermediate reports", ) + output_group.add_argument( + "--affected-versions", + action="count", + default=0, + help="Lists versions of product affected by a given CVE (to facilitate upgrades)", + ) parser.add_argument("-V", "--version", action="version", version=VERSION) parser.add_argument( "-u", @@ -240,6 +245,7 @@ def main(argv=None): "merge": None, "nvd": "json", "filter": [], + "affected_versions": 0, } with ErrorHandler(mode=ErrorMode.NoTrace): @@ -333,11 +339,12 @@ def main(argv=None): if args["update"] != "never": 
cvedb_orig.get_cvelist_if_stale() else: - LOGGER.warning("Not verifying CVE DB cache") - cvedb_orig.get_db_update_date() - if not cvedb_orig.nvd_years(): - with ErrorHandler(mode=error_mode, logger=LOGGER): - raise EmptyCache(cvedb_orig.cachedir) + if args["nvd"] == "json": + LOGGER.warning("Not verifying CVE DB cache") + cvedb_orig.get_db_update_date() + if not cvedb_orig.nvd_years(): + with ErrorHandler(mode=error_mode, logger=LOGGER): + raise EmptyCache(cvedb_orig.cachedir) # CVE Database validation if not cvedb_orig.check_cve_entries(): @@ -451,6 +458,7 @@ def main(argv=None): # Creates a Object for OutputEngine output = OutputEngine( all_cve_data=cve_scanner.all_cve_data, + all_cve_version_info=cve_scanner.all_cve_version_info, scanned_dir=args["directory"], filename=args["output_file"], themes_dir=args["html_theme"], @@ -462,6 +470,7 @@ def main(argv=None): is_report=args["report"], append=args["append"], merge_report=merged_reports, + affected_versions=args["affected_versions"], ) if not args["quiet"]: diff --git a/cve_bin_tool/cve_scanner.py b/cve_bin_tool/cve_scanner.py index 23ea2603e2..34b6ddfd2b 100644 --- a/cve_bin_tool/cve_scanner.py +++ b/cve_bin_tool/cve_scanner.py @@ -17,7 +17,7 @@ from cve_bin_tool.input_engine import TriageData from cve_bin_tool.log import LOGGER from cve_bin_tool.theme import cve_theme -from cve_bin_tool.util import CVE, CVEData, ProductInfo +from cve_bin_tool.util import CVE, CVEData, ProductInfo, VersionInfo class CVEScanner: @@ -28,6 +28,7 @@ class CVEScanner: products_with_cve: int products_without_cve: int all_cve_data: DefaultDict[ProductInfo, CVEData] + all_cve_version_info: Dict[str, VersionInfo] RANGE_UNSET: str = "" dbname: str = os.path.join(DISK_LOCATION_DEFAULT, DBNAME) @@ -46,6 +47,7 @@ def __init__( self.products_with_cve = 0 self.products_without_cve = 0 self.all_cve_data = defaultdict(CVEData) + self.all_cve_version_info = dict() def get_cves(self, product_info: ProductInfo, triage_data: TriageData): """Get CVEs against a specific version of a product. 
@@ -90,10 +92,10 @@ def get_cves(self, product_info: ProductInfo, triage_data: TriageData): for cve_range in self.cursor: ( cve_number, - versionStartIncluding, - versionStartExcluding, - versionEndIncluding, - versionEndExcluding, + version_start_including, + version_start_excluding, + version_end_including, + version_end_excluding, ) = cve_range parsed_version = parse_version(product_info.version) @@ -103,10 +105,10 @@ def get_cves(self, product_info: ProductInfo, triage_data: TriageData): if product_info.product == "openssl": # if last character is a letter, convert it to .number # version = self.openssl_convert(product_info.version) - versionStartIncluding = self.openssl_convert(versionStartIncluding) - versionStartExcluding = self.openssl_convert(versionStartExcluding) - versionEndIncluding = self.openssl_convert(versionEndIncluding) - versionEndExcluding = self.openssl_convert(versionEndExcluding) + version_start_including = self.openssl_convert(version_start_including) + version_start_excluding = self.openssl_convert(version_start_excluding) + version_end_including = self.openssl_convert(version_end_including) + version_end_excluding = self.openssl_convert(version_end_excluding) parsed_version = parse_version( self.openssl_convert(product_info.version) ) @@ -114,19 +116,20 @@ def get_cves(self, product_info: ProductInfo, triage_data: TriageData): # check the start range passes_start = False if ( - versionStartIncluding is not self.RANGE_UNSET - and parsed_version >= parse_version(versionStartIncluding) + version_start_including is not self.RANGE_UNSET + and parsed_version >= parse_version(version_start_including) ): passes_start = True + if ( - versionStartExcluding is not self.RANGE_UNSET - and parsed_version > parse_version(versionStartExcluding) + version_start_excluding is not self.RANGE_UNSET + and parsed_version > parse_version(version_start_excluding) ): passes_start = True if ( - versionStartIncluding is self.RANGE_UNSET - and versionStartExcluding is self.RANGE_UNSET + version_start_including is self.RANGE_UNSET + and version_start_excluding is self.RANGE_UNSET ): # then there is no start range so just say true passes_start = True @@ -134,25 +137,32 @@ def get_cves(self, product_info: ProductInfo, triage_data: TriageData): # check the end range passes_end = False if ( - versionEndIncluding is not self.RANGE_UNSET - and parsed_version <= parse_version(versionEndIncluding) + version_end_including is not self.RANGE_UNSET + and parsed_version <= parse_version(version_end_including) ): passes_end = True if ( - versionEndExcluding is not self.RANGE_UNSET - and parsed_version < parse_version(versionEndExcluding) + version_end_excluding is not self.RANGE_UNSET + and parsed_version < parse_version(version_end_excluding) ): passes_end = True + if ( - versionEndIncluding is self.RANGE_UNSET - and versionEndExcluding is self.RANGE_UNSET + version_end_including is self.RANGE_UNSET + and version_end_excluding is self.RANGE_UNSET ): # then there is no end range so it passes passes_end = True # if it fits into both ends of the range, add the cve number if passes_start and passes_end: cve_list.append(cve_number) + self.all_cve_version_info[cve_number] = VersionInfo( + start_including=version_start_including, + start_excluding=version_start_excluding, + end_including=version_end_including, + end_excluding=version_end_excluding, + ) # Go through and get all the severities if cve_list: diff --git a/cve_bin_tool/cvedb.py b/cve_bin_tool/cvedb.py index e0386337d4..7e8f0cebc2 100644 --- 
a/cve_bin_tool/cvedb.py +++ b/cve_bin_tool/cvedb.py @@ -31,17 +31,19 @@ SHAMismatch, ) from cve_bin_tool.log import LOGGER -from cve_bin_tool.nvd_api import DISK_LOCATION_DEFAULT, NVD_API, NVD_FILENAME_TEMPLATE +from cve_bin_tool.nvd_api import NVD_API from cve_bin_tool.version import VERSION, check_latest_version logging.basicConfig(level=logging.DEBUG) # database defaults +DISK_LOCATION_DEFAULT = os.path.join(os.path.expanduser("~"), ".cache", "cve-bin-tool") DISK_LOCATION_BACKUP = os.path.join( os.path.expanduser("~"), ".cache", "cve-bin-tool-backup" ) DBNAME = "cve.db" OLD_CACHE_DIR = os.path.join(os.path.expanduser("~"), ".cache", "cvedb") +NVD_FILENAME_TEMPLATE = "nvdcve-1.1-{}.json.gz" class CVEDB: @@ -89,6 +91,7 @@ def __init__( self.cve_count = -1 self.nvd_type = nvd_type self.incremental_update = incremental_update + self.all_cve_entries = None if not os.path.exists(self.dbpath): self.rollback_cache_backup() @@ -104,8 +107,7 @@ async def nist_fetch_using_api(self): nvd_api = NVD_API( logger=self.LOGGER, error_mode=self.error_mode, - session=None, - outdir=self.cachedir, + incremental_update=self.incremental_update, ) if self.incremental_update: await nvd_api.get_nvd_params( @@ -118,6 +120,7 @@ async def nist_fetch_using_api(self): await nvd_api.get() await nvd_api.session.close() nvd_api.session = None + return nvd_api.all_cve_entries def get_db_update_date(self): # last time when CVE data was updated @@ -259,17 +262,17 @@ async def refresh(self): self.session = RateLimiter( aiohttp.ClientSession(connector=connector, trust_env=True) ) - self.LOGGER.info("Downloading CVE data...") tasks = [] if self.nvd_type == "api": self.LOGGER.info("[Using NVD API]") - _, curl_metadata = await asyncio.gather( + self.all_cve_entries, curl_metadata = await asyncio.gather( self.nist_fetch_using_api(), self.get_curl_versions(self.session), ) else: + self.LOGGER.info("Downloading CVE data...") nvd_metadata, curl_metadata = await asyncio.gather( self.nist_scrape(self.session), self.get_curl_versions(self.session), @@ -448,114 +451,131 @@ def populate_db(self): ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """ del_cve_range = "DELETE from cve_range where CVE_number=?" - - # error_mode.value will only be greater than 1 if quiet mode. - if self.error_mode.value > 1: - years = track(self.nvd_years(), description="Updating CVEs from NVD...") - else: - years = self.nvd_years() - - for year in years: - cve_data = self.load_nvd_year(year) - self.LOGGER.debug( - f'Time = {datetime.datetime.today().strftime("%H:%M:%S")}' + if self.nvd_type == "api": + self.LOGGER.debug("Populating database from NVD API") + self.populate_cve_list( + self.all_cve_entries, + cursor, + insert_severity, + insert_cve_range, + del_cve_range, ) - for cve_item in cve_data["CVE_Items"]: - # the information we want: - # CVE ID, Severity, Score -> - # affected {Vendor(s), Product(s), Version(s)} - cve = { - "ID": cve_item["cve"]["CVE_data_meta"]["ID"], - "description": cve_item["cve"]["description"]["description_data"][ - 0 - ]["value"], - "severity": "unknown", - "score": "unknown", - "CVSS_version": "unknown", - "CVSS_vector": "unknown", - } - if cve["description"].startswith("** REJECT **"): - # Skip this CVE if it's marked as 'REJECT' - continue - - # Get CVSSv3 or CVSSv2 score for output. - # Details are left as an exercise to the user. 
- if "baseMetricV3" in cve_item["impact"]: - cve["severity"] = cve_item["impact"]["baseMetricV3"]["cvssV3"][ - "baseSeverity" - ] - cve["score"] = cve_item["impact"]["baseMetricV3"]["cvssV3"][ - "baseScore" - ] - cve["CVSS_vector"] = cve_item["impact"]["baseMetricV3"]["cvssV3"][ - "vectorString" - ] - cve["CVSS_version"] = 3 - elif "baseMetricV2" in cve_item["impact"]: - cve["severity"] = cve_item["impact"]["baseMetricV2"]["severity"] - cve["score"] = cve_item["impact"]["baseMetricV2"]["cvssV2"][ - "baseScore" - ] - cve["CVSS_vector"] = cve_item["impact"]["baseMetricV2"]["cvssV2"][ - "vectorString" - ] - cve["CVSS_version"] = 2 - - # self.LOGGER.debug( - # "Severity: {} ({}) v{}".format( - # CVE["severity"], CVE["score"], CVE["CVSS_version"] - # ) - # ) + else: + # error_mode.value will only be greater than 1 if quiet mode. + if self.error_mode.value > 1: + years = track(self.nvd_years(), description="Updating CVEs from NVD...") + else: + years = self.nvd_years() - cursor.execute( - insert_severity, - [ - cve["ID"], - cve["severity"], - cve["description"], - cve["score"], - cve["CVSS_version"], - cve["CVSS_vector"], - ], + for year in years: + cve_data = self.load_nvd_year(year) + self.LOGGER.debug( + f'Time = {datetime.datetime.today().strftime("%H:%M:%S")}' ) - - # Delete any old range entries for this CVE_number - cursor.execute(del_cve_range, (cve["ID"],)) - - # walk the nodes with version data - # return list of versions - affects_list = [] - if "configurations" in cve_item: - for node in cve_item["configurations"]["nodes"]: - # self.LOGGER.debug("NODE: {}".format(node)) - affects_list.extend(self.parse_node(node)) - if "children" in node: - for child in node["children"]: - affects_list.extend(self.parse_node(child)) - # self.LOGGER.debug("Affects: {}".format(affects_list)) - cursor.executemany( + self.populate_cve_list( + cve_data["CVE_Items"], + cursor, + insert_severity, insert_cve_range, - [ - ( - cve["ID"], - affected["vendor"], - affected["product"], - affected["version"], - affected["versionStartIncluding"], - affected["versionStartExcluding"], - affected["versionEndIncluding"], - affected["versionEndExcluding"], - ) - for affected in affects_list - ], + del_cve_range, ) - self.connection.commit() + self.connection.commit() # supplemental data gets added here self.supplement_curl() self.db_close() + def populate_cve_list( + self, all_cve_list, cursor, insert_severity, insert_cve_range, del_cve_range + ): + """Populate the cve_list with the CVEs from the JSON""" + for cve_item in all_cve_list: + # the information we want: + # CVE ID, Severity, Score -> + # affected {Vendor(s), Product(s), Version(s)} + cve = { + "ID": cve_item["cve"]["CVE_data_meta"]["ID"], + "description": cve_item["cve"]["description"]["description_data"][0][ + "value" + ], + "severity": "unknown", + "score": "unknown", + "CVSS_version": "unknown", + "CVSS_vector": "unknown", + } + if cve["description"].startswith("** REJECT **"): + # Skip this CVE if it's marked as 'REJECT' + continue + + # Get CVSSv3 or CVSSv2 score for output. + # Details are left as an exercise to the user. 
+ if "baseMetricV3" in cve_item["impact"]: + cve["severity"] = cve_item["impact"]["baseMetricV3"]["cvssV3"][ + "baseSeverity" + ] + cve["score"] = cve_item["impact"]["baseMetricV3"]["cvssV3"]["baseScore"] + cve["CVSS_vector"] = cve_item["impact"]["baseMetricV3"]["cvssV3"][ + "vectorString" + ] + cve["CVSS_version"] = 3 + elif "baseMetricV2" in cve_item["impact"]: + cve["severity"] = cve_item["impact"]["baseMetricV2"]["severity"] + cve["score"] = cve_item["impact"]["baseMetricV2"]["cvssV2"]["baseScore"] + cve["CVSS_vector"] = cve_item["impact"]["baseMetricV2"]["cvssV2"][ + "vectorString" + ] + cve["CVSS_version"] = 2 + + # self.LOGGER.debug( + # "Severity: {} ({}) v{}".format( + # CVE["severity"], CVE["score"], CVE["CVSS_version"] + # ) + # ) + + cursor.execute( + insert_severity, + [ + cve["ID"], + cve["severity"], + cve["description"], + cve["score"], + cve["CVSS_version"], + cve["CVSS_vector"], + ], + ) + + # Delete any old range entries for this CVE_number + cursor.execute(del_cve_range, (cve["ID"],)) + + # walk the nodes with version data + # return list of versions + affects_list = [] + if "configurations" in cve_item: + for node in cve_item["configurations"]["nodes"]: + # self.LOGGER.debug("NODE: {}".format(node)) + affects_list.extend(self.parse_node(node)) + if "children" in node: + for child in node["children"]: + affects_list.extend(self.parse_node(child)) + # self.LOGGER.debug("Affects: {}".format(affects_list)) + cursor.executemany( + insert_cve_range, + [ + ( + cve["ID"], + affected["vendor"], + affected["product"], + affected["version"], + affected["versionStartIncluding"], + affected["versionStartExcluding"], + affected["versionEndIncluding"], + affected["versionEndExcluding"], + ) + for affected in affects_list + ], + ) + def parse_node(self, node): affects_list = [] if "cpe_match" in node: diff --git a/cve_bin_tool/extractor.py b/cve_bin_tool/extractor.py index baf870fe68..dd7670ea3d 100644 --- a/cve_bin_tool/extractor.py +++ b/cve_bin_tool/extractor.py @@ -1,8 +1,6 @@ # Copyright (C) 2021 Intel Corporation # SPDX-License-Identifier: GPL-3.0-or-later -# pylint: disable=keyword-arg-before-vararg -# disabled for python2 compatibility reasons """ Extraction of archives """ diff --git a/cve_bin_tool/nvd_api.py b/cve_bin_tool/nvd_api.py index 8dc451cb32..5466bb9820 100644 --- a/cve_bin_tool/nvd_api.py +++ b/cve_bin_tool/nvd_api.py @@ -6,13 +6,12 @@ """ import asyncio -import gzip import json import math -import os -from datetime import datetime +import time +from datetime import datetime, timedelta, timezone from logging import Logger -from typing import Union +from typing import Dict, List, Union import aiohttp from rich.progress import Progress, track @@ -21,25 +20,13 @@ from cve_bin_tool.error_handler import ErrorMode, NVDServiceError from cve_bin_tool.log import LOGGER -NVD_FILENAME_TEMPLATE = "nvdcve-1.1-{}.json.gz" -DISK_LOCATION_DEFAULT = os.path.join(os.path.expanduser("~"), ".cache", "cve-bin-tool") - - FEED = "https://services.nvd.nist.gov/rest/json/cves/1.0" +NVD_CVE_STATUS = "https://nvd.nist.gov/rest/public/dashboard/statistics" PAGESIZE = 2000 MAX_FAIL = 5 # Interval in seconds between successive requests -INTERVAL_PERIOD = 2 - - -def filter_by_id(cve, update_data): - """Function to filter out duplicate CVE entries in case of incremental update""" - cve_id = cve["cve"]["CVE_data_meta"]["ID"] - - return all( - cve_id != update_cve["cve"]["CVE_data_meta"]["ID"] for update_cve in update_data - ) +INTERVAL_PERIOD = 3 class NVD_API: @@ -51,40 +38,45 @@ def __init__( 
page_size: int = PAGESIZE, max_fail: int = MAX_FAIL, interval: int = INTERVAL_PERIOD, - outdir=DISK_LOCATION_DEFAULT, error_mode: ErrorMode = ErrorMode.TruncTrace, + incremental_update=False, ): self.logger = logger or LOGGER.getChild(self.__class__.__name__) self.feed = feed self.session = session - self.params = dict() + self.params: Dict = dict() self.page_size = page_size self.max_fail = max_fail self.interval = interval - self.outdir = outdir self.error_mode = error_mode - self.NVDCVE_FILENAME_TEMPLATE = NVD_FILENAME_TEMPLATE + self.incremental_update = incremental_update self.total_results = -1 self.failed_count = 0 - self.year_wise_data = [] + self.all_cve_entries: List = [] @staticmethod def convert_date_to_nvd_date(date: datetime) -> str: """Returns a datetime string of NVD recognized date format""" - UTC_OFFSET = datetime.now() - datetime.utcnow() - utc_date = (date + UTC_OFFSET).strftime("%Y-%m-%dT%H:%M:%S:%f")[:-3] + utc_date = date.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S:%f")[:-3] return f"{utc_date} UTC-00:00" - async def filter_year_wise_data(self, year_wise_data) -> dict(): - """Returns a list of year-wise cve dictionary""" - output = dict() - for cve in year_wise_data: - year = int(cve["publishedDate"][:4]) - if year not in output: - output[year] = [cve] - else: - output[year].append(cve) - return output + @staticmethod + async def nvd_count_metadata(session): + """Returns CVE Status count from NVD""" + cve_count = { + "Total": 0, + "Rejected": 0, + "Received": 0, + } + async with await session.get( + NVD_CVE_STATUS, + params={"reporttype": "countsbystatus"}, + raise_for_status=True, + ) as response: + data = await response.json() + for key in data["vulnsByStatusCounts"]: + cve_count[key["name"]] = int(key["count"]) + return cve_count async def get_nvd_params( self, @@ -93,35 +85,39 @@ async def get_nvd_params( """ Initialize NVD request parameters """ - if time_of_last_update: - # Fetch all cves from this date (even the updated ones) - self.params["modStartDate"] = self.convert_date_to_nvd_date( - time_of_last_update - ) - self.logger.info(self.params["modStartDate"]) - self.logger.info(time_of_last_update) - self.params["includeMatchStringChange"] = json.dumps(True) - - # Check modified strings inside CVEs as well - self.params["startIndex"] = 0 self.params["resultsPerPage"] = self.page_size if not self.session: - connector = aiohttp.TCPConnector() + connector = aiohttp.TCPConnector(limit_per_host=19) self.session = RateLimiter( aiohttp.ClientSession(connector=connector, trust_env=True) ) - with Progress() as progress: - task = progress.add_task( - "Fetching metadata from NVD...", total=1, start=False + self.logger.debug("Fetching metadata from NVD...") + cve_count = await self.nvd_count_metadata(self.session) + + if time_of_last_update: + # Fetch all the updated CVE entries from the modified date. 
Subtracting 2-minute offset for updating cve entries + self.params["modStartDate"] = self.convert_date_to_nvd_date( + time_of_last_update - timedelta(minutes=2) ) + self.logger.info( + f'Fetching updated CVE entries after {self.params["modStartDate"]}' + ) + self.params["includeMatchStringChange"] = json.dumps(True) + # Check modified strings inside CVEs as well + with Progress() as progress: + task = progress.add_task( + "Fetching incremental metadata from NVD...", total=1, start=False + ) while await self.load_nvd_request(start_index=0): progress.update(task) progress.update(task, advance=1) - self.logger.info(f"Total {self.total_results} entries found") + else: + self.total_results = cve_count["Total"] - cve_count["Rejected"] + self.logger.info(f"Adding {self.total_results} CVE entries") async def load_nvd_request(self, start_index): """Get single NVD request and update year_wise_data list which contains list of all CVEs""" @@ -140,39 +136,38 @@ async def load_nvd_request(self, start_index): if response.status == 200: fetched_data = await response.json() if start_index == 0: + # Update total results in case there is discrepancy between NVD dashboard and API self.total_results = fetched_data["totalResults"] + self.all_cve_entries.extend(fetched_data["result"]["CVE_Items"]) - self.year_wise_data.extend(fetched_data["result"]["CVE_Items"]) - - # await asyncio.sleep(0) elif response.status == 503: raise NVDServiceError(self.params["modStartDate"]) else: self.failed_count += 1 if self.failed_count == self.max_fail: + self.failed_count = 0 self.logger.info( f"Pausing requests for {self.interval} seconds" ) - self.failed_count = 0 - await asyncio.sleep(self.interval) + time.sleep(self.interval) else: - await asyncio.sleep(1) + time.sleep(1) - except aiohttp.ClientResponseError as error: + except Exception as error: self.logger.debug(f"Failed to connect to NVD {error}") self.logger.debug(f"Pausing requests for {self.interval} seconds") - await asyncio.sleep(self.interval) + self.failed_count += 1 + time.sleep(self.interval) async def get(self): """Calls load_nvd_request() multiple times to fetch all NVD feeds""" - + start_index = 1 if self.incremental_update else 0 nvd_requests = [ self.load_nvd_request(index * self.page_size) for index in range( - 1, 1 + int(math.ceil(self.total_results / self.page_size)) + start_index, 1 + int(math.ceil(self.total_results / self.page_size)) ) ] - total_tasks = len(nvd_requests) # error_mode.value will only be greater than 1 if quiet mode. if self.error_mode.value > 1: @@ -186,82 +181,3 @@ async def get(self): for task in iter_tasks: await task - self.logger.info( - "Updating cache using fetched NVD data. This will take some minutes..." - ) - await self.update_nvd_data() - - async def replace_updated_nvd_data(self, all_cve_data, update_data): - """ - Replace CVEs already available in case they are updated. Update `CVE_data_numberOfCVEs` - """ - - all_cve_data["CVE_Items"] = list( - filter( - lambda cve: filter_by_id(cve, update_data), all_cve_data["CVE_Items"] - ) - ) - all_cve_data["CVE_Items"].extend(update_data) - all_cve_data["CVE_data_numberOfCVEs"] = str(len(all_cve_data["CVE_Items"])) - - return all_cve_data - - async def save_nvd_year(self, year, new_cve_data): - """ - Saves the dict of CVE data for the given year. 
- """ - filename = os.path.join(self.outdir, self.NVDCVE_FILENAME_TEMPLATE.format(year)) - all_cve_year_data = None - if not os.path.isfile(filename): - self.logger.debug( - f"Creating new file {self.NVDCVE_FILENAME_TEMPLATE.format(year)}" - ) - all_cve_year_data = { - "CVE_data_type": "CVE", - "CVE_data_format": "MITRE", - "CVE_data_version": "4.0", - "CVE_data_numberOfCVEs": "0", - "CVE_data_timestamp": datetime.now().strftime("%Y-%m-%dT%H:%MZ"), - "CVE_Items": [], - } - else: - with gzip.open(filename, "rt", encoding="utf-8") as file: - all_cve_year_data = json.load(file) - - # Open the file and load the JSON data, log the number of CVEs loaded - with gzip.open(filename, "wt", encoding="utf-8") as file: - - # Update common cves in cves_for_year to updated cve_date list - cve_data = await self.replace_updated_nvd_data( - all_cve_year_data, new_cve_data - ) - - json.dump(cve_data, file, indent=" ") - self.logger.debug( - f"Year {year} has been updated. It contains {len(cve_data['CVE_Items'])} CVEs in dataset now" - ) - - async def update_nvd_data(self): - """ - Iterate save_nvd_year for saving the dict of CVE data for each year. - """ - self.year_wise_data = await self.filter_year_wise_data(self.year_wise_data) - years = list(self.year_wise_data.keys()) - - nvd_update = [ - self.save_nvd_year(year, self.year_wise_data[year]) for year in years - ] - - total_tasks = len(nvd_update) - # error_mode.value will only be greater than 1 if quiet mode. - if self.error_mode.value > 1: - iter_tasks = track( - nvd_update, - description="Updating feeds in cache ...", - total=total_tasks, - ) - else: - iter_tasks = asyncio.as_completed(nvd_update) - - for task in iter_tasks: - await task diff --git a/cve_bin_tool/output_engine/__init__.py b/cve_bin_tool/output_engine/__init__.py index 938bfd8572..4c560ae10d 100644 --- a/cve_bin_tool/output_engine/__init__.py +++ b/cve_bin_tool/output_engine/__init__.py @@ -271,9 +271,11 @@ def __init__( is_report: bool = False, append: Union[str, bool] = False, merge_report: Union[None, List[str]] = None, + affected_versions: int = 0, + all_cve_version_info=None, ): self.logger = logger or LOGGER.getChild(self.__class__.__name__) - self.all_cve_data = all_cve_data + self.all_cve_version_info = all_cve_version_info self.scanned_dir = scanned_dir self.filename = os.path.abspath(filename) if filename else "" self.products_with_cve = products_with_cve @@ -285,6 +287,8 @@ def __init__( self.append = append self.tag = tag self.merge_report = merge_report + self.affected_versions = affected_versions + self.all_cve_data = all_cve_data def output_cves(self, outfile, output_type="console"): """Output a list of CVEs @@ -317,7 +321,12 @@ def output_cves(self, outfile, output_type="console"): outfile, ) else: # console, or anything else that is unrecognised - output_console(self.all_cve_data, self.time_of_last_update) + output_console( + self.all_cve_data, + self.all_cve_version_info, + self.time_of_last_update, + self.affected_versions, + ) if isinstance(self.append, str): save_intermediate( diff --git a/cve_bin_tool/output_engine/console.py b/cve_bin_tool/output_engine/console.py index ffb24c292b..aa37777715 100644 --- a/cve_bin_tool/output_engine/console.py +++ b/cve_bin_tool/output_engine/console.py @@ -16,12 +16,54 @@ from ..input_engine import Remarks from ..linkify import linkify_cve from ..theme import cve_theme -from ..util import ProductInfo +from ..util import ProductInfo, VersionInfo + + +def format_version_range(version_info: VersionInfo) -> str: + """ + Format version 
info to desirable output + + Example: + ``` + format_version_range('', '', '', '') => "-" + format_version_range('2.2.8', '', '2.2.11', '') => "[2.2.8 - 2.2.11]" + format_version_range('2.2.8', '', '', '2.2.11') => "[2.2.8 - 2.2.11)" + format_version_range('', '2.2.8', '2.2.11', '') => "(2.2.8 - 2.2.11]" + format_version_range('', '2.2.8', '', '2.2.11') => "(2.2.8 - 2.2.11])" + format_version_range('2.2.8', '', '', '') => ">= 2.2.8" + format_version_range('', '2.2.8', '', '') => "> 2.2.8" + format_version_range('', '', '2.2.11', '') => "<= 2.2.11" + format_version_range('', '', '', '2.2.11') => "< 2.2.11" + ``` + + Reference for Interval terminologies: https://en.wikipedia.org/wiki/Interval_(mathematics) + """ + + (start_including, start_excluding, end_including, end_excluding) = version_info + if start_including and end_including: + return f"[{start_including} - {end_including}]" + if start_including and end_excluding: + return f"[{start_including} - {end_excluding})" + if start_excluding and end_including: + return f"({start_excluding} - {end_including}]" + if start_excluding and end_excluding: + return f"({start_excluding} - {end_excluding})" + if start_including: + return f">= {start_including}" + if start_excluding: + return f"> {start_excluding}" + if end_including: + return f"<= {end_including}" + if end_excluding: + return f"< {end_excluding}" + return "-" def output_console( all_cve_data: Dict[ProductInfo, CVEData], + all_cve_version_info: Dict[str, VersionInfo], time_of_last_update, + affected_versions: int, console=Console(theme=cve_theme), ): """Output list of CVEs in a tabular format with color support""" @@ -65,6 +107,14 @@ def output_console( "cvss_version": cve.cvss_version, } ) + if affected_versions != 0: + try: + version_info = all_cve_version_info[cve.cve_number] + except KeyError: # TODO: handle 'UNKNOWN' and some cves more cleanly + version_info = VersionInfo("", "", "", "") + cve_by_remarks[cve.remarks][-1].update( + {"affected_versions": format_version_range(version_info)} + ) for remarks in sorted(cve_by_remarks): color = remarks_colors[remarks] @@ -79,11 +129,13 @@ def output_console( table.add_column("CVE Number") table.add_column("Severity") table.add_column("Score (CVSS Version)") + if affected_versions != 0: + table.add_column("Affected Versions") # table.add_column("CVSS Version") for cve_data in cve_by_remarks[remarks]: color = cve_data["severity"].lower() - table.add_row( + cells = [ Text.styled(cve_data["vendor"], color), Text.styled(cve_data["product"], color), Text.styled(cve_data["version"], color), @@ -96,7 +148,10 @@ def output_console( + ")", color, ), - ) + ] + if affected_versions != 0: + cells.append(Text.styled(cve_data["affected_versions"], color)) + table.add_row(*cells) # Print the table to the console console.print(table) for cve_data in cve_by_remarks[remarks]: diff --git a/cve_bin_tool/util.py b/cve_bin_tool/util.py index b180dc4eea..519e3b7978 100644 --- a/cve_bin_tool/util.py +++ b/cve_bin_tool/util.py @@ -1,7 +1,6 @@ # Copyright (C) 2021 Intel Corporation # SPDX-License-Identifier: GPL-3.0-or-later -# pylint: disable=too-many-arguments """ Utility classes for the CVE Binary Tool """ import fnmatch import os @@ -65,6 +64,13 @@ class ProductInfo(NamedTuple): version: str +class VersionInfo(NamedTuple): + start_including: str + start_excluding: str + end_including: str + end_excluding: str + + class CVEData(defaultdict): def __missing__(self, key): if key == "cves": diff --git a/doc/MANUAL.md b/doc/MANUAL.md index 5c7cdfaf80..fe2f79f2ae 
100644 --- a/doc/MANUAL.md +++ b/doc/MANUAL.md @@ -104,22 +104,23 @@ which is useful if you're trying the latest code from | | | | Available checkers | | | | -|--------------- |--------- |-------------- |--------------- |---------- |---------- |------------- | +|--------------- |------------- |--------- |---------- |------------- |------------ |--------------- | | accountsservice |avahi |bash |bind |binutils |bolt |bubblewrap | | busybox |bzip2 |cronie |cryptsetup |cups |curl |dbus | | dnsmasq |dovecot |dpkg |enscript |expat |ffmpeg |freeradius | | ftp |gcc |gimp |glibc |gnomeshell |gnupg |gnutls | -| gpgme |gstreamer |gupnp |haproxy |hostapd |hunspell |icecast | -| icu |irssi |kbd |kerberos |kexectools |libarchive |libbpg | -| libdb |libgcrypt |libical |libjpeg_turbo |liblas |libnss |libsndfile | -| libsoup |libssh2 |libtiff |libvirt |libxslt |lighttpd |logrotate | -| lua |mariadb |mdadm |memcached |mtr |mysql |nano | -| ncurses |nessus |netpbm |nginx |node |ntp |open_vm_tools | -| openafs |openjpeg |openldap |openssh |openssl |openswan |openvpn | -| p7zip |pcsc_lite |png |polarssl_fedora |poppler |postgresql |pspp | -| python |qt |radare2 |rsyslog |samba |sqlite |strongswan | -| subversion |sudo |syslogng |systemd |tcpdump |trousers |varnish | -| webkitgtk |wireshark |wpa_supplicant |xerces |xml2 |zlib |zsh | +| gpgme |gstreamer |gupnp |haproxy |hdf5 |hostapd |hunspell | +| icecast |icu |irssi |kbd |kerberos |kexectools |libarchive | +| libbpg |libdb |libgcrypt |libical |libjpeg_turbo |liblas |libnss | +| libsndfile |libsoup |libssh2 |libtiff |libvirt |libvncserver |libxslt | +| lighttpd |logrotate |lua |mariadb |mdadm |memcached |mtr | +| mysql |nano |ncurses |nessus |netpbm |nginx |node | +| ntp |open_vm_tools |openafs |openjpeg |openldap |openssh |openssl | +| openswan |openvpn |p7zip |pcsc_lite |pigz |png |polarssl_fedora | +| poppler |postgresql |pspp |python |qt |radare2 |rsyslog | +| samba |sane_backends |sqlite |strongswan |subversion |sudo |syslogng | +| systemd |tcpdump |trousers |varnish |webkitgtk |wireshark |wpa_supplicant | +| xerces |xml2 |zlib |zsh | | | | For a quick overview of usage and how it works, you can also see [the readme file](README.md). diff --git a/test/test_extractor.py b/test/test_extractor.py index 05e7547327..c9f7a50345 100644 --- a/test/test_extractor.py +++ b/test/test_extractor.py @@ -26,30 +26,15 @@ os.path.join(os.path.abspath(os.path.dirname(__file__)), "assets"), "cab-test-python3.8.cab", ) -BAD_EXE_FILE = os.path.join( - os.path.join(os.path.abspath(os.path.dirname(__file__)), "assets"), - "empty-file.exe", -) -BAD_ZIP_FILE = os.path.join( - os.path.join(os.path.abspath(os.path.dirname(__file__)), "assets"), - "empty-file.zip", -) -BAD_TAR_FILE = os.path.join( - os.path.join(os.path.abspath(os.path.dirname(__file__)), "assets"), - "empty-file.tar", -) -BAD_RPM_FILE = os.path.join( - os.path.join(os.path.abspath(os.path.dirname(__file__)), "assets"), - "empty-file.rpm", -) -BAD_DEB_FILE = os.path.join( - os.path.join(os.path.abspath(os.path.dirname(__file__)), "assets"), - "empty-file.deb", -) -BAD_CAB_FILE = os.path.join( - os.path.join(os.path.abspath(os.path.dirname(__file__)), "assets"), - "empty-file.cab", -) + +BAD_FILE = dict() +FILE_EXT = ["tar", "zip", "cab", "exe", "rpm", "deb"] +for ext in FILE_EXT: + BAD_FILE[ext] = os.path.join( + os.path.abspath(os.path.dirname(__file__)), + "assets", + "empty-file." 
+ ext, + ) class TestExtractorBase: @@ -135,7 +120,7 @@ async def test_extract_cleanup(self): async def test_bad_tar(self): """Test handling of invalid tar files. No errors should be raised.""" - self.extract_files(BAD_TAR_FILE) + self.extract_files(BAD_FILE["tar"]) class TestExtractFileRpm(TestExtractorBase): @@ -173,7 +158,7 @@ async def test_extract_file_rpm_no_rpm2cipo(self): async def test_bad_rpm(self): """Test handling of invalid rpm files. No errors should be raised.""" - self.extract_files(BAD_RPM_FILE) + self.extract_files(BAD_FILE["rpm"]) class TestExtractFileDeb(TestExtractorBase): @@ -202,7 +187,7 @@ async def test_extract_file_deb(self): async def test_bad_deb(self): """Test handling of invalid deb files. No errors should be raised.""" - self.extract_files(BAD_DEB_FILE) + self.extract_files(BAD_FILE["deb"]) class TestExtractFileCab(TestExtractorBase): @@ -232,7 +217,7 @@ async def test_extract_file_cab(self): async def test_bad_cab(self): """Test handling of invalid cab files. No errors should be raised.""" - self.extract_files(BAD_CAB_FILE) + self.extract_files(BAD_FILE["cab"]) class TestExtractFileZip(TestExtractorBase): @@ -271,5 +256,5 @@ async def test_bad_zip(self): Log messages differ for .exe and .zip and are tested in test_cli.py """ - self.extract_files(BAD_EXE_FILE) - self.extract_files(BAD_ZIP_FILE) + self.extract_files(BAD_FILE["exe"]) + self.extract_files(BAD_FILE["zip"]) diff --git a/test/test_nvd_api.py b/test/test_nvd_api.py index a05a1cfe6d..9a1c238299 100644 --- a/test/test_nvd_api.py +++ b/test/test_nvd_api.py @@ -5,7 +5,6 @@ import tempfile from datetime import datetime, timedelta -import aiohttp import pytest from cve_bin_tool.cvedb import CVEDB @@ -24,30 +23,31 @@ def teardown_class(cls): @pytest.mark.asyncio async def test_get_nvd_params(self): """Test NVD for a future date. It should be empty""" - nvd_api = NVD_API(outdir=self.outdir) + nvd_api = NVD_API() await nvd_api.get_nvd_params( - time_of_last_update=(datetime.now() + timedelta(days=1)) + time_of_last_update=(datetime.now() + timedelta(days=2)) ) - assert nvd_api.total_results == 0 and nvd_api.year_wise_data == [] + assert nvd_api.total_results == 0 and nvd_api.all_cve_entries == [] @pytest.mark.asyncio async def test_total_results_count(self): """Total results should be greater than or equal to the current fetched cves""" - nvd_api = NVD_API(outdir=self.outdir) + nvd_api = NVD_API() await nvd_api.get_nvd_params( time_of_last_update=datetime.now() - timedelta(days=2) ) - assert len(nvd_api.year_wise_data) >= nvd_api.total_results + assert len(nvd_api.all_cve_entries) >= nvd_api.total_results @pytest.mark.asyncio async def test_nvd_incremental_update(self): - """Test to check whether we are able to fetch and save the nvd cache""" - nvd_api = NVD_API(outdir=self.outdir) + """Test to check whether we are able to fetch and save the nvd entries using time_of_last_update""" + nvd_api = NVD_API(incremental_update=True) await nvd_api.get_nvd_params( time_of_last_update=datetime.now() - timedelta(days=4) ) await nvd_api.get() - cvedb = CVEDB(cachedir=nvd_api.outdir) + cvedb = CVEDB(cachedir=self.outdir, nvd_type="api") + cvedb.all_cve_entries = nvd_api.all_cve_entries cvedb.init_database() cvedb.populate_db() cvedb.check_cve_entries() @@ -56,37 +56,23 @@ async def test_nvd_incremental_update(self): @pytest.mark.asyncio async def test_empty_nvd_result(self): """Test to check nvd results non-empty result. 
Total result should be greater than 0""" - nvd_api = NVD_API(outdir=self.outdir) + nvd_api = NVD_API() await nvd_api.get_nvd_params() assert nvd_api.total_results > 0 @pytest.mark.asyncio - @pytest.mark.skip(reason="Test is broken") + @pytest.mark.skip(reason="NVD does not return the Received count") async def test_api_cve_count(self): """Test to match the totalResults and the total CVE count on NVD""" - connector = aiohttp.TCPConnector() - async with aiohttp.ClientSession( - connector=connector, trust_env=True - ) as session: - - status_count = dict() - async with await session.get( - "https://nvd.nist.gov/rest/public/dashboard/statistics?reporttype=countsbystatus" - ) as response: - # Fetch the rejected and received CVE count from NVD - response.raise_for_status() - data = await response.json() - for dictionary in data["vulnsByStatusCounts"]: - status_count[dictionary["name"]] = int(dictionary["count"]) - - nvd_api = NVD_API(outdir=self.outdir) + nvd_api = NVD_API() await nvd_api.get_nvd_params() + await nvd_api.load_nvd_request(0) + cve_count = await nvd_api.nvd_count_metadata(nvd_api.session) + # Difference between the total and rejected CVE count on NVD should be equal to the total CVE count + # Received CVE count might be zero assert ( - abs( - nvd_api.total_results - - (status_count["Total"] - status_count["Rejected"]) - ) - <= status_count["Received"] + abs(nvd_api.total_results - (cve_count["Total"] - cve_count["Rejected"])) + <= cve_count["Received"] ) diff --git a/test/test_output_engine.py b/test/test_output_engine.py index 054c89dd4d..f8d4e2a1c3 100644 --- a/test/test_output_engine.py +++ b/test/test_output_engine.py @@ -18,7 +18,7 @@ from cve_bin_tool.output_engine.console import output_console from cve_bin_tool.output_engine.html import output_html from cve_bin_tool.output_engine.util import format_output -from cve_bin_tool.util import CVE, CVEData, ProductInfo +from cve_bin_tool.util import CVE, CVEData, ProductInfo, VersionInfo class TestOutputEngine(unittest.TestCase): @@ -70,6 +70,96 @@ class TestOutputEngine(unittest.TestCase): ), } + MOCK_ALL_CVE_DATA = { + ProductInfo("vendor0", "product0", "1.0"): CVEData( + cves=[ + CVE( + "UNKNOWN", + "UNKNOWN", + score=0, + cvss_version=0, + cvss_vector="C:H", + ), + CVE( + "CVE-9999-0001", + "MEDIUM", + score=4.2, + cvss_version=2, + cvss_vector="C:H", + ), + CVE( + "CVE-9999-0002", + "MEDIUM", + score=4.2, + cvss_version=2, + cvss_vector="C:H", + ), + CVE( + "CVE-9999-0003", + "MEDIUM", + score=4.2, + cvss_version=2, + cvss_vector="C:H", + ), + CVE( + "CVE-9999-0004", + "MEDIUM", + score=4.2, + cvss_version=2, + cvss_vector="C:H", + ), + CVE( + "CVE-9999-0005", + "MEDIUM", + score=4.2, + cvss_version=2, + cvss_vector="C:H", + ), + CVE( + "CVE-9999-0006", + "MEDIUM", + score=4.2, + cvss_version=2, + cvss_vector="C:H", + ), + CVE( + "CVE-9999-0007", + "MEDIUM", + score=4.2, + cvss_version=2, + cvss_vector="C:H", + ), + CVE( + "CVE-9999-0008", + "MEDIUM", + score=4.2, + cvss_version=2, + cvss_vector="C:H", + ), + CVE( + "CVE-9999-9999", + "LOW", + score=1.2, + cvss_version=2, + cvss_vector="CVSS2.0/C:H", + ), + ], + paths={""}, + ), + } + + MOCK_ALL_CVE_VERSION_INFO = { + "UNKNOWN": VersionInfo("", "", "", ""), + "CVE-9999-0001": VersionInfo("0.9.0", "", "1.2.0", ""), + "CVE-9999-0002": VersionInfo("0.9.0", "", "", "1.2.0"), + "CVE-9999-0003": VersionInfo("", "0.9.0", "1.2.0", ""), + "CVE-9999-0004": VersionInfo("", "0.9.0", "", "1.2.0"), + "CVE-9999-0005": VersionInfo("0.9.0", "", "", ""), + "CVE-9999-0006": VersionInfo("", 
"0.9.0", "", ""), + "CVE-9999-0007": VersionInfo("", "", "1.2.0", ""), + "CVE-9999-0008": VersionInfo("", "", "", "1.2.0"), + } + FORMATTED_OUTPUT = [ { "vendor": "vendor0", @@ -125,6 +215,149 @@ class TestOutputEngine(unittest.TestCase): }, ] + FORMATTED_OUTPUT_AFFECTED_VERSIONS = [ + { + "vendor": "vendor0", + "product": "product0", + "version": "1.0", + "cve_number": "UNKNOWN", + "severity": "UNKNOWN", + "score": "0", + "cvss_version": "0", + "affected_versions": "-", + "cvss_vector": "C:H", + "paths": "", + "remarks": "NewFound", + "comments": "", + }, + { + "vendor": "vendor0", + "product": "product0", + "version": "1.0", + "cve_number": "CVE-9999-0001", + "severity": "MEDIUM", + "score": "4.2", + "cvss_version": "2", + "affected_versions": "[0.9.0 - 1.2.0]", + "cvss_vector": "C:H", + "paths": "", + "remarks": "NewFound", + "comments": "", + }, + { + "vendor": "vendor0", + "product": "product0", + "version": "1.0", + "cve_number": "CVE-9999-0002", + "severity": "MEDIUM", + "score": "4.2", + "cvss_version": "2", + "affected_versions": "[0.9.0 - 1.2.0)", + "cvss_vector": "C:H", + "paths": "", + "remarks": "NewFound", + "comments": "", + }, + { + "vendor": "vendor0", + "product": "product0", + "version": "1.0", + "cve_number": "CVE-9999-0003", + "severity": "MEDIUM", + "score": "4.2", + "cvss_version": "2", + "affected_versions": "(0.9.0 - 1.2.0]", + "cvss_vector": "C:H", + "paths": "", + "remarks": "NewFound", + "comments": "", + }, + { + "vendor": "vendor0", + "product": "product0", + "version": "1.0", + "cve_number": "CVE-9999-0004", + "severity": "MEDIUM", + "score": "4.2", + "cvss_version": "2", + "affected_versions": "(0.9.0 - 1.2.0)", + "cvss_vector": "C:H", + "paths": "", + "remarks": "NewFound", + "comments": "", + }, + { + "vendor": "vendor0", + "product": "product0", + "version": "1.0", + "cve_number": "CVE-9999-0005", + "severity": "MEDIUM", + "score": "4.2", + "cvss_version": "2", + "affected_versions": ">= 0.9.0", + "cvss_vector": "C:H", + "paths": "", + "remarks": "NewFound", + "comments": "", + }, + { + "vendor": "vendor0", + "product": "product0", + "version": "1.0", + "cve_number": "CVE-9999-0006", + "severity": "MEDIUM", + "score": "4.2", + "cvss_version": "2", + "affected_versions": "> 0.9.0", + "cvss_vector": "C:H", + "paths": "", + "remarks": "NewFound", + "comments": "", + }, + { + "vendor": "vendor0", + "product": "product0", + "version": "1.0", + "cve_number": "CVE-9999-0007", + "severity": "MEDIUM", + "score": "4.2", + "cvss_version": "2", + "affected_versions": "<= 1.2.0", + "cvss_vector": "C:H", + "paths": "", + "remarks": "NewFound", + "comments": "", + }, + { + "vendor": "vendor0", + "product": "product0", + "version": "1.0", + "cve_number": "CVE-9999-0008", + "severity": "MEDIUM", + "score": "4.2", + "cvss_version": "2", + "affected_versions": "< 1.2.0", + "cvss_vector": "C:H", + "paths": "", + "remarks": "NewFound", + "comments": "", + }, + { + "vendor": "vendor0", + "product": "product0", + "version": "1.0", + "cve_number": "CVE-9999-9999", + "severity": "LOW", + "score": "1.2", + "cvss_version": "2", + "affected_versions": "-", + "cvss_vector": "CVSS2.0/C:H", + "paths": "", + "remarks": "NewFound", + "comments": "", + }, + ] + def setUp(self) -> None: self.output_engine = OutputEngine( all_cve_data=self.MOCK_OUTPUT, @@ -163,8 +396,10 @@ def test_output_console(self): console = Console(file=self.mock_file) output_console( self.MOCK_OUTPUT, - console=console, + self.MOCK_ALL_CVE_VERSION_INFO, time_of_last_update=datetime.today(), + affected_versions=0, + 
console=console, ) expected_output = "│ vendor0 │ product0 │ 1.0 │ CVE-1234-1234 │ MEDIUM │ 4.2 (v2) │\n│ vendor0 │ product0 │ 1.0 │ CVE-1234-1234 │ LOW │ 1.2 (v2) │\n│ vendor0 │ product0 │ 2.8.6 │ CVE-1234-1234 │ LOW │ 2.5 (v3) │\n│ vendor1 │ product1 │ 3.2.1.0 │ CVE-1234-1234 │ HIGH │ 7.5 (v2) │\n└─────────┴──────────┴─────────┴───────────────┴──────────┴──────────────────────┘\n" @@ -172,6 +407,23 @@ def test_output_console(self): result = self.mock_file.read() self.assertIn(expected_output, result) + def test_output_console_affected_versions(self): + """Test Formatting Output as console with affected-versions""" + + console = Console(file=self.mock_file) + output_console( + self.MOCK_ALL_CVE_DATA, + self.MOCK_ALL_CVE_VERSION_INFO, + time_of_last_update=datetime.today(), + affected_versions=1, + console=console, + ) + + expected_output = "│ vendor0 │ product0 │ 1.0 │ UNKNOWN │ UNKNOWN │ 0 (v0) │ - │\n│ vendor0 │ product0 │ 1.0 │ CVE-9999-0001 │ MEDIUM │ 4.2 (v2) │ [0.9.0 - 1.2.0] │\n│ vendor0 │ product0 │ 1.0 │ CVE-9999-0002 │ MEDIUM │ 4.2 (v2) │ [0.9.0 - 1.2.0) │\n│ vendor0 │ product0 │ 1.0 │ CVE-9999-0003 │ MEDIUM │ 4.2 (v2) │ (0.9.0 - 1.2.0] │\n│ vendor0 │ product0 │ 1.0 │ CVE-9999-0004 │ MEDIUM │ 4.2 (v2) │ (0.9.0 - 1.2.0) │\n│ vendor0 │ product0 │ 1.0 │ CVE-9999-0005 │ MEDIUM │ 4.2 (v2) │ >= 0.9.0 │\n│ vendor0 │ product0 │ 1.0 │ CVE-9999-0006 │ MEDIUM │ 4.2 (v2) │ > 0.9.0 │\n│ vendor0 │ product0 │ 1.0 │ CVE-9999-0007 │ MEDIUM │ 4.2 (v2) │ <= 1.2.0 │\n│ vendor0 │ product0 │ 1.0 │ CVE-9999-0008 │ MEDIUM │ 4.2 (v2) │ < 1.2.0 │\n│ vendor0 │ product0 │ 1.0 │ CVE-9999-9999 │ LOW │ 1.2 (v2) │ - │\n└─────────┴──────────┴─────────┴───────────────┴──────────┴──────────────────────┴───────────────────┘\n" + self.mock_file.seek(0) # reset file position + result = self.mock_file.read() + self.assertIn(expected_output, result) + def test_output_html(self): """Test formatting output as HTML""" diff --git a/test/test_scanner.py b/test/test_scanner.py index 468fabc828..0279186859 100644 --- a/test/test_scanner.py +++ b/test/test_scanner.py @@ -1,7 +1,6 @@ # Copyright (C) 2021 Intel Corporation # SPDX-License-Identifier: GPL-3.0-or-later -# pylint: disable=too-many-public-methods, too-many-arguments, fixme """ CVE-bin-tool tests """
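
On the incremental-update path, the patched `NVD_API` no longer writes year-wise JSON feeds to disk; it queries the NVD 1.0 REST API directly, keeps the fetched items in `all_cve_entries`, and hands them to `CVEDB.populate_cve_list()`. The snippet below is a rough, self-contained sketch of how the request window is built for that path (names mirror the diff; the real logic lives in `get_nvd_params()`, which additionally pulls total/rejected counts from the NVD dashboard endpoint, and the standalone demo values here are illustrative only).

```python
import json
from datetime import datetime, timedelta, timezone

PAGESIZE = 2000  # resultsPerPage used by the patched NVD_API


def convert_date_to_nvd_date(date: datetime) -> str:
    """Format a datetime the way the NVD 1.0 REST API expects (UTC, millisecond precision)."""
    utc_date = date.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S:%f")[:-3]
    return f"{utc_date} UTC-00:00"


# Incremental update: re-fetch everything modified since the last DB update,
# minus a 2-minute overlap so borderline entries are not missed.
time_of_last_update = datetime.now() - timedelta(days=2)  # example value
params = {
    "modStartDate": convert_date_to_nvd_date(time_of_last_update - timedelta(minutes=2)),
    "includeMatchStringChange": json.dumps(True),  # also pick up modified match strings
    "startIndex": 0,
    "resultsPerPage": PAGESIZE,
}
print(params["modStartDate"])
```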
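
The new `--affected-versions` output rests on two small additions above: the `VersionInfo` NamedTuple in `cve_bin_tool/util.py` and the `format_version_range()` helper in `cve_bin_tool/output_engine/console.py`, which renders NVD version ranges in interval notation. For quick reference, here is a self-contained sketch of that logic (simplified: the rich/console plumbing is omitted); square brackets mark inclusive endpoints, parentheses exclusive ones, and `-` means the CVE entry carried no range data at all.

```python
from typing import NamedTuple


class VersionInfo(NamedTuple):
    # Field order matches the cve_range columns read in cve_scanner.py.
    start_including: str
    start_excluding: str
    end_including: str
    end_excluding: str


def format_version_range(version_info: VersionInfo) -> str:
    """Render an NVD version range in mathematical interval notation."""
    start_inc, start_exc, end_inc, end_exc = version_info
    if start_inc and end_inc:
        return f"[{start_inc} - {end_inc}]"
    if start_inc and end_exc:
        return f"[{start_inc} - {end_exc})"
    if start_exc and end_inc:
        return f"({start_exc} - {end_inc}]"
    if start_exc and end_exc:
        return f"({start_exc} - {end_exc})"
    if start_inc:
        return f">= {start_inc}"
    if start_exc:
        return f"> {start_exc}"
    if end_inc:
        return f"<= {end_inc}"
    if end_exc:
        return f"< {end_exc}"
    return "-"  # no range information on this CVE entry


if __name__ == "__main__":
    # versionStartIncluding=2.2.8, versionEndExcluding=2.2.11 -> "[2.2.8 - 2.2.11)"
    print(format_version_range(VersionInfo("2.2.8", "", "", "2.2.11")))
    # No range data at all -> "-"
    print(format_version_range(VersionInfo("", "", "", "")))
```

When the flag is passed, `output_console()` adds an "Affected Versions" column populated from these strings, falling back to `-` for CVE numbers missing from `all_cve_version_info` (the `KeyError` branch in console.py).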