Skip to content

Commit

Permalink
Merge pull request #206 from SigmaHQ/query_expressions
Browse files Browse the repository at this point in the history
Correlation query typing phase, query expressions
  • Loading branch information
thomaspatzke authored Mar 26, 2024
2 parents cece15b + de73540 commit 2c29ae1
Show file tree
Hide file tree
Showing 5 changed files with 247 additions and 21 deletions.
123 changes: 108 additions & 15 deletions sigma/conversion/base.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from abc import ABC, abstractmethod
from collections import defaultdict
from collections import ChainMap, defaultdict
import re

from pyparsing import Set
Expand Down Expand Up @@ -211,6 +211,7 @@ def convert_rule(self, rule: SigmaRule, output_format: Optional[str] = None) ->
for index, query in enumerate(queries)
]
rule.set_conversion_result(finalized_queries)
rule.set_conversion_states(states)
if rule._output:
return finalized_queries
else:
Expand Down Expand Up @@ -714,6 +715,20 @@ class variables. If this is not sufficient, the respective methods can be implem
None # Token inserted between field and value (without separator)
)

# Query structure
# The generated query can be embedded into further structures. A common example is a data
# source command that is prepended to the matching condition and specifies the data repositories
# or tables from which the data is queried.
# This is specified as a format string that contains the following placeholders:
# * {query}: The generated query
# * {rule}: The Sigma rule from which the query was generated
# * {state}: Conversion state at the end of query generation. This state is initialized with the
# pipeline state.
query_expression: ClassVar[str] = "{query}"
# The following dict defines default values for the conversion state. They are used if
# the respective state is not set.
state_defaults: ClassVar[Dict[str, str]] = dict()

# String output
## Fields
### Quoting
Expand Down Expand Up @@ -870,6 +885,7 @@ class variables. If this is not sufficient, the respective methods can be implem
# The correlation query frame is the basic structure of a correlation query for each correlation
# type. It contains the following placeholders:
# * {search} is the search expression generated by the correlation query search phase.
# * {typing} is the event typing expression generated by the correlation query typing phase.
# * {aggregate} is the aggregation expression generated by the correlation query aggregation
# phase.
# * {condition} is the condition expression generated by the correlation query condition phase.
Expand Down Expand Up @@ -908,6 +924,25 @@ class variables. If this is not sufficient, the respective methods can be implem
# * A joiner string that is put between each search_multi_rule_query_expression:
correlation_search_multi_rule_query_expression_joiner: ClassVar[Optional[str]] = None

## Correlation query typing phase (optional)
# Event typing expression. In some query languages the initial search query only allows basic
# boolean expressions without the possibility to mark the matched events with a type, which is
# especially required by temporal correlation rules to distinguish between the different matched
# event types.
# This is the template for the event typing expression that is used to mark the matched events.
# It contains only a {queries} placeholder that is replaced by the result of joining
# typing_rule_query_expression with typing_rule_query_expression_joiner defined afterwards.
typing_expression: ClassVar[Optional[str]] = None
# This is the template for the event typing expression for each query generated from the
# referred Sigma rules. It contains the following placeholders:
# * {rule} is the referred Sigma rule.
# * {ruleid} is the rule name or, if that is not available, the id of the rule.
# * {query} is the query generated from the referred Sigma rule.
typing_rule_query_expression: ClassVar[Optional[str]] = None
# String that is used to join the event typing expressions for each rule query referred by the
# correlation rule:
typing_rule_query_expression_joiner: ClassVar[Optional[str]] = None

