Skip to content

✨ Add a metrics to expose informer cache's length #3184

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions pkg/cache/internal/informers.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,12 @@ func (ip *Informers) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.O
return nil, false, err
}

// Add metric event handler to track cache resources count
metricsHandler := NewMetricsResourceEventHandler(gvk, sharedIndexInformer)
if _, err := sharedIndexInformer.AddEventHandler(metricsHandler); err != nil {
return nil, false, err
}

mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
if err != nil {
return nil, false, err
Expand Down Expand Up @@ -614,3 +620,22 @@ func restrictNamespaceBySelector(namespaceOpt string, s Selector) string {
}
return ""
}

// VisitInformers calls the given function for each informer in the cache
func (ip *Informers) VisitInformers(visitor func(gvk schema.GroupVersionKind, informer cache.SharedIndexInformer)) {
ip.mu.RLock()
defer ip.mu.RUnlock()

// Visit each tracked informer
for gvk, _cache := range ip.tracker.Structured {
visitor(gvk, _cache.Informer)
}

for gvk, _cache := range ip.tracker.Unstructured {
visitor(gvk, _cache.Informer)
}

for gvk, _cache := range ip.tracker.Metadata {
visitor(gvk, _cache.Informer)
}
}
66 changes: 66 additions & 0 deletions pkg/cache/internal/metrics_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package internal

import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/tools/cache"

"sigs.k8s.io/controller-runtime/pkg/metrics"
)

// NewMetricsResourceEventHandler creates a new metrics-collecting event handler for an informer.
// It counts resource additions, updates, and deletions and records them in metrics.
func NewMetricsResourceEventHandler(gvk schema.GroupVersionKind, informer cache.SharedIndexInformer) cache.ResourceEventHandler {
handler := &metricsResourceEventHandler{
gvk: gvk,
informer: informer,
}

// Initialize the initial count
handler.updateCount()

return handler
}

// metricsResourceEventHandler implements cache.ResourceEventHandler interface
// to collect metrics about resources in the cache
type metricsResourceEventHandler struct {
gvk schema.GroupVersionKind
informer cache.SharedIndexInformer
}

// OnAdd is called when an object is added.
func (h *metricsResourceEventHandler) OnAdd(obj interface{}, isInInitialList bool) {
h.updateCount()
}

// OnUpdate is called when an object is modified.
func (h *metricsResourceEventHandler) OnUpdate(oldObj, newObj interface{}) {
// No need to update counts on update as the total count hasn't changed
}

// OnDelete is called when an object is deleted.
func (h *metricsResourceEventHandler) OnDelete(obj interface{}) {
h.updateCount()
}

// updateCount updates the metrics with the current count of resources.
func (h *metricsResourceEventHandler) updateCount() {
count := len(h.informer.GetIndexer().ListKeys())
metrics.RecordCacheResourceCount(h.gvk, count)
}
253 changes: 253 additions & 0 deletions pkg/cache/internal/metrics_handler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,253 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package internal

import (
"context"
"time"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"k8s.io/apimachinery/pkg/runtime/schema"
toolscache "k8s.io/client-go/tools/cache"

"sigs.k8s.io/controller-runtime/pkg/metrics"
)

var _ = Describe("Metrics Handler", func() {

Describe("RecordCacheResourceCount", func() {
var (
podGVK schema.GroupVersionKind
)

BeforeEach(func() {
podGVK = schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"}
})

DescribeTable("recording different resource counts",
func(count int) {
// Directly call RecordCacheResourceCount to record metrics
metrics.RecordCacheResourceCount(podGVK, count)
// Since we cannot directly verify prometheus metric values in tests
// we can only ensure the function doesn't panic
Expect(true).To(BeTrue()) // Simple assertion to show test passed
},
Entry("empty", 0),
Entry("one pod", 1),
Entry("multiple pods", 5),
)
})

Describe("MetricsResourceEventHandler", func() {
var (
podGVK schema.GroupVersionKind
objects []interface{}
indexer *mockIndexer
informer *mockSharedIndexInformer
handler *metricsResourceEventHandler
metricRegistry *prometheus.Registry
)

BeforeEach(func() {
podGVK = schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"}
objects = []interface{}{}
indexer = &mockIndexer{getListFunc: func() []interface{} { return objects }}
informer = &mockSharedIndexInformer{indexer: indexer}

// Reset metrics Registry
metricRegistry = prometheus.NewRegistry()
metrics.Registry = metricRegistry
metrics.Registry.MustRegister(metrics.CacheResourceCount)

handler = NewMetricsResourceEventHandler(podGVK, informer)
})

verifyMetricValue := func(gvk schema.GroupVersionKind, expectedValue float64) {
gauge := metrics.CacheResourceCount.WithLabelValues(gvk.Group, gvk.Version, gvk.Kind)
var metric dto.Metric
err := gauge.Write(&metric)
Expect(err).NotTo(HaveOccurred(), "Failed to write metric")

actualValue := metric.GetGauge().GetValue()
Expect(actualValue).To(Equal(expectedValue), "Metric value does not match expected")
}

It("should update metrics on events", func() {
// Verify initial state - empty list, count should be 0
verifyMetricValue(podGVK, 0)

// Test OnAdd - adding a pod should update the count
objects = append(objects, "pod-1")
handler.OnAdd("pod-1", false)
verifyMetricValue(podGVK, 1)

// Test OnUpdate - should not change the count since total object count hasn't changed
handler.OnUpdate("pod-1", "pod-1-updated")
verifyMetricValue(podGVK, 1)

// Add another pod
objects = append(objects, "pod-2")
handler.OnAdd("pod-2", false)
verifyMetricValue(podGVK, 2)

// Test OnDelete - deleting a pod should update the count
objects = objects[:1] // Only keep the first pod
handler.OnDelete("pod-2")
verifyMetricValue(podGVK, 1)

// Delete all pods
objects = []interface{}{}
handler.OnDelete("pod-1")
verifyMetricValue(podGVK, 0)
})
})
})

