|
17 | 17 | from vulnerabilities.pipelines import VulnerableCodePipeline
|
18 | 18 |
|
19 | 19 |
|
20 |
| -class FillVulnerabilitySummariesPipeline(VulnerableCodePipeline): |
21 |
| - """Pipeline to fill missing vulnerability summaries from advisories.""" |
| 20 | +class PopulateVulnerabilitySummariesPipeline(VulnerableCodePipeline): |
| 21 | + """Pipeline to populate missing vulnerability summaries from advisories.""" |
22 | 22 |
|
23 |
| - pipeline_id = "fill_vulnerability_summaries" |
| 23 | + pipeline_id = "populate_vulnerability_summaries" |
24 | 24 |
|
25 | 25 | @classmethod
|
26 | 26 | def steps(cls):
|
27 |
| - return (cls.fill_missing_summaries,) |
| 27 | + return (cls.populate_missing_summaries,) |
28 | 28 |
|
29 |
| - def fill_missing_summaries(self): |
30 |
| - """Find vulnerabilities without summaries and fill them using advisories with the same aliases.""" |
| 29 | + def populate_missing_summaries(self): |
| 30 | + """Find vulnerabilities with missing summaries and populate them using advisories with the same aliases.""" |
31 | 31 | vulnerabilities_qs = Vulnerability.objects.filter(summary="")
|
32 | 32 | self.log(
|
33 | 33 | f"Processing {vulnerabilities_qs.count()} vulnerabilities without summaries",
|
34 | 34 | level=logging.INFO,
|
35 | 35 | )
|
36 |
| - nvd_importer_advisories = Advisory.objects.filter( |
37 |
| - created_by="nvd_importer", summary__isnull=False |
38 |
| - ).exclude(summary="") |
39 |
| - self.log( |
40 |
| - f"Found {nvd_importer_advisories.count()} advisories with summaries from NVD importer", |
41 |
| - level=logging.INFO, |
42 |
| - ) |
| 36 | + # nvd_importer_advisories = Advisory.objects.filter(created_by="nvd_importer").exclude(summary="") |
| 37 | + # self.log( |
| 38 | + # f"Found {nvd_importer_advisories.count()} advisories with summaries from NVD importer", |
| 39 | + # level=logging.INFO, |
| 40 | + # ) |
43 | 41 |
|
44 | 42 | progress = LoopProgress(total_iterations=vulnerabilities_qs.count(), logger=self.log)
|
45 | 43 |
|
46 |
| - for vulnerability in progress.iter(vulnerabilities_qs.paginated()): |
47 |
| - aliases = vulnerability.aliases.values_list("alias", flat=True) |
48 |
| - # get alias that start with CVE- with filter |
49 |
| - alias = aliases.filter(alias__startswith="CVE-").first() |
| 44 | + for vulnerability in progress.iter(vulnerabilities_qs.iterator()): |
| 45 | + cve_alias = vulnerability.aliases.filter(alias__startswith="CVE-").first() |
50 | 46 |
|
51 |
| - # check if the vulnerability has an alias |
52 |
| - if not alias: |
| 47 | + if not cve_alias: |
53 | 48 | self.log(
|
54 |
| - f"Vulnerability {vulnerability.vulnerability_id} has no alias", |
| 49 | + f"Vulnerability {vulnerability.vulnerability_id} has no CVE alias", |
55 | 50 | level=logging.INFO,
|
56 | 51 | )
|
57 | 52 | continue
|
58 | 53 |
|
59 |
| - # check if the vulnerability has an alias that matches an advisory |
60 |
| - matching_advisories = nvd_importer_advisories.filter(Q(aliases__contains=alias)) |
| 54 | + matching_advisories = Advisory.objects.filter(aliases=cve_alias) |
61 | 55 |
|
62 | 56 | if matching_advisories.exists():
|
63 |
| - # Take the first matching advisory with a summary |
64 |
| - # get the advisory that was collected the most recently |
65 | 57 | best_advisory = matching_advisories.order_by("-date_collected").first()
|
66 | 58 | # Note: we filtered above to only get non-empty summaries
|
67 | 59 | vulnerability.summary = best_advisory.summary
|
|
0 commit comments