Skip to content

Commit 57585c0

Browse files
committed
cve_bin_tool: data_sources: Set cachedir and backupcachedir as parameter
The cachedir and backupcachedir was always on the default value for the data sources. For the cvedb is the cachedir configurable. With this patch the cve-bin-tool can run in different instances with a own cache for the CVE information. Additional the patch is a workaround for the cvedb access in line 441 purl2cpe_conn = sqlite3.connect(self.cachedir / "purl2cpe/purl2cpe.db"), which fails if cvedb has an other cachedir as the purl2cpe with the DISK_LOCATION_DEFAULT. Signed-off-by: Maik Otto <[email protected]>
1 parent 62941eb commit 57585c0

18 files changed

+67
-32
lines changed

cve_bin_tool/cve_scanner.py

+3-1
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,9 @@ class CVEScanner:
3131
all_cve_version_info: Dict[str, VersionInfo]
3232

3333
RANGE_UNSET: str = ""
34-
dbname: str = str(Path(DISK_LOCATION_DEFAULT) / DBNAME)
3534
CONSOLE: Console = Console(file=sys.stderr, theme=cve_theme)
3635
ALPHA_TO_NUM: Dict[str, int] = dict(zip(ascii_lowercase, range(26)))
36+
CACHEDIR = DISK_LOCATION_DEFAULT
3737

