Skip to content

Commit 08d3d7a

Browse files
author
Aaron Meihm
committed
heavy hitters analysis, moving avg + CMS
Add a heavy hitters analysis plugin that identifies heavy hitters by comparing frequency per identifier in a sample window against a calculated average for the window. The frequency counts are stored in a count-min sketch data structure.
1 parent 46aeafc commit 08d3d7a

File tree

12 files changed

+598
-0
lines changed

12 files changed

+598
-0
lines changed

CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ aws
6868
bloom_filter
6969
circular_buffer
7070
cjson
71+
cms
7172
compat
7273
cuckoo_filter
7374
elasticsearch
@@ -95,6 +96,7 @@ ssl
9596
struct
9697
syslog
9798
systemd
99+
xxhash
98100
zlib
99101
)
100102

cms/CMakeLists.txt

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
# This Source Code Form is subject to the terms of the Mozilla Public
2+
# License, v. 2.0. If a copy of the MPL was not distributed with this
3+
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
4+
5+
cmake_minimum_required(VERSION 3.0)
6+
project(cms VERSION 0.0.1 LANGUAGES C)
7+
set(CPACK_PACKAGE_DESCRIPTION_SUMMARY "Count-Min Sketch implementation")
8+
include(sandbox_module)

cms/index.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
# Lua Count-Min Sketch implementation
2+
3+
* http://theory.stanford.edu/~tim/s15/l/l2.pdf
4+
* https://github.com/mikalsande/lua-count-min

