Skip to content

Commit 887ebf4

Browse files
author
Mike Trinkala
authored
Merge pull request #175 from mozilla-services/dev
Sprint Oct 16
2 parents 25aacfa + bda6445 commit 887ebf4

File tree

11 files changed

+188
-35
lines changed

11 files changed

+188
-35
lines changed

moz_logging/CMakeLists.txt

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,6 @@
33
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
44

55
cmake_minimum_required(VERSION 3.0)
6-
project(moz-logging VERSION 0.0.1 LANGUAGES C)
6+
project(moz-logging VERSION 0.0.2 LANGUAGES C)
77
set(CPACK_PACKAGE_DESCRIPTION_SUMMARY "Mozilla Infrastructure Logging Module")
88
include(sandbox_module)

moz_logging/io_modules/decoders/moz_logging/json_heka.lua

+9
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ Decode and inject the resulting message
3434
--]]
3535

3636
-- Imports
37+
local clf = require "lpeg.common_log_format"
3738
local cjson = require "cjson"
3839
local string = require "string"
3940
local util = require "heka.util"
@@ -73,6 +74,14 @@ function decode(data, dh)
7374
else
7475
msg.Type = dh.Type
7576
end
77+
78+
local agent = msg.Fields.agent
79+
if agent then
80+
msg.Fields.user_agent_browser,
81+
msg.Fields.user_agent_version,
82+
msg.Fields.user_agent_os = clf.normalize_user_agent(agent)
83+
end
84+
7685
ok, msg = pcall(inject_message, msg)
7786
if not ok then
7887
err_msg.Payload = msg
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
-- This Source Code Form is subject to the terms of the Mozilla Public
2+
-- License, v. 2.0. If a copy of the MPL was not distributed with this
3+
-- file, You can obtain one at http://mozilla.org/MPL/2.0/.
4+
5+
--[[
6+
# Line Splitter Message Decoder Module
7+
8+
Converts a single message packed with individual new line delimited messages
9+
into multiple messages.
10+
11+
## Decoder Configuration Table
12+
13+
```lua
14+
decoders_line_splitter = {
15+
sub_decoder = Decoder module name or grammar module name
16+
}
17+
```
18+
19+
20+
## Functions
21+
22+
### decode
23+
24+
Decode and inject the resulting message
25+
26+
*Arguments*
27+
- data (string) - JSON messages, one per line, with a Heka schema
28+
- default_headers (optional table) - Heka message table containing the default
29+
header values to use, if they are not populated by the decoder. If 'Fields'
30+
is specified it should be in the hashed based format see:
31+
http://mozilla-services.github.io/lua_sandbox/heka/message.html
32+
33+
*Return*
34+
- nil - throws an error on an invalid data type, JSON parse error,
35+
inject_message failure etc.
36+
37+
--]]
38+
39+
-- Imports
40+
local module_name = ...
41+
local module_cfg = require "string".gsub(module_name, "%.", "_")
42+
local cfg = read_config(module_cfg) or {}
43+
assert(type(cfg.sub_decoder) == "string", "sub_decoder must be set")
44+
45+
local string = require "string"
46+
local sd_module = require(cfg.sub_decoder)
47+
48+
local assert = assert
49+
local pairs = pairs
50+
local pcall = pcall
51+
local type = type
52+
53+
local inject_message = inject_message
54+
local read_config = read_config
55+
56+
local M = {}
57+
setfenv(1, M) -- Remove external access to contain everything in the module
58+
59+
local sub_decoder
60+
if cfg.sub_decoder:match("^decoders%.") then
61+
sub_decoder = sd_module.decode
62+
assert(type(sub_decoder) == "function", "sub_decoders, no decode function defined")
63+
else
64+
local grammar = sd_module.grammar or sd_module.syslog_grammar
65+
assert(type(grammar) == "userdata", "sub_decoders, no grammar defined")
66+
sub_decoder = function(data, dh)
67+
if not dh then dh = {} end
68+
local fields = grammar:match(data)
69+
if not fields then return "parse failed" end
70+
for k,v in pairs(fields) do
71+
dh.Fields[k] = v
72+
end
73+
inject_message(dh)
74+
end
75+
end
76+
77+
local err_msg = {
78+
Logger = read_config("Logger"),
79+
Type = "error",
80+
Payload = nil,
81+
Fields = {
82+
data = nil
83+
}
84+
}
85+
86+
function decode(data, dh)
87+
for line in string.gmatch(data, "([^\n]+)\n*") do
88+
local err = sub_decoder(line, dh)
89+
if err then
90+
err_msg.Payload = err
91+
err_msg.Fields.data = line
92+
pcall(inject_message, err_msg)
93+
end
94+
end
95+
end
96+
97+
return M

