Avoid retry on non-recoverable errors. #101

Open · wants to merge 7 commits into master
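As the diff below shows, the change removes the unconditional per-sample backoff from retrieval/manager.go, wraps transient target and metadata lookup failures in a new recoverableError type in retrieval/series_cache.go, and retries only those errors in retrieval/transform.go via getSeriesWithRetry, using capped exponential backoff. A minimal standalone sketch of that retry-only-on-recoverable pattern; the names fetch and recoverable are hypothetical and not identifiers from this PR:

package main

import (
	"errors"
	"fmt"
	"time"
)

// recoverable marks an error as transient and therefore worth retrying.
type recoverable struct{ err error }

func (r recoverable) Error() string { return r.err.Error() }

// fetch simulates a lookup that fails transiently twice before succeeding.
func fetch(attempt int) error {
	if attempt < 2 {
		return recoverable{err: errors.New("temporary lookup failure")}
	}
	return nil
}

func main() {
	backoff := time.Duration(0)
	for attempt := 0; ; attempt++ {
		err := fetch(attempt)
		if err == nil {
			fmt.Println("succeeded after", attempt, "retries")
			return
		}
		// Give up immediately on errors that are not marked recoverable.
		if _, ok := err.(recoverable); !ok {
			fmt.Println("permanent error:", err)
			return
		}
		// Capped exponential backoff, mirroring the exponential() helper in the diff.
		backoff *= 2
		if backoff < 10*time.Millisecond {
			backoff = 10 * time.Millisecond
		}
		if backoff > 2*time.Second {
			backoff = 2 * time.Second
		}
		time.Sleep(backoff)
	}
}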
29 changes: 4 additions & 25 deletions retrieval/manager.go
@@ -166,7 +166,10 @@ func (r *PrometheusReader) Run(ctx context.Context, startOffset int) error {
)
go seriesCache.run(ctx)

builder := &sampleBuilder{series: seriesCache}
builder := &sampleBuilder{
logger: r.logger,
series: seriesCache,
}

// NOTE(fabxc): wrap the tailer into a buffered reader once we become concerned
// with performance. The WAL reader will do a lot of tiny reads otherwise.
@@ -221,7 +224,6 @@ Outer:
level.Error(r.logger).Log("error", err)
continue
}
backoff := time.Duration(0)
// Do not increment the metric for produced samples each time but rather
// once at the end.
// Otherwise it will increase CPU usage by ~10%.
@@ -233,19 +235,11 @@ Outer:
break Outer
default:
}
// We intentionally don't use time.After in the select statement above
// since we'd unnecessarily spawn a new goroutine for each sample
// we process even when there are no errors.
if backoff > 0 {
time.Sleep(backoff)
}

var outputSample *monitoring_pb.TimeSeries
var hash uint64
outputSample, hash, samples, err = builder.next(ctx, samples)
if err != nil {
level.Warn(r.logger).Log("msg", "Failed to build sample", "err", err)
backoff = exponential(backoff)
continue
}
if outputSample == nil {
@@ -343,18 +337,3 @@ func hashSeries(s *monitoring_pb.TimeSeries) uint64 {
}
return h
}

func exponential(d time.Duration) time.Duration {
const (
min = 10 * time.Millisecond
max = 2 * time.Second
)
d *= 2
if d < min {
d = min
}
if d > max {
d = max
}
return d
}
18 changes: 15 additions & 3 deletions retrieval/series_cache.go
@@ -43,6 +43,18 @@ var (
keyReason, _ = tag.NewKey("reason")
)

type recoverableError struct {
e error
}

func (r recoverableError) Cause() error {
return r.e
}

func (r recoverableError) Error() string {
return r.e.Error()
}

func init() {
if err := view.Register(&view.View{
Name: "prometheus_sidecar/dropped_series",
@@ -345,7 +357,7 @@ func (c *seriesCache) refresh(ctx context.Context, ref uint64) error {
// If either of those pieces of data is missing, the series will be skipped.
target, err := c.targets.Get(ctx, pkgLabels(entry.lset))
if err != nil {
return errors.Wrap(err, "retrieving target failed")
return recoverableError{e: errors.Wrap(err, "retrieving target failed")}
}
if target == nil {
ctx, _ = tag.New(ctx, tag.Insert(keyReason, "target_not_found"))
@@ -388,7 +400,7 @@ func (c *seriesCache) refresh(ctx context.Context, ref uint64) error {
)
metadata, err := c.metadata.Get(ctx, job, instance, metricName)
if err != nil {
return errors.Wrap(err, "get metadata")
return recoverableError{e: errors.Wrap(err, "get metadata")}
}
if metadata == nil {
// The full name didn't turn anything up. Check again in case it's a summary,
@@ -397,7 +409,7 @@ func (c *seriesCache) refresh(ctx context.Context, ref uint64) error {
if baseMetricName, suffix, ok = stripComplexMetricSuffix(metricName); ok {
metadata, err = c.metadata.Get(ctx, job, instance, baseMetricName)
if err != nil {
return errors.Wrap(err, "get metadata")
return recoverableError{e: errors.Wrap(err, "get metadata")}
}
}
if metadata == nil {
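Because the wrapper also implements Cause(), callers that use github.com/pkg/errors can still unwrap to the underlying error while the type itself serves purely as a retry marker. A small sketch of that behavior, assuming the value-receiver form of recoverableError shown above:

package main

import (
	"fmt"

	"github.com/pkg/errors"
)

// Same shape as the type added in series_cache.go above.
type recoverableError struct {
	e error
}

func (r recoverableError) Cause() error  { return r.e }
func (r recoverableError) Error() string { return r.e.Error() }

func main() {
	root := errors.New("connection refused")
	var err error = recoverableError{e: errors.Wrap(root, "retrieving target failed")}

	// The type assertion is how transform.go decides whether a retry makes sense.
	if _, ok := err.(recoverableError); ok {
		fmt.Println("transient, would retry:", err)
	}

	// Cause() lets errors.Cause look through the marker to the root error.
	fmt.Println("root cause:", errors.Cause(err))
}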
46 changes: 43 additions & 3 deletions retrieval/transform.go
@@ -21,6 +21,8 @@ import (
"strings"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
timestamp_pb "github.com/golang/protobuf/ptypes/timestamp"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/textparse"
@@ -31,6 +33,7 @@ import (
)

type sampleBuilder struct {
logger log.Logger
series seriesGetter
}

@@ -39,9 +42,9 @@ type sampleBuilder struct {
func (b *sampleBuilder) next(ctx context.Context, samples []tsdb.RefSample) (*monitoring_pb.TimeSeries, uint64, []tsdb.RefSample, error) {
sample := samples[0]

entry, ok, err := b.series.get(ctx, sample.Ref)
entry, ok, err := b.getSeriesWithRetry(ctx, sample)
if err != nil {
return nil, 0, samples, errors.Wrap(err, "get series information")
return nil, 0, samples, err
}
if !ok {
return nil, 0, samples[1:], nil
@@ -128,6 +131,28 @@ func (b *sampleBuilder) next(ctx context.Context, samples []tsdb.RefSample) (*mo
return &ts, entry.hash, samples[1:], nil
}

func (b *sampleBuilder) getSeriesWithRetry(ctx context.Context, sample tsdb.RefSample) (entry *seriesCacheEntry, ok bool, err error) {
backoff := time.Duration(0)
for {
Review comment (Contributor):
This doesn't seem to honor ctx.Done anymore. Is that intended?

Reply from @StevenYCChou (Contributor, Author), Mar 12, 2019:
ctx.Done() is now checked in this function. If the context is cancelled, the cancellation is caught at the top of the loop (though if ctx.Err() is returned from inside series.get(), it isn't handled immediately; it is picked up on the next iteration's ctx.Done() check).

I didn't re-check ctx.Done() after calling series.get() because checking it once at the top of the for loop keeps the code simpler and more readable.

select {
case <-ctx.Done():
return nil, false, ctx.Err()
default:
}
entry, ok, err = b.series.get(ctx, sample.Ref)
if err == nil {
break
}
if _, ok := err.(recoverableError); !ok {
return nil, false, err
}
level.Warn(b.logger).Log("msg", "failed to get seriesCacheEntry", "err", err)
backoff = exponential(backoff)
time.Sleep(backoff)
}
return entry, ok, nil
}
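The review thread above asks whether ctx.Done is still honored. As written, cancellation is only observed at the top of the next loop iteration, after time.Sleep completes. If aborting the wait itself ever became necessary, one possible variant (not part of this PR; sleepCtx is a hypothetical helper relying only on the context and time packages transform.go already imports) is to wait on a timer and the context together:

// sleepCtx waits for d or until ctx is cancelled, whichever comes first.
func sleepCtx(ctx context.Context, d time.Duration) error {
	t := time.NewTimer(d)
	defer t.Stop()
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-t.C:
		return nil
	}
}

The call site would then be if err := sleepCtx(ctx, backoff); err != nil { return nil, false, err } instead of time.Sleep(backoff). Since the timer is only created when a backoff is actually pending and is stopped afterwards, this sidesteps the per-sample cost that the comment removed from manager.go raises against time.After.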

const (
metricSuffixBucket = "_bucket"
metricSuffixSum = "_sum"
@@ -210,7 +235,7 @@ func (b *sampleBuilder) buildDistribution(
// until we hit a new metric.
Loop:
for i, s := range samples {
e, ok, err := b.series.get(ctx, s.Ref)
e, ok, err := b.getSeriesWithRetry(ctx, s)
if err != nil {
return nil, 0, samples, err
}
@@ -349,3 +374,18 @@ func histogramLabelsEqual(a, b tsdbLabels.Labels) bool {
// If one label set still has labels left, they are not equal.
return i == len(a) && j == len(b)
}

func exponential(d time.Duration) time.Duration {
const (
min = 10 * time.Millisecond
max = 2 * time.Second
)
d *= 2
if d < min {
d = min
}
if d > max {
d = max
}
return d
}
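For reference, starting from a zero duration, repeated calls to this helper double the delay from a 10ms floor up to the 2s cap and then stay there. A tiny snippet illustrating the sequence (assumes a fmt import alongside the helper above):

d := time.Duration(0)
for i := 0; i < 10; i++ {
	d = exponential(d)
	fmt.Println(d) // 10ms, 20ms, 40ms, ..., 1.28s, 2s, 2s
}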