Skip to content

Commit 3ba42bd

Browse files
authored
Log limits file in throttle plugin (#890)
1 parent 34d3fb5 commit 3ba42bd

File tree

11 files changed

+849
-18
lines changed

11 files changed

+849
-18
lines changed

cfg/config.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -564,6 +564,18 @@ func ParseFieldSelector(selector string) []string {
564564
return result
565565
}
566566

567+
func BuildFieldSelector(fields []string) string {
568+
var builder strings.Builder
569+
for i, field := range fields {
570+
escaped := strings.ReplaceAll(field, ".", `\.`)
571+
builder.WriteString(escaped)
572+
if i != len(fields)-1 {
573+
builder.WriteByte('.')
574+
}
575+
}
576+
return builder.String()
577+
}
578+
567579
// Parses several fields selectors and removes nested fields.
568580
// If there are empty field selectors or no selectors at all error returned.
569581
// For example:

cfg/config_test.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -864,3 +864,57 @@ func TestMergeYAMLs(t *testing.T) {
864864
})
865865
}
866866
}
867+
868+
func TestBuildFieldSelector(t *testing.T) {
869+
tests := []struct {
870+
name string
871+
fields []string
872+
expected string
873+
}{
874+
{
875+
name: "Empty slice",
876+
fields: []string{},
877+
expected: "",
878+
},
879+
{
880+
name: "Single field, no dot",
881+
fields: []string{"field"},
882+
expected: "field",
883+
},
884+
{
885+
name: "Multiple fields, no dots",
886+
fields: []string{"a", "b", "c"},
887+
expected: "a.b.c",
888+
},
889+
{
890+
name: "Field with dot",
891+
fields: []string{"a.b", "c"},
892+
expected: "a\\.b.c",
893+
},
894+
{
895+
name: "All fields with dots",
896+
fields: []string{"a.b", "c.d"},
897+
expected: "a\\.b.c\\.d",
898+
},
899+
{
900+
name: "Field with multiple dots",
901+
fields: []string{"a.b.c", "d"},
902+
expected: "a\\.b\\.c.d",
903+
},
904+
{
905+
name: "Field with backslash",
906+
fields: []string{`a\b`, "c"},
907+
expected: `a\b.c`,
908+
},
909+
}
910+
911+
for _, tt := range tests {
912+
tt := tt
913+
t.Run(tt.name, func(t *testing.T) {
914+
t.Parallel()
915+
916+
got := BuildFieldSelector(tt.fields)
917+
require.Equal(t, tt.expected, got)
918+
})
919+
}
920+
}

plugin/action/throttle/README.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,5 +227,20 @@ Distribution object example:
227227

228228
<br>
229229

230+
**`limits_file`** *`string`*
231+
232+
The filename to store current log limits. Limits are loaded only on initialization
233+
> It's a `json` file. You can modify it manually. But the limit from the file will disappear if redis is available and it has a different value for this limit
234+
235+
> ⚠ **Experimental feature**
236+
237+
<br>
238+
239+
**`limits_save_interval`** *`cfg.Duration`* *`default=3s`*
240+
241+
Defines the interval between each saving of log limits in file
242+
243+
<br>
244+
230245

231246
<br>*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)*

plugin/action/throttle/distribution.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,3 +136,30 @@ func (ld *limitDistributions) copy() limitDistributions {
136136
enabled: ld.enabled,
137137
}
138138
}
139+
140+
func (ld *limitDistributions) getLimitDistributionsCfg() limitDistributionCfg {
141+
field := cfg.BuildFieldSelector(ld.field)
142+
143+
ratioMap := make(map[float64][]string)
144+
for value, idx := range ld.idxByKey {
145+
if idx < 0 || idx >= len(ld.distributions) {
146+
continue
147+
}
148+
ratio := ld.distributions[idx].ratio
149+
ratioMap[ratio] = append(ratioMap[ratio], value)
150+
}
151+
152+
ratios := make([]limitDistributionRatio, 0, len(ratioMap))
153+
for ratio, values := range ratioMap {
154+
ratios = append(ratios, limitDistributionRatio{
155+
Ratio: ratio,
156+
Values: values,
157+
})
158+
}
159+
160+
return limitDistributionCfg{
161+
Field: field,
162+
Ratios: ratios,
163+
Enabled: ld.enabled,
164+
}
165+
}

