From 2082165a8a40d36cb68d53515deb9a818084c79d Mon Sep 17 00:00:00 2001 From: Prabhu Subramanian Date: Wed, 20 Mar 2024 19:58:08 +0000 Subject: [PATCH] Tweaks Signed-off-by: Prabhu Subramanian Improve git url detection for generic purls Signed-off-by: Prabhu Subramanian --- README.md | 9 ++- contrib/cpe_research.py | 106 +++++++++++++++++++++------ vdb/cli.py | 2 +- vdb/lib/aqua.py | 22 +++--- vdb/lib/config.py | 1 - vdb/lib/cve.py | 2 +- vdb/lib/nvd.py | 158 +++++++++++++++++++++++++++++++++++----- vdb/lib/search.py | 90 +++++++++++++++-------- 8 files changed, 300 insertions(+), 90 deletions(-) diff --git a/README.md b/README.md index 413c13b..445b294 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # Introduction -This repo is a vulnerability database and package search for sources such as AppThreat vuln-list, OSV, NVD, and GitHub. Vulnerability data are downloaded from the sources and stored in a sqlite based storage with indexes to allow offline access and quick searches. +This repo is a vulnerability database and package search for sources such as AppThreat vuln-list, OSV, NVD, and GitHub. Vulnerability data are downloaded from the sources and stored in a sqlite based storage with indexes to allow offline access and efficient searches. ## Vulnerability Data sources @@ -86,13 +86,16 @@ It is possible to customize the cache behavior by increasing the historic data p - NVD_START_YEAR - Default: 2018. Supports up to 2002 - GITHUB_PAGE_COUNT - Default: 2. Supports up to 20 -### Basic search +## CLI search -It is possible to perform a simple search using the cli. +It is possible to perform a range of searches using the cli. ```shell vdb --search pkg:pypi/xml2dict@0.2.2 +# Search based on a purl prefix +vdb --search pkg:pypi/xml2dict + # Full url and short form for swift vdb --search "pkg:swift/github.com/vapor/vapor@4.39.0" diff --git a/contrib/cpe_research.py b/contrib/cpe_research.py index 4d96759..c2a7ff2 100644 --- a/contrib/cpe_research.py +++ b/contrib/cpe_research.py @@ -2,10 +2,16 @@ import apsw import orjson +from rich.console import Console +from rich.live import Live +from rich.table import Table -from vdb.lib import KNOWN_PKG_TYPES, db6 +from vdb.lib import KNOWN_PKG_TYPES, db6, CPE_FULL_REGEX from vdb.lib.cve_model import CVE, CVE1 +console = Console(markup=False, highlight=False, emoji=False) + +purl_proposal_cache = {} def get_cve_data(db_conn, index_hits: list[dict, Any]): """Get CVE data for the index results @@ -22,42 +28,98 @@ def get_cve_data(db_conn, index_hits: list[dict, Any]): db_conn, _ = db6.get(read_only=True) for ahit in index_hits: results: apsw.Cursor = db_conn.execute( - "SELECT cve_id, type, namespace, name, json_object('source', source_data) FROM cve_data WHERE cve_id = ? AND type = ? ORDER BY cve_id DESC;", + "SELECT distinct json_object('source', source_data) FROM cve_data WHERE cve_id = ? AND type = ? 
ORDER BY cve_id DESC;", (ahit[0], ahit[1]), ) for res in results: yield { - "cve_id": res[0], - "type": res[1], - "namespace": res[2], - "name": res[3], "purl_prefix": ahit[-1], "source_data": ( CVE( root=CVE1.model_validate( - orjson.loads(res[4])["source"], strict=False + orjson.loads(res[0])["source"], strict=False ) ) - if res[4] + if res[0] else None - ) + ), } -def get_unmapped_namespaces() -> list: - """Get a list of namespaces without a precise purl prefix""" +def propose_pseudo_purls() -> list: + """Get a list of namespaces without a precise purl prefix and propose a pseudo purls""" db_conn, index_conn = db6.get(read_only=True) - raw_hits = index_conn.execute(f"""select distinct cve_id, type, namespace, name, purl_prefix from cve_index where type not in ({', '.join([f"'{p}'" for p in KNOWN_PKG_TYPES])})""") - for ahit in raw_hits: - data_list_gen = get_cve_data(db_conn, [ahit]) - for data_list in data_list_gen: - source_data: CVE1 = data_list["source_data"].root - affected = source_data.containers.cna.affected.root - cpes = [", ".join([b.root for b in a.cpes]) for a in affected] - references = source_data.containers.cna.references.root - ref_urls = [str(a.url.root) for a in references] - print(data_list["cve_id"], data_list["type"], data_list["namespace"], data_list["name"], data_list["purl_prefix"], cpes, ref_urls) + ptypes = KNOWN_PKG_TYPES + # These vendors are causing noise and slow-downs + ptypes.extend( + [ + "oracle", + "microsoft", + "adobe", + "f5", + "dell", + "cisco", + "symantec", + "gigabyte", + "mozilla", + "wireshark", + "schneider-electric", + "ibm", + "fujitsu", + "apple", + "netapp", + "synology", + "citrix", + ] + ) + raw_hits = index_conn.execute( + f"""select distinct cve_id, type, namespace, name, purl_prefix from cve_index where type not in ({', '.join([f"'{p}'" for p in ptypes])})""" + ) + table = Table(title="Results", highlight=False, show_lines=True) + table.add_column("PURL prefix") + table.add_column("CPEs") + table.add_column("References") + with Live( + table, console=console, refresh_per_second=4, vertical_overflow="visible" + ): + for ahit in raw_hits: + data_list_gen = get_cve_data(db_conn, [ahit]) + for data_list in data_list_gen: + source_data: CVE1 = data_list["source_data"].root + if not source_data.containers.cna.references: + continue + references = source_data.containers.cna.references.root + ref_urls = [ + str(a.url.root).lower() + for a in references + if "git" in str(a.url.root).lower() + ] + if not ref_urls: + continue + purl_prefix = data_list["purl_prefix"] + affected = source_data.containers.cna.affected.root + cpes = ["\n".join([b.root for b in a.cpes]) for a in affected] + generic_cpes = [ + acpe for acpe in cpes if acpe.startswith("cpe:2.3:a:generic") + ] + proposed_purls = [] + for generic_cpe in generic_cpes: + all_parts = CPE_FULL_REGEX.match(generic_cpe) + proposed_purl = f"pkg:generic/{all_parts.group('package')}" + version = all_parts.group("version") + if version and version != "*": + proposed_purl = f"{proposed_purl}@{version}" + proposed_purls.append(proposed_purl) + if proposed_purls: + purl_proposal_cache[purl_prefix] = proposed_purls + elif purl_proposal_cache.get(purl_prefix): + proposed_purls = purl_proposal_cache[purl_prefix] + table.add_row( + purl_prefix + "\n" + "\n".join(proposed_purls), + cpes[0], + "\n".join(ref_urls), + ) if __name__ == "__main__": - get_unmapped_namespaces() + propose_pseudo_purls() diff --git a/vdb/cli.py b/vdb/cli.py index 1d1bcf6..42ecb62 100644 --- a/vdb/cli.py +++ b/vdb/cli.py @@ -25,7 
+25,7 @@ for _ in ("httpx",): logging.getLogger(_).disabled = True -AT_LOGO = """ +AT_LOGO = r""" ___ /\ ._ ._ | |_ ._ _ _. _|_ /--\ |_) |_) | | | | (/_ (_| |_ diff --git a/vdb/lib/aqua.py b/vdb/lib/aqua.py index c59fc7d..8b014a9 100644 --- a/vdb/lib/aqua.py +++ b/vdb/lib/aqua.py @@ -67,28 +67,28 @@ def fetch(self, url): return [] def convert(self, cve_data): + if cve_data.get("vulnStatus"): + return self.nvd_api_to_vuln(cve_data) if cve_data.get("updateinfo_id"): return self.alsa_to_vuln(cve_data) - elif cve_data.get("id", "").startswith("ALAS"): + if cve_data.get("id", "").startswith("ALAS"): return self.alas_rlsa_to_vuln(cve_data, "amazon") - elif cve_data.get("id", "").startswith("RLSA"): + if cve_data.get("id", "").startswith("RLSA"): return self.alas_rlsa_to_vuln(cve_data, "rocky") - elif cve_data.get("Candidate"): + if cve_data.get("Candidate"): return self.ubuntu_to_vuln(cve_data) - elif cve_data.get("affected_release"): + if cve_data.get("affected_release"): return self.redhat_to_vuln(cve_data) - elif cve_data.get("name", "").startswith("AVG"): + if cve_data.get("name", "").startswith("AVG"): return self.arch_to_vuln(cve_data) - elif cve_data.get("Tracking"): + if cve_data.get("Tracking"): return self.suse_to_vuln(cve_data) - elif cve_data.get("os_version"): + if cve_data.get("os_version"): return self.photon_to_vuln(cve_data) - elif cve_data.get("Annotations") and cve_data.get("Header"): + if cve_data.get("Annotations") and cve_data.get("Header"): return self.debian_to_vuln(cve_data) - elif cve_data.get("secfixes"): + if cve_data.get("secfixes"): return self.wolfi_to_vuln(cve_data) - elif cve_data.get("vulnStatus"): - return self.nvd_api_to_vuln(cve_data) return [] @staticmethod diff --git a/vdb/lib/config.py b/vdb/lib/config.py index 510d9aa..47fef62 100644 --- a/vdb/lib/config.py +++ b/vdb/lib/config.py @@ -135,7 +135,6 @@ "docker", "oci", "container", - "generic", "qpkg", "buildroot", "coreos", diff --git a/vdb/lib/cve.py b/vdb/lib/cve.py index 1011512..6f3cb71 100644 --- a/vdb/lib/cve.py +++ b/vdb/lib/cve.py @@ -172,7 +172,7 @@ def to_cve_affected(avuln: Vulnerability) -> Affected | None: # Similar to purl type vendor = parts.group("vendor") # Similar to purl namespace - product = parts.group("package") + product = parts.group("package").removesuffix("\\").removesuffix("!") # Similar to purl name package_name = parts.group("package") if "/" in product: diff --git a/vdb/lib/nvd.py b/vdb/lib/nvd.py index eea9a3e..46074fd 100644 --- a/vdb/lib/nvd.py +++ b/vdb/lib/nvd.py @@ -1,6 +1,8 @@ import datetime import gzip import logging +from collections import defaultdict +from urllib.parse import parse_qs, urlparse import httpx import orjson @@ -27,6 +29,8 @@ # Size of the stream to read and write to the file DOWNLOAD_CHUNK_SIZE = 128 +purl_proposal_cache = defaultdict(list) + def get_version(inc_version: str, exc_version: str) -> str: """ @@ -40,6 +44,116 @@ def get_version(inc_version: str, exc_version: str) -> str: return exc_version +def filterable_git_url(url: str, hostname: str) -> bool: + if "git" not in hostname: + return True + for part in ( + "cve", + "disclosure", + "secdb", + "research", + "exploit", + "security-advisory", + "advisories", + "bulletins", + "pocs", + "_poc", + "/poc", + "0day", + "vulnerabilit", + "xss", + "cisagov", + "-post/", + "_posts", + ".pdf", + "covering360", + "fuzz", + "-csrf", + "advisory-db", + "defcon", + "audit-", + "announcements", + "divide-by-zero", + "security-research", + "apidoc", + "-query-help", + "/blog", + "/news", + "/support/", + 
"/bug_report", + "nu11secur1ty" + ): + if part in url.lower(): + return True + for part in ("github.io", + "gist.github.com", + "about.gitlab.com", + "lists.apache.org", + "gitbooks.io", + "githubusercontent.com", + "enterprise.github.com", + "git-scm.com", + "docs." + ): + if part in hostname.lower(): + return True + return False + + +def get_alt_cpes(cpe_uri, git_urls): + alt_cpes = [] + parsed_git_repo_names: dict[str, bool] = {} + # Try to extract any git references from related urls + # See: https://github.com/AppThreat/vulnerability-db/issues/91 + for agit_url in git_urls: + url_obj = urlparse(agit_url) + # Ignore obvious filterable urls + if filterable_git_url(agit_url, url_obj.hostname) or (not url_obj.path and not url_obj.query): + continue + git_repo_name = url_obj.hostname + if url_obj.path: + paths = [ + p + for p in url_obj.path.split("/") + if p and p not in ("/", "pub", "scm", "cgi-bin", "cgit", "gitweb") + ] + if paths: + max_path = 2 if len(paths) >= 2 else 1 + git_repo_name = f"""{git_repo_name}/{'/'.join(paths[:max_path])}""" + if url_obj.query: + query_obj = parse_qs(url_obj.query) + # Eg: https://git.eyrie.org/?p=kerberos/remctl.git%3Ba=commit%3Bh=86c7e4 + if query_obj.get("p"): + git_repo_name = f"""{git_repo_name.removesuffix("/")}/{query_obj.get("p")[0].split(";")[0].removeprefix("/")}""" + git_repo_name = ( + git_repo_name.removeprefix("https://") + .removeprefix("http://") + .removeprefix("git@") + .removeprefix("www.") + .removesuffix("/") + .removesuffix("-") + .removesuffix("/commit") + .removesuffix(".git") + ) + if not parsed_git_repo_names.get(git_repo_name): + # Filter repo names without a path + # eg: github.com + url_obj = urlparse(git_repo_name) + if not url_obj.path: + continue + parsed_git_repo_names[git_repo_name] = True + # We only need 2 new aliases + if len(purl_proposal_cache.get(cpe_uri, [])) > 2: + purl_proposal_cache[cpe_uri].pop(0) + purl_proposal_cache[cpe_uri].append( + f"cpe:2.3:a:generic:{git_repo_name}:*:*:*:*:*:*:*:*" + ) + # See if there is something useful in the cache + if not alt_cpes: + alt_cpes = purl_proposal_cache.get(cpe_uri, []) + return alt_cpes + + class NvdSource(CVESource): """NVD CVE source. 
This uses CVE json 1.1 format that are split based on the year""" @@ -228,7 +342,8 @@ def convert_api_vuln_detail(vuln: dict) -> list[VulnerabilityDetail] | None: details = [] rdata = vuln.get("references", []) git_urls = [r["url"].lower() for r in rdata if "git" in r["url"]] - parsed_git_repo_names: dict[str, bool] = {} + # Alternative CPEs identified for the given vulnerability detail + vuln_alt_cpes = {} for aconfig in config_list: cpe_list = [] nodes = aconfig.get("nodes", []) @@ -249,14 +364,24 @@ def convert_api_vuln_detail(vuln: dict) -> list[VulnerabilityDetail] | None: for cpe in cpe_list: cpe_uri = cpe["criteria"] # Ignore os and hardware vulnerabilities from nvd - if cpe_uri and cpe_uri.startswith("cpe:2.3:o") or cpe_uri.startswith("cpe:2.3:h"): + if ( + cpe_uri + and cpe_uri.startswith("cpe:2.3:o") + or cpe_uri.startswith("cpe:2.3:h") + ): continue all_parts = CPE_FULL_REGEX.match(cpe_uri) # If a single version is mentioned using cpe then use that as a fallback single_version = "" - if all_parts and all_parts.group("version") and all_parts.group("version") != "*": + if ( + all_parts + and all_parts.group("version") + and all_parts.group("version") != "*" + ): single_version = all_parts.group("version") - version_start_including = cpe.get("versionStartIncluding", single_version) + version_start_including = cpe.get( + "versionStartIncluding", single_version + ) version_end_including = cpe.get("versionEndIncluding", single_version) detail = { "cpe_uri": cpe_uri, @@ -268,22 +393,15 @@ def convert_api_vuln_detail(vuln: dict) -> list[VulnerabilityDetail] | None: "source_orig_time": vuln["published"], } cpe_details_list.append(detail) - # Try to extract any git references from related urls - # See: https://github.com/AppThreat/vulnerability-db/issues/91 - if git_urls: - for agit_url in git_urls: - git_repo_name = agit_url - if "github.com" in agit_url and "cve" not in agit_url: - for part in ("/issues", "/commit", "/pull", "/blob", "/releases", "/wiki", "/security", "/compare"): - if part in agit_url: - git_repo_name = agit_url.split(part)[0] - if git_repo_name and not parsed_git_repo_names.get(git_repo_name): - parsed_git_repo_names[git_repo_name] = True - git_repo_name = git_repo_name.removeprefix("https://").removeprefix("http://").removeprefix("git@").removesuffix("/") - cpe_uri = f"cpe:2.3:a:generic:{git_repo_name}:*:*:*:*:*:*:*:*" - new_git_detail = detail.copy() - new_git_detail["cpe_uri"] = cpe_uri - cpe_details_list.append(new_git_detail) + alt_cpes = get_alt_cpes(detail["cpe_uri"], git_urls) + for altc in alt_cpes: + # Filter duplicates + if vuln_alt_cpes.get(altc): + continue + new_git_detail = detail.copy() + new_git_detail["cpe_uri"] = altc + cpe_details_list.append(new_git_detail) + vuln_alt_cpes[altc] = True for det in cpe_details_list: adetail = VulnerabilityDetail.from_dict(det) if adetail: diff --git a/vdb/lib/search.py b/vdb/lib/search.py index 09a5fdd..a774f24 100644 --- a/vdb/lib/search.py +++ b/vdb/lib/search.py @@ -12,17 +12,21 @@ def _filter_hits(raw_hits: list, compare_ver: str) -> list: cve_id = ahit[0] vers = ahit[-1] if utils.vers_compare(compare_ver, vers): - filtered_list.append({ - "cve_id": cve_id, - "type": ahit[1], - "namespace": ahit[2], - "name": ahit[3], - "vers": vers - }) + filtered_list.append( + { + "cve_id": cve_id, + "type": ahit[1], + "namespace": ahit[2], + "name": ahit[3], + "vers": vers, + } + ) return filtered_list -def get_cve_data(db_conn, index_hits: list[dict, Any], search_str: str) -> list[dict[str, str | CVE | None]]: +def 
get_cve_data( + db_conn, index_hits: list[dict, Any], search_str: str +) -> list[dict[str, str | CVE | None]]: """Get CVE data for the index results Args: @@ -37,20 +41,34 @@ def get_cve_data(db_conn, index_hits: list[dict, Any], search_str: str) -> list[ db_conn, _ = db6.get(read_only=True) data_list = [] for ahit in index_hits: - results = exec_query(db_conn, - "SELECT cve_id, type, namespace, name, json_object('source', source_data), json_object('override', override_data) FROM cve_data WHERE cve_id = ? AND type = ? ORDER BY cve_id DESC;", - (ahit["cve_id"], ahit["type"])) + results = exec_query( + db_conn, + "SELECT cve_id, type, namespace, name, json_object('source', source_data), json_object('override', override_data) FROM cve_data WHERE cve_id = ? AND type = ? ORDER BY cve_id DESC;", + (ahit["cve_id"], ahit["type"]), + ) for res in results: - data_list.append({ - "cve_id": res[0], - "type": res[1], - "namespace": res[2], - "name": res[3], - "matching_vers": ahit["vers"], - "matched_by": search_str, - "source_data": CVE(root=CVE1.model_validate(orjson.loads(res[4])["source"], strict=False)) if res[4] else None, - "override_data": orjson.loads(res[5])["override"] if res[5] else None - }) + data_list.append( + { + "cve_id": res[0], + "type": res[1], + "namespace": res[2], + "name": res[3], + "matching_vers": ahit["vers"], + "matched_by": search_str, + "source_data": ( + CVE( + root=CVE1.model_validate( + orjson.loads(res[4])["source"], strict=False + ) + ) + if res[4] + else None + ), + "override_data": ( + orjson.loads(res[5])["override"] if res[5] else None + ), + } + ) return data_list @@ -63,13 +81,17 @@ def search_by_cpe_like(cpe: str, with_data=False) -> list | None: vendor, package, version = cpe.split(":") else: return None - raw_hits = exec_query(index_conn, - "SELECT cve_id, type, namespace, name, vers FROM cve_index where namespace = ? AND name = ?;", - (vendor, package)) + raw_hits = exec_query( + index_conn, + "SELECT cve_id, type, namespace, name, vers FROM cve_index where namespace = ? AND name = ?;", + (vendor, package), + ) if not raw_hits: - raw_hits = exec_query(index_conn, - "SELECT cve_id, type, namespace, name, vers FROM cve_index where type = ? AND name = ?;", - (vendor, package)) + raw_hits = exec_query( + index_conn, + "SELECT cve_id, type, namespace, name, vers FROM cve_index where type = ? AND name = ?;", + (vendor, package), + ) filtered_list = _filter_hits(raw_hits, version) if with_data: return get_cve_data(db_conn, filtered_list, cpe) @@ -98,9 +120,11 @@ def search_by_purl_like(purl: str, with_data=False) -> list | None: extra_filter = " AND namespace IS NULL" extra_filter = f"{extra_filter} AND name = ?" args.append(name) - raw_hits = exec_query(index_conn, - f"SELECT cve_id, type, namespace, name, vers FROM cve_index where purl_prefix = ? OR (type = ?{extra_filter});", - args) + raw_hits = exec_query( + index_conn, + f"SELECT cve_id, type, namespace, name, vers FROM cve_index where purl_prefix = ? 
OR (type = ?{extra_filter});", + args, + ) filtered_list = _filter_hits(raw_hits, version) if with_data: return get_cve_data(db_conn, filtered_list, purl) @@ -110,7 +134,11 @@ def search_by_purl_like(purl: str, with_data=False) -> list | None: def search_by_cve(cve_id: str, with_data=False) -> list | None: """Search by CVE""" db_conn, index_conn = db6.get(read_only=True) - raw_hits = exec_query(index_conn, "SELECT cve_id, type, namespace, name, vers FROM cve_index where cve_id = ?", (cve_id, )) + raw_hits = exec_query( + index_conn, + "SELECT cve_id, type, namespace, name, vers FROM cve_index where cve_id = ?", + (cve_id,), + ) filtered_list = _filter_hits(raw_hits, "*") if with_data: return get_cve_data(db_conn, filtered_list, cve_id)
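
Note for reviewers: below is a minimal, illustrative sketch (not part of the patch) of how the new `get_alt_cpes` helper in `vdb/lib/nvd.py` is expected to turn a git reference URL into a generic CPE alias. The CPE string, organisation, and repository URL are hypothetical, and the exact result depends on the filtering rules in `filterable_git_url` and on the module-level `purl_proposal_cache`.

```python
# Illustration only -- not part of the patch. Assumes the patched vdb/lib/nvd.py
# is importable; the CPE URI and repository URL below are made-up examples.
from vdb.lib.nvd import get_alt_cpes

# A reference URL pointing at a fix commit for a hypothetical project.
git_urls = ["https://github.com/example-org/example-project/commit/abc123"]
cpe_uri = "cpe:2.3:a:example_vendor:example_project:*:*:*:*:*:*:*:*"

alt_cpes = get_alt_cpes(cpe_uri, git_urls)
# The URL passes filterable_git_url (no advisory/PoC keywords, hostname contains
# "git"), so the first two path segments become the repo name and an alias is
# cached and returned, roughly:
#   ["cpe:2.3:a:generic:github.com/example-org/example-project:*:*:*:*:*:*:*:*"]
# contrib/cpe_research.py can then propose a pseudo purl from such a CPE, e.g.
#   pkg:generic/github.com/example-org/example-project
print(alt_cpes)
```

Because `purl_proposal_cache` is module-level and capped at a few entries per CPE URI, repeated CVE configurations that reference the same repository reuse the cached aliases instead of re-parsing the URLs.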