3838
def __init__(
3939
self,
@@ -46,6 +46,7 @@ def __init__(
4646
check_exploits: bool = False,
4747
exploits_list: List[str] = [],
4848
disabled_sources: List[str] = [],
49+
cachedir: str = None,
4950
):
5051
self.logger = logger or LOGGER.getChild(self.__class__.__name__)
5152
self.error_mode = error_mode
@@ -61,6 +62,7 @@ def __init__(
6162
self.exploits_list = exploits_list
6263
self.disabled_sources = disabled_sources
6364
self.all_product_data = dict()
65+
self.dbname = str(Path(cachedir) / DBNAME) if cachedir is not None else DISK_LOCATION_DEFAULT
6466

6567
def get_cves(self, product_info: ProductInfo, triage_data: TriageData):
6668
"""Get CVEs against a specific version of a product.

cve_bin_tool/data_sources/curl_source.py

+8-3
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,16 @@ class Curl_Source(Data_Source):
3131
LOGGER = LOGGER.getChild("CVEDB")
3232
DATA_SOURCE_LINK = "https://curl.se/docs/vuln.json"
3333

34-
def __init__(self, error_mode=ErrorMode.TruncTrace):
34+
def __init__(
35+
self,
36+
error_mode=ErrorMode.TruncTrace,
37+
cachedir: str = None,
38+
backup_cachedir: str = None
39+
):
3540
"""Initialize a Curl_Source instance. Args: error_mode (ErrorMode): The error mode to be used."""
3641
self.cve_list = None
37-
self.cachedir = self.CACHEDIR
38-
self.backup_cachedir = self.BACKUPCACHEDIR
42+
self.cachedir = Path(cachedir) if cachedir is not None else self.CACHEDIR
43+
self.backup_cachedir = Path(backup_cachedir) if backup_cachedir is not None else self.BACKUPCACHEDIR
3944
self.error_mode = error_mode
4045
self.session = None
4146
self.affected_data = None

cve_bin_tool/data_sources/epss_source.py

+8-3
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,16 @@ class Epss_Source:
2929
LOGGER = logging.getLogger().getChild("CVEDB")
3030
DATA_SOURCE_LINK = "https://epss.cyentia.com/epss_scores-current.csv.gz"
3131

32-
def __init__(self, error_mode=ErrorMode.TruncTrace):
32+
def __init__(
33+
self,
34+
error_mode=ErrorMode.TruncTrace,
35+
cachedir: str = None,
36+
backup_cachedir: str = None
37+
):
3338
self.epss_data = None
3439
self.error_mode = error_mode
35-
self.cachedir = self.CACHEDIR
36-
self.backup_cachedir = self.BACKUPCACHEDIR
40+
self.cachedir = Path(cachedir) if cachedir is not None else self.CACHEDIR
41+
self.backup_cachedir = Path(backup_cachedir) if backup_cachedir is not None else self.BACKUPCACHEDIR
3742
self.epss_path = str(Path(self.cachedir) / "epss")
3843
self.file_name = os.path.join(self.epss_path, "epss_scores-current.csv")
3944
self.source_name = self.SOURCE

cve_bin_tool/data_sources/gad_source.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,9 @@ class GAD_Source(Data_Source):
3434
GAD_API_URL = "https://gitlab.com/api/v4/projects/12006272/repository/tree"
3535

3636
def __init__(
37-
self, error_mode: ErrorMode = ErrorMode.TruncTrace, incremental_update=False
37+
self, error_mode: ErrorMode = ErrorMode.TruncTrace, incremental_update=False, cachedir: str = None
3838
):
39-
self.cachedir = self.CACHEDIR
39+
self.cachedir = Path(cachedir) if cachedir is not None else self.CACHEDIR
4040
self.slugs = None
4141
self.gad_path = str(Path(self.cachedir) / "gad")
4242
self.source_name = self.SOURCE

cve_bin_tool/data_sources/nvd_source.py

+4-2
Original file line numberDiff line numberDiff line change
@@ -66,13 +66,15 @@ def __init__(
6666
nvd_type: str = "json-mirror",
6767
incremental_update: bool = False,
6868
nvd_api_key: str = "",
69+
cachedir: str = None,
70+
backup_cachedir: str = None,
6971
):
7072
if feed is None:
7173
self.feed = self.FEED_NVD if nvd_type == "json-nvd" else self.FEED_MIRROR
7274
else:
7375
self.feed = feed
74-
self.cachedir = self.CACHEDIR
75-
self.backup_cachedir = self.BACKUPCACHEDIR
76+
self.cachedir = Path(cachedir) if cachedir is not None else self.CACHEDIR
77+
self.backup_cachedir = Path(backup_cachedir) if backup_cachedir is not None else self.BACKUPCACHEDIR
7678
self.error_mode = error_mode
7779
self.source_name = self.SOURCE
7880

cve_bin_tool/data_sources/osv_source.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,9 @@ class OSV_Source(Data_Source):
3030
OSV_GS_URL = "gs://osv-vulnerabilities/"
3131

3232
def __init__(
33-
self, error_mode: ErrorMode = ErrorMode.TruncTrace, incremental_update=False
33+
self, error_mode: ErrorMode = ErrorMode.TruncTrace, incremental_update=False, cachedir: str = None
3434
):
35-
self.cachedir = self.CACHEDIR
35+
self.cachedir = Path(cachedir) if cachedir is not None else self.CACHEDIR
3636
self.ecosystems = None
3737
self.osv_path = str(Path(self.cachedir) / "osv")
3838
self.source_name = self.SOURCE

cve_bin_tool/data_sources/purl2cpe_source.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@ class PURL2CPE_Source(Data_Source):
2121
PURL2CPE_URL = "https://github.com/scanoss/purl2cpe/raw/main/purl2cpe.db.zip"
2222

2323
def __init__(
24-
self, error_mode: ErrorMode = ErrorMode.TruncTrace, incremental_update=False
24+
self, error_mode: ErrorMode = ErrorMode.TruncTrace, incremental_update=False, cachedir: str | None = None
2525
):
26-
self.cachedir = self.CACHEDIR
26+
self.cachedir = Path(cachedir) if cachedir is not None else self.CACHEDIR
2727
self.purl2cpe_path = str(Path(self.cachedir) / "purl2cpe")
2828
self.source_name = self.SOURCE
2929
self.error_mode = error_mode

cve_bin_tool/data_sources/redhat_source.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,9 @@ class REDHAT_Source(Data_Source):
2424
CVE_ENDPOINT = "/cve.json"
2525

2626
def __init__(
27-
self, error_mode: ErrorMode = ErrorMode.TruncTrace, incremental_update=False
27+
self, error_mode: ErrorMode = ErrorMode.TruncTrace, incremental_update=False, cachedir: str = None,
2828
):
29-
self.cachedir = self.CACHEDIR
29+
self.cachedir = Path(cachedir) if cachedir is not None else self.CACHEDIR
3030
self.redhat_path = str(Path(self.cachedir) / "redhat")
3131
self.source_name = self.SOURCE
3232

cve_bin_tool/data_sources/rsd_source.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,9 @@ class RSD_Source(Data_Source):
3232
RSD_API_URL = "https://gitlab.com/api/v4/projects/39314828/repository/tree"
3333

3434
def __init__(
35-
self, error_mode: ErrorMode = ErrorMode.TruncTrace, incremental_update=False
35+
self, error_mode: ErrorMode = ErrorMode.TruncTrace, incremental_update=False, cachedir: str = None
3636
):
37-
self.cachedir = self.CACHEDIR
37+
self.cachedir = Path(cachedir) if cachedir is not None else self.CACHEDIR
3838
self.rsd_path = str(Path(self.cachedir) / "rsd")
3939
self.source_name = self.SOURCE
4040

cve_bin_tool/helper_script.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ def __init__(
3737
product_name: str | None = None,
3838
version_number: str | None = None,
3939
string_length: int = 40,
40+
cachedir: str = None,
4041
):
4142
self.extractor: TempDirExtractorContext = Extractor()
4243
self.product_name = product_name
@@ -45,7 +46,7 @@ def __init__(
4546

4647
# for setting the database
4748
self.connection = None
48-
self.dbpath = str(Path(DISK_LOCATION_DEFAULT) / DBNAME)
49+
self.dbname = str(Path(cachedir) / DBNAME) if cachedir is not None else DISK_LOCATION_DEFAULT
4950

5051
# for extraction
5152
self.walker = DirWalk().walk

cve_bin_tool/input_engine.py

+4-1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
)
2626
from cve_bin_tool.log import LOGGER
2727
from cve_bin_tool.util import ProductInfo, Remarks
28+
from cve_bin_tool.data_sources import DISK_LOCATION_DEFAULT
2829

2930
# TriageData is dictionary of cve_number mapped to dictionary of remarks, comments and custom severity
3031
TriageData = Dict[str, Union[Dict[str, Any], Set[str]]]
@@ -66,6 +67,7 @@ def __init__(
6667
logger: Logger = None,
6768
error_mode=ErrorMode.TruncTrace,
6869
filetype="autodetect",
70+
cachedir: str = None,
6971
):
7072
"""
7173
Initializes the InputEngine instance.
@@ -82,8 +84,9 @@ def __init__(
8284
self.error_mode = error_mode
8385
self.filetype = filetype
8486
self.parsed_data = defaultdict(dict)
87+
self.cachedir = Path(cachedir) if cachedir is not None else DISK_LOCATION_DEFAULT
8588
# Connect to the database
86-
self.cvedb = CVEDB(version_check=False)
89+
self.cvedb = CVEDB(version_check=False, cachedir=self.cachedir)
8790

8891
def parse_input(self) -> DefaultDict[ProductInfo, TriageData]:
8992
"""

cve_bin_tool/merge.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ def __init__(
3838
merge_files: list[str],
3939
logger: Logger | None = None,
4040
error_mode=ErrorMode.TruncTrace,
41-
cache_dir=DISK_LOCATION_DEFAULT,
41+
cachedir: str = None,
4242
score=0,
4343
filter_tag=[],
4444
):
@@ -52,7 +52,7 @@ def __init__(
5252
self.total_files = 0
5353
self.products_with_cve = 0
5454
self.products_without_cve = 0
55-
self.cache_dir = cache_dir
55+
self.cache_dir = Path(cachedir) if cachedir is not None else DISK_LOCATION_DEFAULT
5656
self.merged_files = ["tag"]
5757
self.score = score
5858
self.filter_tag = filter_tag

cve_bin_tool/output_engine/__init__.py

+7-2
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
from cve_bin_tool.sbom_manager.generate import SBOMGenerate
3535
from cve_bin_tool.version import VERSION
3636
from cve_bin_tool.vex_manager.generate import VEXGenerate
37-
37+
from cve_bin_tool.cvedb import DISK_LOCATION_DEFAULT
3838

3939
def save_intermediate(
4040
all_cve_data: dict[ProductInfo, CVEData],
@@ -130,9 +130,10 @@ def output_pdf(
130130
exploits: bool = False,
131131
metrics: bool = False,
132132
all_product_data=None,
133+
cachedir: str = DISK_LOCATION_DEFAULT,
133134
):
134135
"""Output a PDF of CVEs"""
135-
cvedb_data = CVEDB()
136+
cvedb_data = CVEDB(cachedir=cachedir)
136137
db_date = time.strftime(
137138
"%d %B %Y at %H:%M:%S", time.localtime(cvedb_data.get_db_update_date())
138139
)
@@ -595,6 +596,7 @@ def output_pdf(
595596
affected_versions: int = 0,
596597
exploits: bool = False,
597598
all_product_data=None,
599+
cachedir: str = DISK_LOCATION_DEFAULT
598600
):
599601
"""Output a PDF of CVEs
600602
Required module: Reportlab not found"""
@@ -673,6 +675,7 @@ def __init__(
673675
vex_product_info: dict[str, str] = {},
674676
offline: bool = False,
675677
organized_arguements: dict = None,
678+
cachedir: str = DISK_LOCATION_DEFAULT
676679
):
677680
"""Constructor for OutputEngine class."""
678681
self.logger = logger or LOGGER.getChild(self.__class__.__name__)
@@ -705,6 +708,7 @@ def __init__(
705708
self.vex_type = vex_type
706709
self.vex_product_info = vex_product_info
707710
self.vex_filename = vex_filename
711+
self.cachedir = cachedir
708712

709713
def output_cves(self, outfile, output_type="console"):
710714
"""Output a list of CVEs
@@ -752,6 +756,7 @@ def output_cves(self, outfile, output_type="console"):
752756
self.affected_versions,
753757
self.exploits,
754758
self.metrics,
759+
self.cachedir,
755760
)
756761
elif output_type == "html":
757762
output_html(

cve_bin_tool/package_list_parser.py

+4-1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
)
2121
from cve_bin_tool.log import LOGGER
2222
from cve_bin_tool.util import ProductInfo, Remarks
23+
from cve_bin_tool.data_sources import DISK_LOCATION_DEFAULT
2324

2425
ROOT_PATH = Path((__file__), "..").parent.absolute()
2526

@@ -35,6 +36,7 @@ def __init__(
3536
input_file: str,
3637
logger: Logger = LOGGER.getChild("PackageListParser"),
3738
error_mode=ErrorMode.TruncTrace,
39+
cachedir: str = None,
3840
) -> None:
3941
"""
4042
Initialize the PackageListParser object.
@@ -56,6 +58,7 @@ def __init__(
5658
self.parsed_data_with_vendor: Dict[Any, Any] = defaultdict(dict)
5759
self.package_names_with_vendor: List[Any] = []
5860
self.package_names_without_vendor: List[Any] = []
61+
self.cache_dir = Path(cachedir) if cachedir is not None else DISK_LOCATION_DEFAULT,
5962

6063
def parse_list(self):
6164
"""
@@ -147,7 +150,7 @@ def parse_list(self):
147150
if package_name in txt_package_names:
148151
self.package_names_without_vendor.append(installed_package)
149152

150-
cve_db = CVEDB()
153+
cve_db = CVEDB(cachedir=self.cache_dir)
151154
vendor_package_pairs = cve_db.get_vendor_product_pairs(
152155
self.package_names_without_vendor
153156
)

cve_bin_tool/parsers/__init__.py

+3-1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from cve_bin_tool.cvedb import DBNAME, DISK_LOCATION_DEFAULT
1111
from cve_bin_tool.error_handler import CVEDBError
1212
from cve_bin_tool.util import ProductInfo, ScanInfo, decode_cpe23
13+
from pathlib import Path
1314

1415
__all__ = [
1516
"parse",
@@ -41,14 +42,15 @@ class Parser:
4142
filename (str): The filename of the data to be processed.
4243
"""
4344

44-
def __init__(self, cve_db, logger):
45+
def __init__(self, cve_db, logger, cachedir : str = None):
4546
"""Initializes a Parser object."""
4647
self.cve_db = cve_db
4748
self.logger = logger
4849
self.filename = ""
4950
self.purl_pkg_type = "default"
5051
self.connection: sqlite3.Connection | None = None
5152
self.dbpath = DISK_LOCATION_DEFAULT / DBNAME
53+
self.dbname = str(Path(cachedir) / DBNAME) if cachedir is not None else DISK_LOCATION_DEFAULT
5254

5355
def run_checker(self, filename):
5456
"""

cve_bin_tool/sbom_manager/parse.py

+6-2
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
validate_serialNumber,
2727
)
2828
from cve_bin_tool.validator import validate_cyclonedx, validate_spdx, validate_swid
29-
29+
from cve_bin_tool.data_sources import DISK_LOCATION_DEFAULT
3030

3131
class SBOMParse:
3232
"""
@@ -45,12 +45,15 @@ class SBOMParse:
4545

4646
sbom_data: defaultdict[ProductInfo, TriageData]
4747

48+
CACHEDIR = DISK_LOCATION_DEFAULT
49+
4850
def __init__(
4951
self,
5052
filename: str,
5153
sbom_type: str = "spdx",
5254
logger: Logger | None = None,
5355
validate: bool = True,
56+
cachedir: str = None,
5457
):
5558
self.filename = filename
5659
self.sbom_data = defaultdict(dict)
@@ -60,9 +63,10 @@ def __init__(
6063
self.logger = logger or LOGGER.getChild(self.__class__.__name__)
6164
self.validate = validate
6265
self.serialNumber = ""
66+
self.cachedir = Path(cachedir) if cachedir is not None else self.CACHEDIR
6367

6468
# Connect to the database
65-
self.cvedb = CVEDB(version_check=False)
69+
self.cvedb = CVEDB(version_check=False, cachedir=self.cachedir)
6670

6771
def parse_sbom(self) -> dict[ProductInfo, TriageData]:
6872
"""

cve_bin_tool/version_scanner.py

+4-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from cve_bin_tool.parsers.parse import available_parsers, parse, valid_files
1919
from cve_bin_tool.strings import parse_strings
2020
from cve_bin_tool.util import DirWalk, ProductInfo, ScanInfo, inpath
21+
from cve_bin_tool.data_sources import DISK_LOCATION_DEFAULT
2122

2223
if sys.version_info >= (3, 10):
2324
from importlib import metadata as importlib_metadata
@@ -44,6 +45,7 @@ def __init__(
4445
score: int = 0,
4546
validate: bool = True,
4647
sources=None,
48+
cachedir: str = None,
4749
):
4850
self.logger = logger or LOGGER.getChild(self.__class__.__name__)
4951
# Update egg if installed in development mode
@@ -66,7 +68,8 @@ def __init__(
6668
self.should_extract = should_extract
6769
self.file_stack: list[str] = []
6870
self.error_mode = error_mode
69-
self.cve_db = CVEDB(sources=sources)
71+
self.cache_dir = Path(cachedir) if cachedir is not None else DISK_LOCATION_DEFAULT
72+
self.cve_db = CVEDB(sources=sources,cachedir=self.cache_dir)
7073
self.validate = validate
7174
# self.logger.info("Checkers loaded: %s" % (", ".join(self.checkers.keys())))
7275
self.language_checkers = self.available_language_checkers()

cve_bin_tool/version_signature.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ class InvalidVersionSignatureTable(ValueError):
1818
class VersionSignatureDb:
1919
"""Methods for version signature data stored in sqlite"""
2020

21-
def __init__(self, table_name, mapping_function, duration) -> None:
21+
def __init__(self, table_name, mapping_function, duration,cachedir: str = None,) -> None:
2222
"""Set location on disk data cache will reside.
2323
Also sets the table name and refresh duration
2424
"""
@@ -28,7 +28,7 @@ def __init__(self, table_name, mapping_function, duration) -> None:
2828
self.table_name = table_name
2929
self.update_table_name = f"latest_update_{table_name}"
3030
self.mapping_function = mapping_function
31-
self.disk_location = DISK_LOCATION_DEFAULT
31+
self.disk_location = cachedir if cachedir is not None else DISK_LOCATION_DEFAULT
3232
self.duration = duration
3333
self.conn: sqlite3.Connection | None = None
3434
self.cursor: sqlite3.Cursor | None = None

0 commit comments

Comments
 (0)