Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
188 changes: 178 additions & 10 deletions detection_rules/attack.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@
import json
import re
import time
from collections import OrderedDict
from collections import OrderedDict, defaultdict
from pathlib import Path
from typing import Any

import requests
from semver import Version

from .schemas import definitions
from .utils import cached, clear_caches, get_etc_glob_path, get_etc_path, gzip_compress, read_gzip

PLATFORMS = ["Windows", "macOS", "Linux"]
Expand Down Expand Up @@ -92,13 +93,54 @@ def load_attack_gz() -> dict[str, Any]:
for val in matrix.values():
val.sort(key=lambda tid: technique_lookup[tid]["name"].lower())

# External tactic id (TAxxxx) -> current MITRE ATT&CK data display name (e.g. TA0005 -> Stealth after v19 rename).
tactic_id_to_name: dict[str, str] = {ext_id: name for name, ext_id in tactics_map.items()}

technique_lookup = OrderedDict(sorted(technique_lookup.items()))
techniques = sorted({v["name"] for _, v in technique_lookup.items()})
technique_id_list = [t for t in technique_lookup if "." not in t]
sub_technique_id_list = [t for t in technique_lookup if "." in t]


def secondary_matrix_preferences_for_tactic_id(tactic_external_id: str) -> tuple[str, ...]:
"""Ordered extra tactic *names* when the row's primary tactic column does not contain a technique.

Source: ``definitions.ATTACK_TACTIC_MIGRATION_SECONDARY_MATRIX_PREFERENCES``.
"""
return definitions.ATTACK_TACTIC_MIGRATION_SECONDARY_MATRIX_PREFERENCES.get(tactic_external_id, ())


def filename_legacy_stem_prefixes() -> tuple[str, ...]:
"""Basename prefixes (e.g. ``defense_evasion_``) to rewrite using ``threat[0]`` tactic slug after a rename.

Source: ``definitions.ATTACK_TACTIC_MIGRATION_FILENAME_LEGACY_STEM_PREFIXES``.
"""
return definitions.ATTACK_TACTIC_MIGRATION_FILENAME_LEGACY_STEM_PREFIXES


@cached
def priority_mitre_tactic_display_names_from_migration_hints() -> tuple[str, ...]:
"""Tactic names that sort before others when saving ``rule.threat`` during migrate / update-rules.

**Where:** ``devtools._mitre_threat_persist_sort_key`` only.

**Why:** Rules list multiple tactics; CI expects filenames to match ``threat[0]``. Without this,
pure alphabetical order can put unrelated tactics first after MITRE renames legacy rows.

**How:** For each key in ``definitions.ATTACK_TACTIC_MIGRATION_SECONDARY_MATRIX_PREFERENCES``, take that
TA's current display name from ``tactic_id_to_name``, then each tuple entry, deduplicated.
"""
order: list[str] = []
for tactic_id, secondaries in definitions.ATTACK_TACTIC_MIGRATION_SECONDARY_MATRIX_PREFERENCES.items():
primary = tactic_id_to_name.get(tactic_id)
if primary and primary not in order:
order.append(primary)
for name in secondaries:
if name not in order:
order.append(name)
return tuple(order)


def refresh_attack_data(save: bool = True) -> tuple[dict[str, Any] | None, bytes | None]:
"""Refresh ATT&CK data from Mitre."""
attack_path = get_attack_file_path()
Expand Down Expand Up @@ -139,9 +181,136 @@ def get_version_from_tag(name: str, pattern: str = "att&ck-v") -> str:
return attack_data, compressed


def resolve_redirected_technique_id(tid: str) -> str:
"""Walk ``attack-technique-redirects`` until the ID is no longer remapped.

Revoked or merged techniques may need one or more hops. The result is the ID MITRE uses in
the current bundle, so ``matrix`` and ``technique_lookup`` lookups succeed.

Used by tactic inference (grouping) and by ``build_threat_map_entry`` when validating
techniques under a known tactic.
"""
techniques_redirect_map = load_techniques_redirect()
while tid in techniques_redirect_map:
tid = techniques_redirect_map[tid]
return tid


def retain_tactic_display_if_id_and_techniques_still_match(
tactic_external_id: str, raw_technique_ids: list[str]
) -> str | None:
"""When ``tactic.name`` on disk is stale but ``tactic.id`` is still a bundle tactic id.

If every resolved technique id still appears under that id's **current** display name in
``matrix``, return that name (rename-only). Otherwise return ``None`` so callers infer tactics
from technique IDs (see ``tactic_assignment_for_technique`` and
``secondary_matrix_preferences_for_tactic_id``).

**Why single-bucket retention:** multi-phase techniques may list several tactics; when the
whole row still fits the current name for the same ``tactic.id``, keep one row.
"""
current_name = tactic_id_to_name.get(tactic_external_id)
if not current_name:
return None
bucket = matrix.get(current_name)
if not bucket:
return None
for raw_tid in raw_technique_ids:
if resolve_redirected_technique_id(raw_tid) not in bucket:
return None
return current_name


