Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

watcher: Refactor K8sWatcher to reuse config and factories #3413

Merged
merged 6 commits into from
Feb 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 22 additions & 7 deletions cmd/tetragon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,14 @@ import (
"github.com/cilium/tetragon/pkg/version"
"github.com/cilium/tetragon/pkg/watcher"
k8sconf "github.com/cilium/tetragon/pkg/watcher/conf"
"github.com/cilium/tetragon/pkg/watcher/crd"
"github.com/cilium/tetragon/pkg/watcher/crdwatcher"

// Imported to allow sensors to be initialized inside init().
_ "github.com/cilium/tetragon/pkg/sensors"

"github.com/cilium/lumberjack/v2"
"github.com/cilium/tetragon/pkg/k8s/apis/cilium.io/v1alpha1"
"github.com/cilium/tetragon/pkg/k8s/client/clientset/versioned"
gops "github.com/google/gops/agent"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -403,27 +404,44 @@ func tetragonExecuteCtx(ctx context.Context, cancel context.CancelFunc, ready fu
// Probe runtime configuration and do not fail on errors
obs.UpdateRuntimeConf(option.Config.BpfDir)

// Initialize K8s watcher
var k8sWatcher watcher.K8sResourceWatcher
if option.Config.EnableK8s {
log.Info("Enabling Kubernetes API")
// retrieve k8s clients
config, err := k8sconf.K8sConfig()
if err != nil {
return err
}

if err := waitCRDs(config); err != nil {
return err
}

k8sClient := kubernetes.NewForConfigOrDie(config)
k8sWatcher, err = watcher.NewK8sWatcher(k8sClient, 60*time.Second)
crdClient := versioned.NewForConfigOrDie(config)

// create k8s watcher
k8sWatcher = watcher.NewK8sWatcher(k8sClient, crdClient, 60*time.Second)

// add informers for all resources
// NB(anna): To add a pod informer, we need to pass the underlying
// struct, not just the interface, because the function initializes
// a cache inside this struct. This can be refactored if needed.
realK8sWatcher := k8sWatcher.(*watcher.K8sWatcher)
err = watcher.AddPodInformer(realK8sWatcher, true)
if err != nil {
return err
}
if option.Config.EnableTracingPolicyCRD {
err := crdwatcher.AddTracingPolicyInformer(ctx, k8sWatcher, observer.GetSensorManager())
if err != nil {
return err
}
}
} else {
log.Info("Disabling Kubernetes API")
k8sWatcher = watcher.NewFakeK8sWatcher(nil)
}
// start k8s watcher
k8sWatcher.Start()

pcGCInterval := option.Config.ProcessCacheGCInterval
Expand Down Expand Up @@ -492,9 +510,6 @@ func tetragonExecuteCtx(ctx context.Context, cancel context.CancelFunc, ready fu
log.WithField("enabled", option.Config.ExportFilename != "").WithField("fileName", option.Config.ExportFilename).Info("Exporter configuration")
obs.AddListener(pm)
saveInitInfo()
if option.Config.EnableK8s && option.Config.EnableTracingPolicyCRD {
go crd.WatchTracePolicy(ctx, observer.GetSensorManager())
}

obs.LogPinnedBpf(observerDir)

Expand Down
2 changes: 1 addition & 1 deletion pkg/grpc/exec/exec_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ func CreateAncestorEvents[EXEC notify.Message, EXIT notify.Message](
return &execMsg, &exitMsg
}

func InitEnv[EXEC notify.Message, EXIT notify.Message](t *testing.T, watcher watcher.K8sResourceWatcher) DummyNotifier[EXEC, EXIT] {
func InitEnv[EXEC notify.Message, EXIT notify.Message](t *testing.T, watcher watcher.PodAccessor) DummyNotifier[EXEC, EXIT] {
if err := process.InitCache(watcher, 65536, defaults.DefaultProcessCacheGCInterval); err != nil {
t.Fatalf("failed to call process.InitCache %s", err)
}
Expand Down
50 changes: 32 additions & 18 deletions pkg/observer/observertesthelper/observer_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,37 +19,38 @@ import (
"testing"
"time"

"github.com/cilium/tetragon/pkg/cgrouprate"
"github.com/cilium/tetragon/pkg/defaults"
"github.com/cilium/tetragon/pkg/encoder"
"github.com/cilium/tetragon/pkg/metricsconfig"
"github.com/cilium/tetragon/pkg/observer"
"github.com/cilium/tetragon/pkg/policyfilter"
"github.com/cilium/tetragon/pkg/tracingpolicy"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8stypes "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"

"github.com/cilium/tetragon/api/v1/tetragon"
"github.com/cilium/tetragon/pkg/bpf"
"github.com/cilium/tetragon/pkg/btf"
"github.com/cilium/tetragon/pkg/bugtool"
"github.com/cilium/tetragon/pkg/cgrouprate"
"github.com/cilium/tetragon/pkg/defaults"
"github.com/cilium/tetragon/pkg/encoder"
"github.com/cilium/tetragon/pkg/exporter"
tetragonGrpc "github.com/cilium/tetragon/pkg/grpc"
"github.com/cilium/tetragon/pkg/k8s/client/informers/externalversions"
"github.com/cilium/tetragon/pkg/logger"
"github.com/cilium/tetragon/pkg/metricsconfig"
"github.com/cilium/tetragon/pkg/observer"
"github.com/cilium/tetragon/pkg/option"
"github.com/cilium/tetragon/pkg/policyfilter"
"github.com/cilium/tetragon/pkg/process"
"github.com/cilium/tetragon/pkg/reader/namespace"
"github.com/cilium/tetragon/pkg/rthooks"
"github.com/cilium/tetragon/pkg/sensors"
"github.com/cilium/tetragon/pkg/sensors/base"
"github.com/cilium/tetragon/pkg/sensors/exec/procevents"
"github.com/cilium/tetragon/pkg/testutils"
"github.com/cilium/tetragon/pkg/tracingpolicy"
"github.com/cilium/tetragon/pkg/watcher"
"github.com/cilium/tetragon/pkg/watcher/crd"

corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8stypes "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
"github.com/cilium/tetragon/pkg/watcher/crdwatcher"
)

var (
Expand Down Expand Up @@ -372,7 +373,7 @@ func getDefaultSensors(tb testing.TB, initialSensor *sensors.Sensor, opts ...Tes
}

func loadExporter(tb testing.TB, ctx context.Context, obs *observer.Observer, opts *testExporterOptions, oo *testObserverOptions) error {
watcher := opts.watcher
k8sWatcher := opts.watcher
processCacheSize := 32768
dataCacheSize := 1024
procCacheGCInterval := defaults.DefaultProcessCacheGCInterval
Expand All @@ -389,7 +390,7 @@ func loadExporter(tb testing.TB, ctx context.Context, obs *observer.Observer, op
})

if oo.crd {
crd.WatchTracePolicy(ctx, sensorManager)
crdwatcher.AddTracingPolicyInformer(ctx, k8sWatcher, sensorManager)
}

if err := btf.InitCachedBTF(option.Config.HubbleLib, ""); err != nil {
Expand All @@ -400,7 +401,7 @@ func loadExporter(tb testing.TB, ctx context.Context, obs *observer.Observer, op
procCacheGCInterval = oo.procCacheGCInterval
}

if err := process.InitCache(watcher, processCacheSize, procCacheGCInterval); err != nil {
if err := process.InitCache(k8sWatcher, processCacheSize, procCacheGCInterval); err != nil {
return err
}

Expand All @@ -411,7 +412,7 @@ func loadExporter(tb testing.TB, ctx context.Context, obs *observer.Observer, op
var cancelWg sync.WaitGroup

// use an empty hooks runner
hookRunner := (&rthooks.Runner{}).WithWatcher(watcher)
hookRunner := (&rthooks.Runner{}).WithWatcher(k8sWatcher)

// For testing we disable the eventcache and cilium cache by default. If we
// enable these then every tests would need to wait for the 1.5 mimutes needed
Expand Down Expand Up @@ -576,7 +577,8 @@ func (f *fakeK8sWatcher) FindContainer(containerID string) (*corev1.Pod, *corev1
return &pod, &container, true
}

func (f *fakeK8sWatcher) AddInformers(_ watcher.InternalSharedInformerFactory, _ ...*watcher.InternalInformer) {
func (f *fakeK8sWatcher) AddInformer(_ string, _ cache.SharedIndexInformer, _ cache.Indexers) error {
return nil
}

func (f *fakeK8sWatcher) GetInformer(_ string) cache.SharedIndexInformer {
Expand All @@ -585,6 +587,18 @@ func (f *fakeK8sWatcher) GetInformer(_ string) cache.SharedIndexInformer {

func (f *fakeK8sWatcher) Start() {}

func (f *fakeK8sWatcher) GetK8sInformerFactory() informers.SharedInformerFactory {
return nil
}

func (f *fakeK8sWatcher) GetLocalK8sInformerFactory() informers.SharedInformerFactory {
return nil
}

func (f *fakeK8sWatcher) GetCRDInformerFactory() externalversions.SharedInformerFactory {
return nil
}

// Used to wait for a process to start, we do a lookup on PROCFS because this
// may be called before obs is created.
func WaitForProcess(process string) error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/process/podinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func getProbes(pod *corev1.Pod, containerStatus *corev1.ContainerStatus) ([]stri
}

func getPodInfo(
w watcher.K8sResourceWatcher,
w watcher.PodAccessor,
containerID string,
binary string,
args string,
Expand Down
7 changes: 4 additions & 3 deletions pkg/process/podinfo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,12 @@ func TestK8sWatcher_GetPodInfo(t *testing.T) {
}

k8sClient := fake.NewSimpleClientset(&pod)
watcher, err := watcher.NewK8sWatcher(k8sClient, time.Hour)
k8sWatcher := watcher.NewK8sWatcher(k8sClient, nil, time.Hour)
err := watcher.AddPodInformer(k8sWatcher, true)
require.NoError(t, err)
watcher.Start()
k8sWatcher.Start()
pid := uint32(1)
podInfo := getPodInfo(watcher, "abcd1234", "curl", "cilium.io", 1)
podInfo := getPodInfo(k8sWatcher, "abcd1234", "curl", "cilium.io", 1)
assert.True(t, proto.Equal(podInfo, &tetragon.Pod{
Namespace: pod.Namespace,
Workload: pod.OwnerReferences[0].Name,
Expand Down
8 changes: 4 additions & 4 deletions pkg/process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,14 @@ type ProcessInternal struct {

var (
procCache *Cache
k8s watcher.K8sResourceWatcher
k8s watcher.PodAccessor
)

var (
ErrProcessInfoMissing = errors.New("failed process info missing")
)

func InitCache(w watcher.K8sResourceWatcher, size int, GCInterval time.Duration) error {
func InitCache(w watcher.PodAccessor, size int, GCInterval time.Duration) error {
var err error

if procCache != nil {
Expand Down Expand Up @@ -553,9 +553,9 @@ func Get(execId string) (*ProcessInternal, error) {
return procCache.get(execId)
}

// GetK8s returns K8sResourceWatcher. You must call InitCache before calling this function to ensure
// GetK8s returns PodAccessor. You must call InitCache before calling this function to ensure
// that k8s has been initialized.
func GetK8s() watcher.K8sResourceWatcher {
func GetK8s() watcher.PodAccessor {
return k8s
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/rthooks/args.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ const (

type CreateContainerArg struct {
Req *v1.CreateContainer
Watcher watcher.K8sResourceWatcher
Watcher watcher.PodAccessor

// cached values
cgroupID *uint64
Expand Down
4 changes: 2 additions & 2 deletions pkg/rthooks/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (

type Runner struct {
callbacks []Callbacks
watcher watcher.K8sResourceWatcher
watcher watcher.PodAccessor
}

// RunHooks executes all registered callbacks
Expand Down Expand Up @@ -42,7 +42,7 @@ func (r *Runner) registerCallbacks(cbs Callbacks) {
}

// WithWatcher sets the watcher on a runner
func (r *Runner) WithWatcher(watcher watcher.K8sResourceWatcher) *Runner {
func (r *Runner) WithWatcher(watcher watcher.PodAccessor) *Runner {
r.watcher = watcher
return r
}
Original file line number Diff line number Diff line change
@@ -1,25 +1,23 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Tetragon

package crd
package crdwatcher

import (
"context"
"fmt"
"strings"
"sync"

"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"

"github.com/cilium/tetragon/pkg/k8s/apis/cilium.io/v1alpha1"
"github.com/cilium/tetragon/pkg/k8s/client/clientset/versioned"
"github.com/cilium/tetragon/pkg/k8s/client/informers/externalversions"
"github.com/cilium/tetragon/pkg/logger"
"github.com/cilium/tetragon/pkg/sensors"
"github.com/cilium/tetragon/pkg/tracingpolicy"
k8sconf "github.com/cilium/tetragon/pkg/watcher/conf"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"github.com/cilium/tetragon/pkg/watcher"
)

// Log "missing tracing policy" message once.
Expand Down Expand Up @@ -181,16 +179,18 @@ func updateTracingPolicy(ctx context.Context, log logrus.FieldLogger, s *sensors
}
}

func WatchTracePolicy(ctx context.Context, s *sensors.Manager) {
func AddTracingPolicyInformer(ctx context.Context, w watcher.Watcher, s *sensors.Manager) error {
log := logger.GetLogger()
conf, err := k8sconf.K8sConfig()
if err != nil {
log.WithError(err).Fatal("couldn't get cluster config")
if w == nil {
return fmt.Errorf("k8s watcher not initialized")
}
factory := w.GetCRDInformerFactory()
if factory == nil {
return fmt.Errorf("CRD informer factory not initialized")
}
client := versioned.NewForConfigOrDie(conf)
factory := externalversions.NewSharedInformerFactory(client, 0)

factory.Cilium().V1alpha1().TracingPolicies().Informer().AddEventHandler(
tpInformer := factory.Cilium().V1alpha1().TracingPolicies().Informer()
tpInformer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
addTracingPolicy(ctx, log, s, obj)
Expand All @@ -201,8 +201,13 @@ func WatchTracePolicy(ctx context.Context, s *sensors.Manager) {
UpdateFunc: func(oldObj interface{}, newObj interface{}) {
updateTracingPolicy(ctx, log, s, oldObj, newObj)
}})
err := w.AddInformer("TracingPolicy", tpInformer, nil)
if err != nil {
return fmt.Errorf("failed to add TracingPolicy informer: %w", err)
}

factory.Cilium().V1alpha1().TracingPoliciesNamespaced().Informer().AddEventHandler(
tpnInformer := factory.Cilium().V1alpha1().TracingPoliciesNamespaced().Informer()
tpnInformer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
addTracingPolicy(ctx, log, s, obj)
Expand All @@ -213,8 +218,10 @@ func WatchTracePolicy(ctx context.Context, s *sensors.Manager) {
UpdateFunc: func(oldObj interface{}, newObj interface{}) {
updateTracingPolicy(ctx, log, s, oldObj, newObj)
}})
err = w.AddInformer("TracingPolicyNamespaced", tpnInformer, nil)
if err != nil {
return fmt.Errorf("failed to add TracingPolicyNamespaced informer: %w", err)
}

go factory.Start(wait.NeverStop)
factory.WaitForCacheSync(wait.NeverStop)
log.Info("Started watching tracing policies")
return nil
}
Loading
Loading