From aab648e17f250f0117d88242bda204002c880897 Mon Sep 17 00:00:00 2001 From: Justin Jung Date: Mon, 21 Apr 2025 15:23:59 -0700 Subject: [PATCH 1/8] Add static retry Signed-off-by: Justin Jung --- pkg/distributor/query.go | 130 +++++++++------- pkg/querier/distributor_queryable.go | 58 +++++++- pkg/querier/distributor_queryable_test.go | 174 ++++++++++++++++++++++ 3 files changed, 305 insertions(+), 57 deletions(-) diff --git a/pkg/distributor/query.go b/pkg/distributor/query.go index 8de7630e755..8faa5a6ea5e 100644 --- a/pkg/distributor/query.go +++ b/pkg/distributor/query.go @@ -227,70 +227,73 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri reqStats = stats.FromContext(ctx) ) - // Fetch samples from multiple ingesters - results, err := replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, false, partialDataEnabled, func(ctx context.Context, ing *ring.InstanceDesc) (interface{}, error) { - client, err := d.ingesterPool.GetClientFor(ing.Addr) - if err != nil { - return nil, err - } - - ingesterId, err := d.ingestersRing.GetInstanceIdByAddr(ing.Addr) - if err != nil { - level.Warn(d.log).Log("msg", "instance not found in the ring", "addr", ing.Addr, "err", err) - } + results, err := d.queryWithRetry(func() ([]interface{}, error) { + // Fetch samples from multiple ingesters + results, err := replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, false, partialDataEnabled, func(ctx context.Context, ing *ring.InstanceDesc) (interface{}, error) { + client, err := d.ingesterPool.GetClientFor(ing.Addr) + if err != nil { + return nil, err + } - d.ingesterQueries.WithLabelValues(ingesterId).Inc() + ingesterId, err := d.ingestersRing.GetInstanceIdByAddr(ing.Addr) + if err != nil { + level.Warn(d.log).Log("msg", "instance not found in the ring", "addr", ing.Addr, "err", err) + } - stream, err := client.(ingester_client.IngesterClient).QueryStream(ctx, req) - if err != nil { - d.ingesterQueryFailures.WithLabelValues(ingesterId).Inc() - return nil, err - } - 
defer stream.CloseSend() //nolint:errcheck - - result := &ingester_client.QueryStreamResponse{} - for { - resp, err := stream.Recv() - if err == io.EOF { - break - } else if err != nil { - // Do not track a failure if the context was canceled. - if !grpcutil.IsGRPCContextCanceled(err) { - d.ingesterQueryFailures.WithLabelValues(ingesterId).Inc() - } + d.ingesterQueries.WithLabelValues(ingesterId).Inc() + stream, err := client.(ingester_client.IngesterClient).QueryStream(ctx, req) + if err != nil { + d.ingesterQueryFailures.WithLabelValues(ingesterId).Inc() return nil, err } + defer stream.CloseSend() //nolint:errcheck + + result := &ingester_client.QueryStreamResponse{} + for { + resp, err := stream.Recv() + if err == io.EOF { + break + } else if err != nil { + // Do not track a failure if the context was canceled. + if !grpcutil.IsGRPCContextCanceled(err) { + d.ingesterQueryFailures.WithLabelValues(ingesterId).Inc() + } + + return nil, err + } - // Enforce the max chunks limits. - if chunkLimitErr := queryLimiter.AddChunks(resp.ChunksCount()); chunkLimitErr != nil { - return nil, validation.LimitError(chunkLimitErr.Error()) - } + // Enforce the max chunks limits. 
+ if chunkLimitErr := queryLimiter.AddChunks(resp.ChunksCount()); chunkLimitErr != nil { + return nil, validation.LimitError(chunkLimitErr.Error()) + } - s := make([][]cortexpb.LabelAdapter, 0, len(resp.Chunkseries)) - for _, series := range resp.Chunkseries { - s = append(s, series.Labels) - } + s := make([][]cortexpb.LabelAdapter, 0, len(resp.Chunkseries)) + for _, series := range resp.Chunkseries { + s = append(s, series.Labels) + } - if limitErr := queryLimiter.AddSeries(s...); limitErr != nil { - return nil, validation.LimitError(limitErr.Error()) - } + if limitErr := queryLimiter.AddSeries(s...); limitErr != nil { + return nil, validation.LimitError(limitErr.Error()) + } - if chunkBytesLimitErr := queryLimiter.AddChunkBytes(resp.ChunksSize()); chunkBytesLimitErr != nil { - return nil, validation.LimitError(chunkBytesLimitErr.Error()) - } + if chunkBytesLimitErr := queryLimiter.AddChunkBytes(resp.ChunksSize()); chunkBytesLimitErr != nil { + return nil, validation.LimitError(chunkBytesLimitErr.Error()) + } - if dataBytesLimitErr := queryLimiter.AddDataBytes(resp.Size()); dataBytesLimitErr != nil { - return nil, validation.LimitError(dataBytesLimitErr.Error()) - } + if dataBytesLimitErr := queryLimiter.AddDataBytes(resp.Size()); dataBytesLimitErr != nil { + return nil, validation.LimitError(dataBytesLimitErr.Error()) + } - result.Chunkseries = append(result.Chunkseries, resp.Chunkseries...) + result.Chunkseries = append(result.Chunkseries, resp.Chunkseries...) 
+ } + return result, nil + }) + if err != nil && !partialdata.IsPartialDataError(err) { + return nil, err } - return result, nil - }) - if err != nil && !partialdata.IsPartialDataError(err) { - return nil, err - } + return results, err + }, 3) span, _ := opentracing.StartSpanFromContext(ctx, "Distributor.MergeIngesterStreams") defer span.Finish() @@ -337,3 +340,26 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri return resp, nil } + +func (d *Distributor) queryWithRetry(queryFunc func() ([]interface{}, error), retryAttempt int) ([]interface{}, error) { + var result []interface{} + var err error + + for i := 0; i < retryAttempt; i++ { + result, err = queryFunc() + + if err == nil || !d.isRetryableError(err) { + return result, err + } + } + + return result, err +} + +func (d *Distributor) isRetryableError(err error) bool { + if partialdata.IsPartialDataError(err) { + return true + } + + return false +} diff --git a/pkg/querier/distributor_queryable.go b/pkg/querier/distributor_queryable.go index dffe8ae3002..7521d0cdacc 100644 --- a/pkg/querier/distributor_queryable.go +++ b/pkg/querier/distributor_queryable.go @@ -150,7 +150,9 @@ func (q *distributorQuerier) Select(ctx context.Context, sortSeries bool, sp *st } func (q *distributorQuerier) streamingSelect(ctx context.Context, sortSeries, partialDataEnabled bool, minT, maxT int64, matchers []*labels.Matcher) storage.SeriesSet { - results, err := q.distributor.QueryStream(ctx, model.Time(minT), model.Time(maxT), partialDataEnabled, matchers...) + results, err := q.queryWithRetry(func() (*client.QueryStreamResponse, error) { + return q.distributor.QueryStream(ctx, model.Time(minT), model.Time(maxT), partialDataEnabled, matchers...) 
+ }, 3) if err != nil && !partialdata.IsPartialDataError(err) { return storage.ErrSeriesSet(err) @@ -192,6 +194,21 @@ func (q *distributorQuerier) streamingSelect(ctx context.Context, sortSeries, pa return seriesSet } +func (q *distributorQuerier) queryWithRetry(queryFunc func() (*client.QueryStreamResponse, error), retryAttempt int) (*client.QueryStreamResponse, error) { + var result *client.QueryStreamResponse + var err error + + for i := 0; i < retryAttempt; i++ { + result, err = queryFunc() + + if err == nil || !q.isRetryableError(err) { + return result, err + } + } + + return result, err +} + func (q *distributorQuerier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { var ( lvs []string @@ -201,9 +218,13 @@ func (q *distributorQuerier) LabelValues(ctx context.Context, name string, hints partialDataEnabled := q.partialDataEnabled(ctx) if q.streamingMetadata { - lvs, err = q.distributor.LabelValuesForLabelNameStream(ctx, model.Time(q.mint), model.Time(q.maxt), model.LabelName(name), hints, partialDataEnabled, matchers...) + lvs, err = q.labelsWithRetry(func() ([]string, error) { + return q.distributor.LabelValuesForLabelNameStream(ctx, model.Time(q.mint), model.Time(q.maxt), model.LabelName(name), hints, partialDataEnabled, matchers...) + }, 3) } else { - lvs, err = q.distributor.LabelValuesForLabelName(ctx, model.Time(q.mint), model.Time(q.maxt), model.LabelName(name), hints, partialDataEnabled, matchers...) + lvs, err = q.labelsWithRetry(func() ([]string, error) { + return q.distributor.LabelValuesForLabelName(ctx, model.Time(q.mint), model.Time(q.maxt), model.LabelName(name), hints, partialDataEnabled, matchers...) 
+ }, 3) } if partialdata.IsPartialDataError(err) { @@ -230,9 +251,13 @@ func (q *distributorQuerier) LabelNames(ctx context.Context, hints *storage.Labe ) if q.streamingMetadata { - ln, err = q.distributor.LabelNamesStream(ctx, model.Time(q.mint), model.Time(q.maxt), hints, partialDataEnabled, matchers...) + ln, err = q.labelsWithRetry(func() ([]string, error) { + return q.distributor.LabelNamesStream(ctx, model.Time(q.mint), model.Time(q.maxt), hints, partialDataEnabled, matchers...) + }, 3) } else { - ln, err = q.distributor.LabelNames(ctx, model.Time(q.mint), model.Time(q.maxt), hints, partialDataEnabled, matchers...) + ln, err = q.labelsWithRetry(func() ([]string, error) { + return q.distributor.LabelNames(ctx, model.Time(q.mint), model.Time(q.maxt), hints, partialDataEnabled, matchers...) + }, 3) } if partialdata.IsPartialDataError(err) { @@ -243,6 +268,21 @@ func (q *distributorQuerier) LabelNames(ctx context.Context, hints *storage.Labe return ln, nil, err } +func (q *distributorQuerier) labelsWithRetry(labelsFunc func() ([]string, error), retryAttempt int) ([]string, error) { + var result []string + var err error + + for i := 0; i < retryAttempt; i++ { + result, err = labelsFunc() + + if err == nil || !q.isRetryableError(err) { + return result, err + } + } + + return result, err +} + // labelNamesWithMatchers performs the LabelNames call by calling ingester's MetricsForLabelMatchers method func (q *distributorQuerier) labelNamesWithMatchers(ctx context.Context, hints *storage.LabelHints, partialDataEnabled bool, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { log, ctx := spanlogger.New(ctx, "distributorQuerier.labelNamesWithMatchers") @@ -297,6 +337,14 @@ func (q *distributorQuerier) partialDataEnabled(ctx context.Context) bool { return q.isPartialDataEnabled != nil && q.isPartialDataEnabled(userID) } +func (q *distributorQuerier) isRetryableError(err error) bool { + if partialdata.IsPartialDataError(err) { + return true + } + + 
return false +} + type distributorExemplarQueryable struct { distributor Distributor } diff --git a/pkg/querier/distributor_queryable_test.go b/pkg/querier/distributor_queryable_test.go index 457fba03cbc..cff3c54e305 100644 --- a/pkg/querier/distributor_queryable_test.go +++ b/pkg/querier/distributor_queryable_test.go @@ -9,12 +9,14 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/util/annotations" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/weaveworks/common/user" "github.com/cortexproject/cortex/pkg/chunk" + promchunk "github.com/cortexproject/cortex/pkg/chunk/encoding" "github.com/cortexproject/cortex/pkg/cortexpb" "github.com/cortexproject/cortex/pkg/ingester/client" "github.com/cortexproject/cortex/pkg/querier/batch" @@ -210,6 +212,178 @@ func TestIngesterStreaming(t *testing.T) { } } +func TestDistributorQuerier_Retry(t *testing.T) { + ctx := user.InjectOrgID(context.Background(), "0") + + tests := map[string]struct { + api string + errors []error + isPartialData bool + isError bool + }{ + "Select - should retry up to 3 times": { + api: "Select", + errors: []error{ + partialdata.ErrPartialData, + partialdata.ErrPartialData, + nil, + }, + isError: false, + isPartialData: false, + }, + "Select - should return partial data after 3 times": { + api: "Select", + errors: []error{ + partialdata.ErrPartialData, + partialdata.ErrPartialData, + partialdata.ErrPartialData, + }, + isError: false, + isPartialData: true, + }, + "Select - should not retry on other error": { + api: "Select", + errors: []error{ + fmt.Errorf("new error"), + partialdata.ErrPartialData, + }, + isError: true, + isPartialData: false, + }, + "LabelNames - should retry up to 3 times": { + api: "LabelNames", + errors: []error{ + partialdata.ErrPartialData, + partialdata.ErrPartialData, + nil, + }, + 
isError: false, + isPartialData: false, + }, + "LabelNames - should return partial data after 3 times": { + api: "LabelNames", + errors: []error{ + partialdata.ErrPartialData, + partialdata.ErrPartialData, + partialdata.ErrPartialData, + }, + isError: false, + isPartialData: true, + }, + "LabelNames - should not retry on other error": { + api: "LabelNames", + errors: []error{ + fmt.Errorf("new error"), + partialdata.ErrPartialData, + }, + isError: true, + isPartialData: false, + }, + "LabelValues - should retry up to 3 times": { + api: "LabelValues", + errors: []error{ + partialdata.ErrPartialData, + partialdata.ErrPartialData, + nil, + }, + isError: false, + isPartialData: false, + }, + "LabelValues - should return partial data after 3 times": { + api: "LabelValues", + errors: []error{ + partialdata.ErrPartialData, + partialdata.ErrPartialData, + partialdata.ErrPartialData, + }, + isError: false, + isPartialData: true, + }, + "LabelValues - should not retry on other error": { + api: "LabelValues", + errors: []error{ + fmt.Errorf("new error"), + partialdata.ErrPartialData, + }, + isError: true, + isPartialData: false, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + d := &MockDistributor{} + + if tc.api == "Select" { + promChunk := util.GenerateChunk(t, time.Second, model.TimeFromUnix(time.Now().Unix()), 10, promchunk.PrometheusXorChunk) + clientChunks, err := chunkcompat.ToChunks([]chunk.Chunk{promChunk}) + require.NoError(t, err) + + for _, err := range tc.errors { + d.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&client.QueryStreamResponse{ + Chunkseries: []client.TimeSeriesChunk{ + { + Labels: []cortexpb.LabelAdapter{ + {Name: "foo", Value: "bar"}, + }, + Chunks: clientChunks, + }, + }, + }, err).Once() + } + } else if tc.api == "LabelNames" { + for _, err := range tc.errors { + d.On("LabelNamesStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything, 
mock.Anything).Return([]string{"foo"}, err).Once() + d.On("LabelNames", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]string{"foo"}, err).Once() + } + } else if tc.api == "LabelValues" { + for _, err := range tc.errors { + d.On("LabelValuesForLabelNameStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]string{"foo"}, err).Once() + d.On("LabelValuesForLabelName", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]string{"foo"}, err).Once() + } + } + + queryable := newDistributorQueryable(d, true, true, batch.NewChunkMergeIterator, 0, func(string) bool { + return true + }) + querier, err := queryable.Querier(mint, maxt) + require.NoError(t, err) + + if tc.api == "Select" { + seriesSet := querier.Select(ctx, true, &storage.SelectHints{Start: mint, End: maxt}) + if tc.isError { + require.Error(t, seriesSet.Err()) + return + } + require.NoError(t, seriesSet.Err()) + + if tc.isPartialData { + require.Contains(t, seriesSet.Warnings(), partialdata.ErrPartialData.Error()) + } + } else { + var annots annotations.Annotations + var err error + if tc.api == "LabelNames" { + _, annots, err = querier.LabelNames(ctx, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + } else if tc.api == "LabelValues" { + _, annots, err = querier.LabelValues(ctx, "foo", nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + } + + if tc.isError { + require.Error(t, err) + return + } + require.NoError(t, err) + + if tc.isPartialData { + warnings, _ := annots.AsStrings("", 1, 0) + require.Contains(t, warnings, partialdata.ErrPartialData.Error()) + } + } + }) + } +} + func TestDistributorQuerier_LabelNames(t *testing.T) { t.Parallel() From 51aaf247e1852b9e4c8ca25e506ec84eb327e260 Mon Sep 17 00:00:00 2001 From: Justin Jung Date: Mon, 21 Apr 2025 19:11:11 -0700 Subject: [PATCH 2/8] Add querier.ingester-query-max-attempts Signed-off-by: Justin 
Jung --- pkg/querier/blocks_store_queryable.go | 2 + pkg/querier/distributor_queryable.go | 70 ++++++++++++----------- pkg/querier/distributor_queryable_test.go | 33 ++++++----- pkg/querier/querier.go | 11 +++- pkg/querier/querier_test.go | 4 +- 5 files changed, 69 insertions(+), 51 deletions(-) diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index a5647e55451..4824a6c1b8b 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -61,6 +61,8 @@ const ( // store-gateways. If no more store-gateways are left (ie. due to lower replication // factor) than we'll end the retries earlier. maxFetchSeriesAttempts = 3 + + ingesterQueryMaxAttempts = 3 ) var ( diff --git a/pkg/querier/distributor_queryable.go b/pkg/querier/distributor_queryable.go index 7521d0cdacc..121fd733797 100644 --- a/pkg/querier/distributor_queryable.go +++ b/pkg/querier/distributor_queryable.go @@ -38,36 +38,39 @@ type Distributor interface { MetricsMetadata(ctx context.Context, req *client.MetricsMetadataRequest) ([]scrape.MetricMetadata, error) } -func newDistributorQueryable(distributor Distributor, streamingMetdata bool, labelNamesWithMatchers bool, iteratorFn chunkIteratorFunc, queryIngestersWithin time.Duration, isPartialDataEnabled partialdata.IsCfgEnabledFunc) QueryableWithFilter { +func newDistributorQueryable(distributor Distributor, streamingMetdata bool, labelNamesWithMatchers bool, iteratorFn chunkIteratorFunc, queryIngestersWithin time.Duration, isPartialDataEnabled partialdata.IsCfgEnabledFunc, ingesterQueryMaxAttempts int) QueryableWithFilter { return distributorQueryable{ - distributor: distributor, - streamingMetdata: streamingMetdata, - labelNamesWithMatchers: labelNamesWithMatchers, - iteratorFn: iteratorFn, - queryIngestersWithin: queryIngestersWithin, - isPartialDataEnabled: isPartialDataEnabled, + distributor: distributor, + streamingMetdata: streamingMetdata, + labelNamesWithMatchers: 
labelNamesWithMatchers, + iteratorFn: iteratorFn, + queryIngestersWithin: queryIngestersWithin, + isPartialDataEnabled: isPartialDataEnabled, + ingesterQueryMaxAttempts: ingesterQueryMaxAttempts, } } type distributorQueryable struct { - distributor Distributor - streamingMetdata bool - labelNamesWithMatchers bool - iteratorFn chunkIteratorFunc - queryIngestersWithin time.Duration - isPartialDataEnabled partialdata.IsCfgEnabledFunc + distributor Distributor + streamingMetdata bool + labelNamesWithMatchers bool + iteratorFn chunkIteratorFunc + queryIngestersWithin time.Duration + isPartialDataEnabled partialdata.IsCfgEnabledFunc + ingesterQueryMaxAttempts int } func (d distributorQueryable) Querier(mint, maxt int64) (storage.Querier, error) { return &distributorQuerier{ - distributor: d.distributor, - mint: mint, - maxt: maxt, - streamingMetadata: d.streamingMetdata, - labelNamesMatchers: d.labelNamesWithMatchers, - chunkIterFn: d.iteratorFn, - queryIngestersWithin: d.queryIngestersWithin, - isPartialDataEnabled: d.isPartialDataEnabled, + distributor: d.distributor, + mint: mint, + maxt: maxt, + streamingMetadata: d.streamingMetdata, + labelNamesMatchers: d.labelNamesWithMatchers, + chunkIterFn: d.iteratorFn, + queryIngestersWithin: d.queryIngestersWithin, + isPartialDataEnabled: d.isPartialDataEnabled, + ingesterQueryMaxAttempts: d.ingesterQueryMaxAttempts, }, nil } @@ -77,13 +80,14 @@ func (d distributorQueryable) UseQueryable(now time.Time, _, queryMaxT int64) bo } type distributorQuerier struct { - distributor Distributor - mint, maxt int64 - streamingMetadata bool - labelNamesMatchers bool - chunkIterFn chunkIteratorFunc - queryIngestersWithin time.Duration - isPartialDataEnabled partialdata.IsCfgEnabledFunc + distributor Distributor + mint, maxt int64 + streamingMetadata bool + labelNamesMatchers bool + chunkIterFn chunkIteratorFunc + queryIngestersWithin time.Duration + isPartialDataEnabled partialdata.IsCfgEnabledFunc + ingesterQueryMaxAttempts int } // 
Select implements storage.Querier interface. @@ -152,7 +156,7 @@ func (q *distributorQuerier) Select(ctx context.Context, sortSeries bool, sp *st func (q *distributorQuerier) streamingSelect(ctx context.Context, sortSeries, partialDataEnabled bool, minT, maxT int64, matchers []*labels.Matcher) storage.SeriesSet { results, err := q.queryWithRetry(func() (*client.QueryStreamResponse, error) { return q.distributor.QueryStream(ctx, model.Time(minT), model.Time(maxT), partialDataEnabled, matchers...) - }, 3) + }, q.ingesterQueryMaxAttempts) if err != nil && !partialdata.IsPartialDataError(err) { return storage.ErrSeriesSet(err) @@ -220,11 +224,11 @@ func (q *distributorQuerier) LabelValues(ctx context.Context, name string, hints if q.streamingMetadata { lvs, err = q.labelsWithRetry(func() ([]string, error) { return q.distributor.LabelValuesForLabelNameStream(ctx, model.Time(q.mint), model.Time(q.maxt), model.LabelName(name), hints, partialDataEnabled, matchers...) - }, 3) + }, q.ingesterQueryMaxAttempts) } else { lvs, err = q.labelsWithRetry(func() ([]string, error) { return q.distributor.LabelValuesForLabelName(ctx, model.Time(q.mint), model.Time(q.maxt), model.LabelName(name), hints, partialDataEnabled, matchers...) - }, 3) + }, q.ingesterQueryMaxAttempts) } if partialdata.IsPartialDataError(err) { @@ -253,11 +257,11 @@ func (q *distributorQuerier) LabelNames(ctx context.Context, hints *storage.Labe if q.streamingMetadata { ln, err = q.labelsWithRetry(func() ([]string, error) { return q.distributor.LabelNamesStream(ctx, model.Time(q.mint), model.Time(q.maxt), hints, partialDataEnabled, matchers...) - }, 3) + }, q.ingesterQueryMaxAttempts) } else { ln, err = q.labelsWithRetry(func() ([]string, error) { return q.distributor.LabelNames(ctx, model.Time(q.mint), model.Time(q.maxt), hints, partialDataEnabled, matchers...) 
- }, 3) + }, q.ingesterQueryMaxAttempts) } if partialdata.IsPartialDataError(err) { diff --git a/pkg/querier/distributor_queryable_test.go b/pkg/querier/distributor_queryable_test.go index cff3c54e305..0bdd13ce67a 100644 --- a/pkg/querier/distributor_queryable_test.go +++ b/pkg/querier/distributor_queryable_test.go @@ -92,7 +92,7 @@ func TestDistributorQuerier_SelectShouldHonorQueryIngestersWithin(t *testing.T) distributor.On("MetricsForLabelMatchersStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]model.Metric{}, nil) ctx := user.InjectOrgID(context.Background(), "test") - queryable := newDistributorQueryable(distributor, streamingMetadataEnabled, true, nil, testData.queryIngestersWithin, nil) + queryable := newDistributorQueryable(distributor, streamingMetadataEnabled, true, nil, testData.queryIngestersWithin, nil, 1) querier, err := queryable.Querier(testData.queryMinT, testData.queryMaxT) require.NoError(t, err) @@ -131,7 +131,7 @@ func TestDistributorQueryableFilter(t *testing.T) { t.Parallel() d := &MockDistributor{} - dq := newDistributorQueryable(d, false, true, nil, 1*time.Hour, nil) + dq := newDistributorQueryable(d, false, true, nil, 1*time.Hour, nil, 1) now := time.Now() @@ -183,7 +183,7 @@ func TestIngesterStreaming(t *testing.T) { queryable := newDistributorQueryable(d, true, true, batch.NewChunkMergeIterator, 0, func(string) bool { return partialDataEnabled - }) + }, 1) querier, err := queryable.Querier(mint, maxt) require.NoError(t, err) @@ -221,7 +221,7 @@ func TestDistributorQuerier_Retry(t *testing.T) { isPartialData bool isError bool }{ - "Select - should retry up to 3 times": { + "Select - should retry": { api: "Select", errors: []error{ partialdata.ErrPartialData, @@ -231,7 +231,7 @@ func TestDistributorQuerier_Retry(t *testing.T) { isError: false, isPartialData: false, }, - "Select - should return partial data after 3 times": { + "Select - should return partial data after all retries": { api: 
"Select", errors: []error{ partialdata.ErrPartialData, @@ -250,7 +250,7 @@ func TestDistributorQuerier_Retry(t *testing.T) { isError: true, isPartialData: false, }, - "LabelNames - should retry up to 3 times": { + "LabelNames - should retry": { api: "LabelNames", errors: []error{ partialdata.ErrPartialData, @@ -260,7 +260,7 @@ func TestDistributorQuerier_Retry(t *testing.T) { isError: false, isPartialData: false, }, - "LabelNames - should return partial data after 3 times": { + "LabelNames - should return partial data after all retries": { api: "LabelNames", errors: []error{ partialdata.ErrPartialData, @@ -279,7 +279,7 @@ func TestDistributorQuerier_Retry(t *testing.T) { isError: true, isPartialData: false, }, - "LabelValues - should retry up to 3 times": { + "LabelValues - should retry": { api: "LabelValues", errors: []error{ partialdata.ErrPartialData, @@ -289,7 +289,7 @@ func TestDistributorQuerier_Retry(t *testing.T) { isError: false, isPartialData: false, }, - "LabelValues - should return partial data after 3 times": { + "LabelValues - should return partial data after all retries": { api: "LabelValues", errors: []error{ partialdata.ErrPartialData, @@ -332,20 +332,23 @@ func TestDistributorQuerier_Retry(t *testing.T) { }, err).Once() } } else if tc.api == "LabelNames" { + res := []string{"foo"} for _, err := range tc.errors { - d.On("LabelNamesStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]string{"foo"}, err).Once() - d.On("LabelNames", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]string{"foo"}, err).Once() + d.On("LabelNamesStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(res, err).Once() + d.On("LabelNames", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(res, err).Once() } } else if tc.api == "LabelValues" { + res := []string{"foo"} for _, err := range tc.errors { - 
d.On("LabelValuesForLabelNameStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]string{"foo"}, err).Once() - d.On("LabelValuesForLabelName", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]string{"foo"}, err).Once() + d.On("LabelValuesForLabelNameStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(res, err).Once() + d.On("LabelValuesForLabelName", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(res, err).Once() } } + ingesterQueryMaxAttempts := 3 queryable := newDistributorQueryable(d, true, true, batch.NewChunkMergeIterator, 0, func(string) bool { return true - }) + }, ingesterQueryMaxAttempts) querier, err := queryable.Querier(mint, maxt) require.NoError(t, err) @@ -423,7 +426,7 @@ func TestDistributorQuerier_LabelNames(t *testing.T) { queryable := newDistributorQueryable(d, streamingEnabled, labelNamesWithMatchers, nil, 0, func(string) bool { return partialDataEnabled - }) + }, 1) querier, err := queryable.Querier(mint, maxt) require.NoError(t, err) diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index f13121caf98..a1b96edf8f8 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -78,6 +78,9 @@ type Config struct { // The maximum number of times we attempt fetching missing blocks from different Store Gateways. StoreGatewayConsistencyCheckMaxAttempts int `yaml:"store_gateway_consistency_check_max_attempts"` + // The maximum number of times we attempt fetching data from Ingesters. + IngesterQueryMaxAttempts int `yaml:"ingester_query_max_attempts"` + ShuffleShardingIngestersLookbackPeriod time.Duration `yaml:"shuffle_sharding_ingesters_lookback_period"` // Experimental. 
Use https://github.com/thanos-io/promql-engine rather than @@ -95,6 +98,7 @@ var ( errEmptyTimeRange = errors.New("empty time range") errUnsupportedResponseCompression = errors.New("unsupported response compression. Supported compression 'gzip' and '' (disable compression)") errInvalidConsistencyCheckAttempts = errors.New("store gateway consistency check max attempts should be greater or equal than 1") + errInvalidIngesterQueryMaxAttempts = errors.New("ingester query max attempts should be greater or equal than 1") ) // RegisterFlags adds the flags required to config this to the given FlagSet. @@ -124,6 +128,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.StoreGatewayAddresses, "querier.store-gateway-addresses", "", "Comma separated list of store-gateway addresses in DNS Service Discovery format. This option should be set when using the blocks storage and the store-gateway sharding is disabled (when enabled, the store-gateway instances form a ring and addresses are picked from the ring).") f.BoolVar(&cfg.StoreGatewayQueryStatsEnabled, "querier.store-gateway-query-stats-enabled", true, "If enabled, store gateway query stats will be logged using `info` log level.") f.IntVar(&cfg.StoreGatewayConsistencyCheckMaxAttempts, "querier.store-gateway-consistency-check-max-attempts", maxFetchSeriesAttempts, "The maximum number of times we attempt fetching missing blocks from different store-gateways. If no more store-gateways are left (ie. 
due to lower replication factor) than we'll end the retries earlier") + f.IntVar(&cfg.IngesterQueryMaxAttempts, "querier.ingester-query-max-attempts", ingesterQueryMaxAttempts, "The maximum number of times we attempt fetching data from ingesters for retryable errors.") f.DurationVar(&cfg.LookbackDelta, "querier.lookback-delta", 5*time.Minute, "Time since the last sample after which a time series is considered stale and ignored by expression evaluations.") f.DurationVar(&cfg.ShuffleShardingIngestersLookbackPeriod, "querier.shuffle-sharding-ingesters-lookback-period", 0, "When distributor's sharding strategy is shuffle-sharding and this setting is > 0, queriers fetch in-memory series from the minimum set of required ingesters, selecting only ingesters which may have received series since 'now - lookback period'. The lookback period should be greater or equal than the configured 'query store after' and 'query ingesters within'. If this setting is 0, queriers always query all ingesters (ingesters shuffle sharding on read path is disabled).") f.BoolVar(&cfg.ThanosEngine, "querier.thanos-engine", false, "Experimental. 
Use Thanos promql engine https://github.com/thanos-io/promql-engine rather than the Prometheus promql engine.") @@ -155,6 +160,10 @@ func (cfg *Config) Validate() error { return errInvalidConsistencyCheckAttempts } + if cfg.IngesterQueryMaxAttempts < 1 { + return errInvalidIngesterQueryMaxAttempts + } + return nil } @@ -174,7 +183,7 @@ func getChunksIteratorFunction(_ Config) chunkIteratorFunc { func New(cfg Config, limits *validation.Overrides, distributor Distributor, stores []QueryableWithFilter, reg prometheus.Registerer, logger log.Logger, isPartialDataEnabled partialdata.IsCfgEnabledFunc) (storage.SampleAndChunkQueryable, storage.ExemplarQueryable, promql.QueryEngine) { iteratorFunc := getChunksIteratorFunction(cfg) - distributorQueryable := newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, iteratorFunc, cfg.QueryIngestersWithin, isPartialDataEnabled) + distributorQueryable := newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, iteratorFunc, cfg.QueryIngestersWithin, isPartialDataEnabled, cfg.IngesterQueryMaxAttempts) ns := make([]QueryableWithFilter, len(stores)) for ix, s := range stores { diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index d2865408abe..37b8d504b1f 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -300,7 +300,7 @@ func TestShouldSortSeriesIfQueryingMultipleQueryables(t *testing.T) { } distributor.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&unorderedResponse, nil) - distributorQueryable := newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, batch.NewChunkMergeIterator, cfg.QueryIngestersWithin, nil) + distributorQueryable := newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, batch.NewChunkMergeIterator, cfg.QueryIngestersWithin, nil, 1) 
tCases := []struct { name string @@ -446,7 +446,7 @@ func TestLimits(t *testing.T) { response: &streamResponse, } - distributorQueryableStreaming := newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, batch.NewChunkMergeIterator, cfg.QueryIngestersWithin, nil) + distributorQueryableStreaming := newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, batch.NewChunkMergeIterator, cfg.QueryIngestersWithin, nil, 1) tCases := []struct { name string From 443c6a60df52220f8d1871fe9c65e26b2704a9fd Mon Sep 17 00:00:00 2001 From: Justin Jung Date: Mon, 21 Apr 2025 19:17:04 -0700 Subject: [PATCH 3/8] Add docs Signed-off-by: Justin Jung --- docs/blocks-storage/querier.md | 5 +++++ docs/configuration/config-file-reference.md | 5 +++++ 2 files changed, 10 insertions(+) diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index 5d09d1169c2..bcb661f9245 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -237,6 +237,11 @@ querier: # CLI flag: -querier.store-gateway-consistency-check-max-attempts [store_gateway_consistency_check_max_attempts: | default = 3] + # The maximum number of times we attempt fetching data from ingesters for + # retryable errors. 
+ # CLI flag: -querier.ingester-query-max-attempts + [ingester_query_max_attempts: <int> | default = 3] + # When distributor's sharding strategy is shuffle-sharding and this setting is # > 0, queriers fetch in-memory series from the minimum set of required # ingesters, selecting only ingesters which may have received series since diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 252887a15e5..deef7b0fd9a 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -4165,6 +4165,11 @@ store_gateway_client: # CLI flag: -querier.store-gateway-consistency-check-max-attempts [store_gateway_consistency_check_max_attempts: <int> | default = 3] +# The maximum number of times we attempt fetching data from ingesters for +# retryable errors. +# CLI flag: -querier.ingester-query-max-attempts +[ingester_query_max_attempts: <int> | default = 3] + # When distributor's sharding strategy is shuffle-sharding and this setting is > # 0, queriers fetch in-memory series from the minimum set of required ingesters, # selecting only ingesters which may have received series since 'now - lookback From 2158ca70e959d94092c8d8b831f77ca882085d98 Mon Sep 17 00:00:00 2001 From: Justin Jung Date: Mon, 21 Apr 2025 19:19:13 -0700 Subject: [PATCH 4/8] Add changelog Signed-off-by: Justin Jung --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5432fc045dd..7f7e8b9ee16 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,6 +25,7 @@ * [ENHANCEMENT] OTLP: Support otlp metadata ingestion. #6617 * [ENHANCEMENT] AlertManager: Add `keep_instance_in_the_ring_on_shutdown` and `tokens_file_path` configs for alertmanager ring. #6628 * [ENHANCEMENT] Querier: Add metric and enhanced logging for query partial data. #6676 +* [ENHANCEMENT] Querier: Add `querier.ingester-query-max-attempts` to retry on partial data.
#6714 * [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517 * [BUGFIX] Ingester: Fix labelset data race condition. #6573 * [BUGFIX] Compactor: Cleaner should not put deletion marker for blocks with no-compact marker. #6576 From e05746131acd956123c21fcc1e8c8c7c3103326d Mon Sep 17 00:00:00 2001 From: Justin Jung Date: Mon, 21 Apr 2025 21:07:36 -0700 Subject: [PATCH 5/8] Lint Signed-off-by: Justin Jung --- pkg/distributor/query.go | 130 +++++++++++---------------- pkg/querier/distributor_queryable.go | 6 +- 2 files changed, 53 insertions(+), 83 deletions(-) diff --git a/pkg/distributor/query.go b/pkg/distributor/query.go index 8faa5a6ea5e..8de7630e755 100644 --- a/pkg/distributor/query.go +++ b/pkg/distributor/query.go @@ -227,73 +227,70 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri reqStats = stats.FromContext(ctx) ) - results, err := d.queryWithRetry(func() ([]interface{}, error) { - // Fetch samples from multiple ingesters - results, err := replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, false, partialDataEnabled, func(ctx context.Context, ing *ring.InstanceDesc) (interface{}, error) { - client, err := d.ingesterPool.GetClientFor(ing.Addr) - if err != nil { - return nil, err - } + // Fetch samples from multiple ingesters + results, err := replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, false, partialDataEnabled, func(ctx context.Context, ing *ring.InstanceDesc) (interface{}, error) { + client, err := d.ingesterPool.GetClientFor(ing.Addr) + if err != nil { + return nil, err + } - ingesterId, err := d.ingestersRing.GetInstanceIdByAddr(ing.Addr) - if err != nil { - level.Warn(d.log).Log("msg", "instance not found in the ring", "addr", ing.Addr, "err", err) - } + ingesterId, err := d.ingestersRing.GetInstanceIdByAddr(ing.Addr) + if err != nil { + level.Warn(d.log).Log("msg", "instance not found in the ring", "addr", ing.Addr, "err", err) + } + + 
d.ingesterQueries.WithLabelValues(ingesterId).Inc() - d.ingesterQueries.WithLabelValues(ingesterId).Inc() + stream, err := client.(ingester_client.IngesterClient).QueryStream(ctx, req) + if err != nil { + d.ingesterQueryFailures.WithLabelValues(ingesterId).Inc() + return nil, err + } + defer stream.CloseSend() //nolint:errcheck + + result := &ingester_client.QueryStreamResponse{} + for { + resp, err := stream.Recv() + if err == io.EOF { + break + } else if err != nil { + // Do not track a failure if the context was canceled. + if !grpcutil.IsGRPCContextCanceled(err) { + d.ingesterQueryFailures.WithLabelValues(ingesterId).Inc() + } - stream, err := client.(ingester_client.IngesterClient).QueryStream(ctx, req) - if err != nil { - d.ingesterQueryFailures.WithLabelValues(ingesterId).Inc() return nil, err } - defer stream.CloseSend() //nolint:errcheck - - result := &ingester_client.QueryStreamResponse{} - for { - resp, err := stream.Recv() - if err == io.EOF { - break - } else if err != nil { - // Do not track a failure if the context was canceled. - if !grpcutil.IsGRPCContextCanceled(err) { - d.ingesterQueryFailures.WithLabelValues(ingesterId).Inc() - } - - return nil, err - } - - // Enforce the max chunks limits. - if chunkLimitErr := queryLimiter.AddChunks(resp.ChunksCount()); chunkLimitErr != nil { - return nil, validation.LimitError(chunkLimitErr.Error()) - } - s := make([][]cortexpb.LabelAdapter, 0, len(resp.Chunkseries)) - for _, series := range resp.Chunkseries { - s = append(s, series.Labels) - } + // Enforce the max chunks limits. 
+ if chunkLimitErr := queryLimiter.AddChunks(resp.ChunksCount()); chunkLimitErr != nil { + return nil, validation.LimitError(chunkLimitErr.Error()) + } - if limitErr := queryLimiter.AddSeries(s...); limitErr != nil { - return nil, validation.LimitError(limitErr.Error()) - } + s := make([][]cortexpb.LabelAdapter, 0, len(resp.Chunkseries)) + for _, series := range resp.Chunkseries { + s = append(s, series.Labels) + } - if chunkBytesLimitErr := queryLimiter.AddChunkBytes(resp.ChunksSize()); chunkBytesLimitErr != nil { - return nil, validation.LimitError(chunkBytesLimitErr.Error()) - } + if limitErr := queryLimiter.AddSeries(s...); limitErr != nil { + return nil, validation.LimitError(limitErr.Error()) + } - if dataBytesLimitErr := queryLimiter.AddDataBytes(resp.Size()); dataBytesLimitErr != nil { - return nil, validation.LimitError(dataBytesLimitErr.Error()) - } + if chunkBytesLimitErr := queryLimiter.AddChunkBytes(resp.ChunksSize()); chunkBytesLimitErr != nil { + return nil, validation.LimitError(chunkBytesLimitErr.Error()) + } - result.Chunkseries = append(result.Chunkseries, resp.Chunkseries...) + if dataBytesLimitErr := queryLimiter.AddDataBytes(resp.Size()); dataBytesLimitErr != nil { + return nil, validation.LimitError(dataBytesLimitErr.Error()) } - return result, nil - }) - if err != nil && !partialdata.IsPartialDataError(err) { - return nil, err + + result.Chunkseries = append(result.Chunkseries, resp.Chunkseries...) 
} - return results, err - }, 3) + return result, nil + }) + if err != nil && !partialdata.IsPartialDataError(err) { + return nil, err + } span, _ := opentracing.StartSpanFromContext(ctx, "Distributor.MergeIngesterStreams") defer span.Finish() @@ -340,26 +337,3 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri return resp, nil } - -func (d *Distributor) queryWithRetry(queryFunc func() ([]interface{}, error), retryAttempt int) ([]interface{}, error) { - var result []interface{} - var err error - - for i := 0; i < retryAttempt; i++ { - result, err = queryFunc() - - if err == nil || !d.isRetryableError(err) { - return result, err - } - } - - return result, err -} - -func (d *Distributor) isRetryableError(err error) bool { - if partialdata.IsPartialDataError(err) { - return true - } - - return false -} diff --git a/pkg/querier/distributor_queryable.go b/pkg/querier/distributor_queryable.go index 121fd733797..307f3e9861e 100644 --- a/pkg/querier/distributor_queryable.go +++ b/pkg/querier/distributor_queryable.go @@ -342,11 +342,7 @@ func (q *distributorQuerier) partialDataEnabled(ctx context.Context) bool { } func (q *distributorQuerier) isRetryableError(err error) bool { - if partialdata.IsPartialDataError(err) { - return true - } - - return false + return partialdata.IsPartialDataError(err) } type distributorExemplarQueryable struct { From 23cf005d5f52e9d226126ae6a6cede7f1398a5a2 Mon Sep 17 00:00:00 2001 From: Justin Jung Date: Mon, 21 Apr 2025 21:31:41 -0700 Subject: [PATCH 6/8] Use backoff Signed-off-by: Justin Jung --- pkg/querier/blocks_store_queryable.go | 2 -- pkg/querier/distributor_queryable.go | 20 ++++++++++++++++---- pkg/querier/querier.go | 2 +- 3 files changed, 17 insertions(+), 7 deletions(-) diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index 4824a6c1b8b..a5647e55451 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -61,8 +61,6 
@@ const ( // store-gateways. If no more store-gateways are left (ie. due to lower replication // factor) than we'll end the retries earlier. maxFetchSeriesAttempts = 3 - - ingesterQueryMaxAttempts = 3 ) var ( diff --git a/pkg/querier/distributor_queryable.go b/pkg/querier/distributor_queryable.go index 307f3e9861e..a3aa6ec18bf 100644 --- a/pkg/querier/distributor_queryable.go +++ b/pkg/querier/distributor_queryable.go @@ -20,10 +20,14 @@ import ( "github.com/cortexproject/cortex/pkg/querier/series" "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/backoff" "github.com/cortexproject/cortex/pkg/util/chunkcompat" "github.com/cortexproject/cortex/pkg/util/spanlogger" ) +const retryMinBackoff = 10 * time.Second +const retryMaxBackoff = time.Minute + // Distributor is the read interface to the distributor, made an interface here // to reduce package coupling. type Distributor interface { @@ -154,9 +158,9 @@ func (q *distributorQuerier) Select(ctx context.Context, sortSeries bool, sp *st } func (q *distributorQuerier) streamingSelect(ctx context.Context, sortSeries, partialDataEnabled bool, minT, maxT int64, matchers []*labels.Matcher) storage.SeriesSet { - results, err := q.queryWithRetry(func() (*client.QueryStreamResponse, error) { + results, err := q.queryWithRetry(ctx, func() (*client.QueryStreamResponse, error) { return q.distributor.QueryStream(ctx, model.Time(minT), model.Time(maxT), partialDataEnabled, matchers...) 
- }, q.ingesterQueryMaxAttempts) + }) if err != nil && !partialdata.IsPartialDataError(err) { return storage.ErrSeriesSet(err) @@ -198,16 +202,24 @@ func (q *distributorQuerier) streamingSelect(ctx context.Context, sortSeries, pa return seriesSet } -func (q *distributorQuerier) queryWithRetry(queryFunc func() (*client.QueryStreamResponse, error), retryAttempt int) (*client.QueryStreamResponse, error) { +func (q *distributorQuerier) queryWithRetry(ctx context.Context, queryFunc func() (*client.QueryStreamResponse, error)) (*client.QueryStreamResponse, error) { var result *client.QueryStreamResponse var err error - for i := 0; i < retryAttempt; i++ { + retries := backoff.New(ctx, backoff.Config{ + MinBackoff: retryMinBackoff, + MaxBackoff: retryMaxBackoff, + MaxRetries: q.ingesterQueryMaxAttempts, + }) + + for retries.Ongoing() { result, err = queryFunc() if err == nil || !q.isRetryableError(err) { return result, err } + + retries.Wait() } return result, err diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index a1b96edf8f8..69e2d3fea93 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -128,7 +128,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.StoreGatewayAddresses, "querier.store-gateway-addresses", "", "Comma separated list of store-gateway addresses in DNS Service Discovery format. This option should be set when using the blocks storage and the store-gateway sharding is disabled (when enabled, the store-gateway instances form a ring and addresses are picked from the ring).") f.BoolVar(&cfg.StoreGatewayQueryStatsEnabled, "querier.store-gateway-query-stats-enabled", true, "If enabled, store gateway query stats will be logged using `info` log level.") f.IntVar(&cfg.StoreGatewayConsistencyCheckMaxAttempts, "querier.store-gateway-consistency-check-max-attempts", maxFetchSeriesAttempts, "The maximum number of times we attempt fetching missing blocks from different store-gateways. 
If no more store-gateways are left (ie. due to lower replication factor) than we'll end the retries earlier") - f.IntVar(&cfg.IngesterQueryMaxAttempts, "querier.ingester-query-max-attempts", ingesterQueryMaxAttempts, "The maximum number of times we attempt fetching data from ingesters for retryable errors.") + f.IntVar(&cfg.IngesterQueryMaxAttempts, "querier.ingester-query-max-attempts", 3, "The maximum number of times we attempt fetching data from ingesters for retryable errors.") f.DurationVar(&cfg.LookbackDelta, "querier.lookback-delta", 5*time.Minute, "Time since the last sample after which a time series is considered stale and ignored by expression evaluations.") f.DurationVar(&cfg.ShuffleShardingIngestersLookbackPeriod, "querier.shuffle-sharding-ingesters-lookback-period", 0, "When distributor's sharding strategy is shuffle-sharding and this setting is > 0, queriers fetch in-memory series from the minimum set of required ingesters, selecting only ingesters which may have received series since 'now - lookback period'. The lookback period should be greater or equal than the configured 'query store after' and 'query ingesters within'. If this setting is 0, queriers always query all ingesters (ingesters shuffle sharding on read path is disabled).") f.BoolVar(&cfg.ThanosEngine, "querier.thanos-engine", false, "Experimental. 
Use Thanos promql engine https://github.com/thanos-io/promql-engine rather than the Prometheus promql engine.") From 761741f4e91c41608e13c41cf5387c9dbd9d991a Mon Sep 17 00:00:00 2001 From: Justin Jung Date: Mon, 21 Apr 2025 22:03:31 -0700 Subject: [PATCH 7/8] Fix test Signed-off-by: Justin Jung --- pkg/querier/distributor_queryable.go | 32 +++++++++++++++++----------- 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/pkg/querier/distributor_queryable.go b/pkg/querier/distributor_queryable.go index a3aa6ec18bf..47cfa316b52 100644 --- a/pkg/querier/distributor_queryable.go +++ b/pkg/querier/distributor_queryable.go @@ -25,8 +25,8 @@ import ( "github.com/cortexproject/cortex/pkg/util/spanlogger" ) -const retryMinBackoff = 10 * time.Second -const retryMaxBackoff = time.Minute +const retryMinBackoff = time.Second +const retryMaxBackoff = 5 * time.Second // Distributor is the read interface to the distributor, made an interface here // to reduce package coupling. @@ -234,13 +234,13 @@ func (q *distributorQuerier) LabelValues(ctx context.Context, name string, hints partialDataEnabled := q.partialDataEnabled(ctx) if q.streamingMetadata { - lvs, err = q.labelsWithRetry(func() ([]string, error) { + lvs, err = q.labelsWithRetry(ctx, func() ([]string, error) { return q.distributor.LabelValuesForLabelNameStream(ctx, model.Time(q.mint), model.Time(q.maxt), model.LabelName(name), hints, partialDataEnabled, matchers...) - }, q.ingesterQueryMaxAttempts) + }) } else { - lvs, err = q.labelsWithRetry(func() ([]string, error) { + lvs, err = q.labelsWithRetry(ctx, func() ([]string, error) { return q.distributor.LabelValuesForLabelName(ctx, model.Time(q.mint), model.Time(q.maxt), model.LabelName(name), hints, partialDataEnabled, matchers...) 
- }, q.ingesterQueryMaxAttempts) + }) } if partialdata.IsPartialDataError(err) { @@ -267,13 +267,13 @@ func (q *distributorQuerier) LabelNames(ctx context.Context, hints *storage.Labe ) if q.streamingMetadata { - ln, err = q.labelsWithRetry(func() ([]string, error) { + ln, err = q.labelsWithRetry(ctx, func() ([]string, error) { return q.distributor.LabelNamesStream(ctx, model.Time(q.mint), model.Time(q.maxt), hints, partialDataEnabled, matchers...) - }, q.ingesterQueryMaxAttempts) + }) } else { - ln, err = q.labelsWithRetry(func() ([]string, error) { + ln, err = q.labelsWithRetry(ctx, func() ([]string, error) { return q.distributor.LabelNames(ctx, model.Time(q.mint), model.Time(q.maxt), hints, partialDataEnabled, matchers...) - }, q.ingesterQueryMaxAttempts) + }) } if partialdata.IsPartialDataError(err) { @@ -284,16 +284,24 @@ func (q *distributorQuerier) LabelNames(ctx context.Context, hints *storage.Labe return ln, nil, err } -func (q *distributorQuerier) labelsWithRetry(labelsFunc func() ([]string, error), retryAttempt int) ([]string, error) { +func (q *distributorQuerier) labelsWithRetry(ctx context.Context, labelsFunc func() ([]string, error)) ([]string, error) { var result []string var err error - for i := 0; i < retryAttempt; i++ { + retries := backoff.New(ctx, backoff.Config{ + MinBackoff: retryMinBackoff, + MaxBackoff: retryMaxBackoff, + MaxRetries: q.ingesterQueryMaxAttempts, + }) + + for retries.Ongoing() { result, err = labelsFunc() if err == nil || !q.isRetryableError(err) { return result, err } + + retries.Wait() } return result, err From 883d8bc486c9cfd28e688f6e15169d039aca3eb2 Mon Sep 17 00:00:00 2001 From: Justin Jung Date: Mon, 21 Apr 2025 22:41:10 -0700 Subject: [PATCH 8/8] Change default retry from 3 to 1 Signed-off-by: Justin Jung --- docs/blocks-storage/querier.md | 2 +- docs/configuration/config-file-reference.md | 2 +- pkg/querier/distributor_queryable.go | 8 ++++++++ pkg/querier/querier.go | 2 +- 4 files changed, 11 insertions(+), 3 
deletions(-) diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index bcb661f9245..3f11305f0c3 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -240,7 +240,7 @@ querier: # The maximum number of times we attempt fetching data from ingesters for # retryable errors. # CLI flag: -querier.ingester-query-max-attempts - [ingester_query_max_attempts: <int> | default = 3] + [ingester_query_max_attempts: <int> | default = 1] # When distributor's sharding strategy is shuffle-sharding and this setting is # > 0, queriers fetch in-memory series from the minimum set of required diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index deef7b0fd9a..1308cacfadc 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -4168,7 +4168,7 @@ store_gateway_client: # The maximum number of times we attempt fetching data from ingesters for # retryable errors. # CLI flag: -querier.ingester-query-max-attempts -[ingester_query_max_attempts: <int> | default = 3] +[ingester_query_max_attempts: <int> | default = 1] # When distributor's sharding strategy is shuffle-sharding and this setting is > # 0, queriers fetch in-memory series from the minimum set of required ingesters, diff --git a/pkg/querier/distributor_queryable.go b/pkg/querier/distributor_queryable.go index 47cfa316b52..11e2dceb2a5 100644 --- a/pkg/querier/distributor_queryable.go +++ b/pkg/querier/distributor_queryable.go @@ -203,6 +203,10 @@ func (q *distributorQuerier) streamingSelect(ctx context.Context, sortSeries, pa } func (q *distributorQuerier) queryWithRetry(ctx context.Context, queryFunc func() (*client.QueryStreamResponse, error)) (*client.QueryStreamResponse, error) { + if q.ingesterQueryMaxAttempts == 1 { + return queryFunc() + } + var result *client.QueryStreamResponse var err error @@ -285,6 +289,10 @@ func (q *distributorQuerier) LabelNames(ctx context.Context, hints *storage.Labe } 
func (q *distributorQuerier) labelsWithRetry(ctx context.Context, labelsFunc func() ([]string, error)) ([]string, error) { + if q.ingesterQueryMaxAttempts == 1 { + return labelsFunc() + } + var result []string var err error diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 69e2d3fea93..9220faad5c4 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -128,7 +128,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.StoreGatewayAddresses, "querier.store-gateway-addresses", "", "Comma separated list of store-gateway addresses in DNS Service Discovery format. This option should be set when using the blocks storage and the store-gateway sharding is disabled (when enabled, the store-gateway instances form a ring and addresses are picked from the ring).") f.BoolVar(&cfg.StoreGatewayQueryStatsEnabled, "querier.store-gateway-query-stats-enabled", true, "If enabled, store gateway query stats will be logged using `info` log level.") f.IntVar(&cfg.StoreGatewayConsistencyCheckMaxAttempts, "querier.store-gateway-consistency-check-max-attempts", maxFetchSeriesAttempts, "The maximum number of times we attempt fetching missing blocks from different store-gateways. If no more store-gateways are left (ie. 
due to lower replication factor) than we'll end the retries earlier") - f.IntVar(&cfg.IngesterQueryMaxAttempts, "querier.ingester-query-max-attempts", 3, "The maximum number of times we attempt fetching data from ingesters for retryable errors.") + f.IntVar(&cfg.IngesterQueryMaxAttempts, "querier.ingester-query-max-attempts", 1, "The maximum number of times we attempt fetching data from ingesters for retryable errors.") f.DurationVar(&cfg.LookbackDelta, "querier.lookback-delta", 5*time.Minute, "Time since the last sample after which a time series is considered stale and ignored by expression evaluations.") f.DurationVar(&cfg.ShuffleShardingIngestersLookbackPeriod, "querier.shuffle-sharding-ingesters-lookback-period", 0, "When distributor's sharding strategy is shuffle-sharding and this setting is > 0, queriers fetch in-memory series from the minimum set of required ingesters, selecting only ingesters which may have received series since 'now - lookback period'. The lookback period should be greater or equal than the configured 'query store after' and 'query ingesters within'. If this setting is 0, queriers always query all ingesters (ingesters shuffle sharding on read path is disabled).") f.BoolVar(&cfg.ThanosEngine, "querier.thanos-engine", false, "Experimental. Use Thanos promql engine https://github.com/thanos-io/promql-engine rather than the Prometheus promql engine.")