diff --git a/cmd/epp/main.go b/cmd/epp/main.go index b5e6fbe6b..c0a87e62e 100644 --- a/cmd/epp/main.go +++ b/cmd/epp/main.go @@ -142,8 +142,8 @@ func run() error { } poolNamespacedName := types.NamespacedName{ - Namespace: *poolNamespace, Name: *poolName, + Namespace: *poolNamespace, } mgr, err := runserver.NewDefaultManager(poolNamespacedName, cfg) if err != nil { @@ -151,8 +151,6 @@ func run() error { return err } - ctx := ctrl.SetupSignalHandler() - // Set up mapper for metric scraping. mapping, err := backendmetrics.NewMetricMapping( *totalQueuedRequestsMetric, @@ -167,14 +165,15 @@ func run() error { pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.PodMetricsClientImpl{MetricMapping: mapping}, *refreshMetricsInterval) // Setup runner. + ctx := ctrl.SetupSignalHandler() + datastore := datastore.NewDatastore(ctx, pmf) serverRunner := &runserver.ExtProcServerRunner{ GrpcPort: *grpcPort, DestinationEndpointHintMetadataNamespace: *destinationEndpointHintMetadataNamespace, DestinationEndpointHintKey: *destinationEndpointHintKey, - PoolName: *poolName, - PoolNamespace: *poolNamespace, + PoolNamespacedName: poolNamespacedName, Datastore: datastore, SecureServing: *secureServing, CertPath: *certPath, diff --git a/pkg/epp/backend/metrics/logger.go b/pkg/epp/backend/metrics/logger.go index d9a930277..7dc1a8b8b 100644 --- a/pkg/epp/backend/metrics/logger.go +++ b/pkg/epp/backend/metrics/logger.go @@ -55,8 +55,8 @@ func StartMetricsLogger(ctx context.Context, datastore Datastore, refreshPrometh case <-ctx.Done(): logger.V(logutil.DEFAULT).Info("Shutting down prometheus metrics thread") return - case <-ticker.C: // Periodically flush prometheus metrics for inference pool - flushPrometheusMetricsOnce(logger, datastore) + case <-ticker.C: // Periodically refresh prometheus metrics for inference pool + refreshPrometheusMetrics(logger, datastore) } } }() @@ -86,11 +86,11 @@ func StartMetricsLogger(ctx context.Context, datastore Datastore, refreshPrometh } } -func flushPrometheusMetricsOnce(logger logr.Logger, datastore Datastore) { +func refreshPrometheusMetrics(logger logr.Logger, datastore Datastore) { pool, err := datastore.PoolGet() if err != nil { // No inference pool or not initialize. - logger.V(logutil.DEFAULT).Info("pool is not initialized, skipping flushing metrics") + logger.V(logutil.DEFAULT).Info("Pool is not initialized, skipping refreshing metrics") return } @@ -98,7 +98,7 @@ func flushPrometheusMetricsOnce(logger logr.Logger, datastore Datastore) { var queueTotal int podMetrics := datastore.PodGetAll() - logger.V(logutil.VERBOSE).Info("Flushing Prometheus Metrics", "ReadyPods", len(podMetrics)) + logger.V(logutil.TRACE).Info("Refreshing Prometheus Metrics", "ReadyPods", len(podMetrics)) if len(podMetrics) == 0 { return } diff --git a/pkg/epp/server/runserver.go b/pkg/epp/server/runserver.go index 65a6e7879..0c0a6a6dc 100644 --- a/pkg/epp/server/runserver.go +++ b/pkg/epp/server/runserver.go @@ -43,8 +43,7 @@ type ExtProcServerRunner struct { GrpcPort int DestinationEndpointHintMetadataNamespace string DestinationEndpointHintKey string - PoolName string - PoolNamespace string + PoolNamespacedName types.NamespacedName Datastore datastore.Datastore SecureServing bool CertPath string @@ -73,8 +72,7 @@ func NewDefaultExtProcServerRunner() *ExtProcServerRunner { GrpcPort: DefaultGrpcPort, DestinationEndpointHintKey: DefaultDestinationEndpointHintKey, DestinationEndpointHintMetadataNamespace: DefaultDestinationEndpointHintMetadataNamespace, - PoolName: DefaultPoolName, - PoolNamespace: DefaultPoolNamespace, + PoolNamespacedName: types.NamespacedName{Name: DefaultPoolName, Namespace: DefaultPoolNamespace}, SecureServing: DefaultSecureServing, RefreshPrometheusMetricsInterval: DefaultRefreshPrometheusMetricsInterval, // Datastore can be assigned later. @@ -93,13 +91,10 @@ func (r *ExtProcServerRunner) SetupWithManager(ctx context.Context, mgr ctrl.Man } if err := (&controller.InferenceModelReconciler{ - Datastore: r.Datastore, - Client: mgr.GetClient(), - PoolNamespacedName: types.NamespacedName{ - Name: r.PoolName, - Namespace: r.PoolNamespace, - }, - Record: mgr.GetEventRecorderFor("InferenceModel"), + Datastore: r.Datastore, + Client: mgr.GetClient(), + PoolNamespacedName: r.PoolNamespacedName, + Record: mgr.GetEventRecorderFor("InferenceModel"), }).SetupWithManager(ctx, mgr); err != nil { return fmt.Errorf("failed setting up InferenceModelReconciler: %w", err) } diff --git a/test/integration/epp/hermetic_test.go b/test/integration/epp/hermetic_test.go index 372158f4b..79b619fd6 100644 --- a/test/integration/epp/hermetic_test.go +++ b/test/integration/epp/hermetic_test.go @@ -1348,7 +1348,7 @@ func BeforeSuite() func() { serverRunner.TestPodMetricsClient = &backendmetrics.FakePodMetricsClient{} pmf := backendmetrics.NewPodMetricsFactory(serverRunner.TestPodMetricsClient, 10*time.Millisecond) // Adjust from defaults - serverRunner.PoolName = "vllm-llama3-8b-instruct-pool" + serverRunner.PoolNamespacedName = types.NamespacedName{Name: "vllm-llama3-8b-instruct-pool", Namespace: "default"} serverRunner.Datastore = datastore.NewDatastore(context.Background(), pmf) serverRunner.SecureServing = false