11from collections import defaultdict
22import re
3- from typing import ClassVar , Dict , Optional , Pattern , Tuple
3+ from typing import Any , ClassVar , Dict , List , Optional , Pattern , Tuple , cast
44
55from sigma .conversion .base import TextQueryBackend
66from sigma .conversion .state import ConversionState
77from sigma .pipelines .test import dummy_test_pipeline
88from sigma .processing .pipeline import ProcessingItem , ProcessingPipeline
99from sigma .processing .transformations import FieldMappingTransformation
10+ from sigma .rule .rule import SigmaRule
1011from sigma .types import CompareOperators , SigmaCompareExpression
1112
1213
1314class TextQueryTestBackend (TextQueryBackend ):
14- name : str = "Test backend"
15- formats : Dict [str , str ] = {
15+ name : ClassVar [ str ] = "Test backend"
16+ formats : ClassVar [ Dict [str , str ] ] = {
1617 "default" : "Default format" ,
1718 "test" : "Dummy test format" ,
1819 "state" : "Test format that obtains information from state" ,
@@ -29,15 +30,15 @@ class TextQueryTestBackend(TextQueryBackend):
2930 eq_token : ClassVar [str ] = "="
3031
3132 field_quote : ClassVar [str ] = "'"
32- field_quote_pattern : ClassVar [Pattern ] = re .compile ("^\\ w+$" )
33+ field_quote_pattern : ClassVar [Pattern [ str ] ] = re .compile ("^\\ w+$" )
3334
3435 str_quote : ClassVar [str ] = '"'
3536 escape_char : ClassVar [str ] = "\\ "
3637 wildcard_multi : ClassVar [str ] = "*"
3738 wildcard_single : ClassVar [str ] = "?"
3839 add_escaped : ClassVar [str ] = ":"
3940 filter_chars : ClassVar [str ] = "&"
40- bool_values : ClassVar [Dict [bool , str ]] = {
41+ bool_values : ClassVar [Dict [bool , Optional [ str ] ]] = {
4142 True : "1" ,
4243 False : "0" ,
4344 }
@@ -52,7 +53,7 @@ class TextQueryTestBackend(TextQueryBackend):
5253
5354 re_expression : ClassVar [str ] = "{field}=/{regex}/"
5455 re_escape_char : ClassVar [str ] = "\\ "
55- re_escape : ClassVar [Tuple [str ]] = ( "/" , "bar" )
56+ re_escape : ClassVar [List [str ]] = [ "/" , "bar" ]
5657
5758 case_sensitive_match_expression = "{field} casematch {value}"
5859 case_sensitive_startswith_expression : ClassVar [str ] = "{field} startswith_cased {value}"
@@ -115,8 +116,12 @@ class TextQueryTestBackend(TextQueryBackend):
115116 "test" : "Test correlation method" ,
116117 }
117118 default_correlation_method : ClassVar [str ] = "test"
118- default_correlation_query : ClassVar [str ] = {"test" : "{search}\n {aggregate}\n {condition}" }
119- temporal_correlation_query : ClassVar [str ] = {"test" : "{search}\n \n {aggregate}\n \n {condition}" }
119+ default_correlation_query : ClassVar [Dict [str , str ]] = {
120+ "test" : "{search}\n {aggregate}\n {condition}"
121+ }
122+ temporal_correlation_query : ClassVar [Dict [str , str ]] = {
123+ "test" : "{search}\n \n {aggregate}\n \n {condition}"
124+ }
120125
121126 correlation_search_single_rule_expression : ClassVar [str ] = "{query}"
122127 correlation_search_multi_rule_expression : ClassVar [str ] = "{queries}"
@@ -169,33 +174,39 @@ def __init__(
169174 processing_pipeline : Optional [ProcessingPipeline ] = None ,
170175 collect_errors : bool = False ,
171176 testparam : Optional [str ] = None ,
172- ** kwargs ,
177+ ** kwargs : Dict [ str , Any ] ,
173178 ):
174179 super ().__init__ (processing_pipeline , collect_errors , ** kwargs )
175180 self .testparam = testparam
176181
177- def finalize_query_test (self , rule , query , index , state ):
178- return "[ " + self .finalize_query_default (rule , query , index , state ) + " ]"
182+ def finalize_query_test (
183+ self , rule : SigmaRule , query : str , index : int , state : ConversionState
184+ ) -> str :
185+ return "[ " + cast (str , self .finalize_query_default (rule , query , index , state )) + " ]"
179186
180- def finalize_output_test (self , queries ) :
181- return self .finalize_output_default (queries )
187+ def finalize_output_test (self , queries : List [ str ]) -> str :
188+ return cast ( str , self .finalize_output_default (queries ) )
182189
183- def finalize_query_state (self , rule , query , index , state : ConversionState ):
190+ def finalize_query_state (
191+ self , rule : SigmaRule , query : str , index : int , state : ConversionState
192+ ) -> str :
184193 return (
185194 "index="
186- + state .processing_state .get ("index" , "default" )
195+ + cast ( str , state .processing_state .get ("index" , "default" ) )
187196 + " ("
188- + self .finalize_query_default (rule , query , index , state )
197+ + cast ( str , self .finalize_query_default (rule , query , index , state ) )
189198 + ")"
190199 )
191200
192- def finalize_output_state (self , queries ) :
193- return self .finalize_output_default (queries )
201+ def finalize_output_state (self , queries : List [ str ]) -> str :
202+ return cast ( str , self .finalize_output_default (queries ) )
194203
195- def finalize_query_list_of_dict (self , rule , query , index , state ):
196- return self .finalize_query_default (rule , query , index , state )
204+ def finalize_query_list_of_dict (
205+ self , rule : SigmaRule , query : str , index : int , state : ConversionState
206+ ) -> str :
207+ return cast (str , self .finalize_query_default (rule , query , index , state ))
197208
198- def finalize_output_list_of_dict (self , queries ) :
209+ def finalize_output_list_of_dict (self , queries : List [ str ]) -> List [ Dict [ str , Optional [ str ]]] :
199210 return [
200211 (
201212 {"query" : query , "test" : self .testparam }
@@ -205,18 +216,22 @@ def finalize_output_list_of_dict(self, queries):
205216 for query in self .finalize_output_default (queries )
206217 ]
207218
208- def finalize_query_bytes (self , rule , query , index , state ):
209- return self .finalize_query_default (rule , query , index , state )
219+ def finalize_query_bytes (
220+ self , rule : SigmaRule , query : str , index : int , state : ConversionState
221+ ) -> str :
222+ return cast (str , self .finalize_query_default (rule , query , index , state ))
210223
211- def finalize_output_bytes (self , queries ) :
224+ def finalize_output_bytes (self , queries : List [ str ]) -> bytes :
212225 return bytes ("\x00 " .join (self .finalize_output_default (queries )), "utf-8" )
213226
214- def finalize_query_str (self , rule , query , index , state ):
215- return self .finalize_query_default (rule , query , index , state )
227+ def finalize_query_str (
228+ self , rule : SigmaRule , query : str , index : int , state : ConversionState
229+ ) -> str :
230+ return cast (str , self .finalize_query_default (rule , query , index , state ))
216231
217- def finalize_output_str (self , queries ) :
232+ def finalize_output_str (self , queries : List [ str ]) -> str :
218233 return "\n " .join (self .finalize_output_default (queries ))
219234
220235
221236class MandatoryPipelineTestBackend (TextQueryTestBackend ):
222- requires_pipeline : bool = True
237+ requires_pipeline : ClassVar [ bool ] = True
0 commit comments