Skip to content

Commit 75a4be5

Browse files
committed
Refactor metrics structure
1 parent 9098488 commit 75a4be5

File tree

5 files changed

+375
-311
lines changed

5 files changed

+375
-311
lines changed

metrics/metric_snapshot.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
Copyright 2025 The libkpa Authors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package metrics
18+
19+
import "time"
20+
21+
// MetricSnapshot represents a point-in-time view of metrics.
22+
type MetricSnapshot struct {
23+
stableValue float64
24+
panicValue float64
25+
readyPodCount int32
26+
timestamp time.Time
27+
}
28+
29+
// NewMetricSnapshot creates a new metric snapshot.
30+
func NewMetricSnapshot(stableValue, panicValue float64, readyPods int32, timestamp time.Time) *MetricSnapshot {
31+
return &MetricSnapshot{
32+
stableValue: stableValue,
33+
panicValue: panicValue,
34+
readyPodCount: readyPods,
35+
timestamp: timestamp,
36+
}
37+
}
38+
39+
// StableValue returns the metric value averaged over the stable window.
40+
func (s *MetricSnapshot) StableValue() float64 {
41+
return s.stableValue
42+
}
43+
44+
// PanicValue returns the metric value averaged over the panic window.
45+
func (s *MetricSnapshot) PanicValue() float64 {
46+
return s.panicValue
47+
}
48+
49+
// ReadyPodCount returns the number of ready pods.
50+
func (s *MetricSnapshot) ReadyPodCount() int32 {
51+
return s.readyPodCount
52+
}
53+
54+
// Timestamp returns when this snapshot was taken.
55+
func (s *MetricSnapshot) Timestamp() time.Time {
56+
return s.timestamp
57+
}

metrics/metrics_window.go renamed to metrics/time_window.go

Lines changed: 54 additions & 166 deletions
Original file line numberDiff line numberDiff line change
@@ -24,71 +24,51 @@ import (
2424
"time"
2525
)
2626

