Skip to content

Commit 8a2ec31

Browse files
committed
Apply xrate patch.
1 parent dbd1d58 commit 8a2ec31

File tree

3 files changed

+397
-37
lines changed

3 files changed

+397
-37
lines changed

promql/engine.go

Lines changed: 82 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -479,23 +479,31 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (
479479

480480
func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *EvalStmt) (storage.Querier, error, storage.Warnings) {
481481
var maxOffset time.Duration
482-
Inspect(s.Expr, func(node Node, _ []Node) error {
482+
483+
Inspect(s.Expr, func(node Node, path []Node) error {
484+
var nodeOffset time.Duration
483485
switch n := node.(type) {
484486
case *VectorSelector:
485-
if maxOffset < LookbackDelta {
486-
maxOffset = LookbackDelta
487-
}
488-
if n.Offset+LookbackDelta > maxOffset {
489-
maxOffset = n.Offset + LookbackDelta
487+
nodeOffset += LookbackDelta
488+
if n.Offset > 0 {
489+
nodeOffset += n.Offset
490490
}
491491
case *MatrixSelector:
492-
if maxOffset < n.Range {
493-
maxOffset = n.Range
492+
nodeOffset += n.Range
493+
if n.Offset > 0 {
494+
nodeOffset += n.Offset
494495
}
495-
if n.Offset+n.Range > maxOffset {
496-
maxOffset = n.Offset + n.Range
496+
// Include an extra LookbackDelta iff this is the argument to an
497+
// extended range function. Extended ranges include one extra
498+
// point, this is how far back we need to look for it.
499+
f, ok := getFunction(extractFuncFromPath(path))
500+
if ok && f.ExtRange {
501+
nodeOffset += LookbackDelta
497502
}
498503
}
504+
if maxOffset < nodeOffset {
505+
maxOffset = nodeOffset
506+
}
499507
return nil
500508
})
501509

@@ -540,6 +548,13 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *Ev
540548
// For all matrix queries we want to ensure that we have (end-start) + range selected
541549
// this way we have `range` data before the start time
542550
params.Start = params.Start - durationMilliseconds(n.Range)
551+
// Include an extra LookbackDelta iff this is the argument to an
552+
// extended range function. Extended ranges include one extra
553+
// point, this is how far back we need to look for it.
554+
f, ok := getFunction(params.Func)
555+
if ok && f.ExtRange {
556+
params.Start = params.Start - durationMilliseconds(LookbackDelta)
557+
}
543558
if n.Offset > 0 {
544559
offsetMilliseconds := durationMilliseconds(n.Offset)
545560
params.Start = params.Start - offsetMilliseconds
@@ -913,7 +928,15 @@ func (ev *evaluator) eval(expr Expr) Value {
913928
mat := make(Matrix, 0, len(sel.series)) // Output matrix.
914929
offset := durationMilliseconds(sel.Offset)
915930
selRange := durationMilliseconds(sel.Range)
931+
bufferRange := selRange
916932
stepRange := selRange
933+
// Include an extra LookbackDelta iff this is an extended
934+
// range function. Extended ranges include one extra point,
935+
// this is how far back we need to look for it.
936+
if e.Func.ExtRange {
937+
bufferRange += durationMilliseconds(LookbackDelta)
938+
stepRange += durationMilliseconds(LookbackDelta)
939+
}
917940
if stepRange > ev.interval {
918941
stepRange = ev.interval
919942
}
@@ -923,7 +946,7 @@ func (ev *evaluator) eval(expr Expr) Value {
923946
inArgs[matrixArgIndex] = inMatrix
924947
enh := &EvalNodeHelper{out: make(Vector, 0, 1)}
925948
// Process all the calls for one time series at a time.
926-
it := storage.NewBuffer(selRange)
949+
it := storage.NewBuffer(bufferRange)
927950
for i, s := range sel.series {
928951
points = points[:0]
929952
it.Reset(s.Iterator())
@@ -948,7 +971,7 @@ func (ev *evaluator) eval(expr Expr) Value {
948971
maxt := ts - offset
949972
mint := maxt - selRange
950973
// Evaluate the matrix selector for this series for this step.
951-
points = ev.matrixIterSlice(it, mint, maxt, points)
974+
points = ev.matrixIterSlice(it, mint, maxt, e.Func.ExtRange, points)
952975
if len(points) == 0 {
953976
continue
954977
}
@@ -1178,7 +1201,7 @@ func (ev *evaluator) matrixSelector(node *MatrixSelector) Matrix {
11781201
Metric: node.series[i].Labels(),
11791202
}
11801203

1181-
ss.Points = ev.matrixIterSlice(it, mint, maxt, getPointSlice(16))
1204+
ss.Points = ev.matrixIterSlice(it, mint, maxt, false, getPointSlice(16))
11821205

11831206
if len(ss.Points) > 0 {
11841207
matrix = append(matrix, ss)
@@ -1194,23 +1217,38 @@ func (ev *evaluator) matrixSelector(node *MatrixSelector) Matrix {
11941217
//
11951218
// As an optimization, the matrix vector may already contain points of the same
11961219
// time series from the evaluation of an earlier step (with lower mint and maxt
1197-
// values). Any such points falling before mint are discarded; points that fall
1198-
// into the [mint, maxt] range are retained; only points with later timestamps
1199-
// are populated from the iterator.
1200-
func (ev *evaluator) matrixIterSlice(it *storage.BufferedSeriesIterator, mint, maxt int64, out []Point) []Point {
1201-
if len(out) > 0 && out[len(out)-1].T >= mint {
1220+
// values). Any such points falling before mint (except the last, when extRange
1221+
// is true) are discarded; points that fall into the [mint, maxt] range are
1222+
// retained; only points with later timestamps are populated from the iterator.
1223+
func (ev *evaluator) matrixIterSlice(it *storage.BufferedSeriesIterator, mint, maxt int64, extRange bool, out []Point) []Point {
1224+
extMint := mint - durationMilliseconds(LookbackDelta)
1225+
if len(out) > 0 && (out[len(out)-1].T >= mint || (extRange && out[len(out)-1].T >= extMint)) {
12021226
// There is an overlap between previous and current ranges, retain common
12031227
// points. In most such cases:
12041228
// (a) the overlap is significantly larger than the eval step; and/or
12051229
// (b) the number of samples is relatively small.
12061230
// so a linear search will be as fast as a binary search.
12071231
var drop int
1208-
for drop = 0; out[drop].T < mint; drop++ {
1232+
if !extRange {
1233+
for drop = 0; out[drop].T < mint; drop++ {
1234+
}
1235+
// Only append points with timestamps after the last timestamp we have.
1236+
mint = out[len(out)-1].T + 1
1237+
} else {
1238+
// This is an argument to an extended range function, first go past mint.
1239+
for drop = 0; drop < len(out) && out[drop].T <= mint; drop++ {
1240+
}
1241+
// Then, go back one sample if within LookbackDelta of mint.
1242+
if drop > 0 && out[drop-1].T >= extMint {
1243+
drop--
1244+
}
1245+
if out[len(out)-1].T >= mint {
1246+
// Only append points with timestamps after the last timestamp we have.
1247+
mint = out[len(out)-1].T + 1
1248+
}
12091249
}
12101250
copy(out, out[drop:])
12111251
out = out[:len(out)-drop]
1212-
// Only append points with timestamps after the last timestamp we have.
1213-
mint = out[len(out)-1].T + 1
12141252
} else {
12151253
out = out[:0]
12161254
}
@@ -1223,18 +1261,35 @@ func (ev *evaluator) matrixIterSlice(it *storage.BufferedSeriesIterator, mint, m
12231261
}
12241262

12251263
buf := it.Buffer()
1264+
appendedPointBeforeMint := len(out) > 0
12261265
for buf.Next() {
12271266
t, v := buf.At()
12281267
if value.IsStaleNaN(v) {
12291268
continue
12301269
}
1231-
// Values in the buffer are guaranteed to be smaller than maxt.
1232-
if t >= mint {
1233-
if ev.currentSamples >= ev.maxSamples {
1234-
ev.error(ErrTooManySamples(env))
1270+
if !extRange {
1271+
// Values in the buffer are guaranteed to be smaller than maxt.
1272+
if t >= mint {
1273+
if ev.currentSamples >= ev.maxSamples {
1274+
ev.error(ErrTooManySamples(env))
1275+
}
1276+
out = append(out, Point{T: t, V: v})
1277+
ev.currentSamples++
1278+
}
1279+
} else {
1280+
// This is the argument to an extended range function: if any point
1281+
// exists at or before range start, add it and then keep replacing
1282+
// it with later points while not yet (strictly) inside the range.
1283+
if t > mint || !appendedPointBeforeMint {
1284+
if ev.currentSamples >= ev.maxSamples {
1285+
ev.error(ErrTooManySamples(env))
1286+
}
1287+
out = append(out, Point{T: t, V: v})
1288+
ev.currentSamples++
1289+
appendedPointBeforeMint = true
1290+
} else {
1291+
out[len(out)-1] = Point{T: t, V: v}
12351292
}
1236-
out = append(out, Point{T: t, V: v})
1237-
ev.currentSamples++
12381293
}
12391294
}
12401295
// The seeked sample might also be in the range.

promql/functions.go

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package promql
1616
import (
1717
"fmt"
1818
"math"
19+
"os"
1920
"regexp"
2021
"sort"
2122
"strconv"
@@ -33,6 +34,7 @@ type Function struct {
3334
ArgTypes []ValueType
3435
Variadic int
3536
ReturnType ValueType
37+
ExtRange bool
3638

3739
// vals is a list of the evaluated arguments for the function call.
3840
// For range vectors it will be a Matrix with one series, instant vectors a
@@ -138,6 +140,71 @@ func extrapolatedRate(vals []Value, args Expressions, enh *EvalNodeHelper, isCou
138140
return enh.out
139141
}
140142

143+
func extendedRate(vals []Value, args Expressions, enh *EvalNodeHelper, isCounter bool, isRate bool) Vector {
144+
ms := args[0].(*MatrixSelector)
145+
matrix := vals[0].(Matrix)
146+
147+
var (
148+
rangeStart = enh.ts - durationMilliseconds(ms.Range+ms.Offset)
149+
rangeEnd = enh.ts - durationMilliseconds(ms.Offset)
150+
)
151+
152+
for _, samples := range matrix {
153+
points := samples.Points
154+
if len(points) < 2 {
155+
continue
156+
}
157+
sampledRange := float64(points[len(points)-1].T - points[0].T)
158+
averageInterval := sampledRange / float64(len(points)-1)
159+
160+
firstPoint := 0
161+
// If the point before the range is too far from rangeStart, drop it.
162+
if float64(rangeStart-points[0].T) > averageInterval {
163+
if len(points) < 3 {
164+
continue
165+
}
166+
firstPoint = 1
167+
sampledRange = float64(points[len(points)-1].T - points[1].T)
168+
averageInterval = sampledRange / float64(len(points)-2)
169+
}
170+
171+
var (
172+
counterCorrection float64
173+
lastValue float64
174+
)
175+
if isCounter {
176+
for i := firstPoint; i < len(points); i++ {
177+
sample := points[i]
178+
if sample.V < lastValue {
179+
counterCorrection += lastValue
180+
}
181+
lastValue = sample.V
182+
}
183+
}
184+
resultValue := points[len(points)-1].V - points[firstPoint].V + counterCorrection
185+
186+
// Duration between last sample and boundary of range.
187+
durationToEnd := float64(rangeEnd - points[len(points)-1].T)
188+
189+
// If the points cover the whole range (i.e. they start just before the
190+
// range start and end just before the range end) adjust the value from
191+
// the sampled range to the requested range.
192+
if points[firstPoint].T <= rangeStart && durationToEnd < averageInterval {
193+
adjustToRange := float64(durationMilliseconds(ms.Range))
194+
resultValue = resultValue * (adjustToRange / sampledRange)
195+
}
196+
197+
if isRate {
198+
resultValue = resultValue / ms.Range.Seconds()
199+
}
200+
201+
enh.out = append(enh.out, Sample{
202+
Point: Point{V: resultValue},
203+
})
204+
}
205+
return enh.out
206+
}
207+
141208
// === delta(Matrix ValueTypeMatrix) Vector ===
142209
func funcDelta(vals []Value, args Expressions, enh *EvalNodeHelper) Vector {
143210
return extrapolatedRate(vals, args, enh, false, false)
@@ -153,6 +220,21 @@ func funcIncrease(vals []Value, args Expressions, enh *EvalNodeHelper) Vector {
153220
return extrapolatedRate(vals, args, enh, true, false)
154221
}
155222

223+
// === xdelta(Matrix ValueTypeMatrix) Vector ===
224+
func funcXdelta(vals []Value, args Expressions, enh *EvalNodeHelper) Vector {
225+
return extendedRate(vals, args, enh, false, false)
226+
}
227+
228+
// === xrate(node ValueTypeMatrix) Vector ===
229+
func funcXrate(vals []Value, args Expressions, enh *EvalNodeHelper) Vector {
230+
return extendedRate(vals, args, enh, true, true)
231+
}
232+
233+
// === xincrease(node ValueTypeMatrix) Vector ===
234+
func funcXincrease(vals []Value, args Expressions, enh *EvalNodeHelper) Vector {
235+
return extendedRate(vals, args, enh, true, false)
236+
}
237+
156238
// === irate(node ValueTypeMatrix) Vector ===
157239
func funcIrate(vals []Value, args Expressions, enh *EvalNodeHelper) Vector {
158240
return instantValue(vals, enh.out, true)
@@ -1185,6 +1267,27 @@ var functions = map[string]*Function{
11851267
ReturnType: ValueTypeVector,
11861268
Call: funcVector,
11871269
},
1270+
"xdelta": {
1271+
Name: "xdelta",
1272+
ArgTypes: []ValueType{ValueTypeMatrix},
1273+
ReturnType: ValueTypeVector,
1274+
Call: funcXdelta,
1275+
ExtRange: true,
1276+
},
1277+
"xincrease": {
1278+
Name: "xincrease",
1279+
ArgTypes: []ValueType{ValueTypeMatrix},
1280+
ReturnType: ValueTypeVector,
1281+
Call: funcXincrease,
1282+
ExtRange: true,
1283+
},
1284+
"xrate": {
1285+
Name: "xrate",
1286+
ArgTypes: []ValueType{ValueTypeMatrix},
1287+
ReturnType: ValueTypeVector,
1288+
Call: funcXrate,
1289+
ExtRange: true,
1290+
},
11881291
"year": {
11891292
Name: "year",
11901293
ArgTypes: []ValueType{ValueTypeVector},
@@ -1194,6 +1297,24 @@ var functions = map[string]*Function{
11941297
},
11951298
}
11961299

1300+
func init() {
1301+
// REPLACE_RATE_FUNCS replaces the default rate extrapolation functions
1302+
// with xrate functions. This allows for a drop-in replacement and Grafana
1303+
// auto-completion, Prometheus tooling, Thanos, etc. should still work as expected.
1304+
if os.Getenv("REPLACE_RATE_FUNCS") == "1" {
1305+
functions["delta"] = functions["xdelta"]
1306+
functions["increase"] = functions["xincrease"]
1307+
functions["rate"] = functions["xrate"]
1308+
functions["delta"].Name = "delta"
1309+
functions["increase"].Name = "increase"
1310+
functions["rate"].Name = "rate"
1311+
delete(functions, "xdelta")
1312+
delete(functions, "xincrease")
1313+
delete(functions, "xrate")
1314+
fmt.Println("Successfully replaced rate & friends with xrate & friends (and removed xrate & friends function keys).")
1315+
}
1316+
}
1317+
11971318
// getFunction returns a predefined Function object for the given name.
11981319
func getFunction(name string) (*Function, bool) {
11991320
function, ok := functions[name]

0 commit comments

Comments
 (0)