From fe295f9b313839b1e29215e45799e14ab7d8f3c0 Mon Sep 17 00:00:00 2001 From: goshansmails <153861063+goshansmails@users.noreply.github.com> Date: Tue, 4 Feb 2025 19:24:15 +0300 Subject: [PATCH] Don't set default throttling limits in Redis (#738) * Don't set defaults --------- Co-authored-by: george pogosyan --- plugin/action/throttle/distribution.go | 6 -- plugin/action/throttle/limiters_map.go | 1 - plugin/action/throttle/redis_limiter.go | 45 ++++------- plugin/action/throttle/redis_limiter_test.go | 79 -------------------- plugin/action/throttle/rule.go | 20 +++-- plugin/action/throttle/throttle.go | 2 - 6 files changed, 22 insertions(+), 131 deletions(-) diff --git a/plugin/action/throttle/distribution.go b/plugin/action/throttle/distribution.go index 397034b92..2767c1ed0 100644 --- a/plugin/action/throttle/distribution.go +++ b/plugin/action/throttle/distribution.go @@ -1,7 +1,6 @@ package throttle import ( - "encoding/json" "errors" "fmt" "math" @@ -27,11 +26,6 @@ type limitDistributionCfg struct { Enabled bool `json:"enabled"` } -func (c *limitDistributionCfg) marshalJson() []byte { - v, _ := json.Marshal(c) - return v -} - func (c *limitDistributionCfg) isEmpty() bool { return c.Field == "" || len(c.Ratios) == 0 } diff --git a/plugin/action/throttle/limiters_map.go b/plugin/action/throttle/limiters_map.go index 85da3b927..951149faf 100644 --- a/plugin/action/throttle/limiters_map.go +++ b/plugin/action/throttle/limiters_map.go @@ -232,7 +232,6 @@ func (l *limitersMap) newLimiter(throttleKey, keyLimitOverride string, rule *rul l.limiterCfg, throttleKey, keyLimitOverride, &rule.limit, - rule.distributionCfg, l.limitDistrMetrics, l.nowFn, ) diff --git a/plugin/action/throttle/redis_limiter.go b/plugin/action/throttle/redis_limiter.go index 853d8dfd1..2406cd994 100644 --- a/plugin/action/throttle/redis_limiter.go +++ b/plugin/action/throttle/redis_limiter.go @@ -3,11 +3,13 @@ package throttle import ( "bytes" "encoding/json" + "errors" "fmt" "strconv" "strings" "time" + "github.com/go-redis/redis" "github.com/ozontech/file.d/logger" "github.com/ozontech/file.d/pipeline" ) @@ -45,9 +47,6 @@ type redisLimiter struct { valField string // json field with distribution value distributionField string - - // limit default value to set if limit key does not exist in redis - defaultVal string } // newRedisLimiter return instance of redis limiter. @@ -55,7 +54,6 @@ func newRedisLimiter( cfg *limiterConfig, throttleFieldValue, keyLimitOverride string, limit *complexLimit, - distributionCfg []byte, limitDistrMetrics *limitDistributionMetrics, nowFn func() time.Time, ) *redisLimiter { @@ -84,21 +82,6 @@ func newRedisLimiter( } else { rl.keyLimit = keyLimitOverride } - if rl.valField == "" { - rl.defaultVal = strconv.FormatInt(limit.value, 10) - } else { - // no err check since valField is string - valKey, _ := json.Marshal(rl.valField) - if limit.distributions.size() > 0 && rl.distributionField != "" { - distrKey, _ := json.Marshal(rl.distributionField) - rl.defaultVal = fmt.Sprintf("{%s:%v,%s:%s}", - valKey, limit.value, - distrKey, distributionCfg, - ) - } else { - rl.defaultVal = fmt.Sprintf("{%s:%v}", valKey, limit.value) - } - } return rl } @@ -253,28 +236,26 @@ func decodeKeyLimitValue(data []byte, valField, distrField string) (int64, limit // updateKeyLimit reads key limit from redis and updates current limit. func (l *redisLimiter) updateKeyLimit() error { - var b bool var err error var limitVal int64 var distrVal limitDistributionCfg - // try to set global limit to default - if b, err = l.redis.SetNX(l.keyLimit, l.defaultVal, 0).Result(); err != nil { - return fmt.Errorf("failed to set redis value by key %q: %w", l.keyLimit, err) - } else if b { + var data []byte + + data, err = l.redis.Get(l.keyLimit).Bytes() + if errors.Is(err, redis.Nil) { return nil } - // global limit already exists - overwrite local limit - v := l.redis.Get(l.keyLimit) + + if err != nil { + return err + } + if l.valField != "" { - var jsonData []byte - if jsonData, err = v.Bytes(); err != nil { - return fmt.Errorf("failed to convert redis value to bytes: %w", err) - } - if limitVal, distrVal, err = decodeKeyLimitValue(jsonData, l.valField, l.distributionField); err != nil { + if limitVal, distrVal, err = decodeKeyLimitValue(data, l.valField, l.distributionField); err != nil { return fmt.Errorf("failed to decode redis json value: %w", err) } } else { - if limitVal, err = v.Int64(); err != nil { + if limitVal, err = strconv.ParseInt(string(data), 10, 64); err != nil { return fmt.Errorf("failed to convert redis value to int64: %w", err) } } diff --git a/plugin/action/throttle/redis_limiter_test.go b/plugin/action/throttle/redis_limiter_test.go index 5dc67387a..642626b4f 100644 --- a/plugin/action/throttle/redis_limiter_test.go +++ b/plugin/action/throttle/redis_limiter_test.go @@ -2,7 +2,6 @@ package throttle import ( "context" - "fmt" "strings" "testing" "time" @@ -39,16 +38,12 @@ func Test_updateKeyLimit(t *testing.T) { }, Enabled: true, } - defaultDistributionJson := defaultDistribution.marshalJson() ld, _ := parseLimitDistribution(defaultDistribution, 10) defaultLimitWithDistribution := &complexLimit{ value: 10, kind: limitKindCount, distributions: ld, } - pod1LimitKey := strings.Join([]string{ - pipelineName, throttleFieldName, throttleFieldValue1, keySuffix, - }, "_") pod2LimitKey := strings.Join([]string{ pipelineName, throttleFieldName, throttleFieldValue2, keySuffix, }, "_") @@ -120,79 +115,6 @@ func Test_updateKeyLimit(t *testing.T) { wantRedis *redisData wantErr bool }{ - { - name: "set_default_limit", - args: args{ - client: client, - defaultLimit: defaultLimit, - throttleFieldValue: throttleFieldValue1, - keyLimitOverride: "", - valField: "", - }, - wantRedis: &redisData{ - key: pod1LimitKey, - value: "1", - }, - }, - { - name: "set_default_limit_custom_field", - args: args{ - client: client, - defaultLimit: defaultLimit, - throttleFieldValue: throttleFieldValue1, - keyLimitOverride: "default_limit", - valField: "custom_limit_field", - }, - wantRedis: &redisData{ - key: "default_limit", - value: `{"custom_limit_field":1}`, - }, - }, - { - name: "set_default_distribution", - args: args{ - client: client, - defaultLimit: defaultLimitWithDistribution, - throttleFieldValue: throttleFieldValue1, - keyLimitOverride: "default_distr1", - valField: "custom_limit_field", - distrField: "custom_distr_field", - }, - wantRedis: &redisData{ - key: "default_distr1", - value: fmt.Sprintf(`{"custom_limit_field":10,"custom_distr_field":%s}`, defaultDistributionJson), - }, - }, - { - name: "set_default_without_distributions", - args: args{ - client: client, - defaultLimit: defaultLimit, - throttleFieldValue: throttleFieldValue1, - keyLimitOverride: "default_distr2", - valField: "custom_limit_field", - distrField: "custom_distr_field", - }, - wantRedis: &redisData{ - key: "default_distr2", - value: `{"custom_limit_field":1}`, - }, - }, - { - name: "set_default_without_distr_field", - args: args{ - client: client, - defaultLimit: defaultLimitWithDistribution, - throttleFieldValue: throttleFieldValue1, - keyLimitOverride: "default_distr3", - valField: "custom_limit_field", - distrField: "", - }, - wantRedis: &redisData{ - key: "default_distr3", - value: `{"custom_limit_field":10}`, - }, - }, { name: "get_limit_from_default_key", args: args{ @@ -410,7 +332,6 @@ func Test_updateKeyLimit(t *testing.T) { tt.args.throttleFieldValue, tt.args.keyLimitOverride, tt.args.defaultLimit, - defaultDistributionJson, &limitDistributionMetrics{ EventsCount: metric.NewHeldCounterVec(ctl.RegisterCounterVec("test_count", "")), EventsSize: metric.NewHeldCounterVec(ctl.RegisterCounterVec("test_size", "")), diff --git a/plugin/action/throttle/rule.go b/plugin/action/throttle/rule.go index ce502c1eb..de9dfb7a6 100644 --- a/plugin/action/throttle/rule.go +++ b/plugin/action/throttle/rule.go @@ -13,15 +13,14 @@ type complexLimit struct { } type rule struct { - fields []string // sorted list of used keys is used for combining limiter key. - values []string // values to check against. order is the same as for keys. - limit complexLimit - distributionCfg []byte // json-encoded limit distribution cfg - byteIdxPart []byte + fields []string // sorted list of used keys is used for combining limiter key. + values []string // values to check against. order is the same as for keys. + limit complexLimit + byteIdxPart []byte } // newRule returns new rule instance. -func newRule(conditions map[string]string, limit complexLimit, distributionCfg []byte, ruleNum int) *rule { // nolint: gocritic // hugeParam is ok here +func newRule(conditions map[string]string, limit complexLimit, ruleNum int) *rule { // nolint: gocritic // hugeParam is ok here var ( keys = make([]string, 0, len(conditions)) values = make([]string, len(conditions)) @@ -39,11 +38,10 @@ func newRule(conditions map[string]string, limit complexLimit, distributionCfg [ byteIdxPart := []byte{byte('a' + ruleNum), ':'} return &rule{ - fields: keys, - values: values, - limit: limit, - byteIdxPart: byteIdxPart, - distributionCfg: distributionCfg, + fields: keys, + values: values, + limit: limit, + byteIdxPart: byteIdxPart, } } diff --git a/plugin/action/throttle/throttle.go b/plugin/action/throttle/throttle.go index ffac562ab..8191e550e 100644 --- a/plugin/action/throttle/throttle.go +++ b/plugin/action/throttle/throttle.go @@ -373,7 +373,6 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.ActionPluginP p.rules = append(p.rules, newRule( r.Conditions, complexLimit{r.Limit, r.LimitKind, ldRule}, - ruleDistrCfg.marshalJson(), i, )) } @@ -381,7 +380,6 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.ActionPluginP p.rules = append(p.rules, newRule( map[string]string{}, complexLimit{p.config.DefaultLimit, p.config.LimitKind, ld}, - distrCfg.marshalJson(), len(p.config.Rules), )) }