Skip to content

Commit 7ec5658

Browse files
committed
cve_bin_tool: data_sources: Set cachedir and backupcachedir as parameter
The cachedir and backupcachedir was always on the default value for the data sources. For the cvedb is the cachedir configurable. With this patch the cve-bin-tool can run in different instances with a own cache for the CVE information. Additional the patch is a workaround for the cvedb access in line 441 purl2cpe_conn = sqlite3.connect(self.cachedir / "purl2cpe/purl2cpe.db"), which fails if cvedb has an other cachedir as the purl2cpe with the DISK_LOCATION_DEFAULT. Signed-off-by: Maik Otto <[email protected]>
1 parent 1290802 commit 7ec5658

18 files changed

+67
-31
lines changed

cve_bin_tool/cve_scanner.py

+3-1
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,9 @@ class CVEScanner:
3131
all_cve_version_info: Dict[str, VersionInfo]
3232

3333
RANGE_UNSET: str = ""
34-
dbname: str = str(Path(DISK_LOCATION_DEFAULT) / DBNAME)
3534
CONSOLE: Console = Console(file=sys.stderr, theme=cve_theme)
3635
ALPHA_TO_NUM: Dict[str, int] = dict(zip(ascii_lowercase, range(26)))
36+
CACHEDIR = DISK_LOCATION_DEFAULT
3737

