From 4bb3be348314455e7d21783578b26f4ac75fee83 Mon Sep 17 00:00:00 2001 From: Nir Rozenbaum Date: Mon, 31 Mar 2025 09:56:23 +0300 Subject: [PATCH] removed ctx from datastore struct. instead added ctx to one function that required it which is more aligned with best practices. Signed-off-by: Nir Rozenbaum --- cmd/epp/main.go | 2 +- .../controller/inferencemodel_reconciler_test.go | 2 +- pkg/epp/controller/inferencepool_reconciler_test.go | 2 +- pkg/epp/controller/pod_reconciler.go | 6 +++--- pkg/epp/controller/pod_reconciler_test.go | 4 ++-- pkg/epp/datastore/datastore.go | 13 +++++-------- pkg/epp/datastore/datastore_test.go | 8 ++++---- test/integration/epp/hermetic_test.go | 2 +- 8 files changed, 18 insertions(+), 21 deletions(-) diff --git a/cmd/epp/main.go b/cmd/epp/main.go index 39baf18b..becd0d33 100644 --- a/cmd/epp/main.go +++ b/cmd/epp/main.go @@ -167,7 +167,7 @@ func run() error { pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.PodMetricsClientImpl{MetricMapping: mapping}, *refreshMetricsInterval) // Setup runner. - datastore := datastore.NewDatastore(ctx, pmf) + datastore := datastore.NewDatastore(pmf) serverRunner := &runserver.ExtProcServerRunner{ GrpcPort: *grpcPort, diff --git a/pkg/epp/controller/inferencemodel_reconciler_test.go b/pkg/epp/controller/inferencemodel_reconciler_test.go index cd1ff1fb..7db123cb 100644 --- a/pkg/epp/controller/inferencemodel_reconciler_test.go +++ b/pkg/epp/controller/inferencemodel_reconciler_test.go @@ -192,7 +192,7 @@ func TestInferenceModelReconciler(t *testing.T) { WithIndex(&v1alpha2.InferenceModel{}, datastore.ModelNameIndexKey, indexInferenceModelsByModelName). Build() pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second) - ds := datastore.NewDatastore(t.Context(), pmf) + ds := datastore.NewDatastore(pmf) for _, m := range test.modelsInStore { ds.ModelSetIfOlder(m) } diff --git a/pkg/epp/controller/inferencepool_reconciler_test.go b/pkg/epp/controller/inferencepool_reconciler_test.go index 27c4238e..1f72f79c 100644 --- a/pkg/epp/controller/inferencepool_reconciler_test.go +++ b/pkg/epp/controller/inferencepool_reconciler_test.go @@ -95,7 +95,7 @@ func TestInferencePoolReconciler(t *testing.T) { ctx := context.Background() pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second) - datastore := datastore.NewDatastore(ctx, pmf) + datastore := datastore.NewDatastore(pmf) inferencePoolReconciler := &InferencePoolReconciler{PoolNamespacedName: namespacedName, Client: fakeClient, Datastore: datastore} // Step 1: Inception, only ready pods matching pool1 are added to the store. diff --git a/pkg/epp/controller/pod_reconciler.go b/pkg/epp/controller/pod_reconciler.go index 046561e4..086b0e6d 100644 --- a/pkg/epp/controller/pod_reconciler.go +++ b/pkg/epp/controller/pod_reconciler.go @@ -59,7 +59,7 @@ func (c *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R return ctrl.Result{}, err } - c.updateDatastore(logger, pod, pool) + c.updateDatastore(ctx, logger, pod, pool) return ctrl.Result{}, nil } @@ -69,13 +69,13 @@ func (c *PodReconciler) SetupWithManager(mgr ctrl.Manager) error { Complete(c) } -func (c *PodReconciler) updateDatastore(logger logr.Logger, pod *corev1.Pod, pool *v1alpha2.InferencePool) { +func (c *PodReconciler) updateDatastore(ctx context.Context, logger logr.Logger, pod *corev1.Pod, pool *v1alpha2.InferencePool) { namespacedName := types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace} if !pod.DeletionTimestamp.IsZero() || !c.Datastore.PoolLabelsMatch(pod.Labels) || !podIsReady(pod) { logger.V(logutil.DEBUG).Info("Pod removed or not added", "name", namespacedName) c.Datastore.PodDelete(namespacedName) } else { - if c.Datastore.PodUpdateOrAddIfNotExist(pod, pool) { + if c.Datastore.PodUpdateOrAddIfNotExist(ctx, pod, pool) { logger.V(logutil.DEFAULT).Info("Pod added", "name", namespacedName) } else { logger.V(logutil.DEFAULT).Info("Pod already exists", "name", namespacedName) diff --git a/pkg/epp/controller/pod_reconciler_test.go b/pkg/epp/controller/pod_reconciler_test.go index e4cb0b62..f40c1ea8 100644 --- a/pkg/epp/controller/pod_reconciler_test.go +++ b/pkg/epp/controller/pod_reconciler_test.go @@ -181,10 +181,10 @@ func TestPodReconciler(t *testing.T) { Build() // Configure the initial state of the datastore. - store := datastore.NewDatastore(t.Context(), pmf) + store := datastore.NewDatastore(pmf) store.PoolSet(test.pool) for _, pod := range test.existingPods { - store.PodUpdateOrAddIfNotExist(pod, pool) + store.PodUpdateOrAddIfNotExist(t.Context(), pod, pool) } podReconciler := &PodReconciler{Client: fakeClient, Datastore: store} diff --git a/pkg/epp/datastore/datastore.go b/pkg/epp/datastore/datastore.go index 8ada3e64..ac8bbbf5 100644 --- a/pkg/epp/datastore/datastore.go +++ b/pkg/epp/datastore/datastore.go @@ -60,7 +60,7 @@ type Datastore interface { PodGetAll() []backendmetrics.PodMetrics // PodList lists pods matching the given predicate. PodList(func(backendmetrics.PodMetrics) bool) []backendmetrics.PodMetrics - PodUpdateOrAddIfNotExist(pod *corev1.Pod, pool *v1alpha2.InferencePool) bool + PodUpdateOrAddIfNotExist(ctx context.Context, pod *corev1.Pod, pool *v1alpha2.InferencePool) bool PodDelete(namespacedName types.NamespacedName) PodResyncAll(ctx context.Context, ctrlClient client.Client, pool *v1alpha2.InferencePool) @@ -68,9 +68,8 @@ type Datastore interface { Clear() } -func NewDatastore(parentCtx context.Context, pmf *backendmetrics.PodMetricsFactory) *datastore { +func NewDatastore(pmf *backendmetrics.PodMetricsFactory) *datastore { store := &datastore{ - parentCtx: parentCtx, poolAndModelsMu: sync.RWMutex{}, models: make(map[string]*v1alpha2.InferenceModel), pods: &sync.Map{}, @@ -80,8 +79,6 @@ func NewDatastore(parentCtx context.Context, pmf *backendmetrics.PodMetricsFacto } type datastore struct { - // parentCtx controls the lifecycle of the background metrics goroutines that spawn up by the datastore. - parentCtx context.Context // poolAndModelsMu is used to synchronize access to pool and the models map. poolAndModelsMu sync.RWMutex pool *v1alpha2.InferencePool @@ -228,7 +225,7 @@ func (ds *datastore) PodList(predicate func(backendmetrics.PodMetrics) bool) []b return res } -func (ds *datastore) PodUpdateOrAddIfNotExist(pod *corev1.Pod, pool *v1alpha2.InferencePool) bool { +func (ds *datastore) PodUpdateOrAddIfNotExist(ctx context.Context, pod *corev1.Pod, pool *v1alpha2.InferencePool) bool { namespacedName := types.NamespacedName{ Name: pod.Name, Namespace: pod.Namespace, @@ -236,7 +233,7 @@ func (ds *datastore) PodUpdateOrAddIfNotExist(pod *corev1.Pod, pool *v1alpha2.In var pm backendmetrics.PodMetrics existing, ok := ds.pods.Load(namespacedName) if !ok { - pm = ds.pmf.NewPodMetrics(ds.parentCtx, pod, ds) + pm = ds.pmf.NewPodMetrics(ctx, pod, ds) ds.pods.Store(namespacedName, pm) } else { pm = existing.(backendmetrics.PodMetrics) @@ -262,7 +259,7 @@ func (ds *datastore) PodResyncAll(ctx context.Context, ctrlClient client.Client, if podIsReady(&pod) { namespacedName := types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace} activePods[pod.Name] = true - if ds.PodUpdateOrAddIfNotExist(&pod, pool) { + if ds.PodUpdateOrAddIfNotExist(ctx, &pod, pool) { logger.V(logutil.DEFAULT).Info("Pod added", "name", namespacedName) } else { logger.V(logutil.DEFAULT).Info("Pod already exists", "name", namespacedName) diff --git a/pkg/epp/datastore/datastore_test.go b/pkg/epp/datastore/datastore_test.go index 22bb0365..11c5062a 100644 --- a/pkg/epp/datastore/datastore_test.go +++ b/pkg/epp/datastore/datastore_test.go @@ -72,7 +72,7 @@ func TestPool(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second) - datastore := NewDatastore(context.Background(), pmf) + datastore := NewDatastore(pmf) datastore.PoolSet(tt.inferencePool) gotPool, gotErr := datastore.PoolGet() if diff := cmp.Diff(tt.wantErr, gotErr, cmpopts.EquateErrors()); diff != "" { @@ -204,7 +204,7 @@ func TestModel(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second) - ds := NewDatastore(t.Context(), pmf) + ds := NewDatastore(pmf) for _, m := range test.existingModels { ds.ModelSetIfOlder(m) } @@ -318,10 +318,10 @@ func TestMetrics(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() pmf := backendmetrics.NewPodMetricsFactory(test.pmc, time.Millisecond) - ds := NewDatastore(ctx, pmf) + ds := NewDatastore(pmf) ds.PoolSet(inferencePool) for _, pod := range test.storePods { - ds.PodUpdateOrAddIfNotExist(pod, inferencePool) + ds.PodUpdateOrAddIfNotExist(ctx, pod, inferencePool) } assert.EventuallyWithT(t, func(t *assert.CollectT) { got := ds.PodGetAll() diff --git a/test/integration/epp/hermetic_test.go b/test/integration/epp/hermetic_test.go index 0ba0e14a..c5f29393 100644 --- a/test/integration/epp/hermetic_test.go +++ b/test/integration/epp/hermetic_test.go @@ -1628,7 +1628,7 @@ func BeforeSuite() func() { pmf := backendmetrics.NewPodMetricsFactory(serverRunner.TestPodMetricsClient, 10*time.Millisecond) // Adjust from defaults serverRunner.PoolName = "vllm-llama3-8b-instruct-pool" - serverRunner.Datastore = datastore.NewDatastore(context.Background(), pmf) + serverRunner.Datastore = datastore.NewDatastore(pmf) serverRunner.SecureServing = false if err := serverRunner.SetupWithManager(context.Background(), mgr); err != nil {