diff --git a/cmd/tetragon/main.go b/cmd/tetragon/main.go index 65c745492bf..d76594f35c0 100644 --- a/cmd/tetragon/main.go +++ b/cmd/tetragon/main.go @@ -428,6 +428,12 @@ func tetragonExecuteCtx(ctx context.Context, cancel context.CancelFunc, ready fu if err != nil { return err } + if option.Config.EnableTracingPolicyCRD { + err := crdwatcher.AddTracingPolicyInformer(ctx, realK8sWatcher, observer.GetSensorManager()) + if err != nil { + return err + } + } } else { log.Info("Disabling Kubernetes API") k8sWatcher = watcher.NewFakeK8sWatcher(nil) @@ -501,9 +507,6 @@ func tetragonExecuteCtx(ctx context.Context, cancel context.CancelFunc, ready fu log.WithField("enabled", option.Config.ExportFilename != "").WithField("fileName", option.Config.ExportFilename).Info("Exporter configuration") obs.AddListener(pm) saveInitInfo() - if option.Config.EnableK8s && option.Config.EnableTracingPolicyCRD { - go crdwatcher.WatchTracePolicy(ctx, observer.GetSensorManager()) - } obs.LogPinnedBpf(observerDir) diff --git a/pkg/observer/observertesthelper/observer_test_helper.go b/pkg/observer/observertesthelper/observer_test_helper.go index a398fe217a4..0dcf1c07409 100644 --- a/pkg/observer/observertesthelper/observer_test_helper.go +++ b/pkg/observer/observertesthelper/observer_test_helper.go @@ -19,24 +19,28 @@ import ( "testing" "time" - "github.com/cilium/tetragon/pkg/cgrouprate" - "github.com/cilium/tetragon/pkg/defaults" - "github.com/cilium/tetragon/pkg/encoder" - "github.com/cilium/tetragon/pkg/k8s/client/informers/externalversions" - "github.com/cilium/tetragon/pkg/metricsconfig" - "github.com/cilium/tetragon/pkg/observer" - "github.com/cilium/tetragon/pkg/policyfilter" - "github.com/cilium/tetragon/pkg/tracingpolicy" "github.com/sirupsen/logrus" + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + k8stypes "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/informers" + "k8s.io/client-go/tools/cache" "github.com/cilium/tetragon/api/v1/tetragon" "github.com/cilium/tetragon/pkg/bpf" "github.com/cilium/tetragon/pkg/btf" "github.com/cilium/tetragon/pkg/bugtool" + "github.com/cilium/tetragon/pkg/cgrouprate" + "github.com/cilium/tetragon/pkg/defaults" + "github.com/cilium/tetragon/pkg/encoder" "github.com/cilium/tetragon/pkg/exporter" tetragonGrpc "github.com/cilium/tetragon/pkg/grpc" + "github.com/cilium/tetragon/pkg/k8s/client/informers/externalversions" "github.com/cilium/tetragon/pkg/logger" + "github.com/cilium/tetragon/pkg/metricsconfig" + "github.com/cilium/tetragon/pkg/observer" "github.com/cilium/tetragon/pkg/option" + "github.com/cilium/tetragon/pkg/policyfilter" "github.com/cilium/tetragon/pkg/process" "github.com/cilium/tetragon/pkg/reader/namespace" "github.com/cilium/tetragon/pkg/rthooks" @@ -44,14 +48,9 @@ import ( "github.com/cilium/tetragon/pkg/sensors/base" "github.com/cilium/tetragon/pkg/sensors/exec/procevents" "github.com/cilium/tetragon/pkg/testutils" + "github.com/cilium/tetragon/pkg/tracingpolicy" "github.com/cilium/tetragon/pkg/watcher" "github.com/cilium/tetragon/pkg/watcher/crdwatcher" - - corev1 "k8s.io/api/core/v1" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - k8stypes "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/informers" - "k8s.io/client-go/tools/cache" ) var ( @@ -374,7 +373,7 @@ func getDefaultSensors(tb testing.TB, initialSensor *sensors.Sensor, opts ...Tes } func loadExporter(tb testing.TB, ctx context.Context, obs *observer.Observer, opts *testExporterOptions, oo *testObserverOptions) error { - watcher := opts.watcher + k8sWatcher := opts.watcher processCacheSize := 32768 dataCacheSize := 1024 procCacheGCInterval := defaults.DefaultProcessCacheGCInterval @@ -391,7 +390,7 @@ func loadExporter(tb testing.TB, ctx context.Context, obs *observer.Observer, op }) if oo.crd { - crdwatcher.WatchTracePolicy(ctx, sensorManager) + crdwatcher.AddTracingPolicyInformer(ctx, k8sWatcher, sensorManager) } if err := btf.InitCachedBTF(option.Config.HubbleLib, ""); err != nil { @@ -402,7 +401,7 @@ func loadExporter(tb testing.TB, ctx context.Context, obs *observer.Observer, op procCacheGCInterval = oo.procCacheGCInterval } - if err := process.InitCache(watcher, processCacheSize, procCacheGCInterval); err != nil { + if err := process.InitCache(k8sWatcher, processCacheSize, procCacheGCInterval); err != nil { return err } @@ -413,7 +412,7 @@ func loadExporter(tb testing.TB, ctx context.Context, obs *observer.Observer, op var cancelWg sync.WaitGroup // use an empty hooks runner - hookRunner := (&rthooks.Runner{}).WithWatcher(watcher) + hookRunner := (&rthooks.Runner{}).WithWatcher(k8sWatcher) // For testing we disable the eventcache and cilium cache by default. If we // enable these then every tests would need to wait for the 1.5 mimutes needed diff --git a/pkg/watcher/crdwatcher/tracingpolicy.go b/pkg/watcher/crdwatcher/tracingpolicy.go index 42a50316a5b..6e35452997c 100644 --- a/pkg/watcher/crdwatcher/tracingpolicy.go +++ b/pkg/watcher/crdwatcher/tracingpolicy.go @@ -11,16 +11,13 @@ import ( "github.com/sirupsen/logrus" "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" "github.com/cilium/tetragon/pkg/k8s/apis/cilium.io/v1alpha1" - "github.com/cilium/tetragon/pkg/k8s/client/clientset/versioned" - "github.com/cilium/tetragon/pkg/k8s/client/informers/externalversions" "github.com/cilium/tetragon/pkg/logger" "github.com/cilium/tetragon/pkg/sensors" "github.com/cilium/tetragon/pkg/tracingpolicy" - k8sconf "github.com/cilium/tetragon/pkg/watcher/conf" + "github.com/cilium/tetragon/pkg/watcher" ) // Log "missing tracing policy" message once. @@ -182,16 +179,18 @@ func updateTracingPolicy(ctx context.Context, log logrus.FieldLogger, s *sensors } } -func WatchTracePolicy(ctx context.Context, s *sensors.Manager) { +func AddTracingPolicyInformer(ctx context.Context, w watcher.Watcher, s *sensors.Manager) error { log := logger.GetLogger() - conf, err := k8sconf.K8sConfig() - if err != nil { - log.WithError(err).Fatal("couldn't get cluster config") + if w == nil { + return fmt.Errorf("k8s watcher not initialized") + } + factory := w.GetCRDInformerFactory() + if factory == nil { + return fmt.Errorf("CRD informer factory not initialized") } - client := versioned.NewForConfigOrDie(conf) - factory := externalversions.NewSharedInformerFactory(client, 0) - factory.Cilium().V1alpha1().TracingPolicies().Informer().AddEventHandler( + tpInformer := factory.Cilium().V1alpha1().TracingPolicies().Informer() + tpInformer.AddEventHandler( cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { addTracingPolicy(ctx, log, s, obj) @@ -202,8 +201,13 @@ func WatchTracePolicy(ctx context.Context, s *sensors.Manager) { UpdateFunc: func(oldObj interface{}, newObj interface{}) { updateTracingPolicy(ctx, log, s, oldObj, newObj) }}) + err := w.AddInformer("TracingPolicy", tpInformer, nil) + if err != nil { + return err + } - factory.Cilium().V1alpha1().TracingPoliciesNamespaced().Informer().AddEventHandler( + tpnInformer := factory.Cilium().V1alpha1().TracingPoliciesNamespaced().Informer() + tpnInformer.AddEventHandler( cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { addTracingPolicy(ctx, log, s, obj) @@ -214,8 +218,10 @@ func WatchTracePolicy(ctx context.Context, s *sensors.Manager) { UpdateFunc: func(oldObj interface{}, newObj interface{}) { updateTracingPolicy(ctx, log, s, oldObj, newObj) }}) + err = w.AddInformer("TracingPolicyNamespaced", tpnInformer, nil) + if err != nil { + return err + } - go factory.Start(wait.NeverStop) - factory.WaitForCacheSync(wait.NeverStop) - log.Info("Started watching tracing policies") + return nil }