Skip to content

Commit 29b51e9

Browse files
authored
Merge pull request #6 from Fedosin/delay-window
Create dedicated delay window structure
2 parents 130bc1e + 40004a7 commit 29b51e9

File tree

7 files changed

+795
-286
lines changed

7 files changed

+795
-286
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ func main() {
7373
- **`algorithm/`** - Autoscaling algorithm implementations (sliding window, panic mode)
7474
- **`metrics/`** - Time-windowed metric collection and aggregation
7575
- **`transmitter/`** - Metric reporting interfaces for monitoring integration
76+
- **`delaywindow/`** - Delay window collection and aggregation
7677

7778
## Documentation
7879

algorithm/sliding_window.go

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import (
2424
"time"
2525

2626
"github.com/Fedosin/libkpa/api"
27-
"github.com/Fedosin/libkpa/metrics"
27+
"github.com/Fedosin/libkpa/delaywindow"
2828
)
2929

3030
// SlidingWindowAutoscaler implements the sliding window autoscaling algorithm
@@ -40,14 +40,18 @@ type SlidingWindowAutoscaler struct {
4040
maxPanicPods int32
4141

4242
// Delay window for scale-down decisions
43-
delayWindow *metrics.TimeWindow
43+
delayWindow *delaywindow.DelayWindow
4444
}
4545

46+
const (
47+
scaleDownDelayGranularity = 1 * time.Second
48+
)
49+
4650
// NewSlidingWindowAutoscaler creates a new sliding window autoscaler.
4751
func NewSlidingWindowAutoscaler(spec api.AutoscalerSpec) *SlidingWindowAutoscaler {
48-
var delayWindow *metrics.TimeWindow
52+
var delayWindow *delaywindow.DelayWindow
4953
if spec.ScaleDownDelay > 0 {
50-
delayWindow = metrics.NewTimeWindow(spec.ScaleDownDelay, 2*time.Second)
54+
delayWindow = delaywindow.NewDelayWindow(spec.ScaleDownDelay, scaleDownDelayGranularity)
5155
}
5256

5357
return &SlidingWindowAutoscaler{
@@ -149,8 +153,7 @@ func (a *SlidingWindowAutoscaler) Scale(ctx context.Context, snapshot api.Metric
149153
// Apply scale-down delay if configured
150154
if a.spec.Reachable && a.delayWindow != nil {
151155
a.delayWindow.Record(now, desiredPodCount)
152-
delayedPodCount := a.delayWindow.Current()
153-
desiredPodCount = delayedPodCount
156+
desiredPodCount = a.delayWindow.CurrentMax()
154157
}
155158

156159
// Apply min/max scale bounds
@@ -190,9 +193,9 @@ func (a *SlidingWindowAutoscaler) Update(spec api.AutoscalerSpec) error {
190193
// Update delay window if needed
191194
if spec.ScaleDownDelay > 0 {
192195
if a.delayWindow == nil {
193-
a.delayWindow = metrics.NewTimeWindow(spec.ScaleDownDelay, 2*time.Second)
196+
a.delayWindow = delaywindow.NewDelayWindow(spec.ScaleDownDelay, scaleDownDelayGranularity)
194197
} else {
195-
a.delayWindow.ResizeWindow(spec.ScaleDownDelay)
198+
a.delayWindow.Resize(spec.ScaleDownDelay)
196199
}
197200
} else {
198201
a.delayWindow = nil

delaywindow/delaywindow.go

Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
/*
2+
Copyright 2025 The libkpa Authors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
// Package delaywindow provides a delay window collection and aggregation.
18+
package delaywindow
19+
20+
import (
21+
"sync"
22+
"time"
23+
)
24+
25+
// DelayWindow maintains a sliding window of values over time, organized into fixed-size time slots.
26+
// It allows recording values at specific times and retrieving the maximum value within the window.
27+
type DelayWindow struct {
28+
// windowSize is the total duration of the sliding window
29+
windowSize time.Duration
30+
// granularity is the duration of each time slot
31+
granularity time.Duration
32+
// numSlots is the total number of slots in the window (windowSize/granularity)
33+
numSlots int
34+
// slots is a circular buffer storing values for each time slot
35+
slots []int32
36+
// slotTimes stores the truncated timestamp for each slot to track validity
37+
slotTimes []time.Time
38+
// lastRecordedTime tracks the most recent time a value was recorded
39+
lastRecordedTime time.Time
40+
// mu protects concurrent access
41+
mu sync.RWMutex
42+
}
43+
44+
// NewDelayWindow creates a new DelayWindow with the specified window size and granularity.
45+
// The windowSize must be divisible by granularity, and both must be positive.
46+
func NewDelayWindow(windowSize, granularity time.Duration) *DelayWindow {
47+
if windowSize <= 0 || granularity <= 0 {
48+
panic("windowSize and granularity must be positive")
49+
}
50+
if windowSize%granularity != 0 {
51+
panic("windowSize must be divisible by granularity")
52+
}
53+
54+
numSlots := int(windowSize / granularity)
55+
return &DelayWindow{
56+
windowSize: windowSize,
57+
granularity: granularity,
58+
numSlots: numSlots,
59+
slots: make([]int32, numSlots),
60+
slotTimes: make([]time.Time, numSlots),
61+
}
62+
}
63+
64+
// Record stores a value at the specified time.
65+
// The time is truncated to the granularity, and the value is stored in the corresponding slot.
66+
// If a value already exists for that slot, it is overwritten.
67+
func (dw *DelayWindow) Record(now time.Time, value int32) {
68+
dw.mu.Lock()
69+
defer dw.mu.Unlock()
70+
71+
// Truncate time to granularity
72+
truncatedTime := now.Truncate(dw.granularity)
73+
74+
// Calculate slot index using modulo for circular buffer behavior
75+
slot := dw.getSlotIndex(truncatedTime)
76+
77+
// Store the value and timestamp
78+
dw.slots[slot] = value
79+
dw.slotTimes[slot] = truncatedTime
80+
81+
// Update last recorded time
82+
if truncatedTime.After(dw.lastRecordedTime) {
83+
dw.lastRecordedTime = truncatedTime
84+
}
85+
}
86+
87+
// CurrentMax returns the maximum value within the current window.
88+
// The window extends from (now - windowSize) to now, where now is the latest recorded time
89+
// or the current time if no recordings exist.
90+
// Returns 0 if no values exist in the window.
91+
func (dw *DelayWindow) CurrentMax() int32 {
92+
dw.mu.RLock()
93+
defer dw.mu.RUnlock()
94+
95+
// Determine the current time (latest recorded or time.Now())
96+
now := dw.lastRecordedTime
97+
if now.IsZero() {
98+
now = time.Now().Truncate(dw.granularity)
99+
}
100+
101+
// Calculate the start of the window
102+
windowStart := now.Add(-dw.windowSize + dw.granularity)
103+
104+
maxValue := int32(0)
105+
106+
// Scan all slots to find values within the window
107+
for i := 0; i < dw.numSlots; i++ {
108+
// Check if this slot has a valid timestamp within the window
109+
if !dw.slotTimes[i].IsZero() &&
110+
!dw.slotTimes[i].Before(windowStart) &&
111+
!dw.slotTimes[i].After(now) {
112+
if dw.slots[i] > maxValue {
113+
maxValue = dw.slots[i]
114+
}
115+
}
116+
}
117+
118+
return maxValue
119+
}
120+
121+
// getSlotIndex calculates the circular buffer index for a given time.
122+
// This ensures proper wrap-around behavior.
123+
func (dw *DelayWindow) getSlotIndex(t time.Time) int {
124+
// Convert time to a slot number based on Unix time and granularity
125+
slotNumber := t.Unix() / int64(dw.granularity.Seconds())
126+
// Use modulo to wrap around in the circular buffer
127+
return int(slotNumber % int64(dw.numSlots))
128+
}
129+
130+
// Resize adjusts the window size while maintaining the same granularity.
131+
// Existing data within the new window is preserved, while data outside
132+
// the new window is discarded. The granularity remains unchanged.
133+
func (dw *DelayWindow) Resize(newWindowSize time.Duration) {
134+
dw.mu.Lock()
135+
defer dw.mu.Unlock()
136+
137+
// Validate new window size
138+
if newWindowSize <= 0 {
139+
panic("newWindowSize must be positive")
140+
}
141+
if newWindowSize%dw.granularity != 0 {
142+
panic("newWindowSize must be divisible by granularity")
143+
}
144+
145+
newNumSlots := int(newWindowSize / dw.granularity)
146+
147+
// If the number of slots hasn't changed, just update windowSize
148+
if newNumSlots == dw.numSlots {
149+
dw.windowSize = newWindowSize
150+
return
151+
}
152+
153+
// Create new slots and slotTimes arrays
154+
newSlots := make([]int32, newNumSlots)
155+
newSlotTimes := make([]time.Time, newNumSlots)
156+
157+
// Determine the current time for window calculation
158+
now := dw.lastRecordedTime
159+
if now.IsZero() {
160+
now = time.Now().Truncate(dw.granularity)
161+
}
162+
163+
// Calculate the start of the new window
164+
windowStart := now.Add(-newWindowSize + dw.granularity)
165+
166+
// Copy existing valid data to new arrays
167+
for i := 0; i < dw.numSlots; i++ {
168+
// Check if this slot has valid data within the new window
169+
if !dw.slotTimes[i].IsZero() &&
170+
!dw.slotTimes[i].Before(windowStart) &&
171+
!dw.slotTimes[i].After(now) {
172+
// Calculate new slot index based on the timestamp
173+
slotNumber := dw.slotTimes[i].Unix() / int64(dw.granularity.Seconds())
174+
newSlot := int(slotNumber % int64(newNumSlots))
175+
newSlots[newSlot] = dw.slots[i]
176+
newSlotTimes[newSlot] = dw.slotTimes[i]
177+
}
178+
}
179+
180+
// Update the structure with new values
181+
dw.windowSize = newWindowSize
182+
dw.numSlots = newNumSlots
183+
dw.slots = newSlots
184+
dw.slotTimes = newSlotTimes
185+
}

0 commit comments

Comments
 (0)