Skip to content

Commit 4540c98

Browse files
author
Mike Trinkala
authored
Merge pull request #295 from mozilla-services/bugzilla_1448343
Bugzilla 1448343 - Document the 'seen' docType parquet schemas
2 parents 2d8f3f0 + b22f6f6 commit 4540c98

File tree

2 files changed

+217
-1
lines changed

2 files changed

+217
-1
lines changed

moz_telemetry/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
44

55
cmake_minimum_required(VERSION 3.0)
6-
project(moz-telemetry VERSION 1.2.17 LANGUAGES C)
6+
project(moz-telemetry VERSION 1.2.18 LANGUAGES C)
77
set(CPACK_PACKAGE_DESCRIPTION_SUMMARY "Mozilla Firefox Telemetry Data Processing")
88
set(CPACK_DEBIAN_PACKAGE_DEPENDS "${PACKAGE_PREFIX}-moz-ingest (>= 0.0.1), ${PACKAGE_PREFIX}-lsb (>= 1.1.0), ${PACKAGE_PREFIX}-circular-buffer (>= 1.0.2), ${PACKAGE_PREFIX}-heka (>= 1.1.9), ${PACKAGE_PREFIX}-elasticsearch (>= 1.0.3), ${PACKAGE_PREFIX}-rjson (>= 1.1.0), ${PACKAGE_PREFIX}-lfs (>= 1.6.4)")
99
string(REGEX REPLACE "[()]" "" CPACK_RPM_PACKAGE_REQUIRES ${CPACK_DEBIAN_PACKAGE_DEPENDS})
Lines changed: 216 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,216 @@
1+
-- This Source Code Form is subject to the terms of the Mozilla Public
2+
-- License, v. 2.0. If a copy of the MPL was not distributed with this
3+
-- file, You can obtain one at http://mozilla.org/MPL/2.0/.
4+
5+
--[[
6+
# Mozilla Telemetry Parquet Schema Documentation
7+
8+
Generates parquet schema documentation for each docType in the data stream.
9+
10+
## Sample Configuration
11+
12+
```lua
13+
filename = "moz_telemetry_parquet_schema.lua"
14+
message_matcher = "Uuid < '\003' && Fields[docType] != NIL" -- slightly greater than a 1% sample
15+
ticker_interval = 60
16+
preserve_data = false
17+
```
18+
19+
## Sample Output
20+
21+
Hierarchy:
22+
1. msg.Type
23+
1. msg.Fields[docType]
24+
1. msg.Fields[sourceVersion]
25+
26+
The number in brackets is the number of occurrences of each dimension in the sample.
27+
```
28+
telemetry.duplicate [1415]
29+
first-shutdown [12]
30+
-no version- [12]
31+
message schema {
32+
required binary Logger (UTF8);
33+
required fixed_len_byte_array(16) Uuid;
34+
optional int32 Pid;
35+
optional int32 Severity;
36+
optional binary EnvVersion (UTF8);
37+
required binary Hostname (UTF8);
38+
required int64 Timestamp;
39+
optional binary Payload (UTF8);
40+
required binary Type (UTF8);
41+
required group Fields {
42+
optional binary geoSubdivision1 (UTF8);
43+
required binary appUpdateChannel (UTF8);
44+
required binary documentId (UTF8);
45+
required binary docType (UTF8);
46+
required int64 duplicateDelta;
47+
required binary geoCountry (UTF8);
48+
required binary geoCity (UTF8);
49+
required binary appVersion (UTF8);
50+
required binary appBuildId (UTF8);
51+
required binary appName (UTF8);
52+
}
53+
}
54+
```
55+
--]]
56+
57+
local schema = {}
58+
59+
local function get_parquet_type(t)
60+
if t == -1 then
61+
return "mismatch"
62+
elseif t == 1 then
63+
return "binary"
64+
elseif t == 2 then
65+
return "int64"
66+
elseif t == 3 then
67+
return "double"
68+
elseif t == 4 then
69+
return "boolean"
70+
end
71+
return "binary" -- default
72+
end
73+
74+
75+
local function get_entry_table(t, key)
76+
local v = t[key]
77+
if not v then
78+
v = {}
79+
v.cnt = 0
80+
v.headers = {
81+
Uuid = {cnt = 0, type = "fixed_len_byte_array(16)"},
82+
Timestamp = {cnt = 0, type = "int64"},
83+
Logger = {cnt = 0, type = "binary"},
84+
Hostname = {cnt = 0, type = "binary"},
85+
Type = {cnt = 0, type = "binary"},
86+
Payload = {cnt = 0, type = "binary"},
87+
EnvVersion = {cnt = 0, type = "binary"},
88+
Pid = {cnt = 0, type = "int32"},
89+
Severity = {cnt = 0, type = "int32"},
90+
}
91+
v.fields = {}
92+
t[key] = v
93+
end
94+
v.cnt = v.cnt + 1
95+
return v
96+
end
97+
98+
99+
local function get_table(t, key)
100+
local v = t[key]
101+
if not v then
102+
v = {}
103+
v.cnt = 0
104+
t[key] = v
105+
end
106+
v.cnt = v.cnt + 1
107+
return v
108+
end
109+
110+
111+
local function output_parquet_headers(t, cnt)
112+
for k, v in pairs(t) do
113+
add_to_payload(" ")
114+
if cnt ~= v.cnt then
115+
add_to_payload("optional ")
116+
else
117+
add_to_payload("required ")
118+
end
119+
local meta = ""
120+
if v.type == "binary" then meta = " (UTF8)" end
121+
add_to_payload(v.type, " ", k, meta, ";\n")
122+
end
123+
end
124+
125+
126+
local function output_parquet_fields(t, cnt)
127+
add_to_payload(" required group Fields {\n")
128+
for k, v in pairs(t) do
129+
add_to_payload(" ")
130+
if v.repetition then
131+
add_to_payload("repeated ")
132+
elseif cnt ~= v.cnt then
133+
add_to_payload("optional ")
134+
else
135+
add_to_payload("required ")
136+
end
137+
local meta = ""
138+
local t = get_parquet_type(v.type)
139+
if t == "binary" then meta = " (UTF8)" end
140+
add_to_payload(t, " ", k, meta, ";\n")
141+
end
142+
add_to_payload(" }\n")
143+
end
144+
145+
146+
local function output_versions(t)
147+
for k, v in pairs(t) do
148+
if type(v) == "table" then
149+
local cnt = v.cnt
150+
if k == "" then k = "-no version-" end
151+
add_to_payload(" ", k, " [", cnt, "]\n")
152+
add_to_payload(" message schema {\n")
153+
output_parquet_headers(v.headers, cnt)
154+
output_parquet_fields(v.fields, cnt)
155+
add_to_payload(" }\n")
156+
end
157+
end
158+
end
159+
160+
161+
local function output_types(t)
162+
for k, v in pairs(t) do
163+
if type(v) == "table" then
164+
add_to_payload(" ", k, " [", v.cnt, "]\n")
165+
output_versions(v)
166+
end
167+
end
168+
end
169+
170+
171+
local function output_loggers(schema)
172+
for k, v in pairs(schema) do
173+
if type(v) == "table" then
174+
if k == "" then
175+
k = "-no Type-"
176+
end
177+
add_to_payload(k, " [", v.cnt, "]\n")
178+
output_types(v)
179+
end
180+
end
181+
end
182+
183+
184+
function process_message()
185+
local msg = decode_message(read_message("raw"))
186+
local l = get_table(schema, msg.Type or "")
187+
local t = get_table(l, read_message("Fields[docType]"))
188+
local v = get_entry_table(t, read_message("Fields[sourceVersion]") or "")
189+
190+
for m,n in pairs(v.headers) do
191+
if msg[m] then
192+
n.cnt = n.cnt + 1
193+
end
194+
end
195+
196+
if not msg.Fields then return 0 end
197+
198+
for i, f in ipairs(msg.Fields) do
199+
local entry = v.fields[f.name]
200+
if entry then
201+
entry.cnt = entry.cnt + 1
202+
if f.value_type ~= entry.type then
203+
entry.type = -1 -- mis-matched types
204+
end
205+
else
206+
v.fields[f.name] = {cnt = 1, type = f.value_type, representation = f.representation, repetition = #f.value > 1}
207+
end
208+
end
209+
return 0
210+
end
211+
212+
213+
function timer_event(ns, shutdown)
214+
output_loggers(schema)
215+
inject_payload("txt", "parquet")
216+
end

0 commit comments

Comments
 (0)