plugin/action/throttle/distribution_test.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package throttle
33
import (
44
"fmt"
55
"slices"
6+
"sort"
67
"testing"
78

89
"github.com/stretchr/testify/require"
@@ -196,3 +197,49 @@ func Test_parseLimitDistribution(t *testing.T) {
196197
})
197198
}
198199
}
200+
201+
func TestGetLimitDistributionsCfg(t *testing.T) {
202+
ld := &limitDistributions{
203+
field: []string{"log", "level"},
204+
idxByKey: map[string]int{
205+
"A": 0,
206+
"B": 1,
207+
"C": 0,
208+
},
209+
distributions: []complexDistribution{
210+
{ratio: 0.1, limit: 10},
211+
{ratio: 0.2, limit: 20},
212+
},
213+
enabled: true,
214+
}
215+
216+
expected := limitDistributionCfg{
217+
Field: "log.level",
218+
Ratios: []limitDistributionRatio{
219+
{Ratio: 0.1, Values: []string{"A", "C"}},
220+
{Ratio: 0.2, Values: []string{"B"}},
221+
},
222+
Enabled: true,
223+
}
224+
225+
result := ld.getLimitDistributionsCfg()
226+
227+
require.Equal(t, expected.Field, result.Field, "wrong Field")
228+
require.Equal(t, expected.Enabled, result.Enabled, "wrong Enabled")
229+
require.True(t, isLimitDistributionCfgRatiosEqual(expected.Ratios, result.Ratios), "wrong Ratios")
230+
}
231+
232+
func isLimitDistributionCfgRatiosEqual(expected, result []limitDistributionRatio) bool {
233+
sort.Slice(result, func(i, j int) bool {
234+
return result[i].Ratio < result[j].Ratio
235+
})
236+
237+
for i, ldRatio := range result {
238+
slices.Sort(ldRatio.Values)
239+
if !slices.Equal(expected[i].Values, ldRatio.Values) {
240+
return false
241+
}
242+
}
243+
244+
return true
245+
}

plugin/action/throttle/in_memory_limiter.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package throttle
22

