Skip to content

Commit cd27594

Browse files
committed
add sample logger for parse_re2, discard, throttle plugins
1 parent e592906 commit cd27594

File tree

13 files changed

+137
-61
lines changed

13 files changed

+137
-61
lines changed

logger/logger.go

Lines changed: 44 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,22 @@ package logger
33
import (
44
"os"
55
"strings"
6+
"time"
67

78
"go.uber.org/zap"
89
"go.uber.org/zap/zapcore"
910
)
1011

1112
var Instance *zap.SugaredLogger
13+
var SampleInstance *zap.SugaredLogger
1214
var Level zap.AtomicLevel
1315

14-
const defaultLevel = zap.InfoLevel
16+
const (
17+
defaultLevel = zap.InfoLevel
18+
defaultTick = time.Second
19+
defaultFirst = 10
20+
defaultThereAfter = 100
21+
)
1522

1623
func init() {
1724
var level zapcore.Level
@@ -32,27 +39,42 @@ func init() {
3239

3340
Level = zap.NewAtomicLevelAt(level)
3441

35-
Instance = zap.New(
36-
zapcore.NewCore(
37-
zapcore.NewConsoleEncoder(zapcore.EncoderConfig{
38-
TimeKey: "ts",
39-
LevelKey: "level",
40-
NameKey: "Instance",
41-
CallerKey: "caller",
42-
MessageKey: "message",
43-
StacktraceKey: "stacktrace",
44-
LineEnding: zapcore.DefaultLineEnding,
45-
EncodeLevel: zapcore.LowercaseLevelEncoder,
46-
EncodeTime: zapcore.ISO8601TimeEncoder,
47-
EncodeDuration: zapcore.SecondsDurationEncoder,
48-
EncodeCaller: zapcore.ShortCallerEncoder,
49-
}),
50-
zapcore.AddSync(os.Stdout),
51-
Level,
52-
),
53-
).Sugar().Named("fd")
54-
55-
Instance.Infof("Logger initialized with level: %s", level)
42+
core := zapcore.NewCore(
43+
zapcore.NewConsoleEncoder(zapcore.EncoderConfig{
44+
TimeKey: "ts",
45+
LevelKey: "level",
46+
NameKey: "Instance",
47+
CallerKey: "caller",
48+
MessageKey: "message",
49+
StacktraceKey: "stacktrace",
50+
LineEnding: zapcore.DefaultLineEnding,
51+
EncodeLevel: zapcore.LowercaseLevelEncoder,
52+
EncodeTime: zapcore.ISO8601TimeEncoder,
53+
EncodeDuration: zapcore.SecondsDurationEncoder,
54+
EncodeCaller: zapcore.ShortCallerEncoder,
55+
}),
56+
zapcore.AddSync(os.Stdout),
57+
Level,
58+
)
59+
60+
sampleCore := zapcore.NewSamplerWithOptions(
61+
core,
62+
defaultTick,
63+
defaultFirst,
64+
defaultThereAfter,
65+
)
66+
67+
// logger initialization
68+
Instance = NewLogger(core)
69+
Instance.Infof("Logger initialized with level=%s", level)
70+
71+
// sample logger initialization
72+
SampleInstance = NewLogger(sampleCore)
73+
SampleInstance.Infof("SampleLogger initialized with level=%s", level)
74+
}
75+
76+
func NewLogger(core zapcore.Core) *zap.SugaredLogger {
77+
return zap.New(core).Sugar().Named("fd")
5678
}
5779

5880
func Debug(args ...any) {

pipeline/pipeline.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ type Pipeline struct {
9999

100100
// some debugging stuff
101101
logger *zap.SugaredLogger
102+
sampleLogger *zap.SugaredLogger
102103
eventLogEnabled bool
103104
eventLog []string
104105
eventLogMu *sync.Mutex
@@ -143,6 +144,7 @@ func New(name string, settings *Settings, registry *prometheus.Registry) *Pipeli
143144
pipeline := &Pipeline{
144145
Name: name,
145146
logger: logger.Instance.Named(name),
147+
sampleLogger: logger.SampleInstance.Named(name),
146148
settings: settings,
147149
useSpread: false,
148150
disableStreams: false,
@@ -266,7 +268,7 @@ func (p *Pipeline) Start() {
266268
p.logger.Infof("stating processors, count=%d", len(p.Procs))
267269
for _, processor := range p.Procs {
268270
processor.registerMetrics(p.metricsCtl)
269-
processor.start(p.actionParams, p.logger)
271+
processor.start(p.actionParams, p.logger, p.sampleLogger)
270272
}
271273

272274
p.logger.Infof("starting input plugin %q", p.inputInfo.Type)
@@ -571,7 +573,7 @@ func (p *Pipeline) expandProcs() {
571573
proc := p.newProc()
572574
p.Procs = append(p.Procs, proc)
573575
proc.registerMetrics(p.metricsCtl)
574-
proc.start(p.actionParams, p.logger)
576+
proc.start(p.actionParams, p.logger, p.sampleLogger)
575577
}
576578

577579
p.procCount.Swap(to)

pipeline/plugin.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,9 @@ type PluginDefaultParams struct {
5252

5353
type ActionPluginParams struct {
5454
*PluginDefaultParams
55-
Controller ActionPluginController
56-
Logger *zap.SugaredLogger
55+
Controller ActionPluginController
56+
Logger *zap.SugaredLogger
57+
SampleLogger *zap.SugaredLogger
5758
}
5859

5960
type OutputPluginParams struct {

pipeline/processor.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,13 +94,14 @@ func NewProcessor(
9494
return processor
9595
}
9696

97-
func (p *processor) start(params *PluginDefaultParams, logger *zap.SugaredLogger) {
97+
func (p *processor) start(params *PluginDefaultParams, defaultLogger *zap.SugaredLogger, sampleLogger *zap.SugaredLogger) {
9898
for i, action := range p.actions {
9999
actionInfo := p.actionInfos[i]
100100
action.Start(actionInfo.PluginStaticInfo.Config, &ActionPluginParams{
101101
PluginDefaultParams: params,
102102
Controller: p,
103-
Logger: logger.Named("action").Named(actionInfo.Type),
103+
Logger: defaultLogger.Named("action").Named(actionInfo.Type),
104+
SampleLogger: sampleLogger.Named("action").Named(actionInfo.Type),
104105
})
105106
}
106107

plugin/action/discard/README.idoc.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
# Discard plugin
22
@introduction
33

4-
> No config params
4+
### Config params
5+
@config-params|description

plugin/action/discard/README.md

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,12 @@ pipelines:
1313
...
1414
```
1515

16-
> No config params
16+
### Config params
17+
**`is_logging`** *`bool`* *`default=false`*
18+
19+
Field that includes logging (with sampling).
20+
21+
<br>
22+
1723

1824
<br>*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)*

plugin/action/discard/discard.go

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"github.com/ozontech/file.d/fd"
55
"github.com/ozontech/file.d/pipeline"
66
"github.com/ozontech/file.d/plugin"
7+
"go.uber.org/zap"
78
)
89

910
/*{ introduction
@@ -23,10 +24,19 @@ pipelines:
2324
}*/
2425

2526
type Plugin struct {
27+
config *Config
28+
sampleLogger *zap.SugaredLogger
2629
plugin.NoMetricsPlugin
2730
}
2831

29-
type Config struct{}
32+
// ! config-params
33+
// ^ config-params
34+
type Config struct {
35+
// > @3@4@5@6
36+
// >
37+
// > Field that includes logging (with sampling).
38+
IsLogging bool `json:"is_logging" default:"false"` // *
39+
}
3040

3141
func init() {
3242
fd.DefaultPluginRegistry.RegisterAction(&pipeline.PluginStaticInfo{
@@ -39,12 +49,17 @@ func factory() (pipeline.AnyPlugin, pipeline.AnyConfig) {
3949
return &Plugin{}, &Config{}
4050
}
4151

42-
func (p *Plugin) Start(_ pipeline.AnyConfig, _ *pipeline.ActionPluginParams) {
52+
func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.ActionPluginParams) {
53+
p.config = config.(*Config)
54+
p.sampleLogger = params.SampleLogger
4355
}
4456

4557
func (p *Plugin) Stop() {
4658
}
4759

48-
func (p *Plugin) Do(_ *pipeline.Event) pipeline.ActionResult {
60+
func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult {
61+
if p.config.IsLogging {
62+
p.sampleLogger.Info("discarded event: ", zap.Stringer("json", event))
63+
}
4964
return pipeline.ActionDiscard
5065
}

plugin/action/discard/discard_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ func TestDiscardAnd(t *testing.T) {
2222
},
2323
}
2424

25-
p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, nil, pipeline.MatchModeAnd, conds, false))
25+
p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, &Config{IsLogging: false}, pipeline.MatchModeAnd, conds, false))
2626

2727
wg := &sync.WaitGroup{}
2828
wg.Add(10)
@@ -66,7 +66,7 @@ func TestDiscardOr(t *testing.T) {
6666
},
6767
}
6868

69-
p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, nil, pipeline.MatchModeOr, conds, false))
69+
p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, &Config{IsLogging: false}, pipeline.MatchModeOr, conds, false))
7070

7171
wg := &sync.WaitGroup{}
7272
wg.Add(8)
@@ -110,7 +110,7 @@ func TestDiscardRegex(t *testing.T) {
110110
},
111111
}
112112

113-
p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, nil, pipeline.MatchModeOr, conds, false))
113+
p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, &Config{IsLogging: false}, pipeline.MatchModeOr, conds, false))
114114

115115
wg := &sync.WaitGroup{}
116116
wg.Add(11)
@@ -151,7 +151,7 @@ func TestDiscardMatchInvert(t *testing.T) {
151151
},
152152
}
153153

154-
p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, nil, pipeline.MatchModeAnd, conds, true))
154+
p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, &Config{IsLogging: false}, pipeline.MatchModeAnd, conds, true))
155155

156156
wg := &sync.WaitGroup{}
157157
wg.Add(9)

plugin/action/mask/mask_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,8 @@ func TestMaskAddExtraField(t *testing.T) {
179179
PipelineName: "test_pipeline",
180180
PipelineSettings: &pipeline.Settings{},
181181
},
182-
Logger: zap.L().Sugar(),
182+
Logger: zap.L().Sugar(),
183+
SampleLogger: zap.L().Sugar(),
183184
})
184185
plugin.config.Masks[0].Re_ = regexp.MustCompile(plugin.config.Masks[0].Re)
185186

plugin/action/parse_re2/README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,11 @@ A prefix to add to decoded object keys.
1414

1515
<br>
1616

17+
**`is_logging`** *`bool`* *`default=false`*
18+
19+
Field that includes logging (with sampling).
20+
21+
<br>
22+
1723

1824
<br>*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)*

plugin/action/parse_re2/parse_re2.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,17 @@ import (
88
"github.com/ozontech/file.d/pipeline"
99
"github.com/ozontech/file.d/plugin"
1010
insaneJSON "github.com/vitkovskii/insane-json"
11+
"go.uber.org/zap"
1112
)
1213

1314
/*{ introduction
1415
It parses string from the event field using re2 expression with named subgroups and merges the result with the event root.
1516
}*/
1617

1718
type Plugin struct {
18-
config *Config
19-
20-
re *regexp.Regexp
19+
config *Config
20+
sampleLogger *zap.SugaredLogger
21+
re *regexp.Regexp
2122
plugin.NoMetricsPlugin
2223
}
2324

@@ -39,6 +40,11 @@ type Config struct {
3940
// >
4041
// > A prefix to add to decoded object keys.
4142
Prefix string `json:"prefix" default:""` // *
43+
44+
// > @3@4@5@6
45+
// >
46+
// > Field that includes logging (with sampling).
47+
IsLogging bool `json:"is_logging" default:"false"` // *
4248
}
4349

4450
func init() {
@@ -52,8 +58,9 @@ func factory() (pipeline.AnyPlugin, pipeline.AnyConfig) {
5258
return &Plugin{}, &Config{}
5359
}
5460

55-
func (p *Plugin) Start(config pipeline.AnyConfig, _ *pipeline.ActionPluginParams) {
61+
func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.ActionPluginParams) {
5662
p.config = config.(*Config)
63+
p.sampleLogger = params.SampleLogger
5764

5865
p.re = regexp.MustCompile(p.config.Re2)
5966
}
@@ -70,6 +77,9 @@ func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult {
7077
sm := p.re.FindSubmatch(jsonNode.AsBytes())
7178

7279
if len(sm) == 0 {
80+
if p.config.IsLogging {
81+
p.sampleLogger.Info("event is not parsed: ", zap.Stringer("json", event))
82+
}
7383
return pipeline.ActionPass
7484
}
7585

plugin/action/throttle/throttle.go

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -45,13 +45,14 @@ It discards the events if pipeline throughput gets higher than a configured thre
4545
}*/
4646

4747
type Plugin struct {
48-
ctx context.Context
49-
cancel context.CancelFunc
50-
logger *zap.SugaredLogger
51-
config *Config
52-
pipeline string
53-
format string
54-
redisClient redisClient
48+
ctx context.Context
49+
cancel context.CancelFunc
50+
logger *zap.SugaredLogger
51+
sampleLogger *zap.SugaredLogger
52+
config *Config
53+
pipeline string
54+
format string
55+
redisClient redisClient
5556

5657
limiterBuf []byte
5758
rules []*rule
@@ -185,6 +186,7 @@ func (p *Plugin) syncWorker(ctx context.Context, jobCh <-chan limiter, wg *sync.
185186
func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.ActionPluginParams) {
186187
p.config = config.(*Config)
187188
p.logger = params.Logger
189+
p.sampleLogger = params.SampleLogger
188190
p.pipeline = params.PipelineName
189191
p.limiterBuf = make([]byte, 0)
190192
ctx, cancel := context.WithCancel(context.Background())
@@ -303,11 +305,12 @@ func (p *Plugin) isAllowed(event *pipeline.Event) bool {
303305

304306
if len(p.config.TimeField_) != 0 {
305307
tsValue := event.Root.Dig(p.config.TimeField_...).AsString()
306-
t, err := time.Parse(p.format, tsValue)
307-
if err != nil || ts.IsZero() {
308-
p.logger.Warnf("can't parse field %q using format %s: %s", p.config.TimeField, p.config.TimeFieldFormat, tsValue)
309-
} else {
310-
ts = t
308+
if tsValue != "" {
309+
if t, err := time.Parse(p.format, tsValue); err != nil || ts.IsZero() {
310+
p.sampleLogger.Warnf("can't parse field=%q using format=%s time=%s", p.config.TimeField, p.config.TimeFieldFormat, tsValue)
311+
} else {
312+
ts = t
313+
}
311314
}
312315
}
313316

0 commit comments

Comments
 (0)