@@ -227,73 +227,70 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri
227
227
reqStats = stats .FromContext (ctx )
228
228
)
229
229
230
- results , err := d .queryWithRetry (func () ([]interface {}, error ) {
231
- // Fetch samples from multiple ingesters
232
- results , err := replicationSet .Do (ctx , d .cfg .ExtraQueryDelay , false , partialDataEnabled , func (ctx context.Context , ing * ring.InstanceDesc ) (interface {}, error ) {
233
- client , err := d .ingesterPool .GetClientFor (ing .Addr )
234
- if err != nil {
235
- return nil , err
236
- }
230
+ // Fetch samples from multiple ingesters
231
+ results , err := replicationSet .Do (ctx , d .cfg .ExtraQueryDelay , false , partialDataEnabled , func (ctx context.Context , ing * ring.InstanceDesc ) (interface {}, error ) {
232
+ client , err := d .ingesterPool .GetClientFor (ing .Addr )
233
+ if err != nil {
234
+ return nil , err
235
+ }
237
236
238
- ingesterId , err := d .ingestersRing .GetInstanceIdByAddr (ing .Addr )
239
- if err != nil {
240
- level .Warn (d .log ).Log ("msg" , "instance not found in the ring" , "addr" , ing .Addr , "err" , err )
241
- }
237
+ ingesterId , err := d .ingestersRing .GetInstanceIdByAddr (ing .Addr )
238
+ if err != nil {
239
+ level .Warn (d .log ).Log ("msg" , "instance not found in the ring" , "addr" , ing .Addr , "err" , err )
240
+ }
241
+
242
+ d .ingesterQueries .WithLabelValues (ingesterId ).Inc ()
242
243
243
- d .ingesterQueries .WithLabelValues (ingesterId ).Inc ()
244
+ stream , err := client .(ingester_client.IngesterClient ).QueryStream (ctx , req )
245
+ if err != nil {
246
+ d .ingesterQueryFailures .WithLabelValues (ingesterId ).Inc ()
247
+ return nil , err
248
+ }
249
+ defer stream .CloseSend () //nolint:errcheck
250
+
251
+ result := & ingester_client.QueryStreamResponse {}
252
+ for {
253
+ resp , err := stream .Recv ()
254
+ if err == io .EOF {
255
+ break
256
+ } else if err != nil {
257
+ // Do not track a failure if the context was canceled.
258
+ if ! grpcutil .IsGRPCContextCanceled (err ) {
259
+ d .ingesterQueryFailures .WithLabelValues (ingesterId ).Inc ()
260
+ }
244
261
245
- stream , err := client .(ingester_client.IngesterClient ).QueryStream (ctx , req )
246
- if err != nil {
247
- d .ingesterQueryFailures .WithLabelValues (ingesterId ).Inc ()
248
262
return nil , err
249
263
}
250
- defer stream .CloseSend () //nolint:errcheck
251
-
252
- result := & ingester_client.QueryStreamResponse {}
253
- for {
254
- resp , err := stream .Recv ()
255
- if err == io .EOF {
256
- break
257
- } else if err != nil {
258
- // Do not track a failure if the context was canceled.
259
- if ! grpcutil .IsGRPCContextCanceled (err ) {
260
- d .ingesterQueryFailures .WithLabelValues (ingesterId ).Inc ()
261
- }
262
-
263
- return nil , err
264
- }
265
-
266
- // Enforce the max chunks limits.
267
- if chunkLimitErr := queryLimiter .AddChunks (resp .ChunksCount ()); chunkLimitErr != nil {
268
- return nil , validation .LimitError (chunkLimitErr .Error ())
269
- }
270
264
271
- s := make ([][]cortexpb. LabelAdapter , 0 , len ( resp . Chunkseries ))
272
- for _ , series := range resp .Chunkseries {
273
- s = append ( s , series . Labels )
274
- }
265
+ // Enforce the max chunks limits.
266
+ if chunkLimitErr := queryLimiter . AddChunks ( resp .ChunksCount ()); chunkLimitErr != nil {
267
+ return nil , validation . LimitError ( chunkLimitErr . Error () )
268
+ }
275
269
276
- if limitErr := queryLimiter .AddSeries (s ... ); limitErr != nil {
277
- return nil , validation .LimitError (limitErr .Error ())
278
- }
270
+ s := make ([][]cortexpb.LabelAdapter , 0 , len (resp .Chunkseries ))
271
+ for _ , series := range resp .Chunkseries {
272
+ s = append (s , series .Labels )
273
+ }
279
274
280
- if chunkBytesLimitErr := queryLimiter .AddChunkBytes ( resp . ChunksSize ()); chunkBytesLimitErr != nil {
281
- return nil , validation .LimitError (chunkBytesLimitErr .Error ())
282
- }
275
+ if limitErr := queryLimiter .AddSeries ( s ... ); limitErr != nil {
276
+ return nil , validation .LimitError (limitErr .Error ())
277
+ }
283
278
284
- if dataBytesLimitErr := queryLimiter .AddDataBytes (resp .Size ()); dataBytesLimitErr != nil {
285
- return nil , validation .LimitError (dataBytesLimitErr .Error ())
286
- }
279
+ if chunkBytesLimitErr := queryLimiter .AddChunkBytes (resp .ChunksSize ()); chunkBytesLimitErr != nil {
280
+ return nil , validation .LimitError (chunkBytesLimitErr .Error ())
281
+ }
287
282
288
- result .Chunkseries = append (result .Chunkseries , resp .Chunkseries ... )
283
+ if dataBytesLimitErr := queryLimiter .AddDataBytes (resp .Size ()); dataBytesLimitErr != nil {
284
+ return nil , validation .LimitError (dataBytesLimitErr .Error ())
289
285
}
290
- return result , nil
291
- })
292
- if err != nil && ! partialdata .IsPartialDataError (err ) {
293
- return nil , err
286
+
287
+ result .Chunkseries = append (result .Chunkseries , resp .Chunkseries ... )
294
288
}
295
- return results , err
296
- }, 3 )
289
+ return result , nil
290
+ })
291
+ if err != nil && ! partialdata .IsPartialDataError (err ) {
292
+ return nil , err
293
+ }
297
294
298
295
span , _ := opentracing .StartSpanFromContext (ctx , "Distributor.MergeIngesterStreams" )
299
296
defer span .Finish ()
@@ -340,26 +337,3 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri
340
337
341
338
return resp , nil
342
339
}
343
-
344
- func (d * Distributor ) queryWithRetry (queryFunc func () ([]interface {}, error ), retryAttempt int ) ([]interface {}, error ) {
345
- var result []interface {}
346
- var err error
347
-
348
- for i := 0 ; i < retryAttempt ; i ++ {
349
- result , err = queryFunc ()
350
-
351
- if err == nil || ! d .isRetryableError (err ) {
352
- return result , err
353
- }
354
- }
355
-
356
- return result , err
357
- }
358
-
359
- func (d * Distributor ) isRetryableError (err error ) bool {
360
- if partialdata .IsPartialDataError (err ) {
361
- return true
362
- }
363
-
364
- return false
365
- }
0 commit comments