Skip to content

Commit

Permalink
Search by sbom
Browse files Browse the repository at this point in the history
Signed-off-by: Prabhu Subramanian <[email protected]>
  • Loading branch information
prabhu committed Mar 24, 2024
1 parent ce6eb90 commit dd37680
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 22 deletions.
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,10 @@ vdb --search "npm:gitblame:0.0.1"
vdb --search CVE-2024-25169

# Search by git url
vdb --search "https://github.com/electron/electron
vdb --search "https://github.com/electron/electron"

# Search by CycloneDX SBOM
vdb --bom bom.json
```

## License
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "appthreat-vulnerability-db"
version = "6.0.0rc2"
version = "6.0.0rc3"
description = "AppThreat's vulnerability database and package search library with a built-in sqlite based storage. OSV, CVE, GitHub, npm are the primary sources of vulnerabilities."
authors = [
{name = "Team AppThreat", email = "[email protected]"},
Expand Down
84 changes: 66 additions & 18 deletions vdb/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,17 @@
import logging
import os
import shutil
import types

import orjson
from rich.console import Console
from rich.syntax import Syntax
from rich.live import Live
from rich.markdown import Markdown
from rich.table import Table

from vdb.lib import config, db6 as db_lib, search
from vdb.lib.aqua import AquaSource
from vdb.lib.cve_model import CVE
from vdb.lib.gha import GitHubSource
from vdb.lib.osv import OSVSource

Expand Down Expand Up @@ -85,27 +88,62 @@ def build_args():
parser.add_argument(
"--search",
dest="search",
help="Search for the package or CVE ID in the database. Use purl, cpe, or colon-separated values.",
help="Search for the package or CVE ID in the database. Use purl, cpe, or git http url.",
)
parser.add_argument(
"--bom",
dest="bom_file",
help="Search for packages in the CycloneDX BOM file.",
)
return parser.parse_args()


def add_table_row(table: Table, res: dict, added_row_keys: dict):
# matched_by is the purl or cpe string
row_key = f"""{res["matched_by"]}|res.get("source_data_hash")"""
# Filter duplicate rows from getting printed
if added_row_keys.get(row_key):
return
source_data: CVE = res.get("source_data")
description = ""
if (
source_data.root.containers.cna
and source_data.root.containers.cna.descriptions
and source_data.root.containers.cna.descriptions.root
):
description = (
source_data.root.containers.cna.descriptions.root[0]
.value.replace("\\n", "\n")
.replace("\\t", " ")
)
table.add_row(
res.get("cve_id"),
res.get("matched_by"),
Markdown(description, justify="left", hyperlinks=True),
)
added_row_keys[row_key] = True


def print_results(results):
table = Table(title="VDB Results")
added_row_keys = {}
table = Table(title="VDB Results", show_lines=True)
table.add_column("CVE", justify="left")
table.add_column("Type")
table.add_column("Namespace")
table.add_column("Name")
table.add_column("Hash")
table.add_column("Source Data")
for res in results:
table.add_row(res.get("cve_id"), res.get("type"),
res.get("namespace", ""), res.get("name"),
res.get("source_data_hash"),
Syntax(orjson.dumps(
res.get("source_data").model_dump(mode="json", exclude_none=True),
option=orjson.OPT_INDENT_2 | orjson.OPT_APPEND_NEWLINE).decode("utf-8", errors="ignore"), "json", word_wrap=True))
console.print(table)
table.add_column("Locator")
table.add_column("Description")
if isinstance(results, types.GeneratorType):
with Live(
table, console=console, refresh_per_second=4, vertical_overflow="visible"
):
for result_gen in results:
if isinstance(result_gen, dict):
add_table_row(table, result_gen, added_row_keys)
if isinstance(result_gen, types.GeneratorType):
for res in result_gen:
add_table_row(table, res, added_row_keys)
elif isinstance(results, list):
for res in results:
add_table_row(table, res)
console.print(table)


def main():
Expand Down Expand Up @@ -133,12 +171,18 @@ def main():
LOG.info("Refreshing %s", s.__class__.__name__)
s.refresh()
cve_data_count, cve_index_count = db_lib.stats()
console.print("cve_data_count", cve_data_count, "cve_index_count", cve_index_count)
console.print(
"cve_data_count", cve_data_count, "cve_index_count", cve_index_count
)
db_lib.optimize_and_close_all()
if args.search:
if args.search.startswith("pkg:"):
results = search.search_by_purl_like(args.search, with_data=True)
elif args.search.startswith("CVE-") or args.search.startswith("GHSA-") or args.search.startswith("MAL-"):
elif (
args.search.startswith("CVE-")
or args.search.startswith("GHSA-")
or args.search.startswith("MAL-")
):
results = search.search_by_cve(args.search, with_data=True)
elif args.search.startswith("http"):
results = search.search_by_url(args.search, with_data=True)
Expand All @@ -148,6 +192,10 @@ def main():
print_results(results)
else:
console.print("No results found!")
elif args.bom_file:
if os.path.exists(args.bom_file):
results_generator = search.search_by_cdx_bom(args.bom_file, with_data=True)
print_results(results_generator)


if __name__ == "__main__":
Expand Down
15 changes: 13 additions & 2 deletions vdb/lib/search.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any
from typing import Any, Generator

import orjson

Expand Down Expand Up @@ -27,7 +27,7 @@ def _filter_hits(raw_hits: list, compare_ver: str) -> list:

def get_cve_data(
db_conn, index_hits: list[dict, Any], search_str: str
) -> list[dict[str, str | CVE | None]]:
) -> Generator | list[dict[str, str | CVE | None]]:
"""Get CVE data for the index results
Args:
Expand Down Expand Up @@ -163,6 +163,17 @@ def search_by_url(url: str, with_data=False) -> list | None:
return search_by_purl_like(purl_str, with_data)


def search_by_cdx_bom(bom_file: str, with_data=False) -> Generator:
"""Search by CycloneDX BOM file"""
with open(bom_file, encoding="utf-8", mode="r") as fp:
cdx_obj = orjson.loads(fp.read())
for component in cdx_obj.get("components"):
if component.get("purl"):
yield search_by_purl_like(component.get("purl"), with_data)
if component.get("cpe"):
yield search_by_cpe_like(component.get("cpe"), with_data)


def exec_query(conn, query: str, args: tuple[str, ...]) -> list:
res = conn.execute(query, args)
return res.fetchall()

0 comments on commit dd37680

Please sign in to comment.