Skip to content

Allow nested fields in keep_fields plugin #753

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 62 commits into from
Apr 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
23a26a1
Refactor
Feb 4, 2025
11aa8d8
Remove generic
Feb 5, 2025
f445490
Add draft
Feb 6, 2025
f3cf048
Refactor
Feb 6, 2025
0fc7de2
Add comments
Feb 7, 2025
a3c9635
Refactor
Feb 13, 2025
a0623e3
Replace assert with require
Feb 13, 2025
d99042d
Clean up after action
Feb 14, 2025
d50faf0
Fix test
Feb 19, 2025
24f2fa9
Fix. Add test
Feb 24, 2025
14ac23d
Remove dig
Feb 27, 2025
b6c2285
Fix linter error
Feb 27, 2025
e79451c
Refactor
Feb 27, 2025
c69c958
Define nested fields on start
Feb 27, 2025
72648eb
Add bench
Mar 6, 2025
7d81a09
Add tree draft
Mar 7, 2025
c06ca3a
Use tree
Mar 7, 2025
ee77350
Fix bench
Mar 7, 2025
2e30581
Refactor
Mar 7, 2025
3bb9adf
Refactor
Mar 10, 2025
859d3f5
Speed up array checker
Mar 10, 2025
ae5846d
Refactor
Mar 10, 2025
6f60a37
Rename files
Mar 10, 2025
4dc12d1
Fix and speed up bench
Mar 10, 2025
d554c80
Fix array checker
Mar 10, 2025
ff32c47
Edit bench
Mar 11, 2025
07415d5
Turn off gc
Mar 11, 2025
ff44343
Stabilize benchmarks
Mar 12, 2025
7f5185f
Add bench (case: all fields kept)
Mar 12, 2025
0791855
Refactor bench
Mar 12, 2025
5ca0696
Add bench (case: half fields kept)
Mar 12, 2025
b07fb8b
Remove slow versions
Mar 13, 2025
89769df
Rename funcs and benchmarks
Mar 13, 2025
46da1f8
Speed up fallback
Mar 13, 2025
de3447d
Add benchmarks for configs with nested fields
Mar 13, 2025
4a43951
Speed up array checker
Mar 13, 2025
d81fe2c
Remove unused methods
Mar 13, 2025
da6d954
Add benchmark for config with deeply nested fields
Mar 13, 2025
805524d
Fix parsing of paths. Add test
Mar 14, 2025
b2180a3
Add comment
Mar 14, 2025
d3084a0
Add tests
Mar 17, 2025
7faac18
Add fast tree
Mar 18, 2025
530cc5d
Add fast tree to benchmarks
Mar 18, 2025
d046f55
Refactor
Mar 18, 2025
888dd4f
Remove benches, add tests
Mar 18, 2025
c831aec
Remove old ways to process events
Mar 18, 2025
9cfe433
Refactor
Mar 18, 2025
d59a3da
Refactor
Mar 18, 2025
d12d768
Remove empty method
Mar 18, 2025
f83bff0
Add blank lines
Mar 18, 2025
b3217e6
Remove all fields if no fields parsed
Mar 19, 2025
da2e1fe
Add test
Mar 19, 2025
132ac9a
Use cfg.ParseNestedFields func
Mar 21, 2025
8b4f1e5
Add test that fails
Mar 21, 2025
7681873
Fix test that fails
Mar 21, 2025
3bb1fd7
Add comment. Rename test
Mar 23, 2025
9adce78
Simplify tree node struct
Mar 23, 2025
8e64ffa
Use plugin loger
Mar 23, 2025
08ded98
Refactor: remove duplicating code
Mar 23, 2025
863e6ac
Cleanup fields bufs anyway
Mar 23, 2025
f16aff1
Delete isRoot flag
Mar 24, 2025
79a9f09
Edit docs
Apr 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions cfg/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,13 @@ func ParseFieldSelector(selector string) []string {
return result
}

