Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(package-list-parser): Ubuntu #1183

Merged
merged 4 commits into from
Jun 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions cve_bin_tool/error_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ class EmptyTxtError(Exception):
"""Given txt File is empty"""


class NotTxtError(Exception):
"""Given File is not txt"""
class InvalidListError(Exception):
    """Given file is not a valid package list.

    NOTE(review): in the visible code this is raised when the simulated
    ``apt-get install`` of the listed packages exits with a non-zero status.
    """


class InvalidCsvError(Exception):
Expand Down Expand Up @@ -163,7 +163,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):
InvalidCsvError: -4,
InvalidJsonError: -4,
EmptyTxtError: -4,
NotTxtError: -4,
InvalidListError: -4,
MissingFieldsError: -5,
InsufficientArgs: -6,
EmptyCache: -7,
Expand Down
141 changes: 94 additions & 47 deletions cve_bin_tool/package_list_parser/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,17 @@
import csv
import json
import re
import subprocess
from collections import defaultdict
from logging import Logger
from os.path import dirname, getsize, isfile, join
from subprocess import PIPE, run
from sys import platform

from cve_bin_tool.error_handler import (
EmptyTxtError,
ErrorHandler,
ErrorMode,
NotTxtError,
InvalidListError,
)
from cve_bin_tool.log import LOGGER
from cve_bin_tool.util import ProductInfo, Remarks
Expand All @@ -37,57 +38,93 @@ def __init__(
self.package_names_without_vendor = []

def parse_list(self):
    """Parse the input package list and return the matched product data.

    A file whose name ends with ``requirements.txt`` is treated as a Python
    requirements list and matched against the output of ``pip list``.  Any
    other file is treated as a system package list and matched against the
    packages reported by ``dpkg-query``; this is only supported on Ubuntu.

    Packages without a known vendor are passed to ``VendorFetch`` to look
    up candidate vendors before the final data is built.

    Returns:
        ``self.parsed_data_with_vendor`` after ``add_vendor`` and
        ``parse_data`` have run, or an empty dict when a system package
        list is given on an unsupported platform or distribution.
    """
    input_file = self.input_file
    self.check_file()

    is_requirements = input_file.endswith("requirements.txt")
    not_found_package_names = set()

    if is_requirements:
        LOGGER.info("Scanning python package list.")

        pip_output = run(
            ["pip", "list", "--format", "json"],
            stdout=PIPE,
        )
        installed_packages = json.loads(pip_output.stdout.decode("utf-8"))

        with open(PYPI_CSV) as csvfile, open(input_file) as txtfile:
            csv_reader = csv.reader(csvfile)
            next(csv_reader)  # skip the header row
            # Keep only the bare package name: drop version specifiers,
            # extras and environment markers.
            txt_package_names = {
                re.split(">|\\[|;|=|\n", line)[0] for line in txtfile
            }
            # First vendor listed for a product wins, matching a
            # first-occurrence lookup in the mapping file.
            vendor_by_product = {}
            for (vendor, product) in csv_reader:
                vendor_by_product.setdefault(product, vendor)

        for installed_package in installed_packages:
            package_name = installed_package["name"].lower()
            if package_name not in txt_package_names:
                continue
            if package_name in vendor_by_product:
                installed_package["vendor"] = vendor_by_product[package_name]
                self.package_names_with_vendor.append(installed_package)
            else:
                self.package_names_without_vendor.append(installed_package)

        not_found_package_names = txt_package_names - set(vendor_by_product)

    else:
        if platform != "linux":
            LOGGER.warning("Package list support only available on Linux!")
            return {}

        linux_distribution = run(["lsb_release", "-si"], stdout=PIPE)
        if "Ubuntu" not in linux_distribution.stdout.decode("utf-8"):
            # Without this guard the installed package list is never
            # populated and the loop below fails with a NameError.
            LOGGER.warning("Package list support only available on Ubuntu!")
            return {}

        LOGGER.info("Scanning ubuntu package list.")
        dpkg_output = run(
            [
                "dpkg-query",
                "--show",
                '--showformat={"name": "${binary:Package}", "version": "${Version}"}, ',
            ],
            stdout=PIPE,
        )
        # dpkg-query emits one JSON object per package, each followed by
        # ", "; drop the trailing separator and wrap the rest in a list.
        installed_packages = json.loads(
            f"[{dpkg_output.stdout.decode('utf-8')[0:-2]}]"
        )

        with open(input_file) as req:
            system_packages = {line.rstrip("\n") for line in req}

        for installed_package in installed_packages:
            if installed_package["name"] in system_packages:
                self.package_names_without_vendor.append(installed_package)

    with VendorFetch() as vendor_fetch:
        vendor_package_pairs = vendor_fetch.get_vendor_product_pairs(
            self.package_names_without_vendor
        )

    if is_requirements:
        LOGGER.warning(f"{not_found_package_names} are not found in the mapping.")
    LOGGER.info(
        f"{vendor_package_pairs} are possibly the missing vendor product pairs."
    )

    self.add_vendor(vendor_package_pairs)
    self.parse_data()
    return self.parsed_data_with_vendor

def add_vendor(self, vendor_package_pairs):
for vendor_package_pair in vendor_package_pairs:
Expand All @@ -104,8 +141,8 @@ def add_vendor(self, vendor_package_pairs):
package_name["vendor"] = "UNKNOWN"
self.package_names_with_vendor.append(package_name)

def parse_data(self, data):
for row in data:
def parse_data(self):
for row in self.package_names_with_vendor:
product_info = ProductInfo(
row["vendor"], row["name"].lower(), row["version"]
)
Expand All @@ -126,10 +163,20 @@ def check_file(self):
with ErrorHandler(mode=error_mode):
raise FileNotFoundError(input_file)

if not input_file.endswith(".txt"):
with ErrorHandler(mode=error_mode):
raise NotTxtError(input_file)

if getsize(input_file) == 0:
with ErrorHandler(mode=error_mode):
raise EmptyTxtError(input_file)

if not input_file.endswith("requirements.txt"):
# Simulate installation on Ubuntu using apt-get to check if the file is valid
output = run(
[f"xargs", "-a", input_file, "apt-get", "install", "-s"],
stderr=PIPE,
stdout=PIPE,
)

if output.returncode != 0:
with ErrorHandler(mode=error_mode):
raise InvalidListError(
f"Invalid Package list\n{output.stderr.decode('utf-8')}"
)
6 changes: 5 additions & 1 deletion cve_bin_tool/package_list_parser/vendor_fetch.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import sqlite3
from os.path import join

from rich.progress import track

from cve_bin_tool.cvedb import DBNAME, DISK_LOCATION_DEFAULT


Expand All @@ -21,7 +23,9 @@ def get_vendor_product_pairs(self, package_names):
SELECT DISTINCT vendor FROM cve_range
WHERE product=?
"""
for package_name in package_names:
for package_name in track(
package_names, description="Processing the given list...."
):
self.cursor.execute(query, [package_name["name"]])
vendors = list(map(lambda x: x[0], self.cursor.fetchall()))
for vendor in vendors:
Expand Down
68 changes: 63 additions & 5 deletions test/test_package_list_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@
# SPDX-License-Identifier: GPL-3.0-or-later

import subprocess
from os import environ
from os.path import dirname, join
from sys import platform

import pytest

from cve_bin_tool.error_handler import ErrorMode
from cve_bin_tool.package_list_parser import (
EmptyTxtError,
NotTxtError,
InvalidListError,
PackageListParser,
Remarks,
)
Expand All @@ -19,7 +21,13 @@
class TestPackageListParser:
TXT_PATH = join(dirname(__file__), "txt")

PARSED_TRIAGE_DATA = {
DISTRO = (
subprocess.run(["lsb_release", "-sd"], stdout=subprocess.PIPE)
if platform == "linux"
else ""
)

REQ_PARSED_TRIAGE_DATA = {
ProductInfo(vendor="httplib2_project", product="httplib2", version="0.18.1"): {
"default": {"remarks": Remarks.Unexplored, "comments": "", "severity": ""},
"paths": {""},
Expand All @@ -34,6 +42,21 @@ class TestPackageListParser:
},
}

UBUNTU_PARSED_TRIAGE_DATA = {
ProductInfo(vendor="gnu*", product="bash", version="5.0-6ubuntu1.1"): {
"default": {"remarks": Remarks.Unexplored, "comments": "", "severity": ""},
"paths": {""},
},
ProductInfo(vendor="gnu*", product="binutils", version="2.34-6ubuntu1.1"): {
"default": {"remarks": Remarks.Unexplored, "comments": "", "severity": ""},
"paths": {""},
},
ProductInfo(vendor="gnu*", product="wget", version="1.20.3-1ubuntu1"): {
"default": {"remarks": Remarks.Unexplored, "comments": "", "severity": ""},
"paths": {""},
},
}

@pytest.mark.parametrize("filepath", [join(TXT_PATH, "nonexistent.txt")])
def test_nonexistent_txt(self, filepath):
package_list = PackageListParser(filepath, error_mode=ErrorMode.FullTrace)
Expand All @@ -49,7 +72,7 @@ def test_empty_txt(self, filepath, exception):
package_list.parse_list()

@pytest.mark.parametrize(
"filepath, exception", [(join(TXT_PATH, "not_txt.csv"), NotTxtError)]
"filepath, exception", [(join(TXT_PATH, "not_txt.csv"), InvalidListError)]
)
def test_not_txt(self, filepath, exception):
package_list = PackageListParser(filepath, error_mode=ErrorMode.FullTrace)
Expand All @@ -58,10 +81,45 @@ def test_not_txt(self, filepath, exception):

@pytest.mark.parametrize(
    "filepath, parsed_data",
    [(join(TXT_PATH, "test_requirements.txt"), REQ_PARSED_TRIAGE_DATA)],
)
def test_valid_requirements(self, filepath, parsed_data):
    """Check that a valid requirements file parses to the expected data."""
    # Packages are installed from test_requirements with specific versions
    # for the test to pass
    subprocess.run(["pip", "install", "-r", filepath])
    try:
        package_list = PackageListParser(filepath, error_mode=ErrorMode.FullTrace)
        assert package_list.parse_list() == parsed_data
    finally:
        # Update the packages back to latest, even when the assertion fails,
        # so a failing run does not leave the environment pinned.
        subprocess.run(["pip", "install", "httplib2", "requests", "html5lib", "-U"])

@pytest.mark.parametrize(
    "filepath, exception",
    [(join(TXT_PATH, "test_broken_ubuntu_list.txt"), InvalidListError)],
)
def test_invalid_ubuntu_list(self, filepath, exception):
    """Parsing a package list with an unknown package raises the given error."""
    parser = PackageListParser(filepath, error_mode=ErrorMode.FullTrace)
    with pytest.raises(exception):
        parser.parse_list()

@pytest.mark.skipif(
    "ACTIONS" not in environ
    or not platform == "linux"
    or "Ubuntu 20.04" not in DISTRO.stdout.decode(),
    reason="Running locally requires root permission",
)
@pytest.mark.parametrize(
    "filepath, parsed_data",
    [(join(TXT_PATH, "test_ubuntu_list.txt"), UBUNTU_PARSED_TRIAGE_DATA)],
)
def test_valid_ubuntu_list(self, filepath, parsed_data):
    """Check that a valid Ubuntu package list parses to the expected data.

    The listed packages are first installed at the exact versions the
    expected data was recorded with.
    """
    subprocess.run(
        [
            "sudo",
            "apt-get",
            "install",
            # Run non-interactively: without -y apt-get aborts at the
            # confirmation prompt, and pinning an older version than the
            # installed one additionally needs --allow-downgrades.
            "-y",
            "--allow-downgrades",
            "bash=5.0-6ubuntu1.1",
            "binutils=2.34-6ubuntu1.1",
            "wget=1.20.3-1ubuntu1",
        ]
    )
    package_list = PackageListParser(filepath, error_mode=ErrorMode.FullTrace)
    assert package_list.parse_list() == parsed_data
5 changes: 5 additions & 0 deletions test/txt/test_broken_ubuntu_list.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
acpid
bash
binutils
wget
br0s
3 changes: 3 additions & 0 deletions test/txt/test_ubuntu_list.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
bash
binutils
wget