moz_pioneer/CMakeLists.txt

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
44

55
cmake_minimum_required(VERSION 3.0)
6-
project(moz-pioneer VERSION 0.0.2 LANGUAGES C)
6+
project(moz-pioneer VERSION 0.0.3 LANGUAGES C)
77
set(CPACK_PACKAGE_DESCRIPTION_SUMMARY "Mozilla Firefox Pioneer Data Processing")
88
set(CPACK_DEBIAN_PACKAGE_DEPENDS "${PACKAGE_PREFIX}-moz-ingest (>= 0.0.1), ${PACKAGE_PREFIX}-moz-telemetry (>= 1.2.13), ${PACKAGE_PREFIX}-jose (>= 0.0.1), ${PACKAGE_PREFIX}-lpeg (>= 1.0.0), ${PACKAGE_PREFIX}-rjson (>= 1.1.0)")
99
string(REGEX REPLACE "[()]" "" CPACK_RPM_PACKAGE_REQUIRES ${CPACK_DEBIAN_PACKAGE_DEPENDS})

moz_pioneer/io_modules/decoders/moz_ingest/pioneer.lua

+27-9
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ local string = require "string"
6666
local module_cfg = string.gsub(module_name, "%.", "_")
6767

6868
local io = require "io"
69+
local lfs = require "lfs"
6970
local jose = require "jose"
7071
local rjson = require "rjson"
7172
local dt = require "lpeg.date_time"
@@ -84,6 +85,24 @@ local inject_message = inject_message
8485
local M = {}
8586
setfenv(1, M) -- Remove external access to contain everything in the module
8687

88+
89+
function load_json_schemas_dir(path)
90+
local schemas = {}
91+
for fn in lfs.dir(path) do
92+
local name = fn:match("(.+%.%d+)%.schema%.json$")
93+
if name then
94+
local fh = assert(io.input(string.format("%s/%s", path, fn)))
95+
local schema = fh:read("*a")
96+
fh:close()
97+
local ok, rjs = pcall(rjson.parse_schema, schema)
98+
if not ok then error(string.format("%s: %s", fn, rjs)) end
99+
schemas[name] = rjs
100+
end
101+
end
102+
return schemas
103+
end
104+
105+
87106
local function load_cfg()
88107
local cfg = read_config(module_cfg)
89108
assert(type(cfg) == "table", module_cfg .. " must be a table")
@@ -96,7 +115,7 @@ local function load_cfg()
96115
if not ok then error(string.format("%s: %s", cfg.envelope_schema_file, envelope_schema)) end
97116

98117
assert(type(cfg.schema_path) == "string", "schema_path must be set")
99-
local schemas = miu.load_json_schemas(cfg.schema_path)
118+
local schemas = load_json_schemas_dir(cfg.schema_path)
100119

101120
local cnt = 0
102121
local jose_keys = {}
@@ -173,9 +192,10 @@ function transform_message(hsr, msg)
173192
msg.Fields.normalizedChannel = mtn.channel(msg.Fields.appUpdateChannel)
174193

