From e02c4a602afe18243afcdc6600b9e33af2bd53fb Mon Sep 17 00:00:00 2001 From: Yaroslav Kirillov Date: Fri, 4 Oct 2024 17:51:23 +0500 Subject: [PATCH] Antispam by sourcename (#682) * check antispam exceptions by source name * add test --- fd/util.go | 10 +-- pipeline/antispam/antispammer.go | 23 +++++- pipeline/antispam/antispammer_test.go | 114 ++++++++++++++++++++++---- pipeline/pipeline.go | 3 +- 4 files changed, 124 insertions(+), 26 deletions(-) diff --git a/fd/util.go b/fd/util.go index 1ba56b8f7..41159c721 100644 --- a/fd/util.go +++ b/fd/util.go @@ -9,9 +9,9 @@ import ( "github.com/bitly/go-simplejson" "github.com/ozontech/file.d/cfg" - "github.com/ozontech/file.d/cfg/matchrule" "github.com/ozontech/file.d/logger" "github.com/ozontech/file.d/pipeline" + "github.com/ozontech/file.d/pipeline/antispam" "github.com/ozontech/file.d/pipeline/doif" ) @@ -19,7 +19,7 @@ func extractPipelineParams(settings *simplejson.Json) *pipeline.Settings { capacity := pipeline.DefaultCapacity antispamThreshold := 0 antispamField := "" - var antispamExceptions matchrule.RuleSets + var antispamExceptions antispam.Exceptions avgInputEventSize := pipeline.DefaultAvgInputEventSize maxInputEventSize := pipeline.DefaultMaxInputEventSize streamField := pipeline.DefaultStreamField @@ -86,7 +86,7 @@ func extractPipelineParams(settings *simplejson.Json) *pipeline.Settings { antispamField = settings.Get("antispam_field").MustString() var err error - antispamExceptions, err = extractExceptions(settings) + antispamExceptions, err = extractAntispamExceptions(settings) if err != nil { logger.Fatalf("extract exceptions: %s", err) } @@ -121,7 +121,7 @@ func extractPipelineParams(settings *simplejson.Json) *pipeline.Settings { } } -func extractExceptions(settings *simplejson.Json) (matchrule.RuleSets, error) { +func extractAntispamExceptions(settings *simplejson.Json) (antispam.Exceptions, error) { raw, err := settings.Get("antispam_exceptions").MarshalJSON() if err != nil { return nil, err @@ -130,7 +130,7 @@ func extractExceptions(settings *simplejson.Json) (matchrule.RuleSets, error) { dec := json.NewDecoder(bytes.NewReader(raw)) dec.DisallowUnknownFields() - var exceptions matchrule.RuleSets + var exceptions antispam.Exceptions if err := dec.Decode(&exceptions); err != nil { return nil, err } diff --git a/pipeline/antispam/antispammer.go b/pipeline/antispam/antispammer.go index d0eb7f680..1127a240e 100644 --- a/pipeline/antispam/antispammer.go +++ b/pipeline/antispam/antispammer.go @@ -23,7 +23,7 @@ type Antispammer struct { maintenanceInterval time.Duration mu sync.RWMutex sources map[any]source - exceptions matchrule.RuleSets + exceptions Exceptions logger *zap.Logger @@ -44,7 +44,7 @@ type Options struct { Threshold int Field string UnbanIterations int - Exceptions matchrule.RuleSets + Exceptions Exceptions Logger *zap.Logger MetricsController *metric.Ctl @@ -90,7 +90,11 @@ func (a *Antispammer) IsSpam(id any, name string, isNewSource bool, event []byte for i := 0; i < len(a.exceptions); i++ { e := &a.exceptions[i] - if e.Match(event) { + checkData := event + if e.CheckSourceName { + checkData = []byte(name) + } + if e.Match(checkData) { if e.Name != "" { a.exceptionMetric.WithLabelValues(e.Name).Inc() } @@ -204,3 +208,16 @@ func (a *Antispammer) Dump() string { return out } + +type Exception struct { + matchrule.RuleSet + CheckSourceName bool `json:"check_source_name"` +} + +type Exceptions []Exception + +func (e Exceptions) Prepare() { + for i := range e { + e[i].Prepare() + } +} diff --git a/pipeline/antispam/antispammer_test.go b/pipeline/antispam/antispammer_test.go index 5e82097fd..34b64572d 100644 --- a/pipeline/antispam/antispammer_test.go +++ b/pipeline/antispam/antispammer_test.go @@ -4,20 +4,17 @@ import ( "testing" "time" + "github.com/ozontech/file.d/cfg/matchrule" "github.com/ozontech/file.d/logger" "github.com/ozontech/file.d/metric" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/require" ) -func TestAntispam(t *testing.T) { - r := require.New(t) - - threshold := 5 - unbanIterations := 2 - maintenanceInterval := time.Second * 1 +func newAntispammer(threshold, unbanIterations int, maintenanceInterval time.Duration) *Antispammer { holder := metric.NewHolder(time.Minute) - antispamer := NewAntispammer(&Options{ + return NewAntispammer(&Options{ MaintenanceInterval: maintenanceInterval, Threshold: threshold, UnbanIterations: unbanIterations, @@ -25,6 +22,16 @@ func TestAntispam(t *testing.T) { MetricsController: metric.NewCtl("test", prometheus.NewRegistry()), MetricHolder: holder, }) +} + +func TestAntispam(t *testing.T) { + r := require.New(t) + + threshold := 5 + unbanIterations := 2 + maintenanceInterval := time.Second * 1 + + antispamer := newAntispammer(threshold, unbanIterations, maintenanceInterval) startTime := time.Now() checkSpam := func(i int) bool { @@ -53,15 +60,8 @@ func TestAntispamAfterRestart(t *testing.T) { threshold := 5 unbanIterations := 2 maintenanceInterval := time.Second * 1 - holder := metric.NewHolder(time.Minute) - antispamer := NewAntispammer(&Options{ - MaintenanceInterval: maintenanceInterval, - Threshold: threshold, - UnbanIterations: unbanIterations, - Logger: logger.Instance.Named("antispam").Desugar(), - MetricsController: metric.NewCtl("test", prometheus.NewRegistry()), - MetricHolder: holder, - }) + + antispamer := newAntispammer(threshold, unbanIterations, maintenanceInterval) startTime := time.Now() checkSpam := func(i int) bool { @@ -77,3 +77,85 @@ func TestAntispamAfterRestart(t *testing.T) { result := checkSpam(threshold) r.False(result) } + +func TestAntispamExceptions(t *testing.T) { + r := require.New(t) + now := time.Now() + + threshold := 1 + unbanIterations := 2 + maintenanceInterval := time.Second * 1 + + antispamer := newAntispammer(threshold, unbanIterations, maintenanceInterval) + + eventRulesetName := "test_event" + sourceRulesetName := "test_sourcename" + + antispamer.exceptions = Exceptions{ + { + RuleSet: matchrule.RuleSet{ + Name: eventRulesetName, + Cond: matchrule.CondOr, + Rules: []matchrule.Rule{ + { + Mode: matchrule.ModePrefix, + Values: []string{ + `{"level":"debug"`, + `{"level":"info"`, + }, + }, + { + Mode: matchrule.ModeContains, + Values: []string{"test_event"}, + }, + }, + }, + }, + { + CheckSourceName: true, + RuleSet: matchrule.RuleSet{ + Name: sourceRulesetName, + Cond: matchrule.CondAnd, + Rules: []matchrule.Rule{ + { + Mode: matchrule.ModeContains, + Values: []string{"my_source1", "my_source2"}, + }, + }, + }, + }, + } + antispamer.exceptions.Prepare() + + checkSpam := func(source, event string, wantMetric map[string]float64) { + antispamer.IsSpam(1, source, true, []byte(event), now) + for k, v := range wantMetric { + r.Equal(v, testutil.ToFloat64(antispamer.exceptionMetric.WithLabelValues(k))) + } + } + + checkSpam("test", `{"level":"info","message":test"}`, map[string]float64{ + eventRulesetName: 1, + sourceRulesetName: 0, + }) + + checkSpam("test", `{"level":"error","message":test_event123"}`, map[string]float64{ + eventRulesetName: 2, + sourceRulesetName: 0, + }) + + checkSpam("my_source2", `{"level":"error","message":test"}`, map[string]float64{ + eventRulesetName: 2, + sourceRulesetName: 1, + }) + + checkSpam("my_source1", `{"level":"debug","message":test"}`, map[string]float64{ + eventRulesetName: 3, + sourceRulesetName: 1, + }) + + checkSpam("test", `{"level":"error","message":test"}`, map[string]float64{ + eventRulesetName: 3, + sourceRulesetName: 1, + }) +} diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go index 62a675308..bf377c4b4 100644 --- a/pipeline/pipeline.go +++ b/pipeline/pipeline.go @@ -11,7 +11,6 @@ import ( "sync" "time" - "github.com/ozontech/file.d/cfg/matchrule" "github.com/ozontech/file.d/decoder" "github.com/ozontech/file.d/logger" "github.com/ozontech/file.d/metric" @@ -135,7 +134,7 @@ type Settings struct { EventTimeout time.Duration AntispamThreshold int AntispamField string - AntispamExceptions matchrule.RuleSets + AntispamExceptions antispam.Exceptions AvgEventSize int MaxEventSize int StreamField string