From 9c3a6bfe75b849ddc5d7d4d8739af5185d872f87 Mon Sep 17 00:00:00 2001 From: Alin Sinpalean Date: Sun, 28 Jun 2020 22:29:08 +0200 Subject: [PATCH] Implement `xrate` (extended rate), `xincrease` and `xdelta`. --- promql/engine.go | 111 +++++++++++++----- promql/functions.go | 112 ++++++++++++++++++ promql/parser/functions.go | 19 +++ promql/testdata/functions.test | 204 +++++++++++++++++++++++++++++++-- 4 files changed, 408 insertions(+), 38 deletions(-) diff --git a/promql/engine.go b/promql/engine.go index c7f02b7ae72..ad4efe19238 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -627,23 +627,30 @@ func (ng *Engine) cumulativeSubqueryOffset(path []parser.Node) time.Duration { func (ng *Engine) findMinTime(s *parser.EvalStmt) time.Time { var maxOffset time.Duration parser.Inspect(s.Expr, func(node parser.Node, path []parser.Node) error { + var nodeOffset time.Duration subqOffset := ng.cumulativeSubqueryOffset(path) switch n := node.(type) { case *parser.VectorSelector: - if maxOffset < ng.lookbackDelta+subqOffset { - maxOffset = ng.lookbackDelta + subqOffset - } - if n.Offset+ng.lookbackDelta+subqOffset > maxOffset { - maxOffset = n.Offset + ng.lookbackDelta + subqOffset + nodeOffset += ng.lookbackDelta + subqOffset + if n.Offset > 0 { + nodeOffset += n.Offset } case *parser.MatrixSelector: - if maxOffset < n.Range+subqOffset { - maxOffset = n.Range + subqOffset + nodeOffset += n.Range + subqOffset + if m := n.VectorSelector.(*parser.VectorSelector).Offset; m > 0 { + nodeOffset += m } - if m := n.VectorSelector.(*parser.VectorSelector).Offset + n.Range + subqOffset; m > maxOffset { - maxOffset = m + // Include an extra lookbackDelta iff this is the argument to an + // extended range function. Extended ranges include one extra + // point, this is how far back we need to look for it. 
+ f, ok := parser.Functions[extractFuncFromPath(path)] + if ok && f.ExtRange { + nodeOffset += ng.lookbackDelta } } + if maxOffset < nodeOffset { + maxOffset = nodeOffset + } return nil }) return s.Start.Add(-maxOffset) @@ -678,6 +685,9 @@ func (ng *Engine) populateSeries(ctx context.Context, querier storage.Querier, s switch n := node.(type) { case *parser.VectorSelector: + hints.Func = extractFuncFromPath(path) + hints.By, hints.Grouping = extractGroupsFromPath(path) + if evalRange == 0 { hints.Start = hints.Start - durationMilliseconds(ng.lookbackDelta) } else { @@ -685,11 +695,16 @@ func (ng *Engine) populateSeries(ctx context.Context, querier storage.Querier, s // For all matrix queries we want to ensure that we have (end-start) + range selected // this way we have `range` data before the start time hints.Start = hints.Start - durationMilliseconds(evalRange) + // Include an extra lookbackDelta iff this is the argument to an + // extended range function. Extended ranges include one extra + // point, this is how far back we need to look for it. + f, ok := parser.Functions[hints.Func] + if ok && f.ExtRange { + hints.Start = hints.Start - durationMilliseconds(ng.lookbackDelta) + } evalRange = 0 } - hints.Func = extractFuncFromPath(path) - hints.By, hints.Grouping = extractGroupsFromPath(path) if n.Offset > 0 { offsetMilliseconds := durationMilliseconds(n.Offset) hints.Start = hints.Start - offsetMilliseconds @@ -1108,7 +1123,15 @@ func (ev *evaluator) eval(expr parser.Expr) parser.Value { mat := make(Matrix, 0, len(selVS.Series)) // Output matrix. offset := durationMilliseconds(selVS.Offset) selRange := durationMilliseconds(sel.Range) + bufferRange := selRange stepRange := selRange + // Include an extra lookbackDelta iff this is an extended + // range function. Extended ranges include one extra point, + // this is how far back we need to look for it. 
+ if e.Func.ExtRange { + bufferRange += durationMilliseconds(ev.lookbackDelta) + stepRange += durationMilliseconds(ev.lookbackDelta) + } if stepRange > ev.interval { stepRange = ev.interval } @@ -1118,7 +1141,7 @@ func (ev *evaluator) eval(expr parser.Expr) parser.Value { inArgs[matrixArgIndex] = inMatrix enh := &EvalNodeHelper{out: make(Vector, 0, 1)} // Process all the calls for one time series at a time. - it := storage.NewBuffer(selRange) + it := storage.NewBuffer(bufferRange) for i, s := range selVS.Series { ev.currentSamples -= len(points) points = points[:0] @@ -1144,7 +1167,7 @@ func (ev *evaluator) eval(expr parser.Expr) parser.Value { maxt := ts - offset mint := maxt - selRange // Evaluate the matrix selector for this series for this step. - points = ev.matrixIterSlice(it, mint, maxt, points) + points = ev.matrixIterSlice(it, mint, maxt, e.Func.ExtRange, points) if len(points) == 0 { continue } @@ -1453,7 +1476,7 @@ func (ev *evaluator) matrixSelector(node *parser.MatrixSelector) Matrix { Metric: series[i].Labels(), } - ss.Points = ev.matrixIterSlice(it, mint, maxt, getPointSlice(16)) + ss.Points = ev.matrixIterSlice(it, mint, maxt, false, getPointSlice(16)) if len(ss.Points) > 0 { matrix = append(matrix, ss) @@ -1469,24 +1492,39 @@ func (ev *evaluator) matrixSelector(node *parser.MatrixSelector) Matrix { // // As an optimization, the matrix vector may already contain points of the same // time series from the evaluation of an earlier step (with lower mint and maxt -// values). Any such points falling before mint are discarded; points that fall -// into the [mint, maxt] range are retained; only points with later timestamps -// are populated from the iterator. -func (ev *evaluator) matrixIterSlice(it *storage.BufferedSeriesIterator, mint, maxt int64, out []Point) []Point { - if len(out) > 0 && out[len(out)-1].T >= mint { +// values). 
Any such points falling before mint (except the last, when extRange +// is true) are discarded; points that fall into the [mint, maxt] range are +// retained; only points with later timestamps are populated from the iterator. +func (ev *evaluator) matrixIterSlice(it *storage.BufferedSeriesIterator, mint, maxt int64, extRange bool, out []Point) []Point { + extMint := mint - durationMilliseconds(ev.lookbackDelta) + if len(out) > 0 && (out[len(out)-1].T >= mint || (extRange && out[len(out)-1].T >= extMint)) { // There is an overlap between previous and current ranges, retain common // points. In most such cases: // (a) the overlap is significantly larger than the eval step; and/or // (b) the number of samples is relatively small. // so a linear search will be as fast as a binary search. var drop int - for drop = 0; out[drop].T < mint; drop++ { + if !extRange { + for drop = 0; out[drop].T < mint; drop++ { + } + // Only append points with timestamps after the last timestamp we have. + mint = out[len(out)-1].T + 1 + } else { + // This is an argument to an extended range function, first go past mint. + for drop = 0; drop < len(out) && out[drop].T <= mint; drop++ { + } + // Then, go back one sample if within lookbackDelta of mint. + if drop > 0 && out[drop-1].T >= extMint { + drop-- + } + if out[len(out)-1].T >= mint { + // Only append points with timestamps after the last timestamp we have. + mint = out[len(out)-1].T + 1 + } } ev.currentSamples -= drop copy(out, out[drop:]) out = out[:len(out)-drop] - // Only append points with timestamps after the last timestamp we have. - mint = out[len(out)-1].T + 1 } else { ev.currentSamples -= len(out) out = out[:0] @@ -1500,18 +1538,35 @@ func (ev *evaluator) matrixIterSlice(it *storage.BufferedSeriesIterator, mint, m } buf := it.Buffer() + appendedPointBeforeMint := len(out) > 0 for buf.Next() { t, v := buf.At() if value.IsStaleNaN(v) { continue } - // Values in the buffer are guaranteed to be smaller than maxt. 
- if t >= mint { - if ev.currentSamples >= ev.maxSamples { - ev.error(ErrTooManySamples(env)) + if !extRange { + // Values in the buffer are guaranteed to be smaller than maxt. + if t >= mint { + if ev.currentSamples >= ev.maxSamples { + ev.error(ErrTooManySamples(env)) + } + out = append(out, Point{T: t, V: v}) + ev.currentSamples++ + } + } else { + // This is the argument to an extended range function: if any point + // exists at or before range start, add it and then keep replacing + // it with later points while not yet (strictly) inside the range. + if t > mint || !appendedPointBeforeMint { + if ev.currentSamples >= ev.maxSamples { + ev.error(ErrTooManySamples(env)) + } + out = append(out, Point{T: t, V: v}) + ev.currentSamples++ + appendedPointBeforeMint = true + } else { + out[len(out)-1] = Point{T: t, V: v} } - out = append(out, Point{T: t, V: v}) - ev.currentSamples++ } } // The seeked sample might also be in the range. diff --git a/promql/functions.go b/promql/functions.go index 454ff4fa53a..79e3a909f50 100644 --- a/promql/functions.go +++ b/promql/functions.go @@ -14,7 +14,9 @@ package promql import ( + "fmt" "math" + "os" "regexp" "sort" "strconv" @@ -131,6 +133,73 @@ func extrapolatedRate(vals []parser.Value, args parser.Expressions, enh *EvalNod }) } +// extendedRate is a utility function for xrate/xincrease/xdelta. +// It calculates the rate (allowing for counter resets if isCounter is true), +// taking into account the last sample before the range start, and returns +// the result as either per-second (if isRate is true) or overall. 
+func extendedRate(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper, isCounter bool, isRate bool) Vector { + ms := args[0].(*parser.MatrixSelector) + vs := ms.VectorSelector.(*parser.VectorSelector) + + var ( + samples = vals[0].(Matrix)[0] + rangeStart = enh.ts - durationMilliseconds(ms.Range+vs.Offset) + rangeEnd = enh.ts - durationMilliseconds(vs.Offset) + ) + + points := samples.Points + if len(points) < 2 { + return enh.out + } + sampledRange := float64(points[len(points)-1].T - points[0].T) + averageInterval := sampledRange / float64(len(points)-1) + + firstPoint := 0 + // If the point before the range is too far from rangeStart, drop it. + if float64(rangeStart-points[0].T) > averageInterval { + if len(points) < 3 { + return enh.out + } + firstPoint = 1 + sampledRange = float64(points[len(points)-1].T - points[1].T) + averageInterval = sampledRange / float64(len(points)-2) + } + + var ( + counterCorrection float64 + lastValue float64 + ) + if isCounter { + for i := firstPoint; i < len(points); i++ { + sample := points[i] + if sample.V < lastValue { + counterCorrection += lastValue + } + lastValue = sample.V + } + } + resultValue := points[len(points)-1].V - points[firstPoint].V + counterCorrection + + // Duration between last sample and boundary of range. + durationToEnd := float64(rangeEnd - points[len(points)-1].T) + + // If the points cover the whole range (i.e. they start just before the + // range start and end just before the range end) adjust the value from + // the sampled range to the requested range. 
+ if points[firstPoint].T <= rangeStart && durationToEnd < averageInterval { + adjustToRange := float64(durationMilliseconds(ms.Range)) + resultValue = resultValue * (adjustToRange / sampledRange) + } + + if isRate { + resultValue = resultValue / ms.Range.Seconds() + } + + return append(enh.out, Sample{ + Point: Point{V: resultValue}, + }) +} + // === delta(Matrix parser.ValueTypeMatrix) Vector === func funcDelta(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) Vector { return extrapolatedRate(vals, args, enh, false, false) @@ -146,6 +215,21 @@ func funcIncrease(vals []parser.Value, args parser.Expressions, enh *EvalNodeHel return extrapolatedRate(vals, args, enh, true, false) } +// === xdelta(Matrix parser.ValueTypeMatrix) Vector === +func funcXdelta(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) Vector { + return extendedRate(vals, args, enh, false, false) +} + +// === xrate(node parser.ValueTypeMatrix) Vector === +func funcXrate(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) Vector { + return extendedRate(vals, args, enh, true, true) +} + +// === xincrease(node parser.ValueTypeMatrix) Vector === +func funcXincrease(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) Vector { + return extendedRate(vals, args, enh, true, false) +} + // === irate(node parser.ValueTypeMatrix) Vector === func funcIrate(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) Vector { return instantValue(vals, enh.out, true) @@ -918,9 +1002,37 @@ var FunctionCalls = map[string]FunctionCall{ "time": funcTime, "timestamp": funcTimestamp, "vector": funcVector, + "xdelta": funcXdelta, + "xincrease": funcXincrease, + "xrate": funcXrate, "year": funcYear, } +func init() { + // REPLACE_RATE_FUNCS replaces the default rate extrapolation functions + // with xrate functions. This allows for a drop-in replacement and Grafana + // auto-completion, Prometheus tooling, Thanos, etc. should still work as expected. 
+ if os.Getenv("REPLACE_RATE_FUNCS") == "1" { + FunctionCalls["delta"] = FunctionCalls["xdelta"] + FunctionCalls["increase"] = FunctionCalls["xincrease"] + FunctionCalls["rate"] = FunctionCalls["xrate"] + delete(FunctionCalls, "xdelta") + delete(FunctionCalls, "xincrease") + delete(FunctionCalls, "xrate") + + parser.Functions["delta"] = parser.Functions["xdelta"] + parser.Functions["increase"] = parser.Functions["xincrease"] + parser.Functions["rate"] = parser.Functions["xrate"] + parser.Functions["delta"].Name = "delta" + parser.Functions["increase"].Name = "increase" + parser.Functions["rate"].Name = "rate" + delete(parser.Functions, "xdelta") + delete(parser.Functions, "xincrease") + delete(parser.Functions, "xrate") + fmt.Println("Successfully replaced rate & friends with xrate & friends (and removed xrate & friends function keys).") + } +} + type vectorByValueHeap Vector func (s vectorByValueHeap) Len() int { diff --git a/promql/parser/functions.go b/promql/parser/functions.go index 4516829e551..8d65e894bb7 100644 --- a/promql/parser/functions.go +++ b/promql/parser/functions.go @@ -20,6 +20,7 @@ type Function struct { ArgTypes []ValueType Variadic int ReturnType ValueType + ExtRange bool } // Functions is a list of all functions supported by PromQL, including their types. 
@@ -262,6 +263,24 @@ var Functions = map[string]*Function{ ArgTypes: []ValueType{ValueTypeScalar}, ReturnType: ValueTypeVector, }, + "xdelta": { + Name: "xdelta", + ArgTypes: []ValueType{ValueTypeMatrix}, + ReturnType: ValueTypeVector, + ExtRange: true, + }, + "xincrease": { + Name: "xincrease", + ArgTypes: []ValueType{ValueTypeMatrix}, + ReturnType: ValueTypeVector, + ExtRange: true, + }, + "xrate": { + Name: "xrate", + ArgTypes: []ValueType{ValueTypeMatrix}, + ReturnType: ValueTypeVector, + ExtRange: true, + }, "year": { Name: "year", ArgTypes: []ValueType{ValueTypeVector}, diff --git a/promql/testdata/functions.test b/promql/testdata/functions.test index 7d36ccb3b7a..54a23b4c5e9 100644 --- a/promql/testdata/functions.test +++ b/promql/testdata/functions.test @@ -1,3 +1,138 @@ +# Comparison of rate vs xrate. + +load 5s + http_requests{path="/foo"} 1 1 1 2 2 2 2 2 3 3 3 + http_requests{path="/bar"} 1 2 3 4 5 6 7 8 9 10 11 + + +# +# Timeseries starts inside range, (presumably) goes on after range end. +# + +# 1. Reference eval, aligned with collection. +eval instant at 25s rate(http_requests[50s]) + {path="/foo"} .022 + {path="/bar"} .12 + +eval instant at 25s xrate(http_requests[50s]) + {path="/foo"} .02 + {path="/bar"} .1 + +# 2. Eval 1 second earlier compared to (1). +# * path="/foo" rate should be same or fractionally higher ("shorter" sample, same actual increase); +# * path="/bar" rate should be same or fractionally lower (80% the increase, 80/96% range covered by sample). +# XXX Seeing ~20% jump for path="/foo" +eval instant at 24s rate(http_requests[50s]) + {path="/foo"} .0265 + {path="/bar"} .116 + +eval instant at 24s xrate(http_requests[50s]) + {path="/foo"} .02 + {path="/bar"} .08 + +# 3. Eval 1 second later compared to (1). +# * path="/foo" rate should be same or fractionally lower ("longer" sample, same actual increase). +# * path="/bar" rate should be same or fractionally lower ("longer" sample, same actual increase). 
+# XXX Higher instead of lower for both. +eval instant at 26s rate(http_requests[50s]) + {path="/foo"} .0228 + {path="/bar"} .124 + +eval instant at 26s xrate(http_requests[50s]) + {path="/foo"} .02 + {path="/bar"} .1 + + +# +# Timeseries starts before range, ends within range. +# + +# 4. Reference eval, aligned with collection. +eval instant at 75s rate(http_requests[50s]) + {path="/foo"} .022 + {path="/bar"} .11 + +eval instant at 75s xrate(http_requests[50s]) + {path="/foo"} .02 + {path="/bar"} .1 + +# 5. Eval 1s earlier compared to (4). +# * path="/foo" rate should be same or fractionally lower ("longer" sample, same actual increase). +# * path="/bar" rate should be same or fractionally lower ("longer" sample, same actual increase). +# XXX Higher instead of lower for both. +eval instant at 74s rate(http_requests[50s]) + {path="/foo"} .0228 + {path="/bar"} .114 + +# XXX Higher instead of lower for {path="/bar"}. +eval instant at 74s xrate(http_requests[50s]) + {path="/foo"} .02 + {path="/bar"} .12 + +# 6. Eval 1s later compared to (4). Rate/increase (should be) fractionally smaller. +# * path="/foo" rate should be same or fractionally higher ("shorter" sample, same actual increase); +# * path="/bar" rate should be same or fractionally lower (80% the increase, 80/96% range covered by sample). +# XXX Seeing ~20% jump for path="/foo", decrease instead of increase for path="/bar". +eval instant at 76s rate(http_requests[50s]) + {path="/foo"} .0265 + {path="/bar"} .106 + +eval instant at 76s xrate(http_requests[50s]) + {path="/foo"} .02 + {path="/bar"} .1 + +# +# Evaluation of 10 second rate every 10 seconds, not aligned with collection. 
+# + +eval instant at 9s rate(http_requests[10s]) + {path="/foo"} 0 + {path="/bar"} 0.2 + +eval instant at 19s rate(http_requests[10s]) + {path="/foo"} 0.2 + {path="/bar"} 0.2 + +eval instant at 29s rate(http_requests[10s]) + {path="/foo"} 0 + {path="/bar"} 0.2 + +eval instant at 39s rate(http_requests[10s]) + {path="/foo"} 0 + {path="/bar"} 0.2 + +# XXX Missed an increase in path="/foo" between timestamps 35 and 40 (both in this eval and the one before). +eval instant at 49s rate(http_requests[10s]) + {path="/foo"} 0 + {path="/bar"} 0.2 + +eval instant at 9s xrate(http_requests[10s]) + {path="/foo"} 0 + {path="/bar"} 0.1 + +eval instant at 19s xrate(http_requests[10s]) + {path="/foo"} 0.1 + {path="/bar"} 0.2 + +eval instant at 29s xrate(http_requests[10s]) + {path="/foo"} 0 + {path="/bar"} 0.2 + +eval instant at 39s xrate(http_requests[10s]) + {path="/foo"} 0 + {path="/bar"} 0.2 + +# XXX Sees the increase in path="/foo" between timestamps 35 and 40. +eval instant at 49s xrate(http_requests[10s]) + {path="/foo"} .1 + {path="/bar"} 0.2 + +clear + + + + + # Testdata for resets() and changes(). load 5m http_requests{path="/foo"} 1 2 3 0 1 0 0 1 2 0 @@ -67,23 +202,57 @@ eval instant at 15m changes(x[15m]) clear -# Tests for increase(). -load 5m +# Tests for increase()/xincrease()/xrate(). +load 5s http_requests{path="/foo"} 0+10x10 - http_requests{path="/bar"} 0+10x5 0+10x5 + http_requests{path="/bar"} 0+10x5 0+10x4 # Tests for increase(). -eval instant at 50m increase(http_requests[50m]) +eval instant at 50s increase(http_requests[50s]) + {path="/foo"} 100 + {path="/bar"} 90 + +eval instant at 50s increase(http_requests[100s]) {path="/foo"} 100 {path="/bar"} 90 -eval instant at 50m increase(http_requests[100m]) +# Tests for xincrease(). 
+eval instant at 50s xincrease(http_requests[50s]) {path="/foo"} 100 {path="/bar"} 90 +eval instant at 50s xincrease(http_requests[5s]) + {path="/foo"} 10 + {path="/bar"} 10 + +eval instant at 50s xincrease(http_requests[3s]) + {path="/foo"} 6 + {path="/bar"} 6 + +eval instant at 49s xincrease(http_requests[3s]) + +# Tests for xrate(). +eval instant at 50s xrate(http_requests[50s]) + {path="/foo"} 2 + {path="/bar"} 1.8 + +eval instant at 50s xrate(http_requests[100s]) + {path="/foo"} 1 + {path="/bar"} 0.9 + +eval instant at 50s xrate(http_requests[5s]) + {path="/foo"} 2 + {path="/bar"} 2 + +eval instant at 50s xrate(http_requests[3s]) + {path="/foo"} 2 + {path="/bar"} 2 + +eval instant at 49s xrate(http_requests[3s]) + clear -# Test for increase() with counter reset. +# Test for increase()/xincrease() with counter reset. # When the counter is reset, it always starts at 0. # So the sequence 3 2 (decreasing counter = reset) is interpreted the same as 3 0 1 2. # Prometheus assumes it missed the intermediate values 0 and 1. @@ -91,7 +260,10 @@ load 5m http_requests{path="/foo"} 0 1 2 3 2 3 4 eval instant at 30m increase(http_requests[30m]) - {path="/foo"} 7 + {path="/foo"} 7 + +eval instant at 30m xincrease(http_requests[30m]) + {path="/foo"} 7 clear @@ -111,15 +283,27 @@ eval instant at 30m irate(http_requests[50m]) clear -# Tests for delta(). +# Tests for delta()/xdelta(). load 5m - http_requests{path="/foo"} 0 50 100 150 200 - http_requests{path="/bar"} 200 150 100 50 0 + http_requests{path="/foo"} 0 50 300 150 200 + http_requests{path="/bar"} 200 150 300 50 0 eval instant at 20m delta(http_requests[20m]) {path="/foo"} 200 {path="/bar"} -200 +eval instant at 20m xdelta(http_requests[20m]) + {path="/foo"} 200 + {path="/bar"} -200 + +eval instant at 20m xdelta(http_requests[19m]) + {path="/foo"} 190 + {path="/bar"} -190 + +eval instant at 20m xdelta(http_requests[1m]) + {path="/foo"} 10 + {path="/bar"} -10 + clear # Tests for idelta().