@@ -20,10 +20,14 @@ import (
20
20
"github.com/cortexproject/cortex/pkg/querier/series"
21
21
"github.com/cortexproject/cortex/pkg/tenant"
22
22
"github.com/cortexproject/cortex/pkg/util"
23
+ "github.com/cortexproject/cortex/pkg/util/backoff"
23
24
"github.com/cortexproject/cortex/pkg/util/chunkcompat"
24
25
"github.com/cortexproject/cortex/pkg/util/spanlogger"
25
26
)
26
27
28
+ const retryMinBackoff = time .Millisecond
29
+ const retryMaxBackoff = 5 * time .Millisecond
30
+
27
31
// Distributor is the read interface to the distributor, made an interface here
28
32
// to reduce package coupling.
29
33
type Distributor interface {
@@ -38,36 +42,39 @@ type Distributor interface {
38
42
MetricsMetadata (ctx context.Context , req * client.MetricsMetadataRequest ) ([]scrape.MetricMetadata , error )
39
43
}
40
44
41
- func newDistributorQueryable (distributor Distributor , streamingMetdata bool , labelNamesWithMatchers bool , iteratorFn chunkIteratorFunc , queryIngestersWithin time.Duration , isPartialDataEnabled partialdata.IsCfgEnabledFunc ) QueryableWithFilter {
45
+ func newDistributorQueryable (distributor Distributor , streamingMetdata bool , labelNamesWithMatchers bool , iteratorFn chunkIteratorFunc , queryIngestersWithin time.Duration , isPartialDataEnabled partialdata.IsCfgEnabledFunc , ingesterQueryMaxAttempts int ) QueryableWithFilter {
42
46
return distributorQueryable {
43
- distributor : distributor ,
44
- streamingMetdata : streamingMetdata ,
45
- labelNamesWithMatchers : labelNamesWithMatchers ,
46
- iteratorFn : iteratorFn ,
47
- queryIngestersWithin : queryIngestersWithin ,
48
- isPartialDataEnabled : isPartialDataEnabled ,
47
+ distributor : distributor ,
48
+ streamingMetdata : streamingMetdata ,
49
+ labelNamesWithMatchers : labelNamesWithMatchers ,
50
+ iteratorFn : iteratorFn ,
51
+ queryIngestersWithin : queryIngestersWithin ,
52
+ isPartialDataEnabled : isPartialDataEnabled ,
53
+ ingesterQueryMaxAttempts : ingesterQueryMaxAttempts ,
49
54
}
50
55
}
51
56
52
57
type distributorQueryable struct {
53
- distributor Distributor
54
- streamingMetdata bool
55
- labelNamesWithMatchers bool
56
- iteratorFn chunkIteratorFunc
57
- queryIngestersWithin time.Duration
58
- isPartialDataEnabled partialdata.IsCfgEnabledFunc
58
+ distributor Distributor
59
+ streamingMetdata bool
60
+ labelNamesWithMatchers bool
61
+ iteratorFn chunkIteratorFunc
62
+ queryIngestersWithin time.Duration
63
+ isPartialDataEnabled partialdata.IsCfgEnabledFunc
64
+ ingesterQueryMaxAttempts int
59
65
}
60
66
61
67
func (d distributorQueryable ) Querier (mint , maxt int64 ) (storage.Querier , error ) {
62
68
return & distributorQuerier {
63
- distributor : d .distributor ,
64
- mint : mint ,
65
- maxt : maxt ,
66
- streamingMetadata : d .streamingMetdata ,
67
- labelNamesMatchers : d .labelNamesWithMatchers ,
68
- chunkIterFn : d .iteratorFn ,
69
- queryIngestersWithin : d .queryIngestersWithin ,
70
- isPartialDataEnabled : d .isPartialDataEnabled ,
69
+ distributor : d .distributor ,
70
+ mint : mint ,
71
+ maxt : maxt ,
72
+ streamingMetadata : d .streamingMetdata ,
73
+ labelNamesMatchers : d .labelNamesWithMatchers ,
74
+ chunkIterFn : d .iteratorFn ,
75
+ queryIngestersWithin : d .queryIngestersWithin ,
76
+ isPartialDataEnabled : d .isPartialDataEnabled ,
77
+ ingesterQueryMaxAttempts : d .ingesterQueryMaxAttempts ,
71
78
}, nil
72
79
}
73
80
@@ -77,13 +84,14 @@ func (d distributorQueryable) UseQueryable(now time.Time, _, queryMaxT int64) bo
77
84
}
78
85
79
86
type distributorQuerier struct {
80
- distributor Distributor
81
- mint , maxt int64
82
- streamingMetadata bool
83
- labelNamesMatchers bool
84
- chunkIterFn chunkIteratorFunc
85
- queryIngestersWithin time.Duration
86
- isPartialDataEnabled partialdata.IsCfgEnabledFunc
87
+ distributor Distributor
88
+ mint , maxt int64
89
+ streamingMetadata bool
90
+ labelNamesMatchers bool
91
+ chunkIterFn chunkIteratorFunc
92
+ queryIngestersWithin time.Duration
93
+ isPartialDataEnabled partialdata.IsCfgEnabledFunc
94
+ ingesterQueryMaxAttempts int
87
95
}
88
96
89
97
// Select implements storage.Querier interface.
@@ -150,7 +158,9 @@ func (q *distributorQuerier) Select(ctx context.Context, sortSeries bool, sp *st
150
158
}
151
159
152
160
func (q * distributorQuerier ) streamingSelect (ctx context.Context , sortSeries , partialDataEnabled bool , minT , maxT int64 , matchers []* labels.Matcher ) storage.SeriesSet {
153
- results , err := q .distributor .QueryStream (ctx , model .Time (minT ), model .Time (maxT ), partialDataEnabled , matchers ... )
161
+ results , err := q .queryWithRetry (ctx , func () (* client.QueryStreamResponse , error ) {
162
+ return q .distributor .QueryStream (ctx , model .Time (minT ), model .Time (maxT ), partialDataEnabled , matchers ... )
163
+ })
154
164
155
165
if err != nil && ! partialdata .IsPartialDataError (err ) {
156
166
return storage .ErrSeriesSet (err )
@@ -192,6 +202,33 @@ func (q *distributorQuerier) streamingSelect(ctx context.Context, sortSeries, pa
192
202
return seriesSet
193
203
}
194
204
205
+ func (q * distributorQuerier ) queryWithRetry (ctx context.Context , queryFunc func () (* client.QueryStreamResponse , error )) (* client.QueryStreamResponse , error ) {
206
+ if q .ingesterQueryMaxAttempts <= 1 {
207
+ return queryFunc ()
208
+ }
209
+
210
+ var result * client.QueryStreamResponse
211
+ var err error
212
+
213
+ retries := backoff .New (ctx , backoff.Config {
214
+ MinBackoff : retryMinBackoff ,
215
+ MaxBackoff : retryMaxBackoff ,
216
+ MaxRetries : q .ingesterQueryMaxAttempts ,
217
+ })
218
+
219
+ for retries .Ongoing () {
220
+ result , err = queryFunc ()
221
+
222
+ if err == nil || ! q .isRetryableError (err ) {
223
+ return result , err
224
+ }
225
+
226
+ retries .Wait ()
227
+ }
228
+
229
+ return result , err
230
+ }
231
+
195
232
func (q * distributorQuerier ) LabelValues (ctx context.Context , name string , hints * storage.LabelHints , matchers ... * labels.Matcher ) ([]string , annotations.Annotations , error ) {
196
233
var (
197
234
lvs []string
@@ -201,9 +238,13 @@ func (q *distributorQuerier) LabelValues(ctx context.Context, name string, hints
201
238
partialDataEnabled := q .partialDataEnabled (ctx )
202
239
203
240
if q .streamingMetadata {
204
- lvs , err = q .distributor .LabelValuesForLabelNameStream (ctx , model .Time (q .mint ), model .Time (q .maxt ), model .LabelName (name ), hints , partialDataEnabled , matchers ... )
241
+ lvs , err = q .labelsWithRetry (ctx , func () ([]string , error ) {
242
+ return q .distributor .LabelValuesForLabelNameStream (ctx , model .Time (q .mint ), model .Time (q .maxt ), model .LabelName (name ), hints , partialDataEnabled , matchers ... )
243
+ })
205
244
} else {
206
- lvs , err = q .distributor .LabelValuesForLabelName (ctx , model .Time (q .mint ), model .Time (q .maxt ), model .LabelName (name ), hints , partialDataEnabled , matchers ... )
245
+ lvs , err = q .labelsWithRetry (ctx , func () ([]string , error ) {
246
+ return q .distributor .LabelValuesForLabelName (ctx , model .Time (q .mint ), model .Time (q .maxt ), model .LabelName (name ), hints , partialDataEnabled , matchers ... )
247
+ })
207
248
}
208
249
209
250
if partialdata .IsPartialDataError (err ) {
@@ -230,9 +271,13 @@ func (q *distributorQuerier) LabelNames(ctx context.Context, hints *storage.Labe
230
271
)
231
272
232
273
if q .streamingMetadata {
233
- ln , err = q .distributor .LabelNamesStream (ctx , model .Time (q .mint ), model .Time (q .maxt ), hints , partialDataEnabled , matchers ... )
274
+ ln , err = q .labelsWithRetry (ctx , func () ([]string , error ) {
275
+ return q .distributor .LabelNamesStream (ctx , model .Time (q .mint ), model .Time (q .maxt ), hints , partialDataEnabled , matchers ... )
276
+ })
234
277
} else {
235
- ln , err = q .distributor .LabelNames (ctx , model .Time (q .mint ), model .Time (q .maxt ), hints , partialDataEnabled , matchers ... )
278
+ ln , err = q .labelsWithRetry (ctx , func () ([]string , error ) {
279
+ return q .distributor .LabelNames (ctx , model .Time (q .mint ), model .Time (q .maxt ), hints , partialDataEnabled , matchers ... )
280
+ })
236
281
}
237
282
238
283
if partialdata .IsPartialDataError (err ) {
@@ -243,6 +288,33 @@ func (q *distributorQuerier) LabelNames(ctx context.Context, hints *storage.Labe
243
288
return ln , nil , err
244
289
}
245
290
291
+ func (q * distributorQuerier ) labelsWithRetry (ctx context.Context , labelsFunc func () ([]string , error )) ([]string , error ) {
292
+ if q .ingesterQueryMaxAttempts == 1 {
293
+ return labelsFunc ()
294
+ }
295
+
296
+ var result []string
297
+ var err error
298
+
299
+ retries := backoff .New (ctx , backoff.Config {
300
+ MinBackoff : retryMinBackoff ,
301
+ MaxBackoff : retryMaxBackoff ,
302
+ MaxRetries : q .ingesterQueryMaxAttempts ,
303
+ })
304
+
305
+ for retries .Ongoing () {
306
+ result , err = labelsFunc ()
307
+
308
+ if err == nil || ! q .isRetryableError (err ) {
309
+ return result , err
310
+ }
311
+
312
+ retries .Wait ()
313
+ }
314
+
315
+ return result , err
316
+ }
317
+
246
318
// labelNamesWithMatchers performs the LabelNames call by calling ingester's MetricsForLabelMatchers method
247
319
func (q * distributorQuerier ) labelNamesWithMatchers (ctx context.Context , hints * storage.LabelHints , partialDataEnabled bool , matchers ... * labels.Matcher ) ([]string , annotations.Annotations , error ) {
248
320
log , ctx := spanlogger .New (ctx , "distributorQuerier.labelNamesWithMatchers" )
@@ -297,6 +369,10 @@ func (q *distributorQuerier) partialDataEnabled(ctx context.Context) bool {
297
369
return q .isPartialDataEnabled != nil && q .isPartialDataEnabled (userID )
298
370
}
299
371
372
+ func (q * distributorQuerier ) isRetryableError (err error ) bool {
373
+ return partialdata .IsPartialDataError (err )
374
+ }
375
+
300
376
type distributorExemplarQueryable struct {
301
377
distributor Distributor
302
378
}
0 commit comments