// mockIndexer is a simple Indexer implementation for testing
type mockIndexer struct {
getListFunc func() []interface{}
}

func (m *mockIndexer) Add(obj interface{}) error {
return nil
}

func (m *mockIndexer) Update(obj interface{}) error {
return nil
}

func (m *mockIndexer) Delete(obj interface{}) error {
return nil
}

func (m *mockIndexer) List() []interface{} {
return m.getListFunc()
}

func (m *mockIndexer) ListKeys() []string {
return nil
}

func (m *mockIndexer) Get(obj interface{}) (item interface{}, exists bool, err error) {
return nil, false, nil
}

func (m *mockIndexer) GetByKey(key string) (item interface{}, exists bool, err error) {
return nil, false, nil
}

func (m *mockIndexer) Replace(list []interface{}, resourceVersion string) error {
return nil
}

func (m *mockIndexer) Resync() error {
return nil
}

func (m *mockIndexer) Index(indexName string, obj interface{}) ([]interface{}, error) {
return nil, nil
}

func (m *mockIndexer) IndexKeys(indexName, indexedValue string) ([]string, error) {
return nil, nil
}

func (m *mockIndexer) ListIndexFuncValues(indexName string) []string {
return nil
}

func (m *mockIndexer) ByIndex(indexName, indexedValue string) ([]interface{}, error) {
return nil, nil
}

func (m *mockIndexer) GetIndexers() toolscache.Indexers {
return nil
}

func (m *mockIndexer) AddIndexers(newIndexers toolscache.Indexers) error {
return nil
}

// mockSharedIndexInformer is a simple SharedIndexInformer implementation for testing
type mockSharedIndexInformer struct {
indexer toolscache.Indexer
}

func (m *mockSharedIndexInformer) AddEventHandler(handler toolscache.ResourceEventHandler) (toolscache.ResourceEventHandlerRegistration, error) {
return nil, nil
}

func (m *mockSharedIndexInformer) AddEventHandlerWithResyncPeriod(handler toolscache.ResourceEventHandler, resyncPeriod time.Duration) (toolscache.ResourceEventHandlerRegistration, error) {
return nil, nil
}

func (m *mockSharedIndexInformer) AddEventHandlerWithOptions(handler toolscache.ResourceEventHandler, options toolscache.HandlerOptions) (toolscache.ResourceEventHandlerRegistration, error) {
return nil, nil
}

func (m *mockSharedIndexInformer) RemoveEventHandler(registration toolscache.ResourceEventHandlerRegistration) error {
return nil
}

func (m *mockSharedIndexInformer) GetStore() toolscache.Store {
return m.indexer
}

func (m *mockSharedIndexInformer) GetController() toolscache.Controller {
return nil
}

func (m *mockSharedIndexInformer) Run(stopCh <-chan struct{}) {
}

func (m *mockSharedIndexInformer) RunWithContext(ctx context.Context) {
}

func (m *mockSharedIndexInformer) HasSynced() bool {
return true
}

func (m *mockSharedIndexInformer) LastSyncResourceVersion() string {
return ""
}

func (m *mockSharedIndexInformer) SetWatchErrorHandler(handler toolscache.WatchErrorHandler) error {
return nil
}

func (m *mockSharedIndexInformer) SetWatchErrorHandlerWithContext(handler toolscache.WatchErrorHandlerWithContext) error {
return nil
}

func (m *mockSharedIndexInformer) SetTransform(transformer toolscache.TransformFunc) error {
return nil
}

func (m *mockSharedIndexInformer) GetIndexer() toolscache.Indexer {
return m.indexer
}

func (m *mockSharedIndexInformer) AddIndexers(indexers toolscache.Indexers) error {
return nil
}

func (m *mockSharedIndexInformer) IsStopped() bool {
return false
}
29 changes: 29 additions & 0 deletions pkg/cache/internal/suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package internal

import (
"testing"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

func TestCacheInternal(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Cache Internal Suite")
}
Loading