Skip to content

Commit 91e8ee4

Browse files
author
Mike Trinkala
authored
Merge pull request #235 from mozilla-services/dev
Sprint2018-1 plus subdecoder refactor
2 parents b280245 + d19edcc commit 91e8ee4

File tree

13 files changed

+652
-223
lines changed

13 files changed

+652
-223
lines changed

README.md

+6-2
Original file line numberDiff line numberDiff line change
@@ -83,10 +83,14 @@ inject one or more Heka messages into the system.
8383
*Arguments*
8484
- data (string) - Raw data from the input sandbox that needs
8585
parsing/decoding/transforming
86-
- default_headers (optional table) - Heka message table containing the default
86+
- default_headers (table/nil/none) - Heka message table containing the default
8787
header values to use, if they are not populated by the decoder. If 'Fields'
8888
is specified it should be in the hashed based format see:
89-
http://mozilla-services.github.io/lua_sandbox/heka/message.html
89+
http://mozilla-services.github.io/lua_sandbox/heka/message.html. In the case
90+
of multiple decoders this may be the message from the previous input/decoding
91+
step.
92+
- mutable (bool/nil/none) - Flag indicating if the decoder can modify the
93+
default_headers/msg structure in place or if it has to be copied first.
9094

9195
*Return*
9296
- err (nil, string) or throws an error on invalid data or an inject message
+288
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,288 @@
1+
-- This Source Code Form is subject to the terms of the Mozilla Public
2+
-- License, v. 2.0. If a copy of the MPL was not distributed with this
3+
-- file, You can obtain one at http://mozilla.org/MPL/2.0/.
4+
5+
--[[
6+
# Sub Decoder Utility Module
7+
8+
Common funtionality to instantiate an LPeg based sub decoder configuration.
9+
10+
## Functions
11+
12+
### load_sub_decoders
13+
14+
Returns a table of sub_decoder functions, keyed by sub_decoder_name.
15+
16+
*Arguments*
17+
- sub_decoders (table) sub_decoders configuration table
18+
```lua
19+
sub_decoders = {
20+
-- sub_decoder_name (string) = (string/array) a sub_decoder_name of "*" is
21+
-- treated as the default when the name does not exist.
22+
-- string: decoder or grammar module name
23+
-- array: (string and/or array) list of specific messages to parse
24+
-- string: Sample message used to locate the correct grammar
25+
-- If no grammar matches the sample message then an error is thrown
26+
-- and another grammar or module must be added to the printf_messages
27+
-- configuration. If multiple grammars match the message, the first
28+
-- grammar with the most specific match is selected.
29+
-- Note: a special token of `<<DROP>>` and `<<FAIL>>` are reserved for
30+
-- the last entry in the array to handle the no match case; <<DROP>>
31+
-- silently discards the message and <<FAIL>> reports an error. If
32+
-- neither is specified the default no match behavior is to inject the
33+
-- original message produced by the syslog decoder.
34+
-- array:
35+
-- column 1: (string/array)
36+
-- string: Sample message (see above)
37+
-- array: printf.build_grammar format specification
38+
-- column 2: (table/nil)
39+
-- Transformation table with Heka message field name keys and a
40+
-- value of the fully qualified transformation function name. The
41+
-- function returns no values but can error; it receives two
42+
-- arguments: the Heka message table and the field name to act on.
43+
-- The function can modify the message in any way.
44+
45+
nginx = "decoders.nginx.access", -- decoder module name
46+
kernel = "lpeg.linux.kernel", -- grammar module name, must export an lpeg grammar named 'grammar' or 'syslog_grammar'
47+
sshd = {
48+
-- openssh_portable auth message, imported in printf_messages
49+
{"Accepted publickey for foobar from 10.11.12.13 port 4242 ssh2", {remote_addr = "geoip.heka.add_geoip"}},
50+
},
51+
foo = {
52+
"/tmp/input.tsv:23: invalid line", -- custom log defined in printf_messages
53+
{{"Status: %s", "status"}, nil}, -- inline printf spec, no transformation
54+
},
55+
}
56+
```
57+
- printf_messages (table/nil) see: https://mozilla-services.github.io/lua_sandbox_extensions/lpeg/modules/lpeg/printf.html
58+
59+
*Return*
60+
- sub_decoders (table)
61+
62+
### load_sub_decoder
63+
64+
Returns the decode function for a single sub_decoder.
65+
66+
*Arguments*
67+
- sub_decoder (string/table) sub_decoder configuration entry
68+
```lua
69+
sub_decoder = "decoders.nginx.access"
70+
```
71+
- printf_messages (table/nil) printf_message table (see above)
72+
73+
*Return*
74+
- decode (function)
75+
76+
### copy_message
77+
78+
Copies a message for use in decoder/subdecoder
79+
80+
*Arguments*
81+
* src (table) Heka message table. This is a shallow copy of the individual
82+
values in the Fields hash and assumes they will be replaced as opposed to
83+
modififed when they are tables. The main use of this function is to populate
84+
a new message with defaults.
85+
* mutable (bool/nil/none)
86+
87+
*Return*
88+
* msg (table) a Heka message hash schema format
89+
90+
### add_fields
91+
92+
Add the fields hash to the msg.Fields overwriting on collision.
93+
94+
*Arguments*
95+
* msg (table) Heka message
96+
* fields (table) Heka message Fields hash
97+
98+
*Return*
99+
* none - msg is modified in place
100+
101+
--]]
102+
103+
-- Imports
104+
local string = require "string"
105+
local printf = require "lpeg.printf"
106+
107+
local error = error
108+
local ipairs = ipairs
109+
local pairs = pairs
110+
local require = require
111+
local setmetatable = setmetatable
112+
local type = type
113+
114+
local inject_message = inject_message
115+
116+
local M = {}
117+
setfenv(1, M) -- Remove external access to contain everything in the module
118+
119+
local FAIL_TOKEN = "<<FAIL>>"
120+
local DROP_TOKEN = "<<DROP>>"
121+
122+
local function grammar_decode_fn(g)
123+
return function(data, dh, mutable)
124+
local fields = g:match(data)
125+
if not fields then return "parse failed" end
126+
local msg = copy_message(dh, mutable)
127+
add_fields(msg, fields)
128+
inject_message(msg)
129+
end
130+
end
131+
132+
133+
local function grammar_pick_fn(sd, nomatch_action)
134+
return function(data, dh, mutable)
135+
local msg = dh
136+
local fields
137+
for _,cpg in ipairs(sd) do -- individually check each grammar
138+
fields = cpg[1]:match(data)
139+
if fields then
140+
msg = copy_message(dh, mutable)
141+
add_fields(msg, fields)
142+
if cpg[2] then -- apply user defined transformation functions
143+
for k,f in pairs(cpg[2]) do
144+
f(msg, k)
145+
end
146+
end
147+
break
148+
end
149+
end
150+
if not fields and nomatch_action then
151+
if nomatch_action == DROP_TOKEN then
152+
return
153+
elseif nomatch_action == FAIL_TOKEN then
154+
return "parse failed"
155+
end
156+
end
157+
inject_message(msg)
158+
end
159+
end
160+
161+
162+
local function load_sub_decoder_impl(sd, grammars, sdk)
163+
local sdt = type(sd)
164+
if sdt == "string" then
165+
if sd:match("^decoders%.") then
166+
local decode = require(sd).decode
167+
if type(decode) ~= "function" then
168+
error(string.format("sub_decoder, no decode function defined: %s", sdk))
169+
end
170+
return decode
171+
else
172+
local m = require(sd)
173+
local g = m.grammar or m.syslog_grammar
174+
if type(g) ~= "userdata" then
175+
error(string.format("sub_decoder, no grammar defined: %s", sdk))
176+
end
177+
return grammar_decode_fn(g)
178+
end
179+
elseif sdt == "table" then -- cherry pick printf grammars
180+
local nomatch_action
181+
for i,cpg in ipairs(sd) do
182+
if type(cpg) ~= "table" then
183+
cpg = {cpg}
184+
sd[i] = cpg
185+
end
186+
187+
local g
188+
local typ = type(cpg[1])
189+
if typ == "string" then
190+
if (cpg[1] == DROP_TOKEN or cpg[1] == FAIL_TOKEN) and sd[i + 1] == nil then
191+
nomatch_action = cpg[1]
192+
sd[i] = nil
193+
break
194+
end
195+
g = printf.match_sample(grammars, cpg[1])
196+
if not g then
197+
error(string.format("no grammar found for: %s", cpg[1]))
198+
end
199+
elseif typ == "table" then
200+
g = printf.build_grammar(cpg[1])
201+
else
202+
error(string.format("sub_decoder: %s invalid entry: %d", sdk, i))
203+
end
204+
cpg[1] = g
205+
206+
if cpg[2] then
207+
for k,v in pairs(cpg[2]) do
208+
local fn
209+
local mname, fname = string.match(v, "(.-)%.([^.]+)$")
210+
if mname then
211+
fn = require(mname)[fname]
212+
else
213+
fn = _G[cpg[2]]
214+
end
215+
if type(fn) ~= "function" then
216+
error(string.format("invalid transformation function %s=%s", k, v))
217+
end
218+
cpg[2][k] = fn
219+
end
220+
end
221+
end
222+
return grammar_pick_fn(sd, nomatch_action)
223+
else
224+
error(string.format("sub_decoder: %s invalid type: %s", sdk, sdt))
225+
end
226+
end
227+
228+
229+
function load_sub_decoder(sd, pfm)
230+
local grammars = printf.load_messages(pfm or {})
231+
return load_sub_decoder_impl(sd, grammars, "")
232+
end
233+
234+
235+
function load_sub_decoders(sds, pfm)
236+
local sub_decoders = {}
237+
local grammars = printf.load_messages(pfm or {})
238+
239+
for sdk,sd in pairs(sds or {}) do
240+
if sdk == "*" then
241+
local fn = load_sub_decoder_impl(sd, grammars, sdk)
242+
local mt = {__index = function(t, k) return fn end }
243+
setmetatable(sub_decoders, mt);
244+
else
245+
sub_decoders[sdk] = load_sub_decoder_impl(sd, grammars, sdk)
246+
end
247+
end
248+
return sub_decoders
249+
end
250+
251+
252+
function copy_message(msg, mutable)
253+
if msg and not mutable then
254+
local t = {
255+
Uuid = msg.Uuid,
256+
Logger = msg.Logger,
257+
Hostname = msg.Hostname,
258+
Timestamp = msg.Timestamp,
259+
Type = msg.Type,
260+
Payload = msg.Payload,
261+
EnvVersion = msg.EnvVersion,
262+
Pid = msg.Pid,
263+
Severity = msg.Severity
264+
}
265+
if type(msg.Fields) == "table" then
266+
local f = {}
267+
t.Fields = f
268+
for k,v in pairs(msg.Fields) do
269+
f[k] = v
270+
end
271+
end
272+
return t
273+
end
274+
return msg or {}
275+
end
276+
277+
278+
function add_fields(msg, fields)
279+
if msg.Fields then
280+
for k,v in pairs(fields) do
281+
msg.Fields[k] = v
282+
end
283+
else
284+
msg.Fields = fields
285+
end
286+
end
287+
288+
return M