27-
type (
28-
// TimeWindow keeps buckets that have been collected at a certain time.
29-
TimeWindow struct {
30-
bucketsMutex sync.RWMutex
31-
// buckets is a ring buffer indexed by timeToIndex() % len(buckets).
32-
// Each element represents a certain granularity of time, and the total
33-
// represented duration adds up to a window length of time.
34-
buckets []float64
35-
36-
// firstWrite holds the time when the first write has been made.
37-
// This time is reset to `now` when the very first write happens,
38-
// or when a first write happens after `window` time of inactivity.
39-
// The difference between `now` and `firstWrite` is used to compute
40-
// the number of eligible buckets for computation of average values.
41-
firstWrite time.Time
42-
43-
// lastWrite stores the time when the last write was made.
44-
// This is used to detect when we have gaps in the data (i.e. more than a
45-
// granularity has expired since the last write) so that we can zero those
46-
// entries in the buckets array. It is also used when calculating the
47-
// WindowAverage to know how much of the buckets array represents valid data.
48-
lastWrite time.Time
49-
50-
// granularity is the duration represented by each bucket in the buckets ring buffer.
51-
granularity time.Duration
52-
// window is the total time represented by the buckets ring buffer.
53-
window time.Duration
54-
// The total sum of all buckets within the window. This total includes
55-
// invalid buckets, e.g. buckets written to before firstTime or after
56-
// lastTime are included in this total.
57-
windowTotal float64
58-
}
59-
60-
// WeightedTimeWindow is the implementation of buckets, that
61-
// uses weighted average algorithm.
62-
WeightedTimeWindow struct {
63-
*TimeWindow
64-
65-
// smoothingCoeff contains the speed with which the importance
66-
// of items in the past decays. The larger the faster weights decay.
67-
// It is autocomputed from window size and weightPrecision constant
68-
// and is bounded by minExponent below.
69-
smoothingCoeff float64
70-
}
27+
const (
28+
// minExponent is the minimal decay multiplier for the weighted average computations.
29+
minExponent = 0.2
30+
// weightPrecision is the minimum value the weights of the weighted average must sum to.
31+
weightPrecision = 0.9999
32+
precision = 6
7133
)
7234

35+
// TimeWindow keeps buckets that have been collected at a certain time.
36+
type TimeWindow struct {
37+
bucketsMutex sync.RWMutex
38+
// buckets is a ring buffer indexed by timeToIndex() % len(buckets).
39+
// Each element represents a certain granularity of time, and the total
40+
// represented duration adds up to a window length of time.
41+
buckets []float64
42+
43+
// firstWrite holds the time when the first write has been made.
44+
// This time is reset to `now` when the very first write happens,
45+
// or when a first write happens after `window` time of inactivity.
46+
// The difference between `now` and `firstWrite` is used to compute
47+
// the number of eligible buckets for computation of average values.
48+
firstWrite time.Time
49+
50+
// lastWrite stores the time when the last write was made.
51+
// This is used to detect when we have gaps in the data (i.e. more than a
52+
// granularity has expired since the last write) so that we can zero those
53+
// entries in the buckets array. It is also used when calculating the
54+
// WindowAverage to know how much of the buckets array represents valid data.
55+
lastWrite time.Time
56+
57+
// granularity is the duration represented by each bucket in the buckets ring buffer.
58+
granularity time.Duration
59+
// window is the total time represented by the buckets ring buffer.
60+
window time.Duration
61+
// The total sum of all buckets within the window. This total includes
62+
// invalid buckets, e.g. buckets written to before firstWrite or after
63+
// lastWrite are included in this total.
64+
windowTotal float64
65+
}
66+
7367
// String implements the Stringer interface.
7468
func (t *TimeWindow) String() string {
7569
return fmt.Sprintf("%v", t.buckets)
7670
}
7771

78-
// computeSmoothingCoeff computes the decay given number of buckets.
79-
// The function uses precision and min exponent value constants.
80-
func computeSmoothingCoeff(nb float64) float64 {
81-
return math.Max(
82-
// Given number of buckets, infer the desired multiplier
83-
// so that at least weightPrecision sum of buckets is met.
84-
1-math.Pow(1-weightPrecision, 1/nb),
85-
// If it's smaller than minExponent — then use minExponent,
86-
// otherwise with extremely large windows we basically end up
87-
// very close to the simple average.
88-
minExponent,
89-
)
90-
}
91-
9272
// NewTimeWindow generates a new TimeWindow with the given window and
9373
// granularity.
9474
func NewTimeWindow(window, granularity time.Duration) *TimeWindow {
@@ -109,18 +89,6 @@ func NewTimeWindow(window, granularity time.Duration) *TimeWindow {
10989
}
11090
}
11191

112-
// NewWeightedTimeWindow generates a new WeightedTimeWindow with the given
113-
// granularity.
114-
func NewWeightedTimeWindow(window, granularity time.Duration) *WeightedTimeWindow {
115-
// Number of buckets is `window` divided by `granularity`, rounded up.
116-
// e.g. 60s / 2s = 30.
117-
nb := math.Ceil(float64(window) / float64(granularity))
118-
return &WeightedTimeWindow{
119-
TimeWindow: NewTimeWindow(window, granularity),
120-
smoothingCoeff: computeSmoothingCoeff(nb),
121-
}
122-
}
123-
12492
// IsEmpty returns true if no data has been recorded for the `window` period.
12593
func (t *TimeWindow) IsEmpty(now time.Time) bool {
12694
now = now.Truncate(t.granularity)
@@ -134,62 +102,6 @@ func (t *TimeWindow) isEmptyLocked(now time.Time) bool {
134102
return now.Sub(t.lastWrite) > t.window
135103
}
136104

137-
// roundToNDigits rounds a float64 to n decimal places.
138-
func roundToNDigits(n int, f float64) float64 {
139-
p := math.Pow10(n)
140-
return math.Round(f*p) / p
141-
}
142-
143-
const (
144-
// minExponent is the minimal decay multiplier for the weighted average computations.
145-
minExponent = 0.2
146-
// The sum of weights for weighted average should be at least this much.
147-
weightPrecision = 0.9999
148-
precision = 6
149-
)
150-
151-
// WindowAverage returns the exponential weighted average. This means
152-
// that more recent items have much greater impact on the average than
153-
// the older ones.
154-
// TODO(vagababov): optimize for O(1) computation, if possible.
155-
// E.g. with data [10, 10, 5, 5] (newest last), then
156-
// the `WindowAverage` would return (10+10+5+5)/4 = 7.5
157-
// This with exponent of 0.6 would return 5*0.6+5*0.6*0.4+10*0.6*0.4^2+10*0.6*0.4^3 = 5.544
158-
// If we reverse the data to [5, 5, 10, 10] the simple average would remain the same,
159-
// but this one would change to 9.072.
160-
func (t *WeightedTimeWindow) WindowAverage(now time.Time) float64 {
161-
now = now.Truncate(t.granularity)
162-
t.bucketsMutex.RLock()
163-
defer t.bucketsMutex.RUnlock()
164-
if t.isEmptyLocked(now) {
165-
return 0
166-
}
167-
168-
totalB := len(t.buckets)
169-
numB := len(t.buckets)
170-
171-
multiplier := t.smoothingCoeff
172-
// We start with 0es. But we know that we have _some_ data because
173-
// IsEmpty returned false.
174-
if now.After(t.lastWrite) {
175-
numZ := now.Sub(t.lastWrite) / t.granularity
176-
// Skip to this multiplier directly: m*(1-m)^(nz-1).
177-
multiplier *= math.Pow(1-t.smoothingCoeff, float64(numZ))
178-
// Reduce effective number of buckets.
179-
numB -= int(numZ)
180-
}
181-
startIdx := t.timeToIndex(t.lastWrite) + totalB // To ensure always positive % operation.
182-
ret := 0.
183-
for i := range numB {
184-
effectiveIdx := (startIdx - i) % totalB
185-
v := t.buckets[effectiveIdx] * multiplier
186-
ret += v
187-
multiplier *= (1 - t.smoothingCoeff)
188-
// TODO(vagababov): bail out if sm > weightPrecision?
189-
}
190-
return ret
191-
}
192-
193105
// WindowAverage returns the average bucket value over the window.
194106
//
195107
// If the first write was less than the window length ago, an average is
@@ -302,12 +214,6 @@ func (t *TimeWindow) Record(now time.Time, value float64) {
302214
t.windowTotal += value
303215
}
304216

305-
// ResizeWindow implements window resizing for the weighted averaging buckets object.
306-
func (t *WeightedTimeWindow) ResizeWindow(w time.Duration) {
307-
t.TimeWindow.ResizeWindow(w)
308-
t.smoothingCoeff = computeSmoothingCoeff(math.Ceil(float64(w) / float64(t.granularity)))
309-
}
310-
311217
// ResizeWindow resizes the window. This is an O(N) operation,
312218
// and is not supposed to be executed very often.
313219
func (t *TimeWindow) ResizeWindow(w time.Duration) {
@@ -357,40 +263,22 @@ func (t *TimeWindow) ResizeWindow(w time.Duration) {
357263
t.windowTotal = newTotal
358264
}
359265

360-
// MetricSnapshot represents a point-in-time view of metrics.
361-
type MetricSnapshot struct {
362-
stableValue float64
363-
panicValue float64
364-
readyPodCount int32
365-
timestamp time.Time
366-
}
367-
368-
// NewMetricSnapshot creates a new metric snapshot.
369-
func NewMetricSnapshot(stableValue, panicValue float64, readyPods int32, timestamp time.Time) *MetricSnapshot {
370-
return &MetricSnapshot{
371-
stableValue: stableValue,
372-
panicValue: panicValue,
373-
readyPodCount: readyPods,
374-
timestamp: timestamp,
375-
}
376-
}
377-
378-
// StableValue returns the metric value averaged over the stable window.
379-
func (s *MetricSnapshot) StableValue() float64 {
380-
return s.stableValue
381-
}
382-
383-
// PanicValue returns the metric value averaged over the panic window.
384-
func (s *MetricSnapshot) PanicValue() float64 {
385-
return s.panicValue
386-
}
387-
388-
// ReadyPodCount returns the number of ready pods.
389-
func (s *MetricSnapshot) ReadyPodCount() int32 {
390-
return s.readyPodCount
266+
// roundToNDigits rounds a float64 to n decimal places.
267+
func roundToNDigits(n int, f float64) float64 {
268+
p := math.Pow10(n)
269+
return math.Round(f*p) / p
391270
}
392271

393-
// Timestamp returns when this snapshot was taken.
394-
func (s *MetricSnapshot) Timestamp() time.Time {
395-
return s.timestamp
272+
// computeSmoothingCoeff computes the decay given number of buckets.
273+
// The function uses the weightPrecision and minExponent constants.
274+
func computeSmoothingCoeff(nb float64) float64 {
275+
return math.Max(
276+
// Given number of buckets, infer the desired multiplier
277+
// so that at least weightPrecision sum of buckets is met.
278+
1-math.Pow(1-weightPrecision, 1/nb),
279+
// If it's smaller than minExponent — then use minExponent,
280+
// otherwise with extremely large windows we basically end up
281+
// very close to the simple average.
282+
minExponent,
283+
)
396284
}

0 commit comments

Comments
 (0)