175194
local pay = env:find("payload")
176-
msg.Fields.studyVersion = env:value(env:find(pay, "studyVersion"))
177-
msg.Fields.studyName = env:value(env:find(pay, "studyName"))
178-
msg.Fields.pioneerId = env:value(env:find(pay, "pioneerId"))
195+
msg.Fields.schemaName = env:value(env:find(pay, "schemaName"))
196+
msg.Fields.schemaVersion = env:value(env:find(pay, "schemaVersion"))
197+
msg.Fields.studyName = env:value(env:find(pay, "studyName"))
198+
msg.Fields.pioneerId = env:value(env:find(pay, "pioneerId"))
179199

180200
-- verify the decryption and validation metadata
181201
local ekey = env:value(env:find(pay, "encryptionKeyId"))
@@ -184,12 +204,10 @@ function transform_message(hsr, msg)
184204
error("jose\tno encryptionKeyId: " .. ekey, 0)
185205
end
186206

187-
local schema = schemas[msg.Fields.studyName]
188-
if schema then schema = schema[msg.Fields.studyVersion] end
189-
207+
local sn = string.format("%s.%d", msg.Fields.schemaName, msg.Fields.schemaVersion)
208+
local schema = schemas[sn]
190209
if not schema then
191-
error(string.format("schema\tno study schema: %s ver: %d",
192-
msg.Fields.studyName, msg.Fields.studyVersion), 0)
210+
error(string.format("schema\tno schema: %s study: %s", sn, msg.Fields.studyName), 0)
193211
end
194212

195213
-- decrypt and validate the study data

moz_pioneer/tests/hindsight/input.hpb

261 Bytes
Binary file not shown.

moz_pioneer/tests/hindsight/run/analysis/verify_decoder.lua

+17-13
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,12 @@ require "string"
1010

