Skip to content

Commit f4140cd

Browse files
committed
Adress review comments
Signed-off-by: Tushar Goel <[email protected]>
1 parent fabe035 commit f4140cd

File tree

3 files changed

+191
-0
lines changed

3 files changed

+191
-0
lines changed

vulnerabilities/improvers/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from vulnerabilities.pipelines import enhance_with_exploitdb
1818
from vulnerabilities.pipelines import enhance_with_kev
1919
from vulnerabilities.pipelines import enhance_with_metasploit
20+
from vulnerabilities.pipelines import fill_vulnerability_summary_pipeline
2021
from vulnerabilities.pipelines import flag_ghost_packages
2122

2223
IMPROVERS_REGISTRY = [
@@ -45,6 +46,7 @@
4546
compute_package_version_rank.ComputeVersionRankPipeline,
4647
collect_commits.CollectFixCommitsPipeline,
4748
add_cvss31_to_CVEs.CVEAdvisoryMappingPipeline,
49+
fill_vulnerability_summary_pipeline.FillVulnerabilitySummariesPipeline,
4850
]
4951

5052
IMPROVERS_REGISTRY = {
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
#
2+
# Copyright (c) nexB Inc. and others. All rights reserved.
3+
# VulnerableCode is a trademark of nexB Inc.
4+
# SPDX-License-Identifier: Apache-2.0
5+
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
6+
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
7+
# See https://aboutcode.org for more information about nexB OSS projects.
8+
#
9+
10+
import logging
11+
12+
from django.db.models import Q
13+
14+
from vulnerabilities.models import Advisory
15+
from vulnerabilities.models import Vulnerability
16+
from vulnerabilities.pipelines import VulnerableCodePipeline
17+
18+
19+
class FillVulnerabilitySummariesPipeline(VulnerableCodePipeline):
20+
"""Pipeline to fill missing vulnerability summaries from advisories."""
21+
22+
pipeline_id = "fill_vulnerability_summaries"
23+
24+
@classmethod
25+
def steps(cls):
26+
return (cls.fill_missing_summaries,)
27+
28+
def fill_missing_summaries(self):
29+
"""Find vulnerabilities without summaries and fill them using advisories with the same aliases."""
30+
vulnerabilities_qs = Vulnerability.objects.filter(summary="").prefetch_related("aliases")
31+
self.log(
32+
f"Processing {vulnerabilities_qs.count()} vulnerabilities without summaries",
33+
level=logging.INFO,
34+
)
35+
nvd_importer_advisories = Advisory.objects.filter(
36+
created_by="nvd_importer", summary__isnull=False
37+
).exclude(summary="")
38+
self.log(
39+
f"Found {nvd_importer_advisories.count()} advisories from NVD importer",
40+
level=logging.INFO,
41+
)
42+
43+
for vulnerability in vulnerabilities_qs.paginated():
44+
aliases = vulnerability.aliases.values_list("alias", flat=True)
45+
# get alias that start with CVE- with filter
46+
alias = aliases.filter(alias__startswith="CVE-").first()
47+
48+
# check if the vulnerability has an alias
49+
if not alias:
50+
self.log(
51+
f"Vulnerability {vulnerability.vulnerability_id} has no alias",
52+
level=logging.INFO,
53+
)
54+
continue
55+
56+
# check if the vulnerability has an alias that matches an advisory
57+
matching_advisories = nvd_importer_advisories.filter(Q(aliases__contains=alias))
58+
59+
if matching_advisories.exists():
60+
# Take the first matching advisory with a summary
61+
best_advisory = matching_advisories.first()
62+
vulnerability.summary = best_advisory.summary
63+
vulnerability.save()
64+
self.log(
65+
f"Updated summary for vulnerability {vulnerability.vulnerability_id}",
66+
level=logging.INFO,
67+
)
68+
else:
69+
self.log(f"No advisory found for alias {alias}", level=logging.INFO)
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
#
2+
# Copyright (c) nexB Inc. and others. All rights reserved.
3+
# VulnerableCode is a trademark of nexB Inc.
4+
# SPDX-License-Identifier: Apache-2.0
5+
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
6+
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
7+
# See https://aboutcode.org for more information about nexB OSS projects.
8+
#
9+
10+
import datetime
11+
from pathlib import Path
12+
13+
import pytz
14+
from django.test import TestCase
15+
16+
from vulnerabilities.models import Advisory
17+
from vulnerabilities.models import Alias
18+
from vulnerabilities.models import Vulnerability
19+
from vulnerabilities.pipelines.fill_vulnerability_summary_pipeline import (
20+
FillVulnerabilitySummariesPipeline,
21+
)
22+
23+
24+
class FillVulnerabilitySummariesPipelineTest(TestCase):
25+
def setUp(self):
26+
self.data = Path(__file__).parent.parent / "test_data"
27+
28+
def test_fill_missing_summaries_from_nvd(self):
29+
"""
30+
Test that vulnerabilities without summaries get them from NVD advisories.
31+
"""
32+
33+
# Create a vulnerability without a summary
34+
vulnerability = Vulnerability.objects.create(
35+
vulnerability_id="VCID-1234",
36+
summary="",
37+
)
38+
alias = Alias.objects.create(alias="CVE-2024-1234", vulnerability=vulnerability)
39+
40+
# Create an NVD advisory with a summary
41+
Advisory.objects.create(
42+
summary="Test vulnerability summary",
43+
created_by="nvd_importer",
44+
date_collected=datetime.datetime(2024, 1, 1, tzinfo=pytz.UTC),
45+
aliases=["CVE-2024-1234"],
46+
)
47+
48+
# Run the pipeline
49+
pipeline = FillVulnerabilitySummariesPipeline()
50+
pipeline.fill_missing_summaries()
51+
52+
# Check that the vulnerability now has a summary
53+
vulnerability.refresh_from_db()
54+
self.assertEqual(vulnerability.summary, "Test vulnerability summary")
55+
56+
def test_no_matching_advisory(self):
57+
"""
58+
Test handling of vulnerabilities that have no matching NVD advisory.
59+
"""
60+
# Create a vulnerability without a summary
61+
vulnerability = Vulnerability.objects.create(
62+
vulnerability_id="VCID-1234",
63+
summary="",
64+
)
65+
Alias.objects.create(alias="CVE-2024-1234", vulnerability=vulnerability)
66+
67+
# Run the pipeline
68+
pipeline = FillVulnerabilitySummariesPipeline()
69+
pipeline.fill_missing_summaries()
70+
71+
# Check that the vulnerability still has no summary
72+
vulnerability.refresh_from_db()
73+
self.assertEqual(vulnerability.summary, "")
74+
75+
def test_vulnerability_without_alias(self):
76+
"""
77+
Test handling of vulnerabilities that have no aliases.
78+
"""
79+
80+
# Create a vulnerability without a summary or alias
81+
vulnerability = Vulnerability.objects.create(
82+
vulnerability_id="VCID-1234",
83+
summary="",
84+
)
85+
86+
# Run the pipeline
87+
pipeline = FillVulnerabilitySummariesPipeline()
88+
pipeline.fill_missing_summaries()
89+
90+
# Check that the vulnerability still has no summary
91+
vulnerability.refresh_from_db()
92+
self.assertEqual(vulnerability.summary, "")
93+
94+
def test_non_nvd_advisory_ignored(self):
95+
"""
96+
Test that advisories from sources other than NVD are ignored.
97+
"""
98+
99+
# Create a vulnerability without a summary
100+
vulnerability = Vulnerability.objects.create(
101+
vulnerability_id="VCID-1234",
102+
summary="",
103+
)
104+
alias = Alias.objects.create(alias="CVE-2024-1234", vulnerability=vulnerability)
105+
106+
# Create a non-NVD advisory with a summary
107+
Advisory.objects.create(
108+
summary="Test vulnerability summary",
109+
created_by="other_importer",
110+
date_collected=datetime.datetime(2024, 1, 1, tzinfo=pytz.UTC),
111+
aliases=["CVE-2024-1234"],
112+
)
113+
114+
# Run the pipeline
115+
pipeline = FillVulnerabilitySummariesPipeline()
116+
pipeline.fill_missing_summaries()
117+
118+
# Check that the vulnerability still has no summary
119+
vulnerability.refresh_from_db()
120+
self.assertEqual(vulnerability.summary, "")

0 commit comments

Comments
 (0)