From 2541026dac30d2f477719ae798bc7c401dc468ad Mon Sep 17 00:00:00 2001 From: Anna Kapuscinska Date: Mon, 17 Feb 2025 03:56:19 +0000 Subject: [PATCH 1/6] watcher: Move methods accessing pods to a separate file pod.go There are no functional changes in this commit, only moving code around. It's a preparation to further refactoring of the k8s watcher that will decouple the core k8s watcher and resource-specific functionalities, and allow for reusing of informer factories. Also removed unused global constants. Signed-off-by: Anna Kapuscinska --- pkg/watcher/pod.go | 203 +++++++++++++++++++ pkg/watcher/{watcher_test.go => pod_test.go} | 0 pkg/watcher/watcher.go | 195 ------------------ 3 files changed, 203 insertions(+), 195 deletions(-) create mode 100644 pkg/watcher/pod.go rename pkg/watcher/{watcher_test.go => pod_test.go} (100%) diff --git a/pkg/watcher/pod.go b/pkg/watcher/pod.go new file mode 100644 index 00000000000..5c1b7d122e5 --- /dev/null +++ b/pkg/watcher/pod.go @@ -0,0 +1,203 @@ +// SPDX-License-Identifier: Apache-2.0 +// Copyright Authors of Tetragon + +package watcher + +import ( + "errors" + "fmt" + "strings" + + corev1 "k8s.io/api/core/v1" +) + +const ( + containerIDLen = 15 + containerIdx = "containers-ids" + podIdx = "pod-ids" + podInformerName = "pod" +) + +var ( + errNoPod = errors.New("object is not a *corev1.Pod") +) + +func containerIDKey(contID string) (string, error) { + parts := strings.Split(contID, "//") + if len(parts) != 2 { + return "", fmt.Errorf("unexpected containerID format, expecting 'docker://', got %q", contID) + } + cid := parts[1] + if len(cid) > containerIDLen { + cid = cid[:containerIDLen] + } + return cid, nil + +} + +// containerIndexFunc index pod by container IDs. +func containerIndexFunc(obj interface{}) ([]string, error) { + var containerIDs []string + putContainer := func(fullContainerID string) error { + if fullContainerID == "" { + // This is expected if the container hasn't been started. 
This function + // will get called again after the container starts, so we just need to + // be patient. + return nil + } + cid, err := containerIDKey(fullContainerID) + if err != nil { + return err + } + containerIDs = append(containerIDs, cid) + return nil + } + + switch t := obj.(type) { + case *corev1.Pod: + for _, container := range t.Status.InitContainerStatuses { + err := putContainer(container.ContainerID) + if err != nil { + return nil, err + } + } + for _, container := range t.Status.ContainerStatuses { + err := putContainer(container.ContainerID) + if err != nil { + return nil, err + } + } + for _, container := range t.Status.EphemeralContainerStatuses { + err := putContainer(container.ContainerID) + if err != nil { + return nil, err + } + } + return containerIDs, nil + } + return nil, fmt.Errorf("%w - found %T", errNoPod, obj) +} + +func podIndexFunc(obj interface{}) ([]string, error) { + switch t := obj.(type) { + case *corev1.Pod: + return []string{string(t.UID)}, nil + } + return nil, fmt.Errorf("podIndexFunc: %w - found %T", errNoPod, obj) +} + +// FindContainer implements K8sResourceWatcher.FindContainer. +func (watcher *K8sWatcher) FindContainer(containerID string) (*corev1.Pod, *corev1.ContainerStatus, bool) { + podInformer := watcher.GetInformer(podInformerName) + if podInformer == nil { + return nil, nil, false + } + indexedContainerID := containerID + if len(containerID) > containerIDLen { + indexedContainerID = containerID[:containerIDLen] + } + objs, err := podInformer.GetIndexer().ByIndex(containerIdx, indexedContainerID) + if err != nil { + return nil, nil, false + } + // If we can't find any pod indexed then fall back to the entire pod list. + // If we find more than 1 pods indexed also fall back to the entire pod list. 
+ if len(objs) != 1 { + objs = podInformer.GetStore().List() + } + pod, cont, found := findContainer(containerID, objs) + if found { + return pod, cont, found + } + + return watcher.deletedPodCache.findContainer(indexedContainerID) +} + +// TODO(michi) Not the most efficient implementation. Optimize as needed. +func findContainer(containerID string, pods []interface{}) (*corev1.Pod, *corev1.ContainerStatus, bool) { + if containerID == "" { + return nil, nil, false + } + for _, obj := range pods { + pod, ok := obj.(*corev1.Pod) + if !ok { + return nil, nil, false + } + for _, container := range pod.Status.ContainerStatuses { + parts := strings.Split(container.ContainerID, "//") + if len(parts) == 2 && strings.HasPrefix(parts[1], containerID) { + return pod, &container, true + } + } + for _, container := range pod.Status.InitContainerStatuses { + parts := strings.Split(container.ContainerID, "//") + if len(parts) == 2 && strings.HasPrefix(parts[1], containerID) { + return pod, &container, true + } + } + for _, container := range pod.Status.EphemeralContainerStatuses { + parts := strings.Split(container.ContainerID, "//") + if len(parts) == 2 && strings.HasPrefix(parts[1], containerID) { + return pod, &container, true + } + } + } + return nil, nil, false +} + +// FindMirrorPod finds the mirror pod of a static pod based on the hash +// see: https://kubernetes.io/docs/reference/labels-annotations-taints/#kubernetes-io-config-hash, +// https://kubernetes.io/docs/reference/labels-annotations-taints/#kubernetes-io-config-mirror, +// https://kubernetes.io/docs/tasks/configure-pod-container/static-pod/ +func (watcher *K8sWatcher) FindMirrorPod(hash string) (*corev1.Pod, error) { + podInformer := watcher.GetInformer(podInformerName) + if podInformer == nil { + return nil, fmt.Errorf("pod informer not initialized") + } + pods := podInformer.GetStore().List() + for i := range pods { + if pod, ok := pods[i].(*corev1.Pod); ok { + if ha, ok := 
pod.Annotations["kubernetes.io/config.mirror"]; ok { + if hash == ha { + return pod, nil + } + } + } + } + return nil, fmt.Errorf("static pod (hash=%s) not found", hash) +} + +func (watcher *K8sWatcher) FindPod(podID string) (*corev1.Pod, error) { + podInformer := watcher.GetInformer(podInformerName) + if podInformer == nil { + return nil, fmt.Errorf("pod informer not initialized") + } + objs, err := podInformer.GetIndexer().ByIndex(podIdx, podID) + if err != nil { + return nil, fmt.Errorf("watcher returned: %w", err) + } + if len(objs) == 1 { + if pod, ok := objs[0].(*corev1.Pod); ok { + return pod, nil + } + return nil, fmt.Errorf("unexpected type %t", objs[0]) + } + + allPods := podInformer.GetStore().List() + if pod, ok := findPod(allPods); ok { + return pod, nil + } + return nil, fmt.Errorf("unable to find pod with ID %s (index pods=%d all pods=%d)", podID, len(objs), len(allPods)) +} + +func findPod(pods []interface{}) (*corev1.Pod, bool) { + for i := range pods { + if pod, ok := pods[i].(*corev1.Pod); ok { + if pod.UID == podIdx { + return pod, true + } + } + } + + return nil, false +} diff --git a/pkg/watcher/watcher_test.go b/pkg/watcher/pod_test.go similarity index 100% rename from pkg/watcher/watcher_test.go rename to pkg/watcher/pod_test.go diff --git a/pkg/watcher/watcher.go b/pkg/watcher/watcher.go index a182ce7e8bb..763fe4decae 100644 --- a/pkg/watcher/watcher.go +++ b/pkg/watcher/watcher.go @@ -4,10 +4,8 @@ package watcher import ( - "errors" "fmt" "reflect" - "strings" "time" "github.com/cilium/tetragon/pkg/logger" @@ -21,19 +19,6 @@ import ( "k8s.io/client-go/tools/cache" ) -const ( - containerIDLen = 15 - containerIdx = "containers-ids" - podIdx = "pod-ids" - serviceIPsIdx = "service-ips" - podInfoIPsIdx = "pod-info-ips" - podInformerName = "pod" -) - -var ( - errNoPod = errors.New("object is not a *corev1.Pod") -) - // K8sResourceWatcher defines an interface for accessing various resources from Kubernetes API. 
type K8sResourceWatcher interface { AddInformers(factory InternalSharedInformerFactory, infs ...*InternalInformer) @@ -67,70 +52,6 @@ type InternalInformer struct { Indexers cache.Indexers } -func podIndexFunc(obj interface{}) ([]string, error) { - switch t := obj.(type) { - case *corev1.Pod: - return []string{string(t.UID)}, nil - } - return nil, fmt.Errorf("podIndexFunc: %w - found %T", errNoPod, obj) -} - -func containerIDKey(contID string) (string, error) { - parts := strings.Split(contID, "//") - if len(parts) != 2 { - return "", fmt.Errorf("unexpected containerID format, expecting 'docker://', got %q", contID) - } - cid := parts[1] - if len(cid) > containerIDLen { - cid = cid[:containerIDLen] - } - return cid, nil - -} - -// containerIndexFunc index pod by container IDs. -func containerIndexFunc(obj interface{}) ([]string, error) { - var containerIDs []string - putContainer := func(fullContainerID string) error { - if fullContainerID == "" { - // This is expected if the container hasn't been started. This function - // will get called again after the container starts, so we just need to - // be patient. 
- return nil - } - cid, err := containerIDKey(fullContainerID) - if err != nil { - return err - } - containerIDs = append(containerIDs, cid) - return nil - } - - switch t := obj.(type) { - case *corev1.Pod: - for _, container := range t.Status.InitContainerStatuses { - err := putContainer(container.ContainerID) - if err != nil { - return nil, err - } - } - for _, container := range t.Status.ContainerStatuses { - err := putContainer(container.ContainerID) - if err != nil { - return nil, err - } - } - for _, container := range t.Status.EphemeralContainerStatuses { - err := putContainer(container.ContainerID) - if err != nil { - return nil, err - } - } - return containerIDs, nil - } - return nil, fmt.Errorf("%w - found %T", errNoPod, obj) -} - func newK8sWatcher( informerFactory informers.SharedInformerFactory, ) (*K8sWatcher, error) { @@ -216,119 +137,3 @@ func (watcher *K8sWatcher) Start() { watcher.startFunc() } } - -// FindContainer implements K8sResourceWatcher.FindContainer. -func (watcher *K8sWatcher) FindContainer(containerID string) (*corev1.Pod, *corev1.ContainerStatus, bool) { - podInformer := watcher.GetInformer(podInformerName) - if podInformer == nil { - return nil, nil, false - } - indexedContainerID := containerID - if len(containerID) > containerIDLen { - indexedContainerID = containerID[:containerIDLen] - } - objs, err := podInformer.GetIndexer().ByIndex(containerIdx, indexedContainerID) - if err != nil { - return nil, nil, false - } - // If we can't find any pod indexed then fall back to the entire pod list. - // If we find more than 1 pods indexed also fall back to the entire pod list. 
- if len(objs) != 1 { - objs = podInformer.GetStore().List() - } - pod, cont, found := findContainer(containerID, objs) - if found { - return pod, cont, found - } - - return watcher.deletedPodCache.findContainer(indexedContainerID) -} - -// FindMirrorPod finds the mirror pod of a static pod based on the hash -// see: https://kubernetes.io/docs/reference/labels-annotations-taints/#kubernetes-io-config-hash, -// https://kubernetes.io/docs/reference/labels-annotations-taints/#kubernetes-io-config-mirror, -// https://kubernetes.io/docs/tasks/configure-pod-container/static-pod/ -func (watcher *K8sWatcher) FindMirrorPod(hash string) (*corev1.Pod, error) { - podInformer := watcher.GetInformer(podInformerName) - if podInformer == nil { - return nil, fmt.Errorf("pod informer not initialized") - } - pods := podInformer.GetStore().List() - for i := range pods { - if pod, ok := pods[i].(*corev1.Pod); ok { - if ha, ok := pod.Annotations["kubernetes.io/config.mirror"]; ok { - if hash == ha { - return pod, nil - } - } - } - } - return nil, fmt.Errorf("static pod (hash=%s) not found", hash) -} - -func (watcher *K8sWatcher) FindPod(podID string) (*corev1.Pod, error) { - podInformer := watcher.GetInformer(podInformerName) - if podInformer == nil { - return nil, fmt.Errorf("pod informer not initialized") - } - objs, err := podInformer.GetIndexer().ByIndex(podIdx, podID) - if err != nil { - return nil, fmt.Errorf("watcher returned: %w", err) - } - if len(objs) == 1 { - if pod, ok := objs[0].(*corev1.Pod); ok { - return pod, nil - } - return nil, fmt.Errorf("unexpected type %t", objs[0]) - } - - allPods := podInformer.GetStore().List() - if pod, ok := findPod(allPods); ok { - return pod, nil - } - return nil, fmt.Errorf("unable to find pod with ID %s (index pods=%d all pods=%d)", podID, len(objs), len(allPods)) -} - -func findPod(pods []interface{}) (*corev1.Pod, bool) { - for i := range pods { - if pod, ok := pods[i].(*corev1.Pod); ok { - if pod.UID == podIdx { - return pod, true - } 
- } - } - - return nil, false -} - -// TODO(michi) Not the most efficient implementation. Optimize as needed. -func findContainer(containerID string, pods []interface{}) (*corev1.Pod, *corev1.ContainerStatus, bool) { - if containerID == "" { - return nil, nil, false - } - for _, obj := range pods { - pod, ok := obj.(*corev1.Pod) - if !ok { - return nil, nil, false - } - for _, container := range pod.Status.ContainerStatuses { - parts := strings.Split(container.ContainerID, "//") - if len(parts) == 2 && strings.HasPrefix(parts[1], containerID) { - return pod, &container, true - } - } - for _, container := range pod.Status.InitContainerStatuses { - parts := strings.Split(container.ContainerID, "//") - if len(parts) == 2 && strings.HasPrefix(parts[1], containerID) { - return pod, &container, true - } - } - for _, container := range pod.Status.EphemeralContainerStatuses { - parts := strings.Split(container.ContainerID, "//") - if len(parts) == 2 && strings.HasPrefix(parts[1], containerID) { - return pod, &container, true - } - } - } - return nil, nil, false -} From 3b6926b8c337bfcf010728478292b7300393514a Mon Sep 17 00:00:00 2001 From: Anna Kapuscinska Date: Mon, 17 Feb 2025 04:41:04 +0000 Subject: [PATCH 2/6] watcher: Split pod methods of K8sResourceWatcher into PodAccessor K8sResourceWatcher interface covers two distinct functionalities: setting up Kubernetes informers and accessing information about pods. These make sense as separate interfaces, so the K8sResourceWatcher methods for accessing pods are split out into a dedicated interface, PodAccessor, and setup methods - into Watcher interface. The K8sResourceWatcher interface stays in place for now, embedding both. 
Signed-off-by: Anna Kapuscinska --- pkg/grpc/exec/exec_test_helper.go | 2 +- pkg/process/podinfo.go | 2 +- pkg/process/process.go | 8 ++++---- pkg/rthooks/args.go | 2 +- pkg/rthooks/runner.go | 4 ++-- pkg/watcher/fake.go | 4 ++-- pkg/watcher/pod.go | 17 ++++++++++++++++- pkg/watcher/watcher.go | 13 +------------ 8 files changed, 28 insertions(+), 24 deletions(-) diff --git a/pkg/grpc/exec/exec_test_helper.go b/pkg/grpc/exec/exec_test_helper.go index 9abcf07d145..1f12c90f4be 100644 --- a/pkg/grpc/exec/exec_test_helper.go +++ b/pkg/grpc/exec/exec_test_helper.go @@ -382,7 +382,7 @@ func CreateAncestorEvents[EXEC notify.Message, EXIT notify.Message]( return &execMsg, &exitMsg } -func InitEnv[EXEC notify.Message, EXIT notify.Message](t *testing.T, watcher watcher.K8sResourceWatcher) DummyNotifier[EXEC, EXIT] { +func InitEnv[EXEC notify.Message, EXIT notify.Message](t *testing.T, watcher watcher.PodAccessor) DummyNotifier[EXEC, EXIT] { if err := process.InitCache(watcher, 65536, defaults.DefaultProcessCacheGCInterval); err != nil { t.Fatalf("failed to call process.InitCache %s", err) } diff --git a/pkg/process/podinfo.go b/pkg/process/podinfo.go index 1bc197b3321..f03b304e904 100644 --- a/pkg/process/podinfo.go +++ b/pkg/process/podinfo.go @@ -33,7 +33,7 @@ func getProbes(pod *corev1.Pod, containerStatus *corev1.ContainerStatus) ([]stri } func getPodInfo( - w watcher.K8sResourceWatcher, + w watcher.PodAccessor, containerID string, binary string, args string, diff --git a/pkg/process/process.go b/pkg/process/process.go index 7b229f81dca..dcd5bbcbf7e 100644 --- a/pkg/process/process.go +++ b/pkg/process/process.go @@ -65,14 +65,14 @@ type ProcessInternal struct { var ( procCache *Cache - k8s watcher.K8sResourceWatcher + k8s watcher.PodAccessor ) var ( ErrProcessInfoMissing = errors.New("failed process info missing") ) -func InitCache(w watcher.K8sResourceWatcher, size int, GCInterval time.Duration) error { +func InitCache(w watcher.PodAccessor, size int, GCInterval 
time.Duration) error { var err error if procCache != nil { @@ -553,9 +553,9 @@ func Get(execId string) (*ProcessInternal, error) { return procCache.get(execId) } -// GetK8s returns K8sResourceWatcher. You must call InitCache before calling this function to ensure +// GetK8s returns PodAccessor. You must call InitCache before calling this function to ensure // that k8s has been initialized. -func GetK8s() watcher.K8sResourceWatcher { +func GetK8s() watcher.PodAccessor { return k8s } diff --git a/pkg/rthooks/args.go b/pkg/rthooks/args.go index d87f86334e7..2f5a85d7db8 100644 --- a/pkg/rthooks/args.go +++ b/pkg/rthooks/args.go @@ -22,7 +22,7 @@ const ( type CreateContainerArg struct { Req *v1.CreateContainer - Watcher watcher.K8sResourceWatcher + Watcher watcher.PodAccessor // cached values cgroupID *uint64 diff --git a/pkg/rthooks/runner.go b/pkg/rthooks/runner.go index b674ffb7684..f63a4449f8f 100644 --- a/pkg/rthooks/runner.go +++ b/pkg/rthooks/runner.go @@ -14,7 +14,7 @@ import ( type Runner struct { callbacks []Callbacks - watcher watcher.K8sResourceWatcher + watcher watcher.PodAccessor } // RunHooks executes all registered callbacks @@ -42,7 +42,7 @@ func (r *Runner) registerCallbacks(cbs Callbacks) { } // WithWatcher sets the watcher on a runner -func (r *Runner) WithWatcher(watcher watcher.K8sResourceWatcher) *Runner { +func (r *Runner) WithWatcher(watcher watcher.PodAccessor) *Runner { r.watcher = watcher return r } diff --git a/pkg/watcher/fake.go b/pkg/watcher/fake.go index 745f9545242..b97eeb7cf9e 100644 --- a/pkg/watcher/fake.go +++ b/pkg/watcher/fake.go @@ -11,7 +11,7 @@ import ( "k8s.io/client-go/tools/cache" ) -// FakeK8sWatcher is used as an "empty" K8sResourceWatcher when --enable-k8s-api flag is not set. +// FakeK8sWatcher is used as an "empty" PodAccessor when --enable-k8s-api flag is not set. // It is also used for testing, allowing users to specify a static list of pods. 
type FakeK8sWatcher struct { pods []interface{} @@ -28,7 +28,7 @@ func NewFakeK8sWatcherWithPodsAndServices(pods []interface{}, services []interfa return &FakeK8sWatcher{pods, services} } -// FindContainer implements K8sResourceWatcher.FindContainer +// FindContainer implements PodAccessor.FindContainer func (watcher *FakeK8sWatcher) FindContainer(containerID string) (*corev1.Pod, *corev1.ContainerStatus, bool) { return findContainer(containerID, watcher.pods) } diff --git a/pkg/watcher/pod.go b/pkg/watcher/pod.go index 5c1b7d122e5..7e05cebd423 100644 --- a/pkg/watcher/pod.go +++ b/pkg/watcher/pod.go @@ -22,6 +22,21 @@ var ( errNoPod = errors.New("object is not a *corev1.Pod") ) +type K8sResourceWatcher interface { + Watcher + PodAccessor +} + +// PodAccessor defines an interface for accessing pods from Kubernetes API. +type PodAccessor interface { + // Find a pod/container pair for the given container ID. + FindContainer(containerID string) (*corev1.Pod, *corev1.ContainerStatus, bool) + // Find a pod given the podID + FindPod(podID string) (*corev1.Pod, error) + // Find a mirror pod for a static pod + FindMirrorPod(hash string) (*corev1.Pod, error) +} + func containerIDKey(contID string) (string, error) { parts := strings.Split(contID, "//") if len(parts) != 2 { @@ -86,7 +101,7 @@ func podIndexFunc(obj interface{}) ([]string, error) { return nil, fmt.Errorf("podIndexFunc: %w - found %T", errNoPod, obj) } -// FindContainer implements K8sResourceWatcher.FindContainer. +// FindContainer implements PodAccessor.FindContainer. 
func (watcher *K8sWatcher) FindContainer(containerID string) (*corev1.Pod, *corev1.ContainerStatus, bool) { podInformer := watcher.GetInformer(podInformerName) if podInformer == nil { diff --git a/pkg/watcher/watcher.go b/pkg/watcher/watcher.go index 763fe4decae..10ba036f22d 100644 --- a/pkg/watcher/watcher.go +++ b/pkg/watcher/watcher.go @@ -11,7 +11,6 @@ import ( "github.com/cilium/tetragon/pkg/logger" "github.com/cilium/tetragon/pkg/podhooks" "github.com/cilium/tetragon/pkg/reader/node" - corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" @@ -19,22 +18,12 @@ import ( "k8s.io/client-go/tools/cache" ) -// K8sResourceWatcher defines an interface for accessing various resources from Kubernetes API. -type K8sResourceWatcher interface { +type Watcher interface { AddInformers(factory InternalSharedInformerFactory, infs ...*InternalInformer) GetInformer(name string) cache.SharedIndexInformer Start() - - // Find a pod/container pair for the given container ID. - FindContainer(containerID string) (*corev1.Pod, *corev1.ContainerStatus, bool) - - // Find a pod given the podID - FindPod(podID string) (*corev1.Pod, error) - // Find a mirror pod for a static pod - FindMirrorPod(hash string) (*corev1.Pod, error) } -// K8sWatcher maintains a local cache of k8s resources. type K8sWatcher struct { informers map[string]cache.SharedIndexInformer startFunc func() From a9ab056b674600bfd439e13e8926b9579a71a8d4 Mon Sep 17 00:00:00 2001 From: Anna Kapuscinska Date: Mon, 17 Feb 2025 14:45:10 +0000 Subject: [PATCH 3/6] watcher: Refactor K8sWatcher to allow reusing SharedInformerFactory The previous K8sWatcher implementation allowed extending it with extra informers via AddInformers method, but didn't provide a way to reuse shared informer factories between AddInformers calls. That's wasteful, as the whole point of a factory is to be reused. 
This commit adds shared informer factories to the K8sWatcher struct: one for global k8s built-in resources, one for node-local k8s built-in resources, and one for CRDs. They are initialized in NewK8sWatcher function. The AddInformers method is replaced by AddInformer, which simply adds a single informer with indexers to the list. The Start method starts all informer factories - thanks to storing them, there is no more need to dynamically extend a start function. Initialization of the pod informer is moved out of NewK8sWatcher to a dedicated function called from main, AddPodInformer. The idea is that K8sWatcher provides a config and interface for adding informers, and different packages can use it to watch specific resources. A following commit will use this functionality for watching TracingPolicy CRD. Signed-off-by: Anna Kapuscinska --- cmd/tetragon/main.go | 18 ++- .../observer_test_helper.go | 3 +- pkg/process/podinfo_test.go | 7 +- pkg/watcher/fake.go | 5 +- pkg/watcher/pod.go | 36 +++++ pkg/watcher/pod_test.go | 10 +- pkg/watcher/watcher.go | 147 +++++++----------- 7 files changed, 123 insertions(+), 103 deletions(-) diff --git a/cmd/tetragon/main.go b/cmd/tetragon/main.go index 6a00193f648..98aaf63e063 100644 --- a/cmd/tetragon/main.go +++ b/cmd/tetragon/main.go @@ -63,6 +63,7 @@ import ( "github.com/cilium/lumberjack/v2" "github.com/cilium/tetragon/pkg/k8s/apis/cilium.io/v1alpha1" + "github.com/cilium/tetragon/pkg/k8s/client/clientset/versioned" gops "github.com/google/gops/agent" "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -403,20 +404,30 @@ func tetragonExecuteCtx(ctx context.Context, cancel context.CancelFunc, ready fu // Probe runtime configuration and do not fail on errors obs.UpdateRuntimeConf(option.Config.BpfDir) + // Initialize K8s watcher var k8sWatcher watcher.K8sResourceWatcher if option.Config.EnableK8s { log.Info("Enabling Kubernetes API") + // retrieve k8s clients config, err := k8sconf.K8sConfig() if err != nil { return err } -
if err := waitCRDs(config); err != nil { return err } - k8sClient := kubernetes.NewForConfigOrDie(config) - k8sWatcher, err = watcher.NewK8sWatcher(k8sClient, 60*time.Second) + crdClient := versioned.NewForConfigOrDie(config) + + // create k8s watcher + k8sWatcher = watcher.NewK8sWatcher(k8sClient, crdClient, 60*time.Second) + + // add informers for all resources + // NB(anna): To add a pod informer, we need to pass the underlying + // struct, not just the interface, because the function initializes + // a cache inside this struct. This can be refactored if needed. + realK8sWatcher := k8sWatcher.(*watcher.K8sWatcher) + err = watcher.AddPodInformer(realK8sWatcher, true) if err != nil { return err } @@ -424,6 +435,7 @@ func tetragonExecuteCtx(ctx context.Context, cancel context.CancelFunc, ready fu log.Info("Disabling Kubernetes API") k8sWatcher = watcher.NewFakeK8sWatcher(nil) } + // start k8s watcher k8sWatcher.Start() pcGCInterval := option.Config.ProcessCacheGCInterval diff --git a/pkg/observer/observertesthelper/observer_test_helper.go b/pkg/observer/observertesthelper/observer_test_helper.go index 63fd315b181..b10e2d0af61 100644 --- a/pkg/observer/observertesthelper/observer_test_helper.go +++ b/pkg/observer/observertesthelper/observer_test_helper.go @@ -576,7 +576,8 @@ func (f *fakeK8sWatcher) FindContainer(containerID string) (*corev1.Pod, *corev1 return &pod, &container, true } -func (f *fakeK8sWatcher) AddInformers(_ watcher.InternalSharedInformerFactory, _ ...*watcher.InternalInformer) { +func (f *fakeK8sWatcher) AddInformer(_ string, _ cache.SharedIndexInformer, _ cache.Indexers) error { + return nil } func (f *fakeK8sWatcher) GetInformer(_ string) cache.SharedIndexInformer { diff --git a/pkg/process/podinfo_test.go b/pkg/process/podinfo_test.go index 6a87a248166..66f06f17621 100644 --- a/pkg/process/podinfo_test.go +++ b/pkg/process/podinfo_test.go @@ -50,11 +50,12 @@ func TestK8sWatcher_GetPodInfo(t *testing.T) { } k8sClient := 
fake.NewSimpleClientset(&pod) - watcher, err := watcher.NewK8sWatcher(k8sClient, time.Hour) + k8sWatcher := watcher.NewK8sWatcher(k8sClient, nil, time.Hour) + err := watcher.AddPodInformer(k8sWatcher, true) require.NoError(t, err) - watcher.Start() + k8sWatcher.Start() pid := uint32(1) - podInfo := getPodInfo(watcher, "abcd1234", "curl", "cilium.io", 1) + podInfo := getPodInfo(k8sWatcher, "abcd1234", "curl", "cilium.io", 1) assert.True(t, proto.Equal(podInfo, &tetragon.Pod{ Namespace: pod.Namespace, Workload: pod.OwnerReferences[0].Name, diff --git a/pkg/watcher/fake.go b/pkg/watcher/fake.go index b97eeb7cf9e..4f10f7c55e5 100644 --- a/pkg/watcher/fake.go +++ b/pkg/watcher/fake.go @@ -18,7 +18,7 @@ type FakeK8sWatcher struct { services []interface{} } -// NewK8sWatcher returns a pointer to an initialized FakeK8sWatcher struct. +// NewFakeK8sWatcher returns a pointer to an initialized FakeK8sWatcher struct. func NewFakeK8sWatcher(pods []interface{}) *FakeK8sWatcher { return NewFakeK8sWatcherWithPodsAndServices(pods, nil) } @@ -64,7 +64,8 @@ func (watcher *FakeK8sWatcher) ClearAllServices() { watcher.services = nil } -func (watcher *FakeK8sWatcher) AddInformers(_ InternalSharedInformerFactory, _ ...*InternalInformer) { +func (watcher *FakeK8sWatcher) AddInformer(_ string, _ cache.SharedIndexInformer, _ cache.Indexers) error { + return nil } func (watcher *FakeK8sWatcher) GetInformer(_ string) cache.SharedIndexInformer { diff --git a/pkg/watcher/pod.go b/pkg/watcher/pod.go index 7e05cebd423..d73e97f200b 100644 --- a/pkg/watcher/pod.go +++ b/pkg/watcher/pod.go @@ -9,6 +9,9 @@ import ( "strings" corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/tools/cache" + + "github.com/cilium/tetragon/pkg/podhooks" ) const ( @@ -37,6 +40,39 @@ type PodAccessor interface { FindMirrorPod(hash string) (*corev1.Pod, error) } +func AddPodInformer(w *K8sWatcher, local bool) error { + if w == nil { + return fmt.Errorf("k8s watcher not initialized") + } + factory := w.K8sInformerFactory + if 
local { + factory = w.LocalK8sInformerFactory + } + if factory == nil { + return fmt.Errorf("k8s informer factory not initialized") + } + + // initialize deleted pod cache + var err error + w.deletedPodCache, err = newDeletedPodCache() + if err != nil { + return fmt.Errorf("failed to initialize deleted pod cache: %w", err) + } + + // add informer to the watcher + informer := factory.Core().V1().Pods().Informer() + w.AddInformer(podInformerName, informer, map[string]cache.IndexFunc{ + containerIdx: containerIndexFunc, + podIdx: podIndexFunc, + }) + + // add event handlers to the informer + informer.AddEventHandler(w.deletedPodCache.eventHandler()) + podhooks.InstallHooks(informer) + + return nil +} + func containerIDKey(contID string) (string, error) { parts := strings.Split(contID, "//") if len(parts) != 2 { diff --git a/pkg/watcher/pod_test.go b/pkg/watcher/pod_test.go index cd020fda7c0..57f0eb5cfd3 100644 --- a/pkg/watcher/pod_test.go +++ b/pkg/watcher/pod_test.go @@ -19,7 +19,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" k8stypes "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" k8sfake "k8s.io/client-go/kubernetes/fake" clienttesting "k8s.io/client-go/testing" @@ -70,12 +69,11 @@ func TestFastK8s(t *testing.T) { }) // We will create an informer that writes added pods to a channel. 
- informerFactory := informers.NewSharedInformerFactory(client, 0) - podInformer := informerFactory.Core().V1().Pods().Informer() - podInformer.AddEventHandler(ts.eventHandler()) - - watcher, err := newK8sWatcher(informerFactory) + watcher := NewK8sWatcher(client, nil, 0) + err := AddPodInformer(watcher, false) require.Nil(t, err) + podInformer := watcher.GetInformer(podInformerName) + podInformer.AddEventHandler(ts.eventHandler()) watcher.Start() // This is not required in tests, but it serves as a proof-of-concept by diff --git a/pkg/watcher/watcher.go b/pkg/watcher/watcher.go index 10ba036f22d..2ce0363a0c4 100644 --- a/pkg/watcher/watcher.go +++ b/pkg/watcher/watcher.go @@ -5,124 +5,95 @@ package watcher import ( "fmt" - "reflect" "time" - "github.com/cilium/tetragon/pkg/logger" - "github.com/cilium/tetragon/pkg/podhooks" - "github.com/cilium/tetragon/pkg/reader/node" + "github.com/sirupsen/logrus" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" + + "github.com/cilium/tetragon/pkg/k8s/client/clientset/versioned" + "github.com/cilium/tetragon/pkg/k8s/client/informers/externalversions" + "github.com/cilium/tetragon/pkg/logger" + "github.com/cilium/tetragon/pkg/reader/node" ) type Watcher interface { - AddInformers(factory InternalSharedInformerFactory, infs ...*InternalInformer) + AddInformer(name string, informer cache.SharedIndexInformer, indexers cache.Indexers) error GetInformer(name string) cache.SharedIndexInformer Start() } type K8sWatcher struct { - informers map[string]cache.SharedIndexInformer - startFunc func() - deletedPodCache *deletedPodCache + K8sInformerFactory informers.SharedInformerFactory // for k8s built-in resources + LocalK8sInformerFactory informers.SharedInformerFactory // for k8s built-in resources local to the node + CRDInformerFactory externalversions.SharedInformerFactory // for Tetragon CRDs + informers 
map[string]cache.SharedIndexInformer + deletedPodCache *deletedPodCache } -type InternalSharedInformerFactory interface { - Start(stopCh <-chan struct{}) - WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool -} +// NewK8sWatcher creates a new K8sWatcher with initialized informer factories. +func NewK8sWatcher( + k8sClient kubernetes.Interface, crdClient versioned.Interface, stateSyncIntervalSec time.Duration, +) *K8sWatcher { + var k8sInformerFactory, localK8sInformerFactory informers.SharedInformerFactory + var crdInformerFactory externalversions.SharedInformerFactory + + if k8sClient != nil { + k8sInformerFactory = informers.NewSharedInformerFactory(k8sClient, stateSyncIntervalSec) + localK8sInformerFactory = informers.NewSharedInformerFactoryWithOptions( + k8sClient, stateSyncIntervalSec, informers.WithTweakListOptions( + func(options *metav1.ListOptions) { + // watch local pods only + options.FieldSelector = "spec.nodeName=" + node.GetNodeNameForExport() + })) + } + if crdClient != nil { + crdInformerFactory = externalversions.NewSharedInformerFactory(crdClient, stateSyncIntervalSec) + } -type InternalInformer struct { - Name string - Informer cache.SharedIndexInformer - Indexers cache.Indexers + return &K8sWatcher{ + K8sInformerFactory: k8sInformerFactory, + LocalK8sInformerFactory: localK8sInformerFactory, + CRDInformerFactory: crdInformerFactory, + informers: make(map[string]cache.SharedIndexInformer), + } } -func newK8sWatcher( - informerFactory informers.SharedInformerFactory, -) (*K8sWatcher, error) { +func (w *K8sWatcher) AddInformer(name string, informer cache.SharedIndexInformer, indexers cache.Indexers) error { + w.informers[name] = informer - deletedPodCache, err := newDeletedPodCache() + err := informer.AddIndexers(indexers) if err != nil { - return nil, fmt.Errorf("failed to initialize deleted pod cache: %w", err) + return fmt.Errorf("failed to add indexers: %w", err) } - k8sWatcher := &K8sWatcher{ - informers: 
make(map[string]cache.SharedIndexInformer), - startFunc: func() {}, - deletedPodCache: deletedPodCache, - } - - podInformer := informerFactory.Core().V1().Pods().Informer() - k8sWatcher.AddInformers(informerFactory, &InternalInformer{ - Name: podInformerName, - Informer: podInformer, - Indexers: map[string]cache.IndexFunc{ - containerIdx: containerIndexFunc, - podIdx: podIndexFunc, - }, - }) - podInformer.AddEventHandler(k8sWatcher.deletedPodCache.eventHandler()) - podhooks.InstallHooks(podInformer) - - return k8sWatcher, nil + return nil } -// NewK8sWatcher returns a pointer to an initialized K8sWatcher struct. -func NewK8sWatcher(k8sClient kubernetes.Interface, stateSyncIntervalSec time.Duration) (*K8sWatcher, error) { - nodeName := node.GetNodeNameForExport() - if nodeName == "" { - logger.GetLogger().Warn("env var NODE_NAME not specified, K8s watcher will not work as expected") - } - - informerFactory := informers.NewSharedInformerFactoryWithOptions(k8sClient, stateSyncIntervalSec, - informers.WithTweakListOptions(func(options *metav1.ListOptions) { - // Watch local pods only. - options.FieldSelector = "spec.nodeName=" + nodeName - })) - - return newK8sWatcher(informerFactory) +func (w *K8sWatcher) GetInformer(name string) cache.SharedIndexInformer { + return w.informers[name] } -func (watcher *K8sWatcher) AddInformers(factory InternalSharedInformerFactory, infs ...*InternalInformer) { - if watcher.startFunc == nil { - watcher.startFunc = func() {} +func (w *K8sWatcher) Start() { + if w.K8sInformerFactory != nil { + w.K8sInformerFactory.Start(wait.NeverStop) + w.K8sInformerFactory.WaitForCacheSync(wait.NeverStop) } - // Add informers - for _, inf := range infs { - watcher.informers[inf.Name] = inf.Informer - oldStart := watcher.startFunc - watcher.startFunc = func() { - oldStart() - err := inf.Informer.AddIndexers(inf.Indexers) - if err != nil { - // Panic during setup since this should never fail, if it fails is a - // developer mistake. 
- panic(err) - } - } + if w.LocalK8sInformerFactory != nil { + w.LocalK8sInformerFactory.Start(wait.NeverStop) + w.LocalK8sInformerFactory.WaitForCacheSync(wait.NeverStop) } - // Start the informer factory - oldStart := watcher.startFunc - watcher.startFunc = func() { - oldStart() - factory.Start(wait.NeverStop) - factory.WaitForCacheSync(wait.NeverStop) - for name, informer := range watcher.informers { - logger.GetLogger().WithField("informer", name).WithField("count", len(informer.GetStore().ListKeys())).Info("Initialized informer cache") - } + if w.CRDInformerFactory != nil { + w.CRDInformerFactory.Start(wait.NeverStop) + w.CRDInformerFactory.WaitForCacheSync(wait.NeverStop) } -} - -func (watcher *K8sWatcher) GetInformer(name string) cache.SharedIndexInformer { - return watcher.informers[name] -} - -func (watcher *K8sWatcher) Start() { - if watcher.startFunc != nil { - watcher.startFunc() + for name, informer := range w.informers { + logger.GetLogger().WithFields(logrus.Fields{ + "informer": name, + "count": len(informer.GetStore().ListKeys()), + }).Info("Initialized informer cache") } } From c1afe17087f7a9343d171ba725ab50381a5a1551 Mon Sep 17 00:00:00 2001 From: Anna Kapuscinska Date: Mon, 17 Feb 2025 15:06:27 +0000 Subject: [PATCH 4/6] watcher: Rename crd package to crdwatcher Let's use a more descriptive name. Also renamed watcher.go to tracingpolicy.go. There are no functional changes in this commit. 
Signed-off-by: Anna Kapuscinska --- cmd/tetragon/main.go | 4 ++-- .../observertesthelper/observer_test_helper.go | 4 ++-- .../{crd/watcher.go => crdwatcher/tracingpolicy.go} | 11 ++++++----- 3 files changed, 10 insertions(+), 9 deletions(-) rename pkg/watcher/{crd/watcher.go => crdwatcher/tracingpolicy.go} (99%) diff --git a/cmd/tetragon/main.go b/cmd/tetragon/main.go index 98aaf63e063..eb138e06f09 100644 --- a/cmd/tetragon/main.go +++ b/cmd/tetragon/main.go @@ -56,7 +56,7 @@ import ( "github.com/cilium/tetragon/pkg/version" "github.com/cilium/tetragon/pkg/watcher" k8sconf "github.com/cilium/tetragon/pkg/watcher/conf" - "github.com/cilium/tetragon/pkg/watcher/crd" + "github.com/cilium/tetragon/pkg/watcher/crdwatcher" // Imported to allow sensors to be initialized inside init(). _ "github.com/cilium/tetragon/pkg/sensors" @@ -505,7 +505,7 @@ func tetragonExecuteCtx(ctx context.Context, cancel context.CancelFunc, ready fu obs.AddListener(pm) saveInitInfo() if option.Config.EnableK8s && option.Config.EnableTracingPolicyCRD { - go crd.WatchTracePolicy(ctx, observer.GetSensorManager()) + go crdwatcher.WatchTracePolicy(ctx, observer.GetSensorManager()) } obs.LogPinnedBpf(observerDir) diff --git a/pkg/observer/observertesthelper/observer_test_helper.go b/pkg/observer/observertesthelper/observer_test_helper.go index b10e2d0af61..bad41306201 100644 --- a/pkg/observer/observertesthelper/observer_test_helper.go +++ b/pkg/observer/observertesthelper/observer_test_helper.go @@ -44,7 +44,7 @@ import ( "github.com/cilium/tetragon/pkg/sensors/exec/procevents" "github.com/cilium/tetragon/pkg/testutils" "github.com/cilium/tetragon/pkg/watcher" - "github.com/cilium/tetragon/pkg/watcher/crd" + "github.com/cilium/tetragon/pkg/watcher/crdwatcher" corev1 "k8s.io/api/core/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -389,7 +389,7 @@ func loadExporter(tb testing.TB, ctx context.Context, obs *observer.Observer, op }) if oo.crd { - crd.WatchTracePolicy(ctx, sensorManager) + 
crdwatcher.WatchTracePolicy(ctx, sensorManager) } if err := btf.InitCachedBTF(option.Config.HubbleLib, ""); err != nil { diff --git a/pkg/watcher/crd/watcher.go b/pkg/watcher/crdwatcher/tracingpolicy.go similarity index 99% rename from pkg/watcher/crd/watcher.go rename to pkg/watcher/crdwatcher/tracingpolicy.go index 0c033d1f7bd..42a50316a5b 100644 --- a/pkg/watcher/crd/watcher.go +++ b/pkg/watcher/crdwatcher/tracingpolicy.go @@ -1,7 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 // Copyright Authors of Tetragon -package crd +package crdwatcher import ( "context" @@ -9,6 +9,11 @@ import ( "strings" "sync" + "github.com/sirupsen/logrus" + "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/cache" + "github.com/cilium/tetragon/pkg/k8s/apis/cilium.io/v1alpha1" "github.com/cilium/tetragon/pkg/k8s/client/clientset/versioned" "github.com/cilium/tetragon/pkg/k8s/client/informers/externalversions" @@ -16,10 +21,6 @@ import ( "github.com/cilium/tetragon/pkg/sensors" "github.com/cilium/tetragon/pkg/tracingpolicy" k8sconf "github.com/cilium/tetragon/pkg/watcher/conf" - "github.com/sirupsen/logrus" - "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/tools/cache" ) // Log "missing tracing policy" message once. From 8a3df6e78522a83ea59f864aab838da1345ea7b2 Mon Sep 17 00:00:00 2001 From: Anna Kapuscinska Date: Mon, 17 Feb 2025 16:38:01 +0000 Subject: [PATCH 5/6] watcher: Implement getters for informer factories Unexport informer factories in the K8sWatcher struct and implement exported Get* methods instead. This will make it easier to reuse them and test. 
Signed-off-by: Anna Kapuscinska --- .../observer_test_helper.go | 14 ++++++ pkg/watcher/fake.go | 15 +++++++ pkg/watcher/pod.go | 4 +- pkg/watcher/watcher.go | 45 ++++++++++++------- 4 files changed, 61 insertions(+), 17 deletions(-) diff --git a/pkg/observer/observertesthelper/observer_test_helper.go b/pkg/observer/observertesthelper/observer_test_helper.go index bad41306201..a398fe217a4 100644 --- a/pkg/observer/observertesthelper/observer_test_helper.go +++ b/pkg/observer/observertesthelper/observer_test_helper.go @@ -22,6 +22,7 @@ import ( "github.com/cilium/tetragon/pkg/cgrouprate" "github.com/cilium/tetragon/pkg/defaults" "github.com/cilium/tetragon/pkg/encoder" + "github.com/cilium/tetragon/pkg/k8s/client/informers/externalversions" "github.com/cilium/tetragon/pkg/metricsconfig" "github.com/cilium/tetragon/pkg/observer" "github.com/cilium/tetragon/pkg/policyfilter" @@ -49,6 +50,7 @@ import ( corev1 "k8s.io/api/core/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" k8stypes "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/informers" "k8s.io/client-go/tools/cache" ) @@ -586,6 +588,18 @@ func (f *fakeK8sWatcher) GetInformer(_ string) cache.SharedIndexInformer { func (f *fakeK8sWatcher) Start() {} +func (f *fakeK8sWatcher) GetK8sInformerFactory() informers.SharedInformerFactory { + return nil +} + +func (f *fakeK8sWatcher) GetLocalK8sInformerFactory() informers.SharedInformerFactory { + return nil +} + +func (f *fakeK8sWatcher) GetCRDInformerFactory() externalversions.SharedInformerFactory { + return nil +} + // Used to wait for a process to start, we do a lookup on PROCFS because this // may be called before obs is created. 
func WaitForProcess(process string) error { diff --git a/pkg/watcher/fake.go b/pkg/watcher/fake.go index 4f10f7c55e5..b5d6e40eb36 100644 --- a/pkg/watcher/fake.go +++ b/pkg/watcher/fake.go @@ -8,7 +8,10 @@ import ( "fmt" corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/informers" "k8s.io/client-go/tools/cache" + + "github.com/cilium/tetragon/pkg/k8s/client/informers/externalversions" ) // FakeK8sWatcher is used as an "empty" PodAccessor when --enable-k8s-api flag is not set. @@ -73,3 +76,15 @@ func (watcher *FakeK8sWatcher) GetInformer(_ string) cache.SharedIndexInformer { } func (watcher *FakeK8sWatcher) Start() {} + +func (watcher *FakeK8sWatcher) GetK8sInformerFactory() informers.SharedInformerFactory { + return nil +} + +func (watcher *FakeK8sWatcher) GetLocalK8sInformerFactory() informers.SharedInformerFactory { + return nil +} + +func (watcher *FakeK8sWatcher) GetCRDInformerFactory() externalversions.SharedInformerFactory { + return nil +} diff --git a/pkg/watcher/pod.go b/pkg/watcher/pod.go index d73e97f200b..09c7b14d21b 100644 --- a/pkg/watcher/pod.go +++ b/pkg/watcher/pod.go @@ -44,9 +44,9 @@ func AddPodInformer(w *K8sWatcher, local bool) error { if w == nil { return fmt.Errorf("k8s watcher not initialized") } - factory := w.K8sInformerFactory + factory := w.GetK8sInformerFactory() if local { - factory = w.LocalK8sInformerFactory + factory = w.GetLocalK8sInformerFactory() } if factory == nil { return fmt.Errorf("k8s informer factory not initialized") diff --git a/pkg/watcher/watcher.go b/pkg/watcher/watcher.go index 2ce0363a0c4..c0ba8302258 100644 --- a/pkg/watcher/watcher.go +++ b/pkg/watcher/watcher.go @@ -24,12 +24,15 @@ type Watcher interface { AddInformer(name string, informer cache.SharedIndexInformer, indexers cache.Indexers) error GetInformer(name string) cache.SharedIndexInformer Start() + GetK8sInformerFactory() informers.SharedInformerFactory + GetLocalK8sInformerFactory() informers.SharedInformerFactory + GetCRDInformerFactory() 
externalversions.SharedInformerFactory } type K8sWatcher struct { - K8sInformerFactory informers.SharedInformerFactory // for k8s built-in resources - LocalK8sInformerFactory informers.SharedInformerFactory // for k8s built-in resources local to the node - CRDInformerFactory externalversions.SharedInformerFactory // for Tetragon CRDs + k8sInformerFactory informers.SharedInformerFactory // for k8s built-in resources + localK8sInformerFactory informers.SharedInformerFactory // for k8s built-in resources local to the node + crdInformerFactory externalversions.SharedInformerFactory // for Tetragon CRDs informers map[string]cache.SharedIndexInformer deletedPodCache *deletedPodCache } @@ -55,9 +58,9 @@ func NewK8sWatcher( } return &K8sWatcher{ - K8sInformerFactory: k8sInformerFactory, - LocalK8sInformerFactory: localK8sInformerFactory, - CRDInformerFactory: crdInformerFactory, + k8sInformerFactory: k8sInformerFactory, + localK8sInformerFactory: localK8sInformerFactory, + crdInformerFactory: crdInformerFactory, informers: make(map[string]cache.SharedIndexInformer), } } @@ -78,17 +81,17 @@ func (w *K8sWatcher) GetInformer(name string) cache.SharedIndexInformer { } func (w *K8sWatcher) Start() { - if w.K8sInformerFactory != nil { - w.K8sInformerFactory.Start(wait.NeverStop) - w.K8sInformerFactory.WaitForCacheSync(wait.NeverStop) + if w.k8sInformerFactory != nil { + w.k8sInformerFactory.Start(wait.NeverStop) + w.k8sInformerFactory.WaitForCacheSync(wait.NeverStop) } - if w.LocalK8sInformerFactory != nil { - w.LocalK8sInformerFactory.Start(wait.NeverStop) - w.LocalK8sInformerFactory.WaitForCacheSync(wait.NeverStop) + if w.localK8sInformerFactory != nil { + w.localK8sInformerFactory.Start(wait.NeverStop) + w.localK8sInformerFactory.WaitForCacheSync(wait.NeverStop) } - if w.CRDInformerFactory != nil { - w.CRDInformerFactory.Start(wait.NeverStop) - w.CRDInformerFactory.WaitForCacheSync(wait.NeverStop) + if w.crdInformerFactory != nil { + w.crdInformerFactory.Start(wait.NeverStop) 
+ w.crdInformerFactory.WaitForCacheSync(wait.NeverStop) } for name, informer := range w.informers { logger.GetLogger().WithFields(logrus.Fields{ @@ -97,3 +100,15 @@ func (w *K8sWatcher) Start() { }).Info("Initialized informer cache") } } + +func (w *K8sWatcher) GetK8sInformerFactory() informers.SharedInformerFactory { + return w.k8sInformerFactory +} + +func (w *K8sWatcher) GetLocalK8sInformerFactory() informers.SharedInformerFactory { + return w.localK8sInformerFactory +} + +func (w *K8sWatcher) GetCRDInformerFactory() externalversions.SharedInformerFactory { + return w.crdInformerFactory +} From a051c49acb4b19ea4202abbab664a4b1387238ac Mon Sep 17 00:00:00 2001 From: Anna Kapuscinska Date: Mon, 17 Feb 2025 21:32:13 +0000 Subject: [PATCH 6/6] crdwatcher: Reuse watcher.K8sWatcher to watch TracingPolicy Before, to watch TracingPolicy(Namespaced) custom resources, we were reading Kubernetes config, creating an informer factory and starting it, all in the WatchTracePolicy function. As now the watcher.K8sWatcher struct initializes informer factories, we can reuse it instead of configuring everything from scratch. This commit replaces the WatchTracePolicy function with AddTracingPolicyInformer, which only adds informers to the K8sWatcher. Initialization and start of the K8sWatcher happen in main. 
Signed-off-by: Anna Kapuscinska --- cmd/tetragon/main.go | 9 +++-- .../observer_test_helper.go | 35 +++++++++--------- pkg/watcher/crdwatcher/tracingpolicy.go | 36 +++++++++++-------- 3 files changed, 44 insertions(+), 36 deletions(-) diff --git a/cmd/tetragon/main.go b/cmd/tetragon/main.go index eb138e06f09..47afb25281c 100644 --- a/cmd/tetragon/main.go +++ b/cmd/tetragon/main.go @@ -431,6 +431,12 @@ func tetragonExecuteCtx(ctx context.Context, cancel context.CancelFunc, ready fu if err != nil { return err } + if option.Config.EnableTracingPolicyCRD { + err := crdwatcher.AddTracingPolicyInformer(ctx, k8sWatcher, observer.GetSensorManager()) + if err != nil { + return err + } + } } else { log.Info("Disabling Kubernetes API") k8sWatcher = watcher.NewFakeK8sWatcher(nil) @@ -504,9 +510,6 @@ func tetragonExecuteCtx(ctx context.Context, cancel context.CancelFunc, ready fu log.WithField("enabled", option.Config.ExportFilename != "").WithField("fileName", option.Config.ExportFilename).Info("Exporter configuration") obs.AddListener(pm) saveInitInfo() - if option.Config.EnableK8s && option.Config.EnableTracingPolicyCRD { - go crdwatcher.WatchTracePolicy(ctx, observer.GetSensorManager()) - } obs.LogPinnedBpf(observerDir) diff --git a/pkg/observer/observertesthelper/observer_test_helper.go b/pkg/observer/observertesthelper/observer_test_helper.go index a398fe217a4..0dcf1c07409 100644 --- a/pkg/observer/observertesthelper/observer_test_helper.go +++ b/pkg/observer/observertesthelper/observer_test_helper.go @@ -19,24 +19,28 @@ import ( "testing" "time" - "github.com/cilium/tetragon/pkg/cgrouprate" - "github.com/cilium/tetragon/pkg/defaults" - "github.com/cilium/tetragon/pkg/encoder" - "github.com/cilium/tetragon/pkg/k8s/client/informers/externalversions" - "github.com/cilium/tetragon/pkg/metricsconfig" - "github.com/cilium/tetragon/pkg/observer" - "github.com/cilium/tetragon/pkg/policyfilter" - "github.com/cilium/tetragon/pkg/tracingpolicy" "github.com/sirupsen/logrus" + corev1 
"k8s.io/api/core/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + k8stypes "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/informers" + "k8s.io/client-go/tools/cache" "github.com/cilium/tetragon/api/v1/tetragon" "github.com/cilium/tetragon/pkg/bpf" "github.com/cilium/tetragon/pkg/btf" "github.com/cilium/tetragon/pkg/bugtool" + "github.com/cilium/tetragon/pkg/cgrouprate" + "github.com/cilium/tetragon/pkg/defaults" + "github.com/cilium/tetragon/pkg/encoder" "github.com/cilium/tetragon/pkg/exporter" tetragonGrpc "github.com/cilium/tetragon/pkg/grpc" + "github.com/cilium/tetragon/pkg/k8s/client/informers/externalversions" "github.com/cilium/tetragon/pkg/logger" + "github.com/cilium/tetragon/pkg/metricsconfig" + "github.com/cilium/tetragon/pkg/observer" "github.com/cilium/tetragon/pkg/option" + "github.com/cilium/tetragon/pkg/policyfilter" "github.com/cilium/tetragon/pkg/process" "github.com/cilium/tetragon/pkg/reader/namespace" "github.com/cilium/tetragon/pkg/rthooks" @@ -44,14 +48,9 @@ import ( "github.com/cilium/tetragon/pkg/sensors/base" "github.com/cilium/tetragon/pkg/sensors/exec/procevents" "github.com/cilium/tetragon/pkg/testutils" + "github.com/cilium/tetragon/pkg/tracingpolicy" "github.com/cilium/tetragon/pkg/watcher" "github.com/cilium/tetragon/pkg/watcher/crdwatcher" - - corev1 "k8s.io/api/core/v1" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - k8stypes "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/informers" - "k8s.io/client-go/tools/cache" ) var ( @@ -374,7 +373,7 @@ func getDefaultSensors(tb testing.TB, initialSensor *sensors.Sensor, opts ...Tes } func loadExporter(tb testing.TB, ctx context.Context, obs *observer.Observer, opts *testExporterOptions, oo *testObserverOptions) error { - watcher := opts.watcher + k8sWatcher := opts.watcher processCacheSize := 32768 dataCacheSize := 1024 procCacheGCInterval := defaults.DefaultProcessCacheGCInterval @@ -391,7 +390,7 @@ func loadExporter(tb testing.TB, ctx context.Context, obs *observer.Observer, op 
}) if oo.crd { - crdwatcher.WatchTracePolicy(ctx, sensorManager) + crdwatcher.AddTracingPolicyInformer(ctx, k8sWatcher, sensorManager) } if err := btf.InitCachedBTF(option.Config.HubbleLib, ""); err != nil { @@ -402,7 +401,7 @@ func loadExporter(tb testing.TB, ctx context.Context, obs *observer.Observer, op procCacheGCInterval = oo.procCacheGCInterval } - if err := process.InitCache(watcher, processCacheSize, procCacheGCInterval); err != nil { + if err := process.InitCache(k8sWatcher, processCacheSize, procCacheGCInterval); err != nil { return err } @@ -413,7 +412,7 @@ func loadExporter(tb testing.TB, ctx context.Context, obs *observer.Observer, op var cancelWg sync.WaitGroup // use an empty hooks runner - hookRunner := (&rthooks.Runner{}).WithWatcher(watcher) + hookRunner := (&rthooks.Runner{}).WithWatcher(k8sWatcher) // For testing we disable the eventcache and cilium cache by default. If we // enable these then every tests would need to wait for the 1.5 mimutes needed diff --git a/pkg/watcher/crdwatcher/tracingpolicy.go b/pkg/watcher/crdwatcher/tracingpolicy.go index 42a50316a5b..a5ef31c5270 100644 --- a/pkg/watcher/crdwatcher/tracingpolicy.go +++ b/pkg/watcher/crdwatcher/tracingpolicy.go @@ -11,16 +11,13 @@ import ( "github.com/sirupsen/logrus" "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" "github.com/cilium/tetragon/pkg/k8s/apis/cilium.io/v1alpha1" - "github.com/cilium/tetragon/pkg/k8s/client/clientset/versioned" - "github.com/cilium/tetragon/pkg/k8s/client/informers/externalversions" "github.com/cilium/tetragon/pkg/logger" "github.com/cilium/tetragon/pkg/sensors" "github.com/cilium/tetragon/pkg/tracingpolicy" - k8sconf "github.com/cilium/tetragon/pkg/watcher/conf" + "github.com/cilium/tetragon/pkg/watcher" ) // Log "missing tracing policy" message once. 
@@ -182,16 +179,18 @@ func updateTracingPolicy(ctx context.Context, log logrus.FieldLogger, s *sensors } } -func WatchTracePolicy(ctx context.Context, s *sensors.Manager) { +func AddTracingPolicyInformer(ctx context.Context, w watcher.Watcher, s *sensors.Manager) error { log := logger.GetLogger() - conf, err := k8sconf.K8sConfig() - if err != nil { - log.WithError(err).Fatal("couldn't get cluster config") + if w == nil { + return fmt.Errorf("k8s watcher not initialized") + } + factory := w.GetCRDInformerFactory() + if factory == nil { + return fmt.Errorf("CRD informer factory not initialized") } - client := versioned.NewForConfigOrDie(conf) - factory := externalversions.NewSharedInformerFactory(client, 0) - factory.Cilium().V1alpha1().TracingPolicies().Informer().AddEventHandler( + tpInformer := factory.Cilium().V1alpha1().TracingPolicies().Informer() + tpInformer.AddEventHandler( cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { addTracingPolicy(ctx, log, s, obj) @@ -202,8 +201,13 @@ func WatchTracePolicy(ctx context.Context, s *sensors.Manager) { UpdateFunc: func(oldObj interface{}, newObj interface{}) { updateTracingPolicy(ctx, log, s, oldObj, newObj) }}) + err := w.AddInformer("TracingPolicy", tpInformer, nil) + if err != nil { + return fmt.Errorf("failed to add TracingPolicy informer: %w", err) + } - factory.Cilium().V1alpha1().TracingPoliciesNamespaced().Informer().AddEventHandler( + tpnInformer := factory.Cilium().V1alpha1().TracingPoliciesNamespaced().Informer() + tpnInformer.AddEventHandler( cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { addTracingPolicy(ctx, log, s, obj) @@ -214,8 +218,10 @@ func WatchTracePolicy(ctx context.Context, s *sensors.Manager) { UpdateFunc: func(oldObj interface{}, newObj interface{}) { updateTracingPolicy(ctx, log, s, oldObj, newObj) }}) + err = w.AddInformer("TracingPolicyNamespaced", tpnInformer, nil) + if err != nil { + return fmt.Errorf("failed to add TracingPolicyNamespaced informer: %w", 
err) + } - go factory.Start(wait.NeverStop) - factory.WaitForCacheSync(wait.NeverStop) - log.Info("Started watching tracing policies") + return nil }