|
| 1 | +-- This Source Code Form is subject to the terms of the Mozilla Public |
| 2 | +-- License, v. 2.0. If a copy of the MPL was not distributed with this |
| 3 | +-- file, You can obtain one at http://mozilla.org/MPL/2.0/. |
| 4 | + |
| 5 | +--[[ |
| 6 | +# Mozilla Security Heavy Hitters, CMS + moving average |
| 7 | +
|
| 8 | +For events matching the message_matcher, this analysis plugin attempts to calculate typical |
| 9 | +request rates while identifying and flagging anomalous request patterns. |
| 10 | +
|
| 11 | +Request counts within a given window are stored in a Count-Min sketch data structure. Based on |
| 12 | +a sample encountered during the window, this data structure is used to consult request rates for |
| 13 | +given identifiers; where an identifier exceeds a threshold, it is added to an analysis list for |
| 14 | +submission by the plugin. |
| 15 | +
|
| 16 | +## Sample Configuration |
| 17 | +```lua |
| 18 | +filename = "moz_security_hh_cms.lua" |
| 19 | +message_matcher = "Logger == 'input.nginx'" |
| 20 | +ticker_interval = 60 |
| 21 | +preserve_data = false -- This module cannot keep state at this time |
| 22 | +
|
| 23 | +id_field = "Fields[remote_addr]" -- field to use as the identifier |
| 24 | +-- id_field_capture = ",? *([^,]+)$" -- optional e.g. extract the last entry in a comma delimited list |
| 25 | +
|
| 26 | +sample_min_id = 5000 -- Minimum distinct identifiers before a sample will be calculated |
| 27 | +sample_max_id = 10000 -- Maximum identifiers to use for sample calculation within a window |
| 28 | +sample_min_ev = 50000 -- Minimum number of events sampler must consume before calculation |
| 29 | +sample_window = 60 -- Sample window size |
| 30 | +sample_ticks = 2500 -- Recalculate sample every sample_ticks events |
| 31 | +threshold_cap = 10 -- Threshold will be calculated average + (calculated average * cap) |
| 32 | +-- cms_epsilon = 1 / 10000 -- optional CMS value for epsilon |
| 33 | +-- cms_delta = 0.0001 -- optional CMS value for delta |
| 34 | +``` |
| 35 | +--]] |
| 36 | + |
| 37 | +require "string" |
| 38 | +require "table" |
| 39 | + |
| 40 | +local ostime = require "os".time |
| 41 | + |
-- Reads a mandatory configuration value.
-- name          - configuration key passed to read_config
-- expected_type - Lua type name the value must have ("number", "string", ...)
-- Returns the configured value.
-- Raises an error (reported at the caller's line) when the value is missing or
-- is not of the expected type.
local function required_config(name, expected_type)
    local value = read_config(name)
    if not value then
        error(name .. " must be configured", 2)
    end
    if type(value) ~= expected_type then
        error(name .. " must be a " .. expected_type, 2)
    end
    return value
end

-- The sample_* and threshold_cap settings are used in numeric comparisons and
-- arithmetic by the sampler, so a wrong type is rejected here at load time
-- rather than surfacing as a comparison error on the first message.
local sample_min_id = required_config("sample_min_id", "number")
local sample_max_id = required_config("sample_max_id", "number")
local sample_min_ev = required_config("sample_min_ev", "number")
local sample_window = required_config("sample_window", "number")
local sample_ticks  = required_config("sample_ticks", "number")
local threshold_cap = required_config("threshold_cap", "number")
local id_field      = required_config("id_field", "string")
local id_fieldc     = read_config("id_field_capture") -- optional capture pattern
local cms_epsilon   = read_config("cms_epsilon") or 1 / 10000
local cms_delta     = read_config("cms_delta") or 0.0001

-- Count-Min sketch that accumulates per-identifier counts for the current window
local cms = require "streaming_algorithms.cm_sketch".new(cms_epsilon, cms_delta)
| 54 | + |
-- Analysis list: maps an identifier to the count that was recorded for it when
-- it exceeded the sampler threshold. Entries live in the `l` field.
local alist = {}

-- Empties the analysis list by replacing the backing table with a new one.
alist.reset = function(self)
    self.l = {}
end
| 60 | + |
-- Stores (or overwrites) the count recorded for an identifier.
-- i - identifier used as the key
-- c - count stored as the value
-- Raises an error when either argument is nil or false.
function alist:add(i, c)
    if i and c then
        self.l[i] = c
        return
    end
    error("analysis list received nil argument")
end
| 67 | + |
-- Removes every entry whose recorded count is below t.
-- t - minimum count an entry needs in order to be kept
function alist:flush(t)
    local entries = self.l
    for id, count in pairs(entries) do
        if count < t then
            -- clearing an existing key while traversing with pairs is permitted
            entries[id] = nil
        end
    end
end
| 73 | + |
-- Sampler state for the current window:
--   s          - set of sampled identifiers (identifier -> 1)
--   n          - number of identifiers held in s
--   start_time - time (os.time) at which the window began
--   threshold  - current threshold, 0 until one has been calculated
--   validtick  - events seen since the threshold was last calculated
--   evcount    - events seen since the window began
local sampler = {}

-- Starts a new window: clears the sample and zeroes all counters and the
-- threshold.
sampler.reset = function(self)
    self.start_time = ostime()
    self.s = {}
    self.n = 0
    self.evcount = 0
    self.validtick = 0
    self.threshold = 0
end
| 84 | + |
-- Recalculates the threshold from the sampled identifiers.
--
-- Does nothing until the sample holds at least sample_min_id identifiers, at
-- least sample_min_ev events have been seen in the window, and at least
-- sample_ticks events have passed since the previous calculation.
--
-- The threshold is the average of the sketch's point_query results over the
-- sampled identifiers, plus that average multiplied by threshold_cap.
-- Analysis list entries below the new threshold are then removed.
function sampler:calc()
    if self.n < sample_min_id or self.evcount < sample_min_ev then
        return
    end
    if self.validtick < sample_ticks then
        return
    end
    self.validtick = 0

    local cnt = 0
    local total = 0
    for id, _ in pairs(self.s) do
        total = total + cms:point_query(id)
        cnt = cnt + 1
    end
    -- An empty sample would make the average 0/0 (NaN); keep the existing
    -- threshold in that case instead of publishing NaN.
    if cnt == 0 then
        return
    end

    local average = total / cnt
    self.threshold = average + (average * threshold_cap)
    -- Remove any elements in the analysis list that no longer conform
    -- to the set threshold
    alist:flush(self.threshold)
end
| 105 | + |
-- Accounts for one event carrying identifier x.
--
-- When the current window has expired (more than sample_window has elapsed
-- since start_time), the sampler, the analysis list and the sketch are all
-- cleared before the event is counted. The identifier joins the sample only
-- while the sample holds fewer than sample_max_id identifiers and does not
-- already contain it.
function sampler:add(x)
    local window_end = self.start_time + sample_window
    if window_end < ostime() then
        self:reset()
        alist:reset()
        cms:clear()
    end

    self.evcount = self.evcount + 1
    self.validtick = self.validtick + 1

    -- Only new identifiers are added, and only while there is room
    if self.n < sample_max_id and not self.s[x] then
        self.s[x] = 1
        self.n = self.n + 1
    end
end
| 122 | + |
-- Initialise plugin state at load: an empty analysis list and a fresh
-- sampling window.
alist:reset()
sampler:reset()
| 125 | + |
-- Handles one message: extracts the identifier, feeds it to the sampler and
-- the sketch, and records it in the analysis list when the value returned by
-- cms:update exceeds the current threshold.
--
-- Returns -1 and an error string when the identifier field is absent,
-- otherwise 0 (including when the optional capture pattern does not match).
function process_message()
    local id = read_message(id_field)
    if not id then
        return -1, "no id_field"
    end

    if id_fieldc then
        id = string.match(id, id_fieldc)
        -- no error as the capture may intentionally reject entries
        if not id then
            return 0
        end
    end

    sampler:add(id)
    sampler:calc()

    local estimate = cms:update(id)
    local limit = sampler.threshold
    -- a threshold of 0 means none has been calculated for this window yet
    if limit ~= 0 and estimate > limit then
        alist:add(id, estimate)
    end
    return 0
end
| 142 | + |
-- Emits the sampler statistics followed by the analysis list as tab separated
-- "key<TAB>value" lines, injected as a "tsv" payload named "statistics".
-- ns - timer argument supplied by the host; not used here
function timer_event(ns)
    -- For now, just generate a tsv here but this could be modified to submit violations
    -- with a configured confidence to Tigerblood
    local stats = {
        {"sampler_threshold", sampler.threshold},
        {"sampler_size", sampler.n},
        {"sampler_evcount", sampler.evcount},
    }
    for idx = 1, #stats do
        local row = stats[idx]
        add_to_payload(row[1], "\t", row[2], "\n")
    end

    for id, count in pairs(alist.l) do
        add_to_payload(id, "\t", count, "\n")
    end
    inject_payload("tsv", "statistics")
end
0 commit comments