From dd37680d5f97cd2e060e45195c7b61a0c9b5a047 Mon Sep 17 00:00:00 2001 From: Prabhu Subramanian Date: Sun, 24 Mar 2024 12:03:04 +0000 Subject: [PATCH] Search by sbom Signed-off-by: Prabhu Subramanian --- README.md | 5 ++- pyproject.toml | 2 +- vdb/cli.py | 84 +++++++++++++++++++++++++++++++++++++---------- vdb/lib/search.py | 15 +++++++-- 4 files changed, 84 insertions(+), 22 deletions(-) diff --git a/README.md b/README.md index b8adbb9..58847f2 100644 --- a/README.md +++ b/README.md @@ -123,7 +123,10 @@ vdb --search "npm:gitblame:0.0.1" vdb --search CVE-2024-25169 # Search by git url -vdb --search "https://github.com/electron/electron +vdb --search "https://github.com/electron/electron" + +# Search by CycloneDX SBOM +vdb --bom bom.json ``` ## License diff --git a/pyproject.toml b/pyproject.toml index 413fd13..beb6048 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "appthreat-vulnerability-db" -version = "6.0.0rc2" +version = "6.0.0rc3" description = "AppThreat's vulnerability database and package search library with a built-in sqlite based storage. OSV, CVE, GitHub, npm are the primary sources of vulnerabilities." authors = [ {name = "Team AppThreat", email = "cloud@appthreat.com"}, diff --git a/vdb/cli.py b/vdb/cli.py index fc499ed..cca17ae 100644 --- a/vdb/cli.py +++ b/vdb/cli.py @@ -5,14 +5,17 @@ import logging import os import shutil +import types import orjson from rich.console import Console -from rich.syntax import Syntax +from rich.live import Live +from rich.markdown import Markdown from rich.table import Table from vdb.lib import config, db6 as db_lib, search from vdb.lib.aqua import AquaSource +from vdb.lib.cve_model import CVE from vdb.lib.gha import GitHubSource from vdb.lib.osv import OSVSource @@ -85,27 +88,62 @@ def build_args(): parser.add_argument( "--search", dest="search", - help="Search for the package or CVE ID in the database. Use purl, cpe, or colon-separated values.", + help="Search for the package or CVE ID in the database. Use purl, cpe, or git http url.", + ) + parser.add_argument( + "--bom", + dest="bom_file", + help="Search for packages in the CycloneDX BOM file.", ) return parser.parse_args() +def add_table_row(table: Table, res: dict, added_row_keys: dict): + # matched_by is the purl or cpe string + row_key = f"""{res["matched_by"]}|res.get("source_data_hash")""" + # Filter duplicate rows from getting printed + if added_row_keys.get(row_key): + return + source_data: CVE = res.get("source_data") + description = "" + if ( + source_data.root.containers.cna + and source_data.root.containers.cna.descriptions + and source_data.root.containers.cna.descriptions.root + ): + description = ( + source_data.root.containers.cna.descriptions.root[0] + .value.replace("\\n", "\n") + .replace("\\t", " ") + ) + table.add_row( + res.get("cve_id"), + res.get("matched_by"), + Markdown(description, justify="left", hyperlinks=True), + ) + added_row_keys[row_key] = True + + def print_results(results): - table = Table(title="VDB Results") + added_row_keys = {} + table = Table(title="VDB Results", show_lines=True) table.add_column("CVE", justify="left") - table.add_column("Type") - table.add_column("Namespace") - table.add_column("Name") - table.add_column("Hash") - table.add_column("Source Data") - for res in results: - table.add_row(res.get("cve_id"), res.get("type"), - res.get("namespace", ""), res.get("name"), - res.get("source_data_hash"), - Syntax(orjson.dumps( - res.get("source_data").model_dump(mode="json", exclude_none=True), - option=orjson.OPT_INDENT_2 | orjson.OPT_APPEND_NEWLINE).decode("utf-8", errors="ignore"), "json", word_wrap=True)) - console.print(table) + table.add_column("Locator") + table.add_column("Description") + if isinstance(results, types.GeneratorType): + with Live( + table, console=console, refresh_per_second=4, vertical_overflow="visible" + ): + for result_gen in results: + if isinstance(result_gen, dict): + add_table_row(table, result_gen, added_row_keys) + if isinstance(result_gen, types.GeneratorType): + for res in result_gen: + add_table_row(table, res, added_row_keys) + elif isinstance(results, list): + for res in results: + add_table_row(table, res) + console.print(table) def main(): @@ -133,12 +171,18 @@ def main(): LOG.info("Refreshing %s", s.__class__.__name__) s.refresh() cve_data_count, cve_index_count = db_lib.stats() - console.print("cve_data_count", cve_data_count, "cve_index_count", cve_index_count) + console.print( + "cve_data_count", cve_data_count, "cve_index_count", cve_index_count + ) db_lib.optimize_and_close_all() if args.search: if args.search.startswith("pkg:"): results = search.search_by_purl_like(args.search, with_data=True) - elif args.search.startswith("CVE-") or args.search.startswith("GHSA-") or args.search.startswith("MAL-"): + elif ( + args.search.startswith("CVE-") + or args.search.startswith("GHSA-") + or args.search.startswith("MAL-") + ): results = search.search_by_cve(args.search, with_data=True) elif args.search.startswith("http"): results = search.search_by_url(args.search, with_data=True) @@ -148,6 +192,10 @@ def main(): print_results(results) else: console.print("No results found!") + elif args.bom_file: + if os.path.exists(args.bom_file): + results_generator = search.search_by_cdx_bom(args.bom_file, with_data=True) + print_results(results_generator) if __name__ == "__main__": diff --git a/vdb/lib/search.py b/vdb/lib/search.py index 70a11a4..4102ba3 100644 --- a/vdb/lib/search.py +++ b/vdb/lib/search.py @@ -1,4 +1,4 @@ -from typing import Any +from typing import Any, Generator import orjson @@ -27,7 +27,7 @@ def _filter_hits(raw_hits: list, compare_ver: str) -> list: def get_cve_data( db_conn, index_hits: list[dict, Any], search_str: str -) -> list[dict[str, str | CVE | None]]: +) -> Generator | list[dict[str, str | CVE | None]]: """Get CVE data for the index results Args: @@ -163,6 +163,17 @@ def search_by_url(url: str, with_data=False) -> list | None: return search_by_purl_like(purl_str, with_data) +def search_by_cdx_bom(bom_file: str, with_data=False) -> Generator: + """Search by CycloneDX BOM file""" + with open(bom_file, encoding="utf-8", mode="r") as fp: + cdx_obj = orjson.loads(fp.read()) + for component in cdx_obj.get("components"): + if component.get("purl"): + yield search_by_purl_like(component.get("purl"), with_data) + if component.get("cpe"): + yield search_by_cpe_like(component.get("cpe"), with_data) + + def exec_query(conn, query: str, args: tuple[str, ...]) -> list: res = conn.execute(query, args) return res.fetchall()