diff --git a/retrieval/manager.go b/retrieval/manager.go index 9b83ce24..9495b892 100644 --- a/retrieval/manager.go +++ b/retrieval/manager.go @@ -166,7 +166,10 @@ func (r *PrometheusReader) Run(ctx context.Context, startOffset int) error { ) go seriesCache.run(ctx) - builder := &sampleBuilder{series: seriesCache} + builder := &sampleBuilder{ + logger: r.logger, + series: seriesCache, + } // NOTE(fabxc): wrap the tailer into a buffered reader once we become concerned // with performance. The WAL reader will do a lot of tiny reads otherwise. @@ -221,7 +224,6 @@ Outer: level.Error(r.logger).Log("error", err) continue } - backoff := time.Duration(0) // Do not increment the metric for produced samples each time but rather // once at the end. // Otherwise it will increase CPU usage by ~10%. @@ -233,19 +235,11 @@ Outer: break Outer default: } - // We intentionally don't use time.After in the select statement above - // since we'd unnecessarily spawn a new goroutine for each sample - // we process even when there are no errors. 
- if backoff > 0 { - time.Sleep(backoff) - } - var outputSample *monitoring_pb.TimeSeries var hash uint64 outputSample, hash, samples, err = builder.next(ctx, samples) if err != nil { level.Warn(r.logger).Log("msg", "Failed to build sample", "err", err) - backoff = exponential(backoff) continue } if outputSample == nil { @@ -343,18 +337,3 @@ func hashSeries(s *monitoring_pb.TimeSeries) uint64 { } return h } - -func exponential(d time.Duration) time.Duration { - const ( - min = 10 * time.Millisecond - max = 2 * time.Second - ) - d *= 2 - if d < min { - d = min - } - if d > max { - d = max - } - return d -} diff --git a/retrieval/series_cache.go b/retrieval/series_cache.go index d09f7045..f74474d1 100644 --- a/retrieval/series_cache.go +++ b/retrieval/series_cache.go @@ -43,6 +43,18 @@ var ( keyReason, _ = tag.NewKey("reason") ) +type recoverableError struct { + e error +} + +func (r recoverableError) Cause() error { + return r.e +} + +func (r recoverableError) Error() string { + return r.e.Error() +} + func init() { if err := view.Register(&view.View{ Name: "prometheus_sidecar/dropped_series", @@ -345,7 +357,7 @@ func (c *seriesCache) refresh(ctx context.Context, ref uint64) error { // If either of those pieces of data is missing, the series will be skipped. target, err := c.targets.Get(ctx, pkgLabels(entry.lset)) if err != nil { - return errors.Wrap(err, "retrieving target failed") + return recoverableError{e: errors.Wrap(err, "retrieving target failed")} } if target == nil { ctx, _ = tag.New(ctx, tag.Insert(keyReason, "target_not_found")) @@ -388,7 +400,7 @@ func (c *seriesCache) refresh(ctx context.Context, ref uint64) error { ) metadata, err := c.metadata.Get(ctx, job, instance, metricName) if err != nil { - return errors.Wrap(err, "get metadata") + return recoverableError{e: errors.Wrap(err, "get metadata")} } if metadata == nil { // The full name didn't turn anything up. 
Check again in case it's a summary, @@ -397,7 +409,7 @@ func (c *seriesCache) refresh(ctx context.Context, ref uint64) error { if baseMetricName, suffix, ok = stripComplexMetricSuffix(metricName); ok { metadata, err = c.metadata.Get(ctx, job, instance, baseMetricName) if err != nil { - return errors.Wrap(err, "get metadata") + return recoverableError{e: errors.Wrap(err, "get metadata")} } } if metadata == nil { diff --git a/retrieval/transform.go b/retrieval/transform.go index 2a1c5cae..75a36698 100644 --- a/retrieval/transform.go +++ b/retrieval/transform.go @@ -21,6 +21,8 @@ import ( "strings", "time" + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" timestamp_pb "github.com/golang/protobuf/ptypes/timestamp" "github.com/pkg/errors" "github.com/prometheus/prometheus/pkg/textparse" @@ -31,6 +33,7 @@ import ( ) type sampleBuilder struct { + logger log.Logger series seriesGetter } @@ -39,9 +42,9 @@ type sampleBuilder struct { func (b *sampleBuilder) next(ctx context.Context, samples []tsdb.RefSample) (*monitoring_pb.TimeSeries, uint64, []tsdb.RefSample, error) { sample := samples[0] - entry, ok, err := b.series.get(ctx, sample.Ref) + entry, ok, err := b.getSeriesWithRetry(ctx, sample) if err != nil { - return nil, 0, samples, errors.Wrap(err, "get series information") + return nil, 0, samples, err } if !ok { return nil, 0, samples[1:], nil } @@ -128,6 +131,28 @@ func (b *sampleBuilder) next(ctx context.Context, samples []tsdb.RefSample) (*mo return &ts, entry.hash, samples[1:], nil } +func (b *sampleBuilder) getSeriesWithRetry(ctx context.Context, sample tsdb.RefSample) (entry *seriesCacheEntry, ok bool, err error) { + backoff := time.Duration(0) + for { + select { + case <-ctx.Done(): + return nil, false, ctx.Err() + default: + } + entry, ok, err = b.series.get(ctx, sample.Ref) + if err == nil { + break + } + if _, ok := err.(recoverableError); !ok { + return nil, false, err + } + level.Warn(b.logger).Log("msg", "failed to get seriesCacheEntry", 
"err", err) + backoff = exponential(backoff) + time.Sleep(backoff) + } + return entry, ok, nil +} + const ( metricSuffixBucket = "_bucket" metricSuffixSum = "_sum" @@ -210,7 +235,7 @@ func (b *sampleBuilder) buildDistribution( // until we hit a new metric. Loop: for i, s := range samples { - e, ok, err := b.series.get(ctx, s.Ref) + e, ok, err := b.getSeriesWithRetry(ctx, s) if err != nil { return nil, 0, samples, err } @@ -349,3 +374,18 @@ func histogramLabelsEqual(a, b tsdbLabels.Labels) bool { // If one label set still has labels left, they are not equal. return i == len(a) && j == len(b) } + +func exponential(d time.Duration) time.Duration { + const ( + min = 10 * time.Millisecond + max = 2 * time.Second + ) + d *= 2 + if d < min { + d = min + } + if d > max { + d = max + } + return d +}