cms/modules/cms/cms.lua

Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
--[[
2+
# Count-min Sketch implementation in Lua
3+
4+
Provides a probabilistic data structure to inform frequency calculation in an
5+
event stream.
6+
7+
## Functions
8+
9+
### new
10+
11+
Create and return a new CMS data structure. Typical operations on this value will
12+
involve calling the add and check functions.
13+
14+
*Arguments*
15+
- epsilon (number) - CMS epsilon parameter
16+
- delta (number) - CMS delta parameter
17+
18+
*Return*
19+
- Returns initialized CMS structure
20+
--]]
21+
22+
local math = require("math")
23+
local xxhash = require("xxhash")
24+
local hash = xxhash.xxh32
25+
local ostime = require("os").time
26+
27+
local type = type
28+
local assert = assert
29+
local setmetatable = setmetatable
30+
31+
local M = {}
32+
setfenv(1, M)
33+
34+
math.randomseed(ostime())
35+
36+
function new(epsilon, delta)
37+
assert(type(epsilon) == "number", "epsilon must be a number")
38+
assert(epsilon > 0, "epsilon must be bigger than zero")
39+
assert(type(delta) == "number", "delta must be a number")
40+
assert(delta> 0, "delta must be bigger than zero")
41+
42+
-- Count Min Sketch variables
43+
local w = math.ceil(2.718281828459045 / epsilon)
44+
local d = math.ceil(math.log(1 / delta))
45+
local l = d * w
46+
47+
-- main datastructure
48+
local array = {}
49+
50+
-- hash variables
51+
local seed1
52+
local seed2
53+
54+
-- counter
55+
local addedItems
56+
local maxval = 2^53 - 1
57+
58+
-- temporary index table
59+
local temp_index = {}
60+
for x = 1, d do
61+
temp_index[x] = 0
62+
end
63+
64+
local query = function(input, update)
65+
assert(type(input) == "string", 'input must be a string')
66+
assert(type(update) == "boolean", 'update must be a boolean')
67+
68+
-- make two different hash values from the input with different seeds
69+
local h1 = hash(input, seed1)
70+
local h2 = hash(input, seed2)
71+
72+
-- track whether or not we have added a new value, 0 if added 1 if not added
73+
local existed = 1
74+
75+
-- create hashes and compute array index and bit index for them to get
76+
-- the individual bits
77+
local min = maxval
78+
for i = 1, d do
79+
-- set offset, we are using one big array instead of many smaller ones
80+
local offset = (i - 1) * w
81+
82+
-- use enhanced double hashing (Kirsh & Mitzenmacher) to create the hash
83+
-- value used for the array index. +1 to account for Lua arrays
84+
local x = ((h1 + (i * h2) + i^2) % w) + 1
85+
temp_index[i] = x
86+
local num = array[x + offset]
87+
88+
-- if the bit is zero we know it did not exist already just return if we are
89+
-- not updating it
90+
if num == 0 then
91+
existed = 0
92+
if not update then
93+
return 0
94+
end
95+
end
96+
97+
-- update the smallest seen value
98+
if num < min then
99+
min = num
100+
end
101+
end
102+
103+
-- update the item counter if we added a new item
104+
if update then
105+
-- conservative update, only update values that smaller than min + 1
106+
for i = 1, d do
107+
local offset = (i - 1) * w
108+
local index = temp_index[i] + offset
109+
if array[index] < (min + 1) and min < maxval then
110+
array[index] = array[index] + 1
111+
end
112+
end
113+
114+
-- update min in order to return the updated count
115+
min = min + 1
116+
117+
-- update the items counter if we addded a new item
118+
if existed == 0 then
119+
addedItems = addedItems + 1
120+
end
121+
end
122+
123+
return min
124+
end
125+
126+
--[[
127+
Adds a new entry to the set. returns 1 if the element
128+
already existed and 0 if it does not exist in the set.
129+
]]
130+
local add = function(input)
131+
return query(input, true)
132+
end
133+
134+
--[[
135+
checks whether or not an element is in the set.
136+
returns 1 if the element exists and 0 if it does
137+
not exist in the set.
138+
]]
139+
local check = function(input)
140+
return query(input, false)
141+
end
142+
143+
--[[ resets the arrays used in the filter ]]
144+
local reset = function()
145+
addedItems = 0
146+
147+
-- since 2*rand and 3*rand is done modulo a prime these number will always
148+
-- be different and should therefore safe to use as seeds to generate two
149+
-- different hashes for the same input.
150+
local rand = math.random(-2147483648, 2147483647)
151+
seed1 = (2*rand) % 2147483647
152+
seed2 = (3*rand) % 2147483647
153+
154+
for x = 1, l do
155+
array[x] = 0
156+
end
157+
end
158+
159+
-- [[ get number of unique added items ]]
160+
local getNumItems = function()
161+
return addedItems
162+
end
163+
164+
-- [[ get number of bits this instance uses ]]
165+
local getDepth = function()
166+
return d
167+
end
168+
169+
-- [[ get number of keys this instance uses ]]
170+
local getWidth = function()
171+
return w
172+
end
173+
174+
-- [[ get number of keys this instance uses ]]
175+
local getAccumulatedError = function()
176+
return addedItems * epsilon
177+
end
178+
179+
-- reset the array to initialize this instance
180+
reset()
181+
182+
-- methods we expose for this instance
183+
local ret = {
184+
add = add,
185+
check = check,
186+
reset = reset,
187+
getNumItems = getNumItems,
188+
getDepth = getDepth,
189+
getWidth = getWidth,
190+
getAccumulatedError = getAccumulatedError
191+
}
192+
setmetatable(ret, {})
193+
return ret
194+
end
195+
196+
return M
Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
-- This Source Code Form is subject to the terms of the Mozilla Public
2+
-- License, v. 2.0. If a copy of the MPL was not distributed with this
3+
-- file, You can obtain one at http://mozilla.org/MPL/2.0/.
4+
5+
--[[
6+
# Mozilla Security Heavy Hitters, CMS + moving average
7+
8+
For events matching the message_matcher, this analysis plugin attempts to calculate typical
9+
request rates while identifying and flagging anomolous request patterns.
10+
11+
Request counts within a given window are stored in a Count-Min sketch data structure; based on
12+
a sample encountered during the window this data structure is used to consult request rates for
13+
given identifiers, where an identifier exceeds a threshold it is added to an analysis list for
14+
submission by the plugin.
15+
16+
## Sample Configuration
17+
```lua
18+
filename = "moz_security_hh_cms.lua"
19+
message_matcher = "Logger == 'input.nginx'"
20+
ticker_interval = 60
21+
preserve_data = false -- This module cannot keep state at this time
22+
23+
id_field = "Fields[remote_addr]" -- field to use as the identifier
24+
-- id_field_capture = ",? *([^,]+)$", -- optional e.g. extract the last entry in a comma delimited list
25+
26+
sample_min_id = 5000 -- Minimum distinct identifers before a sample will be calculated
27+
sample_max_id = 10000 -- Maximum identifiers to use for sample calculation within a window
28+
sample_min_ev = 50000 -- Minimum number of events sampler must consume before calculation
29+
sample_window = 60 -- Sample window size
30+
sample_ticks = 2500 -- Recalculate sample every sample_ticks events
31+
threshold_cap = 10 -- Threshold will be calculated average + (calculated average * cap)
32+
-- cms_epsilon = 1 / 10000 -- optional CMS value for epsilon
33+
-- cms_delta = 0.0001 -- optional CMS value for delta
34+
```
35+
--]]
36+
37+
require "string"
38+
require "table"
39+
40+
local c = require "cms.cms"
41+
local ostime = require "os".time
42+
43+
local sample_min_id = read_config("sample_min_id") or error("sample_min_id must be configured")
44+
local sample_max_id = read_config("sample_max_id") or error("sample_max_id must be configured")
45+
local sample_min_ev = read_config("sample_min_ev") or error("sample_min_ev must be configured")
46+
local sample_window = read_config("sample_window") or error("sample_window must be configured")
47+
local sample_ticks = read_config("sample_ticks") or error ("sample_ticks must be configured")
48+
local threshold_cap = read_config("threshold_cap") or error("threshold_cap must be configured")
49+
local id_field = read_config("id_field") or error("id_field must be configured")
50+
local id_fieldc = read_config("id_field_capture")
51+
local cms_epsilon = read_config("cms_epsilon") or 1 / 10000
52+
local cms_delta = read_config("cms_delta") or 0.0001
53+
54+
local cms = c.new(cms_epsilon, cms_delta)
55+
56+
local alist = {}
57+
58+
function alist:reset()
59+
self.l = {}
60+
end
61+
62+
function alist:add(i, c)
63+
if not i or not c then
64+
error("analysis list received nil argument")
65+
end
66+
self.l[i] = c
67+
end
68+
69+
function alist:flush(t)
70+
for k,v in pairs(self.l) do
71+
if v < t then self.l[k] = nil end
72+
end
73+
end
74+
75+
local sampler = {}
76+
77+
function sampler:reset()
78+
self.s = {}
79+
self.n = 0
80+
self.start_time = ostime()
81+
self.threshold = 0
82+
self.validtick = 0
83+
self.evcount = 0
84+
end
85+
86+
function sampler:calc()
87+
if not cms or self.n < sample_min_id or self.evcount < sample_min_ev then
88+
return
89+
end
90+
if self.validtick < sample_ticks then
91+
return
92+
end
93+
self.validtick = 0
94+
local cnt = 0
95+
local t = 0
96+
for k,v in pairs(self.s) do
97+
t = t + cms.check(k)
98+
cnt = cnt + 1
99+
end
100+
self.threshold = t / cnt
101+
self.threshold = self.threshold + (self.threshold * threshold_cap)
102+
-- Remove any elements in the analysis list that no longer conform
103+
-- to the set threshold
104+
alist:flush(self.threshold)
105+
end
106+
107+
function sampler:add(x)
108+
if self.start_time + sample_window < ostime() then
109+
self:reset()
110+
alist:reset()
111+
cms = c.new(cms_epsilon, cms_delta)
112+
end
113+
self.evcount = self.evcount + 1
114+
self.validtick = self.validtick + 1
115+
if self.n >= sample_max_id then
116+
return
117+
end
118+
-- If x is already present in the sample, don't add it again
119+
if self.s[x] then return end
120+
self.s[x] = 1
121+
self.n = self.n + 1
122+
end
123+
124+
sampler:reset()
125+
alist:reset()
126+
127+
function process_message()
128+
local id = read_message(id_field)
129+
if not id then return -1, "no id_field" end
130+
if id_fieldc then
131+
id = string.match(id, id_fieldc)
132+
if not id then return 1 end -- no error as the capture may intentionally reject entries
133+
end
134+
135+
sampler:add(id)
136+
sampler:calc()
137+
local q = cms.add(id)
138+
if sampler.threshold ~= 0 and q > sampler.threshold then
139+
alist:add(id, q)
140+
end
141+
return 0
142+
end
143+
144+
function timer_event(ns)
145+
-- For now, just generate a tsv here but this could be modified to submit violations
146+
-- with a configured confidence to Tigerblood
147+
add_to_payload("sampler_threshold", "\t", sampler.threshold, "\n")
148+
add_to_payload("sampler_size", "\t", sampler.n, "\n")
149+
add_to_payload("sampler_evcount", "\t", sampler.evcount, "\n")
150+
for k,v in pairs(alist.l) do
151+
add_to_payload(k, "\t", v, "\n")
152+
end
153+
inject_payload("tsv", "statistics")
154+
end
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
filename = "moz_security_hh_cms.lua"
2+
message_matcher = "Logger == 'input.hh_cms'"
3+
ticker_interval = 0
4+
5+
id_field = "Fields[id]"
6+
7+
sample_min_id = 100
8+
sample_max_id = 10000
9+
sample_min_ev = 0
10+
sample_window = 60
11+
sample_ticks = 1000
12+
threshold_cap = 2
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
filename = "generate_hh_cms.lua"

0 commit comments

Comments
 (0)