1
1
import json
2
2
import logging
3
+ import re
3
4
from datetime import datetime
4
5
from pathlib import Path
5
6
from typing import Iterable
8
9
9
10
import dateparser
10
11
from packageurl import PackageURL
12
+ from univers .version_constraint import VersionConstraint
13
+ from univers .version_range import RANGE_CLASS_BY_SCHEMES
14
+ from univers .version_range import VersionRange
15
+ from univers .versions import AlpineLinuxVersion
16
+ from univers .versions import ArchLinuxVersion
17
+ from univers .versions import ComposerVersion
18
+ from univers .versions import DebianVersion
19
+ from univers .versions import GenericVersion
20
+ from univers .versions import GentooVersion
21
+ from univers .versions import GolangVersion
11
22
from univers .versions import InvalidVersion
23
+ from univers .versions import LegacyOpensslVersion
24
+ from univers .versions import MavenVersion
25
+ from univers .versions import NginxVersion
26
+ from univers .versions import NugetVersion
27
+ from univers .versions import OpensslVersion
28
+ from univers .versions import PypiVersion
29
+ from univers .versions import RpmVersion
12
30
from univers .versions import SemverVersion
13
31
from univers .versions import Version
14
32
23
41
from vulnerabilities .utils import get_advisory_url
24
42
from vulnerabilities .utils import get_cwe_id
25
43
44
+ logger = logging .getLogger (__name__ )
45
+
46
+ VULNRICH_VERSION_CLASS_SCHEMES = {
47
+ "semver" : SemverVersion ,
48
+ "python" : PypiVersion ,
49
+ "custom" : GenericVersion ,
50
+ "rpm" : RpmVersion ,
51
+ "maven" : MavenVersion ,
52
+ }
53
+
26
54
27
55
class VulnrichImporter (Importer ):
28
56
spdx_license_expression = "CC0-1.0"
@@ -44,7 +72,7 @@ def advisory_data(self) -> Iterable[AdvisoryData]:
44
72
advisory_url = get_advisory_url (
45
73
file = file_path ,
46
74
base_path = base_path ,
47
- url = "https://github.com/rubysec/ruby-advisory-db /blob/master /" ,
75
+ url = "https://github.com/cisagov/vulnrichment /blob/develop /" ,
48
76
)
49
77
yield parse_cve_advisory (raw_data , advisory_url )
50
78
finally :
@@ -53,12 +81,12 @@ def advisory_data(self) -> Iterable[AdvisoryData]:
53
81
54
82
55
83
def parse_cve_advisory (raw_data , advisory_url ):
56
- """"""
57
-
84
+ """ """
58
85
# Extract CVE Metadata
59
86
cve_metadata = raw_data .get ("cveMetadata" , {})
60
87
cve_id = cve_metadata .get ("cveId" )
61
88
state = cve_metadata .get ("state" )
89
+
62
90
date_published = cve_metadata .get ("datePublished" )
63
91
date_published = dateparser .parse (date_published )
64
92
@@ -68,25 +96,75 @@ def parse_cve_advisory(raw_data, advisory_url):
68
96
adp_data = containers .get ("adp" , {})
69
97
70
98
# Extract affected products
71
- # affected_products = cna_data.get("affected", [])
72
- # products = []
73
- # for product in affected_products:
74
- # product_info = {
75
- # "default_status": product.get("defaultStatus"),
76
- # "platforms": product.get("platforms", []),
77
- # "product": product.get("product"),
78
- # "vendor": product.get("vendor"),
79
- # "versions": product.get("versions", [])
80
- # }
81
- # products.append(product_info)
99
+ affected_packages = []
100
+ for affected_product in cna_data .get ("affected" , []):
101
+ if type (affected_product ) != dict :
102
+ continue
103
+ cpes = affected_product .get ("cpes" ) # TODO Add references cpes
104
+
105
+ vendor = affected_product .get ("vendor" ) or ""
106
+ collection_url = affected_product .get ("collectionURL" ) or ""
107
+ product = affected_product .get ("product" ) or ""
108
+ package_name = affected_product .get ("packageName" ) or ""
109
+
110
+ platforms = affected_product .get ("platforms" , [])
111
+ default_status = affected_product .get ("defaultStatus" )
112
+
113
+ affected_packages = []
114
+ # purl (vendor, collection_url, product, package_name, platforms)
115
+ purl = PackageURL (
116
+ type = vendor ,
117
+ name = product ,
118
+ namespace = package_name ,
119
+ )
120
+
121
+ versions = affected_product .get ("versions" , [])
122
+ for version_data in versions :
123
+ # version ≤ V ≤ (lessThanOrEqual/lessThan)
124
+ # right_version ≤ V ≤ left_version
125
+ version_constraints = []
126
+ r_version = version_data .get ("version" )
127
+ version_type = version_data .get ("versionType" )
128
+ version_class = VULNRICH_VERSION_CLASS_SCHEMES .get (version_type )
129
+ if not version_class :
130
+ logger .error (f"Invalid version_class type: { version_type } " )
131
+ continue
132
+
133
+ l_version , l_comparator = None , ""
134
+ if "lessThan" in version_data :
135
+ l_version = version_data .get ("lessThan" )
136
+ l_comparator = "<"
137
+ elif "lessThanOrEqual" in version_data :
138
+ l_version = version_data .get ("lessThanOrEqual" )
139
+ l_comparator = "<="
140
+ try :
141
+ if l_version and l_comparator :
142
+ version_constraints .append (
143
+ VersionConstraint (comparator = l_comparator , version = version_class (l_version ))
144
+ )
145
+ if r_version :
146
+ version_constraints .append (
147
+ VersionConstraint (comparator = ">" , version = version_class (r_version ))
148
+ )
149
+ except InvalidVersion :
150
+ logger .error (f"InvalidVersion: { l_version } -{ r_version } " )
151
+ continue
152
+
153
+ affected_packages .append (
154
+ AffectedPackage (
155
+ purl ,
156
+ affected_version_range = VersionRange (constraints = version_constraints ),
157
+ )
158
+ )
159
+ status = version_data .get ("status" )
82
160
83
161
# Extract descriptions
84
- description = ""
162
+ summary = ""
85
163
description_list = cna_data .get ("descriptions" , [])
86
164
for description_dict in description_list :
87
- if description_dict .get ("lang" ) != "en" :
165
+ if not description_dict .get ("lang" ) in [ "en" , "en-US" ] :
88
166
continue
89
- description = description_dict .get ("value" )
167
+ summary = description_dict .get ("value" )
90
168
91
169
# Extract metrics
92
170
severities = []
@@ -98,7 +176,7 @@ def parse_cve_advisory(raw_data, advisory_url):
98
176
"cvssV2_0" : SCORING_SYSTEMS ["cvssv2" ],
99
177
"other" : {
100
178
"ssvc" : SCORING_SYSTEMS ["ssvc" ],
101
- },
179
+ }, # ignore kev
102
180
}
103
181
104
182
for metric in metrics :
@@ -127,35 +205,34 @@ def parse_cve_advisory(raw_data, advisory_url):
127
205
severities .append (severity )
128
206
129
207
# Extract references
208
+ # TODO ADD reference type
130
209
references = [
131
210
Reference (url = ref .get ("url" ), severities = severities )
132
211
for ref in cna_data .get ("references" , [])
133
212
]
134
213
135
- # Extract problem types
136
214
weaknesses = []
137
- # problem_types = cna_data.get("problemTypes", [])
138
- # for problem in problem_types:
139
- # descriptions = problem.get("descriptions", [])
140
- # for description in descriptions:
141
- # weaknesses.append(
142
- # description.get("cweId")
143
- # "description": description.get("description"),
144
- # "lang": description.get("lang"),
145
- # "type": description.get("type")
146
- # )
147
- #
148
- # # cwe_id = description.get("cweId")
149
- # # cwe_id = get_cwe_id(cwe_id)
150
- # # weaknesses.append(cwe_id)
215
+ for problem_type in cna_data .get ("problemTypes" , []):
216
+ descriptions = problem_type .get ("descriptions" , [])
217
+ for description in descriptions :
218
+ cwe_id = description .get ("cweId" )
219
+ if cwe_id :
220
+ weaknesses .append (get_cwe_id (cwe_id ))
221
+
222
+ description_text = description .get ("description" )
223
+ if description_text :
224
+ pattern = r"CWE-(\d{3})"
225
+ match = re .search (pattern , description_text )
226
+ if match :
227
+ weaknesses .append (match .group (1 ))
151
228
152
229
return AdvisoryData (
153
230
aliases = [cve_id ],
154
- summary = description ,
155
- # affected_packages=affected_products ,
231
+ summary = summary ,
232
+ affected_packages = affected_packages ,
156
233
references = references ,
157
- date_published = date_published ,
158
- # weaknesses=weaknesses,
234
+ # date_published=dateparser.parse(self.cve_item.get("publishedDate")) ,
235
+ weaknesses = weaknesses ,
159
236
url = advisory_url ,
160
237
)
161
238
@@ -218,6 +295,7 @@ def ssvc_calculator(ssvc_data):
218
295
219
296
# "Decision": {"D": {"Track": "T", "Track*": "R", "Attend": "A", "Act": "C"}},
220
297
decision_values = {"Track" : "T" , "Track*" : "R" , "Attend" : "A" , "Act" : "C" }
298
+
221
299
decision_lookup = {
222
300
("none" , "no" , "partial" , "low" ): "Track" ,
223
301
("none" , "no" , "partial" , "medium" ): "Track" ,
@@ -262,8 +340,8 @@ def ssvc_calculator(ssvc_data):
262
340
if decision :
263
341
ssvc_vector += f"D:{ decision_values .get (decision )} /"
264
342
265
- timestamp_formatted = datetime . strptime ( timestamp , "%Y-%m-%dT%H:%M:%S.%fZ" ). strftime (
266
- "%Y-%m-%dT%H:%M:%SZ"
267
- )
268
- ssvc_vector += f"{ timestamp_formatted } /"
343
+ if timestamp :
344
+ timestamp_formatted = dateparser . parse ( timestamp ). strftime ( "%Y-%m-%dT%H:%M:%SZ" )
345
+
346
+ ssvc_vector += f"{ timestamp_formatted } /"
269
347
return ssvc_vector , decision
0 commit comments