33
import (
4+
"math"
45
"sync"
56
"sync/atomic"
67
"time"
@@ -9,6 +10,8 @@ import (
910
"github.com/ozontech/file.d/pipeline"
1011
)
1112

13+
const EPS = 1e-9
14+
1215
type inMemoryLimiter struct {
1316
limit complexLimit
1417
buckets buckets
@@ -236,6 +239,38 @@ func (l *inMemoryLimiter) bucketsMinID() int {
236239
return l.buckets.getMinID()
237240
}
238241

242+
func (l *inMemoryLimiter) getLimitCfg() limitCfg {
243+
return limitCfg{}
244+
}
245+
246+
func (l *inMemoryLimiter) getLimit() int64 {
247+
return atomic.LoadInt64(&l.limit.value)
248+
}
249+
250+
func (l *inMemoryLimiter) isLimitCfgChanged(curLimit int64, curDistribution []limitDistributionRatio) bool {
251+
if l.getLimit() != curLimit {
252+
return true
253+
}
254+
255+
curDistributionsCount := 0
256+
257+
l.lock()
258+
for _, ldRatio := range curDistribution {
259+
curDistributionsCount += len(ldRatio.Values)
260+
for _, fieldValue := range ldRatio.Values {
261+
idx, has := l.limit.distributions.idxByKey[fieldValue]
262+
if !has || math.Abs(l.limit.distributions.distributions[idx].ratio-ldRatio.Ratio) > EPS {
263+
l.unlock()
264+
return true
265+
}
266+
}
267+
}
268+
distributionsCount := len(l.limit.distributions.idxByKey)
269+
l.unlock()
270+
271+
return distributionsCount != curDistributionsCount
272+
}
273+
239274
func (l *inMemoryLimiter) setNowFn(fn func() time.Time) {
240275
l.lock()
241276
l.nowFn = fn
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
package throttle
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/require"
7+
)
8+
9+
func TestIsLimitCfgChangedSame(t *testing.T) {
10+
tests := []struct {
11+
name string
12+
limiter *inMemoryLimiter
13+
curLimit int64
14+
curDistribution []limitDistributionRatio
15+
16+
want bool
17+
}{
18+
{
19+
name: "same_limit_and_distribution",
20+
limiter: &inMemoryLimiter{
21+
limit: complexLimit{
22+
value: 100,
23+
distributions: limitDistributions{
24+
idxByKey: map[string]int{
25+
"A": 0,
26+
"B": 1,
27+
},
28+
distributions: []complexDistribution{
29+
{ratio: 0.1, limit: 10},
30+
{ratio: 0.2, limit: 20},
31+
},
32+
enabled: true,
33+
},
34+
},
35+
},
36+
curLimit: 100,
37+
curDistribution: []limitDistributionRatio{
38+
{Ratio: 0.1, Values: []string{"A"}},
39+
{Ratio: 0.2, Values: []string{"B"}},
40+
},
41+
42+
want: false,
43+
},
44+
{
45+
name: "limit_changed",
46+
limiter: &inMemoryLimiter{
47+
limit: complexLimit{
48+
value: 200,
49+
},
50+
},
51+
curLimit: 100,
52+
curDistribution: []limitDistributionRatio{},
53+
54+
want: true,
55+
},
56+
{
57+
name: "distribution_changed",
58+
limiter: &inMemoryLimiter{
59+
limit: complexLimit{
60+
value: 100,
61+
distributions: limitDistributions{
62+
idxByKey: map[string]int{
63+
"A": 0,
64+
"B": 0,
65+
},
66+
distributions: []complexDistribution{
67+
{ratio: 0.1, limit: 10},
68+
},
69+
enabled: true,
70+
},
71+
},
72+
},
73+
curLimit: 100,
74+
curDistribution: []limitDistributionRatio{
75+
{Ratio: 0.1, Values: []string{"A"}},
76+
{Ratio: 0.2, Values: []string{"B"}},
77+
},
78+
79+
want: true,
80+
},
81+
{
82+
name: "increase_distributions_size",
83+
limiter: &inMemoryLimiter{
84+
limit: complexLimit{
85+
value: 100,
86+
distributions: limitDistributions{
87+
idxByKey: map[string]int{
88+
"A": 0,
89+
"B": 1,
90+
"C": 0,
91+
},
92+
distributions: []complexDistribution{
93+
{ratio: 0.1, limit: 10},
94+
{ratio: 0.2, limit: 20},
95+
},
96+
enabled: true,
97+
},
98+
},
99+
},
100+
curLimit: 100,
101+
curDistribution: []limitDistributionRatio{
102+
{Ratio: 0.1, Values: []string{"A"}},
103+
{Ratio: 0.2, Values: []string{"B"}},
104+
},
105+
106+
want: true,
107+
},
108+
{
109+
name: "decrease_distributions_size",
110+
limiter: &inMemoryLimiter{
111+
limit: complexLimit{
112+
value: 100,
113+
distributions: limitDistributions{
114+
idxByKey: map[string]int{
115+
"A": 0,
116+
},
117+
distributions: []complexDistribution{
118+
{ratio: 0.1, limit: 10},
119+
},
120+
enabled: true,
121+
},
122+
},
123+
},
124+
curLimit: 100,
125+
curDistribution: []limitDistributionRatio{
126+
{Ratio: 0.1, Values: []string{"A"}},
127+
{Ratio: 0.2, Values: []string{"B"}},
128+
},
129+
130+
want: true,
131+
},
132+
}
133+
134+
for _, tt := range tests {
135+
tt := tt
136+
t.Run(tt.name, func(t *testing.T) {
137+
t.Parallel()
138+
139+
got := tt.limiter.isLimitCfgChanged(tt.curLimit, tt.curDistribution)
140+
require.Equal(t, tt.want, got)
141+
})
142+
}
143+
}

0 commit comments

Comments
 (0)