Skip to content

Commit 23cf005

Browse files
committed
Use backoff
Signed-off-by: Justin Jung <[email protected]>
1 parent e057461 commit 23cf005

File tree

3 files changed

+17
-7
lines changed

3 files changed

+17
-7
lines changed

pkg/querier/blocks_store_queryable.go

-2
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,6 @@ const (
6161
// store-gateways. If no more store-gateways are left (ie. due to lower replication
6262
// factor) then we'll end the retries earlier.
6363
maxFetchSeriesAttempts = 3
64-
65-
ingesterQueryMaxAttempts = 3
6664
)
6765

6866
var (

pkg/querier/distributor_queryable.go

+16-4
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,14 @@ import (
2020
"github.com/cortexproject/cortex/pkg/querier/series"
2121
"github.com/cortexproject/cortex/pkg/tenant"
2222
"github.com/cortexproject/cortex/pkg/util"
23+
"github.com/cortexproject/cortex/pkg/util/backoff"
2324
"github.com/cortexproject/cortex/pkg/util/chunkcompat"
2425
"github.com/cortexproject/cortex/pkg/util/spanlogger"
2526
)
2627

28+
// retryMinBackoff is the MinBackoff value passed to backoff.New by
// queryWithRetry when retrying ingester queries.
const retryMinBackoff = 10 * time.Second
29+
// retryMaxBackoff is the MaxBackoff value passed to backoff.New by
// queryWithRetry when retrying ingester queries.
const retryMaxBackoff = time.Minute
30+
2731
// Distributor is the read interface to the distributor, made an interface here
2832
// to reduce package coupling.
2933
type Distributor interface {
@@ -154,9 +158,9 @@ func (q *distributorQuerier) Select(ctx context.Context, sortSeries bool, sp *st
154158
}
155159

156160
func (q *distributorQuerier) streamingSelect(ctx context.Context, sortSeries, partialDataEnabled bool, minT, maxT int64, matchers []*labels.Matcher) storage.SeriesSet {
157-
results, err := q.queryWithRetry(func() (*client.QueryStreamResponse, error) {
161+
results, err := q.queryWithRetry(ctx, func() (*client.QueryStreamResponse, error) {
158162
return q.distributor.QueryStream(ctx, model.Time(minT), model.Time(maxT), partialDataEnabled, matchers...)
159-
}, q.ingesterQueryMaxAttempts)
163+
})
160164

161165
if err != nil && !partialdata.IsPartialDataError(err) {
162166
return storage.ErrSeriesSet(err)
@@ -198,16 +202,24 @@ func (q *distributorQuerier) streamingSelect(ctx context.Context, sortSeries, pa
198202
return seriesSet
199203
}
200204

201-
func (q *distributorQuerier) queryWithRetry(queryFunc func() (*client.QueryStreamResponse, error), retryAttempt int) (*client.QueryStreamResponse, error) {
205+
func (q *distributorQuerier) queryWithRetry(ctx context.Context, queryFunc func() (*client.QueryStreamResponse, error)) (*client.QueryStreamResponse, error) {
202206
var result *client.QueryStreamResponse
203207
var err error
204208

205-
for i := 0; i < retryAttempt; i++ {
209+
retries := backoff.New(ctx, backoff.Config{
210+
MinBackoff: retryMinBackoff,
211+
MaxBackoff: retryMaxBackoff,
212+
MaxRetries: q.ingesterQueryMaxAttempts,
213+
})
214+
215+
for retries.Ongoing() {
206216
result, err = queryFunc()
207217

208218
if err == nil || !q.isRetryableError(err) {
209219
return result, err
210220
}
221+
222+
retries.Wait()
211223
}
212224

213225
return result, err

pkg/querier/querier.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
128128
f.StringVar(&cfg.StoreGatewayAddresses, "querier.store-gateway-addresses", "", "Comma separated list of store-gateway addresses in DNS Service Discovery format. This option should be set when using the blocks storage and the store-gateway sharding is disabled (when enabled, the store-gateway instances form a ring and addresses are picked from the ring).")
129129
f.BoolVar(&cfg.StoreGatewayQueryStatsEnabled, "querier.store-gateway-query-stats-enabled", true, "If enabled, store gateway query stats will be logged using `info` log level.")
130130
f.IntVar(&cfg.StoreGatewayConsistencyCheckMaxAttempts, "querier.store-gateway-consistency-check-max-attempts", maxFetchSeriesAttempts, "The maximum number of times we attempt fetching missing blocks from different store-gateways. If no more store-gateways are left (ie. due to lower replication factor) than we'll end the retries earlier")
131-
f.IntVar(&cfg.IngesterQueryMaxAttempts, "querier.ingester-query-max-attempts", ingesterQueryMaxAttempts, "The maximum number of times we attempt fetching data from ingesters for retryable errors.")
131+
f.IntVar(&cfg.IngesterQueryMaxAttempts, "querier.ingester-query-max-attempts", 3, "The maximum number of times we attempt fetching data from ingesters for retryable errors.")
132132
f.DurationVar(&cfg.LookbackDelta, "querier.lookback-delta", 5*time.Minute, "Time since the last sample after which a time series is considered stale and ignored by expression evaluations.")
133133
f.DurationVar(&cfg.ShuffleShardingIngestersLookbackPeriod, "querier.shuffle-sharding-ingesters-lookback-period", 0, "When distributor's sharding strategy is shuffle-sharding and this setting is > 0, queriers fetch in-memory series from the minimum set of required ingesters, selecting only ingesters which may have received series since 'now - lookback period'. The lookback period should be greater or equal than the configured 'query store after' and 'query ingesters within'. If this setting is 0, queriers always query all ingesters (ingesters shuffle sharding on read path is disabled).")
134134
f.BoolVar(&cfg.ThanosEngine, "querier.thanos-engine", false, "Experimental. Use Thanos promql engine https://github.com/thanos-io/promql-engine rather than the Prometheus promql engine.")

0 commit comments

Comments
 (0)