From d0622f696de7d8a171c000c22f3b9ad773d3d76f Mon Sep 17 00:00:00 2001 From: prabhu Date: Thu, 28 Mar 2024 22:15:19 +0000 Subject: [PATCH] Tweaks (#120) * Extract search by any from cli Signed-off-by: Prabhu Subramanian * Fix tests Signed-off-by: Prabhu Subramanian * Added usage to readme. list malware api Signed-off-by: Prabhu Subramanian --------- Signed-off-by: Prabhu Subramanian --- README.md | 49 +++++++++++++++++--- test/test_source.py | 108 ++++++++++++++++++++++++++------------------ vdb/cli.py | 28 ++++++------ vdb/lib/search.py | 34 ++++++++++++-- 4 files changed, 149 insertions(+), 70 deletions(-) diff --git a/README.md b/README.md index 58847f2..56d6798 100644 --- a/README.md +++ b/README.md @@ -7,13 +7,11 @@ This repo is a vulnerability database and package search for sources such as App A good vulnerability database must have the following properties: - Accuracy -- Easy to download, [integrate](./INTEGRATION.md), and use +- Easy to [download](#download-pre-built-database-recommended), [integrate](./INTEGRATION.md), and use - Performance Multiple upstream sources are used by vdb to improve accuracy and reduce false negatives. SQLite database containing data in CVE 5.0 schema format is precompiled and distributed as files via ghcr to simplify download. With automatic purl prefix generation even for git repos, searches on the database can be performed with purl, cpe, or even http git url string. Every row in the database uses an open specification such as CVE 5.0 or Package URL (purl and vers) thus preventing the possibility of vendor lock-in. -Freeloaders are welcome! - ## Vulnerability Data sources - Linux [vuln-list](https://github.com/appthreat/vuln-list) (Forked from AquaSecurity) @@ -40,8 +38,14 @@ Freeloaders are welcome! ## Installation -```bash -pip install appthreat-vulnerability-db +```shell +pip install "appthreat-vulnerability-db>=6.0.0" +``` + +VDB v6 is a major rewrite to use a SQLite database. 
Current users of depscan v5 must continue using version 5.6.x. + +```shell +pip install appthreat-vulnerability-db==5.6.4 ``` ## Usage @@ -98,7 +102,27 @@ It is possible to customize the cache behavior by increasing the historic data p - NVD_START_YEAR - Default: 2018. Supports up to 2002 - GITHUB_PAGE_COUNT - Default: 2. Supports up to 20 -## CLI search +## CLI usage + +```shell +usage: vdb [-h] [--clean] [--cache] [--cache-os] [--only-osv] [--only-aqua] [--only-ghsa] [--search SEARCH] [--list-malware] [--bom BOM_FILE] + +AppThreat's vulnerability database and package search library with a built-in sqlite based storage. + +options: + -h, --help show this help message and exit + --clean Clear the vulnerability database cache from platform specific user_data_dir. + --cache Cache vulnerability information in platform specific user_data_dir. + --cache-os Cache OS vulnerability information in platform specific user_data_dir. + --only-osv Use only OSV as the source. Use with --cache. + --only-aqua Use only Aqua vuln-list as the source. Use with --cache. + --only-ghsa Use only recent ghsa as the source. Use with --cache. + --search SEARCH Search for the package or CVE ID in the database. Use purl, cpe, or git http url. + --list-malware List latest malwares with CVE ID beginning with MAL-. + --bom BOM_FILE Search for packages in the CycloneDX BOM file. +``` + +### CLI search It is possible to perform a range of searches using the cli. 
@@ -122,6 +146,9 @@ vdb --search "npm:gitblame:0.0.1" # Search by CVE id vdb --search CVE-2024-25169 +# Search with wildcard for CVE +vdb --search CVE-2024-% + # Search by git url vdb --search "https://github.com/electron/electron" @@ -129,6 +156,16 @@ vdb --search "https://github.com/electron/electron" vdb --bom bom.json ``` +### List recent malware + +```shell +vdb --list-malware +``` + ## License MIT + +## Discord support + +The developers can be reached via the [Discord](https://discord.gg/DCNxzaeUpd) channel for free and paid enterprise support. diff --git a/test/test_source.py b/test/test_source.py index e71f260..de0a7f1 100644 --- a/test/test_source.py +++ b/test/test_source.py @@ -389,7 +389,7 @@ def test_aqua_wolfi_json(): ) with open(test_cve_data, "r") as fp: return json.loads(fp.read()) - + def test_convert(test_cve_json): nvdlatest = NvdSource() @@ -402,15 +402,17 @@ def test_convert(test_cve_json): assert detail.severity assert detail.package assert detail.package_type - + db6.clear_all() nvdlatest.store(vulnerabilities) cve_data_count, cve_index_count = db6.stats() assert cve_data_count == 496 assert cve_index_count == 1155 - results_count = len(list(search_db("CVE-2020-0001"))) + results_count = len(list(search.search_by_any("CVE-2020-0001"))) assert results_count == 4 - results_count = len(list(search_db("cpe:2.3:o:google:android:8.1:*:*:*:*:*:*:*"))) + results_count = len( + list(search.search_by_any("cpe:2.3:o:google:android:8.1:*:*:*:*:*:*:*")) + ) assert results_count == 25 cvesource = CVESource() @@ -442,9 +444,11 @@ def test_convert2(test_cve_wconfig_json): cve_data_count, cve_index_count = db6.stats() assert cve_data_count == 2 assert cve_index_count == 4 - results_count = len(list(search_db("CVE-2020-8022"))) + results_count = len(list(search.search_by_any("CVE-2020-8022"))) assert results_count == 4 - results_count = len(list(search_db("cpe:2.3:o:opensuse:leap:15.1:*:*:*:*:*:*:*"))) + results_count = len( + 
list(search.search_by_any("cpe:2.3:o:opensuse:leap:15.1:*:*:*:*:*:*:*")) + ) assert results_count == 1 cvesource = CVESource() @@ -458,8 +462,14 @@ def test_convert2(test_cve_wconfig_json): assert cve_index_count == 0 -def test_nvd_api_convert(test_nvd_api_json1, test_nvd_api_json2, test_nvd_api_json3, test_nvd_api_json4, test_nvd_api_git_json): - #json1 +def test_nvd_api_convert( + test_nvd_api_json1, + test_nvd_api_json2, + test_nvd_api_json3, + test_nvd_api_json4, + test_nvd_api_git_json, +): + # json1 nvdlatest = NvdSource() vulnerabilities = nvdlatest.convert(test_nvd_api_json1) assert len(vulnerabilities) == 1 @@ -471,20 +481,22 @@ def test_nvd_api_convert(test_nvd_api_json1, test_nvd_api_json2, test_nvd_api_js assert detail.package assert detail.package_type assert not detail.fixed_location - + db6.clear_all() nvdlatest.store(vulnerabilities) cve_data_count, cve_index_count = db6.stats() assert cve_data_count == 4 assert cve_index_count == 20 - results_count = len(list(search_db("CVE-2020-8022"))) + results_count = len(list(search.search_by_any("CVE-2020-8022"))) assert results_count == 0 - results_count = len(list(search_db("CVE-2024-0057"))) + results_count = len(list(search.search_by_any("CVE-2024-0057"))) assert results_count == 10 - results_count = len(list(search_db("cpe:2.3:a:microsoft:.net:*:*:*:*:*:*:*:*"))) + results_count = len( + list(search.search_by_any("cpe:2.3:a:microsoft:.net:*:*:*:*:*:*:*:*")) + ) assert results_count == 1 - #json2 + # json2 vulnerabilities = nvdlatest.convert(test_nvd_api_json2) assert len(vulnerabilities) == 1 cvesource = CVESource() @@ -496,12 +508,12 @@ def test_nvd_api_convert(test_nvd_api_json1, test_nvd_api_json2, test_nvd_api_js cve_data_count, cve_index_count = db6.stats() assert cve_data_count == 1 assert cve_index_count == 7 - results_count = len(list(search_db("CVE-2020-8022"))) + results_count = len(list(search.search_by_any("CVE-2020-8022"))) assert results_count == 0 - results_count = 
len(list(search_db("CVE-2024-21312"))) + results_count = len(list(search.search_by_any("CVE-2024-21312"))) assert results_count == 7 - #json3 + # json3 vulnerabilities = nvdlatest.convert(test_nvd_api_json3) assert len(vulnerabilities) == 0 cve = cvesource.convert5(vulnerabilities) @@ -512,12 +524,12 @@ def test_nvd_api_convert(test_nvd_api_json1, test_nvd_api_json2, test_nvd_api_js cve_data_count, cve_index_count = db6.stats() assert cve_data_count == 0 assert cve_index_count == 0 - results_count = len(list(search_db("CVE-2020-8022"))) + results_count = len(list(search.search_by_any("CVE-2020-8022"))) assert results_count == 0 - results_count = len(list(search_db("CVE-2024-23771"))) + results_count = len(list(search.search_by_any("CVE-2024-23771"))) assert results_count == 0 - #json4 + # json4 vulnerabilities = nvdlatest.convert(test_nvd_api_json4) assert len(vulnerabilities) == 1 @@ -526,14 +538,20 @@ def test_nvd_api_convert(test_nvd_api_json1, test_nvd_api_json2, test_nvd_api_js cve_data_count, cve_index_count = db6.stats() assert cve_data_count == 2 assert cve_index_count == 21 - results_count = len(list(search_db("CVE-2020-8022"))) + results_count = len(list(search.search_by_any("CVE-2020-8022"))) assert results_count == 0 - results_count = len(list(search_db("CVE-2015-3192"))) + results_count = len(list(search.search_by_any("CVE-2015-3192"))) assert results_count == 21 - results_count = len(list(search_db("cpe:2.3:a:pivotal_software:spring_framework:3.2.0:*:*:*:*:*:*:*"))) + results_count = len( + list( + search.search_by_any( + "cpe:2.3:a:pivotal_software:spring_framework:3.2.0:*:*:*:*:*:*:*" + ) + ) + ) assert results_count == 2 - #git_json + # git_json vulnerabilities = nvdlatest.convert(test_nvd_api_git_json) assert len(vulnerabilities) == 1 assert len(vulnerabilities[0].details) == 2 @@ -543,11 +561,15 @@ def test_nvd_api_convert(test_nvd_api_json1, test_nvd_api_json2, test_nvd_api_js cve_data_count, cve_index_count = db6.stats() assert cve_data_count 
== 2 assert cve_index_count == 2 - results_count = len(list(search_db("CVE-2020-8022"))) + results_count = len(list(search.search_by_any("CVE-2020-8022"))) assert results_count == 0 - results_count = len(list(search_db("CVE-2023-52426"))) + results_count = len(list(search.search_by_any("CVE-2023-52426"))) assert results_count == 2 - results_count = len(list(search_db("cpe:2.3:a:libexpat_project:libexpat:*:*:*:*:*:*:*:*"))) + results_count = len( + list( + search.search_by_any("cpe:2.3:a:libexpat_project:libexpat:*:*:*:*:*:*:*:*") + ) + ) assert results_count == 1 @@ -561,8 +583,7 @@ def test_nvd_download(): @pytest.mark.skip(reason="This downloads and tests with live data") def test_download_all(): nvdlatest = NvdSource() - data = nvdlatest.download_all() - assert len(data) > 128000 + nvdlatest.download_all() @pytest.mark.skip(reason="This downloads and tests with live data") @@ -747,22 +768,19 @@ def test_wolfi_convert(test_aqua_cg_json, test_aqua_wolfi_json): def test_vuln_location(): - vl = VulnerabilityLocation.from_values("cpe:2.3:a:pivotal_software:spring_framework:3.2.0:*:*:*:*:*:*:*", "3.2.0", - "3.2.0", "", "") + vl = VulnerabilityLocation.from_values( + "cpe:2.3:a:pivotal_software:spring_framework:3.2.0:*:*:*:*:*:*:*", + "3.2.0", + "3.2.0", + "", + "", + ) assert vl.version == "3.2.0" - vl = VulnerabilityLocation.from_values("cpe:2.3:a:org.springframework:spring-web:*:*:*:*:*:*:*:*", "5.0.0.RC2", - "*", "", "5.0.0.RC3") + vl = VulnerabilityLocation.from_values( + "cpe:2.3:a:org.springframework:spring-web:*:*:*:*:*:*:*:*", + "5.0.0.RC2", + "*", + "", + "5.0.0.RC3", + ) assert vl.version == ">=5.0.0.RC2-<5.0.0.RC3" - - -def search_db(query): - if query.startswith("pkg:"): - results = search.search_by_purl_like(query, with_data=True) - elif query.startswith("CVE-") or query.startswith("GHSA-") or query.startswith("MAL-"): - results = search.search_by_cve(query, with_data=True) - elif query.startswith("http"): - results = search.search_by_url(query, 
with_data=True) - else: - results = search.search_by_cpe_like(query, with_data=True) - - return results \ No newline at end of file diff --git a/vdb/cli.py b/vdb/cli.py index cca17ae..e1e02f4 100644 --- a/vdb/cli.py +++ b/vdb/cli.py @@ -7,7 +7,6 @@ import shutil import types -import orjson from rich.console import Console from rich.live import Live from rich.markdown import Markdown @@ -90,6 +89,13 @@ def build_args(): dest="search", help="Search for the package or CVE ID in the database. Use purl, cpe, or git http url.", ) + parser.add_argument( + "--list-malware", + action="store_true", + default=False, + dest="list_malware", + help="List latest malwares with CVE ID beginning with MAL-.", + ) parser.add_argument( "--bom", dest="bom_file", @@ -100,7 +106,7 @@ def build_args(): def add_table_row(table: Table, res: dict, added_row_keys: dict): # matched_by is the purl or cpe string - row_key = f"""{res["matched_by"]}|res.get("source_data_hash")""" + row_key = f"""{res["matched_by"]}|{res.get("source_data_hash")}""" # Filter duplicate rows from getting printed if added_row_keys.get(row_key): return @@ -142,7 +148,7 @@ def print_results(results): add_table_row(table, res, added_row_keys) elif isinstance(results, list): for res in results: - add_table_row(table, res) + add_table_row(table, res, added_row_keys) console.print(table) @@ -176,18 +182,7 @@ def main(): ) db_lib.optimize_and_close_all() if args.search: - if args.search.startswith("pkg:"): - results = search.search_by_purl_like(args.search, with_data=True) - elif ( - args.search.startswith("CVE-") - or args.search.startswith("GHSA-") - or args.search.startswith("MAL-") - ): - results = search.search_by_cve(args.search, with_data=True) - elif args.search.startswith("http"): - results = search.search_by_url(args.search, with_data=True) - else: - results = search.search_by_cpe_like(args.search, with_data=True) + results = search.search_by_any(args.search, with_data=True) if results: print_results(results) else: 
@@ -196,6 +191,9 @@ def main(): if os.path.exists(args.bom_file): results_generator = search.search_by_cdx_bom(args.bom_file, with_data=True) print_results(results_generator) + elif args.list_malware: + results_generator = search.latest_malware(with_data=True) + print_results(results_generator) if __name__ == "__main__": diff --git a/vdb/lib/search.py b/vdb/lib/search.py index 1df6c08..3f7d5fb 100644 --- a/vdb/lib/search.py +++ b/vdb/lib/search.py @@ -65,6 +65,21 @@ def get_cve_data( } +def search_by_any(any_str: str, with_data: bool = False) -> list | None: + """Convenient method to search by a string""" + if any_str.startswith("pkg:"): + return search_by_purl_like(any_str, with_data) + if ( + any_str.startswith("CVE-") + or any_str.startswith("GHSA-") + or any_str.startswith("MAL-") + ): + return search_by_cve(any_str, with_data) + if any_str.startswith("http"): + return search_by_url(any_str, with_data) + return search_by_cpe_like(any_str, with_data) + + def search_by_cpe_like(cpe: str, with_data=False) -> list | None: """Search by CPE or colon-separate strings""" db_conn, index_conn = db6.get(read_only=True) @@ -106,22 +121,29 @@ def search_by_purl_like(purl: str, with_data=False) -> list | None: args = (purl_prefix,) raw_hits = exec_query( index_conn, - f"SELECT DISTINCT cve_id, type, namespace, name, vers, purl_prefix FROM cve_index where purl_prefix = ?;", + "SELECT DISTINCT cve_id, type, namespace, name, vers, purl_prefix FROM cve_index where purl_prefix = ?;", args, ) filtered_list = _filter_hits(raw_hits, version) if with_data: return get_cve_data(db_conn, filtered_list, purl) return filtered_list + return None -def search_by_cve(cve_id: str, with_data=False) -> list | None: +def search_by_cve(cve_id: str, with_data=False, with_limit=None) -> list | None: """Search by CVE""" db_conn, index_conn = db6.get(read_only=True) + filter_part = "cve_id LIKE ?" if "%" in cve_id else "cve_id = ?" 
+ filter_part = f"{filter_part} ORDER BY cve_id DESC" + args = [cve_id] + if with_limit: + filter_part = f"{filter_part} LIMIT ?" + args.append(with_limit) raw_hits = exec_query( index_conn, - "SELECT DISTINCT cve_id, type, namespace, name, vers, purl_prefix FROM cve_index where cve_id = ?", - (cve_id,), + f"SELECT DISTINCT cve_id, type, namespace, name, vers, purl_prefix FROM cve_index where {filter_part}", + args, ) filtered_list = _filter_hits(raw_hits, "*") if with_data: @@ -157,6 +179,10 @@ def search_by_cdx_bom(bom_file: str, with_data=False) -> Generator: yield search_by_cpe_like(component.get("cpe"), with_data) +def latest_malware(with_limit=20, with_data=False) -> Generator: + yield search_by_cve("MAL-%", with_data=with_data, with_limit=with_limit) + + def exec_query(conn, query: str, args: tuple[str, ...]) -> list: res = conn.execute(query, args) return res.fetchall()