Skip to content

Commit

Permalink
Processing condition negation
Browse files Browse the repository at this point in the history
  • Loading branch information
thomaspatzke committed Apr 3, 2022
1 parent 549761a commit c3edabb
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 16 deletions.
36 changes: 23 additions & 13 deletions sigma/processing/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ class ProcessingItem:
"""
transformation : Transformation
rule_condition_linking : Callable[[ Iterable[bool] ], bool] = all # any or all
rule_condition_negation : bool = False
rule_conditions : List[RuleProcessingCondition] = field(default_factory=list)
detection_item_condition_linking : Callable[[ Iterable[bool] ], bool] = all # any or all
detection_item_condition_negation : bool = False
detection_item_conditions : List[DetectionItemProcessingCondition] = field(default_factory=list)
identifier : Optional[str] = None

Expand Down Expand Up @@ -70,6 +72,9 @@ def from_dict(cls, d : dict):
rule_condition_linking = condition_linking[d.get("rule_cond_op", "and")] # default: conditions are linked with and operator
detection_item_condition_linking = condition_linking[d.get("detection_item_cond_op", "and")] # same for detection item conditions

rule_condition_negation = d.get("rule_cond_not", False)
detection_item_condition_negation = d.get("detection_item_cond_not", False)

# Transformation
try:
transformation_class_name = d["type"]
Expand All @@ -84,14 +89,14 @@ def from_dict(cls, d : dict):
params = {
k: v
for k, v in d.items()
if k not in {"rule_conditions", "rule_cond_op", "detection_item_conditions", "detection_item_cond_op", "type", "id"}
if k not in {"rule_conditions", "rule_cond_op", "rule_cond_not", "detection_item_conditions", "detection_item_cond_op", "detection_item_cond_not", "type", "id"}
}
try:
transformation = transformation_class(**params)
except (SigmaConfigurationError, TypeError) as e:
raise SigmaConfigurationError("Error in transformation: " + str(e)) from e

return cls(transformation, rule_condition_linking, rule_conds, detection_item_condition_linking, detection_item_conds, identifier)
return cls(transformation, rule_condition_linking, rule_condition_negation, rule_conds, detection_item_condition_linking, detection_item_condition_negation, detection_item_conds, identifier)

def __post_init__(self):
self.transformation.set_processing_item(self) # set processing item in transformation object after it is instantiated
Expand All @@ -101,23 +106,28 @@ def apply(self, pipeline : "ProcessingPipeline", rule : SigmaRule) -> Tuple[Sigm
Matches condition against rule and performs transformation if condition is true or not present.
Returns Sigma rule and bool if transformation was applied.
"""
if not self.rule_conditions or \
self.rule_condition_linking([
condition.match(pipeline, rule)
for condition in self.rule_conditions
]): # apply transformation if conditions match or no condition defined
cond_result = self.rule_condition_linking([
condition.match(pipeline, rule)
for condition in self.rule_conditions
])
if self.rule_condition_negation:
cond_result = not cond_result
if not self.rule_conditions or cond_result: # apply transformation if conditions match or no condition defined
self.transformation.apply(pipeline, rule)
return True
else: # just pass rule through
return False

def match_detection_item(self, pipeline : "ProcessingPipeline", detection_item : SigmaDetectionItem) -> bool:
"""Evalutates detection item conditions from processing item to detection item and returns result."""
return not self.detection_item_conditions or \
self.detection_item_condition_linking([
condition.match(pipeline, detection_item)
for condition in self.detection_item_conditions
])
"""Evalutates detection item conditions from processing item to detection item and returns
result."""
cond_result = self.detection_item_condition_linking([
condition.match(pipeline, detection_item)
for condition in self.detection_item_conditions
])
if self.detection_item_condition_negation:
cond_result = not cond_result
return not self.detection_item_conditions or cond_result

@dataclass
class ProcessingPipeline:
Expand Down
28 changes: 25 additions & 3 deletions tests/test_processing_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,12 +300,34 @@ def test_processingitem_apply_notapplied_all_with_false(dummy_processing_pipelin
applied = processing_item.apply(dummy_processing_pipeline, sigma_rule)
assert not applied and sigma_rule.title == "Test"

def test_processingitem_apply_notapplied_any_without_true(dummy_processing_pipeline, sigma_rule):
def test_processingitem_apply_negated_true(dummy_processing_pipeline, sigma_rule):
processing_item = ProcessingItem(
transformation=TransformationAppend(s="Test"),
rule_condition_linking=any,
rule_condition_negation=True,
rule_conditions=[
RuleConditionTrue(dummy="test-true"),
],
)
applied = processing_item.apply(dummy_processing_pipeline, sigma_rule)
assert not applied and sigma_rule.title == "Test"

def test_processingitem_apply_negated_false(dummy_processing_pipeline, sigma_rule):
processing_item = ProcessingItem(
transformation=TransformationAppend(s="Test"),
rule_condition_negation=True,
rule_conditions=[
RuleConditionFalse(dummy="test-false"),
],
)
applied = processing_item.apply(dummy_processing_pipeline, sigma_rule)
assert applied and sigma_rule.title == "TestTest"

def test_processingitem_apply_notapplied_all_with_false(dummy_processing_pipeline, sigma_rule):
processing_item = ProcessingItem(
transformation=TransformationAppend(s="Test"),
rule_condition_linking=all,
rule_conditions=[
RuleConditionFalse(dummy="test-true"),
RuleConditionTrue(dummy="test-true"),
RuleConditionFalse(dummy="test-false"),
],
)
Expand Down

0 comments on commit c3edabb

Please sign in to comment.