Skip to content

Commit

Permalink
Don't set default throttling limits in Redis (#738)
Browse files Browse the repository at this point in the history
* Don't set defaults

---------

Co-authored-by: george pogosyan <[email protected]>
  • Loading branch information
goshansmails and george pogosyan authored Feb 4, 2025
1 parent 20ce0b1 commit fe295f9
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 131 deletions.
6 changes: 0 additions & 6 deletions plugin/action/throttle/distribution.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package throttle

import (
"encoding/json"
"errors"
"fmt"
"math"
Expand All @@ -27,11 +26,6 @@ type limitDistributionCfg struct {
Enabled bool `json:"enabled"`
}

func (c *limitDistributionCfg) marshalJson() []byte {
v, _ := json.Marshal(c)
return v
}

func (c *limitDistributionCfg) isEmpty() bool {
return c.Field == "" || len(c.Ratios) == 0
}
Expand Down
1 change: 0 additions & 1 deletion plugin/action/throttle/limiters_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,6 @@ func (l *limitersMap) newLimiter(throttleKey, keyLimitOverride string, rule *rul
l.limiterCfg,
throttleKey, keyLimitOverride,
&rule.limit,
rule.distributionCfg,
l.limitDistrMetrics,
l.nowFn,
)
Expand Down
45 changes: 13 additions & 32 deletions plugin/action/throttle/redis_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ package throttle
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"strconv"
"strings"
"time"

"github.com/go-redis/redis"
"github.com/ozontech/file.d/logger"
"github.com/ozontech/file.d/pipeline"
)
Expand Down Expand Up @@ -45,17 +47,13 @@ type redisLimiter struct {
valField string
// json field with distribution value
distributionField string

// limit default value to set if limit key does not exist in redis
defaultVal string
}

