Skip to content

Commit 5da8c44

Browse files
authored
Migrate EPSS importer for advisory V2 (#2067)
* Migrate EPSS importer for advisory V2 Signed-off-by: ziad hany <[email protected]> * Add a test for epss Signed-off-by: ziad hany <[email protected]> --------- Signed-off-by: ziad hany <[email protected]>
1 parent 973ee5c commit 5da8c44

File tree

5 files changed

+824
-0
lines changed

5 files changed

+824
-0
lines changed

vulnerabilities/importers/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
from vulnerabilities.pipelines.v2_importers import (
4949
elixir_security_importer as elixir_security_importer_v2,
5050
)
51+
from vulnerabilities.pipelines.v2_importers import epss_importer_v2
5152
from vulnerabilities.pipelines.v2_importers import github_osv_importer as github_osv_importer_v2
5253
from vulnerabilities.pipelines.v2_importers import gitlab_importer as gitlab_importer_v2
5354
from vulnerabilities.pipelines.v2_importers import istio_importer as istio_importer_v2
@@ -83,6 +84,7 @@
8384
github_osv_importer_v2.GithubOSVImporterPipeline,
8485
redhat_importer_v2.RedHatImporterPipeline,
8586
aosp_importer_v2.AospImporterPipeline,
87+
epss_importer_v2.EPSSImporterPipeline,
8688
nvd_importer.NVDImporterPipeline,
8789
github_importer.GitHubAPIImporterPipeline,
8890
gitlab_importer.GitLabImporterPipeline,
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
#
2+
# Copyright (c) nexB Inc. and others. All rights reserved.
3+
# VulnerableCode is a trademark of nexB Inc.
4+
# SPDX-License-Identifier: Apache-2.0
5+
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
6+
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
7+
# See https://aboutcode.org for more information about nexB OSS projects.
8+
#
9+
import csv
10+
import gzip
11+
import logging
12+
import urllib.request
13+
from datetime import datetime
14+
from typing import Iterable
15+
16+
from vulnerabilities import severity_systems
17+
from vulnerabilities.importer import AdvisoryData
18+
from vulnerabilities.importer import ReferenceV2
19+
from vulnerabilities.importer import VulnerabilitySeverity
20+
from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipelineV2
21+
22+
logger = logging.getLogger(__name__)
23+
24+
25+
class EPSSImporterPipeline(VulnerableCodeBaseImporterPipelineV2):
26+
"""Exploit Prediction Scoring System (EPSS) Importer"""
27+
28+
advisory_url = "https://epss.cyentia.com/epss_scores-current.csv.gz"
29+
pipeline_id = "epss_importer_v2"
30+
spdx_license_expression = "unknown"
31+
importer_name = "EPSS Importer"
32+
33+
def advisories_count(self):
34+
return len(self.lines)
35+
36+
@classmethod
37+
def steps(cls):
38+
return (
39+
cls.fetch_db,
40+
cls.collect_and_store_advisories,
41+
)
42+
43+
def fetch_db(self):
44+
logger.info(f"Fetching EPSS database from {self.advisory_url}")
45+
response = urllib.request.urlopen(self.advisory_url)
46+
with gzip.open(response, "rb") as f:
47+
self.lines = [l.decode("utf-8") for l in f.readlines()]
48+
49+
def collect_advisories(self) -> Iterable[AdvisoryData]:
50+
if not self.lines:
51+
logger.error("No EPSS data loaded")
52+
raise ValueError("EPSS data is empty")
53+
54+
epss_reader = csv.reader(self.lines)
55+
model_version, score_date = next(
56+
epss_reader
57+
) # score_date='score_date:2024-05-19T00:00:00+0000'
58+
published_at = datetime.strptime(score_date[11::], "%Y-%m-%dT%H:%M:%S%z")
59+
60+
next(epss_reader) # skip the header row
61+
for epss_row in epss_reader:
62+
cve, score, percentile = epss_row
63+
64+
if not cve or not score or not percentile:
65+
logger.error(f"Invalid epss row: {epss_row}")
66+
continue
67+
68+
severity = VulnerabilitySeverity(
69+
system=severity_systems.EPSS,
70+
value=score,
71+
scoring_elements=percentile,
72+
published_at=published_at,
73+
)
74+
75+
references = ReferenceV2(
76+
url=f"https://api.first.org/data/v1/epss?cve={cve}",
77+
)
78+
79+
yield AdvisoryData(
80+
advisory_id=cve,
81+
severities=[severity],
82+
references_v2=[references],
83+
url=self.advisory_url,
84+
)
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
#
2+
# Copyright (c) nexB Inc. and others. All rights reserved.
3+
# VulnerableCode is a trademark of nexB Inc.
4+
# SPDX-License-Identifier: Apache-2.0
5+
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
6+
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
7+
# See https://aboutcode.org for more information about nexB OSS projects.
8+
#
9+
10+
from pathlib import Path
11+
12+
import pytest
13+
14+
from vulnerabilities.pipelines.v2_importers.epss_importer_v2 import EPSSImporterPipeline
15+
from vulnerabilities.tests import util_tests
16+
17+
TEST_DATA = Path(__file__).parent.parent.parent / "test_data" / "epss"
18+
19+
TEST_CVE_FILES = (TEST_DATA / "epss_scores-2025-x-x.csv",)
20+
21+
22+
@pytest.mark.django_db
23+
@pytest.mark.parametrize("csv_file", TEST_CVE_FILES)
24+
def test_epss_advisories_per_file(csv_file):
25+
pipeline = EPSSImporterPipeline()
26+
27+
with open(csv_file, "r") as f:
28+
pipeline.lines = f.readlines()
29+
30+
result = [adv.to_dict() for adv in pipeline.collect_advisories()]
31+
expected_file = Path(TEST_DATA / "epss-expected.json")
32+
util_tests.check_results_against_json(result, expected_file)

0 commit comments

Comments
 (0)