Skip to content

Commit

Permalink
Merge pull request #331 from SigmaHQ:typechecking-hb
Browse files Browse the repository at this point in the history
Fix some linting issues
  • Loading branch information
thomaspatzke authored Mar 8, 2025
2 parents 6503eff + d96cf3e commit 6d5a62c
Show file tree
Hide file tree
Showing 27 changed files with 34 additions and 97 deletions.
4 changes: 2 additions & 2 deletions sigma/backends/test/backend.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
from collections import defaultdict
import re
from typing import Any, ClassVar, Dict, List, Optional, Pattern, Tuple, cast
from typing import Any, ClassVar, Dict, List, Optional, Pattern, cast

from sigma.conversion.base import TextQueryBackend
from sigma.conversion.state import ConversionState
from sigma.pipelines.test import dummy_test_pipeline
from sigma.processing.pipeline import ProcessingItem, ProcessingPipeline
from sigma.processing.transformations import FieldMappingTransformation
from sigma.rule.rule import SigmaRule
from sigma.types import CompareOperators, SigmaCompareExpression
from sigma.types import CompareOperators


class TextQueryTestBackend(TextQueryBackend):
Expand Down
3 changes: 1 addition & 2 deletions sigma/collection.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from dataclasses import InitVar, dataclass, field
from functools import reduce
from pathlib import Path
from typing import Any, Callable, Dict, Iterable, List, Optional, Union, IO, cast
from typing import Any, Callable, Dict, Iterable, List, Optional, Union, IO
from uuid import UUID

import yaml
Expand All @@ -15,7 +15,6 @@
)
from sigma.rule import SigmaRule, SigmaRuleBase
from sigma.filters import SigmaFilter
from typing import TypeVar, Union

NestedDict = Dict[str, Union[str, int, float, bool, None, "NestedDict"]]

