From ebb40e626985432424dc82013d4c8f7a9f3a20be Mon Sep 17 00:00:00 2001 From: StevenYCChou <3055688+StevenYCChou@users.noreply.github.com> Date: Tue, 5 Mar 2019 15:48:51 +0800 Subject: [PATCH 1/7] Avoid retry on non-recoverable errors. Only retry when calling seriesCache.get(). --- retrieval/manager.go | 29 ++++---------------------- retrieval/transform.go | 46 ++++++++++++++++++++++++++++++++++-------- 2 files changed, 42 insertions(+), 33 deletions(-) diff --git a/retrieval/manager.go b/retrieval/manager.go index 9b83ce24..9495b892 100644 --- a/retrieval/manager.go +++ b/retrieval/manager.go @@ -166,7 +166,10 @@ func (r *PrometheusReader) Run(ctx context.Context, startOffset int) error { ) go seriesCache.run(ctx) - builder := &sampleBuilder{series: seriesCache} + builder := &sampleBuilder{ + logger: r.logger, + series: seriesCache, + } // NOTE(fabxc): wrap the tailer into a buffered reader once we become concerned // with performance. The WAL reader will do a lot of tiny reads otherwise. @@ -221,7 +224,6 @@ Outer: level.Error(r.logger).Log("error", err) continue } - backoff := time.Duration(0) // Do not increment the metric for produced samples each time but rather // once at the end. // Otherwise it will increase CPU usage by ~10%. @@ -233,19 +235,11 @@ Outer: break Outer default: } - // We intentionally don't use time.After in the select statement above - // since we'd unnecessarily spawn a new goroutine for each sample - // we process even when there are no errors. 
- if backoff > 0 { - time.Sleep(backoff) - } - var outputSample *monitoring_pb.TimeSeries var hash uint64 outputSample, hash, samples, err = builder.next(ctx, samples) if err != nil { level.Warn(r.logger).Log("msg", "Failed to build sample", "err", err) - backoff = exponential(backoff) continue } if outputSample == nil { @@ -343,18 +337,3 @@ func hashSeries(s *monitoring_pb.TimeSeries) uint64 { } return h } - -func exponential(d time.Duration) time.Duration { - const ( - min = 10 * time.Millisecond - max = 2 * time.Second - ) - d *= 2 - if d < min { - d = min - } - if d > max { - d = max - } - return d -} diff --git a/retrieval/transform.go b/retrieval/transform.go index 2a1c5cae..f9f2449d 100644 --- a/retrieval/transform.go +++ b/retrieval/transform.go @@ -22,6 +22,8 @@ import ( "time" timestamp_pb "github.com/golang/protobuf/ptypes/timestamp" + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" "github.com/pkg/errors" "github.com/prometheus/prometheus/pkg/textparse" "github.com/prometheus/tsdb" @@ -31,6 +33,7 @@ import ( ) type sampleBuilder struct { + logger log.Logger series seriesGetter } @@ -39,10 +42,7 @@ type sampleBuilder struct { func (b *sampleBuilder) next(ctx context.Context, samples []tsdb.RefSample) (*monitoring_pb.TimeSeries, uint64, []tsdb.RefSample, error) { sample := samples[0] - entry, ok, err := b.series.get(ctx, sample.Ref) - if err != nil { - return nil, 0, samples, errors.Wrap(err, "get series information") - } + entry, ok := b.seriesGetWithRetry(ctx, sample) if !ok { return nil, 0, samples[1:], nil } @@ -65,6 +65,7 @@ func (b *sampleBuilder) next(ctx context.Context, samples []tsdb.RefSample) (*mo } ts.Points = append(ts.Points, point) + var err error var resetTimestamp int64 switch entry.metadata.Type { @@ -128,6 +129,23 @@ func (b *sampleBuilder) next(ctx context.Context, samples []tsdb.RefSample) (*mo return &ts, entry.hash, samples[1:], nil } +func (b *sampleBuilder) seriesGetWithRetry(ctx context.Context, sample 
tsdb.RefSample) (*seriesCacheEntry, bool) { + backoff := time.Duration(0) + entry, ok, err := b.series.get(ctx, sample.Ref) + for { + if err == nil { + break + } + level.Warn(b.logger).Log("msg", "failed to get seriesCacheEntry", "err", err) + backoff = exponential(backoff) + if backoff > 0 { + time.Sleep(backoff) + } + entry, ok, err = b.series.get(ctx, sample.Ref) + } + return entry, ok +} + const ( metricSuffixBucket = "_bucket" metricSuffixSum = "_sum" @@ -210,10 +228,7 @@ func (b *sampleBuilder) buildDistribution( // until we hit a new metric. Loop: for i, s := range samples { - e, ok, err := b.series.get(ctx, s.Ref) - if err != nil { - return nil, 0, samples, err - } + e, ok := b.seriesGetWithRetry(ctx, s) if !ok { consumed++ // TODO(fabxc): increment metric. @@ -349,3 +364,18 @@ func histogramLabelsEqual(a, b tsdbLabels.Labels) bool { // If one label set still has labels left, they are not equal. return i == len(a) && j == len(b) } + +func exponential(d time.Duration) time.Duration { + const ( + min = 10 * time.Millisecond + max = 2 * time.Second + ) + d *= 2 + if d < min { + d = min + } + if d > max { + d = max + } + return d +} From 17adc08a4ff6e4047852433899de6dbb637c4ead Mon Sep 17 00:00:00 2001 From: Yen-Cheng Chou Date: Sun, 10 Mar 2019 18:20:05 +0000 Subject: [PATCH 2/7] Rename function; consider error from series cache; consider context.Done(). 
--- retrieval/transform.go | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/retrieval/transform.go b/retrieval/transform.go index f9f2449d..3125a577 100644 --- a/retrieval/transform.go +++ b/retrieval/transform.go @@ -42,7 +42,10 @@ type sampleBuilder struct { func (b *sampleBuilder) next(ctx context.Context, samples []tsdb.RefSample) (*monitoring_pb.TimeSeries, uint64, []tsdb.RefSample, error) { sample := samples[0] - entry, ok := b.seriesGetWithRetry(ctx, sample) + entry, ok, err := b.getSeriesWithRetry(ctx, sample) + if err != nil { + return nil, 0, samples, err + } if !ok { return nil, 0, samples[1:], nil } @@ -65,7 +68,6 @@ func (b *sampleBuilder) next(ctx context.Context, samples []tsdb.RefSample) (*mo } ts.Points = append(ts.Points, point) - var err error var resetTimestamp int64 switch entry.metadata.Type { @@ -129,10 +131,15 @@ func (b *sampleBuilder) next(ctx context.Context, samples []tsdb.RefSample) (*mo return &ts, entry.hash, samples[1:], nil } -func (b *sampleBuilder) seriesGetWithRetry(ctx context.Context, sample tsdb.RefSample) (*seriesCacheEntry, bool) { +func (b *sampleBuilder) getSeriesWithRetry(ctx context.Context, sample tsdb.RefSample) (entry *seriesCacheEntry, ok bool, err error) { backoff := time.Duration(0) - entry, ok, err := b.series.get(ctx, sample.Ref) for { + select { + case <-ctx.Done(): + return nil, false, ctx.Err() + default: + } + entry, ok, err = b.series.get(ctx, sample.Ref) if err == nil { break } @@ -141,9 +148,8 @@ func (b *sampleBuilder) seriesGetWithRetry(ctx context.Context, sample tsdb.RefS if backoff > 0 { time.Sleep(backoff) } - entry, ok, err = b.series.get(ctx, sample.Ref) } - return entry, ok + return entry, ok, nil } const ( @@ -228,7 +234,10 @@ func (b *sampleBuilder) buildDistribution( // until we hit a new metric. 
Loop: for i, s := range samples { - e, ok := b.seriesGetWithRetry(ctx, s) + e, ok, err := b.getSeriesWithRetry(ctx, s) + if err != nil { + return nil, 0, samples, err + } if !ok { consumed++ // TODO(fabxc): increment metric. From de023f9e1b6665635acf247ab77872e252fda577 Mon Sep 17 00:00:00 2001 From: Yen-Cheng Chou Date: Sun, 10 Mar 2019 18:21:00 +0000 Subject: [PATCH 3/7] Do not retry on unrecoverable errors. For error caused by unexpected metric name suffix or by unexpected metric type, it should return as unrecoverable error because retrying doesn't help the situation. --- retrieval/series_cache.go | 8 ++++++-- retrieval/transform.go | 3 +++ 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/retrieval/series_cache.go b/retrieval/series_cache.go index d09f7045..85ddd40c 100644 --- a/retrieval/series_cache.go +++ b/retrieval/series_cache.go @@ -43,6 +43,10 @@ var ( keyReason, _ = tag.NewKey("reason") ) +type unrecoverableError struct { + error +} + func init() { if err := view.Register(&view.View{ Name: "prometheus_sidecar/dropped_series", @@ -447,14 +451,14 @@ func (c *seriesCache) refresh(ctx context.Context, ref uint64) error { ts.MetricKind = metric_pb.MetricDescriptor_GAUGE ts.ValueType = metric_pb.MetricDescriptor_DOUBLE default: - return errors.Errorf("unexpected metric name suffix %q", suffix) + return unrecoverableError{errors.Errorf("unexpected metric name suffix %q", suffix)} } case textparse.MetricTypeHistogram: ts.Metric.Type = c.getMetricType(c.metricsPrefix, baseMetricName) ts.MetricKind = metric_pb.MetricDescriptor_CUMULATIVE ts.ValueType = metric_pb.MetricDescriptor_DISTRIBUTION default: - return errors.Errorf("unexpected metric type %s", metadata.Type) + return unrecoverableError{errors.Errorf("unexpected metric type %s", metadata.Type)} } entry.proto = ts diff --git a/retrieval/transform.go b/retrieval/transform.go index 3125a577..70cc1000 100644 --- a/retrieval/transform.go +++ b/retrieval/transform.go @@ -143,6 +143,9 @@ func (b 
*sampleBuilder) getSeriesWithRetry(ctx context.Context, sample tsdb.RefS if err == nil { break } + if _, ok := err.(unrecoverableError); ok { + return nil, false, err + } level.Warn(b.logger).Log("msg", "failed to get seriesCacheEntry", "err", err) backoff = exponential(backoff) if backoff > 0 { From 18c94e9b93bf10a67a48a79e254294f1094483be Mon Sep 17 00:00:00 2001 From: Yen-Cheng Chou Date: Sun, 10 Mar 2019 18:21:22 +0000 Subject: [PATCH 4/7] Reorder import. --- retrieval/transform.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/retrieval/transform.go b/retrieval/transform.go index 70cc1000..e75937dc 100644 --- a/retrieval/transform.go +++ b/retrieval/transform.go @@ -21,9 +21,9 @@ import ( "strings" "time" - timestamp_pb "github.com/golang/protobuf/ptypes/timestamp" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" + timestamp_pb "github.com/golang/protobuf/ptypes/timestamp" "github.com/pkg/errors" "github.com/prometheus/prometheus/pkg/textparse" "github.com/prometheus/tsdb" From a324a33bb0ab6cf8449f41620bf71ee61e05ff7a Mon Sep 17 00:00:00 2001 From: Yen-Cheng Chou Date: Sun, 10 Mar 2019 18:22:38 +0000 Subject: [PATCH 5/7] Remove redundant condition checking for Sleep(). --- retrieval/transform.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/retrieval/transform.go b/retrieval/transform.go index e75937dc..92b13d6a 100644 --- a/retrieval/transform.go +++ b/retrieval/transform.go @@ -148,9 +148,7 @@ func (b *sampleBuilder) getSeriesWithRetry(ctx context.Context, sample tsdb.RefS } level.Warn(b.logger).Log("msg", "failed to get seriesCacheEntry", "err", err) backoff = exponential(backoff) - if backoff > 0 { - time.Sleep(backoff) - } + time.Sleep(backoff) } return entry, ok, nil } From 027c2bf9b6a135035b1f795976d19b95cd0b27e0 Mon Sep 17 00:00:00 2001 From: Yen-Cheng Chou Date: Tue, 12 Mar 2019 14:39:16 +0000 Subject: [PATCH 6/7] Rename error. 
--- retrieval/series_cache.go | 6 +++--- retrieval/transform.go | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/retrieval/series_cache.go b/retrieval/series_cache.go index 85ddd40c..4edc18d3 100644 --- a/retrieval/series_cache.go +++ b/retrieval/series_cache.go @@ -43,7 +43,7 @@ var ( keyReason, _ = tag.NewKey("reason") ) -type unrecoverableError struct { +type unknownMetricError struct { error } @@ -451,14 +451,14 @@ func (c *seriesCache) refresh(ctx context.Context, ref uint64) error { ts.MetricKind = metric_pb.MetricDescriptor_GAUGE ts.ValueType = metric_pb.MetricDescriptor_DOUBLE default: - return unrecoverableError{errors.Errorf("unexpected metric name suffix %q", suffix)} + return unknownMetricError{errors.Errorf("unexpected metric name suffix %q", suffix)} } case textparse.MetricTypeHistogram: ts.Metric.Type = c.getMetricType(c.metricsPrefix, baseMetricName) ts.MetricKind = metric_pb.MetricDescriptor_CUMULATIVE ts.ValueType = metric_pb.MetricDescriptor_DISTRIBUTION default: - return unrecoverableError{errors.Errorf("unexpected metric type %s", metadata.Type)} + return unknownMetricError{errors.Errorf("unexpected metric type %s", metadata.Type)} } entry.proto = ts diff --git a/retrieval/transform.go b/retrieval/transform.go index 92b13d6a..dec1ed2a 100644 --- a/retrieval/transform.go +++ b/retrieval/transform.go @@ -143,7 +143,7 @@ func (b *sampleBuilder) getSeriesWithRetry(ctx context.Context, sample tsdb.RefS if err == nil { break } - if _, ok := err.(unrecoverableError); ok { + if _, ok := err.(unknownMetricError); ok { return nil, false, err } level.Warn(b.logger).Log("msg", "failed to get seriesCacheEntry", "err", err) From abb1d33735f6fda3b7d25eda80e45f6b47fec8dd Mon Sep 17 00:00:00 2001 From: Yen-Cheng Chou Date: Thu, 29 Aug 2019 14:06:11 +0000 Subject: [PATCH 7/7] Use Recoverable Error. 
--- retrieval/series_cache.go | 22 +++++++++++++++------- retrieval/transform.go | 2 +- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/retrieval/series_cache.go b/retrieval/series_cache.go index 4edc18d3..f74474d1 100644 --- a/retrieval/series_cache.go +++ b/retrieval/series_cache.go @@ -43,8 +43,16 @@ var ( keyReason, _ = tag.NewKey("reason") ) -type unknownMetricError struct { - error +type recoverableError struct { + e error +} + +func (r recoverableError) Cause() error { + return r.e +} + +func (r recoverableError) Error() string { + return r.e.Error() } func init() { @@ -349,7 +357,7 @@ func (c *seriesCache) refresh(ctx context.Context, ref uint64) error { // If either of those pieces of data is missing, the series will be skipped. target, err := c.targets.Get(ctx, pkgLabels(entry.lset)) if err != nil { - return errors.Wrap(err, "retrieving target failed") + return recoverableError{e: errors.Wrap(err, "retrieving target failed")} } if target == nil { ctx, _ = tag.New(ctx, tag.Insert(keyReason, "target_not_found")) @@ -392,7 +400,7 @@ func (c *seriesCache) refresh(ctx context.Context, ref uint64) error { ) metadata, err := c.metadata.Get(ctx, job, instance, metricName) if err != nil { - return errors.Wrap(err, "get metadata") + return recoverableError{e: errors.Wrap(err, "get metadata")} } if metadata == nil { // The full name didn't turn anything up.
Check again in case it's a summary, @@ -401,7 +409,7 @@ func (c *seriesCache) refresh(ctx context.Context, ref uint64) error { if baseMetricName, suffix, ok = stripComplexMetricSuffix(metricName); ok { metadata, err = c.metadata.Get(ctx, job, instance, baseMetricName) if err != nil { - return errors.Wrap(err, "get metadata") + return recoverableError{e: errors.Wrap(err, "get metadata")} } } if metadata == nil { @@ -451,14 +459,14 @@ func (c *seriesCache) refresh(ctx context.Context, ref uint64) error { ts.MetricKind = metric_pb.MetricDescriptor_GAUGE ts.ValueType = metric_pb.MetricDescriptor_DOUBLE default: - return unknownMetricError{errors.Errorf("unexpected metric name suffix %q", suffix)} + return errors.Errorf("unexpected metric name suffix %q", suffix) } case textparse.MetricTypeHistogram: ts.Metric.Type = c.getMetricType(c.metricsPrefix, baseMetricName) ts.MetricKind = metric_pb.MetricDescriptor_CUMULATIVE ts.ValueType = metric_pb.MetricDescriptor_DISTRIBUTION default: - return unknownMetricError{errors.Errorf("unexpected metric type %s", metadata.Type)} + return errors.Errorf("unexpected metric type %s", metadata.Type) } entry.proto = ts diff --git a/retrieval/transform.go b/retrieval/transform.go index dec1ed2a..75a36698 100644 --- a/retrieval/transform.go +++ b/retrieval/transform.go @@ -143,7 +143,7 @@ func (b *sampleBuilder) getSeriesWithRetry(ctx context.Context, sample tsdb.RefS if err == nil { break } - if _, ok := err.(unknownMetricError); ok { + if _, ok := err.(recoverableError); !ok { return nil, false, err } level.Warn(b.logger).Log("msg", "failed to get seriesCacheEntry", "err", err)