# Event field normalization expression. This is used to normalize field names in events matched
# by the Sigma rules referred by the correlation rule. This is a dictionary mapping from
# correlation_method names to format strings that can contain the following placeholders:
Expand Down Expand Up @@ -1546,6 +1581,7 @@ def convert_correlation_rule_from_template(
return [
template[method].format(
search=self.convert_correlation_search(rule),
typing=self.convert_correlation_typing(rule),
aggregate=self.convert_correlation_aggregation_from_template(
rule, correlation_type, method
),
Expand Down Expand Up @@ -1591,6 +1627,7 @@ def convert_correlation_temporal_ordered_rule(
def convert_correlation_search(
self,
rule: SigmaCorrelationRule,
**kwargs,
) -> str:
if ( # if the correlation rule refers only a single rule and this rule results only in a single query
len(rule.rules) == 1
Expand All @@ -1603,6 +1640,7 @@ def convert_correlation_search(
normalization=self.convert_correlation_search_field_normalization_expression(
rule.aliases, rule_reference
),
**kwargs,
)
else:
return self.correlation_search_multi_rule_expression.format(
Expand All @@ -1611,7 +1649,9 @@ def convert_correlation_search(
self.correlation_search_multi_rule_query_expression.format(
rule=rule_reference.rule,
ruleid=rule_reference.rule.name or rule_reference.rule.id,
query=query,
query=self.convert_correlation_search_multi_rule_query_postprocess(
query
),
normalization=self.convert_correlation_search_field_normalization_expression(
rule.aliases,
rule_reference,
Expand All @@ -1620,33 +1660,69 @@ def convert_correlation_search(
for rule_reference in rule.rules
for query in rule_reference.rule.get_conversion_result()
)
)
),
**kwargs,
)

def convert_correlation_search_multi_rule_query_postprocess(self, query: str) -> str:
    """
    Postprocessing hook for the multi-rule correlation search phase.

    It is invoked once for every query of the referred rules before the query is inserted
    into the per-rule search expression and joined with the other queries. The default
    implementation hands the query back unchanged.

    :param query: Query generated from one of the referred rules.
    :return: The (possibly postprocessed) query.
    """
    return query

def convert_correlation_search_field_normalization_expression(
    self,
    aliases: SigmaCorrelationFieldAliases,
    rule_reference: SigmaRule,
) -> str:
    """
    Build the field normalization expression for one rule referred by a correlation rule.

    Each alias mapping entry whose rule reference equals ``rule_reference`` is rendered with
    ``correlation_search_field_normalization_expression`` (placeholders ``{alias}`` and
    ``{field}``); the rendered parts are joined with
    ``correlation_search_field_normalization_expression_joiner``.

    :param aliases: Field aliases defined by the correlation rule.
    :param rule_reference: Rule reference used to select the matching alias mappings.
    :return: The joined normalization expression, or an empty string if no aliases exist.
    :raises NotImplementedError: If aliases exist but the backend defines no normalization
        expression or no joiner.
    """
    # Nothing to normalize: return early so that backends without normalization support
    # are not rejected for rules that define no aliases.
    if len(aliases) == 0:
        return ""

    expression = self.correlation_search_field_normalization_expression
    joiner = self.correlation_search_field_normalization_expression_joiner
    if expression is None or joiner is None:
        raise NotImplementedError("Correlation field normalization is not supported by backend.")

    return joiner.join(
        expression.format(alias=alias.alias, field=field)
        for alias in aliases
        for alias_rule_reference, field in alias.mapping.items()
        if alias_rule_reference == rule_reference
    )

# Implementation of the typing phase of the correlation query.
def convert_correlation_typing(self, rule: SigmaCorrelationRule) -> str:
    """
    Generate the event typing expression of a correlation query (typing phase).

    For every query of every rule referred by the correlation rule,
    ``typing_rule_query_expression`` is rendered with the placeholders ``{rule}``,
    ``{ruleid}`` (rule name or, if the name is empty, the rule id) and ``{query}`` (the query
    after ``convert_correlation_typing_query_postprocess``). The rendered parts are joined
    with ``typing_rule_query_expression_joiner`` and inserted into ``typing_expression`` as
    ``{queries}``.

    :param rule: Correlation rule whose referred rules have already been converted.
    :return: The typing expression, or an empty string if the backend defines no
        ``typing_expression`` (the typing phase is optional).
    :raises NotImplementedError: If ``typing_expression`` is defined but the per-rule
        expression or its joiner is missing.
    """
    if self.typing_expression is None:
        return ""

    rule_query_expression = self.typing_rule_query_expression
    joiner = self.typing_rule_query_expression_joiner
    # Fail with an explicit message instead of an AttributeError on None if the backend
    # enabled the typing phase without defining all templates it requires.
    if rule_query_expression is None or joiner is None:
        raise NotImplementedError(
            "Correlation event typing requires typing_rule_query_expression and "
            "typing_rule_query_expression_joiner to be defined by the backend."
        )

    return self.typing_expression.format(
        queries=joiner.join(
            rule_query_expression.format(
                rule=rule_reference.rule,
                ruleid=rule_reference.rule.name or rule_reference.rule.id,
                query=self.convert_correlation_typing_query_postprocess(query),
            )
            for rule_reference in rule.rules
            for query in rule_reference.rule.get_conversion_result()
        )
    )

def convert_correlation_typing_query_postprocess(self, query: str) -> str:
    """
    Postprocessing hook for the typing phase of the correlation query.

    It is invoked once for every query of the referred rules before the query is inserted
    into the per-rule typing expression and joined with the other queries. The default
    implementation hands the query back unchanged.

    :param query: Query generated from one of the referred rules.
    :return: The (possibly postprocessed) query.
    """
    return query

# Implementation of the aggregation phase of the correlation query.
def convert_correlation_aggregation_from_template(
Expand Down Expand Up @@ -1746,12 +1822,19 @@ def finalize_query(
Finalize query by appending deferred query parts to the main conversion result as specified
with deferred_start and deferred_separator.
"""
# TODO when Python 3.8 is dropped: replace ChainMap with | operator.
conversion_state = ChainMap(state.processing_state, self.state_defaults)

if state.has_deferred():
if isinstance(query, DeferredQueryExpression):
query = self.deferred_only_query
return super().finalize_query(
rule,
query
self.query_expression.format(
query=query,
rule=rule,
state=conversion_state,
)
+ self.deferred_start
+ self.deferred_separator.join(
(
Expand All @@ -1764,4 +1847,14 @@ def finalize_query(
output_format,
)
else:
return super().finalize_query(rule, query, index, state, output_format)
return super().finalize_query(
rule,
self.query_expression.format(
query=query,
rule=rule,
state=conversion_state,
),
index,
state,
output_format,
)
3 changes: 3 additions & 0 deletions sigma/correlations.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,9 @@ class SigmaCorrelationFieldAliases:
def __iter__(self):
    """Iterate over the alias definitions, i.e. the values of the ``aliases`` mapping."""
    alias_definitions = self.aliases.values()
    return iter(alias_definitions)

def __len__(self):
    """Return the number of entries in the ``aliases`` mapping."""
    alias_count = len(self.aliases)
    return alias_count

@classmethod
def from_dict(cls, d: dict):
aliases = {}
Expand Down
20 changes: 18 additions & 2 deletions sigma/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -742,8 +742,8 @@ class SigmaRuleBase:
references: List[str] = field(default_factory=list)
tags: List[SigmaRuleTag] = field(default_factory=list)
author: Optional[str] = None
date: Optional[date] = None
modified: Optional[date] = None
date: Optional["datetime.date"] = None
modified: Optional["datetime.date"] = None
fields: List[str] = field(default_factory=list)
falsepositives: List[str] = field(default_factory=list)
level: Optional[SigmaLevel] = None
Expand All @@ -758,6 +758,9 @@ class SigmaRuleBase:
_conversion_result: Optional[List[Any]] = field(
init=False, default=None, repr=False, compare=False
)
_conversion_states: Optional[List["sigma.conversion.state.ConversionState"]] = field(
init=False, default=None, repr=False, compare=False
)
_output: bool = field(init=False, default=True, repr=False, compare=False)

def __post_init__(self):
Expand Down Expand Up @@ -1058,6 +1061,19 @@ def get_conversion_result(self) -> List[Any]:
)
return self._conversion_result

def set_conversion_states(
    self,
    state: List["sigma.conversion.state.ConversionState"],
):
    """
    Store the conversion states that resulted from converting this rule.

    :param state: List of conversion states; it is kept as-is in ``_conversion_states``.
    """
    self._conversion_states = state

def get_conversion_states(self) -> List["sigma.conversion.state.ConversionState"]:
    """
    Return the conversion states stored for this rule.

    :return: The list previously stored in ``_conversion_states``.
    :raises SigmaConversionError: If no conversion states were stored yet, i.e.
        ``_conversion_states`` is still ``None``.
    """
    stored_states = self._conversion_states
    if stored_states is not None:
        return stored_states

    raise sigma_exceptions.SigmaConversionError(
        self,
        "Conversion state not available",
    )

def disable_output(self):
"""Disable output of rule."""
self._output = False
Expand Down
84 changes: 80 additions & 4 deletions tests/test_conversion_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from sigma.backends.test import TextQueryTestBackend
from sigma.collection import SigmaCollection
from sigma.conversion.base import TextQueryBackend
from sigma.conversion.state import ConversionState
from sigma.processing.conditions import IncludeFieldCondition
from sigma.processing.finalization import ConcatenateQueriesFinalizer
from sigma.processing.pipeline import ProcessingPipeline, ProcessingItem, QueryPostprocessingItem
Expand All @@ -21,7 +22,7 @@


@pytest.fixture
def test_backend():
def test_backend() -> TextQueryTestBackend:
return TextQueryTestBackend(
ProcessingPipeline(
[
Expand All @@ -42,6 +43,8 @@ def test_backend():
field_name_conditions=[IncludeFieldCondition(["prefix"])],
),
ProcessingItem(SetStateTransformation("index", "test")),
ProcessingItem(SetStateTransformation("data_source", "state_source")),
ProcessingItem(SetStateTransformation("output", "state_output")),
]
),
)
Expand Down Expand Up @@ -2216,6 +2219,46 @@ def test_convert_list_cidr_wildcard_asterisk(test_backend, monkeypatch):


def test_convert_state(test_backend):
    """Conversion with the "state" output format stores the conversion states on the rule."""
    rule_collection = SigmaCollection.from_yaml(
        """
            title: Test
            status: test
            logsource:
                category: test_category
                product: test_product
            detection:
                sel:
                    fieldA: value
                condition: sel
        """
    )
    # Expected state consists of the values set by the SetStateTransformation items of the
    # fixture pipeline.
    expected_states = [
        ConversionState(
            processing_state={
                "index": "test",
                "data_source": "state_source",
                "output": "state_output",
            }
        )
    ]

    queries = test_backend.convert(rule_collection, "state")

    assert queries == ['index=test (mappedA="value")']
    assert rule_collection[0].get_conversion_states() == expected_states


def test_convert_query_expression(monkeypatch, test_backend: TextQueryTestBackend):
monkeypatch.setattr(
test_backend,
"query_expression",
"| from {state[data_source]} | where {query} | output {state[output]}",
)
monkeypatch.setattr(
test_backend,
"state_defaults",
{"data_source": "default_source", "output": "default_output"},
)
assert (
test_backend.convert(
SigmaCollection.from_yaml(
Expand All @@ -2230,10 +2273,43 @@ def test_convert_state(test_backend):
fieldA: value
condition: sel
"""
),
"state",
)
)
== ['| from state_source | where mappedA="value" | output state_output']
)


def test_convert_query_expression_defaults(monkeypatch, test_backend: TextQueryTestBackend):
    """The query expression is rendered with values from state_defaults for the referenced keys."""
    query_template = (
        "| from {state[other_data_source]} | where {query} | output {state[other_output]}"
    )
    default_values = {"other_data_source": "default_source", "other_output": "default_output"}
    monkeypatch.setattr(test_backend, "query_expression", query_template)
    monkeypatch.setattr(test_backend, "state_defaults", default_values)

    rule_collection = SigmaCollection.from_yaml(
        """
            title: Test
            status: test
            logsource:
                category: test_category
                product: test_product
            detection:
                sel:
                    fieldA: value
                condition: sel
        """
    )
    queries = test_backend.convert(rule_collection)

    assert queries == ['| from default_source | where mappedA="value" | output default_output']


Expand Down
Loading

0 comments on commit 2c29ae1

Please sign in to comment.