def tactic_assignment_for_technique(
resolved_tid: str,
*,
row_tactic_external_id: str | None = None,
secondary_matrix_preferences: tuple[str, ...] = (),
) -> str:
"""Pick one enterprise tactic for ``resolved_tid`` when building ``[[rule.threat]]`` rows.

**Order:** (1) If ``row_tactic_external_id`` is a live bundle tactic id, use its **current**
display name when that technique appears in that tactic's matrix column—honors TA0005→Stealth
(etc.) over lowest-``TAxxxx`` when MITRE lists the technique under multiple phases. (2) Else
each name in ``secondary_matrix_preferences`` (from
``definitions.ATTACK_TACTIC_MIGRATION_SECONDARY_MATRIX_PREFERENCES``).
(3) Else lowest MITRE tactic id among candidates.

**Callers:** ``group_technique_ids_by_matrix_tactic`` (and ``choose_canonical_tactic_for_technique``
for backward compatibility with no row context).

**Raises:** if the ID is unknown or not placed in any enterprise tactic in the loaded matrix.
"""
if resolved_tid not in technique_lookup:
raise ValueError(f"Unknown technique ID: {resolved_tid}")
candidates = [tactic for tactic, techs in matrix.items() if resolved_tid in techs]
if not candidates:
raise ValueError(f"Technique {resolved_tid} is not mapped to any enterprise tactic in the loaded ATT&CK matrix")
if row_tactic_external_id and row_tactic_external_id in tactic_id_to_name:
primary_name = tactic_id_to_name[row_tactic_external_id]
if primary_name in candidates:
return primary_name
for preferred in secondary_matrix_preferences:
if preferred in candidates:
return preferred
return min(candidates, key=lambda t: tactics_map[t])


def choose_canonical_tactic_for_technique(resolved_tid: str) -> str:
"""Lowest-``TAxxxx`` tie-break only—no row context. Prefer ``tactic_assignment_for_technique``."""
return tactic_assignment_for_technique(resolved_tid)


def group_technique_ids_by_matrix_tactic(
technique_ids: list[str], *, row_tactic_external_id: str | None = None
) -> dict[str, list[str]]:
"""Partition technique IDs by which enterprise tactic they should live under in the current bundle.

**``row_tactic_external_id``:** when remapping a row whose ``tactic.id`` is still valid in the
bundle (e.g. ``TA0005``) but ``tactic.name`` is stale, pass it so techniques still mapped under
that id's current name stay there before tie-break. Secondary preferences come from
``secondary_matrix_preferences_for_tactic_id``.
"""
secondaries: tuple[str, ...] = ()
if row_tactic_external_id and row_tactic_external_id in tactic_id_to_name:
secondaries = secondary_matrix_preferences_for_tactic_id(row_tactic_external_id)
groups: dict[str, list[str]] = defaultdict(list)
seen: set[str] = set()
for raw_tid in technique_ids:
if raw_tid in seen:
continue
seen.add(raw_tid)
resolved = resolve_redirected_technique_id(raw_tid)
row_id = row_tactic_external_id if row_tactic_external_id in tactic_id_to_name else None
tactic = tactic_assignment_for_technique(
resolved,
row_tactic_external_id=row_id,
secondary_matrix_preferences=secondaries,
)
groups[tactic].append(raw_tid)
return {t: sorted(set(ids), key=lambda x: (x.split(".")[0], x)) for t, ids in groups.items()}


def rebuild_threat_dicts_from_technique_ids(
technique_ids: list[str], *, row_tactic_external_id: str | None = None
) -> list[dict[str, Any]]:
"""Produce full MITRE ``[[rule.threat]]``-shaped dicts when the tactic label on disk is unusable.

**``row_tactic_external_id``:** pass the row's MITRE tactic id when ``tactic.name`` is unknown to
the bundle but ``tactic.id`` is still valid (see ``group_technique_ids_by_matrix_tactic``).

**Returns:** empty list if ``technique_ids`` is empty.
"""
if not technique_ids:
return []
groups = group_technique_ids_by_matrix_tactic(
technique_ids, row_tactic_external_id=row_tactic_external_id
)
return [build_threat_map_entry(tactic, *groups[tactic]) for tactic in sorted(groups.keys())]


def build_threat_map_entry(tactic: str, *technique_ids: str) -> dict[str, Any]:
"""Build rule threat map from technique IDs."""
techniques_redirect_map = load_techniques_redirect()
url_base = "https://attack.mitre.org/{type}/{id}/"
tactic_id = tactics_map[tactic]
tech_entries: dict[str, Any] = {}
Expand All @@ -157,19 +326,18 @@ def make_entry(_id: str) -> dict[str, Any]:
# fail if deprecated or else convert if it has been replaced
if tid in deprecated:
raise ValueError(f"Technique ID: {tid} has been deprecated and should not be used")
if tid in techniques_redirect_map:
tid = techniques_redirect_map[tid] # noqa: PLW2901
resolved_id = resolve_redirected_technique_id(tid)

if tid not in matrix[tactic]:
raise ValueError(f"Technique ID: {tid} does not fall under tactic: {tactic}")
if resolved_id not in matrix[tactic]:
raise ValueError(f"Technique ID: {resolved_id} does not fall under tactic: {tactic}")

# sub-techniques
if "." in tid:
parent_technique, _ = tid.split(".", 1)
if "." in resolved_id:
parent_technique, _ = resolved_id.split(".", 1)
tech_entries.setdefault(parent_technique, make_entry(parent_technique))
tech_entries[parent_technique].setdefault("subtechnique", []).append(make_entry(tid))
tech_entries[parent_technique].setdefault("subtechnique", []).append(make_entry(resolved_id))
else:
tech_entries.setdefault(tid, make_entry(tid))
tech_entries.setdefault(resolved_id, make_entry(resolved_id))

entry: dict[str, Any] = {
"framework": "MITRE ATT&CK",
Expand Down
Loading
Loading