diff --git a/vulmatch/server/ctibutler_views/__init__.py b/vulmatch/server/ctibutler_views/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/vulmatch/server/ctibutler_views/arango_helpers.py b/vulmatch/server/ctibutler_views/arango_helpers.py new file mode 100644 index 0000000..125994a --- /dev/null +++ b/vulmatch/server/ctibutler_views/arango_helpers.py @@ -0,0 +1,650 @@ +import contextlib +import json +from pathlib import Path +import re +import typing + +from django.http import HttpResponse +from arango import ArangoClient +from django.conf import settings +from vulmatch.server.utils import Pagination, Response +from drf_spectacular.utils import OpenApiParameter +from drf_spectacular.types import OpenApiTypes +from rest_framework.validators import ValidationError +from vulmatch.server import utils +if typing.TYPE_CHECKING: + from .. import settings + +import textwrap + +SDO_TYPES = set( + [ + "report", + "note", + "indicator", + "attack-pattern", + "weakness", + "campaign", + "course-of-action", + "infrastructure", + "intrusion-set", + "malware", + "threat-actor", + "tool", + "identity", + "location", + ] +) +SCO_TYPES = set( + [ + "ipv4-addr", + "network-traffic", + "ipv6-addr", + "domain-name", + "url", + "file", + "directory", + "email-addr", + "mac-addr", + "windows-registry-key", + "autonomous-system", + "user-agent", + "cryptocurrency-wallet", + "cryptocurrency-transaction", + "bank-card", + "bank-account", + "phone-number", + ] +) +TLP_TYPES = set([ + "marking-definition" +]) +ATTACK_TYPES = set([ + "attack-pattern", + "campaign", + "course-of-action", + "identity", + "intrusion-set", + "malware", + "marking-definition", + "tool", + "x-mitre-data-component", + "x-mitre-data-source", + "x-mitre-matrix", + "x-mitre-tactic" +] +) + +ATTACK_FORMS = { + "Tactic": [dict(type='x-mitre-tactic')], + "Technique": [dict(type='attack-pattern', x_mitre_is_subtechnique=False), dict(type='attack-pattern', x_mitre_is_subtechnique=None)], + "Sub-technique": [dict(type='attack-pattern', x_mitre_is_subtechnique=True)], + "Mitigation": [dict(type='course-of-action')], + "Group": [dict(type='intrusion-set')], + "Software": [dict(type='malware'), dict(type='tool')], + "Campaign": [dict(type='campaign')], + "Data Source": [dict(type='x-mitre-data-source')], + "Data Component": [dict(type='x-mitre-data-component')], + "Asset": [dict(type='x-mitre-asset')], +} + + +ATLAS_FORMS = { + "Tactic": [dict(type='x-mitre-tactic')], + "Technique": [dict(type='attack-pattern', x_mitre_is_subtechnique=False), dict(type='attack-pattern', x_mitre_is_subtechnique=None)], + "Sub-technique": [dict(type='attack-pattern', x_mitre_is_subtechnique=True)], + "Mitigation": [dict(type='course-of-action')], +} + + +DISARM_FORMS = { + "Tactic": [dict(type='x-mitre-tactic')], + "Technique": [dict(type='attack-pattern', x_mitre_is_subtechnique=False), dict(type='attack-pattern', x_mitre_is_subtechnique=None)], + "Sub-technique": [dict(type='attack-pattern', x_mitre_is_subtechnique=True)], +} + +LOCATION_TYPES = set([ + 'location' +]) +CWE_TYPES = set([ + "weakness", + # "grouping", + # "identity", + # "marking-definition", + # "extension-definition" +] +) + +DISARM_TYPES = set([ + "attack-pattern", + "identity", + "marking-definition", + "x-mitre-matrix", + "x-mitre-tactic" +]) + +ATLAS_TYPES = set([ + "attack-pattern", + "course-of-action", +# "identity", +# "marking-definition", + "x-mitre-collection", + "x-mitre-matrix", + "x-mitre-tactic" +]) + +SOFTWARE_TYPES = set([ + "software", + "identity", + "marking-definition" +] +) +CAPEC_TYPES = set([ + "attack-pattern", + "course-of-action", + "identity", + "marking-definition" +] +) + +LOCATION_SUBTYPES = set( +[ + "intermediate-region", + "sub-region", + "region", + "country" +] +) + +CVE_SORT_FIELDS = [ + "modified_descending", + "modified_ascending", + "created_ascending", + "created_descending", + "name_ascending", + "name_descending", + "epss_score_ascending", + "epss_score_descending", + "cvss_base_score_ascending", + "cvss_base_score_descending", +] +OBJECT_TYPES = SDO_TYPES.union(SCO_TYPES).union(["relationship"]) + + +def positive_int(integer_string, cutoff=None, default=1): + """ + Cast a string to a strictly positive integer. + """ + with contextlib.suppress(ValueError, TypeError): + ret = int(integer_string) + if ret <= 0: + return default + if cutoff: + return min(ret, cutoff) + return ret + return default + +class ArangoDBHelper: + max_page_size = settings.MAXIMUM_PAGE_SIZE + page_size = settings.DEFAULT_PAGE_SIZE + + def get_sort_stmt(self, sort_options: list[str], customs={}): + finder = re.compile(r"(.+)_((a|de)sc)ending") + sort_field = self.query.get('sort', sort_options[0]) + if sort_field not in sort_options: + return "" + if m := finder.match(sort_field): + field = m.group(1) + direction = m.group(2).upper() + if cfield := customs.get(field): + return f"SORT {cfield} {direction}" + return f"SORT doc.{field} {direction}" + + def query_as_array(self, key): + query = self.query.get(key) + if not query: + return [] + return query.split(',') + def query_as_bool(self, key, default=True): + query_str = self.query.get(key) + if not query_str: + return default + return query_str.lower() == 'true' + + @classmethod + def get_page_params(cls, request): + kwargs = request.GET.copy() + page_number = positive_int(kwargs.get('page')) + page_limit = positive_int(kwargs.get('page_size'), cutoff=ArangoDBHelper.max_page_size, default=ArangoDBHelper.page_size) + return page_number, page_limit + + @classmethod + def get_paginated_response(cls, container, data, page_number, page_size=page_size, full_count=0): + return Response( + { + "page_size": page_size or cls.page_size, + "page_number": page_number, + "page_results_count": len(data), + "total_results_count": full_count, + container: data, + } + ) + @classmethod + def get_paginated_response_schema(cls, container='objects', stix_type='identity'): + if stix_type == 'string': + container_schema = {'type':'string'} + else: + container_schema = { + "type": "object", + "properties": { + "type":{ + "example": stix_type, + }, + "id": { + "example": f"{stix_type}--a86627d4-285b-5358-b332-4e33f3ec1075", + }, + }, + "additionalProperties": True, + } + return { + "type": "object", + "required": ["page_results_count", container], + "properties": { + "page_size": { + "type": "integer", + "example": cls.max_page_size, + }, + "page_number": { + "type": "integer", + "example": 3, + }, + "page_results_count": { + "type": "integer", + "example": cls.page_size, + }, + "total_results_count": { + "type": "integer", + "example": cls.page_size * cls.max_page_size, + }, + container: {'type': 'array', 'items': container_schema} + } + } + + @classmethod + def get_relationship_schema_operation_parameters(cls): + return cls.get_schema_operation_parameters() + [ + OpenApiParameter( + "include_embedded_refs", + description=textwrap.dedent( + """ + If `ignore_embedded_relationships` is set to `false` in the POST request to download data, stix2arango will create SROS for embedded relationships (e.g. from `created_by_refs`). You can choose to show them (`true`) or hide them (`false`) using this parameter. Default value if not passed is `true`. + """ + ), + type=OpenApiTypes.BOOL + ), + OpenApiParameter( + "relationship_direction", + enum=["source_ref", "target_ref"], + description=textwrap.dedent( + """ + Filters the results to only include SROs which have this object in the specified SRO property (e.g. setting `source_ref` will only return SROs where the object is shown in the `source_ref` property). Default is both. + """ + ), + ), + OpenApiParameter( + "relationship_type", + description="filter by the `relationship_type` of the STIX SROs returned." + ), + OpenApiParameter( + "_arango_cti_processor_note", + description="Filter results by `_arango_cti_processor_note`" + ) + ] + @classmethod + def get_schema_operation_parameters(self): + parameters = [ + OpenApiParameter( + Pagination.page_query_param, + type=int, + description=Pagination.page_query_description, + ), + OpenApiParameter( + Pagination.page_size_query_param, + type=int, + description=Pagination.page_size_query_description, + ), + ] + return parameters + client = ArangoClient( + hosts=settings.ARANGODB_HOST_URL + ) + DB_NAME = f"{settings.ARANGODB_DATABASE}_database" + def __init__(self, collection, request, container='objects') -> None: + self.collection = collection + self.db = self.client.db( + self.DB_NAME, + username=settings.ARANGODB_USERNAME, + password=settings.ARANGODB_PASSWORD, + ) + self.container = container + self.page, self.count = self.get_page_params(request) + self.request = request + self.query = request.query_params.dict() + def execute_query(self, query, bind_vars={}, paginate=True, relationship_mode=False, container=None): + if relationship_mode: + return self.get_relationships(query, bind_vars) + if paginate: + bind_vars['offset'], bind_vars['count'] = self.get_offset_and_count(self.count, self.page) + cursor = self.db.aql.execute(query, bind_vars=bind_vars, count=True, full_count=True) + if paginate: + return self.get_paginated_response(container or self.container, cursor, self.page, self.page_size, cursor.statistics()["fullCount"]) + return list(cursor) + + def get_offset_and_count(self, count, page) -> tuple[int, int]: + page = page or 1 + if page >= 2**32: + raise ValidationError(f"invalid page `{page}`") + offset = (page-1)*count + return offset, count + + def get_attack_objects(self, matrix): + filters = [] + types = ATTACK_TYPES + if new_types := self.query_as_array('type'): + types = types.intersection(new_types) + bind_vars = { + "@collection": f'nvd_cve_vertex_collection', + "types": list(types), + } + + if attack_forms := self.query_as_array('attack_type'): + form_list = [] + for form in attack_forms: + form_list.extend(ATTACK_FORMS.get(form, [])) + + if form_list: + filters.append('FILTER @attack_form_list[? ANY FILTER MATCHES(doc, CURRENT)]') + bind_vars['attack_form_list'] = form_list + + + if q := self.query.get(f'attack_version'): + bind_vars['mitre_version'] = "version="+q.replace('.', '_').strip('v') + filters.append('FILTER doc._stix2arango_note == @mitre_version') + else: + filters.append('FILTER doc._is_latest') + + if value := self.query_as_array('id'): + bind_vars['ids'] = value + filters.append( + "FILTER doc.id in @ids" + ) + + bind_vars['include_deprecated'] = self.query_as_bool('include_deprecated', False) + bind_vars['include_revoked'] = self.query_as_bool('include_revoked', False) + + if value := self.query_as_array('attack_id'): + bind_vars['attack_ids'] = [v.lower() for v in value] + filters.append( + "FILTER LOWER(doc.external_references[0].external_id) in @attack_ids" + ) + if q := self.query.get('name'): + bind_vars['name'] = q.lower() + filters.append('FILTER CONTAINS(LOWER(doc.name), @name)') + + if q := self.query.get('alias'): + bind_vars['alias'] = q.lower() + filters.append('FILTER APPEND(doc.aliases, doc.x_mitre_aliases)[? ANY FILTER CONTAINS(LOWER(CURRENT), @alias)]') + + if q := self.query.get('description'): + bind_vars['description'] = q.lower() + filters.append('FILTER CONTAINS(LOWER(doc.description), @description)') + + query = """ + FOR doc in @@collection + FILTER doc.type IN @types AND doc._arango_cve_processor_note == 'cve-attack' AND (@include_revoked OR NOT doc.revoked) AND (@include_deprecated OR NOT doc.x_mitre_deprecated) + @filters + LIMIT @offset, @count + RETURN KEEP(doc, KEYS(doc, true)) + """.replace('@filters', '\n'.join(filters)) + # return HttpResponse(f"""{query}\n// {json.dumps(bind_vars)}""") + return self.execute_query(query, bind_vars=bind_vars) + + + def get_object_by_external_id(self, ext_id: str, note, relationship_mode=False, revokable=False, bundle=False): + bind_vars={'@collection': self.collection, 'ext_id': ext_id.lower(), "note": note} + filters = ['FILTER doc._is_latest'] + for version_param in ['attack_version', 'cwe_version', 'capec_version']: + if q := self.query.get(version_param): + bind_vars['mitre_version'] = "version="+q.replace('.', '_').strip('v') + filters[0] = 'FILTER doc._stix2arango_note == @mitre_version' + break + + if revokable: + bind_vars['include_deprecated'] = self.query_as_bool('include_deprecated', False) + bind_vars['include_revoked'] = self.query_as_bool('include_revoked', False) + filters.append('FILTER (@include_revoked OR NOT doc.revoked) AND (@include_deprecated OR NOT doc.x_mitre_deprecated)') + + query = ''' + FOR doc in @@collection + FILTER doc._arango_cve_processor_note == @note AND doc.type > "" AND LOWER(doc.external_references[0].external_id) == @ext_id + @filters + LIMIT @offset, @count + RETURN KEEP(doc, KEYS(doc, true)) + '''.replace('@filters', '\n'.join(filters)) + if bundle: + return self.get_bundle(query, bind_vars) + # return HttpResponse(f"""{query}\n// {json.dumps(bind_vars)}""".replace("@offset, @count", "100")) + return self.execute_query(query, bind_vars=bind_vars, relationship_mode=relationship_mode) + + def get_mitre_versions(self, stix_id=None): + query = """ + FOR doc IN @@collection + FILTER STARTS_WITH(doc._stix2arango_note, "version=") + RETURN DISTINCT doc._stix2arango_note + """ + bind_vars = {'@collection': self.collection} + versions = self.execute_query(query, bind_vars=bind_vars, paginate=False) + versions = self.clean_and_sort_versions(versions) + return Response(dict(latest=versions[0] if versions else None, versions=versions)) + + def get_mitre_modified_versions(self, external_id: str=None, source_name='mitre-attack'): + query = """ + FOR doc IN @@collection + FILTER doc.external_references[? ANY FILTER LOWER(CURRENT.external_id) == @matcher.external_id AND @matcher.source_name == CURRENT.source_name] AND STARTS_WITH(doc._stix2arango_note, "version=") + FILTER (@include_revoked OR NOT doc.revoked) AND (@include_deprecated OR NOT doc.x_mitre_deprecated) // for MITRE ATT&CK, check if revoked + COLLECT modified = doc.modified INTO group + SORT modified DESC + RETURN {modified, versions: UNIQUE(group[*].doc._stix2arango_note)} + """ + bind_vars = { + '@collection': self.collection, 'matcher': dict(external_id=external_id.lower(), source_name=source_name), + # include_deprecated / include_revoked + 'include_revoked': self.query_as_bool('include_revoked', False), + 'include_deprecated': self.query_as_bool('include_deprecated', False), + } + versions = self.execute_query(query, bind_vars=bind_vars, paginate=False) + for mod in versions: + mod['versions'] = self.clean_and_sort_versions(mod['versions']) + return Response(versions) + + def get_modified_versions(self, stix_id=None): + query = """ + FOR doc IN @@collection + FILTER doc.id == @stix_id AND STARTS_WITH(doc._stix2arango_note, "version=") + COLLECT modified = doc.modified INTO group + SORT modified DESC + RETURN {modified, versions: UNIQUE(group[*].doc._stix2arango_note)} + """ + bind_vars = {'@collection': self.collection, 'stix_id': stix_id} + versions = self.execute_query(query, bind_vars=bind_vars, paginate=False) + for mod in versions: + mod['versions'] = self.clean_and_sort_versions(mod['versions']) + return Response(versions) + + def clean_and_sort_versions(self, versions): + versions = sorted([ + v.split("=")[1].replace('_', ".") + for v in versions + ], key=utils.split_mitre_version, reverse=True) + return [f"{v}" for v in versions] + + def get_weakness_or_capec_objects(self, note, cwe=True, types=CWE_TYPES, lookup_kwarg='cwe_id', more_binds={}, more_filters=[], forms={}): + version_param = lookup_kwarg.replace('_id', '_version') + filters = [] + if new_types := self.query_as_array('type'): + types = types.intersection(new_types) + + bind_vars = { + "@collection": self.collection, + "types": list(types), + "note": note, + **more_binds + } + if q := self.query.get(version_param): + bind_vars['mitre_version'] = "version="+q.replace('.', '_').strip('v') + filters.append('FILTER doc._stix2arango_note == @mitre_version') + else: + filters.append('FILTER doc._is_latest') + + if value := self.query_as_array('id'): + bind_vars['ids'] = value + filters.append( + "FILTER doc.id in @ids" + ) + + + if generic_forms := self.query_as_array(lookup_kwarg.replace('_id', '_type')): + form_list = [] + for form in generic_forms: + form_list.extend(forms.get(form, [])) + + if form_list: + filters.append('FILTER @generic_form_list[? ANY FILTER MATCHES(doc, CURRENT)]') + bind_vars['generic_form_list'] = form_list + + if value := self.query_as_array(lookup_kwarg): + bind_vars['ext_ids'] = [v.lower() for v in value] + filters.append( + "FILTER LOWER(doc.external_references[0].external_id) in @ext_ids" + ) + if q := self.query.get('name'): + bind_vars['name'] = q.lower() + filters.append('FILTER CONTAINS(LOWER(doc.name), @name)') + + if q := self.query.get('description'): + bind_vars['description'] = q.lower() + filters.append('FILTER CONTAINS(LOWER(doc.description), @description)') + + query = """ + FOR doc in @@collection FILTER doc.type IN @types AND doc._arango_cve_processor_note == @note + @filters + LIMIT @offset, @count + RETURN KEEP(doc, KEYS(doc, true)) + """.replace('@filters', '\n'.join(filters+more_filters)) + # return HttpResponse(f"""{query}\n// {json.dumps(bind_vars)}""".replace("@offset, @count", "100")) + return self.execute_query(query, bind_vars=bind_vars) + + def get_object(self, stix_id, relationship_mode=False, version_param=None, bundle=False): + bind_vars={'@collection': self.collection, 'stix_id': stix_id} + filters = ['FILTER doc._is_latest'] + if version_param and self.query.get(version_param): + bind_vars['mitre_version'] = "version="+self.query.get(version_param).replace('.', '_').strip('v') + filters[0] = 'FILTER doc._stix2arango_note == @mitre_version' + + query = ''' + FOR doc in @@collection + FILTER doc.id == @stix_id + @filters + LIMIT @offset, @count + RETURN KEEP(doc, KEYS(doc, true)) + '''.replace('@filters', '\n'.join(filters)) + + if bundle: + return self.get_bundle(query, bind_vars) + + return self.execute_query(query, bind_vars=bind_vars, relationship_mode=relationship_mode) + + + def get_relationships(self, docs_query, binds): + regex = r"KEEP\((\w+),\s*\w+\(.*?\)\)" + binds['@view'] = settings.VIEW_NAME + other_filters = [] + + if term := self.query.get('relationship_type'): + binds['rel_relationship_type'] = term.lower() + other_filters.append("FILTER CONTAINS(LOWER(d.relationship_type), @rel_relationship_type)") + + if term := self.query.get('_arango_cti_processor_note'): + binds['rel_acp_note'] = term.lower() + other_filters.append("FILTER CONTAINS(LOWER(d._arango_cti_processor_note), @rel_acp_note)") + + if term := self.query_as_array('source_ref'): + binds['rel_source_ref'] = term + other_filters.append('FILTER d.source_ref IN @rel_source_ref') + + if terms := self.query_as_array('source_ref_type'): + binds['rel_source_ref_type'] = terms + other_filters.append('FILTER SPLIT(d.source_ref, "--")[0] IN @rel_source_ref_type') + + if term := self.query_as_array('target_ref'): + binds['rel_target_ref'] = term + other_filters.append('FILTER d.target_ref IN @rel_target_ref') + + if terms := self.query_as_array('target_ref_type'): + binds['rel_target_ref_type'] = terms + other_filters.append('FILTER SPLIT(d.target_ref, "--")[0] IN @rel_target_ref_type') + + match self.query.get('relationship_direction'): + case 'source_ref': + direction_query = 'd._from IN matched_ids' + case 'target_ref': + direction_query = 'd._to IN matched_ids' + case _: + direction_query = 'd._from IN matched_ids OR d._to IN matched_ids' + + if self.query_as_bool('include_embedded_refs', True): + embedded_refs_query = '' + else: + embedded_refs_query = 'AND d._is_ref != TRUE' + + new_query = """ + LET matched_ids = (@docs_query)[*]._id + FOR d IN @@view + SEARCH d.type == 'relationship' AND (@direction_query) @include_embedded_refs + @other_filters + LIMIT @offset, @count + RETURN KEEP(d, KEYS(d, TRUE)) + """.replace('@docs_query', re.sub(regex, lambda x: x.group(1), docs_query.replace('LIMIT @offset, @count', ''))) \ + .replace('@other_filters', "\n".join(other_filters)) \ + .replace('@direction_query', direction_query) \ + .replace('@include_embedded_refs', embedded_refs_query) + + return self.execute_query(new_query, bind_vars=binds, container='relationships') + + + def get_bundle(self, docs_query, binds): + regex = r"KEEP\((\w+),\s*\w+\(.*?\)\)" + binds['@view'] = settings.VIEW_NAME + more_search_filters = [] + + if not self.query_as_bool('include_embedded_refs', False): + more_search_filters.append('doc._is_ref != TRUE') + + query = ''' +LET matched_ids = (@docs_query)[*]._id + + LET bundle_ids = FLATTEN( + FOR doc IN @@view SEARCH doc.type == 'relationship' AND (doc._from IN matched_ids OR doc._to IN matched_ids) @@more_search_filters + RETURN [doc._id, doc._from, doc._to] + ) + + FOR d IN @@view SEARCH d._id IN APPEND(bundle_ids, matched_ids) + LIMIT @offset, @count + RETURN KEEP(d, KEYS(d, TRUE)) +''' + query = query \ + .replace('@docs_query', re.sub(regex, lambda x: x.group(1), docs_query.replace('LIMIT @offset, @count', ''))) \ + .replace('@@more_search_filters', "" if not more_search_filters else f" AND {' and '.join(more_search_filters)}") + # return Response([query, binds]) + return self.execute_query(query, bind_vars=binds) + \ No newline at end of file diff --git a/vulmatch/server/ctibutler_views/ctibutler_views.py b/vulmatch/server/ctibutler_views/ctibutler_views.py new file mode 100644 index 0000000..ab98feb --- /dev/null +++ b/vulmatch/server/ctibutler_views/ctibutler_views.py @@ -0,0 +1,265 @@ +import re +import textwrap +from rest_framework import viewsets, status, decorators + +from vulmatch.server.arango_helpers import ArangoDBHelper, ATTACK_TYPES, CAPEC_TYPES +from vulmatch.server.autoschema import DEFAULT_400_ERROR +from vulmatch.server.utils import Pagination, Response +from vulmatch.server import serializers +from django_filters.rest_framework import FilterSet, DjangoFilterBackend, ChoiceFilter, BaseCSVFilter, CharFilter, BooleanFilter +from drf_spectacular.utils import extend_schema, extend_schema_view, OpenApiParameter +from drf_spectacular.types import OpenApiTypes +from .arango_helpers import ATLAS_FORMS, ATLAS_TYPES, DISARM_FORMS, DISARM_TYPES, LOCATION_TYPES, ArangoDBHelper, ATTACK_TYPES, ATTACK_FORMS, CAPEC_TYPES, LOCATION_SUBTYPES + +REVOKED_AND_DEPRECATED_PARAMS = [ + OpenApiParameter('include_revoked', type=OpenApiTypes.BOOL, description="By default all objects with `revoked` are ignored. Set this to `true` to include them."), + OpenApiParameter('include_deprecated', type=OpenApiTypes.BOOL, description="By default all objects with `x_mitre_deprecated` are ignored. Set this to `true` to include them."), +] + +@extend_schema_view( + list_objects=extend_schema( + responses={200: serializers.StixObjectsSerializer(many=True), 400: DEFAULT_400_ERROR}, + filters=True + ), + retrieve_objects=extend_schema( + responses={200: serializers.StixObjectsSerializer(many=True), 400: DEFAULT_400_ERROR}, + parameters=REVOKED_AND_DEPRECATED_PARAMS, + ), + retrieve_object_relationships=extend_schema( + responses={200: ArangoDBHelper.get_paginated_response_schema('relationships', 'relationship'), 400: DEFAULT_400_ERROR}, + parameters=ArangoDBHelper.get_relationship_schema_operation_parameters() + REVOKED_AND_DEPRECATED_PARAMS, + ), +) +class AttackView(viewsets.ViewSet): + openapi_tags = ["ATT&CK"] + lookup_url_kwarg = 'stix_id' + openapi_path_params = [ + OpenApiParameter('stix_id', type=OpenApiTypes.STR, location=OpenApiParameter.PATH, description='The STIX ID (e.g. `attack-pattern--0042a9f5-f053-4769-b3ef-9ad018dfa298`, `malware--04227b24-7817-4de1-9050-b7b1b57f5866`)'), + OpenApiParameter('attack_id', type=OpenApiTypes.STR, location=OpenApiParameter.PATH, description='The ATT&CK ID, e.g `T1659`, `TA0043`, `S0066`'), + ] + + filter_backends = [DjangoFilterBackend] + MATRIX_TYPES = ["mobile", "ics", "enterprise"] + @property + def matrix(self): + m: re.Match = re.search(r"/attack-(\w+)/", self.request.path) + return "" + serializer_class = serializers.StixObjectsSerializer(many=True) + pagination_class = Pagination("objects") + + class filterset_class(FilterSet): + id = BaseCSVFilter(help_text='Filter the results using the STIX ID of an object. e.g. `attack-pattern--0042a9f5-f053-4769-b3ef-9ad018dfa298`, `malware--04227b24-7817-4de1-9050-b7b1b57f5866`.') + attack_id = BaseCSVFilter(help_text='The ATT&CK IDs of the object wanted. e.g. `T1659`, `TA0043`, `S0066`.') + description = CharFilter(help_text='Filter the results by the `description` property of the object. Search is a wildcard, so `exploit` will return all descriptions that contain the string `exploit`.') + name = CharFilter(help_text='Filter the results by the `name` property of the object. Search is a wildcard, so `exploit` will return all names that contain the string `exploit`.') + type = ChoiceFilter(choices=[(f,f) for f in ATTACK_TYPES], help_text='Filter the results by STIX Object type.') + alias = CharFilter(help_text='Filter the results by the `x_mitre_aliases` property of the object. Search is a wildcard, so `sun` will return all objects with x_mitre_aliases that contains the string `sun`, e.g `SUNBURST`.') + attack_type = ChoiceFilter(choices=[(f,f) for f in ATTACK_FORMS], help_text='Filter the results by Attack Object type.') + + @decorators.action(methods=['GET'], url_path="objects", detail=False) + def list_objects(self, request, *args, **kwargs): + return ArangoDBHelper('', request).get_attack_objects(self.matrix) + + @decorators.action(methods=['GET'], url_path="objects/", detail=False) + def retrieve_objects(self, request, *args, attack_id=None, **kwargs): + return ArangoDBHelper(f'nvd_cve_vertex_collection', request).get_object_by_external_id(attack_id, revokable=True) + + @decorators.action(methods=['GET'], url_path="objects//relationships", detail=False) + def retrieve_object_relationships(self, request, *args, attack_id=None, **kwargs): + return ArangoDBHelper(f'nvd_cve_vertex_collection', request).get_object_by_external_id(attack_id, relationship_mode=True, revokable=True) + + # @classmethod + # def attack_view(cls, matrix_name: str): + # matrix_name_human = matrix_name.title() + # if matrix_name == 'ics': + # matrix_name_human = "ICS" + + # @extend_schema_view( + # list_objects=extend_schema( + # summary=f"Search and filter MITRE ATT&CK {matrix_name_human} objects", + # description=textwrap.dedent( + # """ + # Search and filter MITRE ATT&CK objects. + # """ + # ), + # filters=True, + # ), + # retrieve_objects=extend_schema( + # summary=f"Get a specific MITRE ATT&CK {matrix_name_human} object by its ID", + # description=textwrap.dedent( + # """ + # Get a MITRE ATT&CK object by its MITRE ATT&CK ID (e.g. `T1659`, `TA0043`, `S0066`). + + # If you do not know the ID of the object you can use the GET MITRE ATT&CK Objects endpoint to find it. + # """ + # ), + # filters=False, + # ), + # retrieve_object_relationships=extend_schema( + # summary=f"Get the Relationships linked to the MITRE ATT&CK {matrix_name_human} Object", + # description=textwrap.dedent( + # """ + # This endpoint will return all the STIX `relationship` objects where the ATT&CK object is found as a `source_ref` or a `target_ref`. + + # If you want to see an overview of how MITRE ATT&CK objects are linked, [see this diagram](https://miro.com/app/board/uXjVKBgHZ2I=/). + + # MITRE ATT&CK objects can also be `target_ref` from CAPECs objects. Requires POST arango-cti-processor request using `capec-attack` mode for this data to show. + # """ + # ), + # ), + # ) + # class TempAttackView(cls): + # matrix = matrix_name + # openapi_tags = [f"ATT&CK {matrix_name_human}"] + # truncate_collections = [f"mitre_attack_{matrix}"] + # TempAttackView.__name__ = f'{matrix_name.title()}AttackView' + # return TempAttackView + +@extend_schema_view( + list_objects=extend_schema( + summary='Search and filter MITRE CWE objects', + description=textwrap.dedent( + """ + Search and filter MITRE CWE objects. + """ + ), + filters=True, + responses={200: serializers.StixObjectsSerializer(many=True), 400: DEFAULT_400_ERROR}, + ), + retrieve_objects=extend_schema( + summary='Get a CWE object', + description=textwrap.dedent( + """ + Get an CWE object by its ID (e.g. `CWE-242` `CWE-250`). + + If you do not know the ID of the object you can use the GET MITRE CWE Objects endpoint to find it. + """ + ), + filters=False, + responses={200: serializers.StixObjectsSerializer(many=True), 400: DEFAULT_400_ERROR}, + ), + retrieve_object_relationships=extend_schema( + summary='Get the Relationships linked to MITRE CWE Object', + description=textwrap.dedent( + """ + This endpoint will return all the STIX relationship objects where the CWE object is found as a `source_ref` or a `target_ref`. + + If you want to see an overview of how MITRE CWE objects are linked, [see this diagram](https://miro.com/app/board/uXjVKpOg6bM=/). + + MITRE CWE objects can also be `source_ref` to CAPEC objects. Requires POST arango-cti-processor request using `cwe-capec` mode for this data to show. + """ + ), + responses={200: ArangoDBHelper.get_paginated_response_schema('relationships', 'relationship'), 400: DEFAULT_400_ERROR}, + parameters=ArangoDBHelper.get_relationship_schema_operation_parameters(), + ), +) +class CweView(viewsets.ViewSet): + openapi_tags = ["CWE"] + truncate_collections = ['nvd_cve'] + lookup_url_kwarg = 'cwe_id' + openapi_path_params = [ + OpenApiParameter('stix_id', type=OpenApiTypes.STR, location=OpenApiParameter.PATH, description='The STIX ID (e.g. `weakness--f3496f30-5625-5b6d-8297-ddc074fb26c2`, `grouping--000ee024-ad9c-5557-8d49-2573a8e788d2`)'), + OpenApiParameter('cwe_id', type=OpenApiTypes.STR, location=OpenApiParameter.PATH, description='The CWE ID, e.g `CWE-242`, `CWE-250`'), + ] + + filter_backends = [DjangoFilterBackend] + + serializer_class = serializers.StixObjectsSerializer(many=True) + pagination_class = Pagination("objects") + + class filterset_class(FilterSet): + id = BaseCSVFilter(help_text='Filter the results using the STIX ID of an object. e.g. `weakness--f3496f30-5625-5b6d-8297-ddc074fb26c2`, `grouping--000ee024-ad9c-5557-8d49-2573a8e788d2`.') + cwe_id = BaseCSVFilter(help_text='Filter the results by the CWE ID of the object. e.g. `CWE-242` `CWE-250`.') + description = CharFilter(help_text='Filter the results by the `description` property of the object. Search is a wildcard, so `exploit` will return all descriptions that contain the string `exploit`.') + name = CharFilter(help_text='Filter the results by the `name` property of the object. Search is a wildcard, so `exploit` will return all names that contain the string `exploit`.') + # type = ChoiceFilter(choices=[(f,f) for f in CWE_TYPES], help_text='Filter the results by STIX Object type.') + + + + @decorators.action(methods=['GET'], url_path="objects", detail=False) + def list_objects(self, request, *args, **kwargs): + return ArangoDBHelper('nvd_cve_vertex_collection', request).get_weakness_or_capec_objects('cve-cwe') + + @decorators.action(methods=['GET'], url_path="objects/", detail=False) + def retrieve_objects(self, request, *args, cwe_id=None, **kwargs): + return ArangoDBHelper('nvd_cve_vertex_collection', request).get_object_by_external_id(cwe_id, 'cve-cwe') + + + @decorators.action(methods=['GET'], url_path="objects//relationships", detail=False) + def retrieve_object_relationships(self, request, *args, cwe_id=None, **kwargs): + return ArangoDBHelper('nvd_cve_vertex_collection', request).get_object_by_external_id(cwe_id, 'cve-cwe', relationship_mode=True) + +@extend_schema_view( + list_objects=extend_schema( + summary='Search and filter MITRE CAPEC objects', + description=textwrap.dedent( + """ + Search and filter MITRE CAPEC objects. + """ + ), + filters=True, + responses={200: serializers.StixObjectsSerializer(many=True), 400: DEFAULT_400_ERROR}, + ), + retrieve_objects=extend_schema( + summary='Get a CAPEC object', + description=textwrap.dedent( + """ + Get a CAPEC object by its ID (e.g. `CAPEC-112`, `CAPEC-699`). + + If you do not know the ID of the object you can use the GET MITRE CAPEC Objects endpoint to find it. + """ + ), + filters=False, + responses={200: serializers.StixObjectsSerializer(many=True), 400: DEFAULT_400_ERROR}, + ), + retrieve_object_relationships=extend_schema( + summary='Get the Relationships linked to MITRE CAPEC Object', + description=textwrap.dedent( + """ + This endpoint will return all the STIX relationship objects where the CAPEC object is found as a `source_ref` or a `target_ref`. + + MITRE CAPEC objects can also be `source_ref` from ATT&CK Enterprise objects. Requires POST arango-cti-processor request using `capec-attack` mode for this data to show. + + MITRE CAPEC objects can also be `target_ref` to CWE objects. Requires POST arango-cti-processor request using `cwe-capec` mode for this data to show. + """ + ), + responses={200: ArangoDBHelper.get_paginated_response_schema('relationships', 'relationship'), 400: DEFAULT_400_ERROR}, + parameters=ArangoDBHelper.get_relationship_schema_operation_parameters(), + ), +) +class CapecView(viewsets.ViewSet): + openapi_tags = ["CAPEC"] + truncate_collections = ['nvd_cve'] + lookup_url_kwarg = 'capec_id' + openapi_path_params = [ + OpenApiParameter('stix_id', type=OpenApiTypes.STR, location=OpenApiParameter.PATH, description='The STIX ID (e.g. `attack-pattern--00268a75-3243-477d-9166-8c78fddf6df6`, `course-of-action--0002fa37-9334-41e2-971a-cc8cab6c00c4`)'), + OpenApiParameter('capec_id', type=OpenApiTypes.STR, location=OpenApiParameter.PATH, description='The CAPEC ID, e.g `CAPEC-112`, `CAPEC-699`'), + ] + + filter_backends = [DjangoFilterBackend] + + serializer_class = serializers.StixObjectsSerializer(many=True) + pagination_class = Pagination("objects") + + class filterset_class(FilterSet): + id = BaseCSVFilter(help_text='Filter the results using the STIX ID of an object. e.g. `attack-pattern--00268a75-3243-477d-9166-8c78fddf6df6`, `course-of-action--0002fa37-9334-41e2-971a-cc8cab6c00c4`.') + capec_id = BaseCSVFilter(help_text='Filter the results by the CAPEC ID of the object. e.g. `CAPEC-112`, `CAPEC-699`.') + description = CharFilter(help_text='Filter the results by the `description` property of the object. Search is a wildcard, so `exploit` will return all descriptions that contain the string `exploit`.') + name = CharFilter(help_text='Filter the results by the `name` property of the object. Search is a wildcard, so `exploit` will return all names that contain the string `exploit`.') + type = ChoiceFilter(choices=[(f,f) for f in CAPEC_TYPES], help_text='Filter the results by STIX Object type.') + + + @decorators.action(methods=['GET'], url_path="objects", detail=False) + def list_objects(self, request, *args, **kwargs): + return ArangoDBHelper('nvd_cve_vertex_collection', request).get_weakness_or_capec_objects('cve-capec', types=CAPEC_TYPES, lookup_kwarg=self.lookup_url_kwarg) + + + @decorators.action(methods=['GET'], url_path="objects/", detail=False) + def retrieve_objects(self, request, *args, capec_id=None, **kwargs): + return ArangoDBHelper('nvd_cve_vertex_collection', request).get_object_by_external_id(capec_id, 'cve-capec') + + + @decorators.action(methods=['GET'], url_path="objects//relationships", detail=False) + def retrieve_object_relationships(self, request, *args, capec_id=None, **kwargs): + return ArangoDBHelper('nvd_cve_vertex_collection', request).get_object_by_external_id(capec_id, 'cve-capec', relationship_mode=True) + \ No newline at end of file diff --git a/vulmatch/urls.py b/vulmatch/urls.py index 059524b..9135d15 100644 --- a/vulmatch/urls.py +++ b/vulmatch/urls.py @@ -21,6 +21,7 @@ from drf_spectacular.views import SpectacularAPIView, SpectacularRedocView, SpectacularSwaggerView from vulmatch.server import views +from vulmatch.server.ctibutler_views import ctibutler_views import dogesec_commons.objects.views as arango_views @@ -43,6 +44,11 @@ router.register('objects/sdos', arango_views.SDOView, "object-view-sdo") router.register("object", arango_views.SingleObjectView, "object-view-orig") +### more views +router.register("cwe", ctibutler_views.CweView, "cwe-view") +router.register("capec", ctibutler_views.CapecView, "capec-view") +router.register("attack", ctibutler_views.AttackView, "attack-view") + urlpatterns = [ path(f'api/{API_VERSION}/', include(router.urls)), path('admin/', admin.site.urls), diff --git a/vulmatch/worker/populate_dbs.py b/vulmatch/worker/populate_dbs.py index b82df67..22e8359 100644 --- a/vulmatch/worker/populate_dbs.py +++ b/vulmatch/worker/populate_dbs.py @@ -45,6 +45,8 @@ def create_indexes(db: StandardDatabase): *[dict(name=f'x_cpe_struct.{name}', analyzer='norm_en') for name in ['product', 'vendor', 'version', 'update', 'edition', 'language', 'sw_edition', 'target_sw', 'target_hw', 'other']], "x_cpe_struct.part" ], inBackground=True)) + vertex_collection.add_index(dict(type='persistent', fields=["_arango_cve_processor_note", "type"], inBackground=True, name=f"acvep_imports-type", sparse=True)) + def create_collections():