
Commit 2fc431b

liu-cong authored and rlakhtakia committed
Refactor scheduler to run plugins (kubernetes-sigs#677)
* Refactor scheduler to run plugins
* Add scheduler plugin latency metric
* Address comments
* Address comments
1 parent 61bcc1e commit 2fc431b

7 files changed: +979 -4 lines changed

pkg/epp/scheduling/plugins/filter.go

+394
@@ -0,0 +1,394 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package plugins

import (
    "math"
    "math/rand"
    "time"

    "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/config"
    "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
    errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/error"
    logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
)

type Filter struct {
    name   string
    filter filterFunc
}

func (bf *Filter) Name() string {
    if bf == nil {
        return "nil"
    }
    return bf.name
}

func (bf *Filter) Filter(ctx *types.Context, pods []types.Pod) ([]types.Pod, error) {
    loggerTrace := ctx.Logger.V(logutil.TRACE)
    loggerTrace.Info("Running a filter", "name", bf.Name(), "podCount", len(pods))

    return bf.filter(ctx, pods)
}

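// Example (sketch): Filter is expected to satisfy the scheduler's filter plugin interface,
// assumed here to be types.Filter (the interface type used by the DecisionTreeFilter fields
// below). A compile-time assertion of that assumption would look like:
//
//    var _ types.Filter = &Filter{}
//
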
// DecisionTreeFilter applies the current filter, and then recursively applies the next filters
// depending on the success or failure of the current filter.
// It can be used to construct a flow chart algorithm.
type DecisionTreeFilter struct {
    Current types.Filter
    // NextOnSuccess filter will be applied after successfully applying the current filter.
    // The filtered results will be passed to the next filter.
    NextOnSuccess types.Filter
    // NextOnFailure filter will be applied if the current filter fails.
    // The original input will be passed to the next filter.
    NextOnFailure types.Filter
    // NextOnSuccessOrFailure is a convenience field to configure the next filter regardless of the
    // success or failure of the current filter.
    // NOTE: When using NextOnSuccessOrFailure, both NextOnSuccess and NextOnFailure SHOULD be nil.
    // However, if that's not the case, NextOnSuccess and NextOnFailure will be used, instead of
    // NextOnSuccessOrFailure, in the success and failure scenarios, respectively.
    NextOnSuccessOrFailure types.Filter
}

func (f *DecisionTreeFilter) Name() string {
    if f == nil {
        return "nil"
    }
    return f.Current.Name()
}

func (f *DecisionTreeFilter) Filter(ctx *types.Context, pods []types.Pod) ([]types.Pod, error) {
    loggerTrace := ctx.Logger.V(logutil.TRACE)
    filtered, err := f.Current.Filter(ctx, pods)

    next := f.NextOnSuccessOrFailure
    if err == nil && len(filtered) > 0 {
        if f.NextOnSuccess == nil && f.NextOnSuccessOrFailure == nil {
            // No succeeding filters to run, return.
            return filtered, err
        }
        if f.NextOnSuccess != nil {
            next = f.NextOnSuccess
        }
        loggerTrace.Info("Filter succeeded", "filter", f.Name(), "next", next.Name(), "filteredPodCount", len(filtered))
        // On success, pass the filtered result to the next filter.
        return next.Filter(ctx, filtered)
    } else {
        if f.NextOnFailure == nil && f.NextOnSuccessOrFailure == nil {
            // No succeeding filters to run, return.
            return filtered, err
        }
        if f.NextOnFailure != nil {
            next = f.NextOnFailure
        }
        loggerTrace.Info("Filter failed", "filter", f.Name(), "next", next.Name())
        // On failure, pass the initial set of pods to the next filter.
        return next.Filter(ctx, pods)
    }
}

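// Example (sketch, illustrative name only): a decision tree that first keeps pods below the
// low-queue threshold and, when none qualify, falls back to the least-queuing pods. Both filters
// referenced here are defined later in this file.
//
//    var exampleQueueTree = &DecisionTreeFilter{
//        Current:       LowQueueFilter,
//        NextOnFailure: LeastQueueFilter,
//    }
//
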
// filterFunc filters a set of input pods to a subset.
type filterFunc func(ctx *types.Context, pods []types.Pod) ([]types.Pod, error)

// toFilterFunc is a helper function to convert a per-pod predicate to a filterFunc.
func toFilterFunc(pp podPredicate) filterFunc {
    return func(ctx *types.Context, pods []types.Pod) ([]types.Pod, error) {
        filtered := []types.Pod{}
        for _, pod := range pods {
            pass := pp(ctx.Req, pod)
            if pass {
                filtered = append(filtered, pod)
            }
        }

        return filtered, nil
    }
}

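// Example (sketch): wrapping a per-pod predicate into a Filter via toFilterFunc, mirroring how
// LowQueueFilter and HasCapacityFilter below are built. The variable name is illustrative only;
// queueThresholdPredicate(0) keeps only pods whose waiting queue is empty.
//
//    var emptyQueueFilter = &Filter{
//        name:   "empty queue",
//        filter: toFilterFunc(queueThresholdPredicate(0)),
//    }
//
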
var LeastQueueFilter = &Filter{
    name:   "least queuing",
    filter: leastQueuingFilterFunc,
}

// leastQueuingFilterFunc finds the max and min queue size of all pods, divides the whole range
// (max-min) by the number of pods, and finds the pods that fall into the first range.
// The intuition is that if there are multiple pods that share a similar queue size in the low
// range, we should consider them all instead of the absolute minimum one. This worked better than
// picking the least one as it gives more choices for the next filter, which on aggregate gave
// better results.
// TODO: Compare this strategy with other strategies such as top K.
func leastQueuingFilterFunc(ctx *types.Context, pods []types.Pod) ([]types.Pod, error) {
    min := math.MaxInt
    max := 0
    filtered := []types.Pod{}

    for _, pod := range pods {
        if pod.GetMetrics().WaitingQueueSize <= min {
            min = pod.GetMetrics().WaitingQueueSize
        }
        if pod.GetMetrics().WaitingQueueSize >= max {
            max = pod.GetMetrics().WaitingQueueSize
        }
    }

    for _, pod := range pods {
        if pod.GetMetrics().WaitingQueueSize >= min && pod.GetMetrics().WaitingQueueSize <= min+(max-min)/len(pods) {
            filtered = append(filtered, pod)
        }
    }
    return filtered, nil
}

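// Worked example (illustrative numbers): for four pods with queue sizes 2, 3, 9, and 10, min=2 and
// max=10, so the acceptance window is [2, 2+(10-2)/4] = [2, 4] with integer division; only the
// pods with queue sizes 2 and 3 are passed on to the next filter.
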
var LowQueueFilter = &Filter{
    name:   "low queueing filter",
    filter: toFilterFunc(queueThresholdPredicate(config.Conf.QueueingThresholdLoRA)),
}

var LeastKVCacheFilter = &Filter{
    name:   "least KV cache percent",
    filter: leastKVCacheFilterFunc,
}

// leastKVCacheFilterFunc finds the max and min KV cache of all pods, divides the whole range
// (max-min) by the number of pods, and finds the pods that fall into the first range.
// The intuition is that if there are multiple pods that share similar KV cache in the low range, we
// should consider them all instead of the absolute minimum one. This worked better than picking the
// least one as it gives more choices for the next filter, which on aggregate gave better results.
// TODO: Compare this strategy with other strategies such as top K.
func leastKVCacheFilterFunc(ctx *types.Context, pods []types.Pod) ([]types.Pod, error) {
    min := math.MaxFloat64
    var max float64 = 0
    filtered := []types.Pod{}

    for _, pod := range pods {
        if pod.GetMetrics().KVCacheUsagePercent <= min {
            min = pod.GetMetrics().KVCacheUsagePercent
        }
        if pod.GetMetrics().KVCacheUsagePercent >= max {
            max = pod.GetMetrics().KVCacheUsagePercent
        }
    }

    for _, pod := range pods {
        if pod.GetMetrics().KVCacheUsagePercent >= min && pod.GetMetrics().KVCacheUsagePercent <= min+(max-min)/float64(len(pods)) {
            filtered = append(filtered, pod)
        }
    }
    return filtered, nil
}

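// Worked example (illustrative numbers): for three pods with KV cache utilization 0.10, 0.20, and
// 0.80, min=0.10 and max=0.80, so the acceptance window is [0.10, 0.10+(0.80-0.10)/3] ≈ [0.10, 0.33];
// the pods at 0.10 and 0.20 pass and the pod at 0.80 does not.
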
var LoRAAffinityFilter = &Filter{
    name:   "affinity LoRA",
    filter: loRASoftAffinityFilterFunc,
}

// loRASoftAffinityFilterFunc implements a pod selection strategy that prioritizes pods
// with existing LoRA model affinity while allowing for load balancing through randomization.
//
// The function works by:
// 1. Separating pods into two groups: those with target model affinity and those with available capacity
// 2. Using a probability threshold to sometimes select from non-affinity pods to enable load balancing
// 3. Falling back to whichever group has pods if one group is empty
//
// Parameters:
//   - ctx: the scheduling context, carrying the LLM request (with its resolved target model) and a logger
//   - pods: slice of pod metrics to filter
//
// Returns:
//   - Filtered slice of pod metrics based on affinity and availability
//   - Error if any issues occur during filtering
func loRASoftAffinityFilterFunc(ctx *types.Context, pods []types.Pod) ([]types.Pod, error) {
    // Pre-allocate slices with estimated capacity
    filtered_affinity := make([]types.Pod, 0, len(pods))
    filtered_available := make([]types.Pod, 0, len(pods))

    // Categorize pods based on affinity and availability
    for _, pod := range pods {
        _, active := pod.GetMetrics().ActiveModels[ctx.Req.ResolvedTargetModel]
        _, waiting := pod.GetMetrics().WaitingModels[ctx.Req.ResolvedTargetModel]

        if active || waiting {
            filtered_affinity = append(filtered_affinity, pod)
        } else if len(pod.GetMetrics().ActiveModels)+len(pod.GetMetrics().WaitingModels) < pod.GetMetrics().MaxActiveModels {
            filtered_available = append(filtered_available, pod)
        }
    }

    // Seed a math/rand generator for the probabilistic group selection below
    randSource := rand.NewSource(time.Now().UnixNano())
    randGen := rand.New(randSource)

    // If both groups have pods, use probability to select which group to return
    if len(filtered_affinity) > 0 && len(filtered_available) > 0 {
        if randGen.Float64() < config.Conf.LoraAffinityThreshold {
            return filtered_affinity, nil
        }
        return filtered_available, nil
    }

    // Return whichever group has pods
    if len(filtered_affinity) > 0 {
        return filtered_affinity, nil
    }

    return filtered_available, nil
}

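// Example (illustrative): if config.Conf.LoraAffinityThreshold were 0.95, a request whose resolved
// target model is already active or loading on some pods would be routed to that affinity group
// roughly 95% of the time and to pods with spare adapter slots the remaining ~5%, spreading load
// while mostly avoiding extra adapter loads.
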
var HasCapacityFilter = &Filter{
    name:   "has capacity for sheddable requests",
    filter: toFilterFunc(queueThresholdPredicate(config.Conf.QueueThresholdCritical).and(kvCacheThresholdPredicate(config.Conf.KVCacheThreshold))),
}

var DropRequestFilter = &Filter{
    name: "drop request",
    filter: func(ctx *types.Context, pods []types.Pod) ([]types.Pod, error) {
        ctx.Logger.V(logutil.DEFAULT).Info("Request dropped", "request", ctx.Req)
        return []types.Pod{}, errutil.Error{
            Code: errutil.InferencePoolResourceExhausted, Msg: "dropping request due to limited backend resources",
        }
    },
}

// podPredicate is a filter function to check whether a pod is desired.
type podPredicate func(req *types.LLMRequest, pod types.Pod) bool

// queueThresholdPredicate passes pods whose waiting queue size is at or below queueThreshold.
func queueThresholdPredicate(queueThreshold int) podPredicate {
    return func(req *types.LLMRequest, pod types.Pod) bool {
        return pod.GetMetrics().WaitingQueueSize <= queueThreshold
    }
}

// kvCacheThresholdPredicate passes pods whose KV cache utilization is at or below kvCacheThreshold.
func kvCacheThresholdPredicate(kvCacheThreshold float64) podPredicate {
    return func(req *types.LLMRequest, pod types.Pod) bool {
        return pod.GetMetrics().KVCacheUsagePercent <= kvCacheThreshold
    }
}

// and composes two predicates; the result passes only when both predicates pass.
func (pp podPredicate) and(another podPredicate) podPredicate {
    return func(req *types.LLMRequest, pod types.Pod) bool {
        return pp(req, pod) && another(req, pod)
    }
}


0 commit comments