// newRedisLimiter return instance of redis limiter.
func newRedisLimiter(
cfg *limiterConfig,
throttleFieldValue, keyLimitOverride string,
limit *complexLimit,
distributionCfg []byte,
limitDistrMetrics *limitDistributionMetrics,
nowFn func() time.Time,
) *redisLimiter {
Expand Down Expand Up @@ -84,21 +82,6 @@ func newRedisLimiter(
} else {
rl.keyLimit = keyLimitOverride
}
if rl.valField == "" {
rl.defaultVal = strconv.FormatInt(limit.value, 10)
} else {
// no err check since valField is string
valKey, _ := json.Marshal(rl.valField)
if limit.distributions.size() > 0 && rl.distributionField != "" {
distrKey, _ := json.Marshal(rl.distributionField)
rl.defaultVal = fmt.Sprintf("{%s:%v,%s:%s}",
valKey, limit.value,
distrKey, distributionCfg,
)
} else {
rl.defaultVal = fmt.Sprintf("{%s:%v}", valKey, limit.value)
}
}

return rl
}
Expand Down Expand Up @@ -253,28 +236,26 @@ func decodeKeyLimitValue(data []byte, valField, distrField string) (int64, limit

// updateKeyLimit reads key limit from redis and updates current limit.
func (l *redisLimiter) updateKeyLimit() error {
var b bool
var err error
var limitVal int64
var distrVal limitDistributionCfg
// try to set global limit to default
if b, err = l.redis.SetNX(l.keyLimit, l.defaultVal, 0).Result(); err != nil {
return fmt.Errorf("failed to set redis value by key %q: %w", l.keyLimit, err)
} else if b {
var data []byte

data, err = l.redis.Get(l.keyLimit).Bytes()
if errors.Is(err, redis.Nil) {
return nil
}
// global limit already exists - overwrite local limit
v := l.redis.Get(l.keyLimit)

if err != nil {
return err
}

if l.valField != "" {
var jsonData []byte
if jsonData, err = v.Bytes(); err != nil {
return fmt.Errorf("failed to convert redis value to bytes: %w", err)
}
if limitVal, distrVal, err = decodeKeyLimitValue(jsonData, l.valField, l.distributionField); err != nil {
if limitVal, distrVal, err = decodeKeyLimitValue(data, l.valField, l.distributionField); err != nil {
return fmt.Errorf("failed to decode redis json value: %w", err)
}
} else {
if limitVal, err = v.Int64(); err != nil {
if limitVal, err = strconv.ParseInt(string(data), 10, 64); err != nil {
return fmt.Errorf("failed to convert redis value to int64: %w", err)
}
}
Expand Down
79 changes: 0 additions & 79 deletions plugin/action/throttle/redis_limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package throttle

import (
"context"
"fmt"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -39,16 +38,12 @@ func Test_updateKeyLimit(t *testing.T) {
},
Enabled: true,
}
defaultDistributionJson := defaultDistribution.marshalJson()
ld, _ := parseLimitDistribution(defaultDistribution, 10)
defaultLimitWithDistribution := &complexLimit{
value: 10,
kind: limitKindCount,
distributions: ld,
}
pod1LimitKey := strings.Join([]string{
pipelineName, throttleFieldName, throttleFieldValue1, keySuffix,
}, "_")
pod2LimitKey := strings.Join([]string{
pipelineName, throttleFieldName, throttleFieldValue2, keySuffix,
}, "_")
Expand Down Expand Up @@ -120,79 +115,6 @@ func Test_updateKeyLimit(t *testing.T) {
wantRedis *redisData
wantErr bool
}{
{
name: "set_default_limit",
args: args{
client: client,
defaultLimit: defaultLimit,
throttleFieldValue: throttleFieldValue1,
keyLimitOverride: "",
valField: "",
},
wantRedis: &redisData{
key: pod1LimitKey,
value: "1",
},
},
{
name: "set_default_limit_custom_field",
args: args{
client: client,
defaultLimit: defaultLimit,
throttleFieldValue: throttleFieldValue1,
keyLimitOverride: "default_limit",
valField: "custom_limit_field",
},
wantRedis: &redisData{
key: "default_limit",
value: `{"custom_limit_field":1}`,
},
},
{
name: "set_default_distribution",
args: args{
client: client,
defaultLimit: defaultLimitWithDistribution,
throttleFieldValue: throttleFieldValue1,
keyLimitOverride: "default_distr1",
valField: "custom_limit_field",
distrField: "custom_distr_field",
},
wantRedis: &redisData{
key: "default_distr1",
value: fmt.Sprintf(`{"custom_limit_field":10,"custom_distr_field":%s}`, defaultDistributionJson),
},
},
{
name: "set_default_without_distributions",
args: args{
client: client,
defaultLimit: defaultLimit,
throttleFieldValue: throttleFieldValue1,
keyLimitOverride: "default_distr2",
valField: "custom_limit_field",
distrField: "custom_distr_field",
},
wantRedis: &redisData{
key: "default_distr2",
value: `{"custom_limit_field":1}`,
},
},
{
name: "set_default_without_distr_field",
args: args{
client: client,
defaultLimit: defaultLimitWithDistribution,
throttleFieldValue: throttleFieldValue1,
keyLimitOverride: "default_distr3",
valField: "custom_limit_field",
distrField: "",
},
wantRedis: &redisData{
key: "default_distr3",
value: `{"custom_limit_field":10}`,
},
},
{
name: "get_limit_from_default_key",
args: args{
Expand Down Expand Up @@ -410,7 +332,6 @@ func Test_updateKeyLimit(t *testing.T) {
tt.args.throttleFieldValue,
tt.args.keyLimitOverride,
tt.args.defaultLimit,
defaultDistributionJson,
&limitDistributionMetrics{
EventsCount: metric.NewHeldCounterVec(ctl.RegisterCounterVec("test_count", "")),
EventsSize: metric.NewHeldCounterVec(ctl.RegisterCounterVec("test_size", "")),
Expand Down
20 changes: 9 additions & 11 deletions plugin/action/throttle/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,14 @@ type complexLimit struct {
}

type rule struct {
fields []string // sorted list of used keys is used for combining limiter key.
values []string // values to check against. order is the same as for keys.
limit complexLimit
distributionCfg []byte // json-encoded limit distribution cfg
byteIdxPart []byte
fields []string // sorted list of used keys is used for combining limiter key.
values []string // values to check against. order is the same as for keys.
limit complexLimit
byteIdxPart []byte
}

// newRule returns new rule instance.
func newRule(conditions map[string]string, limit complexLimit, distributionCfg []byte, ruleNum int) *rule { // nolint: gocritic // hugeParam is ok here
func newRule(conditions map[string]string, limit complexLimit, ruleNum int) *rule { // nolint: gocritic // hugeParam is ok here
var (
keys = make([]string, 0, len(conditions))
values = make([]string, len(conditions))
Expand All @@ -39,11 +38,10 @@ func newRule(conditions map[string]string, limit complexLimit, distributionCfg [

byteIdxPart := []byte{byte('a' + ruleNum), ':'}
return &rule{
fields: keys,
values: values,
limit: limit,
byteIdxPart: byteIdxPart,
distributionCfg: distributionCfg,
fields: keys,
values: values,
limit: limit,
byteIdxPart: byteIdxPart,
}
}

Expand Down
2 changes: 0 additions & 2 deletions plugin/action/throttle/throttle.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,15 +373,13 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.ActionPluginP
p.rules = append(p.rules, newRule(
r.Conditions,
complexLimit{r.Limit, r.LimitKind, ldRule},
ruleDistrCfg.marshalJson(),
i,
))
}

p.rules = append(p.rules, newRule(
map[string]string{},
complexLimit{p.config.DefaultLimit, p.config.LimitKind, ld},
distrCfg.marshalJson(),
len(p.config.Rules),
))
}
Expand Down

0 comments on commit fe295f9

Please sign in to comment.