lpeg/modules/lpeg/printf.lua

+11
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,17 @@ Compile the provide list of printf_messages into the grammars table.
3030
3131
*Arguments*
3232
- printf_messages (array) - array of printf message specifications to be compiled into grammars
33+
```lua
34+
printf_messages = {
35+
-- array (string and/or array) the order specified here is the load and evaluation order.
36+
-- string: name of a module containing a `printf_messages` array to import
37+
-- array: creates an on the fly grammar using a printf format specifications.
38+
-- see: https://mozilla-services.github.io/lua_sandbox_extensions/lpeg/modules/lpeg/printf.html
39+
40+
{"%s:%lu: invalid line", "path", "linenum"},
41+
"lpeg.openssh_portable", -- must export a `printf_messages` array
42+
}
43+
```
3344
- grammars (array/nil) - optional existing grammar array to append to
3445
- grammars_size (number/nil) - number of items in the array
3546
- module (string/nil) - identifer to help locate errors in nested includes

moz_logging/CMakeLists.txt

+3-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
44

55
cmake_minimum_required(VERSION 3.0)
6-
project(moz-logging VERSION 0.0.3 LANGUAGES C)
6+
project(moz-logging VERSION 0.0.4 LANGUAGES C)
77
set(CPACK_PACKAGE_DESCRIPTION_SUMMARY "Mozilla Infrastructure Logging Module")
8+
set(CPACK_DEBIAN_PACKAGE_DEPENDS "${PACKAGE_PREFIX}-lpeg (>= 1.0.8)")
9+
string(REGEX REPLACE "[()]" "" CPACK_RPM_PACKAGE_REQUIRES ${CPACK_DEBIAN_PACKAGE_DEPENDS})
810
include(sandbox_module)

0 commit comments

Comments
 (0)