1111
local messages = {
1212
{Logger = "telemetry", Type = "telemetry", Fields = {
13-
studyVersion = 1,
13+
schemaName = "event",
14+
schemaVersion = 1,
1415
appVersion = "45.0",
15-
studyName = "example",
16+
studyName = "test-study",
1617
pioneerId = "11111111-1111-1111-1111-111111111111",
17-
submission = '{"exampleString":"foobar"}',
18+
submission = '{"eventId":"enrolled"}',
1819
appUpdateChannel = "beta",
1920
normalizedChannel = "beta",
2021
creationTimestamp = 1.446686743312e+18,
@@ -28,9 +29,10 @@ local messages = {
2829
}
2930
},
3031
{Logger = "telemetry", Type = "telemetry.metadata", Fields = {
31-
studyVersion = 1,
32+
schemaName = "event",
33+
schemaVersion = 1,
3234
appVersion = "45.0",
33-
studyName = "example",
35+
studyName = "test-study",
3436
pioneerId = "11111111-1111-1111-1111-111111111111",
3537
appUpdateChannel = "beta",
3638
normalizedChannel = "beta",
@@ -72,11 +74,12 @@ local messages = {
7274
"xpcomAbi": "x86_64-gcc3"
7375
},
7476
"payload" : {
75-
"encryptedData": "eyJlbmMiOiAiQTI1NkdDTSIsICJhbGciOiAiUlNBLU9BRVAiLCAiemlwIjogIkRFRiIsICJraWQiOiAiMjk0MDkyMWUtMzY0Ni00NTFjLTg1MTAtOTcxNTUyNzU0ZTc0In0.hnXLQf6VoLkjhikf2jeNUtWlwXuCj8nKUjcrOpsWvpLpeDxOJKrxs5Iqm5myxV8UIOrHCGnowmUvvG4bu-KLdC-QzXTmMGovwE8_VxVFO6gCVmKlgLBDgcfjbKajy7JGtozomuSz-lhP5BIMzVfMF6XoVSKq2YDskzVTMGZdVnPvzFjYJpK8lEKxA_MVZF-NJ_JJKn6jtbo3y5SvuXPOKVu7JBCwvrb6VHEm0tMwhCzeH3YemUluOXTeoCMXnKx0bRJhEoeb8Bh-oNLSEYKVI1jFK-Bzc3VZ1cBMTQ_nKWorObgewdYFZsxM4bChT3dtKxpb_udG2KtaUxyxfAoZ2r0GvphSnC2lITHoHk49C54-VttMZcMyHHvFEXO_gnBsVOoP0Z2b3LXAyL6dhxg8doc4CWTj3oAA2MnzQl7ZoaiFZc4NYuav2MYpSomyGmXSLs9qHXYSMhpq3q-qSyPEm28x2iEb05HsK9e0jUyRcy1JimjaC5x4tBi4xNMRpRUNWrYkqLDsi5EneUTCmVEhKStNk6fHnqOTWoM5A_55KAxTQWknsSVaGWXQtk2O9dsDa6f7Rm3Zssb_bQVXc_EjHimXd02fSnv6rThZ2qa7-wDlKYD88vrmD358oxdwziVBpfU4DRjdMefdJybxKz_l8fwVtp7dZBNC8p469s_a2y8.ZmoNDzcRaVqrnwgO.2iyWrGdfGuAtp1Tv.9yVPdJMA63ZuNpmAKLPnTw",
77+
"encryptedData": "eyJlbmMiOiAiQTI1NkdDTSIsICJhbGciOiAiUlNBLU9BRVAiLCAiemlwIjogIkRFRiIsICJraWQiOiAiMjk0MDkyMWUtMzY0Ni00NTFjLTg1MTAtOTcxNTUyNzU0ZTc0In0.ds7LT3vtshHRibx3twJQSKb8n_W6EtvpD7597KjMBsA4NySKTF0cgGE4m7MvMDXzYUeJq5K1sAnuYNAFdUML2my06rXWB3Q8gP6PRjF2hYzj84NKYVBwBr0XdgSfGqx_ja3XZX0f8LKkCUppRDo_9YuK-7kkw4_NDMLen-f3o9ta87w9Nn9lbw1m62yhkR8S2jiK3W4jpCnbxIeyZMyo-u-iCdEN4gdtH3ledcpeSZXn6b-L6d_4iQx2Y98Y5xvkSakXkipowsDd9FI7yE_gprV1pJDV29lDmH7Km_9ZSGA6NQTZs0fkOcJIhZprW4Bq_aP2tlPBU343dC-6lrVitQLxGgYgUDZduR4E0T3XJ4LJaPNIJKsTx-s9UXGur0U0qCIBT-bJKD3bBeVJmSA7ZMcuOGHktCQsx0Fr84IOInFaOCZSulPS_H0IThB23Z-Z9e-dX-c1s-YfjSUvMiyuG_mciDLo27AsN2FMOPQD-tKkOgnz231Ri3GTK777OYpsYTr8Q0vBRdVLJIi_-dIMTSuRHX3RCwduVpR_EnPdvMh_O7949W45gLoyz_Z96BarU6WDssPrCLGPkHHVDQqCO7nHz8RB5l5Jpq0Y2D_2Aaq14qDZBfldgkZJC6QufefoVtwqWWdd_R_PE_gqazJmGTsesHtgTnvT8VydnmxeyIo.LM9z4RRfWVynWd0u.SQehWUthukk7EUX2.L8zyxPzzWbDalhMdLlCPhg",
7678
"encryptionKeyId": "pioneer-20170901",
7779
"pioneerId": "11111111-1111-1111-1111-111111111111",
78-
"studyName": "example",
79-
"studyVersion": 1
80+
"studyName": "test-study",
81+
"schemaName": "event",
82+
"schemaVersion": 1
8083
}
8184
}
8285
}]],
@@ -86,9 +89,10 @@ local messages = {
8689
uri = "/submit/telemetry/0055FAC4-8A1A-4FCA-B380-EBFDC8571A02/pioneer-study/Firefox/45.0/release/20151103030248",
8790
DecodeErrorType = "json",
8891
DecodeError = "invalid study: failed to parse offset:1 Invalid value.",
89-
studyVersion = 1,
92+
schemaName = "event",
93+
schemaVersion = 1,
9094
appVersion = "45.0",
91-
studyName = "example",
95+
studyName = "test-study",
9296
pioneerId = "11111111-1111-1111-1111-111111111111",
9397
appUpdateChannel = "beta",
9498
normalizedChannel = "beta",
@@ -104,7 +108,7 @@ local messages = {
104108
},
105109
{Logger = "telemetry", Type = "telemetry.error", Fields = {
106110
DecodeErrorType = "json",
107-
DecodeError = "study validation: SchemaURI: #/properties/exampleString Keyword: type DocumentURI: #/exampleString"
111+
DecodeError = "study validation: SchemaURI: #/properties/eventId Keyword: type DocumentURI: #/eventId"
108112
}
109113
},
110114
{Logger = "telemetry", Type = "telemetry.error", Fields = {
@@ -114,12 +118,12 @@ local messages = {
114118
},
115119
{Logger = "telemetry", Type = "telemetry.error", Fields = {
116120
DecodeErrorType = "schema",
117-
DecodeError = "no study schema: bogus ver: 1"
121+
DecodeError = "no schema: bogus.1 study: test-study"
118122
}
119123
},
120124
{Logger = "telemetry", Type = "telemetry.error", Fields = {
121125
DecodeErrorType = "schema",
122-
DecodeError = "no study schema: example ver: 2"
126+
DecodeError = "no schema: event.2 study: test-study"
123127
}
124128
},
125129
{Logger = "telemetry", Type = "telemetry.error", Fields = {

moz_pioneer/tests/hindsight/run/input/generate_data.lua

+11-10
Original file line numberDiff line numberDiff line change
@@ -44,19 +44,20 @@ local envelope = [[
4444
"encryptionKeyId": "%s",
4545
"pioneerId": "11111111-1111-1111-1111-111111111111",
4646
"studyName": "%s",
47-
"studyVersion": %d
47+
"schemaName": "%s",
48+
"schemaVersion": %d
4849
}
4950
}
5051
}]]
5152

5253

5354
local submissions = {
54-
{'{ "exampleString" : "foobar"}', 'pioneer-20170901', "example", 1}, -- valid
55-
{'text', 'pioneer-20170901', "example", 1}, -- parse failure
56-
{'{ "exampleString" : 1}', 'pioneer-20170901', "example", 1}, -- study schema validation error
57-
{'{ "exampleString" : "foobar"}', 'pioneer-20200901', "example", 1}, -- no encryption key
58-
{'{ "exampleString" : "foobar"}', 'pioneer-20170901', "bogus", 1}, -- no schema
59-
{'{ "exampleString" : "foobar"}', 'pioneer-20170901', "example", 2}, -- no version
55+
{'{ "eventId" : "enrolled"}', 'pioneer-20170901', "test-study", "event", 1}, -- valid
56+
{'text', 'pioneer-20170901', "test-study", "event", 1}, -- parse failure
57+
{'{ "eventId" : 1}', 'pioneer-20170901', "test-study", "event", 1}, -- study schema validation error
58+
{'{ "eventId" : "foobar"}', 'pioneer-20200901', "test-study", "event", 1}, -- no encryption key
59+
{'{ "eventId" : "foobar"}', 'pioneer-20170901', "test-study", "bogus", 1}, -- no schema
60+
{'{ "eventId" : "foobar"}', 'pioneer-20170901', "test-study", "event", 2}, -- no version
6061
}
6162

6263
local msg = {
@@ -81,7 +82,7 @@ function process_message()
8182
for i,v in ipairs(submissions) do
8283
msg.Fields.uri = string.format(uri_template, i)
8384
local jwe = jose.jwe_encrypt(jwk, v[1], hdr)
84-
msg.Fields.content = string.format(envelope, jwe:export(), v[2], v[3], v[4])
85+
msg.Fields.content = string.format(envelope, jwe:export(), v[2], v[3], v[4], v[5])
8586
inject_message(msg)
8687
if i == 1 then inject_message(msg) end -- test duplicate
8788
cnt = cnt + 1
@@ -103,14 +104,14 @@ function process_message()
103104
-- invalid import
104105
cnt = cnt + 1
105106
msg.Fields.uri = string.format(uri_template, cnt)
106-
msg.Fields.content = string.format(envelope, "xxxxxxxxxxxxxxxx", v[2], v[3], v[4])
107+
msg.Fields.content = string.format(envelope, "xxxxxxxxxxxxxxxx", v[2], v[3], v[4], v[5])
107108
inject_message(msg)
108109

109110
-- invalid encryption
110111
cnt = cnt + 1
111112
msg.Fields.uri = string.format(uri_template, cnt)
112113
local jwe = jose.jwe_encrypt(jwk, submissions[1][1], hdr)
113-
msg.Fields.content = string.format(envelope, string.gsub(jwe:export(), ".$", "X"), v[2], v[3], v[4])
114+
msg.Fields.content = string.format(envelope, string.gsub(jwe:export(), ".$", "X"), v[2], v[3], v[4], v[5])
114115
inject_message(msg)
115116

116117
return 0

syslog/CMakeLists.txt

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
44

55
cmake_minimum_required(VERSION 3.0)
6-
project(syslog VERSION 1.0.7 LANGUAGES C)
6+
project(syslog VERSION 1.0.8 LANGUAGES C)
77
set(CPACK_PACKAGE_DESCRIPTION_SUMMARY "Syslog parsers and collectors")
88
set(CPACK_DEBIAN_PACKAGE_DEPENDS "${PACKAGE_PREFIX}-lpeg (>= 1.0.5), ${PACKAGE_PREFIX}-socket (>= 3.0)")
99
string(REGEX REPLACE "[()]" "" CPACK_RPM_PACKAGE_REQUIRES ${CPACK_DEBIAN_PACKAGE_DEPENDS})

syslog/modules/lpeg/linux/sshd.lua

+14
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,17 @@ syslog_grammar = l.Ct(
6060
* l.Cg((l.P(1)-l.S" ")^0, "remote_user")
6161
* l.P(-1)
6262
)
63+
+ (
64+
l.P"Connection from "
65+
* l.Cg(ipv46, "remote_addr")
66+
* l.P" port "
67+
* l.Cg(l.digit^1 / tonumber, "remote_port")
68+
* l.P" on "
69+
* l.Cg(ipv46, "local_addr")
70+
* l.P" port "
71+
* l.Cg(l.digit^1 / tonumber, "local_port")
72+
* l.P(-1)
73+
)
6374
+ (
6475
l.P"Connection closed by "
6576
* l.Cg(ipv46, "remote_addr")
@@ -106,6 +117,9 @@ syslog_grammar = l.Ct(
106117
* l.P": "
107118
* l.Cg(l.P(1)^1, "sshd_error")
108119
)
120+
+ (
121+
l.Cg(l.P"Set " * l.P(1)^1, "set")
122+
)
109123
)
110124

111125
return M

syslog/tests/linux/sshd.lua

+10
Original file line numberDiff line numberDiff line change
@@ -28,3 +28,13 @@ assert(fields.remote_addr.value == '10.2.3.4', fields.remote_addr)
2828
assert(fields.disconnect_reason == 11, fields.disconnect_reason)
2929
assert(fields.disconnect_msg == 'The user disconnected the application [preauth]', fields.disconnect_msg)
3030

31+
log = "Connection from 121.18.238.123 port 60512 on 172.31.41.219 port 22"
32+
fields = grammar:match(log)
33+
assert(fields.remote_addr.value == '121.18.238.123', fields.remote_addr)
34+
assert(fields.remote_port == 60512, fields.remote_port)
35+
assert(fields.local_addr.value == '172.31.41.219', fields.local_addr)
36+
assert(fields.local_port == 22, fields.local_port)
37+
38+
log = "Set /proc/self/oom_score_adj to 0"
39+
fields = grammar:match(log)
40+
assert(fields.set == log, fields.set)

0 commit comments

Comments
 (0)