Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "detection_rules"
version = "1.6.35"
version = "1.6.36"
description = "Detection Rules is the home for rules used by Elastic Security. This repository is used for the development, maintenance, testing, validation, and release of rules for Elastic Security’s Detection Engine."
readme = "README.md"
requires-python = ">=3.12"
Expand Down
98 changes: 98 additions & 0 deletions tests/test_all_rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import uuid
from collections import defaultdict
from pathlib import Path
from typing import ClassVar

import eql
import kql
Expand Down Expand Up @@ -1572,3 +1573,100 @@ def test_eql_non_sequence_support_only(self):
# is_sequence method not yet available during schema validation
# so we have to check in a unit test
self.fail(f"{self.rule_str(rule)} Sequence rules cannot have alert suppression")


class TestEQLEventFieldUsage(BaseRuleTest):
"""Test that EQL rules reference fields mapped in the endpoint integration schema."""

SAFE_EVENT_TYPES: ClassVar[set[str]] = {"process", "any"}
ALERT_INDEX_HINTS: ClassVar[tuple[str, ...]] = (
".alerts-security.",
"logs-endpoint.alerts",
"-alerts-",
)
EVENT_TYPE_TO_DATASET: ClassVar[dict[str, str]] = {
"file": "file",
"network": "network",
"registry": "registry",
"library": "library",
"dns": "network",
}

@classmethod
def _bad_fields_for_event_query(
cls,
event_query: eql.ast.EventQuery,
schema_by_dataset: dict[str, dict[str, str]],
) -> tuple[str, list[str]] | None:
if event_query.event_type in cls.SAFE_EVENT_TYPES:
return None
dataset = cls.EVENT_TYPE_TO_DATASET.get(event_query.event_type)
if not dataset or dataset not in schema_by_dataset:
return None
schema_fields = schema_by_dataset[dataset]
bad_fields = sorted(
{
str(node)
for node in event_query
if isinstance(node, eql.ast.Field)
and str(node).startswith("process.")
and str(node) not in schema_fields
}
)
return (dataset, bad_fields) if bad_fields else None

def test_process_fields_present_in_endpoint_schema(self):
"""Ensure process.* fields used in non-process EQL clauses exist in the endpoint integration schema."""
load_integrations_schemas.clear()
schemas = load_integrations_schemas()
endpoint_versions = schemas.get("endpoint", {})
if not endpoint_versions:
self.skipTest("endpoint integration schema not available")
latest = sorted(endpoint_versions, key=Version.parse)[-1]
schema_by_dataset = endpoint_versions[latest]

offenders: list[str] = []
for rule in self.all_rules:
data = rule.contents.data
if not isinstance(data, EQLRuleData):
continue

indices = data.get("index") or []
if not any("logs-endpoint.events." in idx for idx in indices):
continue
if any(any(hint in idx for hint in self.ALERT_INDEX_HINTS) for idx in indices):
continue

try:
rule_ast = data.ast
except Exception: # noqa: BLE001, S112
continue
if rule_ast is None:
continue

first = rule_ast.first
event_queries = (
[sq.query for sq in first.queries] if isinstance(first, (eql.ast.Sequence, eql.ast.Join)) else [first]
)

for event_query in event_queries:
if not isinstance(event_query, eql.ast.EventQuery):
continue
result = self._bad_fields_for_event_query(event_query, schema_by_dataset)
if result is None:
continue
dataset, bad_fields = result
offenders.append(
f"{self.rule_str(rule, trailer=None)} references {bad_fields} "
f"inside `{event_query.event_type} where` but the endpoint integration "
f"schema (dataset `{dataset}`) does not include these fields"
)
break

if offenders:
self.fail(
"process.* fields below are not in the endpoint integration schema for the "
"targeted dataset, so the predicates referencing them never match. Move the "
"check into a `process where` clause (e.g. via an EQL sequence) or use a field "
"that the dataset actually populates.\nOffenders:\n - " + "\n - ".join(offenders)
)
Loading