Skip to content

Commit e570031

Browse files
authored
Add json_extract plugin (#590)
* Add json_extract plugin * Rework with jx * Fix genFields for benchs * gen-doc * rework
1 parent a35f6d6 commit e570031

File tree

11 files changed

+445
-1
lines changed

11 files changed

+445
-1
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ TBD: throughput on production servers.
4242

4343
**Input**: [dmesg](plugin/input/dmesg/README.md), [fake](plugin/input/fake/README.md), [file](plugin/input/file/README.md), [http](plugin/input/http/README.md), [journalctl](plugin/input/journalctl/README.md), [k8s](plugin/input/k8s/README.md), [kafka](plugin/input/kafka/README.md)
4444

45-
**Action**: [add_file_name](plugin/action/add_file_name/README.md), [add_host](plugin/action/add_host/README.md), [convert_date](plugin/action/convert_date/README.md), [convert_log_level](plugin/action/convert_log_level/README.md), [convert_utf8_bytes](plugin/action/convert_utf8_bytes/README.md), [debug](plugin/action/debug/README.md), [discard](plugin/action/discard/README.md), [flatten](plugin/action/flatten/README.md), [join](plugin/action/join/README.md), [join_template](plugin/action/join_template/README.md), [json_decode](plugin/action/json_decode/README.md), [json_encode](plugin/action/json_encode/README.md), [keep_fields](plugin/action/keep_fields/README.md), [mask](plugin/action/mask/README.md), [modify](plugin/action/modify/README.md), [move](plugin/action/move/README.md), [parse_es](plugin/action/parse_es/README.md), [parse_re2](plugin/action/parse_re2/README.md), [remove_fields](plugin/action/remove_fields/README.md), [rename](plugin/action/rename/README.md), [set_time](plugin/action/set_time/README.md), [split](plugin/action/split/README.md), [throttle](plugin/action/throttle/README.md)
45+
**Action**: [add_file_name](plugin/action/add_file_name/README.md), [add_host](plugin/action/add_host/README.md), [convert_date](plugin/action/convert_date/README.md), [convert_log_level](plugin/action/convert_log_level/README.md), [convert_utf8_bytes](plugin/action/convert_utf8_bytes/README.md), [debug](plugin/action/debug/README.md), [discard](plugin/action/discard/README.md), [flatten](plugin/action/flatten/README.md), [join](plugin/action/join/README.md), [join_template](plugin/action/join_template/README.md), [json_decode](plugin/action/json_decode/README.md), [json_encode](plugin/action/json_encode/README.md), [json_extract](plugin/action/json_extract/README.md), [keep_fields](plugin/action/keep_fields/README.md), [mask](plugin/action/mask/README.md), [modify](plugin/action/modify/README.md), [move](plugin/action/move/README.md), [parse_es](plugin/action/parse_es/README.md), [parse_re2](plugin/action/parse_re2/README.md), [remove_fields](plugin/action/remove_fields/README.md), [rename](plugin/action/rename/README.md), [set_time](plugin/action/set_time/README.md), [split](plugin/action/split/README.md), [throttle](plugin/action/throttle/README.md)
4646

4747
**Output**: [clickhouse](plugin/output/clickhouse/README.md), [devnull](plugin/output/devnull/README.md), [elasticsearch](plugin/output/elasticsearch/README.md), [file](plugin/output/file/README.md), [gelf](plugin/output/gelf/README.md), [kafka](plugin/output/kafka/README.md), [postgres](plugin/output/postgres/README.md), [s3](plugin/output/s3/README.md), [splunk](plugin/output/splunk/README.md), [stdout](plugin/output/stdout/README.md)
4848

_sidebar.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
- [join_template](plugin/action/join_template/README.md)
3535
- [json_decode](plugin/action/json_decode/README.md)
3636
- [json_encode](plugin/action/json_encode/README.md)
37+
- [json_extract](plugin/action/json_extract/README.md)
3738
- [keep_fields](plugin/action/keep_fields/README.md)
3839
- [mask](plugin/action/mask/README.md)
3940
- [modify](plugin/action/modify/README.md)

cmd/file.d/file.d.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
_ "github.com/ozontech/file.d/plugin/action/join_template"
2828
_ "github.com/ozontech/file.d/plugin/action/json_decode"
2929
_ "github.com/ozontech/file.d/plugin/action/json_encode"
30+
_ "github.com/ozontech/file.d/plugin/action/json_extract"
3031
_ "github.com/ozontech/file.d/plugin/action/keep_fields"
3132
_ "github.com/ozontech/file.d/plugin/action/mask"
3233
_ "github.com/ozontech/file.d/plugin/action/modify"

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ require (
1616
github.com/cenkalti/backoff/v4 v4.2.1
1717
github.com/cespare/xxhash/v2 v2.2.0
1818
github.com/euank/go-kmsg-parser v2.0.0+incompatible
19+
github.com/go-faster/jx v1.1.0
1920
github.com/go-redis/redis v6.15.9+incompatible
2021
github.com/golang/mock v1.6.0
2122
github.com/google/uuid v1.3.0

go.sum

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ github.com/go-faster/city v1.0.1 h1:4WAxSZ3V2Ws4QRDrscLEDcibJY8uf41H6AhXDrNDcGw=
8787
github.com/go-faster/city v1.0.1/go.mod h1:jKcUJId49qdW3L1qKHH/3wPeUstCVpVSXTM6vO3VcTw=
8888
github.com/go-faster/errors v0.6.1 h1:nNIPOBkprlKzkThvS/0YaX8Zs9KewLCOSFQS5BU06FI=
8989
github.com/go-faster/errors v0.6.1/go.mod h1:5MGV2/2T9yvlrbhe9pD9LO5Z/2zCSq2T8j+Jpi2LAyY=
90+
github.com/go-faster/jx v1.1.0 h1:ZsW3wD+snOdmTDy9eIVgQdjUpXRRV4rqW8NS3t+20bg=
91+
github.com/go-faster/jx v1.1.0/go.mod h1:vKDNikrKoyUmpzaJ0OkIkRQClNHFX/nF3dnTJZb3skg=
9092
github.com/go-ini/ini v1.62.0 h1:7VJT/ZXjzqSrvtraFp4ONq80hTcRQth1c9ZnQ3uNQvU=
9193
github.com/go-ini/ini v1.62.0/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8=
9294
github.com/go-jose/go-jose/v3 v3.0.0 h1:s6rrhirfEP/CGIoc6p+PZAeogN2SxKav6Wp7+dyMWVo=
@@ -468,6 +470,8 @@ golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58
468470
golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc=
469471
golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
470472
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
473+
golang.org/x/exp v0.0.0-20230116083435-1de6713980de h1:DBWn//IJw30uYCgERoxCg84hWtA97F4wMiKOIh00Uf0=
474+
golang.org/x/exp v0.0.0-20230116083435-1de6713980de/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
471475
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
472476
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
473477
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=

plugin/README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -392,6 +392,11 @@ It transforms `{"server":{"os":"linux","arch":"amd64"}}` into `{"server":"{\"os\
392392

393393

394394
[More details...](plugin/action/json_encode/README.md)
395+
## json_extract
396+
It extracts a field from a JSON-encoded event field and adds the extracted field to the event root.
397+
> If the extracted field already exists in the event root, it will be overwritten.
398+
399+
[More details...](plugin/action/json_extract/README.md)
395400
## keep_fields
396401
It keeps the list of the event fields and removes others.
397402

plugin/action/README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,11 @@ It transforms `{"server":{"os":"linux","arch":"amd64"}}` into `{"server":"{\"os\
227227

228228

229229
[More details...](plugin/action/json_encode/README.md)
230+
## json_extract
231+
It extracts a field from a JSON-encoded event field and adds the extracted field to the event root.
232+
> If the extracted field already exists in the event root, it will be overwritten.
233+
234+
[More details...](plugin/action/json_extract/README.md)
230235
## keep_fields
231236
It keeps the list of the event fields and removes others.
232237

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
# JSON extract plugin
2+
@introduction
3+
4+
### Examples
5+
@examples
6+
7+
### Benchmarks
8+
@benchmarks
9+
10+
### Config params
11+
@config-params|description

plugin/action/json_extract/README.md

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
# JSON extract plugin
2+
It extracts a field from a JSON-encoded event field and adds the extracted field to the event root.
3+
> If the extracted field already exists in the event root, it will be overwritten.
4+
5+
### Examples
6+
```yaml
7+
pipelines:
8+
example_pipeline:
9+
...
10+
actions:
11+
- type: json_extract
12+
field: log
13+
extract_field: error.code
14+
...
15+
```
16+
The original event:
17+
```json
18+
{
19+
"log": "{\"level\":\"error\",\"message\":\"error occurred\",\"service\":\"my-service\",\"error\":{\"code\":2,\"args\":[]}}",
20+
"time": "2024-03-01T10:49:28.263317941Z"
21+
}
22+
```
23+
The resulting event:
24+
```json
25+
{
26+
"log": "{\"level\":\"error\",\"message\":\"error occurred\",\"service\":\"my-service\",\"error\":{\"code\":2,\"args\":[]}}",
27+
"time": "2024-03-01T10:49:28.263317941Z",
28+
"code": 2
29+
}
30+
```
31+
32+
### Benchmarks
33+
Performance comparison of `json_extract` and `json_decode` plugins.
34+
`json_extract` is on average 3 times faster than `json_decode`.
35+
36+
| json (length) | json_extract (time ns) | json_decode (time ns) |
37+
|---------------|------------------------|-----------------------|
38+
| 129 | 33 | 176 |
39+
| 309 | 264 | 520 |
40+
| 2109 | 2263 | 6778 |
41+
| 10909 | 11289 | 32205 |
42+
| 21909 | 23277 | 62819 |
43+
44+
### Config params
45+
**`field`** *`cfg.FieldSelector`* *`required`*
46+
47+
The event field from which to extract. Must be a string.
48+
49+
<br>
50+
51+
**`extract_field`** *`cfg.FieldSelector`* *`required`*
52+
53+
Field to extract.
54+
55+
<br>
56+
57+
58+
<br>*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)*
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
package json_extract
2+
3+
import (
4+
"bytes"
5+
6+
"github.com/go-faster/jx"
7+
"github.com/ozontech/file.d/cfg"
8+
"github.com/ozontech/file.d/fd"
9+
"github.com/ozontech/file.d/pipeline"
10+
insaneJSON "github.com/vitkovskii/insane-json"
11+
)
12+
13+
/*{ introduction
14+
It extracts a field from a JSON-encoded event field and adds the extracted field to the event root.
15+
> If the extracted field already exists in the event root, it will be overwritten.
16+
}*/
17+
18+
/*{ examples
19+
```yaml
20+
pipelines:
21+
example_pipeline:
22+
...
23+
actions:
24+
- type: json_extract
25+
field: log
26+
extract_field: error.code
27+
...
28+
```
29+
The original event:
30+
```json
31+
{
32+
"log": "{\"level\":\"error\",\"message\":\"error occurred\",\"service\":\"my-service\",\"error\":{\"code\":2,\"args\":[]}}",
33+
"time": "2024-03-01T10:49:28.263317941Z"
34+
}
35+
```
36+
The resulting event:
37+
```json
38+
{
39+
"log": "{\"level\":\"error\",\"message\":\"error occurred\",\"service\":\"my-service\",\"error\":{\"code\":2,\"args\":[]}}",
40+
"time": "2024-03-01T10:49:28.263317941Z",
41+
"code": 2
42+
}
43+
```
44+
}*/
45+
46+
/*{ benchmarks
47+
Performance comparison of `json_extract` and `json_decode` plugins.
48+
`json_extract` is on average 3 times faster than `json_decode`.
49+
50+
| json (length) | json_extract (time ns) | json_decode (time ns) |
51+
|---------------|------------------------|-----------------------|
52+
| 129 | 33 | 176 |
53+
| 309 | 264 | 520 |
54+
| 2109 | 2263 | 6778 |
55+
| 10909 | 11289 | 32205 |
56+
| 21909 | 23277 | 62819 |
57+
}*/
58+
59+
type Plugin struct {
60+
config *Config
61+
decoder *jx.Decoder
62+
}
63+
64+
// ! config-params
65+
// ^ config-params
66+
type Config struct {
67+
// > @3@4@5@6
68+
// >
69+
// > The event field from which to extract. Must be a string.
70+
Field cfg.FieldSelector `json:"field" parse:"selector" required:"true"` // *
71+
Field_ []string
72+
73+
// > @3@4@5@6
74+
// >
75+
// > Field to extract.
76+
ExtractField cfg.FieldSelector `json:"extract_field" parse:"selector" required:"true"` // *
77+
ExtractField_ []string
78+
}
79+
80+
func init() {
81+
fd.DefaultPluginRegistry.RegisterAction(&pipeline.PluginStaticInfo{
82+
Type: "json_extract",
83+
Factory: factory,
84+
})
85+
}
86+
87+
func factory() (pipeline.AnyPlugin, pipeline.AnyConfig) {
88+
return &Plugin{}, &Config{}
89+
}
90+
91+
func (p *Plugin) Start(config pipeline.AnyConfig, _ *pipeline.ActionPluginParams) {
92+
p.config = config.(*Config)
93+
p.decoder = &jx.Decoder{}
94+
}
95+
96+
// Stop is a no-op: the plugin has nothing to release.
func (p *Plugin) Stop() {
}
97+
98+
func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult {
99+
jsonNode := event.Root.Dig(p.config.Field_...)
100+
if jsonNode == nil {
101+
return pipeline.ActionPass
102+
}
103+
104+
p.decoder.ResetBytes(jsonNode.AsBytes())
105+
extract(event.Root, p.decoder, p.config.ExtractField_, 0, false)
106+
return pipeline.ActionPass
107+
}
108+
109+
// extract extracts field from decoder and adds it to the root.
110+
// `skipAddField` flag is required for proper benchmarking.
111+
func extract(root *insaneJSON.Root, d *jx.Decoder, field []string, depth int, skipAddField bool) {
112+
objIter, err := d.ObjIter()
113+
if err != nil {
114+
return
115+
}
116+
117+
for objIter.Next() {
118+
if bytes.Equal(objIter.Key(), pipeline.StringToByteUnsafe(field[depth])) {
119+
if depth == len(field)-1 { // add field
120+
if skipAddField {
121+
_ = d.Skip()
122+
} else {
123+
addField(root, field[depth], d)
124+
}
125+
} else { // go deep
126+
raw, err := d.Raw()
127+
if err != nil {
128+
break
129+
}
130+
d.ResetBytes(raw)
131+
extract(root, d, field, depth+1, skipAddField)
132+
}
133+
break
134+
} else if err = d.Skip(); err != nil {
135+
break
136+
}
137+
}
138+
}
139+
140+
func addField(root *insaneJSON.Root, field string, d *jx.Decoder) {
141+
switch d.Next() {
142+
case jx.Number:
143+
num, _ := d.Num()
144+
intVal, err := num.Int64()
145+
if err == nil {
146+
root.AddFieldNoAlloc(root, field).MutateToInt64(intVal)
147+
} else {
148+
floatVal, err := num.Float64()
149+
if err == nil {
150+
root.AddFieldNoAlloc(root, field).MutateToFloat(floatVal)
151+
}
152+
}
153+
case jx.String:
154+
s, _ := d.StrBytes()
155+
root.AddFieldNoAlloc(root, field).MutateToBytesCopy(root, s)
156+
case jx.Null:
157+
root.AddFieldNoAlloc(root, field).MutateToNull()
158+
case jx.Bool:
159+
b, _ := d.Bool()
160+
root.AddFieldNoAlloc(root, field).MutateToBool(b)
161+
case jx.Object, jx.Array:
162+
raw, _ := d.Raw()
163+
root.AddFieldNoAlloc(root, field).MutateToJSON(root, raw.String())
164+
default:
165+
_ = d.Skip()
166+
}
167+
}

0 commit comments

Comments
 (0)