Skip to content

Commit 9c3a6bf

Browse files
Implement xrate (extended rate), xincrease and xdelta.
1 parent c448ada commit 9c3a6bf

File tree

4 files changed

+408
-38
lines changed

4 files changed

+408
-38
lines changed

Diff for: promql/engine.go

+83-28
Original file line numberDiff line numberDiff line change
@@ -627,23 +627,30 @@ func (ng *Engine) cumulativeSubqueryOffset(path []parser.Node) time.Duration {
627627
func (ng *Engine) findMinTime(s *parser.EvalStmt) time.Time {
628628
var maxOffset time.Duration
629629
parser.Inspect(s.Expr, func(node parser.Node, path []parser.Node) error {
630+
var nodeOffset time.Duration
630631
subqOffset := ng.cumulativeSubqueryOffset(path)
631632
switch n := node.(type) {
632633
case *parser.VectorSelector:
633-
if maxOffset < ng.lookbackDelta+subqOffset {
634-
maxOffset = ng.lookbackDelta + subqOffset
635-
}
636-
if n.Offset+ng.lookbackDelta+subqOffset > maxOffset {
637-
maxOffset = n.Offset + ng.lookbackDelta + subqOffset
634+
nodeOffset += ng.lookbackDelta + subqOffset
635+
if n.Offset > 0 {
636+
nodeOffset += n.Offset
638637
}
639638
case *parser.MatrixSelector:
640-
if maxOffset < n.Range+subqOffset {
641-
maxOffset = n.Range + subqOffset
639+
nodeOffset += n.Range + subqOffset
640+
if m := n.VectorSelector.(*parser.VectorSelector).Offset; m > 0 {
641+
nodeOffset += m
642642
}
643-
if m := n.VectorSelector.(*parser.VectorSelector).Offset + n.Range + subqOffset; m > maxOffset {
644-
maxOffset = m
643+
// Include an extra lookbackDelta iff this is the argument to an
644+
// extended range function. Extended ranges include one extra
645+
// point, this is how far back we need to look for it.
646+
f, ok := parser.Functions[extractFuncFromPath(path)]
647+
if ok && f.ExtRange {
648+
nodeOffset += ng.lookbackDelta
645649
}
646650
}
651+
if maxOffset < nodeOffset {
652+
maxOffset = nodeOffset
653+
}
647654
return nil
648655
})
649656
return s.Start.Add(-maxOffset)
@@ -678,18 +685,26 @@ func (ng *Engine) populateSeries(ctx context.Context, querier storage.Querier, s
678685

679686
switch n := node.(type) {
680687
case *parser.VectorSelector:
688+
hints.Func = extractFuncFromPath(path)
689+
hints.By, hints.Grouping = extractGroupsFromPath(path)
690+
681691
if evalRange == 0 {
682692
hints.Start = hints.Start - durationMilliseconds(ng.lookbackDelta)
683693
} else {
684694
hints.Range = durationMilliseconds(evalRange)
685695
// For all matrix queries we want to ensure that we have (end-start) + range selected
686696
// this way we have `range` data before the start time
687697
hints.Start = hints.Start - durationMilliseconds(evalRange)
698+
// Include an extra lookbackDelta iff this is the argument to an
699+
// extended range function. Extended ranges include one extra
700+
// point, this is how far back we need to look for it.
701+
f, ok := parser.Functions[hints.Func]
702+
if ok && f.ExtRange {
703+
hints.Start = hints.Start - durationMilliseconds(ng.lookbackDelta)
704+
}
688705
evalRange = 0
689706
}
690707

691-
hints.Func = extractFuncFromPath(path)
692-
hints.By, hints.Grouping = extractGroupsFromPath(path)
693708
if n.Offset > 0 {
694709
offsetMilliseconds := durationMilliseconds(n.Offset)
695710
hints.Start = hints.Start - offsetMilliseconds
@@ -1108,7 +1123,15 @@ func (ev *evaluator) eval(expr parser.Expr) parser.Value {
11081123
mat := make(Matrix, 0, len(selVS.Series)) // Output matrix.
11091124
offset := durationMilliseconds(selVS.Offset)
11101125
selRange := durationMilliseconds(sel.Range)
1126+
bufferRange := selRange
11111127
stepRange := selRange
1128+
// Include an extra lookbackDelta iff this is an extended
1129+
// range function. Extended ranges include one extra point,
1130+
// this is how far back we need to look for it.
1131+
if e.Func.ExtRange {
1132+
bufferRange += durationMilliseconds(ev.lookbackDelta)
1133+
stepRange += durationMilliseconds(ev.lookbackDelta)
1134+
}
11121135
if stepRange > ev.interval {
11131136
stepRange = ev.interval
11141137
}
@@ -1118,7 +1141,7 @@ func (ev *evaluator) eval(expr parser.Expr) parser.Value {
11181141
inArgs[matrixArgIndex] = inMatrix
11191142
enh := &EvalNodeHelper{out: make(Vector, 0, 1)}
11201143
// Process all the calls for one time series at a time.
1121-
it := storage.NewBuffer(selRange)
1144+
it := storage.NewBuffer(bufferRange)
11221145
for i, s := range selVS.Series {
11231146
ev.currentSamples -= len(points)
11241147
points = points[:0]
@@ -1144,7 +1167,7 @@ func (ev *evaluator) eval(expr parser.Expr) parser.Value {
11441167
maxt := ts - offset
11451168
mint := maxt - selRange
11461169
// Evaluate the matrix selector for this series for this step.
1147-
points = ev.matrixIterSlice(it, mint, maxt, points)
1170+
points = ev.matrixIterSlice(it, mint, maxt, e.Func.ExtRange, points)
11481171
if len(points) == 0 {
11491172
continue
11501173
}
@@ -1453,7 +1476,7 @@ func (ev *evaluator) matrixSelector(node *parser.MatrixSelector) Matrix {
14531476
Metric: series[i].Labels(),
14541477
}
14551478

1456-
ss.Points = ev.matrixIterSlice(it, mint, maxt, getPointSlice(16))
1479+
ss.Points = ev.matrixIterSlice(it, mint, maxt, false, getPointSlice(16))
14571480

14581481
if len(ss.Points) > 0 {
14591482
matrix = append(matrix, ss)
@@ -1469,24 +1492,39 @@ func (ev *evaluator) matrixSelector(node *parser.MatrixSelector) Matrix {
14691492
//
14701493
// As an optimization, the matrix vector may already contain points of the same
14711494
// time series from the evaluation of an earlier step (with lower mint and maxt
1472-
// values). Any such points falling before mint are discarded; points that fall
1473-
// into the [mint, maxt] range are retained; only points with later timestamps
1474-
// are populated from the iterator.
1475-
func (ev *evaluator) matrixIterSlice(it *storage.BufferedSeriesIterator, mint, maxt int64, out []Point) []Point {
1476-
if len(out) > 0 && out[len(out)-1].T >= mint {
1495+
// values). Any such points falling before mint (except the last, when extRange
1496+
// is true) are discarded; points that fall into the [mint, maxt] range are
1497+
// retained; only points with later timestamps are populated from the iterator.
1498+
func (ev *evaluator) matrixIterSlice(it *storage.BufferedSeriesIterator, mint, maxt int64, extRange bool, out []Point) []Point {
1499+
extMint := mint - durationMilliseconds(ev.lookbackDelta)
1500+
if len(out) > 0 && (out[len(out)-1].T >= mint || (extRange && out[len(out)-1].T >= extMint)) {
14771501
// There is an overlap between previous and current ranges, retain common
14781502
// points. In most such cases:
14791503
// (a) the overlap is significantly larger than the eval step; and/or
14801504
// (b) the number of samples is relatively small.
14811505
// so a linear search will be as fast as a binary search.
14821506
var drop int
1483-
for drop = 0; out[drop].T < mint; drop++ {
1507+
if !extRange {
1508+
for drop = 0; out[drop].T < mint; drop++ {
1509+
}
1510+
// Only append points with timestamps after the last timestamp we have.
1511+
mint = out[len(out)-1].T + 1
1512+
} else {
1513+
// This is an argument to an extended range function, first go past mint.
1514+
for drop = 0; drop < len(out) && out[drop].T <= mint; drop++ {
1515+
}
1516+
// Then, go back one sample if within lookbackDelta of mint.
1517+
if drop > 0 && out[drop-1].T >= extMint {
1518+
drop--
1519+
}
1520+
if out[len(out)-1].T >= mint {
1521+
// Only append points with timestamps after the last timestamp we have.
1522+
mint = out[len(out)-1].T + 1
1523+
}
14841524
}
14851525
ev.currentSamples -= drop
14861526
copy(out, out[drop:])
14871527
out = out[:len(out)-drop]
1488-
// Only append points with timestamps after the last timestamp we have.
1489-
mint = out[len(out)-1].T + 1
14901528
} else {
14911529
ev.currentSamples -= len(out)
14921530
out = out[:0]
@@ -1500,18 +1538,35 @@ func (ev *evaluator) matrixIterSlice(it *storage.BufferedSeriesIterator, mint, m
15001538
}
15011539

15021540
buf := it.Buffer()
1541+
appendedPointBeforeMint := len(out) > 0
15031542
for buf.Next() {
15041543
t, v := buf.At()
15051544
if value.IsStaleNaN(v) {
15061545
continue
15071546
}
1508-
// Values in the buffer are guaranteed to be smaller than maxt.
1509-
if t >= mint {
1510-
if ev.currentSamples >= ev.maxSamples {
1511-
ev.error(ErrTooManySamples(env))
1547+
if !extRange {
1548+
// Values in the buffer are guaranteed to be smaller than maxt.
1549+
if t >= mint {
1550+
if ev.currentSamples >= ev.maxSamples {
1551+
ev.error(ErrTooManySamples(env))
1552+
}
1553+
out = append(out, Point{T: t, V: v})
1554+
ev.currentSamples++
1555+
}
1556+
} else {
1557+
// This is the argument to an extended range function: if any point
1558+
// exists at or before range start, add it and then keep replacing
1559+
// it with later points while not yet (strictly) inside the range.
1560+
if t > mint || !appendedPointBeforeMint {
1561+
if ev.currentSamples >= ev.maxSamples {
1562+
ev.error(ErrTooManySamples(env))
1563+
}
1564+
out = append(out, Point{T: t, V: v})
1565+
ev.currentSamples++
1566+
appendedPointBeforeMint = true
1567+
} else {
1568+
out[len(out)-1] = Point{T: t, V: v}
15121569
}
1513-
out = append(out, Point{T: t, V: v})
1514-
ev.currentSamples++
15151570
}
15161571
}
15171572
// The seeked sample might also be in the range.

Diff for: promql/functions.go

+112
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@
1414
package promql
1515

1616
import (
17+
"fmt"
1718
"math"
19+
"os"
1820
"regexp"
1921
"sort"
2022
"strconv"
@@ -131,6 +133,73 @@ func extrapolatedRate(vals []parser.Value, args parser.Expressions, enh *EvalNod
131133
})
132134
}
133135

136+
// extendedRate is a utility function for xrate/xincrease/xdelta.
137+
// It calculates the rate (allowing for counter resets if isCounter is true),
138+
// taking into account the last sample before the range start, and returns
139+
// the result as either per-second (if isRate is true) or overall.
140+
func extendedRate(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper, isCounter bool, isRate bool) Vector {
141+
ms := args[0].(*parser.MatrixSelector)
142+
vs := ms.VectorSelector.(*parser.VectorSelector)
143+
144+
var (
145+
samples = vals[0].(Matrix)[0]
146+
rangeStart = enh.ts - durationMilliseconds(ms.Range+vs.Offset)
147+
rangeEnd = enh.ts - durationMilliseconds(vs.Offset)
148+
)
149+
150+
points := samples.Points
151+
if len(points) < 2 {
152+
return enh.out
153+
}
154+
sampledRange := float64(points[len(points)-1].T - points[0].T)
155+
averageInterval := sampledRange / float64(len(points)-1)
156+
157+
firstPoint := 0
158+
// If the point before the range is too far from rangeStart, drop it.
159+
if float64(rangeStart-points[0].T) > averageInterval {
160+
if len(points) < 3 {
161+
return enh.out
162+
}
163+
firstPoint = 1
164+
sampledRange = float64(points[len(points)-1].T - points[1].T)
165+
averageInterval = sampledRange / float64(len(points)-2)
166+
}
167+
168+
var (
169+
counterCorrection float64
170+
lastValue float64
171+
)
172+
if isCounter {
173+
for i := firstPoint; i < len(points); i++ {
174+
sample := points[i]
175+
if sample.V < lastValue {
176+
counterCorrection += lastValue
177+
}
178+
lastValue = sample.V
179+
}
180+
}
181+
resultValue := points[len(points)-1].V - points[firstPoint].V + counterCorrection
182+
183+
// Duration between last sample and boundary of range.
184+
durationToEnd := float64(rangeEnd - points[len(points)-1].T)
185+
186+
// If the points cover the whole range (i.e. they start just before the
187+
// range start and end just before the range end) adjust the value from
188+
// the sampled range to the requested range.
189+
if points[firstPoint].T <= rangeStart && durationToEnd < averageInterval {
190+
adjustToRange := float64(durationMilliseconds(ms.Range))
191+
resultValue = resultValue * (adjustToRange / sampledRange)
192+
}
193+
194+
if isRate {
195+
resultValue = resultValue / ms.Range.Seconds()
196+
}
197+
198+
return append(enh.out, Sample{
199+
Point: Point{V: resultValue},
200+
})
201+
}
202+
134203
// === delta(Matrix parser.ValueTypeMatrix) Vector ===
135204
func funcDelta(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) Vector {
136205
return extrapolatedRate(vals, args, enh, false, false)
@@ -146,6 +215,21 @@ func funcIncrease(vals []parser.Value, args parser.Expressions, enh *EvalNodeHel
146215
return extrapolatedRate(vals, args, enh, true, false)
147216
}
148217

218+
// === xdelta(Matrix parser.ValueTypeMatrix) Vector ===
219+
func funcXdelta(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) Vector {
220+
return extendedRate(vals, args, enh, false, false)
221+
}
222+
223+
// === xrate(node parser.ValueTypeMatrix) Vector ===
224+
func funcXrate(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) Vector {
225+
return extendedRate(vals, args, enh, true, true)
226+
}
227+
228+
// === xincrease(node parser.ValueTypeMatrix) Vector ===
229+
func funcXincrease(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) Vector {
230+
return extendedRate(vals, args, enh, true, false)
231+
}
232+
149233
// === irate(node parser.ValueTypeMatrix) Vector ===
150234
func funcIrate(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) Vector {
151235
return instantValue(vals, enh.out, true)
@@ -918,9 +1002,37 @@ var FunctionCalls = map[string]FunctionCall{
9181002
"time": funcTime,
9191003
"timestamp": funcTimestamp,
9201004
"vector": funcVector,
1005+
"xdelta": funcXdelta,
1006+
"xincrease": funcXincrease,
1007+
"xrate": funcXrate,
9211008
"year": funcYear,
9221009
}
9231010

1011+
func init() {
1012+
// REPLACE_RATE_FUNCS replaces the default rate extrapolation functions
1013+
// with xrate functions. This allows for a drop-in replacement and Grafana
1014+
// auto-completion, Prometheus tooling, Thanos, etc. should still work as expected.
1015+
if os.Getenv("REPLACE_RATE_FUNCS") == "1" {
1016+
FunctionCalls["delta"] = FunctionCalls["xdelta"]
1017+
FunctionCalls["increase"] = FunctionCalls["xincrease"]
1018+
FunctionCalls["rate"] = FunctionCalls["xrate"]
1019+
delete(FunctionCalls, "xdelta")
1020+
delete(FunctionCalls, "xincrease")
1021+
delete(FunctionCalls, "xrate")
1022+
1023+
parser.Functions["delta"] = parser.Functions["xdelta"]
1024+
parser.Functions["increase"] = parser.Functions["xincrease"]
1025+
parser.Functions["rate"] = parser.Functions["xrate"]
1026+
parser.Functions["delta"].Name = "delta"
1027+
parser.Functions["increase"].Name = "increase"
1028+
parser.Functions["rate"].Name = "rate"
1029+
delete(parser.Functions, "xdelta")
1030+
delete(parser.Functions, "xincrease")
1031+
delete(parser.Functions, "xrate")
1032+
fmt.Println("Successfully replaced rate & friends with xrate & friends (and removed xrate & friends function keys).")
1033+
}
1034+
}
1035+
9241036
type vectorByValueHeap Vector
9251037

9261038
func (s vectorByValueHeap) Len() int {

Diff for: promql/parser/functions.go

+19
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ type Function struct {
2020
ArgTypes []ValueType
2121
Variadic int
2222
ReturnType ValueType
23+
ExtRange bool
2324
}
2425

2526
// Functions is a list of all functions supported by PromQL, including their types.
@@ -262,6 +263,24 @@ var Functions = map[string]*Function{
262263
ArgTypes: []ValueType{ValueTypeScalar},
263264
ReturnType: ValueTypeVector,
264265
},
266+
"xdelta": {
267+
Name: "xdelta",
268+
ArgTypes: []ValueType{ValueTypeMatrix},
269+
ReturnType: ValueTypeVector,
270+
ExtRange: true,
271+
},
272+
"xincrease": {
273+
Name: "xincrease",
274+
ArgTypes: []ValueType{ValueTypeMatrix},
275+
ReturnType: ValueTypeVector,
276+
ExtRange: true,
277+
},
278+
"xrate": {
279+
Name: "xrate",
280+
ArgTypes: []ValueType{ValueTypeMatrix},
281+
ReturnType: ValueTypeVector,
282+
ExtRange: true,
283+
},
265284
"year": {
266285
Name: "year",
267286
ArgTypes: []ValueType{ValueTypeVector},

0 commit comments

Comments
 (0)