Commit 22d4be0

Externalize Scheduler's saturation logic
This commit refactors the request processing pipeline, externalizing saturation detection and criticality-based service differentiation from the Scheduler. These responsibilities are now primarily managed by the RequestControl.Director. This change is a preparatory step for the introduction of a new Flow Controller component, which will eventually absorb these admission control duties.

Key changes include:

- Introduced a `PreDispatch` method on `RequestControl.Director`. It uses the `SaturationDetector` for admission control of non-critical requests and inspects request criticality to determine whether saturation checks are bypassed.
- The saturation detection logic for dropping non-critical requests is intentionally preserved within the `Director` at this stage. This keeps the option of bypassing the future Flow Controller component during its maturation, ensuring the existing saturation and sheddable-request behavior can be maintained as a fallback.
- Updated `main.go` to instantiate the `SaturationDetector` and wire it into the request handling flow.
- Updated `director_test.go` to align with the new component responsibilities, adding coverage where necessary.

This refactoring yields a cleaner architecture, making the `Scheduler` a more focused component and centralizing initial admission control logic while paving the way for the future Flow Controller. It is aligned with the direction in `0683-epp-architecture-proposal` and should be nearly a no-op in terms of EPP behavior.
1 parent f1e8288 commit 22d4be0
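From the Director's point of view, the `SaturationDetector` introduced here is just a one-method interface; the concrete detector lives in the `saturationdetector` package and is not part of this diff. As a rough sketch of what its `IsSaturated` signal could look like — the `Detector` type, config thresholds, and pod-metric accessors below are assumptions for illustration, not the package's actual API — it reduces per-pod backend metrics such as queue depth and KV-cache utilization to a single pool-level boolean:

// Illustrative sketch only; not the real saturationdetector package.
// Detector, d.config, QueueDepthThreshold, KVCacheUtilThreshold, and the
// pod metric accessors are assumed names for this example.
func (d *Detector) IsSaturated(ctx context.Context) bool {
	for _, pod := range d.datastore.PodGetAll() {
		m := pod.GetMetrics()
		// A single backend with headroom is enough to admit non-critical traffic.
		if m.WaitingQueueSize < d.config.QueueDepthThreshold &&
			m.KVCacheUsagePercent < d.config.KVCacheUtilThreshold {
			return false
		}
	}
	return true // every backend is above its thresholds
}

Only non-critical (sheddable) requests ever consult this signal; critical requests bypass it entirely, as the `PreDispatch` diff below shows.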

File tree

7 files changed: +443 -246 lines changed

cmd/epp/main.go

Lines changed: 29 additions & 11 deletions
@@ -40,6 +40,7 @@ import (
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics/collectors"
+	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/saturationdetector"
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework"
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/filter"
@@ -151,14 +152,17 @@ func run() error {
 	})
 	setupLog.Info("Flags processed", "flags", flags)

-	// Init runtime.
+	// --- Load Configurations from Environment Variables ---
+	sdConfig := saturationdetector.LoadConfigFromEnv()
+
+	// --- Get Kubernetes Config ---
 	cfg, err := ctrl.GetConfig()
 	if err != nil {
-		setupLog.Error(err, "Failed to get rest config")
+		setupLog.Error(err, "Failed to get Kubernetes rest config")
 		return err
 	}

-	// Set up mapper for metric scraping.
+	// --- Setup Datastore ---
 	mapping, err := backendmetrics.NewMetricMapping(
 		*totalQueuedRequestsMetric,
 		*kvCacheUsagePercentageMetric,
@@ -169,13 +173,11 @@ func run() error {
 		return err
 	}
 	verifyMetricMapping(*mapping, setupLog)
-
 	pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.PodMetricsClientImpl{MetricMapping: mapping}, *refreshMetricsInterval)
-	// Setup runner.
 	ctx := ctrl.SetupSignalHandler()
-
 	datastore := datastore.NewDatastore(ctx, pmf)

+	// --- Setup Metrics Server ---
 	customCollectors := []prometheus.Collector{collectors.NewInferencePoolMetricsCollector(datastore)}
 	metrics.Register(customCollectors...)
 	metrics.RecordInferenceExtensionInfo()
@@ -199,6 +201,7 @@ func run() error {
 		return err
 	}

+	// --- Initialize Core EPP Components ---
 	scheduler := scheduling.NewScheduler(datastore)
 	if schedulerV2 == "true" {
 		queueScorerWeight := envutil.GetEnvInt("QUEUE_SCORE_WEIGHT", scorer.DefaultQueueScorerWeight, setupLog)
@@ -221,6 +224,10 @@ func run() error {
 		schedulerConfig := scheduling.NewSchedulerConfig(profilepicker.NewAllProfilesPicker(), map[string]*framework.SchedulerProfile{"schedulerv2": schedulerProfile})
 		scheduler = scheduling.NewSchedulerWithConfig(datastore, schedulerConfig)
 	}
+
+	saturationDetector := saturationdetector.NewDetector(sdConfig, datastore, ctrl.Log)
+
+	// --- Setup ExtProc Server Runner ---
 	serverRunner := &runserver.ExtProcServerRunner{
 		GrpcPort:                                 *grpcPort,
 		DestinationEndpointHintMetadataNamespace: *destinationEndpointHintMetadataNamespace,
@@ -231,24 +238,26 @@ func run() error {
 		CertPath:                         *certPath,
 		RefreshPrometheusMetricsInterval: *refreshPrometheusMetricsInterval,
 		Scheduler:                        scheduler,
+		SaturationDetector:               saturationDetector,
 	}
 	if err := serverRunner.SetupWithManager(ctx, mgr); err != nil {
-		setupLog.Error(err, "Failed to setup ext-proc controllers")
+		setupLog.Error(err, "Failed to setup EPP controllers")
 		return err
 	}

+	// --- Add Runnables to Manager ---
 	// Register health server.
 	if err := registerHealthServer(mgr, ctrl.Log.WithName("health"), datastore, *grpcHealthPort); err != nil {
 		return err
 	}

 	// Register ext-proc server.
-	if err := mgr.Add(serverRunner.AsRunnable(ctrl.Log.WithName("ext-proc"))); err != nil {
-		setupLog.Error(err, "Failed to register ext-proc gRPC server")
+	if err := registerExtProcServer(mgr, serverRunner, ctrl.Log.WithName("ext-proc")); err != nil {
 		return err
 	}

-	// Start the manager. This blocks until a signal is received.
+	// --- Start Manager ---
+	// This blocks until a signal is received.
 	setupLog.Info("Controller manager starting")
 	if err := mgr.Start(ctx); err != nil {
 		setupLog.Error(err, "Error starting controller manager")
@@ -276,6 +285,16 @@ func initLogging(opts *zap.Options) {
 	ctrl.SetLogger(logger)
 }

+// registerExtProcServer adds the ExtProcServerRunner as a Runnable to the manager.
+func registerExtProcServer(mgr manager.Manager, runner *runserver.ExtProcServerRunner, logger logr.Logger) error {
+	if err := mgr.Add(runner.AsRunnable(logger)); err != nil {
+		setupLog.Error(err, "Failed to register ext-proc gRPC server runnable")
+		return err
+	}
+	setupLog.Info("ExtProc server runner added to manager.")
+	return nil
+}
+
 // registerHealthServer adds the Health gRPC server as a Runnable to the given manager.
 func registerHealthServer(mgr manager.Manager, logger logr.Logger, ds datastore.Datastore, port int) error {
 	srv := grpc.NewServer()
@@ -309,5 +328,4 @@ func verifyMetricMapping(mapping backendmetrics.MetricMapping, logger logr.Logger)
 	if mapping.LoraRequestInfo == nil {
 		logger.Info("Not scraping metric: LoraRequestInfo")
 	}
-
 }
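Condensed, the `main.go` changes above form a short dependency chain: configuration from the environment, a detector over the datastore, and injection into the server runner next to the Scheduler. A sketch of that wiring using only names from the diff (error handling and unrelated setup omitted):

// Simplified wiring from this commit's main.go.
sdConfig := saturationdetector.LoadConfigFromEnv() // 1. env-driven config

saturationDetector := saturationdetector.NewDetector(sdConfig, datastore, ctrl.Log) // 2. detector reads backend state via the datastore

serverRunner := &runserver.ExtProcServerRunner{
	// ... existing fields elided ...
	Scheduler:          scheduler,
	SaturationDetector: saturationDetector, // 3. injected alongside the Scheduler
}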

pkg/epp/requestcontrol/director.go

Lines changed: 74 additions & 19 deletions
@@ -14,6 +14,8 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */

+// Package requestcontrol defines the Director component responsible for orchestrating request processing after initial
+// parsing.
 package requestcontrol

 import (
@@ -34,33 +36,45 @@ import (
 	requtil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/request"
 )

+// Scheduler defines the interface required by the Director for scheduling.
 type Scheduler interface {
 	Schedule(ctx context.Context, b *schedulingtypes.LLMRequest) (result map[string]*schedulingtypes.Result, err error)
 	OnResponse(ctx context.Context, resp *schedulingtypes.LLMResponse, targetPodName string)
 }

+// SaturationDetector provides a signal indicating whether the backends are considered saturated.
+type SaturationDetector interface {
+	IsSaturated(ctx context.Context) bool
+}
+
+// Director orchestrates the request handling flow, including scheduling.
 type Director struct {
-	datastore datastore.Datastore
-	scheduler Scheduler
+	datastore          datastore.Datastore
+	scheduler          Scheduler
+	saturationDetector SaturationDetector
 }

-func NewDirector(datastore datastore.Datastore, scheduler Scheduler) *Director {
-	return &Director{
-		datastore: datastore,
-		scheduler: scheduler,
-	}
+// NewDirector creates a new Director instance with all dependencies.
+func NewDirector(datastore datastore.Datastore, scheduler Scheduler, saturationDetector SaturationDetector) *Director {
+	return &Director{datastore, scheduler, saturationDetector}
 }

-// HandleRequest always returns the requestContext even in the error case, as the request context is used in error handling.
+// HandleRequest orchestrates the request lifecycle:
+// 1. Parses request details.
+// 2. Calls PreDispatch for admission control.
+// 3. Calls Dispatch (which calls Scheduler) if request is approved.
+// 4. Calls PostDispatch to populate RequestContext with results.
+//
+// It always returns the requestContext even in the error case, as the request context is used in error handling.
 func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestContext) (*handlers.RequestContext, error) {
 	logger := log.FromContext(ctx)

-	// Resolve target models.
+	// --- 1. Parse Request, Resolve Target Models, and Determine Parameters ---
 	var ok bool
 	requestBodyMap := reqCtx.Request.Body
 	reqCtx.Model, ok = requestBodyMap["model"].(string)
 	if !ok {
-		return reqCtx, errutil.Error{Code: errutil.BadRequest, Msg: "model not found in request"}
+		return reqCtx, errutil.Error{Code: errutil.BadRequest, Msg: "model not found in request body"}
 	}
 	prompt, err := requtil.ExtractPromptFromRequestBody(requestBodyMap)
 	if err != nil {
@@ -84,29 +98,69 @@ func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestContext) (*handlers.RequestContext, error) {
 		reqCtx.Request.Body["model"] = reqCtx.ResolvedTargetModel // Update target model in the body.
 	}

+	requestCriticality := v1alpha2.Standard
+	if modelObj.Spec.Criticality != nil {
+		requestCriticality = *modelObj.Spec.Criticality
+	}
+
+	// Prepare LLMRequest (needed for both saturation detection and Scheduler)
 	llmReq := &schedulingtypes.LLMRequest{
 		TargetModel: reqCtx.ResolvedTargetModel,
 		RequestId:   reqCtx.Request.Headers[requtil.RequestIdHeaderKey],
-		Critical:    modelObj.Spec.Criticality != nil && *modelObj.Spec.Criticality == v1alpha2.Critical,
+		Critical:    requestCriticality == v1alpha2.Critical,
 		Prompt:      prompt,
 		Headers:     reqCtx.Request.Headers,
 	}
-	logger.V(logutil.DEBUG).Info("LLM request assembled", "request", llmReq)
-	results, err := d.Dispatch(ctx, llmReq)
-	if err != nil {
-		return reqCtx, err
+	logger = logger.WithValues(
+		"model", reqCtx.Model,
+		"resolvedTargetModel", llmReq.TargetModel,
+		"criticality", requestCriticality,
+	)
+	ctx = log.IntoContext(ctx, logger)
+	logger.V(logutil.DEBUG).Info("LLM request assembled")
+
+	// --- 2. Saturation Check ---
+	preDispatchErr := d.PreDispatch(ctx, reqCtx, requestCriticality)
+	if preDispatchErr != nil {
+		return reqCtx, preDispatchErr
 	}

+	// --- 3. Dispatch (Calls Scheduler) ---
+	results, dispatchErr := d.Dispatch(ctx, llmReq)
+	if dispatchErr != nil {
+		return reqCtx, dispatchErr
+	}
+
+	// --- 4. PostDispatch (Populates RequestContext) ---
 	// Insert target endpoint to instruct Envoy to route requests to the specified target pod.
-	// Attach the port number
-	reqCtx, err = d.PostDispatch(ctx, reqCtx, results)
-	if err != nil {
-		return reqCtx, err
+	// Attach the port number.
+	reqCtx, postDispatchErr := d.PostDispatch(ctx, reqCtx, results)
+	if postDispatchErr != nil {
+		return reqCtx, postDispatchErr
 	}

 	return reqCtx, nil
 }

+// PreDispatch handles admission control before dispatch.
+func (d *Director) PreDispatch(ctx context.Context, reqCtx *handlers.RequestContext, reqCriticality v1alpha2.Criticality) error {
+	logger := log.FromContext(ctx)
+
+	if reqCriticality == v1alpha2.Critical {
+		logger.V(logutil.DEBUG).Info("Critical request bypassing saturation check.")
+		return nil
+	}
+
+	logger.V(logutil.DEBUG).Info("Performing saturation check for non-critical request.")
+	if d.saturationDetector.IsSaturated(ctx) { // Assuming non-nil Saturation Detector
+		return errutil.Error{
+			Code: errutil.InferencePoolResourceExhausted,
+			Msg:  "system saturated, non-critical request dropped",
+		}
+	}
+	return nil
+}
+
 // Dispatch runs one or many scheduling cycles.
 func (d *Director) Dispatch(ctx context.Context, llmReq *schedulingtypes.LLMRequest) (map[string]*schedulingtypes.Result, error) {
 	var err error
@@ -118,6 +172,7 @@ func (d *Director) Dispatch(ctx context.Context, llmReq *schedulingtypes.LLMRequest) (map[string]*schedulingtypes.Result, error) {
 	return res, nil // TODO handle multi cycle result after defining the PostDispatch extension point
 }

+// PostDispatch populates the RequestContext based on scheduling results.
 func (d *Director) PostDispatch(ctx context.Context, reqCtx *handlers.RequestContext, results map[string]*schedulingtypes.Result) (*handlers.RequestContext, error) {
 	logger := log.FromContext(ctx)
 	// currently only get a single result. Will refactor to pluggably implement the PostSchedule
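Because `SaturationDetector` is a single-method interface, the `director_test.go` updates mentioned in the commit message can drive `PreDispatch` with a trivial test double. A hypothetical sketch (the stub type and test are illustrative; they are not necessarily what the real test file contains):

// Hypothetical test double for the SaturationDetector interface above.
type mockSaturationDetector struct{ saturated bool }

func (m *mockSaturationDetector) IsSaturated(_ context.Context) bool { return m.saturated }

func TestPreDispatch_SheddableDroppedWhenSaturated(t *testing.T) {
	// datastore and scheduler are not used by PreDispatch, so nil is acceptable here.
	d := NewDirector(nil, nil, &mockSaturationDetector{saturated: true})

	// A non-critical (Standard) request must be rejected while saturated.
	if err := d.PreDispatch(context.Background(), &handlers.RequestContext{}, v1alpha2.Standard); err == nil {
		t.Fatal("expected non-critical request to be dropped while saturated")
	}

	// A critical request bypasses the saturation check entirely.
	if err := d.PreDispatch(context.Background(), &handlers.RequestContext{}, v1alpha2.Critical); err != nil {
		t.Fatalf("expected critical request to be admitted, got: %v", err)
	}
}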
