
Commit 19dfa17

Distributor ingestion limits (#3879)
* Add support for tenant ingest limits in distributor
* Make sure request size is available before checking limits
* Improve naming, error message
* Rename field
* Improve comment
* Add more docs, run make generate
* Remove ingestion_limit config option from docs
1 parent a7714c1 · commit 19dfa17

File tree

7 files changed (+361 -15 lines)

pkg/distributor/distributor.go (+42 -8)
@@ -37,6 +37,7 @@ import (
 	connectapi "github.com/grafana/pyroscope/pkg/api/connect"
 	"github.com/grafana/pyroscope/pkg/clientpool"
 	"github.com/grafana/pyroscope/pkg/distributor/aggregator"
+	"github.com/grafana/pyroscope/pkg/distributor/ingest_limits"
 	distributormodel "github.com/grafana/pyroscope/pkg/distributor/model"
 	writepath "github.com/grafana/pyroscope/pkg/distributor/write_path"
 	phlaremodel "github.com/grafana/pyroscope/pkg/model"
@@ -99,6 +100,7 @@ type Distributor struct {
 	ingestionRateLimiter *limiter.RateLimiter
 	aggregator           *aggregator.MultiTenantAggregator[*pprof.ProfileMerge]
 	asyncRequests        sync.WaitGroup
+	ingestionLimitsSampler *ingest_limits.Sampler

 	subservices        *services.Manager
 	subservicesWatcher *services.FailureWatcher
@@ -117,6 +119,7 @@ type Distributor struct {
 type Limits interface {
 	IngestionRateBytes(tenantID string) float64
 	IngestionBurstSizeBytes(tenantID string) int
+	IngestionLimit(tenantID string) *ingest_limits.Config
 	IngestionTenantShardSize(tenantID string) int
 	MaxLabelNameLength(tenantID string) int
 	MaxLabelValueLength(tenantID string) int
@@ -187,7 +190,9 @@ func New(
 		return nil, err
 	}

-	subservices = append(subservices, distributorsLifecycler, distributorsRing, d.aggregator)
+	d.ingestionLimitsSampler = ingest_limits.NewSampler(distributorsRing)
+
+	subservices = append(subservices, distributorsLifecycler, distributorsRing, d.aggregator, d.ingestionLimitsSampler)

 	d.ingestionRateLimiter = limiter.NewRateLimiter(newGlobalRateStrategy(newIngestionRateStrategy(limits), d), 10*time.Second)
 	d.distributorsLifecycler = distributorsLifecycler
@@ -302,6 +307,12 @@ func (d *Distributor) PushParsed(ctx context.Context, req *distributormodel.Push
 		d.metrics.receivedCompressedBytes.WithLabelValues(string(profName), tenantID).Observe(float64(req.RawProfileSize))
 	}

+	d.calculateRequestSize(req)
+
+	if err := d.checkIngestLimit(tenantID, req); err != nil {
+		return nil, err
+	}
+
 	if err := d.rateLimit(tenantID, req); err != nil {
 		return nil, err
 	}
@@ -310,7 +321,7 @@ func (d *Distributor) PushParsed(ctx context.Context, req *distributormodel.Push

 	for _, series := range req.Series {
 		profName := phlaremodel.Labels(series.Labels).Get(ProfileName)
-		groups := usageGroups.GetUsageGroups(tenantID, phlaremodel.Labels(series.Labels))
+		groups := usageGroups.GetUsageGroups(tenantID, series.Labels)
 		profLanguage := d.GetProfileLanguage(series)

 		for _, raw := range series.Samples {
@@ -709,6 +720,17 @@ func (d *Distributor) limitMaxSessionsPerSeries(maxSessionsPerSeries int, labels
 }

 func (d *Distributor) rateLimit(tenantID string, req *distributormodel.PushRequest) error {
+	if !d.ingestionRateLimiter.AllowN(time.Now(), tenantID, int(req.TotalBytesUncompressed)) {
+		validation.DiscardedProfiles.WithLabelValues(string(validation.RateLimited), tenantID).Add(float64(req.TotalProfiles))
+		validation.DiscardedBytes.WithLabelValues(string(validation.RateLimited), tenantID).Add(float64(req.TotalBytesUncompressed))
+		return connect.NewError(connect.CodeResourceExhausted,
+			fmt.Errorf("push rate limit (%s) exceeded while adding %s", humanize.IBytes(uint64(d.limits.IngestionRateBytes(tenantID))), humanize.IBytes(uint64(req.TotalBytesUncompressed))),
+		)
+	}
+	return nil
+}
+
+func (d *Distributor) calculateRequestSize(req *distributormodel.PushRequest) {
 	for _, series := range req.Series {
 		// include the labels in the size calculation
 		for _, lbs := range series.Labels {
@@ -720,14 +742,26 @@ func (d *Distributor) rateLimit(tenantID string, req *distributormodel.PushReque
 			req.TotalBytesUncompressed += int64(raw.Profile.SizeVT())
 		}
 	}
-	// rate limit the request
-	if !d.ingestionRateLimiter.AllowN(time.Now(), tenantID, int(req.TotalBytesUncompressed)) {
-		validation.DiscardedProfiles.WithLabelValues(string(validation.RateLimited), tenantID).Add(float64(req.TotalProfiles))
-		validation.DiscardedBytes.WithLabelValues(string(validation.RateLimited), tenantID).Add(float64(req.TotalBytesUncompressed))
+}
+
+func (d *Distributor) checkIngestLimit(tenantID string, req *distributormodel.PushRequest) error {
+	l := d.limits.IngestionLimit(tenantID)
+	if l == nil {
+		return nil
+	}
+
+	if l.LimitReached {
+		// we want to allow a very small portion of the traffic after reaching the limit
+		if d.ingestionLimitsSampler.AllowRequest(tenantID, l.Sampling) {
+			return nil
+		}
+		limitResetTime := time.Unix(l.LimitResetTime, 0).UTC().Format(time.RFC3339)
+		validation.DiscardedProfiles.WithLabelValues(string(validation.IngestLimitReached), tenantID).Add(float64(req.TotalProfiles))
+		validation.DiscardedBytes.WithLabelValues(string(validation.IngestLimitReached), tenantID).Add(float64(req.TotalBytesUncompressed))
 		return connect.NewError(connect.CodeResourceExhausted,
-			fmt.Errorf("push rate limit (%s) exceeded while adding %s", humanize.IBytes(uint64(d.limits.IngestionRateBytes(tenantID))), humanize.IBytes(uint64(req.TotalBytesUncompressed))),
-		)
+			fmt.Errorf("limit of %s/%s reached, next reset at %s", humanize.IBytes(uint64(l.PeriodLimitMb*1024*1024)), l.PeriodType, limitResetTime))
 	}
+
 	return nil
 }

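For illustration only (not part of this commit): a minimal sketch of a provider that satisfies the new IngestionLimit method on the Limits interface above. The staticLimits type and its contents are hypothetical; in Pyroscope the real values come from the per-tenant overrides.

// Hypothetical sketch: a static stand-in for the tenant overrides behind the
// distributor's Limits interface, showing only the new IngestionLimit method.
package main

import (
	"fmt"
	"time"

	"github.com/grafana/pyroscope/pkg/distributor/ingest_limits"
)

type staticLimits struct {
	byTenant map[string]*ingest_limits.Config
}

// IngestionLimit mirrors the new interface method: a nil result means
// "no ingest limit configured for this tenant".
func (s staticLimits) IngestionLimit(tenantID string) *ingest_limits.Config {
	return s.byTenant[tenantID]
}

func main() {
	limits := staticLimits{byTenant: map[string]*ingest_limits.Config{
		"user-1": {
			PeriodType:    "hour",
			PeriodLimitMb: 128,
			LimitReached:  true,
			Sampling:      ingest_limits.SamplingConfig{NumRequests: 10, Period: time.Minute},
		},
	}}

	// checkIngestLimit above rejects only when LimitReached is set and the
	// sampler does not let the request through.
	if l := limits.IngestionLimit("user-1"); l != nil && l.LimitReached {
		fmt.Printf("tenant over its %d MB/%s limit, request subject to sampling\n", l.PeriodLimitMb, l.PeriodType)
	}
}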
pkg/distributor/distributor_test.go (+21)
@@ -27,6 +27,7 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"

+	"github.com/grafana/pyroscope/pkg/distributor/ingest_limits"
 	testhelper2 "github.com/grafana/pyroscope/pkg/pprof/testhelper"

 	profilev1 "github.com/grafana/pyroscope/api/gen/proto/go/google/v1"
@@ -301,6 +302,26 @@ func Test_Limits(t *testing.T) {
 			expectedCode:             connect.CodeInvalidArgument,
 			expectedValidationReason: validation.LabelNameTooLong,
 		},
+		{
+			description: "ingest_limit_reached",
+			pushReq:     &pushv1.PushRequest{},
+			overrides: validation.MockOverrides(func(defaults *validation.Limits, tenantLimits map[string]*validation.Limits) {
+				l := validation.MockDefaultLimits()
+				l.IngestionLimit = &ingest_limits.Config{
+					PeriodType:     "hour",
+					PeriodLimitMb:  128,
+					LimitResetTime: 1737721086,
+					LimitReached:   true,
+					Sampling: ingest_limits.SamplingConfig{
+						NumRequests: 0,
+						Period:      time.Minute,
+					},
+				}
+				tenantLimits["user-1"] = l
+			}),
+			expectedCode:             connect.CodeResourceExhausted,
+			expectedValidationReason: validation.IngestLimitReached,
+		},
 	}

 	for _, tc := range testCases {
New file (+25)
@@ -0,0 +1,25 @@
+package ingest_limits
+
+import "time"
+
+type Config struct {
+	// PeriodType provides the limit period / interval (e.g., "hour"). Used in error messages only.
+	PeriodType string `yaml:"period_type" json:"period_type"`
+	// PeriodLimitMb provides the limit that is being set in MB. Used in error messages only.
+	PeriodLimitMb int `yaml:"period_limit_mb" json:"period_limit_mb"`
+	// LimitResetTime provides the time (Unix seconds) when the limit will reset. Used in error messages only.
+	LimitResetTime int64 `yaml:"limit_reset_time" json:"limit_reset_time"`
+	// LimitReached instructs distributors to allow or reject profiles.
+	LimitReached bool `yaml:"limit_reached" json:"limit_reached"`
+	// Sampling controls the sampling parameters when the limit is reached.
+	Sampling SamplingConfig `yaml:"sampling" json:"sampling"`
+}
+
+// SamplingConfig describes the params of a simple probabilistic sampling mechanism.
+//
+// Distributors should allow up to NumRequests requests through and then apply a cooldown (Period) after which
+// more requests can be let through.
+type SamplingConfig struct {
+	NumRequests int           `yaml:"num_requests" json:"num_requests"`
+	Period      time.Duration `yaml:"period" json:"period"`
+}
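A hedged sketch (not in the diff) that round-trips the Config above through YAML to show the field names its struct tags produce. The use of gopkg.in/yaml.v3 here is an assumption made for this example, not something taken from the commit.

// Hedged example: marshal an ingest_limits.Config to inspect the YAML keys
// declared by its struct tags.
package main

import (
	"fmt"
	"time"

	"gopkg.in/yaml.v3"

	"github.com/grafana/pyroscope/pkg/distributor/ingest_limits"
)

func main() {
	cfg := ingest_limits.Config{
		PeriodType:     "hour",
		PeriodLimitMb:  128,
		LimitResetTime: 1737721086,
		LimitReached:   true,
		Sampling: ingest_limits.SamplingConfig{
			NumRequests: 10,
			Period:      time.Minute,
		},
	}

	out, err := yaml.Marshal(cfg)
	if err != nil {
		panic(err)
	}
	// Keys follow the struct tags above: period_type, period_limit_mb,
	// limit_reset_time, limit_reached, sampling (num_requests, period).
	fmt.Print(string(out))
}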
New file (+142)
@@ -0,0 +1,142 @@
+package ingest_limits
+
+import (
+	"context"
+	"math/rand"
+	"sync"
+	"time"
+
+	"github.com/grafana/dskit/services"
+)
+
+type tenantTracker struct {
+	mu                sync.Mutex
+	lastRequestTime   time.Time
+	remainingRequests int
+}
+
+// Sampler provides a very simple time-based probabilistic sampling,
+// intended to be used when a tenant limit has been reached.
+//
+// The sampler will allow a number of requests in a time interval.
+// Once the interval is over, the number of allowed requests resets.
+//
+// We introduce a probability function for a request to be allowed defined as 1 / num_replicas,
+// to account for the size of the cluster and because tracking is done in memory.
+type Sampler struct {
+	*services.BasicService
+
+	mu      sync.RWMutex
+	tenants map[string]*tenantTracker
+
+	// needed for adjusting the probability function with the number of replicas
+	instanceCountProvider InstanceCountProvider
+
+	// cleanup of the tenants map to prevent build-up
+	cleanupInterval time.Duration
+	maxAge          time.Duration
+	closeOnce       sync.Once
+	stop            chan struct{}
+	done            chan struct{}
+}
+
+type InstanceCountProvider interface {
+	InstancesCount() int
+}
+
+func NewSampler(instanceCount InstanceCountProvider) *Sampler {
+	s := &Sampler{
+		tenants:               make(map[string]*tenantTracker),
+		instanceCountProvider: instanceCount,
+		cleanupInterval:       1 * time.Hour,
+		maxAge:                24 * time.Hour,
+		stop:                  make(chan struct{}),
+		done:                  make(chan struct{}),
+	}
+	s.BasicService = services.NewBasicService(
+		s.starting,
+		s.running,
+		s.stopping,
+	)
+
+	return s
+}
+
+func (s *Sampler) starting(_ context.Context) error { return nil }
+
+func (s *Sampler) stopping(_ error) error {
+	s.closeOnce.Do(func() {
+		close(s.stop)
+		<-s.done
+	})
+	return nil
+}
+
+func (s *Sampler) running(ctx context.Context) error {
+	t := time.NewTicker(s.cleanupInterval)
+	defer func() {
+		t.Stop()
+		close(s.done)
+	}()
+	for {
+		select {
+		case <-t.C:
+			s.removeStaleTenants()
+		case <-s.stop:
+			return nil
+		case <-ctx.Done():
+			return nil
+		}
+	}
+}
+
+func (s *Sampler) AllowRequest(tenantID string, config SamplingConfig) bool {
+	s.mu.Lock()
+	tracker, exists := s.tenants[tenantID]
+	if !exists {
+		tracker = &tenantTracker{
+			lastRequestTime:   time.Now(),
+			remainingRequests: config.NumRequests,
+		}
+		s.tenants[tenantID] = tracker
+	}
+	s.mu.Unlock()
+
+	return tracker.AllowRequest(s.instanceCountProvider.InstancesCount(), config.Period, config.NumRequests)
+}
+
+func (b *tenantTracker) AllowRequest(replicaCount int, windowDuration time.Duration, maxRequests int) bool {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+
+	now := time.Now()
+
+	// reset tracking data if enough time has passed
+	if now.Sub(b.lastRequestTime) >= windowDuration {
+		b.lastRequestTime = now
+		b.remainingRequests = maxRequests
+	}
+
+	if b.remainingRequests > 0 {
+		// random chance of allowing request, adjusting for the number of replicas
+		shouldAllow := rand.Float64() < float64(maxRequests)/float64(replicaCount)
+
+		if shouldAllow {
+			b.remainingRequests--
+			return true
+		}
+	}
+
+	return false
+}
+
+func (s *Sampler) removeStaleTenants() {
+	s.mu.Lock()
+	cutoff := time.Now().Add(-s.maxAge)
+	for tenantID, tracker := range s.tenants {
+		if tracker.lastRequestTime.Before(cutoff) {
+			delete(s.tenants, tenantID)
+		}
+	}
+	s.mu.Unlock()
+}
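A hedged usage sketch (not part of the commit) of Sampler.AllowRequest with a fixed replica count. The fixedInstances type is a made-up stand-in for the distributors ring, which is what normally provides InstancesCount(); per the implementation above, each attempt passes with probability NumRequests/InstancesCount while the per-window budget lasts.

// Hedged sketch: exercise the sampler outside the distributor.
package main

import (
	"fmt"
	"time"

	"github.com/grafana/pyroscope/pkg/distributor/ingest_limits"
)

// fixedInstances stands in for the ring's instance count.
type fixedInstances int

func (f fixedInstances) InstancesCount() int { return int(f) }

func main() {
	sampler := ingest_limits.NewSampler(fixedInstances(3))
	cfg := ingest_limits.SamplingConfig{NumRequests: 2, Period: time.Minute}

	allowed := 0
	for i := 0; i < 1000; i++ {
		if sampler.AllowRequest("user-1", cfg) {
			allowed++
		}
	}
	// The per-tenant budget (remainingRequests) only resets once Period has
	// elapsed, so no more than cfg.NumRequests requests can be admitted
	// within one window on this replica.
	fmt.Println("allowed within one window:", allowed) // expected: at most 2
}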
