Skip to content

Commit 0a15f54

Browse files
committed
Merge branch 'xrate' into xrate_v2.4.0
2 parents 068eaa5 + ef1967e commit 0a15f54

3 files changed

Lines changed: 378 additions & 34 deletions

File tree

promql/engine.go

Lines changed: 63 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -448,23 +448,31 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (
448448

449449
func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *EvalStmt) (storage.Querier, error) {
450450
var maxOffset time.Duration
451-
Inspect(s.Expr, func(node Node, _ []Node) error {
451+
452+
Inspect(s.Expr, func(node Node, path []Node) error {
453+
var nodeOffset time.Duration
452454
switch n := node.(type) {
453455
case *VectorSelector:
454-
if maxOffset < LookbackDelta {
455-
maxOffset = LookbackDelta
456-
}
457-
if n.Offset+LookbackDelta > maxOffset {
458-
maxOffset = n.Offset + LookbackDelta
456+
nodeOffset += LookbackDelta
457+
if n.Offset > 0 {
458+
nodeOffset += n.Offset
459459
}
460460
case *MatrixSelector:
461-
if maxOffset < n.Range {
462-
maxOffset = n.Range
461+
nodeOffset += n.Range
462+
if n.Offset > 0 {
463+
nodeOffset += n.Offset
463464
}
464-
if n.Offset+n.Range > maxOffset {
465-
maxOffset = n.Offset + n.Range
465+
// Include an extra LookbackDelta iff this is the argument to an
466+
// extended range function. Extended ranges include one extra
467+
// point, this is how far back we need to look for it.
468+
f, ok := getFunction(extractFuncFromPath(path))
469+
if ok && f.ExtRange {
470+
nodeOffset += LookbackDelta
466471
}
467472
}
473+
if maxOffset < nodeOffset {
474+
maxOffset = nodeOffset
475+
}
468476
return nil
469477
})
470478

@@ -832,13 +840,20 @@ func (ev *evaluator) eval(expr Expr) Value {
832840
if stepRange > ev.interval {
833841
stepRange = ev.interval
834842
}
843+
bufferRange := selRange
844+
// Include an extra LookbackDelta iff this is an extended
845+
// range function. Extended ranges include one extra point,
846+
// this is how far back we need to look for it.
847+
if e.Func.ExtRange {
848+
bufferRange += durationMilliseconds(LookbackDelta)
849+
}
835850
// Reuse objects across steps to save memory allocations.
836851
points := getPointSlice(16)
837852
inMatrix := make(Matrix, 1)
838853
inArgs[matrixArgIndex] = inMatrix
839854
enh := &EvalNodeHelper{out: make(Vector, 0, 1)}
840855
// Process all the calls for one time series at a time.
841-
it := storage.NewBuffer(selRange)
856+
it := storage.NewBuffer(bufferRange)
842857
for i, s := range sel.series {
843858
points = points[:0]
844859
it.Reset(s.Iterator())
@@ -863,7 +878,7 @@ func (ev *evaluator) eval(expr Expr) Value {
863878
maxt := ts - offset
864879
mint := maxt - selRange
865880
// Evaluate the matrix selector for this series for this step.
866-
points = ev.matrixIterSlice(it, mint, maxt, points)
881+
points = ev.matrixIterSlice(it, mint, maxt, e.Func.ExtRange, points)
867882
if len(points) == 0 {
868883
continue
869884
}
@@ -1060,7 +1075,7 @@ func (ev *evaluator) matrixSelector(node *MatrixSelector) Matrix {
10601075
Metric: node.series[i].Labels(),
10611076
}
10621077

1063-
ss.Points = ev.matrixIterSlice(it, mint, maxt, getPointSlice(16))
1078+
ss.Points = ev.matrixIterSlice(it, mint, maxt, false, getPointSlice(16))
10641079

10651080
if len(ss.Points) > 0 {
10661081
matrix = append(matrix, ss)
@@ -1076,23 +1091,34 @@ func (ev *evaluator) matrixSelector(node *MatrixSelector) Matrix {
10761091
//
10771092
// As an optimization, the matrix vector may already contain points of the same
10781093
// time series from the evaluation of an earlier step (with lower mint and maxt
1079-
// values). Any such points falling before mint are discarded; points that fall
1080-
// into the [mint, maxt] range are retained; only points with later timestamps
1081-
// are populated from the iterator.
1082-
func (ev *evaluator) matrixIterSlice(it *storage.BufferedSeriesIterator, mint, maxt int64, out []Point) []Point {
1083-
if len(out) > 0 && out[len(out)-1].T >= mint {
1094+
// values). Any such points falling before mint (except the last, when extRange
1095+
// is true) are discarded; points that fall into the [mint, maxt] range are
1096+
// retained; only points with later timestamps are populated from the iterator.
1097+
func (ev *evaluator) matrixIterSlice(it *storage.BufferedSeriesIterator, mint, maxt int64, extRange bool, out []Point) []Point {
1098+
extMint := mint - durationMilliseconds(LookbackDelta)
1099+
if len(out) > 0 && (out[len(out)-1].T >= mint || (extRange && out[len(out)-1].T >= extMint)) {
10841100
// There is an overlap between previous and current ranges, retain common
10851101
// points. In most such cases:
10861102
// (a) the overlap is significantly larger than the eval step; and/or
10871103
// (b) the number of samples is relatively small.
10881104
// so a linear search will be as fast as a binary search.
10891105
var drop int
1090-
for drop = 0; out[drop].T < mint; drop++ {
1106+
if !extRange {
1107+
for drop = 0; out[drop].T < mint; drop++ {
1108+
}
1109+
// Only append points with timestamps after the last timestamp we have.
1110+
mint = out[len(out)-1].T + 1
1111+
} else {
1112+
// This is an argument to an extended range function, first go past mint.
1113+
for drop = 0; drop < len(out) && out[drop].T <= mint; drop++ {
1114+
}
1115+
// Then, go back one sample if within LookbackDelta of mint.
1116+
if drop > 0 && out[drop-1].T >= extMint {
1117+
drop--
1118+
}
10911119
}
10921120
copy(out, out[drop:])
10931121
out = out[:len(out)-drop]
1094-
// Only append points with timestamps after the last timestamp we have.
1095-
mint = out[len(out)-1].T + 1
10961122
} else {
10971123
out = out[:0]
10981124
}
@@ -1105,14 +1131,27 @@ func (ev *evaluator) matrixIterSlice(it *storage.BufferedSeriesIterator, mint, m
11051131
}
11061132

11071133
buf := it.Buffer()
1134+
appendedPointBeforeMint := len(out) > 0
11081135
for buf.Next() {
11091136
t, v := buf.At()
11101137
if value.IsStaleNaN(v) {
11111138
continue
11121139
}
1113-
// Values in the buffer are guaranteed to be smaller than maxt.
1114-
if t >= mint {
1115-
out = append(out, Point{T: t, V: v})
1140+
if !extRange {
1141+
// Values in the buffer are guaranteed to be smaller than maxt.
1142+
if t >= mint {
1143+
out = append(out, Point{T: t, V: v})
1144+
}
1145+
} else {
1146+
// This is the argument to an extended range function: if any point
1147+
// exists at or before range start, add it and then keep replacing
1148+
// it with later points while not yet (strictly) inside the range.
1149+
if t > mint || !appendedPointBeforeMint {
1150+
out = append(out, Point{T: t, V: v})
1151+
appendedPointBeforeMint = true
1152+
} else {
1153+
out[len(out)-1] = Point{T: t, V: v}
1154+
}
11161155
}
11171156
}
11181157
// The seeked sample might also be in the range.

promql/functions.go

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package promql
1616
import (
1717
"fmt"
1818
"math"
19+
"os"
1920
"regexp"
2021
"sort"
2122
"strconv"
@@ -33,6 +34,7 @@ type Function struct {
3334
ArgTypes []ValueType
3435
Variadic int
3536
ReturnType ValueType
37+
ExtRange bool
3638

3739
// vals is a list of the evaluated arguments for the function call.
3840
// For range vectors it will be a Matrix with one series, instant vectors a
@@ -138,6 +140,71 @@ func extrapolatedRate(vals []Value, args Expressions, enh *EvalNodeHelper, isCou
138140
return enh.out
139141
}
140142

143+
func extendedRate(vals []Value, args Expressions, enh *EvalNodeHelper, isCounter bool, isRate bool) Vector {
144+
ms := args[0].(*MatrixSelector)
145+
matrix := vals[0].(Matrix)
146+
147+
var (
148+
rangeStart = enh.ts - durationMilliseconds(ms.Range+ms.Offset)
149+
rangeEnd = enh.ts - durationMilliseconds(ms.Offset)
150+
)
151+
152+
for _, samples := range matrix {
153+
points := samples.Points
154+
if len(points) < 2 {
155+
continue
156+
}
157+
sampledRange := float64(points[len(points)-1].T - points[0].T)
158+
averageInterval := sampledRange / float64(len(points)-1)
159+
160+
firstPoint := 0
161+
// If the point before the range is too far from rangeStart, drop it.
162+
if float64(rangeStart-points[0].T) > averageInterval {
163+
if len(points) < 3 {
164+
continue
165+
}
166+
firstPoint = 1
167+
sampledRange = float64(points[len(points)-1].T - points[1].T)
168+
averageInterval = sampledRange / float64(len(points)-2)
169+
}
170+
171+
var (
172+
counterCorrection float64
173+
lastValue float64
174+
)
175+
if isCounter {
176+
for i := firstPoint; i < len(points); i++ {
177+
sample := points[i]
178+
if sample.V < lastValue {
179+
counterCorrection += lastValue
180+
}
181+
lastValue = sample.V
182+
}
183+
}
184+
resultValue := points[len(points)-1].V - points[firstPoint].V + counterCorrection
185+
186+
// Duration between last sample and boundary of range.
187+
durationToEnd := float64(rangeEnd - points[len(points)-1].T)
188+
189+
// If the points cover the whole range (i.e. they start just before the
190+
// range start and end just before the range end) adjust the value from
191+
// the sampled range to the requested range.
192+
if points[firstPoint].T <= rangeStart && durationToEnd < averageInterval {
193+
adjustToRange := float64(durationMilliseconds(ms.Range))
194+
resultValue = resultValue * (adjustToRange / sampledRange)
195+
}
196+
197+
if isRate {
198+
resultValue = resultValue / ms.Range.Seconds()
199+
}
200+
201+
enh.out = append(enh.out, Sample{
202+
Point: Point{V: resultValue},
203+
})
204+
}
205+
return enh.out
206+
}
207+
141208
// === delta(Matrix ValueTypeMatrix) Vector ===
142209
func funcDelta(vals []Value, args Expressions, enh *EvalNodeHelper) Vector {
143210
return extrapolatedRate(vals, args, enh, false, false)
@@ -153,6 +220,21 @@ func funcIncrease(vals []Value, args Expressions, enh *EvalNodeHelper) Vector {
153220
return extrapolatedRate(vals, args, enh, true, false)
154221
}
155222

223+
// === xdelta(Matrix ValueTypeMatrix) Vector ===
224+
func funcXdelta(vals []Value, args Expressions, enh *EvalNodeHelper) Vector {
225+
return extendedRate(vals, args, enh, false, false)
226+
}
227+
228+
// === xrate(node ValueTypeMatrix) Vector ===
229+
func funcXrate(vals []Value, args Expressions, enh *EvalNodeHelper) Vector {
230+
return extendedRate(vals, args, enh, true, true)
231+
}
232+
233+
// === xincrease(node ValueTypeMatrix) Vector ===
234+
func funcXincrease(vals []Value, args Expressions, enh *EvalNodeHelper) Vector {
235+
return extendedRate(vals, args, enh, true, false)
236+
}
237+
156238
// === irate(node ValueTypeMatrix) Vector ===
157239
func funcIrate(vals []Value, args Expressions, enh *EvalNodeHelper) Vector {
158240
return instantValue(vals, enh.out, true)
@@ -1196,6 +1278,27 @@ var functions = map[string]*Function{
11961278
ReturnType: ValueTypeVector,
11971279
Call: funcVector,
11981280
},
1281+
"xdelta": {
1282+
Name: "xdelta",
1283+
ArgTypes: []ValueType{ValueTypeMatrix},
1284+
ReturnType: ValueTypeVector,
1285+
Call: funcXdelta,
1286+
ExtRange: true,
1287+
},
1288+
"xincrease": {
1289+
Name: "xincrease",
1290+
ArgTypes: []ValueType{ValueTypeMatrix},
1291+
ReturnType: ValueTypeVector,
1292+
Call: funcXincrease,
1293+
ExtRange: true,
1294+
},
1295+
"xrate": {
1296+
Name: "xrate",
1297+
ArgTypes: []ValueType{ValueTypeMatrix},
1298+
ReturnType: ValueTypeVector,
1299+
Call: funcXrate,
1300+
ExtRange: true,
1301+
},
11991302
"year": {
12001303
Name: "year",
12011304
ArgTypes: []ValueType{ValueTypeVector},
@@ -1205,6 +1308,24 @@ var functions = map[string]*Function{
12051308
},
12061309
}
12071310

1311+
func init() {
1312+
// REPLACE_RATE_FUNCS replaces the default rate extrapolation functions
1313+
// with xrate functions. This allows for a drop-in replacement and Grafana
1314+
// auto-completion, Prometheus tooling, Thanos, etc. should still work as expected.
1315+
if os.Getenv("REPLACE_RATE_FUNCS") == "1" {
1316+
functions["delta"] = functions["xdelta"]
1317+
functions["increase"] = functions["xincrease"]
1318+
functions["rate"] = functions["xrate"]
1319+
functions["delta"].Name = "delta"
1320+
functions["increase"].Name = "increase"
1321+
functions["rate"].Name = "rate"
1322+
delete(functions, "xdelta")
1323+
delete(functions, "xincrease")
1324+
delete(functions, "xrate")
1325+
fmt.Println("Successfully replaced rate & friends with xrate & friends (and removed xrate & friends function keys).")
1326+
}
1327+
}
1328+
12081329
// getFunction returns a predefined Function object for the given name.
12091330
func getFunction(name string) (*Function, bool) {
12101331
function, ok := functions[name]

0 commit comments

Comments
 (0)