Skip to content

Commit ba2faa5

Browse files
committed
add sample logger for parse_re2 and discard plugins
1 parent e592906 commit ba2faa5

File tree

11 files changed

+115
-40
lines changed

11 files changed

+115
-40
lines changed

logger/logger.go

+42-18
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,22 @@ package logger
33
import (
44
"os"
55
"strings"
6+
"time"
67

78
"go.uber.org/zap"
89
"go.uber.org/zap/zapcore"
910
)
1011

1112
var Instance *zap.SugaredLogger
13+
var SampleInstance *zap.SugaredLogger
1214
var Level zap.AtomicLevel
1315

14-
const defaultLevel = zap.InfoLevel
16+
const (
17+
defaultLevel = zap.InfoLevel
18+
defaultTick = time.Second
19+
defaultFirst = 10
20+
defaultThereAfter = 100
21+
)
1522

1623
func init() {
1724
var level zapcore.Level
@@ -32,27 +39,44 @@ func init() {
3239

3340
Level = zap.NewAtomicLevelAt(level)
3441

42+
core := zapcore.NewCore(
43+
zapcore.NewConsoleEncoder(zapcore.EncoderConfig{
44+
TimeKey: "ts",
45+
LevelKey: "level",
46+
NameKey: "Instance",
47+
CallerKey: "caller",
48+
MessageKey: "message",
49+
StacktraceKey: "stacktrace",
50+
LineEnding: zapcore.DefaultLineEnding,
51+
EncodeLevel: zapcore.LowercaseLevelEncoder,
52+
EncodeTime: zapcore.ISO8601TimeEncoder,
53+
EncodeDuration: zapcore.SecondsDurationEncoder,
54+
EncodeCaller: zapcore.ShortCallerEncoder,
55+
}),
56+
zapcore.AddSync(os.Stdout),
57+
Level,
58+
)
59+
60+
sampleCore := zapcore.NewSamplerWithOptions(
61+
core,
62+
defaultTick,
63+
defaultFirst,
64+
defaultThereAfter,
65+
)
66+
67+
// logger initialization
3568
Instance = zap.New(
36-
zapcore.NewCore(
37-
zapcore.NewConsoleEncoder(zapcore.EncoderConfig{
38-
TimeKey: "ts",
39-
LevelKey: "level",
40-
NameKey: "Instance",
41-
CallerKey: "caller",
42-
MessageKey: "message",
43-
StacktraceKey: "stacktrace",
44-
LineEnding: zapcore.DefaultLineEnding,
45-
EncodeLevel: zapcore.LowercaseLevelEncoder,
46-
EncodeTime: zapcore.ISO8601TimeEncoder,
47-
EncodeDuration: zapcore.SecondsDurationEncoder,
48-
EncodeCaller: zapcore.ShortCallerEncoder,
49-
}),
50-
zapcore.AddSync(os.Stdout),
51-
Level,
52-
),
69+
core,
5370
).Sugar().Named("fd")
5471

5572
Instance.Infof("Logger initialized with level: %s", level)
73+
74+
// sample logger initialization
75+
SampleInstance = zap.New(
76+
sampleCore,
77+
).Sugar().Named("fd")
78+
79+
SampleInstance.Infof("SampleLogger initialized with level: %s", level)
5680
}
5781

5882
func Debug(args ...any) {

pipeline/pipeline.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ type Pipeline struct {
9999

100100
// some debugging stuff
101101
logger *zap.SugaredLogger
102+
sampleLogger *zap.SugaredLogger
102103
eventLogEnabled bool
103104
eventLog []string
104105
eventLogMu *sync.Mutex
@@ -143,6 +144,7 @@ func New(name string, settings *Settings, registry *prometheus.Registry) *Pipeli
143144
pipeline := &Pipeline{
144145
Name: name,
145146
logger: logger.Instance.Named(name),
147+
sampleLogger: logger.SampleInstance.Named(name),
146148
settings: settings,
147149
useSpread: false,
148150
disableStreams: false,
@@ -266,7 +268,7 @@ func (p *Pipeline) Start() {
266268
p.logger.Infof("stating processors, count=%d", len(p.Procs))
267269
for _, processor := range p.Procs {
268270
processor.registerMetrics(p.metricsCtl)
269-
processor.start(p.actionParams, p.logger)
271+
processor.start(p.actionParams, p.logger, p.sampleLogger)
270272
}
271273

272274
p.logger.Infof("starting input plugin %q", p.inputInfo.Type)
@@ -571,7 +573,7 @@ func (p *Pipeline) expandProcs() {
571573
proc := p.newProc()
572574
p.Procs = append(p.Procs, proc)
573575
proc.registerMetrics(p.metricsCtl)
574-
proc.start(p.actionParams, p.logger)
576+
proc.start(p.actionParams, p.logger, p.sampleLogger)
575577
}
576578

577579
p.procCount.Swap(to)

pipeline/plugin.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,9 @@ type PluginDefaultParams struct {
5252

5353
type ActionPluginParams struct {
5454
*PluginDefaultParams
55-
Controller ActionPluginController
56-
Logger *zap.SugaredLogger
55+
Controller ActionPluginController
56+
Logger *zap.SugaredLogger
57+
SampleLogger *zap.SugaredLogger
5758
}
5859

5960
type OutputPluginParams struct {

pipeline/processor.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -94,13 +94,14 @@ func NewProcessor(
9494
return processor
9595
}
9696

97-
func (p *processor) start(params *PluginDefaultParams, logger *zap.SugaredLogger) {
97+
func (p *processor) start(params *PluginDefaultParams, logger *zap.SugaredLogger, sampleLogger *zap.SugaredLogger) {
9898
for i, action := range p.actions {
9999
actionInfo := p.actionInfos[i]
100100
action.Start(actionInfo.PluginStaticInfo.Config, &ActionPluginParams{
101101
PluginDefaultParams: params,
102102
Controller: p,
103103
Logger: logger.Named("action").Named(actionInfo.Type),
104+
SampleLogger: sampleLogger.Named("action").Named(actionInfo.Type),
104105
})
105106
}
106107

plugin/action/discard/README.idoc.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
# Discard plugin
22
@introduction
33

4-
> No config params
4+
### Config params
5+
@config-params|description

plugin/action/discard/README.md

+7-1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,12 @@ pipelines:
1313
...
1414
```
1515

16-
> No config params
16+
### Config params
17+
**`is_logging`** *`bool`* *`default=false`*
18+
19+
Field that includes logging (with sampling).
20+
21+
<br>
22+
1723

1824
<br>*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)*

plugin/action/discard/discard.go

+18-3
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"github.com/ozontech/file.d/fd"
55
"github.com/ozontech/file.d/pipeline"
66
"github.com/ozontech/file.d/plugin"
7+
"go.uber.org/zap"
78
)
89

910
/*{ introduction
@@ -23,10 +24,19 @@ pipelines:
2324
}*/
2425

2526
type Plugin struct {
27+
config *Config
28+
sampleLogger *zap.SugaredLogger
2629
plugin.NoMetricsPlugin
2730
}
2831

29-
type Config struct{}
32+
// ! config-params
33+
// ^ config-params
34+
type Config struct {
35+
// > @3@4@5@6
36+
// >
37+
// > Field that includes logging (with sampling).
38+
IsLogging bool `json:"is_logging" default:"false"` // *
39+
}
3040

3141
func init() {
3242
fd.DefaultPluginRegistry.RegisterAction(&pipeline.PluginStaticInfo{
@@ -39,12 +49,17 @@ func factory() (pipeline.AnyPlugin, pipeline.AnyConfig) {
3949
return &Plugin{}, &Config{}
4050
}
4151

42-
func (p *Plugin) Start(_ pipeline.AnyConfig, _ *pipeline.ActionPluginParams) {
52+
func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.ActionPluginParams) {
53+
p.config = config.(*Config)
54+
p.sampleLogger = params.SampleLogger
4355
}
4456

4557
func (p *Plugin) Stop() {
4658
}
4759

48-
func (p *Plugin) Do(_ *pipeline.Event) pipeline.ActionResult {
60+
func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult {
61+
if p.config.IsLogging {
62+
p.sampleLogger.Info("discarded event: ", zap.Stringer("json", event))
63+
}
4964
return pipeline.ActionDiscard
5065
}

plugin/action/mask/mask_test.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,8 @@ func TestMaskAddExtraField(t *testing.T) {
179179
PipelineName: "test_pipeline",
180180
PipelineSettings: &pipeline.Settings{},
181181
},
182-
Logger: zap.L().Sugar(),
182+
Logger: zap.L().Sugar(),
183+
SampleLogger: zap.L().Sugar(),
183184
})
184185
plugin.config.Masks[0].Re_ = regexp.MustCompile(plugin.config.Masks[0].Re)
185186

plugin/action/parse_re2/README.md

+6
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,11 @@ A prefix to add to decoded object keys.
1414

1515
<br>
1616

17+
**`is_logging`** *`bool`* *`default=false`*
18+
19+
Field that includes logging (with sampling).
20+
21+
<br>
22+
1723

1824
<br>*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)*

plugin/action/parse_re2/parse_re2.go

+14-4
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,17 @@ import (
88
"github.com/ozontech/file.d/pipeline"
99
"github.com/ozontech/file.d/plugin"
1010
insaneJSON "github.com/vitkovskii/insane-json"
11+
"go.uber.org/zap"
1112
)
1213

1314
/*{ introduction
1415
It parses string from the event field using re2 expression with named subgroups and merges the result with the event root.
1516
}*/
1617

1718
type Plugin struct {
18-
config *Config
19-
20-
re *regexp.Regexp
19+
config *Config
20+
sampleLogger *zap.SugaredLogger
21+
re *regexp.Regexp
2122
plugin.NoMetricsPlugin
2223
}
2324

@@ -39,6 +40,11 @@ type Config struct {
3940
// >
4041
// > A prefix to add to decoded object keys.
4142
Prefix string `json:"prefix" default:""` // *
43+
44+
// > @3@4@5@6
45+
// >
46+
// > Field that includes logging (with sampling).
47+
IsLogging bool `json:"is_logging" default:"false"` // *
4248
}
4349

4450
func init() {
@@ -52,8 +58,9 @@ func factory() (pipeline.AnyPlugin, pipeline.AnyConfig) {
5258
return &Plugin{}, &Config{}
5359
}
5460

55-
func (p *Plugin) Start(config pipeline.AnyConfig, _ *pipeline.ActionPluginParams) {
61+
func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.ActionPluginParams) {
5662
p.config = config.(*Config)
63+
p.sampleLogger = params.SampleLogger
5764

5865
p.re = regexp.MustCompile(p.config.Re2)
5966
}
@@ -70,6 +77,9 @@ func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult {
7077
sm := p.re.FindSubmatch(jsonNode.AsBytes())
7178

7279
if len(sm) == 0 {
80+
if p.config.IsLogging {
81+
p.sampleLogger.Info("event is not parsed: ", zap.Stringer("json", event))
82+
}
7383
return pipeline.ActionPass
7484
}
7585

plugin/input/k8s/multiline_action_test.go

+15-7
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,15 @@ func TestMultilineAction_Do(t *testing.T) {
1717
config := &Config{
1818
SplitEventSize: predictionLookahead * 4,
1919
}
20-
plugin.Start(config, &pipeline.ActionPluginParams{Logger: zap.S(), PluginDefaultParams: &pipeline.PluginDefaultParams{
21-
PipelineSettings: &pipeline.Settings{
22-
MaxEventSize: 20,
20+
plugin.Start(config, &pipeline.ActionPluginParams{
21+
Logger: zap.S(),
22+
SampleLogger: zap.S(),
23+
PluginDefaultParams: &pipeline.PluginDefaultParams{
24+
PipelineSettings: &pipeline.Settings{
25+
MaxEventSize: 20,
26+
},
2327
},
24-
}})
28+
})
2529

2630
item := &metaItem{
2731
nodeName: "node_1",
@@ -93,9 +97,13 @@ func TestMultilineAction_Do_shouldSplit(t *testing.T) {
9397
config := &Config{
9498
SplitEventSize: predictionLookahead * 4,
9599
}
96-
plugin.Start(config, &pipeline.ActionPluginParams{Logger: zap.S(), PluginDefaultParams: &pipeline.PluginDefaultParams{
97-
PipelineSettings: &pipeline.Settings{},
98-
}})
100+
plugin.Start(config, &pipeline.ActionPluginParams{
101+
Logger: zap.S(),
102+
SampleLogger: zap.S(),
103+
PluginDefaultParams: &pipeline.PluginDefaultParams{
104+
PipelineSettings: &pipeline.Settings{},
105+
},
106+
})
99107

100108
item := &metaItem{
101109
nodeName: "node_1",

0 commit comments

Comments
 (0)