Expand Down
2 changes: 1 addition & 1 deletion sigma/conditions.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
ParseResults,
ParseException,
)
from typing import ClassVar, List, Literal, Optional, Union, Type, cast
from typing import ClassVar, List, Optional, Union, Type, cast
from sigma.types import SigmaType
from sigma.exceptions import SigmaConditionError, SigmaRuleLocation
import sigma
Expand Down
5 changes: 1 addition & 4 deletions sigma/conversion/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from sigma.correlations import (
SigmaCorrelationCondition,
SigmaCorrelationConditionOperator,
SigmaCorrelationFieldAlias,
SigmaCorrelationFieldAliases,
SigmaCorrelationRule,
SigmaCorrelationTimespan,
Expand All @@ -18,7 +17,6 @@
from sigma.exceptions import (
ExceptionOnUsage,
SigmaBackendError,
SigmaConfigurationError,
SigmaConversionError,
SigmaError,
SigmaValueError,
Expand Down Expand Up @@ -47,7 +45,6 @@
ConditionNOT,
ConditionFieldEqualsValueExpression,
ConditionValueExpression,
ConditionType,
)
from sigma.types import (
CompareOperators,
Expand Down Expand Up @@ -1465,7 +1462,7 @@ def convert_condition_field_eq_val(
def is_parent_not(
cond: Union[
ConditionItem, ConditionFieldEqualsValueExpression, ConditionValueExpression
]
],
) -> bool:
if cond.parent is None:
return False
Expand Down
22 changes: 11 additions & 11 deletions sigma/correlations.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from dataclasses import dataclass, field
from enum import Enum, auto
from typing import Any, Dict, Iterator, List, Literal, Optional, Set, Union, Iterable
from typing import Any, Dict, Iterator, List, Literal, Optional, Set, Union

import sigma.exceptions as sigma_exceptions
from sigma.exceptions import SigmaRuleLocation, SigmaTimespanError
Expand Down Expand Up @@ -72,7 +72,7 @@ def from_dict(
ops = frozenset(SigmaCorrelationConditionOperator.operators())
if len(d_keys.intersection(ops)) != 1:
raise sigma_exceptions.SigmaCorrelationConditionError(
f"Sigma correlation condition must have exactly one condition item", source=source
"Sigma correlation condition must have exactly one condition item", source=source
)
unknown_keys = d_keys.difference(ops).difference({"field"})
if unknown_keys:
Expand Down Expand Up @@ -268,7 +268,7 @@ def from_dict(
else: # no correlation type provided
errors.append(
sigma_exceptions.SigmaCorrelationTypeError(
f"Sigma correlation rule without type", source=source
"Sigma correlation rule without type", source=source
)
)

Expand All @@ -282,13 +282,13 @@ def from_dict(
else:
errors.append(
sigma_exceptions.SigmaCorrelationRuleError(
f"Rule reference must be plain string or list.", source=source
"Rule reference must be plain string or list.", source=source
)
)
else:
errors.append(
sigma_exceptions.SigmaCorrelationRuleError(
f"Sigma correlation rule without rule references", source=source
"Sigma correlation rule without rule references", source=source
)
)

Expand All @@ -298,7 +298,7 @@ def from_dict(
if not isinstance(generate, bool):
errors.append(
sigma_exceptions.SigmaCorrelationRuleError(
f"Sigma correlation generate definition must be a boolean", source=source
"Sigma correlation generate definition must be a boolean", source=source
)
)
else:
Expand All @@ -314,7 +314,7 @@ def from_dict(
else:
errors.append(
sigma_exceptions.SigmaCorrelationRuleError(
f"Sigma correlation group-by definition must be string or list",
"Sigma correlation group-by definition must be string or list",
source=source,
)
)
Expand All @@ -329,7 +329,7 @@ def from_dict(
else:
errors.append(
sigma_exceptions.SigmaCorrelationRuleError(
f"Sigma correlation rule without timespan", source=source
"Sigma correlation rule without timespan", source=source
)
)

Expand All @@ -341,7 +341,7 @@ def from_dict(
else:
errors.append(
sigma_exceptions.SigmaCorrelationRuleError(
f"Sigma correlation aliases definition must be a dict", source=source
"Sigma correlation aliases definition must be a dict", source=source
)
)
else:
Expand All @@ -355,7 +355,7 @@ def from_dict(
else:
errors.append(
sigma_exceptions.SigmaCorrelationRuleError(
f"Sigma correlation condition definition must be a dict", source=source
"Sigma correlation condition definition must be a dict", source=source
)
)
elif correlation_type not in (
Expand All @@ -364,7 +364,7 @@ def from_dict(
):
errors.append(
sigma_exceptions.SigmaCorrelationRuleError(
f"Non-temporal Sigma correlation rule without condition", source=source
"Non-temporal Sigma correlation rule without condition", source=source
)
)
elif correlation_type in (
Expand Down
6 changes: 0 additions & 6 deletions sigma/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,12 +288,6 @@ def __str__(self):
return f"{self.error} in expression '{self.expression}' at location {self.location}"


class SigmaFeatureNotSupportedByBackendError(SigmaError):
"""Sigma feature is not supported by the backend."""

pass


class SigmaDescriptionError(SigmaError):
"""Error in Sigma rule description"""

Expand Down
1 change: 0 additions & 1 deletion sigma/filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import string
from dataclasses import dataclass, field
from typing import List, Optional, Union
from uuid import UUID

from sigma import exceptions as sigma_exceptions
from sigma.correlations import SigmaCorrelationRule, SigmaRuleReference
Expand Down
1 change: 0 additions & 1 deletion sigma/pipelines/test/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from sigma.processing.transformations import (
AddConditionTransformation,
FieldMappingTransformation,
FieldFunctionTransformation,
)


Expand Down
2 changes: 1 addition & 1 deletion sigma/plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ def is_installed(self) -> bool:
try:
subprocess.check_call([sys.executable, "-m", "pip", "-qqq", "show", self.package])
return True
except:
except Exception:
return False

def has_capability(self, capability: SigmaPluginCapability) -> bool:
Expand Down
3 changes: 1 addition & 2 deletions sigma/processing/conditions/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
from sigma.exceptions import (
SigmaConfigurationError,
SigmaProcessingItemError,
SigmaRegularExpressionError,
)


Expand All @@ -29,7 +28,7 @@ def set_pipeline(self, pipeline: "sigma.processing.pipeline.ProcessingPipeline")
if self._pipeline is None:
self._pipeline = pipeline
else:
raise SigmaProcessingItemError(f"Pipeline for condition was already set.")
raise SigmaProcessingItemError("Pipeline for condition was already set.")

def _clear_pipeline(self) -> None:
self._pipeline = None
Expand Down
1 change: 0 additions & 1 deletion sigma/processing/conditions/fields.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from dataclasses import dataclass, field

import sigma
from sigma.processing.conditions.base import FieldNameProcessingCondition
from typing import List, Pattern, Literal, Optional
import re
Expand Down
3 changes: 1 addition & 2 deletions sigma/processing/conditions/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from datetime import date
from uuid import UUID

import sigma
from sigma.correlations import SigmaCorrelationRule
from sigma.processing.conditions.base import (
RuleDetectionItemCondition,
Expand Down Expand Up @@ -91,7 +90,7 @@ def find_detection_item(self, detection: Union[SigmaDetectionItem, SigmaDetectio
detection.field is not None
and detection.field == self.field
and self.sigma_value
in [v for v in detection.value if type(self.sigma_value) == type(v)]
in [v for v in detection.value if isinstance(self.sigma_value, type(v))]
):
return True
else:
Expand Down
1 change: 0 additions & 1 deletion sigma/processing/conditions/state.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from dataclasses import dataclass, field

import sigma
from sigma.correlations import SigmaCorrelationRule
from sigma.processing.conditions.base import (
DetectionItemProcessingCondition,
Expand Down
1 change: 0 additions & 1 deletion sigma/processing/conditions/values.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from dataclasses import dataclass
from typing import Union

import sigma
from sigma.processing.conditions.base import (
ValueProcessingCondition,
)
Expand Down
4 changes: 2 additions & 2 deletions sigma/processing/finalization.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from abc import abstractmethod
from dataclasses import dataclass, field
import json
from typing import Any, Dict, List, Literal, Optional
from typing import Any, Dict, List, Optional

import yaml
import sigma
Expand Down Expand Up @@ -107,7 +107,7 @@ def __post_init__(self):

@classmethod
def from_dict(cls, d: Dict) -> "NestedFinalizer":
if not "finalizers" in d:
if "finalizers" not in d:
raise SigmaConfigurationError("Nested finalizer requires a 'finalizers' key.")
fs = []
for finalizer in d["finalizers"]:
Expand Down
34 changes: 2 additions & 32 deletions sigma/processing/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,21 +238,6 @@ def _check_conditions(
f"{name} '{str(condition)}' is not a {expected_condition_class.__name__}"
)

def __post_init__(self):
self._check_conditions(
"rule_condition_expression",
"rule_condition_linking",
"rule_conditions",
RuleProcessingCondition,
"Rule condition",
)
self.transformation.set_processing_item(
self
) # set processing item in transformation object after it is instantiated
self._resolve_condition_expression(
self.rule_condition_expression, self.rule_conditions, "Rule condition"
)

def _resolve_condition_expression(
self,
expr: Optional[ConditionExpression],
Expand Down Expand Up @@ -352,21 +337,6 @@ def _check_conditions(
f"{name} '{str(condition)}' is not a {expected_condition_class.__name__}"
)

def __post_init__(self):
self._check_conditions(
"rule_condition_expression",
"rule_condition_linking",
"rule_conditions",
RuleProcessingCondition,
"Rule condition",
)
self.transformation.set_processing_item(
self
) # set processing item in transformation object after it is instantiated
self._resolve_condition_expression(
self.rule_condition_expression, self.rule_conditions, "Rule condition"
)

def _resolve_condition_expression(
self,
expr: Optional[ConditionExpression],
Expand Down Expand Up @@ -516,7 +486,7 @@ def set_pipeline(self, pipeline: "ProcessingPipeline") -> None:
if self._pipeline is None:
self._pipeline = pipeline
else:
raise SigmaProcessingItemError(f"Pipeline for processing item was already set.")
raise SigmaProcessingItemError("Pipeline for processing item was already set.")

self.transformation.set_pipeline(pipeline)
for rule_condition in self.rule_conditions:
Expand Down Expand Up @@ -905,7 +875,7 @@ def from_yaml(cls, processing_pipeline: str) -> "ProcessingPipeline":
try:
parsed_pipeline = yaml.safe_load(processing_pipeline)
except yaml.parser.ParserError as e:
raise SigmaPipelineParsingError(f"Error in parsing of a Sigma processing pipeline")
raise SigmaPipelineParsingError("Error in parsing of a Sigma processing pipeline")
return cls.from_dict(parsed_pipeline)

def apply(
Expand Down
5 changes: 1 addition & 4 deletions sigma/processing/transformations/base.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
from abc import ABC, abstractmethod
from functools import partial
from sigma.conditions import SigmaCondition
from typing import (
Any,
Iterable,
List,
Dict,
Optional,
Union,
)
Expand Down Expand Up @@ -58,7 +55,7 @@ def set_pipeline(self, pipeline: "sigma.processing.pipeline.ProcessingPipeline")
if self._pipeline is None:
self._pipeline = pipeline
else:
raise SigmaTransformationError(f"Pipeline for transformation was already set.")
raise SigmaTransformationError("Pipeline for transformation was already set.")

def _clear_pipeline(self) -> None:
self._pipeline = None
Expand Down
3 changes: 0 additions & 3 deletions sigma/processing/transformations/condition.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
from abc import abstractmethod
from sigma.conditions import SigmaCondition
from typing import (
Any,
List,
Dict,
Optional,
Expand All @@ -10,7 +8,6 @@
from dataclasses import dataclass, field
import random
import string
import sigma
from sigma.processing.transformations.base import (
ConditionTransformation,
)
Expand Down
1 change: 0 additions & 1 deletion sigma/processing/transformations/failure.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from dataclasses import dataclass
import sigma
from sigma.processing.transformations.base import (
DetectionItemTransformation,
Transformation,
Expand Down
1 change: 0 additions & 1 deletion sigma/processing/transformations/fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
Callable,
)
from dataclasses import dataclass, field
import sigma
from sigma.processing.transformations.base import (
FieldMappingTransformationBase,
Transformation,
Expand Down
Loading

0 comments on commit 6d5a62c

Please sign in to comment.