7
7
# See https://aboutcode.org for more information about nexB OSS projects.
8
8
#
9
9
10
+ import csv
10
11
import hashlib
11
12
import json
12
13
import logging
14
+ import xml .etree .ElementTree as ET
13
15
from contextlib import suppress
14
16
from functools import cached_property
17
+ from itertools import groupby
18
+ from operator import attrgetter
15
19
from typing import Union
16
20
21
+ from cvss .exceptions import CVSS2MalformedError
22
+ from cvss .exceptions import CVSS3MalformedError
23
+ from cvss .exceptions import CVSS4MalformedError
17
24
from cwe2 .database import Database
25
+ from cwe2 .mappings import xml_database_path
26
+ from cwe2 .weakness import Weakness as DBWeakness
18
27
from django .contrib .auth import get_user_model
19
28
from django .contrib .auth .models import UserManager
20
29
from django .core import exceptions
41
50
from univers .version_range import AlpineLinuxVersionRange
42
51
from univers .versions import Version
43
52
44
- from aboutcode import hashid
45
53
from vulnerabilities import utils
54
+ from vulnerabilities .severity_systems import EPSS
46
55
from vulnerabilities .severity_systems import SCORING_SYSTEMS
47
56
from vulnerabilities .utils import normalize_purl
48
57
from vulnerabilities .utils import purl_to_dict
@@ -371,6 +380,127 @@ def get_related_purls(self):
371
380
"""
372
381
return [p .package_url for p in self .packages .distinct ().all ()]
373
382
383
+ def aggregate_fixed_and_affected_packages (self ):
384
+ from vulnerabilities .utils import get_purl_version_class
385
+
386
+ sorted_fixed_by_packages = self .fixed_by_packages .filter (is_ghost = False ).order_by (
387
+ "type" , "namespace" , "name" , "qualifiers" , "subpath"
388
+ )
389
+
390
+ if sorted_fixed_by_packages :
391
+ sorted_fixed_by_packages .first ().calculate_version_rank
392
+
393
+ sorted_affected_packages = self .affected_packages .all ()
394
+
395
+ if sorted_affected_packages :
396
+ sorted_affected_packages .first ().calculate_version_rank
397
+
398
+ grouped_fixed_by_packages = {
399
+ key : list (group )
400
+ for key , group in groupby (
401
+ sorted_fixed_by_packages ,
402
+ key = attrgetter ("type" , "namespace" , "name" , "qualifiers" , "subpath" ),
403
+ )
404
+ }
405
+
406
+ all_affected_fixed_by_matches = []
407
+
408
+ for sorted_affected_package in sorted_affected_packages :
409
+ affected_fixed_by_matches = {
410
+ "affected_package" : sorted_affected_package ,
411
+ "matched_fixed_by_packages" : [],
412
+ }
413
+
414
+ # Build the key to find matching group
415
+ key = (
416
+ sorted_affected_package .type ,
417
+ sorted_affected_package .namespace ,
418
+ sorted_affected_package .name ,
419
+ sorted_affected_package .qualifiers ,
420
+ sorted_affected_package .subpath ,
421
+ )
422
+
423
+ # Get matching group from pre-grouped fixed_by_packages
424
+ matching_fixed_packages = grouped_fixed_by_packages .get (key , [])
425
+
426
+ # Get version classes for comparison
427
+ affected_version_class = get_purl_version_class (sorted_affected_package )
428
+ affected_version = affected_version_class (sorted_affected_package .version )
429
+
430
+ # Compare versions and filter valid matches
431
+ matched_fixed_by_packages = [
432
+ fixed_by_package .purl
433
+ for fixed_by_package in matching_fixed_packages
434
+ if get_purl_version_class (fixed_by_package )(fixed_by_package .version )
435
+ > affected_version
436
+ ]
437
+
438
+ affected_fixed_by_matches ["matched_fixed_by_packages" ] = matched_fixed_by_packages
439
+ all_affected_fixed_by_matches .append (affected_fixed_by_matches )
440
+ return sorted_fixed_by_packages , sorted_affected_packages , all_affected_fixed_by_matches
441
+
442
+ def get_severity_vectors_and_values (self ):
443
+ """
444
+ Collect severity vectors and values, excluding EPSS scoring systems and handling errors gracefully.
445
+ """
446
+ severity_vectors = []
447
+ severity_values = set ()
448
+
449
+ # Exclude EPSS scoring system
450
+ base_severities = self .severities .exclude (scoring_system = EPSS .identifier )
451
+
452
+ # QuerySet for severities with valid scoring_elements and scoring_system in SCORING_SYSTEMS
453
+ valid_scoring_severities = base_severities .filter (
454
+ scoring_elements__isnull = False , scoring_system__in = SCORING_SYSTEMS .keys ()
455
+ )
456
+
457
+ for severity in valid_scoring_severities :
458
+ try :
459
+ vector_values = SCORING_SYSTEMS [severity .scoring_system ].get (
460
+ severity .scoring_elements
461
+ )
462
+ if vector_values :
463
+ severity_vectors .append (vector_values )
464
+ except (
465
+ CVSS2MalformedError ,
466
+ CVSS3MalformedError ,
467
+ CVSS4MalformedError ,
468
+ NotImplementedError ,
469
+ ) as e :
470
+ logging .error (f"CVSSMalformedError for { severity .scoring_elements } : { e } " )
471
+
472
+ valid_value_severities = base_severities .filter (value__isnull = False ).exclude (value = "" )
473
+
474
+ severity_values .update (valid_value_severities .values_list ("value" , flat = True ))
475
+
476
+ return severity_vectors , severity_values
477
+
478
+
479
+ def get_cwes (self ):
480
+ """Yield CWE Weakness objects"""
481
+ for cwe_category in self .cwe_files :
482
+ cwe_category .seek (0 )
483
+ reader = csv .DictReader (cwe_category )
484
+ for row in reader :
485
+ yield DBWeakness (* list (row .values ())[0 :- 1 ])
486
+ tree = ET .parse (xml_database_path )
487
+ root = tree .getroot ()
488
+ for tag_num in [1 , 2 ]: # Categories , Views
489
+ tag = root [tag_num ]
490
+ for child in tag :
491
+ yield DBWeakness (
492
+ * [
493
+ child .attrib ["ID" ],
494
+ child .attrib .get ("Name" ),
495
+ None ,
496
+ child .attrib .get ("Status" ),
497
+ child [0 ].text ,
498
+ ]
499
+ )
500
+
501
+
502
+ Database .get_cwes = get_cwes
503
+
374
504
375
505
class Weakness (models .Model ):
376
506
"""
@@ -379,7 +509,15 @@ class Weakness(models.Model):
379
509
380
510
cwe_id = models .IntegerField (help_text = "CWE id" )
381
511
vulnerabilities = models .ManyToManyField (Vulnerability , related_name = "weaknesses" )
382
- db = Database ()
512
+
513
+ cwe_by_id = {}
514
+
515
+ def get_cwe (self , cwe_id ):
516
+ if not self .cwe_by_id :
517
+ db = Database ()
518
+ for weakness in db .get_cwes ():
519
+ self .cwe_by_id [str (weakness .cwe_id )] = weakness
520
+ return self .cwe_by_id [cwe_id ]
383
521
384
522
@property
385
523
def cwe (self ):
@@ -391,7 +529,7 @@ def weakness(self):
391
529
Return a queryset of Weakness for this vulnerability.
392
530
"""
393
531
try :
394
- weakness = self .db . get ( self .cwe_id )
532
+ weakness = self .get_cwe ( str ( self .cwe_id ) )
395
533
return weakness
396
534
except Exception as e :
397
535
logger .warning (f"Could not find CWE { self .cwe_id } : { e } " )
0 commit comments