Skip to content

Commit

Permalink
Added validation in init, marked lines for Bandit
Browse files Browse the repository at this point in the history
You can't use bound parameters for things like table names that aren't
technically parameters in sqlite3.  Moved some basic input validation
into the init function and marked lines for bandit.  Because bandit
doesn't deal well with multi-lines with `nosec` comments, I've added
descriptive comments to some select statements.

Signed-off-by: Terri Oda <terri.oda@intel.com>
  • Loading branch information
terriko committed Jul 14, 2021
1 parent cc3b2ec commit ec128e0
Showing 1 changed file with 20 additions and 15 deletions.
35 changes: 20 additions & 15 deletions cve_bin_tool/version_signature.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,22 @@
from cve_bin_tool.cvedb import DISK_LOCATION_DEFAULT


class InvalidVersionSignatureTable(ValueError):
"""Raised when an invalid table name is given to version_signature"""


class VersionSignatureDb:
"""Methods for version signature data stored in sqlite"""

def __init__(self, table_name, mapping_function, duration) -> None:
"""Set location on disk data cache will reside.
Also sets the table name and refresh duration
"""
if not table_name.isalnum():
# Basic validation here so we can safely ignore Bandit SQL warnings
raise InvalidVersionSignatureTable
self.table_name = table_name
self.update_table_name = f"latest_update_{table_name}"
self.mapping_function = mapping_function
self.disk_location = DISK_LOCATION_DEFAULT
self.duration = duration
Expand Down Expand Up @@ -58,16 +66,14 @@ def get_mapping_data(self):
)

self.cursor.execute(
"CREATE TABLE IF NOT EXISTS {} (datestamp DATETIME PRIMARY KEY)".format(
"latest_update_" + self.table_name
)
f"CREATE TABLE IF NOT EXISTS {self.update_table_name} (datestamp DATETIME PRIMARY KEY)"
)

update_required: bool = False

datestamp = self.cursor.execute(
"SELECT * FROM {}".format("latest_update_" + self.table_name)
).fetchone()
f"SELECT * FROM {self.update_table_name}"
).fetchone() # update_table_name validated in __init__

if datestamp and type(datestamp) is int:
# Updates if the difference between current time and the time of last update is greater than duration
Expand All @@ -79,22 +85,21 @@ def get_mapping_data(self):

if datestamp is None or update_required:
# if update is required or database is empty, fetch and insert data into database
self.cursor.execute("DELETE FROM ?", self.table_name)
self.cursor.execute("DELETE FROM ?", ("latest_update_" + self.table_name))
self.cursor.execute(f"DELETE FROM {self.table_name}") # nosec
self.cursor.execute(f"DELETE FROM {self.update_table_name}") # nosec
self.cursor.execute(
"INSERT INTO ? VALUES (?)",
(
("latest_update_" + self.table_name),
time.time(),
),
f"INSERT INTO {self.update_table_name} VALUES (?)",
(time.time(),),
)

for mapping in self.mapping_function():
self.cursor.execute(
"INSERT INTO ? (version, sourceId) VALUES (?, ?)",
(self.table_name, mapping[0], mapping[1]),
f"INSERT INTO {self.table_name} (version, sourceId) VALUES (?, ?)",
(mapping[0], mapping[1]),
)

data = self.cursor.execute("SELECT * FROM ?", self.table_name).fetchall()
data = self.cursor.execute(
f"SELECT * FROM {self.table_name}"
).fetchall() # table_name validated in __init__
self.conn.commit()
return data

0 comments on commit ec128e0

Please sign in to comment.