3838
def __init__(
3939
self,
@@ -46,6 +46,7 @@ def __init__(
4646
check_exploits: bool = False,
4747
exploits_list: List[str] = [],
4848
disabled_sources: List[str] = [],
49+
cachedir: str = None,
4950
):
5051
self.logger = logger or LOGGER.getChild(self.__class__.__name__)
5152
self.error_mode = error_mode
@@ -61,6 +62,7 @@ def __init__(
6162
self.exploits_list = exploits_list
6263
self.disabled_sources = disabled_sources
6364
self.all_product_data = dict()
65+
self.dbname = str(Path(cachedir) / DBNAME) if cachedir is not None else DISK_LOCATION_DEFAULT
6466

6567
def get_cves(self, product_info: ProductInfo, triage_data: TriageData):
6668
"""Get CVEs against a specific version of a product.

cve_bin_tool/data_sources/curl_source.py

+8-3
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,16 @@ class Curl_Source(Data_Source):
3131
LOGGER = LOGGER.getChild("CVEDB")
3232
DATA_SOURCE_LINK = "https://curl.se/docs/vuln.json"
3333

34-
def __init__(self, error_mode=ErrorMode.TruncTrace):
34+
def __init__(
35+
self,
36+
error_mode=ErrorMode.TruncTrace,
37+
cachedir: str = None,
38+
backup_cachedir: str = None
39+
):
3540
"""Initialize a Curl_Source instance. Args: error_mode (ErrorMode): The error mode to be used."""
3641
self.cve_list = None
37-
self.cachedir = self.CACHEDIR
38-
self.backup_cachedir = self.BACKUPCACHEDIR
42+
self.cachedir = Path(cachedir) if cachedir is not None else self.CACHEDIR
43+
self.backup_cachedir = Path(backup_cachedir) if backup_cachedir is not None else self.BACKUPCACHEDIR
3944
self.error_mode = error_mode
4045
self.session = None
4146
self.affected_data = None

cve_bin_tool/data_sources/epss_source.py

+8-3
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,16 @@ class Epss_Source:
2929
LOGGER = logging.getLogger().getChild("CVEDB")
3030
DATA_SOURCE_LINK = "https://epss.cyentia.com/epss_scores-current.csv.gz"
3131

32-
def __init__(self, error_mode=ErrorMode.TruncTrace):
32+
def __init__(
33+
self,
34+
error_mode=ErrorMode.TruncTrace,
35+
cachedir: str = None,
36+
backup_cachedir: str = None
37+
):
3338
self.epss_data = None
3439
self.error_mode = error_mode
35-
self.cachedir = self.CACHEDIR
36-
self.backup_cachedir = self.BACKUPCACHEDIR
40+
self.cachedir = Path(cachedir) if cachedir is not None else self.CACHEDIR
41+
self.backup_cachedir = Path(backup_cachedir) if backup_cachedir is not None else self.BACKUPCACHEDIR
3742
self.epss_path = str(Path(self.cachedir) / "epss")
3843
self.file_name = os.path.join(self.epss_path, "epss_scores-current.csv")
3944
self.source_name = self.SOURCE

cve_bin_tool/data_sources/gad_source.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,9 @@ class GAD_Source(Data_Source):
3434
GAD_API_URL = "https://gitlab.com/api/v4/projects/12006272/repository/tree"
3535

3636
def __init__(
37-
self, error_mode: ErrorMode = ErrorMode.TruncTrace, incremental_update=False
37+
self, error_mode: ErrorMode = ErrorMode.TruncTrace, incremental_update=False, cachedir: str = None
3838
):
39-
self.cachedir = self.CACHEDIR
39+
self.cachedir = Path(cachedir) if cachedir is not None else self.CACHEDIR
4040
self.slugs = None
4141
self.gad_path = str(Path(self.cachedir) / "gad")
4242
self.source_name = self.SOURCE

cve_bin_tool/data_sources/nvd_source.py

+4-2
Original file line numberDiff line numberDiff line change
@@ -66,13 +66,15 @@ def __init__(
6666
nvd_type: str = "json-mirror",
6767
incremental_update: bool = False,
6868
nvd_api_key: str = "",
69+
cachedir: str = None,
70+
backup_cachedir: str = None,
6971
):
7072
if feed is None:
7173
self.feed = self.FEED_NVD if nvd_type == "json-nvd" else self.FEED_MIRROR
7274
else:
7375
self.feed = feed
74-
self.cachedir = self.CACHEDIR
75-
self.backup_cachedir = self.BACKUPCACHEDIR
76+
self.cachedir = Path(cachedir) if cachedir is not None else self.CACHEDIR
77+
self.backup_cachedir = Path(backup_cachedir) if backup_cachedir is not None else self.BACKUPCACHEDIR
7678
self.error_mode = error_mode
7779
self.source_name = self.SOURCE
7880

cve_bin_tool/data_sources/osv_source.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,9 @@ class OSV_Source(Data_Source):
3030
OSV_GS_URL = "gs://osv-vulnerabilities/"
3131

3232
def __init__(
33-
self, error_mode: ErrorMode = ErrorMode.TruncTrace, incremental_update=False
33+
self, error_mode: ErrorMode = ErrorMode.TruncTrace, incremental_update=False, cachedir: str = None
3434
):
35-
self.cachedir = self.CACHEDIR
35+
self.cachedir = Path(cachedir) if cachedir is not None else self.CACHEDIR
3636
self.ecosystems = None
3737
self.osv_path = str(Path(self.cachedir) / "osv")
3838
self.source_name = self.SOURCE

cve_bin_tool/data_sources/purl2cpe_source.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@ class PURL2CPE_Source(Data_Source):
2121
PURL2CPE_URL = "https://github.com/scanoss/purl2cpe/raw/main/purl2cpe.db.zip"
2222

2323
def __init__(
24-
self, error_mode: ErrorMode = ErrorMode.TruncTrace, incremental_update=False
24+
self, error_mode: ErrorMode = ErrorMode.TruncTrace, incremental_update=False, cachedir: str | None = None
2525
):
26-
self.cachedir = self.CACHEDIR
26+
self.cachedir = Path(cachedir) if cachedir is not None else self.CACHEDIR
2727
self.purl2cpe_path = str(Path(self.cachedir) / "purl2cpe")
2828
self.source_name = self.SOURCE
2929
self.error_mode = error_mode

cve_bin_tool/data_sources/redhat_source.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,9 @@ class REDHAT_Source(Data_Source):
2424
CVE_ENDPOINT = "/cve.json"
2525

2626
def __init__(
27-
self, error_mode: ErrorMode = ErrorMode.TruncTrace, incremental_update=False
27+
self, error_mode: ErrorMode = ErrorMode.TruncTrace, incremental_update=False, cachedir: str = None,
2828
):
29-
self.cachedir = self.CACHEDIR
29+
self.cachedir = Path(cachedir) if cachedir is not None else self.CACHEDIR
3030
self.redhat_path = str(Path(self.cachedir) / "redhat")
3131
self.source_name = self.SOURCE
3232

cve_bin_tool/data_sources/rsd_source.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,9 @@ class RSD_Source(Data_Source):
3232
RSD_API_URL = "https://gitlab.com/api/v4/projects/39314828/repository/tree"
3333

3434
def __init__(
35-
self, error_mode: ErrorMode = ErrorMode.TruncTrace, incremental_update=False
35+
self, error_mode: ErrorMode = ErrorMode.TruncTrace, incremental_update=False, cachedir: str = None
3636
):
37-
self.cachedir = self.CACHEDIR
37+
self.cachedir = Path(cachedir) if cachedir is not None else self.CACHEDIR
3838
self.rsd_path = str(Path(self.cachedir) / "rsd")
3939
self.source_name = self.SOURCE
4040

cve_bin_tool/helper_script.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ def __init__(
3737
product_name: str | None = None,
3838
version_number: str | None = None,
3939
string_length: int = 40,
40+
cachedir: str = None,
4041
):
4142
self.extractor: TempDirExtractorContext = Extractor()
4243
self.product_name = product_name
@@ -45,7 +46,7 @@ def __init__(
4546

4647
# for setting the database
4748
self.connection = None
48-
self.dbpath = str(Path(DISK_LOCATION_DEFAULT) / DBNAME)
49+
self.dbname = str(Path(cachedir) / DBNAME) if cachedir is not None else DISK_LOCATION_DEFAULT
4950

5051
# for extraction
5152
self.walker = DirWalk().walk

cve_bin_tool/input_engine.py

+4-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
from typing import Any, DefaultDict, Dict, Iterable, Set, Union
1717

1818
from cve_bin_tool.cvedb import CVEDB
19+
from cve_bin_tool.data_sources import DISK_LOCATION_DEFAULT
1920
from cve_bin_tool.error_handler import (
2021
ErrorHandler,
2122
ErrorMode,
@@ -66,6 +67,7 @@ def __init__(
6667
logger: Logger = None,
6768
error_mode=ErrorMode.TruncTrace,
6869
filetype="autodetect",
70+
cachedir: str = None,
6971
):
7072
"""
7173
Initializes the InputEngine instance.
@@ -82,8 +84,9 @@ def __init__(
8284
self.error_mode = error_mode
8385
self.filetype = filetype
8486
self.parsed_data = defaultdict(dict)
87+
self.cachedir = Path(cachedir) if cachedir is not None else DISK_LOCATION_DEFAULT
8588
# Connect to the database
86-
self.cvedb = CVEDB(version_check=False)
89+
self.cvedb = CVEDB(version_check=False, cachedir=self.cachedir)
8790

8891
def parse_input(self) -> DefaultDict[ProductInfo, TriageData]:
8992
"""

cve_bin_tool/merge.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ def __init__(
3838
merge_files: list[str],
3939
logger: Logger | None = None,
4040
error_mode=ErrorMode.TruncTrace,
41-
cache_dir=DISK_LOCATION_DEFAULT,
41+
cachedir: str = None,
4242
score=0,
4343
filter_tag=[],
4444
):
@@ -52,7 +52,7 @@ def __init__(
5252
self.total_files = 0
5353
self.products_with_cve = 0
5454
self.products_without_cve = 0
55-
self.cache_dir = cache_dir
55+
self.cache_dir = Path(cachedir) if cachedir is not None else DISK_LOCATION_DEFAULT
5656
self.merged_files = ["tag"]
5757
self.score = score
5858
self.filter_tag = filter_tag

cve_bin_tool/output_engine/__init__.py

+7-2
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from pathlib import Path
1414

1515
from cve_bin_tool.cve_scanner import CVEData
16-
from cve_bin_tool.cvedb import CVEDB
16+
from cve_bin_tool.cvedb import CVEDB, DISK_LOCATION_DEFAULT
1717
from cve_bin_tool.error_handler import ErrorHandler, ErrorMode
1818
from cve_bin_tool.log import LOGGER
1919
from cve_bin_tool.output_engine.console import output_console
@@ -130,9 +130,10 @@ def output_pdf(
130130
exploits: bool = False,
131131
metrics: bool = False,
132132
all_product_data=None,
133+
cachedir: str = DISK_LOCATION_DEFAULT,
133134
):
134135
"""Output a PDF of CVEs"""
135-
cvedb_data = CVEDB()
136+
cvedb_data = CVEDB(cachedir=cachedir)
136137
db_date = time.strftime(
137138
"%d %B %Y at %H:%M:%S", time.localtime(cvedb_data.get_db_update_date())
138139
)
@@ -595,6 +596,7 @@ def output_pdf(
595596
affected_versions: int = 0,
596597
exploits: bool = False,
597598
all_product_data=None,
599+
cachedir: str = DISK_LOCATION_DEFAULT
598600
):
599601
"""Output a PDF of CVEs
600602
Required module: Reportlab not found"""
@@ -673,6 +675,7 @@ def __init__(
673675
vex_product_info: dict[str, str] = {},
674676
offline: bool = False,
675677
organized_arguements: dict = None,
678+
cachedir: str = DISK_LOCATION_DEFAULT
676679
):
677680
"""Constructor for OutputEngine class."""
678681
self.logger = logger or LOGGER.getChild(self.__class__.__name__)
@@ -705,6 +708,7 @@ def __init__(
705708
self.vex_type = vex_type
706709
self.vex_product_info = vex_product_info
707710
self.vex_filename = vex_filename
711+
self.cachedir = cachedir
708712

709713
def output_cves(self, outfile, output_type="console"):
710714
"""Output a list of CVEs
@@ -752,6 +756,7 @@ def output_cves(self, outfile, output_type="console"):
752756
self.affected_versions,
753757
self.exploits,
754758
self.metrics,
759+
self.cachedir,
755760
)
756761
elif output_type == "html":
757762
output_html(

cve_bin_tool/package_list_parser.py

+4-1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import distro
1313

1414
from cve_bin_tool.cvedb import CVEDB
15+
from cve_bin_tool.data_sources import DISK_LOCATION_DEFAULT
1516
from cve_bin_tool.error_handler import (
1617
EmptyTxtError,
1718
ErrorHandler,
@@ -35,6 +36,7 @@ def __init__(
3536
input_file: str,
3637
logger: Logger = LOGGER.getChild("PackageListParser"),
3738
error_mode=ErrorMode.TruncTrace,
39+
cachedir: str = None,
3840
) -> None:
3941
"""
4042
Initialize the PackageListParser object.
@@ -56,6 +58,7 @@ def __init__(
5658
self.parsed_data_with_vendor: Dict[Any, Any] = defaultdict(dict)
5759
self.package_names_with_vendor: List[Any] = []
5860
self.package_names_without_vendor: List[Any] = []
61+
self.cache_dir = Path(cachedir) if cachedir is not None else DISK_LOCATION_DEFAULT,
5962

6063
def parse_list(self):
6164
"""
@@ -147,7 +150,7 @@ def parse_list(self):
147150
if package_name in txt_package_names:
148151
self.package_names_without_vendor.append(installed_package)
149152

150-
cve_db = CVEDB()
153+
cve_db = CVEDB(cachedir=self.cache_dir)
151154
vendor_package_pairs = cve_db.get_vendor_product_pairs(
152155
self.package_names_without_vendor
153156
)

cve_bin_tool/parsers/__init__.py

+3-1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from __future__ import annotations
55

66
import sqlite3
7+
from pathlib import Path
78

89
from packageurl import PackageURL
910

@@ -41,14 +42,15 @@ class Parser:
4142
filename (str): The filename of the data to be processed.
4243
"""
4344

44-
def __init__(self, cve_db, logger):
45+
def __init__(self, cve_db, logger, cachedir : str = None):
4546
"""Initializes a Parser object."""
4647
self.cve_db = cve_db
4748
self.logger = logger
4849
self.filename = ""
4950
self.purl_pkg_type = "default"
5051
self.connection: sqlite3.Connection | None = None
5152
self.dbpath = DISK_LOCATION_DEFAULT / DBNAME
53+
self.dbname = str(Path(cachedir) / DBNAME) if cachedir is not None else DISK_LOCATION_DEFAULT
5254

5355
def run_checker(self, filename):
5456
"""

cve_bin_tool/sbom_manager/parse.py

+6-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from packageurl import PackageURL
1515

1616
from cve_bin_tool.cvedb import CVEDB
17+
from cve_bin_tool.data_sources import DISK_LOCATION_DEFAULT
1718
from cve_bin_tool.input_engine import TriageData
1819
from cve_bin_tool.log import LOGGER
1920
from cve_bin_tool.util import (
@@ -45,12 +46,15 @@ class SBOMParse:
4546

4647
sbom_data: defaultdict[ProductInfo, TriageData]
4748

49+
CACHEDIR = DISK_LOCATION_DEFAULT
50+
4851
def __init__(
4952
self,
5053
filename: str,
5154
sbom_type: str = "spdx",
5255
logger: Logger | None = None,
5356
validate: bool = True,
57+
cachedir: str = None,
5458
):
5559
self.filename = filename
5660
self.sbom_data = defaultdict(dict)
@@ -60,9 +64,10 @@ def __init__(
6064
self.logger = logger or LOGGER.getChild(self.__class__.__name__)
6165
self.validate = validate
6266
self.serialNumber = ""
67+
self.cachedir = Path(cachedir) if cachedir is not None else self.CACHEDIR
6368

6469
# Connect to the database
65-
self.cvedb = CVEDB(version_check=False)
70+
self.cvedb = CVEDB(version_check=False, cachedir=self.cachedir)
6671

6772
def parse_sbom(self) -> dict[ProductInfo, TriageData]:
6873
"""

cve_bin_tool/version_scanner.py

+4-1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
from cve_bin_tool.checkers import BUILTIN_CHECKERS, Checker
1212
from cve_bin_tool.cvedb import CVEDB
13+
from cve_bin_tool.data_sources import DISK_LOCATION_DEFAULT
1314
from cve_bin_tool.egg_updater import IS_DEVELOP, update_egg
1415
from cve_bin_tool.error_handler import ErrorMode
1516
from cve_bin_tool.extractor import Extractor, TempDirExtractorContext
@@ -44,6 +45,7 @@ def __init__(
4445
score: int = 0,
4546
validate: bool = True,
4647
sources=None,
48+
cachedir: str = None,
4749
):
4850
self.logger = logger or LOGGER.getChild(self.__class__.__name__)
4951
# Update egg if installed in development mode
@@ -66,7 +68,8 @@ def __init__(
6668
self.should_extract = should_extract
6769
self.file_stack: list[str] = []
6870
self.error_mode = error_mode
69-
self.cve_db = CVEDB(sources=sources)
71+
self.cache_dir = Path(cachedir) if cachedir is not None else DISK_LOCATION_DEFAULT
72+
self.cve_db = CVEDB(sources=sources,cachedir=self.cache_dir)
7073
self.validate = validate
7174
# self.logger.info("Checkers loaded: %s" % (", ".join(self.checkers.keys())))
7275
self.language_checkers = self.available_language_checkers()

cve_bin_tool/version_signature.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ class InvalidVersionSignatureTable(ValueError):
1818
class VersionSignatureDb:
1919
"""Methods for version signature data stored in sqlite"""
2020

21-
def __init__(self, table_name, mapping_function, duration) -> None:
21+
def __init__(self, table_name, mapping_function, duration,cachedir: str = None,) -> None:
2222
"""Set location on disk data cache will reside.
2323
Also sets the table name and refresh duration
2424
"""
@@ -28,7 +28,7 @@ def __init__(self, table_name, mapping_function, duration) -> None:
2828
self.table_name = table_name
2929
self.update_table_name = f"latest_update_{table_name}"
3030
self.mapping_function = mapping_function
31-
self.disk_location = DISK_LOCATION_DEFAULT
31+
self.disk_location = cachedir if cachedir is not None else DISK_LOCATION_DEFAULT
3232
self.duration = duration
3333
self.conn: sqlite3.Connection | None = None
3434
self.cursor: sqlite3.Cursor | None = None

0 commit comments

Comments
 (0)