// ParseNestedFields parses several field selectors and removes nested fields.
// If the list is empty or contains an empty selector, an error is returned.
// For example:
// {"a", "a.b"} and {"a"} give the same results
// because field "a.b" is nested in field "a".
// Used in the remove_fields and keep_fields plugins
// to prevent redundant selections.
func ParseNestedFields(fields []string) ([][]string, error) {
if len(fields) == 0 {
return nil, errors.New("empty fields list")
Expand Down
31 changes: 31 additions & 0 deletions plugin/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,37 @@ because the start as a valid JSON matters.
[More details...](plugin/action/json_extract/README.md)
## keep_fields
It keeps the list of the event fields and removes others.
Nested fields are supported: specify subfield names separated by dots.
Example:
```
fields: ["a.b.f1", "c"]
# event before processing
{
"a":{
"b":{
"f1":1,
"f2":2
}
},
"c":0,
"d":0
}

# event after processing
{
"a":{
"b":{
"f1":1
}
},
"c":0
}

```

NOTE: if the `fields` param contains a field together with fields nested in it, the nested entries are redundant and are dropped from the list.
For example, `fields: ["a.b", "a"]` gives the same result as `fields: ["a"]`.
See `cfg.ParseNestedFields`.

[More details...](plugin/action/keep_fields/README.md)
## mask
Expand Down
31 changes: 31 additions & 0 deletions plugin/action/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,37 @@ because the start as a valid JSON matters.
[More details...](plugin/action/json_extract/README.md)
## keep_fields
It keeps the list of the event fields and removes others.
Nested fields are supported: specify subfield names separated by dots.
Example:
```
fields: ["a.b.f1", "c"]
# event before processing
{
"a":{
"b":{
"f1":1,
"f2":2
}
},
"c":0,
"d":0
}

# event after processing
{
"a":{
"b":{
"f1":1
}
},
"c":0
}

```

NOTE: if the `fields` param contains a field together with fields nested in it, the nested entries are redundant and are dropped from the list.
For example, `fields: ["a.b", "a"]` gives the same result as `fields: ["a"]`.
See `cfg.ParseNestedFields`.

[More details...](plugin/action/keep_fields/README.md)
## mask
Expand Down
31 changes: 31 additions & 0 deletions plugin/action/keep_fields/README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,36 @@
# Keep fields plugin
It keeps the list of the event fields and removes others.
Nested fields are supported: specify subfield names separated by dots.
Example:
```
fields: ["a.b.f1", "c"]
# event before processing
{
"a":{
"b":{
"f1":1,
"f2":2
}
},
"c":0,
"d":0
}

# event after processing
{
"a":{
"b":{
"f1":1
}
},
"c":0
}

```

NOTE: if the `fields` param contains a field together with fields nested in it, the nested entries are redundant and are dropped from the list.
For example, `fields: ["a.b", "a"]` gives the same result as `fields: ["a"]`.
See `cfg.ParseNestedFields`.

### Config params
**`fields`** *`[]string`*
Expand Down
140 changes: 121 additions & 19 deletions plugin/action/keep_fields/keep_fields.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,56 @@
package keep_fields

import (
"github.com/ozontech/file.d/cfg"
"github.com/ozontech/file.d/fd"
"github.com/ozontech/file.d/pipeline"
insaneJSON "github.com/ozontech/insane-json"
"go.uber.org/zap"
)

/*{ introduction
It keeps the list of the event fields and removes others.
Nested fields are supported: specify subfield names separated by dots.
Example:
```
fields: ["a.b.f1", "c"]
# event before processing
{
"a":{
"b":{
"f1":1,
"f2":2
}
},
"c":0,
"d":0
}

# event after processing
{
"a":{
"b":{
"f1":1
}
},
"c":0
}

```

NOTE: if the `fields` param contains a field together with fields nested in it, the nested entries are redundant and are dropped from the list.
For example, `fields: ["a.b", "a"]` gives the same result as `fields: ["a"]`.
See `cfg.ParseNestedFields`.
}*/

type Plugin struct {
config *Config
fieldsBuf []string
config *Config
logger *zap.Logger

fieldPaths [][]string

parsedFieldsRoot fieldPathNode
fieldsDepthSlice [][]string
}

// ! config-params
Expand All @@ -34,37 +73,100 @@ func factory() (pipeline.AnyPlugin, pipeline.AnyConfig) {
return &Plugin{}, &Config{}
}

func (p *Plugin) Start(config pipeline.AnyConfig, _ *pipeline.ActionPluginParams) {
p.config = config.(*Config)
func (p *Plugin) Stop() {
}

func (p *Plugin) Stop() {
// Start prepares the plugin for processing events.
//
// It stores the desugared logger and the plugin config, parses the configured
// field selectors into paths via cfg.ParseNestedFields, builds a prefix tree
// from those paths and allocates one reusable buffer per tree depth for the
// names of fields that have to be deleted.
//
// A nil config is reported with logger.Panic; a selector parsing error is
// reported with logger.Fatal.
func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.ActionPluginParams) {
	p.logger = params.Logger.Desugar()

	p.config = config.(*Config)
	if p.config == nil {
		p.logger.Panic("config is nil for the keep fields plugin")
	}

	paths, err := cfg.ParseNestedFields(p.config.Fields)
	p.fieldPaths = paths
	if err != nil {
		p.logger.Fatal("can't parse nested fields", zap.Error(err))
	}

	// Build the tree on a local root. Nodes share their children maps, so
	// inserting through a copied node is visible from the root.
	root := newFieldPathNode()
	deepest := 0
	for _, path := range paths {
		if len(path) > deepest {
			deepest = len(path)
		}

		node := root
		for _, segment := range path {
			child, found := node.children[segment]
			if !found {
				child = newFieldPathNode()
				node.children[segment] = child
			}
			node = child
		}
	}
	p.parsedFieldsRoot = root

	// One buffer per depth level: each holds the fields to delete at that level.
	buffers := make([][]string, deepest)
	for level := range buffers {
		buffers[level] = make([]string, 0, 100)
	}
	p.fieldsDepthSlice = buffers
}

func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult {
p.fieldsBuf = p.fieldsBuf[:0]

if !event.Root.IsObject() {
return pipeline.ActionPass
}

for _, node := range event.Root.AsFields() {
p.traverseFieldsTree(p.parsedFieldsRoot, event.Root.Node, 0)

return pipeline.ActionPass
}

// fieldPathNode is a node of the prefix tree built from the configured field
// paths. Every key of children is the name of the next path segment; a node
// without children marks the end of a configured path.
type fieldPathNode struct {
	children map[string]fieldPathNode
}

// newFieldPathNode returns a node whose children map is empty but non-nil,
// so child nodes can be inserted right away.
func newFieldPathNode() fieldPathNode {
	var node fieldPathNode
	node.children = map[string]fieldPathNode{}
	return node
}

func (p *Plugin) traverseFieldsTree(fpNode fieldPathNode, eventNode *insaneJSON.Node, depth int) bool {
// no child nodes in input path, found target node
if len(fpNode.children) == 0 {
return true
}
// cannot go further, nested target field does not exist
if !eventNode.IsObject() {
return false
}
shouldPreserveNode := false
for _, node := range eventNode.AsFields() {
eventField := node.AsString()
isInList := false
for _, pluginField := range p.config.Fields {
if pluginField == eventField {
isInList = true
break
if childNode, ok := fpNode.children[eventField]; ok {
if len(childNode.children) == 0 {
shouldPreserveNode = true
continue
}
if exists := p.traverseFieldsTree(childNode, eventNode.Dig(eventField), depth+1); exists {
shouldPreserveNode = true
continue
}
}
if !isInList {
p.fieldsBuf = append(p.fieldsBuf, eventField)
p.fieldsDepthSlice[depth] = append(p.fieldsDepthSlice[depth], eventField)
}
if depth == 0 || shouldPreserveNode {
// remove all unnecessary fields from current node, if the current node should be preserved
for _, field := range p.fieldsDepthSlice[depth] {
eventNode.Dig(field).Suicide()
}
}

for _, field := range p.fieldsBuf {
event.Root.Dig(field).Suicide()
}
p.fieldsDepthSlice[depth] = p.fieldsDepthSlice[depth][:0]

return pipeline.ActionPass
return shouldPreserveNode
}
Loading