diff --git a/docs/query-guide/pipes.rst b/docs/query-guide/pipes.rst index 1cfad75..17fcaff 100644 --- a/docs/query-guide/pipes.rst +++ b/docs/query-guide/pipes.rst @@ -126,3 +126,30 @@ Get the top five network connections that transmitted the most data | sort total_out_bytes | tail 5 +``window`` +---------- +The ``window`` pipe will buffer events based on the timespan specified, which allows other pipes to function on a sliding +window. This allows pipes to function when streaming data continuously. + +Find suspicious recon commands that were executed within a 5 minute window + .. code-block:: eql + + process where process_name in ("whoami.exe", "netstat.exe", "hostname.exe", "net.exe", "sc.exe", "systeminfo.exe") + | window 5m + | unique process_name + | unique_count + | filter count >= 3 + +Find processes that have network connections to a single host with over 100 unique ports within a 10 second window + .. code-block:: eql + + network where wildcard(destination_address, "10.*", "172.*", "192.*") + | window 10s + | unique_count process_name, destination_port + | filter count >= 100 + +.. note:: + + The window pipe will emit all events within the window buffer from the first event, meaning events will appear like + so: [[1], [1,2], [1,2,3], ...]. Therefore, it is recommended to use a combination of ``unique_count`` and + ``filter`` to only show events over a certain threshold. 
diff --git a/eql/engine.py b/eql/engine.py index 9268ee1..3aff2db 100644 --- a/eql/engine.py +++ b/eql/engine.py @@ -463,20 +463,32 @@ def _convert_count_pipe(self, node, next_pipe): # type: (CountPipe, callable) - if len(node.arguments) == 0: # Counting only the total summary = {'key': 'totals', 'count': 0} - hosts = set() + + # mutable scoped variable + hosts = [set()] def count_total_callback(events): if events is PIPE_EOF: - if len(hosts): - summary['total_hosts'] = len(hosts) - summary['hosts'] = list(sorted(hosts)) + # immutable version of summary + event = summary.copy() + + if len(hosts[0]): + event['total_hosts'] = len(hosts[0]) + event['hosts'] = list(sorted(hosts[0])) - next_pipe([Event(EVENT_TYPE_GENERIC, 0, summary)]) + next_pipe([Event(EVENT_TYPE_GENERIC, 0, event)]) next_pipe(PIPE_EOF) + + # reset state + summary['count'] = 0 + if len(hosts[0]): + del summary['hosts'] + del summary['total_hosts'] + hosts[0] = set() else: summary['count'] += 1 if host_key in events[0].data: - hosts.add(events[0].data[host_key]) + hosts[0].add(events[0].data[host_key]) return count_total_callback @@ -506,6 +518,9 @@ def count_tuple_callback(events): # type: (list[Event]) -> None details['percent'] = float(details['count']) / total next_pipe([Event(EVENT_TYPE_GENERIC, 0, details)]) next_pipe(PIPE_EOF) + + # reset state + count_table.clear() else: key = get_key(events) insensitive_key = remove_case(key) @@ -529,18 +544,20 @@ def filter_callback(events): # type: (list[Event]) -> None return filter_callback def _convert_head_pipe(self, node, next_pipe): # type: (HeadPipe, callable) -> callable - totals = [0] # has to be mutable because of python scoping + output_buffer = [] max_count = node.count def head_callback(events): - if totals[0] < max_count: - if events is PIPE_EOF: - next_pipe(PIPE_EOF) - else: - totals[0] += 1 - next_pipe(events) - if totals[0] == max_count: - next_pipe(PIPE_EOF) + if events is PIPE_EOF: + for output in output_buffer: + next_pipe(output) + 
next_pipe(PIPE_EOF) + + # reset state + output_buffer.clear() + else: + if len(output_buffer) < max_count: + output_buffer.append(events) return head_callback @@ -552,6 +569,9 @@ def tail_callback(events): for output in output_buffer: next_pipe(output) next_pipe(PIPE_EOF) + + # reset state + output_buffer.clear() else: output_buffer.append(events) @@ -572,6 +592,9 @@ def get_converted_key(buffer_events): for output in output_buffer: next_pipe(output) next_pipe(PIPE_EOF) + + # reset state + output_buffer.clear() else: output_buffer.append(events) @@ -584,6 +607,9 @@ def _convert_unique_pipe(self, node, next_pipe): # type: (UniquePipe, callable) def unique_callback(events): if events is PIPE_EOF: next_pipe(PIPE_EOF) + + # reset state + seen.clear() else: key = get_unique_key(events) if key not in seen: @@ -592,6 +618,32 @@ def unique_callback(events): return unique_callback + def _convert_window_pipe(self, node, next_pipe): # type: (WindowPipe) -> callable + """Aggregate events over a sliding window using a buffer.""" + window_buf = deque() # tuple of (timestamp, events) + timespan = self.convert(node.timespan) + + def time_window_callback(events): # type: (list[Event]) -> None + if events is PIPE_EOF: + next_pipe(PIPE_EOF) + + # reset state + window_buf.clear() + else: + minimum_start = events[0].time - timespan + + # Remove any events that no longer sit within the time window + while len(window_buf) > 0 and window_buf[0][0] < minimum_start: + window_buf.popleft() + + window_buf.append((events[0].time, events)) + + for result in window_buf: + next_pipe(result[1]) + next_pipe(PIPE_EOF) + + return time_window_callback + def _convert_unique_count_pipe(self, node, next_pipe): # type: (CountPipe) -> callable """Aggregate counts coming into the pipe.""" host_key = self.host_key @@ -613,6 +665,8 @@ def count_unique_callback(events): # type: (list[Event]) -> None next_pipe(result) next_pipe(PIPE_EOF) + # reset state + results.clear() else: # Create a copy of these, because 
they can be modified events = [events[0].copy()] + events[1:] @@ -648,12 +702,19 @@ def _reduce_count_pipe(self, node, next_pipe): # type: (CountPipe) -> callable def count_total_aggregates(events): # type: (list[Event]) -> None if events is PIPE_EOF: hosts = result.pop('hosts') # type: set + + # immutable version of result + event = result.copy() if len(hosts) > 0: - result['hosts'] = list(sorted(hosts)) - result['total_hosts'] = len(hosts) + event['hosts'] = list(sorted(hosts)) + event['total_hosts'] = len(hosts) - next_pipe([Event(EVENT_TYPE_GENERIC, 0, result)]) + next_pipe([Event(EVENT_TYPE_GENERIC, 0, event)]) next_pipe(PIPE_EOF) + + # reset state + result['count'] = 0 + result['hosts'] = set() else: piece = events[0].data result['count'] += piece['count'] @@ -684,6 +745,9 @@ def count_tuple_callback(events): # type: (list[Event]) -> None result['percent'] = float(result['count']) / total next_pipe([Event(EVENT_TYPE_GENERIC, 0, result)]) next_pipe(PIPE_EOF) + + # reset state + results.clear() else: piece = events[0].data key = events[0].data['key'] diff --git a/eql/etc/eql.g b/eql/etc/eql.g index af62bba..a868088 100644 --- a/eql/etc/eql.g +++ b/eql/etc/eql.g @@ -15,7 +15,7 @@ sequence: "sequence" [join_values with_params? | with_params join_values?] subqu join: "join" join_values? subquery_by subquery_by+ until_subquery_by? 
until_subquery_by.2: "until" subquery_by pipes: pipe+ -pipe: "|" name [single_atom single_atom+ | expressions] +pipe: "|" name [single_atom single_atom+ | time_range | expressions] join_values.2: "by" expressions ?with_params.2: "with" named_params diff --git a/eql/etc/test_queries.toml b/eql/etc/test_queries.toml index e2ee95c..d6d60ea 100644 --- a/eql/etc/test_queries.toml +++ b/eql/etc/test_queries.toml @@ -375,7 +375,6 @@ sequence ''' expected_event_ids = [1, 2, 2, 3] - [[queries]] query = ''' sequence @@ -1296,3 +1295,25 @@ expected_event_ids = [] query = ''' process where length(between(process_name, 'g', 'z')) > 0 ''' + +[[queries]] +expected_event_ids = [11, 50] +description = "test window pipe" +query = ''' +process where subtype == "create" | +window 5m | +unique parent_process_name, process_name | +unique_count parent_process_name | +filter count == 5 +''' + +[[queries]] +expected_event_ids = [55] +description = "test window pipe with descendant" +query = ''' +file where event_subtype_full == "file_create_event" + and descendant of [process where process_name == "cmd.exe"] | + window 5m | + unique_count process_name | + filter count == 5 +''' diff --git a/eql/pipes.py b/eql/pipes.py index 745e180..26b8cc6 100644 --- a/eql/pipes.py +++ b/eql/pipes.py @@ -1,5 +1,5 @@ """EQL Pipes.""" -from .ast import PipeCommand +from .ast import PipeCommand, TimeRange from .schema import Schema, EVENT_TYPE_GENERIC, MIXED_TYPES from .types import dynamic, NUMBER, literal, PRIMITIVES, EXPRESSION, get_type, BASE_STRING from .utils import is_string @@ -14,6 +14,7 @@ "CountPipe", "FilterPipe", "UniqueCountPipe", + "WindowPipe" ) @@ -154,3 +155,27 @@ class FilterPipe(PipeCommand): def expression(self): """Get the filter expression.""" return self.arguments[0] + + +@PipeCommand.register('window') +class WindowPipe(PipeCommand): + """Maintains a time window buffer for streaming events.""" + + argument_types = [literal(NUMBER)] + + minimum_args = 1 + maximum_args = 1 + + 
@property + def timespan(self): + """Get timespan as a TimeRange object.""" + return TimeRange.convert(self.arguments[0]) + + @classmethod + def validate(cls, arguments, type_hints=None): + """After performing type checks, validate that the timespan is greater than zero.""" + index, arguments, type_hints = super(WindowPipe, cls).validate(arguments, type_hints) + ts = cls(arguments).timespan + if index is None and (ts is None or ts.delta.total_seconds() <= 0): + index = 0 + return index, arguments, type_hints diff --git a/setup.py b/setup.py index 487d919..ab968c9 100644 --- a/setup.py +++ b/setup.py @@ -59,7 +59,7 @@ class Test(TestCommand): def initialize_options(self): """Need to ensure pytest_args exists.""" TestCommand.initialize_options(self) - self.pytest_args = [] + self.pytest_args = ["--disable-warnings"] def run_tests(self): """Run pytest.""" diff --git a/tests/test_parser.py b/tests/test_parser.py index b88823a..d622e92 100644 --- a/tests/test_parser.py +++ b/tests/test_parser.py @@ -206,6 +206,7 @@ def test_valid_queries(self): 'any where true | unique a b c | sort a b c | count', 'any where true | unique a, b, c | sort a b c | count', 'any where true | unique a, b, c | sort a,b,c | count', + 'any where true | window 5s | unique a, b | unique_count a | filter count > 5', 'file where child of [registry where true]', 'file where event of [registry where true]', 'file where event of [registry where true]', @@ -282,9 +283,12 @@ def test_invalid_queries(self): 'process where process_name == "abc.exe" | head abc', 'process where process_name == "abc.exe" | head abc()', 'process where process_name == "abc.exe" | head abc(def, ghi)', + 'process where process_name == "abc.exe" | window abc', + 'process where process_name == "abc.exe" | window 10g', 'sequence [process where pid == pid]', 'sequence [process where pid == pid] []', 'sequence with maxspan=false [process where true] [process where true]', + 'sequence with maxspan=10g [process where true] [process 
where true]', 'sequence with badparam=100 [process where true] [process where true]', # check that the same number of BYs are in every subquery 'sequence [file where true] [process where true] by field1', diff --git a/tests/test_python_engine.py b/tests/test_python_engine.py index d9eb490..52a1c7a 100644 --- a/tests/test_python_engine.py +++ b/tests/test_python_engine.py @@ -494,3 +494,42 @@ def test_relationship_pid_collision(self): output = self.get_output(queries=[parse_query(query)], config=config, events=events) event_ids = [event.data['unique_pid'] for event in output] self.validate_results(event_ids, ['host1-1003'], "Relationships failed due to pid collision") + + def test_pipes_reset_state(self): + """Test that the pipes are clearing their state after receiving PIPE_EOF""" + events = self.get_events() + + queries = [ + 'process where true | unique opcode', + 'process where true | unique_count opcode', + 'process where true | count', + 'process where true | count opcode', + 'process where true | head 1', + 'process where true | tail', + 'process where true | sort opcode', + 'process where true | window 10s', + 'process where true | window 5m | head 1', + ] + + for query in queries: + engine = PythonEngine() + + results = [] # type: list[Event] + engine.add_output_hook(results.append) + engine.add_queries([parse_query(query)]) + + engine.stream_events(events) + engine.finalize() + expected_len = len(results) + + results.clear() + + engine.stream_events(events) + engine.finalize() + actual_len = len(results) + + self.assertEquals( + expected_len, + actual_len, + f"Expected results to be same when streaming events multiple times {query}" + )