-
Notifications
You must be signed in to change notification settings - Fork 816
Add querier.ingester-query-max-attempts to retry on partial data. #6714
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
aab648e
51aaf24
443c6a6
2158ca7
e057461
23cf005
761741f
883d8bc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,10 +20,14 @@ import ( | |
"github.com/cortexproject/cortex/pkg/querier/series" | ||
"github.com/cortexproject/cortex/pkg/tenant" | ||
"github.com/cortexproject/cortex/pkg/util" | ||
"github.com/cortexproject/cortex/pkg/util/backoff" | ||
"github.com/cortexproject/cortex/pkg/util/chunkcompat" | ||
"github.com/cortexproject/cortex/pkg/util/spanlogger" | ||
) | ||
|
||
const retryMinBackoff = time.Second | ||
const retryMaxBackoff = 5 * time.Second | ||
|
||
// Distributor is the read interface to the distributor, made an interface here | ||
// to reduce package coupling. | ||
type Distributor interface { | ||
|
@@ -38,36 +42,39 @@ type Distributor interface { | |
MetricsMetadata(ctx context.Context, req *client.MetricsMetadataRequest) ([]scrape.MetricMetadata, error) | ||
} | ||
|
||
func newDistributorQueryable(distributor Distributor, streamingMetdata bool, labelNamesWithMatchers bool, iteratorFn chunkIteratorFunc, queryIngestersWithin time.Duration, isPartialDataEnabled partialdata.IsCfgEnabledFunc) QueryableWithFilter { | ||
func newDistributorQueryable(distributor Distributor, streamingMetdata bool, labelNamesWithMatchers bool, iteratorFn chunkIteratorFunc, queryIngestersWithin time.Duration, isPartialDataEnabled partialdata.IsCfgEnabledFunc, ingesterQueryMaxAttempts int) QueryableWithFilter { | ||
return distributorQueryable{ | ||
distributor: distributor, | ||
streamingMetdata: streamingMetdata, | ||
labelNamesWithMatchers: labelNamesWithMatchers, | ||
iteratorFn: iteratorFn, | ||
queryIngestersWithin: queryIngestersWithin, | ||
isPartialDataEnabled: isPartialDataEnabled, | ||
distributor: distributor, | ||
streamingMetdata: streamingMetdata, | ||
labelNamesWithMatchers: labelNamesWithMatchers, | ||
iteratorFn: iteratorFn, | ||
queryIngestersWithin: queryIngestersWithin, | ||
isPartialDataEnabled: isPartialDataEnabled, | ||
ingesterQueryMaxAttempts: ingesterQueryMaxAttempts, | ||
} | ||
} | ||
|
||
type distributorQueryable struct { | ||
distributor Distributor | ||
streamingMetdata bool | ||
labelNamesWithMatchers bool | ||
iteratorFn chunkIteratorFunc | ||
queryIngestersWithin time.Duration | ||
isPartialDataEnabled partialdata.IsCfgEnabledFunc | ||
distributor Distributor | ||
streamingMetdata bool | ||
labelNamesWithMatchers bool | ||
iteratorFn chunkIteratorFunc | ||
queryIngestersWithin time.Duration | ||
isPartialDataEnabled partialdata.IsCfgEnabledFunc | ||
ingesterQueryMaxAttempts int | ||
} | ||
|
||
func (d distributorQueryable) Querier(mint, maxt int64) (storage.Querier, error) { | ||
return &distributorQuerier{ | ||
distributor: d.distributor, | ||
mint: mint, | ||
maxt: maxt, | ||
streamingMetadata: d.streamingMetdata, | ||
labelNamesMatchers: d.labelNamesWithMatchers, | ||
chunkIterFn: d.iteratorFn, | ||
queryIngestersWithin: d.queryIngestersWithin, | ||
isPartialDataEnabled: d.isPartialDataEnabled, | ||
distributor: d.distributor, | ||
mint: mint, | ||
maxt: maxt, | ||
streamingMetadata: d.streamingMetdata, | ||
labelNamesMatchers: d.labelNamesWithMatchers, | ||
chunkIterFn: d.iteratorFn, | ||
queryIngestersWithin: d.queryIngestersWithin, | ||
isPartialDataEnabled: d.isPartialDataEnabled, | ||
ingesterQueryMaxAttempts: d.ingesterQueryMaxAttempts, | ||
}, nil | ||
} | ||
|
||
|
@@ -77,13 +84,14 @@ func (d distributorQueryable) UseQueryable(now time.Time, _, queryMaxT int64) bo | |
} | ||
|
||
type distributorQuerier struct { | ||
distributor Distributor | ||
mint, maxt int64 | ||
streamingMetadata bool | ||
labelNamesMatchers bool | ||
chunkIterFn chunkIteratorFunc | ||
queryIngestersWithin time.Duration | ||
isPartialDataEnabled partialdata.IsCfgEnabledFunc | ||
distributor Distributor | ||
mint, maxt int64 | ||
streamingMetadata bool | ||
labelNamesMatchers bool | ||
chunkIterFn chunkIteratorFunc | ||
queryIngestersWithin time.Duration | ||
isPartialDataEnabled partialdata.IsCfgEnabledFunc | ||
ingesterQueryMaxAttempts int | ||
} | ||
|
||
// Select implements storage.Querier interface. | ||
|
@@ -150,7 +158,9 @@ func (q *distributorQuerier) Select(ctx context.Context, sortSeries bool, sp *st | |
} | ||
|
||
func (q *distributorQuerier) streamingSelect(ctx context.Context, sortSeries, partialDataEnabled bool, minT, maxT int64, matchers []*labels.Matcher) storage.SeriesSet { | ||
results, err := q.distributor.QueryStream(ctx, model.Time(minT), model.Time(maxT), partialDataEnabled, matchers...) | ||
results, err := q.queryWithRetry(ctx, func() (*client.QueryStreamResponse, error) { | ||
return q.distributor.QueryStream(ctx, model.Time(minT), model.Time(maxT), partialDataEnabled, matchers...) | ||
}) | ||
|
||
if err != nil && !partialdata.IsPartialDataError(err) { | ||
return storage.ErrSeriesSet(err) | ||
|
@@ -192,6 +202,33 @@ func (q *distributorQuerier) streamingSelect(ctx context.Context, sortSeries, pa | |
return seriesSet | ||
} | ||
|
||
func (q *distributorQuerier) queryWithRetry(ctx context.Context, queryFunc func() (*client.QueryStreamResponse, error)) (*client.QueryStreamResponse, error) { | ||
if q.ingesterQueryMaxAttempts == 1 { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We need to handle case of ingesterQueryMaxAttempts set to 0 as it retries forever IIUC There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I added the validation in
But would you still want the condition to be changed to |
||
return queryFunc() | ||
} | ||
|
||
var result *client.QueryStreamResponse | ||
var err error | ||
|
||
retries := backoff.New(ctx, backoff.Config{ | ||
MinBackoff: retryMinBackoff, | ||
MaxBackoff: retryMaxBackoff, | ||
MaxRetries: q.ingesterQueryMaxAttempts, | ||
}) | ||
|
||
for retries.Ongoing() { | ||
result, err = queryFunc() | ||
|
||
if err == nil || !q.isRetryableError(err) { | ||
return result, err | ||
} | ||
|
||
retries.Wait() | ||
} | ||
|
||
return result, err | ||
} | ||
|
||
func (q *distributorQuerier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { | ||
var ( | ||
lvs []string | ||
|
@@ -201,9 +238,13 @@ func (q *distributorQuerier) LabelValues(ctx context.Context, name string, hints | |
partialDataEnabled := q.partialDataEnabled(ctx) | ||
|
||
if q.streamingMetadata { | ||
lvs, err = q.distributor.LabelValuesForLabelNameStream(ctx, model.Time(q.mint), model.Time(q.maxt), model.LabelName(name), hints, partialDataEnabled, matchers...) | ||
lvs, err = q.labelsWithRetry(ctx, func() ([]string, error) { | ||
return q.distributor.LabelValuesForLabelNameStream(ctx, model.Time(q.mint), model.Time(q.maxt), model.LabelName(name), hints, partialDataEnabled, matchers...) | ||
}) | ||
} else { | ||
lvs, err = q.distributor.LabelValuesForLabelName(ctx, model.Time(q.mint), model.Time(q.maxt), model.LabelName(name), hints, partialDataEnabled, matchers...) | ||
lvs, err = q.labelsWithRetry(ctx, func() ([]string, error) { | ||
return q.distributor.LabelValuesForLabelName(ctx, model.Time(q.mint), model.Time(q.maxt), model.LabelName(name), hints, partialDataEnabled, matchers...) | ||
}) | ||
} | ||
|
||
if partialdata.IsPartialDataError(err) { | ||
|
@@ -230,9 +271,13 @@ func (q *distributorQuerier) LabelNames(ctx context.Context, hints *storage.Labe | |
) | ||
|
||
if q.streamingMetadata { | ||
ln, err = q.distributor.LabelNamesStream(ctx, model.Time(q.mint), model.Time(q.maxt), hints, partialDataEnabled, matchers...) | ||
ln, err = q.labelsWithRetry(ctx, func() ([]string, error) { | ||
return q.distributor.LabelNamesStream(ctx, model.Time(q.mint), model.Time(q.maxt), hints, partialDataEnabled, matchers...) | ||
}) | ||
} else { | ||
ln, err = q.distributor.LabelNames(ctx, model.Time(q.mint), model.Time(q.maxt), hints, partialDataEnabled, matchers...) | ||
ln, err = q.labelsWithRetry(ctx, func() ([]string, error) { | ||
return q.distributor.LabelNames(ctx, model.Time(q.mint), model.Time(q.maxt), hints, partialDataEnabled, matchers...) | ||
}) | ||
} | ||
|
||
if partialdata.IsPartialDataError(err) { | ||
|
@@ -243,6 +288,33 @@ func (q *distributorQuerier) LabelNames(ctx context.Context, hints *storage.Labe | |
return ln, nil, err | ||
} | ||
|
||
func (q *distributorQuerier) labelsWithRetry(ctx context.Context, labelsFunc func() ([]string, error)) ([]string, error) { | ||
if q.ingesterQueryMaxAttempts == 1 { | ||
return labelsFunc() | ||
} | ||
|
||
var result []string | ||
var err error | ||
|
||
retries := backoff.New(ctx, backoff.Config{ | ||
MinBackoff: retryMinBackoff, | ||
MaxBackoff: retryMaxBackoff, | ||
MaxRetries: q.ingesterQueryMaxAttempts, | ||
}) | ||
|
||
for retries.Ongoing() { | ||
result, err = labelsFunc() | ||
|
||
if err == nil || !q.isRetryableError(err) { | ||
return result, err | ||
} | ||
|
||
retries.Wait() | ||
} | ||
|
||
return result, err | ||
} | ||
|
||
// labelNamesWithMatchers performs the LabelNames call by calling ingester's MetricsForLabelMatchers method | ||
func (q *distributorQuerier) labelNamesWithMatchers(ctx context.Context, hints *storage.LabelHints, partialDataEnabled bool, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { | ||
log, ctx := spanlogger.New(ctx, "distributorQuerier.labelNamesWithMatchers") | ||
|
@@ -297,6 +369,10 @@ func (q *distributorQuerier) partialDataEnabled(ctx context.Context) bool { | |
return q.isPartialDataEnabled != nil && q.isPartialDataEnabled(userID) | ||
} | ||
|
||
func (q *distributorQuerier) isRetryableError(err error) bool { | ||
return partialdata.IsPartialDataError(err) | ||
} | ||
|
||
type distributorExemplarQueryable struct { | ||
distributor Distributor | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ingester queries usually finish in ms. I am not sure if it is worth it to wait for 1s ~ 5s backoff retry as it may cause more issues like increased inflight queries on Ingesters.