Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(security): Improve SQL in version_signature.py #1248

Merged
merged 4 commits into from
Jul 15, 2021
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 19 additions & 17 deletions cve_bin_tool/version_signature.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,22 @@
from cve_bin_tool.cvedb import DISK_LOCATION_DEFAULT


class InvalidVersionSignatureTable(ValueError):
"""Raised when an invalid table name is given to version_signature"""


class VersionSignatureDb:
"""Methods for version signature data stored in sqlite"""

def __init__(self, table_name, mapping_function, duration) -> None:
"""Set location on disk data cache will reside.
Also sets the table name and refresh duration
"""
if not table_name.isalnum():
# Basic validation here so we can safely ignore Bandit SQL warnings
raise InvalidVersionSignatureTable
terriko marked this conversation as resolved.
Show resolved Hide resolved
self.table_name = table_name
self.update_table_name = f"latest_update_{table_name}"
self.mapping_function = mapping_function
self.disk_location = DISK_LOCATION_DEFAULT
self.duration = duration
Expand Down Expand Up @@ -54,22 +62,18 @@ def get_mapping_data(self):
the data after the specified refresh duration
"""
self.cursor.execute(
"CREATE TABLE IF NOT EXISTS {}(version TEXT , sourceId TEXT PRIMARY KEY)".format(
self.table_name
)
f"CREATE TABLE IF NOT EXISTS {self.table_name}(version TEXT , sourceId TEXT PRIMARY KEY)"
)

self.cursor.execute(
"CREATE TABLE IF NOT EXISTS {} (datestamp DATETIME PRIMARY KEY)".format(
"latest_update_" + self.table_name
)
f"CREATE TABLE IF NOT EXISTS {self.update_table_name} (datestamp DATETIME PRIMARY KEY)"
)

update_required: bool = False

datestamp = self.cursor.execute(
"SELECT * FROM {}".format("latest_update_" + self.table_name)
).fetchone()
f"SELECT * FROM {self.update_table_name}"
).fetchone() # update_table_name validated in __init__

if datestamp and type(datestamp) is int:
# Updates if the difference between current time and the time of last update is greater than duration
Expand All @@ -81,23 +85,21 @@ def get_mapping_data(self):

if datestamp is None or update_required:
# if update is required or database is empty, fetch and insert data into database
self.cursor.execute(f"DELETE FROM {self.table_name}")
self.cursor.execute(
"DELETE FROM {}".format("latest_update_" + self.table_name)
)
self.cursor.execute(f"DELETE FROM {self.table_name}") # nosec
self.cursor.execute(f"DELETE FROM {self.update_table_name}") # nosec
self.cursor.execute(
"INSERT INTO {} VALUES (?)".format("latest_update_" + self.table_name),
f"INSERT INTO {self.update_table_name} VALUES (?)",
(time.time(),),
)

for mapping in self.mapping_function():
self.cursor.execute(
"INSERT INTO {} (version, sourceId) VALUES (?, ?)".format(
self.table_name
),
f"INSERT INTO {self.table_name} (version, sourceId) VALUES (?, ?)",
(mapping[0], mapping[1]),
)

data = self.cursor.execute(f"SELECT * FROM {self.table_name}").fetchall()
data = self.cursor.execute(
f"SELECT * FROM {self.table_name}"
).fetchall() # table_name validated in __init__
self.conn.commit()
return data